From e0e911f9e7da279436be08ed42d5a2da41a418c1 Mon Sep 17 00:00:00 2001 From: mountain Date: Wed, 19 Jul 2023 09:35:25 +0900 Subject: [PATCH] Squashed commit of the following: MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit commit 29b2f258507d9e11e20a9693b86b3ae09e10d88c Author: mountain Date: Wed Jul 19 09:33:37 2023 +0900 타입 이름 변경 commit 256bfd030c294d2d7bea4ca2c3082f2d49ae9aef Author: mountain Date: Wed Jul 19 09:31:01 2023 +0900 redison 추가 commit 72a683fed2c024616b2171be1f6841a11150a3a8 Author: mountain Date: Tue Jul 18 19:51:24 2023 +0900 gob에 []any 추가 commit 89fa9e4ac585026c331d697929697af5defc6b9d Author: mountain Date: Tue Jul 18 17:45:12 2023 +0900 write control 수정 commit d724cc84fa94ab6cdd3d8bddd3ab08c99d0aef3a Author: mountain Date: Tue Jul 18 17:38:04 2023 +0900 redis pubsub 채널 이름에 디비 인덱스 추가 commit 8df248fa54908a8cb547813179e4269e25c7df87 Author: mountain Date: Tue Jul 18 17:20:47 2023 +0900 close를 writecontrol로 변경 commit 40a603522d4082f426a0d3818d852db53e458cd2 Author: mountain Date: Tue Jul 18 12:21:06 2023 +0900 conn에 msg를 쓰는 쓰레드 단일화 commit c21017d2cd8b2bd26bdbf6d4eca49d06b8462ce0 Author: mountain Date: Tue Jul 18 11:08:38 2023 +0900 redis call이 문제가 아니었음 commit 82abcddb497bdb95b0eff2d7a98b72cacf37bc9a Author: mountain Date: Tue Jul 18 11:04:15 2023 +0900 잦은 redis call 회피 commit 289af24a8ffaa55336bfabca151a71f6e1f82290 Author: mountain Date: Tue Jul 18 09:55:18 2023 +0900 room create 메시지 전송 commit 4b35e0e6386b1a27e4dec854d1fc2c755f20aeef Author: mountain Date: Tue Jul 18 09:45:27 2023 +0900 EventReceiver 인터페이스 추가 commit 29843802ff0eb6378af63b2a14de826a594f5a9e Author: mountain Date: Mon Jul 17 17:45:40 2023 +0900 gob 등록 commit 66aea48fb7322728d0f665e37b2dd818f441c26f Author: mountain Date: Sun Jul 16 18:39:11 2023 +0900 채널간 publish marshalling을 gob으로 변경 commit 8f6c87a8aeb86f8fb41ba7a5ff75e3e8ee9ede2c Author: mountain Date: Sun Jul 16 16:37:02 2023 +0900 redis option을 copy로 변경 commit f0f459332d1a62a938a9b9b7ca34e3d3a2f26e8c Author: mountain Date: Sat Jul 15 17:08:33 2023 +0900 wshandler에서 authcache제거하고 config 포맷 변경 --- authcollection.go | 8 +- redis.go | 412 ++++++++++++++++++++++++++++++++++++++++- reflect_config.go | 5 +- rpc/rpc.go | 1 + rpc/rpc_test.go | 2 +- wshandler/room.go | 10 +- wshandler/wshandler.go | 244 +++++++++++++++++------- 7 files changed, 592 insertions(+), 90 deletions(-) diff --git a/authcollection.go b/authcollection.go index a441ad3..c1e688a 100644 --- a/authcollection.go +++ b/authcollection.go @@ -30,7 +30,7 @@ type Authinfo struct { } const ( - sessionSyncChannelName = "session-sync-channel2" + sessionSyncChannelNamePrefix = "session-sync-channel2" ) type AuthinfoCell interface { @@ -99,6 +99,8 @@ func (ac *redisAuthCell) ToBytes() []byte { func newAuthCollectionWithRedis(redisClient *redis.Client, subctx context.Context, maingateURL string, apiToken string) *AuthCollection { sessionTTL := int64(3600) ac := MakeAuthCollection(time.Duration(sessionTTL * int64(time.Second))) + + sessionSyncChannelName := fmt.Sprintf("%s-%d", sessionSyncChannelNamePrefix, redisClient.Options().DB) pubsub := redisClient.Subscribe(subctx, sessionSyncChannelName) ctx, cancel := context.WithCancel(context.TODO()) go func(ctx context.Context, sub *redis.PubSub, authCache *AuthCollection) { @@ -205,7 +207,7 @@ func (acg *AuthCollectionGlobal) Reload(context context.Context) error { for r, url := range config.RegionStorage { if _, ok := oldval[r]; !ok { // 새로 생겼네 - redisClient, err := NewRedisClient(url.Redis.URL, url.Redis.Offset["session"]) + redisClient, err := NewRedisClient(url.Redis["session"]) if err != nil { return err } @@ -228,7 +230,7 @@ func NewAuthCollectionGlobal(context context.Context, apiToken string) (AuthColl output := make(map[string]*AuthCollection) for region, url := range config.RegionStorage { - redisClient, err := NewRedisClient(url.Redis.URL, url.Redis.Offset["session"]) + redisClient, err := NewRedisClient(url.Redis["session"]) if err != nil { return AuthCollectionGlobal{}, err } diff --git a/redis.go b/redis.go index 627042c..eac3f87 100644 --- a/redis.go +++ b/redis.go @@ -2,9 +2,11 @@ package gocommon import ( "context" - "net/url" + "encoding/json" + "fmt" "os" "strconv" + "strings" "github.com/go-redis/redis/v8" ) @@ -19,23 +21,30 @@ func newRedisClient(uri string, dbidxoffset int) *redis.Client { return redis.NewClient(option) } -func NewRedisClient(uri string, dbidx int) (*redis.Client, error) { +func NewRedisClient(uri string) (*redis.Client, error) { if !*devflag { - return newRedisClient(uri, dbidx), nil + return newRedisClient(uri, 0), nil } - rdb := newRedisClient(uri, 0) - devUrl, _ := url.Parse(uri) + option, err := redis.ParseURL(uri) + if err != nil { + return nil, err + } + zero := *option + zero.DB = 0 + rdb := redis.NewClient(&zero) + defer rdb.Close() + hostname, _ := os.Hostname() myidx, _ := rdb.HGet(context.Background(), "private_db", hostname).Result() if len(myidx) > 0 { - devUrl.Path = "/" + myidx - return newRedisClient(devUrl.String(), dbidx), nil + offset, _ := strconv.Atoi(myidx) + option.DB += offset + return redis.NewClient(option), nil } alldbs, err := rdb.HGetAll(context.Background(), "private_db").Result() if err != nil { - rdb.Close() return nil, err } @@ -53,6 +62,389 @@ func NewRedisClient(uri string, dbidx int) (*redis.Client, error) { return nil, err } - devUrl.Path = "/" + strconv.Itoa(newidx) - return newRedisClient(devUrl.String(), dbidx), nil + option.DB += newidx + return redis.NewClient(option), nil +} + +type RedisonSetOption = string +type RedisonGetOption = [2]any + +const ( + // JSONSET command Options + RedisonSetOptionNX RedisonSetOption = "NX" + RedisonSetOptionXX RedisonSetOption = "XX" +) + +var ( + RedisonGetOptionSPACE = RedisonGetOption{"SPACE", " "} + RedisonGetOptionINDENT = RedisonGetOption{"INDENT", "\t"} + RedisonGetOptionNEWLINE = RedisonGetOption{"NEWLINE", "\n"} + RedisonGetOptionNOESCAPE = RedisonGetOption{"NOESCAPE", ""} +) + +// gocommon으로 옮길 거 +type RedisonHandler struct { + *redis.Client + ctx context.Context +} + +func NewRedisonHandler(ctx context.Context, redisClient *redis.Client) *RedisonHandler { + return &RedisonHandler{ + Client: redisClient, + ctx: ctx, + } +} + +func respToArray[T any](resp any, err error) ([]T, error) { + if err != nil { + return nil, err + } + + resArr := resp.([]any) + v := make([]T, len(resArr)) + for i, e := range resArr { + v[i] = e.(T) + } + return v, nil +} + +func appendArgs[T any](args []any, ext ...T) []any { + for _, e := range ext { + args = append(args, e) + } + return args +} + +func (rh *RedisonHandler) JSONMSetRel(key string, prefixPath string, kv map[string]any) error { + if len(prefixPath) > 0 && !strings.HasSuffix(prefixPath, ".") { + prefixPath += "." + } + + pl := rh.Pipeline() + for path, obj := range kv { + b, err := json.Marshal(obj) + if err != nil { + return err + } + pl.Do(rh.ctx, "JSON.SET", key, prefixPath+path, b) + } + + cmders, err := pl.Exec(rh.ctx) + if err != nil { + return err + } + + for _, cmder := range cmders { + if cmder.Err() != nil { + return cmder.Err() + } + } + return nil +} + +func (rh *RedisonHandler) JSONMSet(key string, kv map[string]any) error { + return rh.JSONMSetRel(key, "", kv) +} + +func (rh *RedisonHandler) jsonSetMergeJSONSet(cmd, key, path string, obj any, opts ...RedisonSetOption) (bool, error) { + b, err := json.Marshal(obj) + if err != nil { + return false, err + } + + args := []any{ + "JSON.SET", + key, + path, + b, + } + if len(opts) > 0 { + args = append(args, opts[0]) + } + + res, err := rh.Do(rh.ctx, args...).Result() + if err != nil { + return false, err + } + + return res.(string) == "OK", nil +} + +func (rh *RedisonHandler) JSONSet(key, path string, obj any, opts ...RedisonSetOption) (bool, error) { + return rh.jsonSetMergeJSONSet("JSON.SET", key, path, obj, opts...) +} + +func (rh *RedisonHandler) JSONMerge(key, path string, obj any, opts ...RedisonSetOption) (bool, error) { + return rh.jsonSetMergeJSONSet("JSON.MERGE", key, path, obj, opts...) +} + +func (rh *RedisonHandler) JSONGet(key, path string, opts ...RedisonGetOption) (res any, err error) { + args := appendArgs[string]([]any{ + "JSON.GET", + key, + }, strings.Split(path, " ")...) + + for _, opt := range opts { + args = append(args, opt[:]...) + } + + return rh.Do(rh.ctx, args...).Result() +} + +func (rh *RedisonHandler) JSONGetString(key, path string) ([]string, error) { + return respToArray[string](rh.JSONResp(key, path)) +} + +func (rh *RedisonHandler) JSONGetInt64(key, path string) ([]int64, error) { + return respToArray[int64](rh.JSONResp(key, path)) +} + +func (rh *RedisonHandler) JSONMGet(path string, keys ...string) (res any, err error) { + args := appendArgs[string]([]any{ + "JSON.MGET", + path, + }, keys...) + return rh.Do(rh.ctx, args...).Result() +} + +func (rh *RedisonHandler) JSONMDel(key string, paths []string) error { + pl := rh.Pipeline() + for _, path := range paths { + args := []any{ + "JSON.DEL", + key, + path, + } + pl.Do(rh.ctx, args...) + } + _, err := pl.Exec(rh.ctx) + return err +} + +func (rh *RedisonHandler) JSONDel(key, path string) (int64, error) { + args := []any{ + "JSON.DEL", + key, + path, + } + resp, err := rh.Do(rh.ctx, args...).Result() + if err != nil { + return 0, err + } + + return resp.(int64), nil +} + +func (rh *RedisonHandler) JSONType(key, path string) ([]string, error) { + args := []any{ + "JSON.TYPE", + key, + path, + } + return respToArray[string](rh.Do(rh.ctx, args...).Result()) +} + +func (rh *RedisonHandler) JSONNumIncrBy(key, path string, number int) ([]any, error) { + args := []any{ + "JSON.NUMINCRBY", + key, + path, + number, + } + resp, err := rh.Do(rh.ctx, args...).Result() + if err != nil { + return nil, err + } + + return resp.([]any), nil +} + +func (rh *RedisonHandler) JSONNumMultBy(key, path string, number int) (res any, err error) { + args := []any{ + "JSON.NUMMULTBY", + key, + path, + number, + } + resp, err := rh.Do(rh.ctx, args...).Result() + if err != nil { + return nil, err + } + + return resp.([]any), nil +} + +func (rh *RedisonHandler) JSONStrAppend(key, path string, jsonstring string) ([]int64, error) { + args := []any{ + "JSON.STRAPPEND", + key, + path, + fmt.Sprintf(`'"%s"'`, jsonstring), + } + return respToArray[int64](rh.Do(rh.ctx, args...).Result()) +} + +func (rh *RedisonHandler) JSONStrLen(key, path string) (res []int64, err error) { + args := []any{ + "JSON.STRLEN", + key, + path, + } + return respToArray[int64](rh.Do(rh.ctx, args...).Result()) +} + +func (rh *RedisonHandler) JSONArrAppend(key, path string, values ...any) (int64, error) { + args := []any{ + "JSON.ARRAPPEND", + key, + path, + } + resp, err := rh.Do(rh.ctx, args...).Result() + if err != nil { + return 0, err + } + + return resp.(int64), nil +} + +func (rh *RedisonHandler) JSONArrLen(key, path string) ([]int64, error) { + args := []any{ + "JSON.ARRLEN", + key, + path, + } + return respToArray[int64](rh.Do(rh.ctx, args...).Result()) +} + +func (rh *RedisonHandler) JSONArrPop(key, path string, index int) (res any, err error) { + args := []any{ + "JSON.ARRPOP", + key, + path, + index, + } + resp, err := rh.Do(rh.ctx, args...).Result() + if err != nil { + return nil, err + } + return resp.([]any)[0], nil +} + +func appendValues(args []any, values ...any) []any { + for _, jsonValue := range values { + switch jsonValue := jsonValue.(type) { + case string: + args = append(args, fmt.Sprintf(`'"%s"'`, jsonValue)) + case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64: + args = append(args, jsonValue) + default: + bt, _ := json.Marshal(jsonValue) + args = append(args, bt) + } + } + return args +} + +func (rh *RedisonHandler) JSONArrIndex(key, path string, jsonValue any, optionalRange ...int) ([]int64, error) { + args := appendValues([]any{ + "JSON.ARRINDEX", + key, + path, + }, jsonValue) + + return respToArray[int64](rh.Do(rh.ctx, args...).Result()) +} + +func (rh *RedisonHandler) JSONArrTrim(key, path string, start, end int) (res any, err error) { + args := []any{ + "JSON.ARRTRIM", + key, + path, + start, + end, + } + return respToArray[int64](rh.Do(rh.ctx, args...).Result()) +} + +func (rh *RedisonHandler) JSONArrInsert(key, path string, index int, values ...any) (res any, err error) { + args := appendValues([]any{ + "JSON.ARRINSERT", + key, + path, + index, + }, values...) + + return respToArray[int64](rh.Do(rh.ctx, args...).Result()) +} + +func (rh *RedisonHandler) JSONObjKeys(key, path string) ([]string, error) { + args := []any{ + "JSON.OBJKEYS", + key, + path, + } + + res, err := rh.Do(rh.ctx, args...).Result() + if err != nil { + return nil, err + } + + resArr := res.([]any) + resArr = resArr[0].([]any) + slc := make([]string, len(resArr)) + + for i, r := range resArr { + slc[i] = r.(string) + } + + return slc, nil +} + +func (rh *RedisonHandler) JSONObjLen(key, path string) ([]int64, error) { + args := []any{ + "JSON.OBJLEN", + key, + } + + if path != "$" { + args = append(args, path) + } + + resp, err := rh.Do(rh.ctx, args...).Result() + if err != nil { + return nil, err + } + + switch resp := resp.(type) { + case []any: + return respToArray[int64](resp, nil) + + case int64: + return []int64{resp}, nil + } + + return []int64{0}, nil +} + +func (rh *RedisonHandler) JSONDebug(key, path string) (res any, err error) { + args := []any{ + "JSON.DEBUG", + "MEMORY", + key, + path, + } + return rh.Do(rh.ctx, args...).Result() +} + +func (rh *RedisonHandler) JSONForget(key, path string) (int64, error) { + return rh.JSONDel(key, path) +} + +func (rh *RedisonHandler) JSONResp(key, path string) (res any, err error) { + args := []any{ + "JSON.RESP", + key, + path, + } + return rh.Do(rh.ctx, args...).Result() } diff --git a/reflect_config.go b/reflect_config.go index c8db5ce..50dd200 100644 --- a/reflect_config.go +++ b/reflect_config.go @@ -65,10 +65,7 @@ func LoadConfig[T any](outptr *T) error { type StorageAddr struct { Mongo string - Redis struct { - URL string - Offset map[string]int - } + Redis map[string]string } type RegionStorageConfig struct { diff --git a/rpc/rpc.go b/rpc/rpc.go index 3bcff82..6a62991 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -68,6 +68,7 @@ func Start(ctx context.Context, redisClient *redis.Client) { hash.Write([]byte(inName)) } } + hash.Write([]byte(fmt.Sprintf("%d", redisClient.Options().DB))) } pubsubName := hex.EncodeToString(hash.Sum(nil))[:16] diff --git a/rpc/rpc_test.go b/rpc/rpc_test.go index ccc3071..765354f 100644 --- a/rpc/rpc_test.go +++ b/rpc/rpc_test.go @@ -34,7 +34,7 @@ func TestRpc(t *testing.T) { RegistReceiver(&tr) myctx, cancel := context.WithCancel(context.Background()) - redisClient, _ := gocommon.NewRedisClient("redis://192.168.8.94:6379", 0) + redisClient, _ := gocommon.NewRedisClient("redis://192.168.8.94:6379") go func() { for { tr.TestFunc("aaaa", "bbbb", 333) diff --git a/wshandler/room.go b/wshandler/room.go index db71571..786793d 100644 --- a/wshandler/room.go +++ b/wshandler/room.go @@ -14,16 +14,18 @@ type room struct { messageChan chan *UpstreamMessage name string destroyChan chan<- string + sendMsgChan chan<- send_msg_queue_elem } // 만약 destroyChan가 nil이면 room이 비어도 파괴되지 않는다. 영구 유지되는 room -func makeRoom(name string, destroyChan chan<- string) *room { +func makeRoom(name string, destroyChan chan<- string, sendMsgChan chan<- send_msg_queue_elem) *room { return &room{ inChan: make(chan *wsconn, 10), outChan: make(chan *wsconn, 10), messageChan: make(chan *UpstreamMessage, 100), name: name, destroyChan: destroyChan, + sendMsgChan: sendMsgChan, } } @@ -85,7 +87,11 @@ func (r *room) loop(ctx context.Context, conns *map[string]*wsconn) (normalEnd b bt, _ := json.Marshal(ds) for _, conn := range *conns { - conn.Conn.WriteMessage(websocket.TextMessage, bt) + r.sendMsgChan <- send_msg_queue_elem{ + to: conn, + mt: websocket.TextMessage, + msg: bt, + } } } } diff --git a/wshandler/wshandler.go b/wshandler/wshandler.go index 086ad06..a3189d4 100644 --- a/wshandler/wshandler.go +++ b/wshandler/wshandler.go @@ -1,7 +1,9 @@ package wshandler import ( + "bytes" "context" + "encoding/gob" "encoding/hex" "encoding/json" "fmt" @@ -9,6 +11,7 @@ import ( "net/http" "strings" "sync" + "time" "go.mongodb.org/mongo-driver/bson/primitive" "repositories.action2quare.com/ayo/gocommon" @@ -85,41 +88,56 @@ type Sender struct { Alias string } -type WebSocketMessageReceiver func(sender *Sender, messageType WebSocketMessageType, body io.Reader) +type EventReceiver interface { + OnClientMessageReceived(sender *Sender, messageType WebSocketMessageType, body io.Reader) + OnRoomCreated(region, name string) + OnRoomDestroyed(region, name string) +} + +type send_msg_queue_elem struct { + to *wsconn + mt int + msg []byte +} type subhandler struct { - authCache *gocommon.AuthCollection redisMsgChanName string redisCmdChanName string redisSync *redis.Client connInOutChan chan *wsconn deliveryChan chan any localDeliveryChan chan any - callReceiver WebSocketMessageReceiver + sendMsgChan chan send_msg_queue_elem + callReceiver EventReceiver connWaitGroup sync.WaitGroup region string + receiverChain []EventReceiver } // WebsocketHandler : type WebsocketHandler struct { - authCaches map[string]*subhandler - RedisSync *redis.Client - receiverChain map[string][]WebSocketMessageReceiver + subhandlers map[string]*subhandler } type wsConfig struct { - SyncPipeline string `json:"ws_sync_pipeline"` + gocommon.RegionStorageConfig + Maingate string `json:"maingate_service_url"` } -func NewWebsocketHandler(authglobal gocommon.AuthCollectionGlobal) (wsh *WebsocketHandler) { - var config wsConfig +var config wsConfig + +func init() { gocommon.LoadConfig(&config) + gob.Register(UpstreamMessage{}) + gob.Register(commandMessage{}) + gob.Register(map[string]any{}) + gob.Register(primitive.A{}) + gob.Register(primitive.M{}) + gob.Register(primitive.ObjectID{}) + gob.Register([]any{}) +} - redisSync, err := gocommon.NewRedisClient(config.SyncPipeline, 0) - if err != nil { - panic(err) - } - +func NewWebsocketHandler() (*WebsocketHandler, error) { // decoder := func(r io.Reader) *T { // if r == nil { // // 접속이 끊겼을 때. @@ -135,46 +153,93 @@ func NewWebsocketHandler(authglobal gocommon.AuthCollectionGlobal) (wsh *Websock // return &m // } - authCaches := make(map[string]*subhandler) - for _, region := range authglobal.Regions() { + subhandlers := make(map[string]*subhandler) + for region, cfg := range config.RegionStorage { + redisSync, err := gocommon.NewRedisClient(cfg.Redis["wshandler"]) + if err != nil { + return nil, err + } + + sendchan := make(chan send_msg_queue_elem, 1000) + go func() { + sender := func(elem *send_msg_queue_elem) { + defer func() { + r := recover() + if r != nil { + logger.Println(r) + } + }() + elem.to.WriteMessage(elem.mt, elem.msg) + } + + for elem := range sendchan { + sender(&elem) + } + }() + sh := &subhandler{ - authCache: authglobal.Get(region), - redisMsgChanName: fmt.Sprintf("_wsh_msg_%s", region), - redisCmdChanName: fmt.Sprintf("_wsh_cmd_%s", region), + redisMsgChanName: fmt.Sprintf("_wsh_msg_%s_%d", region, redisSync.Options().DB), + redisCmdChanName: fmt.Sprintf("_wsh_cmd_%s_%d", region, redisSync.Options().DB), redisSync: redisSync, connInOutChan: make(chan *wsconn), deliveryChan: make(chan any, 1000), localDeliveryChan: make(chan any, 100), + sendMsgChan: sendchan, region: region, } - authCaches[region] = sh + subhandlers[region] = sh } return &WebsocketHandler{ - authCaches: authCaches, - RedisSync: redisSync, - receiverChain: make(map[string][]WebSocketMessageReceiver), + subhandlers: subhandlers, + }, nil +} + +func (ws *WebsocketHandler) RegisterReceiver(region string, receiver EventReceiver) { + if sh := ws.subhandlers[region]; sh != nil { + sh.receiverChain = append(sh.receiverChain, receiver) } } -func (ws *WebsocketHandler) RegisterReceiver(region string, receiver WebSocketMessageReceiver) { - ws.receiverChain[region] = append(ws.receiverChain[region], receiver) +type nilReceiver struct{} + +func (r *nilReceiver) OnClientMessageReceived(sender *Sender, messageType WebSocketMessageType, body io.Reader) { +} +func (r *nilReceiver) OnRoomCreated(region, name string) {} +func (r *nilReceiver) OnRoomDestroyed(region, name string) {} + +type chainReceiver struct { + chain []EventReceiver +} + +func (r *chainReceiver) OnClientMessageReceived(sender *Sender, messageType WebSocketMessageType, body io.Reader) { + for _, cr := range r.chain { + cr.OnClientMessageReceived(sender, messageType, body) + } +} + +func (r *chainReceiver) OnRoomCreated(region, name string) { + for _, cr := range r.chain { + cr.OnRoomCreated(region, name) + } +} + +func (r *chainReceiver) OnRoomDestroyed(region, name string) { + for _, cr := range r.chain { + cr.OnRoomDestroyed(region, name) + } } func (ws *WebsocketHandler) Start(ctx context.Context) { - for region, sh := range ws.authCaches { - chain := ws.receiverChain[region] + for _, sh := range ws.subhandlers { + chain := sh.receiverChain if len(chain) == 0 { - sh.callReceiver = func(sender *Sender, messageType WebSocketMessageType, body io.Reader) {} + sh.callReceiver = &nilReceiver{} } else if len(chain) == 1 { sh.callReceiver = chain[0] } else { - sh.callReceiver = func(sender *Sender, messageType WebSocketMessageType, body io.Reader) { - for _, r := range chain { - r(sender, messageType, body) - } - } + sh.callReceiver = &chainReceiver{chain: sh.receiverChain} } go sh.mainLoop(ctx) @@ -182,13 +247,13 @@ func (ws *WebsocketHandler) Start(ctx context.Context) { } func (ws *WebsocketHandler) Cleanup() { - for _, sh := range ws.authCaches { + for _, sh := range ws.subhandlers { sh.connWaitGroup.Wait() } } func (ws *WebsocketHandler) RegisterHandlers(serveMux *http.ServeMux, prefix string) error { - for region, sh := range ws.authCaches { + for region, sh := range ws.subhandlers { if region == "default" { region = "" } @@ -204,26 +269,28 @@ func (ws *WebsocketHandler) RegisterHandlers(serveMux *http.ServeMux, prefix str } func (ws *WebsocketHandler) GetState(region string, accid primitive.ObjectID) string { - state, err := ws.RedisSync.Get(context.Background(), accid.Hex()).Result() - if err == redis.Nil { - return "" + if sh := ws.subhandlers[region]; sh != nil { + state, _ := sh.redisSync.Get(context.Background(), accid.Hex()).Result() + return state } - return state + return "" } func (ws *WebsocketHandler) SetState(region string, accid primitive.ObjectID, state string) { - ws.RedisSync.SetArgs(context.Background(), accid.Hex(), state, redis.SetArgs{Mode: "XX"}).Result() + if sh := ws.subhandlers[region]; sh != nil { + sh.redisSync.SetArgs(context.Background(), accid.Hex(), state, redis.SetArgs{Mode: "XX"}).Result() + } } func (ws *WebsocketHandler) SendUpstreamMessage(region string, msg *UpstreamMessage) { - sh := ws.authCaches[region] + sh := ws.subhandlers[region] if sh != nil { sh.localDeliveryChan <- msg } } func (ws *WebsocketHandler) EnterRoom(region string, room string, accid primitive.ObjectID) { - sh := ws.authCaches[region] + sh := ws.subhandlers[region] if sh != nil { sh.localDeliveryChan <- &commandMessage{ Cmd: commandType_JoinRoom, @@ -233,7 +300,7 @@ func (ws *WebsocketHandler) EnterRoom(region string, room string, accid primitiv } func (ws *WebsocketHandler) LeaveRoom(region string, room string, accid primitive.ObjectID) { - sh := ws.authCaches[region] + sh := ws.subhandlers[region] if sh != nil { sh.localDeliveryChan <- &commandMessage{ Cmd: commandType_LeaveRoom, @@ -260,16 +327,19 @@ func (sh *subhandler) mainLoop(ctx context.Context) { raw, err := pubsub.ReceiveMessage(ctx) if err == nil { + buffer := bytes.NewBuffer([]byte(raw.Payload)) + dec := gob.NewDecoder(buffer) + if raw.Channel == sh.redisMsgChanName { var msg UpstreamMessage - if err := json.Unmarshal([]byte(raw.Payload), &msg); err == nil { + if err := dec.Decode(&msg); err == nil { sh.deliveryChan <- &msg } else { logger.Println("decode UpstreamMessage failed :", err) } } else if raw.Channel == sh.redisCmdChanName { var cmd commandMessage - if err := json.Unmarshal([]byte(raw.Payload), &cmd); err == nil { + if err := dec.Decode(&cmd); err == nil { sh.deliveryChan <- &cmd } else { logger.Println("decode UpstreamMessage failed :", err) @@ -293,18 +363,23 @@ func (sh *subhandler) mainLoop(ctx context.Context) { findRoom := func(name string, create bool) *room { room := rooms[name] if room == nil && create { - room = makeRoom(name, roomDestroyChan) + room = makeRoom(name, roomDestroyChan, sh.sendMsgChan) rooms[name] = room room.start(ctx) + go sh.callReceiver.OnRoomCreated(sh.region, name) } return room } // 유저에게서 온 메세지, 소켓 연결/해체 처리 for { + buffer := bytes.NewBuffer(make([]byte, 0, 1024)) + buffer.Reset() + select { case destroyedRoom := <-roomDestroyChan: delete(rooms, destroyedRoom) + go sh.callReceiver.OnRoomDestroyed(sh.region, destroyedRoom) case usermsg := <-sh.localDeliveryChan: // 로컬에 connection이 있는지 먼저 확인해 보기 위한 채널 @@ -322,13 +397,23 @@ func (sh *subhandler) mainLoop(ctx context.Context) { Body: usermsg.Body, Tag: usermsg.Tag, }) - - conn.WriteMessage(websocket.TextMessage, ds) + sh.sendMsgChan <- send_msg_queue_elem{ + to: conn, + mt: websocket.TextMessage, + msg: ds, + } break } } - if bt, err := json.Marshal(usermsg); err == nil { - sh.redisSync.Publish(context.Background(), sh.redisMsgChanName, bt).Result() + + var err error + enc := gob.NewEncoder(buffer) + if err = enc.Encode(usermsg); err == nil { + _, err = sh.redisSync.Publish(context.Background(), sh.redisMsgChanName, buffer.Bytes()).Result() + } + + if err != nil { + logger.Println("gob.Encode or publish failed :", err) } case *commandMessage: @@ -347,7 +432,8 @@ func (sh *subhandler) mainLoop(ctx context.Context) { if conn != nil { if room := findRoom(roomName, false); room != nil { if conn.popRoom(room.out(conn)) == 0 { - conn.Close() + closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "") + conn.WriteControl(websocket.CloseMessage, closeMsg, time.Time{}) } break } @@ -355,8 +441,13 @@ func (sh *subhandler) mainLoop(ctx context.Context) { } // 위에서 break 안걸리면 나한테 없으므로 publish를 해야 함. 그러면 다른 호스트가 deliveryChan으로 받는다 - if bt, err := json.Marshal(usermsg); err == nil { - sh.redisSync.Publish(context.Background(), sh.redisCmdChanName, bt).Result() + var err error + enc := gob.NewEncoder(buffer) + if err = enc.Encode(usermsg); err == nil { + _, err = sh.redisSync.Publish(context.Background(), sh.redisCmdChanName, buffer.Bytes()).Result() + } + if err != nil { + logger.Println("gob.Encode or Publish failed :", err) } } @@ -379,7 +470,11 @@ func (sh *subhandler) mainLoop(ctx context.Context) { Body: usermsg.Body, Tag: usermsg.Tag, }) - conn.WriteMessage(websocket.TextMessage, ds) + sh.sendMsgChan <- send_msg_queue_elem{ + to: conn, + mt: websocket.TextMessage, + msg: ds, + } } } @@ -398,7 +493,8 @@ func (sh *subhandler) mainLoop(ctx context.Context) { if conn != nil { if room := findRoom(roomName, false); room != nil { if conn.popRoom(room.out(conn)) == 0 { - conn.Close() + closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "") + conn.WriteControl(websocket.CloseMessage, closeMsg, time.Time{}) } } } @@ -415,10 +511,10 @@ func (sh *subhandler) mainLoop(ctx context.Context) { room.out(c) } c.joinedRooms = nil - sh.callReceiver(c.sender, Disconnected, nil) + go sh.callReceiver.OnClientMessageReceived(c.sender, Disconnected, nil) } else { entireConns[c.sender.Accid.Hex()] = c - sh.callReceiver(c.sender, Connected, nil) + go sh.callReceiver.OnClientMessageReceived(c.sender, Connected, nil) } } } @@ -446,15 +542,15 @@ func upgrade_core(sh *subhandler, conn *websocket.Conn, accid primitive.ObjectID } if messageType == websocket.CloseMessage { - sh.callReceiver(c.sender, CloseMessage, r) + sh.callReceiver.OnClientMessageReceived(c.sender, CloseMessage, r) break } if messageType == websocket.TextMessage { // 유저가 직접 보낸 메시지 - sh.callReceiver(c.sender, TextMessage, r) + sh.callReceiver.OnClientMessageReceived(c.sender, TextMessage, r) } else if messageType == websocket.BinaryMessage { - sh.callReceiver(c.sender, BinaryMessage, r) + sh.callReceiver.OnClientMessageReceived(c.sender, BinaryMessage, r) } } sh.redisSync.Del(context.Background(), accid.Hex()) @@ -529,17 +625,25 @@ func (sh *subhandler) upgrade(w http.ResponseWriter, r *http.Request) { }() sk := r.Header.Get("AS-X-SESSION") - auth := strings.Split(r.Header.Get("Authorization"), " ") - if len(auth) != 2 { - //TODO : 클라이언트는 BadRequest를 받으면 로그인 화면으로 돌아가야 한다. - w.WriteHeader(http.StatusBadRequest) + auth := r.Header.Get("Authorization") + + req, _ := http.NewRequest("GET", fmt.Sprintf("%s/query?sk=%s", config.Maingate, sk), nil) + req.Header.Add("Authorization", auth) + + client := http.Client{} + resp, err := client.Do(req) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + logger.Error("authorize query failed :", err) return } - authtoken := auth[1] + defer resp.Body.Close() - accid, success := sh.authCache.IsValid(sk, authtoken) - if !success { - w.WriteHeader(http.StatusUnauthorized) + var authinfo gocommon.Authinfo + dec := json.NewDecoder(resp.Body) + if err = dec.Decode(&authinfo); err != nil { + w.WriteHeader(http.StatusInternalServerError) + logger.Error("authorize query failed :", err) return } @@ -554,8 +658,8 @@ func (sh *subhandler) upgrade(w http.ResponseWriter, r *http.Request) { if v := r.Header.Get("AS-X-ALIAS"); len(v) > 0 { alias = v } else { - alias = accid.Hex() + alias = authinfo.Accid.Hex() } - upgrade_core(sh, conn, accid, alias) + upgrade_core(sh, conn, authinfo.Accid, alias) }