EventReceiver 인터페이스 추가

This commit is contained in:
2023-07-18 09:45:27 +09:00
parent 29843802ff
commit 4b35e0e638

View File

@ -87,7 +87,11 @@ type Sender struct {
Alias string Alias string
} }
type WebSocketMessageReceiver func(sender *Sender, messageType WebSocketMessageType, body io.Reader) type EventReceiver interface {
OnClientMessageReceived(sender *Sender, messageType WebSocketMessageType, body io.Reader)
OnRoomCreated(region, name string)
OnRoomDestroyed(region, name string)
}
type subhandler struct { type subhandler struct {
redisMsgChanName string redisMsgChanName string
@ -96,10 +100,10 @@ type subhandler struct {
connInOutChan chan *wsconn connInOutChan chan *wsconn
deliveryChan chan any deliveryChan chan any
localDeliveryChan chan any localDeliveryChan chan any
callReceiver WebSocketMessageReceiver callReceiver EventReceiver
connWaitGroup sync.WaitGroup connWaitGroup sync.WaitGroup
region string region string
receiverChain []WebSocketMessageReceiver receiverChain []EventReceiver
} }
// WebsocketHandler : // WebsocketHandler :
@ -165,25 +169,50 @@ func NewWebsocketHandler() (*WebsocketHandler, error) {
}, nil }, nil
} }
func (ws *WebsocketHandler) RegisterReceiver(region string, receiver WebSocketMessageReceiver) { func (ws *WebsocketHandler) RegisterReceiver(region string, receiver EventReceiver) {
if sh := ws.subhandlers[region]; sh != nil { if sh := ws.subhandlers[region]; sh != nil {
sh.receiverChain = append(sh.receiverChain, receiver) sh.receiverChain = append(sh.receiverChain, receiver)
} }
} }
type nilReceiver struct{}
func (r *nilReceiver) OnClientMessageReceived(sender *Sender, messageType WebSocketMessageType, body io.Reader) {
}
func (r *nilReceiver) OnRoomCreated(region, name string) {}
func (r *nilReceiver) OnRoomDestroyed(region, name string) {}
type chainReceiver struct {
chain []EventReceiver
}
func (r *chainReceiver) OnClientMessageReceived(sender *Sender, messageType WebSocketMessageType, body io.Reader) {
for _, cr := range r.chain {
cr.OnClientMessageReceived(sender, messageType, body)
}
}
func (r *chainReceiver) OnRoomCreated(region, name string) {
for _, cr := range r.chain {
cr.OnRoomCreated(region, name)
}
}
func (r *chainReceiver) OnRoomDestroyed(region, name string) {
for _, cr := range r.chain {
cr.OnRoomDestroyed(region, name)
}
}
func (ws *WebsocketHandler) Start(ctx context.Context) { func (ws *WebsocketHandler) Start(ctx context.Context) {
for _, sh := range ws.subhandlers { for _, sh := range ws.subhandlers {
chain := sh.receiverChain chain := sh.receiverChain
if len(chain) == 0 { if len(chain) == 0 {
sh.callReceiver = func(sender *Sender, messageType WebSocketMessageType, body io.Reader) {} sh.callReceiver = &nilReceiver{}
} else if len(chain) == 1 { } else if len(chain) == 1 {
sh.callReceiver = chain[0] sh.callReceiver = chain[0]
} else { } else {
sh.callReceiver = func(sender *Sender, messageType WebSocketMessageType, body io.Reader) { sh.callReceiver = &chainReceiver{chain: sh.receiverChain}
for _, r := range chain {
r(sender, messageType, body)
}
}
} }
go sh.mainLoop(ctx) go sh.mainLoop(ctx)
@ -322,6 +351,7 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
select { select {
case destroyedRoom := <-roomDestroyChan: case destroyedRoom := <-roomDestroyChan:
delete(rooms, destroyedRoom) delete(rooms, destroyedRoom)
go sh.callReceiver.OnRoomDestroyed(sh.region, destroyedRoom)
case usermsg := <-sh.localDeliveryChan: case usermsg := <-sh.localDeliveryChan:
// 로컬에 connection이 있는지 먼저 확인해 보기 위한 채널 // 로컬에 connection이 있는지 먼저 확인해 보기 위한 채널
@ -444,10 +474,10 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
room.out(c) room.out(c)
} }
c.joinedRooms = nil c.joinedRooms = nil
sh.callReceiver(c.sender, Disconnected, nil) go sh.callReceiver.OnClientMessageReceived(c.sender, Disconnected, nil)
} else { } else {
entireConns[c.sender.Accid.Hex()] = c entireConns[c.sender.Accid.Hex()] = c
sh.callReceiver(c.sender, Connected, nil) go sh.callReceiver.OnClientMessageReceived(c.sender, Connected, nil)
} }
} }
} }
@ -475,15 +505,15 @@ func upgrade_core(sh *subhandler, conn *websocket.Conn, accid primitive.ObjectID
} }
if messageType == websocket.CloseMessage { if messageType == websocket.CloseMessage {
sh.callReceiver(c.sender, CloseMessage, r) sh.callReceiver.OnClientMessageReceived(c.sender, CloseMessage, r)
break break
} }
if messageType == websocket.TextMessage { if messageType == websocket.TextMessage {
// 유저가 직접 보낸 메시지 // 유저가 직접 보낸 메시지
sh.callReceiver(c.sender, TextMessage, r) sh.callReceiver.OnClientMessageReceived(c.sender, TextMessage, r)
} else if messageType == websocket.BinaryMessage { } else if messageType == websocket.BinaryMessage {
sh.callReceiver(c.sender, BinaryMessage, r) sh.callReceiver.OnClientMessageReceived(c.sender, BinaryMessage, r)
} }
} }
sh.redisSync.Del(context.Background(), accid.Hex()) sh.redisSync.Del(context.Background(), accid.Hex())