connection 도 room을 알고 있게 됨
This commit is contained in:
@ -13,14 +13,17 @@ type room struct {
|
|||||||
outChan chan *wsconn
|
outChan chan *wsconn
|
||||||
messageChan chan *UpstreamMessage
|
messageChan chan *UpstreamMessage
|
||||||
name string
|
name string
|
||||||
|
destroyChan chan<- string
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeRoom(name string) *room {
|
// 만약 destroyChan가 nil이면 room이 비어도 파괴되지 않는다. 영구 유지되는 room
|
||||||
|
func makeRoom(name string, destroyChan chan<- string) *room {
|
||||||
return &room{
|
return &room{
|
||||||
inChan: make(chan *wsconn, 10),
|
inChan: make(chan *wsconn, 10),
|
||||||
outChan: make(chan *wsconn, 10),
|
outChan: make(chan *wsconn, 10),
|
||||||
messageChan: make(chan *UpstreamMessage, 100),
|
messageChan: make(chan *UpstreamMessage, 100),
|
||||||
name: name,
|
name: name,
|
||||||
|
destroyChan: destroyChan,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -28,12 +31,14 @@ func (r *room) broadcast(msg *UpstreamMessage) {
|
|||||||
r.messageChan <- msg
|
r.messageChan <- msg
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *room) in(conn *wsconn) {
|
func (r *room) in(conn *wsconn) *room {
|
||||||
r.inChan <- conn
|
r.inChan <- conn
|
||||||
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *room) out(conn *wsconn) {
|
func (r *room) out(conn *wsconn) *room {
|
||||||
r.outChan <- conn
|
r.outChan <- conn
|
||||||
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *room) start(ctx context.Context) {
|
func (r *room) start(ctx context.Context) {
|
||||||
@ -66,6 +71,10 @@ func (r *room) loop(ctx context.Context, conns *map[string]*wsconn) (normalEnd b
|
|||||||
|
|
||||||
case conn := <-r.outChan:
|
case conn := <-r.outChan:
|
||||||
delete((*conns), conn.sender.Accid.Hex())
|
delete((*conns), conn.sender.Accid.Hex())
|
||||||
|
if len(*conns) == 0 && r.destroyChan != nil {
|
||||||
|
r.destroyChan <- r.name
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
case msg := <-r.messageChan:
|
case msg := <-r.messageChan:
|
||||||
ds := DownstreamMessage{
|
ds := DownstreamMessage{
|
||||||
|
|||||||
@ -9,7 +9,6 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
|
|
||||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||||
"repositories.action2quare.com/ayo/gocommon"
|
"repositories.action2quare.com/ayo/gocommon"
|
||||||
@ -24,7 +23,22 @@ var noSessionFlag = flagx.Bool("nosession", false, "nosession=[true|false]")
|
|||||||
|
|
||||||
type wsconn struct {
|
type wsconn struct {
|
||||||
*websocket.Conn
|
*websocket.Conn
|
||||||
sender *Sender
|
sender *Sender
|
||||||
|
joinedRooms []*room
|
||||||
|
}
|
||||||
|
|
||||||
|
func (conn *wsconn) popRoom(r *room) int {
|
||||||
|
for i, jr := range conn.joinedRooms {
|
||||||
|
if jr == r {
|
||||||
|
conn.joinedRooms = append(conn.joinedRooms[:i], conn.joinedRooms[i+1:]...)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return len(conn.joinedRooms)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (conn *wsconn) pushRoom(r *room) {
|
||||||
|
conn.joinedRooms = append(conn.joinedRooms, r)
|
||||||
}
|
}
|
||||||
|
|
||||||
type UpstreamMessage struct {
|
type UpstreamMessage struct {
|
||||||
@ -44,9 +58,8 @@ type DownstreamMessage struct {
|
|||||||
type commandType string
|
type commandType string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
commandType_JoinRoom = commandType("join_room")
|
commandType_JoinRoom = commandType("join_room")
|
||||||
commandType_LeaveRoom = commandType("leave_room")
|
commandType_LeaveRoom = commandType("leave_room")
|
||||||
commandType_WriteControl = commandType("write_control")
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type commandMessage struct {
|
type commandMessage struct {
|
||||||
@ -209,20 +222,6 @@ func (ws *WebsocketHandler) SendUpstreamMessage(region string, msg *UpstreamMess
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ws *WebsocketHandler) SendCloseMessage(region string, target string, text string) {
|
|
||||||
sh := ws.authCaches[region]
|
|
||||||
if sh != nil {
|
|
||||||
sh.localDeliveryChan <- &commandMessage{
|
|
||||||
Cmd: commandType_WriteControl,
|
|
||||||
Args: []any{
|
|
||||||
target,
|
|
||||||
int(websocket.CloseMessage),
|
|
||||||
websocket.FormatCloseMessage(websocket.CloseNormalClosure, text),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ws *WebsocketHandler) EnterRoom(region string, room string, accid primitive.ObjectID) {
|
func (ws *WebsocketHandler) EnterRoom(region string, room string, accid primitive.ObjectID) {
|
||||||
sh := ws.authCaches[region]
|
sh := ws.authCaches[region]
|
||||||
if sh != nil {
|
if sh != nil {
|
||||||
@ -290,10 +289,11 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
|
|||||||
|
|
||||||
entireConns := make(map[string]*wsconn)
|
entireConns := make(map[string]*wsconn)
|
||||||
rooms := make(map[string]*room)
|
rooms := make(map[string]*room)
|
||||||
|
roomDestroyChan := make(chan string, 1000)
|
||||||
findRoom := func(name string, create bool) *room {
|
findRoom := func(name string, create bool) *room {
|
||||||
room := rooms[name]
|
room := rooms[name]
|
||||||
if room == nil && create {
|
if room == nil && create {
|
||||||
room = makeRoom(name)
|
room = makeRoom(name, roomDestroyChan)
|
||||||
rooms[name] = room
|
rooms[name] = room
|
||||||
room.start(ctx)
|
room.start(ctx)
|
||||||
}
|
}
|
||||||
@ -303,6 +303,9 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
|
|||||||
// 유저에게서 온 메세지, 소켓 연결/해체 처리
|
// 유저에게서 온 메세지, 소켓 연결/해체 처리
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
case destroyedRoom := <-roomDestroyChan:
|
||||||
|
delete(rooms, destroyedRoom)
|
||||||
|
|
||||||
case usermsg := <-sh.localDeliveryChan:
|
case usermsg := <-sh.localDeliveryChan:
|
||||||
// 로컬에 connection이 있는지 먼저 확인해 보기 위한 채널
|
// 로컬에 connection이 있는지 먼저 확인해 보기 위한 채널
|
||||||
// 없으면 publish한다.
|
// 없으면 publish한다.
|
||||||
@ -334,7 +337,7 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
|
|||||||
accid := usermsg.Args[1].(primitive.ObjectID)
|
accid := usermsg.Args[1].(primitive.ObjectID)
|
||||||
conn := entireConns[accid.Hex()]
|
conn := entireConns[accid.Hex()]
|
||||||
if conn != nil {
|
if conn != nil {
|
||||||
findRoom(roomName, true).in(conn)
|
conn.pushRoom(findRoom(roomName, true).in(conn))
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
} else if usermsg.Cmd == commandType_LeaveRoom && len(usermsg.Args) == 2 {
|
} else if usermsg.Cmd == commandType_LeaveRoom && len(usermsg.Args) == 2 {
|
||||||
@ -343,17 +346,12 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
|
|||||||
conn := entireConns[accid.Hex()]
|
conn := entireConns[accid.Hex()]
|
||||||
if conn != nil {
|
if conn != nil {
|
||||||
if room := findRoom(roomName, false); room != nil {
|
if room := findRoom(roomName, false); room != nil {
|
||||||
room.out(conn)
|
if conn.popRoom(room.out(conn)) == 0 {
|
||||||
|
conn.Close()
|
||||||
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if usermsg.Cmd == commandType_WriteControl && len(usermsg.Args) == 2 {
|
|
||||||
accid := usermsg.Args[0].(string)
|
|
||||||
conn := entireConns[accid]
|
|
||||||
if conn != nil {
|
|
||||||
conn.WriteControl(usermsg.Args[1].(int), usermsg.Args[2].([]byte), time.Time{})
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 위에서 break 안걸리면 나한테 없으므로 publish를 해야 함. 그러면 다른 호스트가 deliveryChan으로 받는다
|
// 위에서 break 안걸리면 나한테 없으므로 publish를 해야 함. 그러면 다른 호스트가 deliveryChan으로 받는다
|
||||||
@ -391,7 +389,7 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
|
|||||||
accid := usermsg.Args[1].(primitive.ObjectID)
|
accid := usermsg.Args[1].(primitive.ObjectID)
|
||||||
conn := entireConns[accid.Hex()]
|
conn := entireConns[accid.Hex()]
|
||||||
if conn != nil {
|
if conn != nil {
|
||||||
findRoom(roomName, true).in(conn)
|
conn.pushRoom(findRoom(roomName, true).in(conn))
|
||||||
}
|
}
|
||||||
} else if usermsg.Cmd == commandType_LeaveRoom && len(usermsg.Args) == 2 {
|
} else if usermsg.Cmd == commandType_LeaveRoom && len(usermsg.Args) == 2 {
|
||||||
roomName := usermsg.Args[0].(string)
|
roomName := usermsg.Args[0].(string)
|
||||||
@ -399,7 +397,9 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
|
|||||||
conn := entireConns[accid.Hex()]
|
conn := entireConns[accid.Hex()]
|
||||||
if conn != nil {
|
if conn != nil {
|
||||||
if room := findRoom(roomName, false); room != nil {
|
if room := findRoom(roomName, false); room != nil {
|
||||||
room.out(conn)
|
if conn.popRoom(room.out(conn)) == 0 {
|
||||||
|
conn.Close()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -411,9 +411,10 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
|
|||||||
case c := <-sh.connInOutChan:
|
case c := <-sh.connInOutChan:
|
||||||
if c.Conn == nil {
|
if c.Conn == nil {
|
||||||
delete(entireConns, c.sender.Accid.Hex())
|
delete(entireConns, c.sender.Accid.Hex())
|
||||||
for _, room := range rooms {
|
for _, room := range c.joinedRooms {
|
||||||
room.out(c)
|
room.out(c)
|
||||||
}
|
}
|
||||||
|
c.joinedRooms = nil
|
||||||
sh.callReceiver(c.sender, Disconnected, nil)
|
sh.callReceiver(c.sender, Disconnected, nil)
|
||||||
} else {
|
} else {
|
||||||
entireConns[c.sender.Accid.Hex()] = c
|
entireConns[c.sender.Accid.Hex()] = c
|
||||||
|
|||||||
Reference in New Issue
Block a user