Squashed commit of the following:

commit 29b2f25850
Author: mountain <mountain@action2quare.com>
Date:   Wed Jul 19 09:33:37 2023 +0900

    타입 이름 변경

commit 256bfd030c
Author: mountain <mountain@action2quare.com>
Date:   Wed Jul 19 09:31:01 2023 +0900

    redison 추가

commit 72a683fed2
Author: mountain <mountain@action2quare.com>
Date:   Tue Jul 18 19:51:24 2023 +0900

    gob에 []any 추가

commit 89fa9e4ac5
Author: mountain <mountain@action2quare.com>
Date:   Tue Jul 18 17:45:12 2023 +0900

    write control 수정

commit d724cc84fa
Author: mountain <mountain@action2quare.com>
Date:   Tue Jul 18 17:38:04 2023 +0900

    redis pubsub 채널 이름에 디비 인덱스 추가

commit 8df248fa54
Author: mountain <mountain@action2quare.com>
Date:   Tue Jul 18 17:20:47 2023 +0900

    close를 writecontrol로 변경

commit 40a603522d
Author: mountain <mountain@action2quare.com>
Date:   Tue Jul 18 12:21:06 2023 +0900

    conn에 msg를 쓰는 쓰레드 단일화

commit c21017d2cd
Author: mountain <mountain@action2quare.com>
Date:   Tue Jul 18 11:08:38 2023 +0900

    redis call이 문제가 아니었음

commit 82abcddb49
Author: mountain <mountain@action2quare.com>
Date:   Tue Jul 18 11:04:15 2023 +0900

    잦은 redis call 회피

commit 289af24a8f
Author: mountain <mountain@action2quare.com>
Date:   Tue Jul 18 09:55:18 2023 +0900

    room create 메시지 전송

commit 4b35e0e638
Author: mountain <mountain@action2quare.com>
Date:   Tue Jul 18 09:45:27 2023 +0900

    EventReceiver 인터페이스 추가

commit 29843802ff
Author: mountain <mountain@action2quare.com>
Date:   Mon Jul 17 17:45:40 2023 +0900

    gob 등록

commit 66aea48fb7
Author: mountain <mountain@action2quare.com>
Date:   Sun Jul 16 18:39:11 2023 +0900

    채널간 publish marshalling을 gob으로 변경

commit 8f6c87a8ae
Author: mountain <mountain@action2quare.com>
Date:   Sun Jul 16 16:37:02 2023 +0900

    redis option을 copy로 변경

commit f0f459332d
Author: mountain <mountain@action2quare.com>
Date:   Sat Jul 15 17:08:33 2023 +0900

    wshandler에서 authcache제거하고 config 포맷 변경
This commit is contained in:
2023-07-19 09:35:25 +09:00
parent 269fa0f870
commit e0e911f9e7
7 changed files with 592 additions and 90 deletions

View File

