From f0f459332d1a62a938a9b9b7ca34e3d3a2f26e8c Mon Sep 17 00:00:00 2001 From: mountain Date: Sat, 15 Jul 2023 17:08:33 +0900 Subject: [PATCH] =?UTF-8?q?wshandler=EC=97=90=EC=84=9C=20authcache?= =?UTF-8?q?=EC=A0=9C=EA=B1=B0=ED=95=98=EA=B3=A0=20config=20=ED=8F=AC?= =?UTF-8?q?=EB=A7=B7=20=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- authcollection.go | 4 +- redis.go | 26 ++++++----- reflect_config.go | 5 +-- rpc/rpc_test.go | 2 +- wshandler/wshandler.go | 97 +++++++++++++++++++++++------------------- 5 files changed, 74 insertions(+), 60 deletions(-) diff --git a/authcollection.go b/authcollection.go index a441ad3..b5ff8ef 100644 --- a/authcollection.go +++ b/authcollection.go @@ -205,7 +205,7 @@ func (acg *AuthCollectionGlobal) Reload(context context.Context) error { for r, url := range config.RegionStorage { if _, ok := oldval[r]; !ok { // 새로 생겼네 - redisClient, err := NewRedisClient(url.Redis.URL, url.Redis.Offset["session"]) + redisClient, err := NewRedisClient(url.Redis["session"]) if err != nil { return err } @@ -228,7 +228,7 @@ func NewAuthCollectionGlobal(context context.Context, apiToken string) (AuthColl output := make(map[string]*AuthCollection) for region, url := range config.RegionStorage { - redisClient, err := NewRedisClient(url.Redis.URL, url.Redis.Offset["session"]) + redisClient, err := NewRedisClient(url.Redis["session"]) if err != nil { return AuthCollectionGlobal{}, err } diff --git a/redis.go b/redis.go index 627042c..8c5ddef 100644 --- a/redis.go +++ b/redis.go @@ -2,7 +2,6 @@ package gocommon import ( "context" - "net/url" "os" "strconv" @@ -19,23 +18,30 @@ func newRedisClient(uri string, dbidxoffset int) *redis.Client { return redis.NewClient(option) } -func NewRedisClient(uri string, dbidx int) (*redis.Client, error) { +func NewRedisClient(uri string) (*redis.Client, error) { if !*devflag { - return newRedisClient(uri, dbidx), nil + return newRedisClient(uri, 0), nil } - rdb := newRedisClient(uri, 0) - devUrl, _ := url.Parse(uri) + option, err := redis.ParseURL(uri) + if err != nil { + return nil, err + } + zero := option + zero.DB = 0 + rdb := redis.NewClient(zero) + defer rdb.Close() + hostname, _ := os.Hostname() myidx, _ := rdb.HGet(context.Background(), "private_db", hostname).Result() if len(myidx) > 0 { - devUrl.Path = "/" + myidx - return newRedisClient(devUrl.String(), dbidx), nil + offset, _ := strconv.Atoi(myidx) + option.DB += offset + return redis.NewClient(option), nil } alldbs, err := rdb.HGetAll(context.Background(), "private_db").Result() if err != nil { - rdb.Close() return nil, err } @@ -53,6 +59,6 @@ func NewRedisClient(uri string, dbidx int) (*redis.Client, error) { return nil, err } - devUrl.Path = "/" + strconv.Itoa(newidx) - return newRedisClient(devUrl.String(), dbidx), nil + option.DB += newidx + return redis.NewClient(option), nil } diff --git a/reflect_config.go b/reflect_config.go index c8db5ce..50dd200 100644 --- a/reflect_config.go +++ b/reflect_config.go @@ -65,10 +65,7 @@ func LoadConfig[T any](outptr *T) error { type StorageAddr struct { Mongo string - Redis struct { - URL string - Offset map[string]int - } + Redis map[string]string } type RegionStorageConfig struct { diff --git a/rpc/rpc_test.go b/rpc/rpc_test.go index ccc3071..765354f 100644 --- a/rpc/rpc_test.go +++ b/rpc/rpc_test.go @@ -34,7 +34,7 @@ func TestRpc(t *testing.T) { RegistReceiver(&tr) myctx, cancel := context.WithCancel(context.Background()) - redisClient, _ := gocommon.NewRedisClient("redis://192.168.8.94:6379", 0) + redisClient, _ := gocommon.NewRedisClient("redis://192.168.8.94:6379") go func() { for { tr.TestFunc("aaaa", "bbbb", 333) diff --git a/wshandler/wshandler.go b/wshandler/wshandler.go index 086ad06..5408313 100644 --- a/wshandler/wshandler.go +++ b/wshandler/wshandler.go @@ -88,7 +88,6 @@ type Sender struct { type WebSocketMessageReceiver func(sender *Sender, messageType WebSocketMessageType, body io.Reader) type subhandler struct { - authCache *gocommon.AuthCollection redisMsgChanName string redisCmdChanName string redisSync *redis.Client @@ -98,28 +97,26 @@ type subhandler struct { callReceiver WebSocketMessageReceiver connWaitGroup sync.WaitGroup region string + receiverChain []WebSocketMessageReceiver } // WebsocketHandler : type WebsocketHandler struct { - authCaches map[string]*subhandler - RedisSync *redis.Client - receiverChain map[string][]WebSocketMessageReceiver + subhandlers map[string]*subhandler } type wsConfig struct { - SyncPipeline string `json:"ws_sync_pipeline"` + gocommon.RegionStorageConfig + Maingate string `json:"maingate_service_url"` } -func NewWebsocketHandler(authglobal gocommon.AuthCollectionGlobal) (wsh *WebsocketHandler) { - var config wsConfig +var config wsConfig + +func init() { gocommon.LoadConfig(&config) +} - redisSync, err := gocommon.NewRedisClient(config.SyncPipeline, 0) - if err != nil { - panic(err) - } - +func NewWebsocketHandler() (*WebsocketHandler, error) { // decoder := func(r io.Reader) *T { // if r == nil { // // 접속이 끊겼을 때. @@ -135,10 +132,14 @@ func NewWebsocketHandler(authglobal gocommon.AuthCollectionGlobal) (wsh *Websock // return &m // } - authCaches := make(map[string]*subhandler) - for _, region := range authglobal.Regions() { + subhandlers := make(map[string]*subhandler) + for region, cfg := range config.RegionStorage { + redisSync, err := gocommon.NewRedisClient(cfg.Redis["wshandler"]) + if err != nil { + return nil, err + } + sh := &subhandler{ - authCache: authglobal.Get(region), redisMsgChanName: fmt.Sprintf("_wsh_msg_%s", region), redisCmdChanName: fmt.Sprintf("_wsh_cmd_%s", region), redisSync: redisSync, @@ -148,23 +149,23 @@ func NewWebsocketHandler(authglobal gocommon.AuthCollectionGlobal) (wsh *Websock region: region, } - authCaches[region] = sh + subhandlers[region] = sh } return &WebsocketHandler{ - authCaches: authCaches, - RedisSync: redisSync, - receiverChain: make(map[string][]WebSocketMessageReceiver), - } + subhandlers: subhandlers, + }, nil } func (ws *WebsocketHandler) RegisterReceiver(region string, receiver WebSocketMessageReceiver) { - ws.receiverChain[region] = append(ws.receiverChain[region], receiver) + if sh := ws.subhandlers[region]; sh != nil { + sh.receiverChain = append(sh.receiverChain, receiver) + } } func (ws *WebsocketHandler) Start(ctx context.Context) { - for region, sh := range ws.authCaches { - chain := ws.receiverChain[region] + for _, sh := range ws.subhandlers { + chain := sh.receiverChain if len(chain) == 0 { sh.callReceiver = func(sender *Sender, messageType WebSocketMessageType, body io.Reader) {} } else if len(chain) == 1 { @@ -182,13 +183,13 @@ func (ws *WebsocketHandler) Start(ctx context.Context) { } func (ws *WebsocketHandler) Cleanup() { - for _, sh := range ws.authCaches { + for _, sh := range ws.subhandlers { sh.connWaitGroup.Wait() } } func (ws *WebsocketHandler) RegisterHandlers(serveMux *http.ServeMux, prefix string) error { - for region, sh := range ws.authCaches { + for region, sh := range ws.subhandlers { if region == "default" { region = "" } @@ -204,26 +205,28 @@ func (ws *WebsocketHandler) RegisterHandlers(serveMux *http.ServeMux, prefix str } func (ws *WebsocketHandler) GetState(region string, accid primitive.ObjectID) string { - state, err := ws.RedisSync.Get(context.Background(), accid.Hex()).Result() - if err == redis.Nil { - return "" + if sh := ws.subhandlers[region]; sh != nil { + state, _ := sh.redisSync.Get(context.Background(), accid.Hex()).Result() + return state } - return state + return "" } func (ws *WebsocketHandler) SetState(region string, accid primitive.ObjectID, state string) { - ws.RedisSync.SetArgs(context.Background(), accid.Hex(), state, redis.SetArgs{Mode: "XX"}).Result() + if sh := ws.subhandlers[region]; sh != nil { + sh.redisSync.SetArgs(context.Background(), accid.Hex(), state, redis.SetArgs{Mode: "XX"}).Result() + } } func (ws *WebsocketHandler) SendUpstreamMessage(region string, msg *UpstreamMessage) { - sh := ws.authCaches[region] + sh := ws.subhandlers[region] if sh != nil { sh.localDeliveryChan <- msg } } func (ws *WebsocketHandler) EnterRoom(region string, room string, accid primitive.ObjectID) { - sh := ws.authCaches[region] + sh := ws.subhandlers[region] if sh != nil { sh.localDeliveryChan <- &commandMessage{ Cmd: commandType_JoinRoom, @@ -233,7 +236,7 @@ func (ws *WebsocketHandler) EnterRoom(region string, room string, accid primitiv } func (ws *WebsocketHandler) LeaveRoom(region string, room string, accid primitive.ObjectID) { - sh := ws.authCaches[region] + sh := ws.subhandlers[region] if sh != nil { sh.localDeliveryChan <- &commandMessage{ Cmd: commandType_LeaveRoom, @@ -529,17 +532,25 @@ func (sh *subhandler) upgrade(w http.ResponseWriter, r *http.Request) { }() sk := r.Header.Get("AS-X-SESSION") - auth := strings.Split(r.Header.Get("Authorization"), " ") - if len(auth) != 2 { - //TODO : 클라이언트는 BadRequest를 받으면 로그인 화면으로 돌아가야 한다. - w.WriteHeader(http.StatusBadRequest) + auth := r.Header.Get("Authorization") + + req, _ := http.NewRequest("GET", fmt.Sprintf("%s/query?sk=%s", config.Maingate, sk), nil) + req.Header.Add("Authorization", auth) + + client := http.Client{} + resp, err := client.Do(req) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + logger.Error("authorize query failed :", err) return } - authtoken := auth[1] + defer resp.Body.Close() - accid, success := sh.authCache.IsValid(sk, authtoken) - if !success { - w.WriteHeader(http.StatusUnauthorized) + var authinfo gocommon.Authinfo + dec := json.NewDecoder(resp.Body) + if err = dec.Decode(&authinfo); err != nil { + w.WriteHeader(http.StatusInternalServerError) + logger.Error("authorize query failed :", err) return } @@ -554,8 +565,8 @@ func (sh *subhandler) upgrade(w http.ResponseWriter, r *http.Request) { if v := r.Header.Get("AS-X-ALIAS"); len(v) > 0 { alias = v } else { - alias = accid.Hex() + alias = authinfo.Accid.Hex() } - upgrade_core(sh, conn, accid, alias) + upgrade_core(sh, conn, authinfo.Accid, alias) }