conn에 msg를 쓰는 쓰레드 단일화

This commit is contained in:
2023-07-18 12:21:06 +09:00
parent c21017d2cd
commit 40a603522d
2 changed files with 44 additions and 6 deletions

View File

@ -14,16 +14,18 @@ type room struct {
messageChan chan *UpstreamMessage messageChan chan *UpstreamMessage
name string name string
destroyChan chan<- string destroyChan chan<- string
sendMsgChan chan<- send_msg_queue_elem
} }
// 만약 destroyChan가 nil이면 room이 비어도 파괴되지 않는다. 영구 유지되는 room // 만약 destroyChan가 nil이면 room이 비어도 파괴되지 않는다. 영구 유지되는 room
func makeRoom(name string, destroyChan chan<- string) *room { func makeRoom(name string, destroyChan chan<- string, sendMsgChan chan<- send_msg_queue_elem) *room {
return &room{ return &room{
inChan: make(chan *wsconn, 10), inChan: make(chan *wsconn, 10),
outChan: make(chan *wsconn, 10), outChan: make(chan *wsconn, 10),
messageChan: make(chan *UpstreamMessage, 100), messageChan: make(chan *UpstreamMessage, 100),
name: name, name: name,
destroyChan: destroyChan, destroyChan: destroyChan,
sendMsgChan: sendMsgChan,
} }
} }
@ -85,7 +87,11 @@ func (r *room) loop(ctx context.Context, conns *map[string]*wsconn) (normalEnd b
bt, _ := json.Marshal(ds) bt, _ := json.Marshal(ds)
for _, conn := range *conns { for _, conn := range *conns {
conn.Conn.WriteMessage(websocket.TextMessage, bt) r.sendMsgChan <- send_msg_queue_elem{
to: conn,
mt: websocket.TextMessage,
msg: bt,
}
} }
} }
} }

View File

@ -93,6 +93,12 @@ type EventReceiver interface {
OnRoomDestroyed(region, name string) OnRoomDestroyed(region, name string)
} }
type send_msg_queue_elem struct {
to *wsconn
mt int
msg []byte
}
type subhandler struct { type subhandler struct {
redisMsgChanName string redisMsgChanName string
redisCmdChanName string redisCmdChanName string
@ -100,6 +106,7 @@ type subhandler struct {
connInOutChan chan *wsconn connInOutChan chan *wsconn
deliveryChan chan any deliveryChan chan any
localDeliveryChan chan any localDeliveryChan chan any
sendMsgChan chan send_msg_queue_elem
callReceiver EventReceiver callReceiver EventReceiver
connWaitGroup sync.WaitGroup connWaitGroup sync.WaitGroup
region string region string
@ -151,6 +158,23 @@ func NewWebsocketHandler() (*WebsocketHandler, error) {
return nil, err return nil, err
} }
sendchan := make(chan send_msg_queue_elem, 1000)
go func() {
sender := func(elem *send_msg_queue_elem) {
defer func() {
r := recover()
if r != nil {
logger.Println(r)
}
}()
elem.to.WriteMessage(elem.mt, elem.msg)
}
for elem := range sendchan {
sender(&elem)
}
}()
sh := &subhandler{ sh := &subhandler{
redisMsgChanName: fmt.Sprintf("_wsh_msg_%s", region), redisMsgChanName: fmt.Sprintf("_wsh_msg_%s", region),
redisCmdChanName: fmt.Sprintf("_wsh_cmd_%s", region), redisCmdChanName: fmt.Sprintf("_wsh_cmd_%s", region),
@ -158,6 +182,7 @@ func NewWebsocketHandler() (*WebsocketHandler, error) {
connInOutChan: make(chan *wsconn), connInOutChan: make(chan *wsconn),
deliveryChan: make(chan any, 1000), deliveryChan: make(chan any, 1000),
localDeliveryChan: make(chan any, 100), localDeliveryChan: make(chan any, 100),
sendMsgChan: sendchan,
region: region, region: region,
} }
@ -336,7 +361,7 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
findRoom := func(name string, create bool) *room { findRoom := func(name string, create bool) *room {
room := rooms[name] room := rooms[name]
if room == nil && create { if room == nil && create {
room = makeRoom(name, roomDestroyChan) room = makeRoom(name, roomDestroyChan, sh.sendMsgChan)
rooms[name] = room rooms[name] = room
room.start(ctx) room.start(ctx)
go sh.callReceiver.OnRoomCreated(sh.region, name) go sh.callReceiver.OnRoomCreated(sh.region, name)
@ -370,8 +395,11 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
Body: usermsg.Body, Body: usermsg.Body,
Tag: usermsg.Tag, Tag: usermsg.Tag,
}) })
sh.sendMsgChan <- send_msg_queue_elem{
conn.WriteMessage(websocket.TextMessage, ds) to: conn,
mt: websocket.TextMessage,
msg: ds,
}
break break
} }
} }
@ -439,7 +467,11 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
Body: usermsg.Body, Body: usermsg.Body,
Tag: usermsg.Tag, Tag: usermsg.Tag,
}) })
conn.WriteMessage(websocket.TextMessage, ds) sh.sendMsgChan <- send_msg_queue_elem{
to: conn,
mt: websocket.TextMessage,
msg: ds,
}
} }
} }