@ -1,7 +1,9 @@
package wshandler
import (
"bytes"
"context"
"encoding/gob"
"encoding/hex"
"encoding/json"
"fmt"
@ -9,6 +11,7 @@ import (
"net/http"
"strings"
"sync"
"time"
"go.mongodb.org/mongo-driver/bson/primitive"
"repositories.action2quare.com/ayo/gocommon"
@ -85,41 +88,56 @@ type Sender struct {
Alias string
}
type WebSocketMessageReceiver func(sender *Sender, messageType WebSocketMessageType, body io.Reader)
type EventReceiver interface {
OnClientMessageReceived(sender *Sender, messageType WebSocketMessageType, body io.Reader)
OnRoomCreated(region, name string)
OnRoomDestroyed(region, name string)
}
type send_msg_queue_elem struct {
to *wsconn
mt int
msg []byte
}
type subhandler struct {
authCache *gocommon.AuthCollection
redisMsgChanName string
redisCmdChanName string
redisSync *redis.Client
connInOutChan chan *wsconn
deliveryChan chan any
localDeliveryChan chan any
callReceiver WebSocketMessageReceiver
sendMsgChan chan send_msg_queue_elem
callReceiver EventReceiver
connWaitGroup sync.WaitGroup
region string
receiverChain []EventReceiver
}
// WebsocketHandler :
type WebsocketHandler struct {
authCaches map[string]*subhandler
RedisSync *redis.Client
receiverChain map[string][]WebSocketMessageReceiver
subhandlers map[string]*subhandler
}
type wsConfig struct {
SyncPipeline string `json:"ws_sync_pipeline"`
gocommon.RegionStorageConfig
Maingate string `json:"maingate_service_url"`
}
func NewWebsocketHandler(authglobal gocommon.AuthCollectionGlobal) (wsh *WebsocketHandler) {
var config wsConfig
var config wsConfig
func init() {
gocommon.LoadConfig(&config)
gob.Register(UpstreamMessage{})
gob.Register(commandMessage{})
gob.Register(map[string]any{})
gob.Register(primitive.A{})
gob.Register(primitive.M{})
gob.Register(primitive.ObjectID{})
gob.Register([]any{})
}
redisSync, err := gocommon.NewRedisClient(config.SyncPipeline, 0)
if err != nil {
panic(err)
}
func NewWebsocketHandler() (*WebsocketHandler, error) {
// decoder := func(r io.Reader) *T {
// if r == nil {
// // 접속이 끊겼을 때.
@ -135,46 +153,93 @@ func NewWebsocketHandler(authglobal gocommon.AuthCollectionGlobal) (wsh *Websock
// return &m
// }
authCaches := make(map[string]*subhandler)
for _, region := range authglobal.Regions() {
subhandlers := make(map[string]*subhandler)
for region, cfg := range config.RegionStorage {
redisSync, err := gocommon.NewRedisClient(cfg.Redis["wshandler"])
if err != nil {
return nil, err
}
sendchan := make(chan send_msg_queue_elem, 1000)
go func() {
sender := func(elem *send_msg_queue_elem) {
defer func() {
r := recover()
if r != nil {
logger.Println(r)
}
}()
elem.to.WriteMessage(elem.mt, elem.msg)
}
for elem := range sendchan {
sender(&elem)
}
}()
sh := &subhandler{
authCache: authglobal.Get(region),
redisMsgChanName: fmt.Sprintf("_wsh_msg_%s", region),
redisCmdChanName: fmt.Sprintf("_wsh_cmd_%s", region),
redisMsgChanName: fmt.Sprintf("_wsh_msg_%s_%d", region, redisSync.Options().DB),
redisCmdChanName: fmt.Sprintf("_wsh_cmd_%s_%d", region, redisSync.Options().DB),
redisSync: redisSync,
connInOutChan: make(chan *wsconn),
deliveryChan: make(chan any, 1000),
localDeliveryChan: make(chan any, 100),
sendMsgChan: sendchan,
region: region,
}
authCaches[region] = sh
subhandlers[region] = sh
}
return &WebsocketHandler{
authCaches: authCaches,
RedisSync: redisSync,
receiverChain: make(map[string][]WebSocketMessageReceiver),
subhandlers: subhandlers,
}, nil
}
func (ws *WebsocketHandler) RegisterReceiver(region string, receiver EventReceiver) {
if sh := ws.subhandlers[region]; sh != nil {
sh.receiverChain = append(sh.receiverChain, receiver)
}
}
func (ws *WebsocketHandler) RegisterReceiver(region string, receiver WebSocketMessageReceiver) {
ws.receiverChain[region] = append(ws.receiverChain[region], receiver)
type nilReceiver struct{}
func (r *nilReceiver) OnClientMessageReceived(sender *Sender, messageType WebSocketMessageType, body io.Reader) {
}
func (r *nilReceiver) OnRoomCreated(region, name string) {}
func (r *nilReceiver) OnRoomDestroyed(region, name string) {}
type chainReceiver struct {
chain []EventReceiver
}
func (r *chainReceiver) OnClientMessageReceived(sender *Sender, messageType WebSocketMessageType, body io.Reader) {
for _, cr := range r.chain {
cr.OnClientMessageReceived(sender, messageType, body)
}
}
func (r *chainReceiver) OnRoomCreated(region, name string) {
for _, cr := range r.chain {
cr.OnRoomCreated(region, name)
}
}
func (r *chainReceiver) OnRoomDestroyed(region, name string) {
for _, cr := range r.chain {
cr.OnRoomDestroyed(region, name)
}
}
func (ws *WebsocketHandler) Start(ctx context.Context) {
for region, sh := range ws.authCaches {
chain := ws.receiverChain[region]
for _, sh := range ws.subhandlers {
chain := sh.receiverChain
if len(chain) == 0 {
sh.callReceiver = func(sender *Sender, messageType WebSocketMessageType, body io.Reader) {}
sh.callReceiver = &nilReceiver{}
} else if len(chain) == 1 {
sh.callReceiver = chain[0]
} else {
sh.callReceiver = func(sender *Sender, messageType WebSocketMessageType, body io.Reader) {
for _, r := range chain {
r(sender, messageType, body)
}
}
sh.callReceiver = &chainReceiver{chain: sh.receiverChain}
}
go sh.mainLoop(ctx)
@ -182,13 +247,13 @@ func (ws *WebsocketHandler) Start(ctx context.Context) {
}
func (ws *WebsocketHandler) Cleanup() {
for _, sh := range ws.authCaches {
for _, sh := range ws.subhandlers {
sh.connWaitGroup.Wait()
}
}
func (ws *WebsocketHandler) RegisterHandlers(serveMux *http.ServeMux, prefix string) error {
for region, sh := range ws.authCaches {
for region, sh := range ws.subhandlers {
if region == "default" {
region = ""
}
@ -204,26 +269,28 @@ func (ws *WebsocketHandler) RegisterHandlers(serveMux *http.ServeMux, prefix str
}
func (ws *WebsocketHandler) GetState(region string, accid primitive.ObjectID) string {
state, err := ws.RedisSync.Get(context.Background(), accid.Hex()).Result()
if err == redis.Nil {
return ""
if sh := ws.subhandlers[region]; sh != nil {
state, _ := sh.redisSync.Get(context.Background(), accid.Hex()).Result()
return state
}
return state
return ""
}
func (ws *WebsocketHandler) SetState(region string, accid primitive.ObjectID, state string) {
ws.RedisSync.SetArgs(context.Background(), accid.Hex(), state, redis.SetArgs{Mode: "XX"}).Result()
if sh := ws.subhandlers[region]; sh != nil {
sh.redisSync.SetArgs(context.Background(), accid.Hex(), state, redis.SetArgs{Mode: "XX"}).Result()
}
}
func (ws *WebsocketHandler) SendUpstreamMessage(region string, msg *UpstreamMessage) {
sh := ws.authCaches[region]
sh := ws.subhandlers[region]
if sh != nil {
sh.localDeliveryChan <- msg
}
}
func (ws *WebsocketHandler) EnterRoom(region string, room string, accid primitive.ObjectID) {
sh := ws.authCaches[region]
sh := ws.subhandlers[region]
if sh != nil {
sh.localDeliveryChan <- &commandMessage{
Cmd: commandType_JoinRoom,
@ -233,7 +300,7 @@ func (ws *WebsocketHandler) EnterRoom(region string, room string, accid primitiv
}
func (ws *WebsocketHandler) LeaveRoom(region string, room string, accid primitive.ObjectID) {
sh := ws.authCaches[region]
sh := ws.subhandlers[region]
if sh != nil {
sh.localDeliveryChan <- &commandMessage{
Cmd: commandType_LeaveRoom,
@ -260,16 +327,19 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
raw, err := pubsub.ReceiveMessage(ctx)
if err == nil {
buffer := bytes.NewBuffer([]byte(raw.Payload))
dec := gob.NewDecoder(buffer)
if raw.Channel == sh.redisMsgChanName {
var msg UpstreamMessage
if err := json.Unmarshal([]byte(raw.Payload), &msg); err == nil {
if err := dec.Decode(&msg); err == nil {
sh.deliveryChan <- &msg
} else {
logger.Println("decode UpstreamMessage failed :", err)
}
} else if raw.Channel == sh.redisCmdChanName {
var cmd commandMessage
if err := json.Unmarshal([]byte(raw.Payload), &cmd); err == nil {
if err := dec.Decode(&cmd); err == nil {
sh.deliveryChan <- &cmd
} else {
logger.Println("decode UpstreamMessage failed :", err)
@ -293,18 +363,23 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
findRoom := func(name string, create bool) *room {
room := rooms[name]
if room == nil && create {
room = makeRoom(name, roomDestroyChan)
room = makeRoom(name, roomDestroyChan, sh.sendMsgChan)
rooms[name] = room
room.start(ctx)
go sh.callReceiver.OnRoomCreated(sh.region, name)
}
return room
}
// 유저에게서 온 메세지, 소켓 연결/해체 처리
for {
buffer := bytes.NewBuffer(make([]byte, 0, 1024))
buffer.Reset()
select {
case destroyedRoom := <-roomDestroyChan:
delete(rooms, destroyedRoom)
go sh.callReceiver.OnRoomDestroyed(sh.region, destroyedRoom)
case usermsg := <-sh.localDeliveryChan:
// 로컬에 connection이 있는지 먼저 확인해 보기 위한 채널
@ -322,13 +397,23 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
Body: usermsg.Body,
Tag: usermsg.Tag,
})
conn.WriteMessage(websocket.TextMessage, ds)
sh.sendMsgChan <- send_msg_queue_elem{
to: conn,
mt: websocket.TextMessage,
msg: ds,
}
break
}
}
if bt, err := json.Marshal(usermsg); err == nil {
sh.redisSync.Publish(context.Background(), sh.redisMsgChanName, bt).Result()
var err error
enc := gob.NewEncoder(buffer)
if err = enc.Encode(usermsg); err == nil {
_, err = sh.redisSync.Publish(context.Background(), sh.redisMsgChanName, buffer.Bytes()).Result()
}
if err != nil {
logger.Println("gob.Encode or publish failed :", err)
}
case *commandMessage:
@ -347,7 +432,8 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
if conn != nil {
if room := findRoom(roomName, false); room != nil {
if conn.popRoom(room.out(conn)) == 0 {
conn.Close()
closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
conn.WriteControl(websocket.CloseMessage, closeMsg, time.Time{})
}
break
}
@ -355,8 +441,13 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
}
// 위에서 break 안걸리면 나한테 없으므로 publish를 해야 함. 그러면 다른 호스트가 deliveryChan으로 받는다
if bt, err := json.Marshal(usermsg); err == nil {
sh.redisSync.Publish(context.Background(), sh.redisCmdChanName, bt).Result()
var err error
enc := gob.NewEncoder(buffer)
if err = enc.Encode(usermsg); err == nil {
_, err = sh.redisSync.Publish(context.Background(), sh.redisCmdChanName, buffer.Bytes()).Result()
}
if err != nil {
logger.Println("gob.Encode or Publish failed :", err)
}
}
@ -379,7 +470,11 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
Body: usermsg.Body,
Tag: usermsg.Tag,
})
conn.WriteMessage(websocket.TextMessage, ds)
sh.sendMsgChan <- send_msg_queue_elem{
to: conn,
mt: websocket.TextMessage,
msg: ds,
}
}
}
@ -398,7 +493,8 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
if conn != nil {
if room := findRoom(roomName, false); room != nil {
if conn.popRoom(room.out(conn)) == 0 {
conn.Close()
closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
conn.WriteControl(websocket.CloseMessage, closeMsg, time.Time{})
}
}
}
@ -415,10 +511,10 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
room.out(c)
}
c.joinedRooms = nil
sh.callReceiver(c.sender, Disconnected, nil)
go sh.callReceiver.OnClientMessageReceived(c.sender, Disconnected, nil)
} else {
entireConns[c.sender.Accid.Hex()] = c
sh.callReceiver(c.sender, Connected, nil)
go sh.callReceiver.OnClientMessageReceived(c.sender, Connected, nil)
}
}
}
@ -446,15 +542,15 @@ func upgrade_core(sh *subhandler, conn *websocket.Conn, accid primitive.ObjectID
}
if messageType == websocket.CloseMessage {
sh.callReceiver(c.sender, CloseMessage, r)
sh.callReceiver.OnClientMessageReceived(c.sender, CloseMessage, r)
break
}
if messageType == websocket.TextMessage {
// 유저가 직접 보낸 메시지
sh.callReceiver(c.sender, TextMessage, r)
sh.callReceiver.OnClientMessageReceived(c.sender, TextMessage, r)
} else if messageType == websocket.BinaryMessage {
sh.callReceiver(c.sender, BinaryMessage, r)
sh.callReceiver.OnClientMessageReceived(c.sender, BinaryMessage, r)
}
}
sh.redisSync.Del(context.Background(), accid.Hex())
@ -529,17 +625,25 @@ func (sh *subhandler) upgrade(w http.ResponseWriter, r *http.Request) {
}()
sk := r.Header.Get("AS-X-SESSION")
auth := strings.Split(r.Header.Get("Authorization"), " ")
if len(auth) != 2 {
//TODO : 클라이언트는 BadRequest를 받으면 로그인 화면으로 돌아가야 한다.
w.WriteHeader(http.StatusBadRequest)
auth := r.Header.Get("Authorization")
req, _ := http.NewRequest("GET", fmt.Sprintf("%s/query?sk=%s", config.Maingate, sk), nil)
req.Header.Add("Authorization", auth)
client := http.Client{}
resp, err := client.Do(req)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
logger.Error("authorize query failed :", err)
return
}
authtoken := auth[1]
defer resp.Body.Close()
accid, success := sh.authCache.IsValid(sk, authtoken)
if !success {
w.WriteHeader(http.StatusUnauthorized)
var authinfo gocommon.Authinfo
dec := json.NewDecoder(resp.Body)
if err = dec.Decode(&authinfo); err != nil {
w.WriteHeader(http.StatusInternalServerError)
logger.Error("authorize query failed :", err)
return
}
@ -554,8 +658,8 @@ func (sh *subhandler) upgrade(w http.ResponseWriter, r *http.Request) {
if v := r.Header.Get("AS-X-ALIAS"); len(v) > 0 {
alias = v
} else {
alias = accid.Hex()
alias = authinfo.Accid.Hex()
}
upgrade_core(sh, conn, accid, alias)
upgrade_core(sh, conn, authinfo.Accid, alias)
}