From edd2f7aab52ec9041473a7ade3a681a7a2f21f4d Mon Sep 17 00:00:00 2001 From: mountain Date: Wed, 5 Jul 2023 11:27:44 +0900 Subject: [PATCH 01/26] =?UTF-8?q?=EB=B0=A9=EC=97=90=20=EB=A9=94=EC=84=B8?= =?UTF-8?q?=EC=A7=80=EB=A5=BC=20=EB=B8=8C=EB=A1=9C=EB=93=9C=EC=BA=90?= =?UTF-8?q?=EC=8A=A4=ED=8C=85=ED=95=98=EB=8A=94=20=EA=B8=B0=EB=8A=A5=20?= =?UTF-8?q?=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- wshandler/room.go | 82 ++++++ wshandler/wshandler.go | 611 +++++++++++++++-------------------------- 2 files changed, 299 insertions(+), 394 deletions(-) create mode 100644 wshandler/room.go diff --git a/wshandler/room.go b/wshandler/room.go new file mode 100644 index 0000000..a443e32 --- /dev/null +++ b/wshandler/room.go @@ -0,0 +1,82 @@ +package wshandler + +import ( + "context" + + "github.com/gorilla/websocket" + "repositories.action2quare.com/ayo/gocommon/logger" +) + +type room struct { + inChan chan *Richconn + outChan chan *Richconn + messageChan chan *UpstreamMessage + name string +} + +func makeRoom(name string) *room { + return &room{ + inChan: make(chan *Richconn, 10), + outChan: make(chan *Richconn, 10), + messageChan: make(chan *UpstreamMessage, 100), + name: name, + } +} + +func (r *room) broadcast(msg *UpstreamMessage) { + r.messageChan <- msg +} + +func (r *room) in(conn *Richconn) { + r.inChan <- conn +} + +func (r *room) out(conn *Richconn) { + r.outChan <- conn +} + +func (r *room) start(ctx context.Context) { + go func(ctx context.Context) { + conns := make(map[string]*Richconn) + normal := false + for !normal { + normal = r.loop(ctx, &conns) + } + }(ctx) +} + +func (r *room) loop(ctx context.Context, conns *map[string]*Richconn) (normalEnd bool) { + defer func() { + s := recover() + if s != nil { + logger.Error(s) + normalEnd = false + } + }() + + a, b, c := []byte(`{"alias":"`), []byte(`","body":"`), []byte(`"}`) + + for { + select { + case <-ctx.Done(): + return true + + case conn := <-r.inChan: + (*conns)[conn.alias] = conn + + case conn := <-r.outChan: + delete((*conns), conn.alias) + + case msg := <-r.messageChan: + for _, conn := range *conns { + writer, _ := conn.NextWriter(websocket.TextMessage) + writer.Write(a) + writer.Write([]byte(msg.Alias)) + writer.Write(b) + writer.Write(msg.Body) + writer.Write(c) + writer.Close() + } + } + } +} diff --git a/wshandler/wshandler.go b/wshandler/wshandler.go index 0ad7fb0..977ac58 100644 --- a/wshandler/wshandler.go +++ b/wshandler/wshandler.go @@ -7,100 +7,54 @@ import ( "fmt" "io" "net/http" - "os" "strings" "sync" - common "repositories.action2quare.com/ayo/gocommon" + "go.mongodb.org/mongo-driver/bson/primitive" + "repositories.action2quare.com/ayo/gocommon" "repositories.action2quare.com/ayo/gocommon/flagx" "repositories.action2quare.com/ayo/gocommon/logger" "github.com/go-redis/redis/v8" "github.com/gorilla/websocket" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/bson/primitive" ) var noSessionFlag = flagx.Bool("nosession", false, "nosession=[true|false]") -const ( - connStateCachePrefix = "conn_state_" - connStateScript = ` - local hosts = redis.call('keys',KEYS[1]) - for index, key in ipairs(hosts) do - local ok = redis.call('hexists', key, KEYS[2]) - if ok == 1 then - return redis.call('hget', key, KEYS[2]) - end - end - return "" - ` -) - -var ConnStateCacheKey = func() string { - hn, _ := os.Hostname() - return connStateCachePrefix + hn -}() - type Richconn struct { *websocket.Conn - lock sync.Mutex - alias primitive.ObjectID - tags []string - onClose map[string]func() + closeFuncLock sync.Mutex + alias string + onClose map[string]func() } -func (rc *Richconn) AddTag(name, val string) { - rc.lock.Lock() - defer rc.lock.Unlock() - - prefix := name + "=" - for i, tag := range rc.tags { - if strings.HasPrefix(tag, prefix) { - rc.tags[i] = prefix + val - return - } - } - rc.tags = append(rc.tags, prefix+val) +type UpstreamMessage struct { + Alias string + Accid primitive.ObjectID + Target string + Body []byte } -func (rc *Richconn) GetTag(name string) string { - rc.lock.Lock() - defer rc.lock.Unlock() - - prefix := name + "=" - for _, tag := range rc.tags { - if strings.HasPrefix(tag, prefix) { - return tag[len(prefix):] - } - } - return "" +type DownstreamMessage struct { + Alias string + Body string } -func (rc *Richconn) RemoveTag(name string, val string) { - rc.lock.Lock() - defer rc.lock.Unlock() +type CommandType string - whole := fmt.Sprintf("%s=%s", name, val) - for i, tag := range rc.tags { - if tag == whole { - if i == 0 && len(rc.tags) == 1 { - rc.tags = nil - } else { - lastidx := len(rc.tags) - 1 - if i < lastidx { - rc.tags[i] = rc.tags[lastidx] - } - rc.tags = rc.tags[:lastidx] - } - return - } - } +const ( + CommandType_JoinChannel = CommandType("join_channel") + CommandType_LeaveChannel = CommandType("leave_channel") +) + +type CommandMessage struct { + Cmd CommandType + Args []string } func (rc *Richconn) RegistOnCloseFunc(name string, f func()) { - rc.lock.Lock() - defer rc.lock.Unlock() + rc.closeFuncLock.Lock() + defer rc.closeFuncLock.Unlock() if rc.onClose == nil { f() @@ -110,8 +64,8 @@ func (rc *Richconn) RegistOnCloseFunc(name string, f func()) { } func (rc *Richconn) HasOnCloseFunc(name string) bool { - rc.lock.Lock() - defer rc.lock.Unlock() + rc.closeFuncLock.Lock() + defer rc.closeFuncLock.Unlock() if rc.onClose == nil { return false @@ -122,8 +76,8 @@ func (rc *Richconn) HasOnCloseFunc(name string) bool { } func (rc *Richconn) UnregistOnCloseFunc(name string) (out func()) { - rc.lock.Lock() - defer rc.lock.Unlock() + rc.closeFuncLock.Lock() + defer rc.closeFuncLock.Unlock() if rc.onClose == nil { return @@ -133,52 +87,27 @@ func (rc *Richconn) UnregistOnCloseFunc(name string) (out func()) { return } +func (rc *Richconn) Closed() { + rc.closeFuncLock.Lock() + defer rc.closeFuncLock.Unlock() + for _, f := range rc.onClose { + f() + } +} + func (rc *Richconn) WriteBytes(data []byte) error { - rc.lock.Lock() - defer rc.lock.Unlock() return rc.WriteMessage(websocket.TextMessage, data) } -type DeliveryMessage struct { - Alias primitive.ObjectID - Body []byte - Command string - Conn *Richconn -} - -func (dm *DeliveryMessage) Parse(out any) error { - return json.Unmarshal(dm.Body, out) -} - -func (dm *DeliveryMessage) MarshalBinary() (data []byte, err error) { - return append(dm.Alias[:], dm.Body...), nil -} - -func (dm *DeliveryMessage) UnmarshalBinary(data []byte) error { - copy(dm.Alias[:], data[:12]) - dm.Body = data[12:] - return nil -} - -type tagconn struct { - rc *Richconn - state string -} -type tagconnsmap = map[primitive.ObjectID]*tagconn -type tagconns struct { - sync.Mutex - tagconnsmap -} - type subhandler struct { - sync.Mutex - authCache *common.AuthCollection - conns map[primitive.ObjectID]*Richconn - aliases map[primitive.ObjectID]primitive.ObjectID - tags map[primitive.ObjectID]*tagconns - deliveryChan chan DeliveryMessage - url string - redisSync *redis.Client + authCache *gocommon.AuthCollection + redisMsgChanName string + redisCmdChanName string + redisSync *redis.Client + connsLock sync.Mutex + connectedAlias map[string]bool + connInOutChan chan *Richconn + deliveryChan chan any } // WebsocketHandler : @@ -191,28 +120,30 @@ type wsConfig struct { SyncPipeline string `json:"ws_sync_pipeline"` } -func NewWebsocketHandler(authglobal common.AuthCollectionGlobal) (wsh *WebsocketHandler) { +func NewWebsocketHandler(authglobal gocommon.AuthCollectionGlobal) (wsh *WebsocketHandler) { + var config wsConfig + gocommon.LoadConfig(&config) + + redisSync, err := gocommon.NewRedisClient(config.SyncPipeline, 0) + if err != nil { + panic(err) + } + authCaches := make(map[string]*subhandler) for _, region := range authglobal.Regions() { sh := &subhandler{ - authCache: authglobal.Get(region), - conns: make(map[primitive.ObjectID]*Richconn), - aliases: make(map[primitive.ObjectID]primitive.ObjectID), - tags: make(map[primitive.ObjectID]*tagconns), - deliveryChan: make(chan DeliveryMessage, 1000), + authCache: authglobal.Get(region), + redisMsgChanName: fmt.Sprintf("_wsh_msg_%s", region), + redisCmdChanName: fmt.Sprintf("_wsh_cmd_%s", region), + redisSync: redisSync, + connectedAlias: make(map[string]bool), + connInOutChan: make(chan *Richconn), + deliveryChan: make(chan any, 1000), } authCaches[region] = sh } - var config wsConfig - common.LoadConfig(&config) - - redisSync, err := common.NewRedisClient(config.SyncPipeline, 0) - if err != nil { - panic(err) - } - return &WebsocketHandler{ authCaches: authCaches, RedisSync: redisSync, @@ -220,292 +151,190 @@ func NewWebsocketHandler(authglobal common.AuthCollectionGlobal) (wsh *Websocket } func (ws *WebsocketHandler) Destructor() { - if ws.RedisSync != nil { - ws.RedisSync.Del(context.Background(), ConnStateCacheKey) - } } -func (ws *WebsocketHandler) DeliveryChannel(region string) <-chan DeliveryMessage { - return ws.authCaches[region].deliveryChan -} - -func (ws *WebsocketHandler) Conn(region string, alias primitive.ObjectID) *Richconn { +func (ws *WebsocketHandler) IsConnected(region string, alias string) bool { if sh := ws.authCaches[region]; sh != nil { - return sh.conns[alias] + return sh.connected(alias) } - return nil + return false } -func (ws *WebsocketHandler) JoinTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, rc *Richconn, hint string) error { - if sh := ws.authCaches[region]; sh != nil { - sh.joinTag(tag, tid, rc, hint) - } - return nil -} - -func (ws *WebsocketHandler) LeaveTag(region string, tag primitive.ObjectID, tid primitive.ObjectID) error { - if sh := ws.authCaches[region]; sh != nil { - sh.leaveTag(tag, tid) - } - return nil -} - -func (ws *WebsocketHandler) SetStateInTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, state string, hint string) error { - if sh := ws.authCaches[region]; sh != nil { - sh.setStateInTag(tag, tid, state, hint) - } - return nil -} - -func (ws *WebsocketHandler) BroadcastRaw(region string, tag primitive.ObjectID, raw []byte) { - if sh := ws.authCaches[region]; sh != nil { - if cs := sh.cloneTag(tag); len(cs) > 0 { - go func(raw []byte) { - for _, c := range cs { - if c != nil { - c.WriteBytes(raw) - } - } - }(raw) - } - } -} - -func (ws *WebsocketHandler) Broadcast(region string, tag primitive.ObjectID, doc bson.M) { - raw, _ := json.Marshal(doc) - ws.BroadcastRaw(region, tag, raw) -} - -var onlineQueryScriptHash string - func (ws *WebsocketHandler) RegisterHandlers(ctx context.Context, serveMux *http.ServeMux, prefix string) error { - ws.RedisSync.Del(context.Background(), ConnStateCacheKey) - - scriptHash, err := ws.RedisSync.ScriptLoad(context.Background(), connStateScript).Result() - if err != nil { - return err - } - onlineQueryScriptHash = scriptHash for region, sh := range ws.authCaches { if region == "default" { region = "" } - sh.url = common.MakeHttpHandlerPattern(prefix, region, "ws") - sh.redisSync = ws.RedisSync + url := gocommon.MakeHttpHandlerPattern(prefix, region, "ws") if *noSessionFlag { - serveMux.HandleFunc(sh.url, sh.upgrade_nosession) + serveMux.HandleFunc(url, sh.upgrade_nosession) } else { - serveMux.HandleFunc(sh.url, sh.upgrade) + serveMux.HandleFunc(url, sh.upgrade) } + + go sh.mainLoop(ctx) } return nil } -func (sh *subhandler) cloneTag(tag primitive.ObjectID) (out []*Richconn) { - sh.Lock() - cs := sh.tags[tag] - sh.Unlock() +func (sh *subhandler) setConnected(alias string, connected bool) { + sh.connsLock.Lock() + defer sh.connsLock.Unlock() - if cs == nil { - return nil - } - - cs.Lock() - defer cs.Unlock() - - out = make([]*Richconn, 0, len(cs.tagconnsmap)) - for _, c := range cs.tagconnsmap { - out = append(out, c.rc) - } - return -} - -func (sh *subhandler) joinTag(tag primitive.ObjectID, tid primitive.ObjectID, rc *Richconn, hint string) { - sh.Lock() - cs := sh.tags[tag] - if cs == nil { - cs = &tagconns{ - tagconnsmap: make(map[primitive.ObjectID]*tagconn), - } - } - sh.Unlock() - - cs.Lock() - states := make([]bson.M, 0, len(cs.tagconnsmap)) - for tid, conn := range cs.tagconnsmap { - states = append(states, bson.M{ - "_id": tid, - "_hint": hint, - "state": conn.state, - }) - } - - cs.tagconnsmap[tid] = &tagconn{rc: rc} - cs.Unlock() - - sh.Lock() - sh.tags[tag] = cs - sh.Unlock() - - if len(states) > 0 { - s, _ := json.Marshal(states) - rc.WriteBytes(s) - } -} - -func (sh *subhandler) leaveTag(tag primitive.ObjectID, tid primitive.ObjectID) { - sh.Lock() - defer sh.Unlock() - - cs := sh.tags[tag] - if cs == nil { - return - } - - delete(cs.tagconnsmap, tid) - if len(cs.tagconnsmap) == 0 { - delete(sh.tags, tag) + if connected { + sh.connectedAlias[alias] = true } else { - sh.tags[tag] = cs + delete(sh.connectedAlias, alias) } } -func (sh *subhandler) setStateInTag(tag primitive.ObjectID, tid primitive.ObjectID, state string, hint string) { - sh.Lock() - cs := sh.tags[tag] - sh.Unlock() +func (sh *subhandler) connected(alias string) bool { + sh.connsLock.Lock() + defer sh.connsLock.Unlock() - if cs == nil { - return - } + _, ok := sh.connectedAlias[alias] + return ok +} - cs.Lock() - defer cs.Unlock() - - if tagconn := cs.tagconnsmap[tid]; tagconn != nil { - tagconn.state = state - - var clone []*Richconn - for _, c := range cs.tagconnsmap { - clone = append(clone, c.rc) +func (sh *subhandler) mainLoop(ctx context.Context) { + defer func() { + s := recover() + if s != nil { + logger.Error(s) } - raw, _ := json.Marshal(map[string]any{ - "_id": tid, - "_hint": hint, - "state": state, - }) - go func(raw []byte) { - for _, c := range clone { - c.WriteBytes(raw) - } - }(raw) - } -} + }() -func (wsh *WebsocketHandler) GetState(alias primitive.ObjectID) (string, error) { - state, err := wsh.RedisSync.EvalSha(context.Background(), onlineQueryScriptHash, []string{ - connStateCachePrefix + "*", alias.Hex(), - }).Result() - - if err != nil { - return "", err - } - - return state.(string), nil -} - -func (wsh *WebsocketHandler) IsOnline(alias primitive.ObjectID) (bool, error) { - state, err := wsh.GetState(alias) - if err != nil { - logger.Error("IsOnline failed. err :", err) - return false, err - } - return len(state) > 0, nil -} - -func (sh *subhandler) closeConn(accid primitive.ObjectID) { - sh.Lock() - defer sh.Unlock() - - if alias, ok := sh.aliases[accid]; ok { - if old := sh.conns[alias]; old != nil { - old.Close() - } - } -} - -func (sh *subhandler) addConn(conn *Richconn, accid primitive.ObjectID) { - sh.Lock() - defer sh.Unlock() - - sh.conns[conn.alias] = conn - sh.aliases[accid] = conn.alias -} - -func upgrade_core(sh *subhandler, conn *websocket.Conn, initState string, accid primitive.ObjectID, alias primitive.ObjectID) { - sh.closeConn(accid) - - newconn := sh.makeRichConn(alias, conn) - sh.addConn(newconn, accid) - sh.redisSync.HSet(context.Background(), ConnStateCacheKey, alias.Hex(), initState).Result() - - go func(c *Richconn, accid primitive.ObjectID, deliveryChan chan<- DeliveryMessage) { + go func() { + var pubsub *redis.PubSub for { - mt, p, err := c.ReadMessage() + if pubsub == nil { + pubsub = sh.redisSync.Subscribe(ctx, sh.redisMsgChanName, sh.redisCmdChanName) + } + + raw, err := pubsub.ReceiveMessage(ctx) + if err == nil { + if raw.Channel == sh.redisMsgChanName { + var msg UpstreamMessage + if err := json.Unmarshal([]byte(raw.Payload), &msg); err == nil { + sh.deliveryChan <- &msg + } else { + logger.Println("decode UpstreamMessage failed :", err) + } + } else if raw.Channel == sh.redisCmdChanName { + var cmd CommandMessage + if err := json.Unmarshal([]byte(raw.Payload), &cmd); err == nil { + sh.deliveryChan <- &cmd + } else { + logger.Println("decode UpstreamMessage failed :", err) + } + } + } else { + logger.Println("pubsub.ReceiveMessage failed :", err) + pubsub.Close() + pubsub = nil + + if ctx.Err() != nil { + break + } + } + } + }() + + entireConns := make(map[string]*Richconn) + rooms := make(map[string]*room) + findRoom := func(name string, create bool) *room { + room := rooms[name] + if room == nil && create { + room = makeRoom(name) + rooms[name] = room + room.start(ctx) + } + return room + } + + for { + select { + case usermsg := <-sh.deliveryChan: + switch usermsg := usermsg.(type) { + case *UpstreamMessage: + target := usermsg.Target + if target[0] == '#' { + // 룸에 브로드캐스팅 + roomName := target[1:] + if room := findRoom(roomName, false); room != nil { + room.broadcast(usermsg) + } + } else if target[0] == '@' { + // TODO : 특정 유저에게만 + } + + case *CommandMessage: + if usermsg.Cmd == CommandType_JoinChannel && len(usermsg.Args) == 2 { + alias := usermsg.Args[0] + roomName := usermsg.Args[1] + + conn := entireConns[alias] + if conn != nil { + findRoom(roomName, true).in(conn) + } + } else if usermsg.Cmd == CommandType_JoinChannel && len(usermsg.Args) == 2 { + alias := usermsg.Args[0] + roomName := usermsg.Args[1] + + conn := entireConns[alias] + if conn != nil { + if room := findRoom(roomName, false); room != nil { + room.out(conn) + } + } + } + + default: + logger.Println("usermsg is unknown type") + } + + case c := <-sh.connInOutChan: + if c.Conn == nil { + delete(entireConns, c.alias) + sh.setConnected(c.alias, false) + for _, room := range rooms { + room.out(c) + } + c.Closed() + } else { + sh.setConnected(c.alias, true) + entireConns[c.alias] = c + } + } + } +} + +func upgrade_core(sh *subhandler, conn *websocket.Conn, accid primitive.ObjectID, alias string) { + newconn := sh.makeRichConn(alias, conn) + sh.connInOutChan <- newconn + + go func(c *Richconn, accid primitive.ObjectID, deliveryChan chan<- any) { + for { + messageType, r, err := c.NextReader() + + // 웹소켓에서 직접 메시지를 받지 않는다. + if r != nil { + io.Copy(io.Discard, r) + } if err != nil { c.Close() break } - switch mt { - case websocket.BinaryMessage: - msg := DeliveryMessage{ - Alias: c.alias, - Body: p, - Conn: c, - } - deliveryChan <- msg - - case websocket.TextMessage: - msg := string(p) - opcodes := strings.Split(msg, ";") - for _, opcode := range opcodes { - if strings.HasPrefix(opcode, "ps:") { - sh.redisSync.HSet(context.Background(), ConnStateCacheKey, alias.Hex(), opcode[3:]).Result() - } else if strings.HasPrefix(opcode, "cmd:") { - cmd := opcode[4:] - msg := DeliveryMessage{ - Alias: c.alias, - Command: cmd, - Conn: c, - } - deliveryChan <- msg - } - } - + if messageType == websocket.CloseMessage { + break } } - sh.redisSync.HDel(context.Background(), ConnStateCacheKey, c.alias.Hex()).Result() - - sh.Lock() - delete(sh.conns, c.alias) - delete(sh.aliases, accid) - sh.Unlock() - - var funcs []func() - c.lock.Lock() - for _, f := range c.onClose { - funcs = append(funcs, f) - } - c.onClose = nil - c.lock.Unlock() - - for _, f := range funcs { - f() - } + c.Conn = nil + sh.connInOutChan <- c }(newconn, accid, sh.deliveryChan) } @@ -551,17 +380,14 @@ func (sh *subhandler) upgrade_nosession(w http.ResponseWriter, r *http.Request) return } - alias := accid + var alias string if v := r.Header.Get("AS-X-ALIAS"); len(v) > 0 { - alias = common.ParseObjectID(v) + alias = v + } else { + alias = accid.Hex() } - initState := r.Header.Get("As-X-Tavern-InitialState") - if len(initState) == 0 { - initState = "online" - } - - upgrade_core(sh, conn, initState, accid, alias) + upgrade_core(sh, conn, accid, alias) } func (sh *subhandler) upgrade(w http.ResponseWriter, r *http.Request) { @@ -597,20 +423,17 @@ func (sh *subhandler) upgrade(w http.ResponseWriter, r *http.Request) { return } - alias := accid + var alias string if v := r.Header.Get("AS-X-ALIAS"); len(v) > 0 { - alias = common.ParseObjectID(v) + alias = v + } else { + alias = accid.Hex() } - initState := r.Header.Get("As-X-Tavern-InitialState") - if len(initState) == 0 { - initState = "online" - } - - upgrade_core(sh, conn, initState, accid, alias) + upgrade_core(sh, conn, accid, alias) } -func (sh *subhandler) makeRichConn(alias primitive.ObjectID, conn *websocket.Conn) *Richconn { +func (sh *subhandler) makeRichConn(alias string, conn *websocket.Conn) *Richconn { rc := Richconn{ Conn: conn, alias: alias, From 822681bf74cf877562ab40eeb641f68745b0b28d Mon Sep 17 00:00:00 2001 From: mountain Date: Wed, 5 Jul 2023 22:26:57 +0900 Subject: [PATCH 02/26] =?UTF-8?q?close=20func=EB=A5=BC=20=EB=B0=96?= =?UTF-8?q?=EC=9C=BC=EB=A1=9C=20=EC=98=AE=EA=B9=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- wshandler/room.go | 16 +++--- wshandler/wshandler.go | 122 +++++++++++++++-------------------------- 2 files changed, 53 insertions(+), 85 deletions(-) diff --git a/wshandler/room.go b/wshandler/room.go index a443e32..f74fb45 100644 --- a/wshandler/room.go +++ b/wshandler/room.go @@ -8,16 +8,16 @@ import ( ) type room struct { - inChan chan *Richconn - outChan chan *Richconn + inChan chan *wsconn + outChan chan *wsconn messageChan chan *UpstreamMessage name string } func makeRoom(name string) *room { return &room{ - inChan: make(chan *Richconn, 10), - outChan: make(chan *Richconn, 10), + inChan: make(chan *wsconn, 10), + outChan: make(chan *wsconn, 10), messageChan: make(chan *UpstreamMessage, 100), name: name, } @@ -27,17 +27,17 @@ func (r *room) broadcast(msg *UpstreamMessage) { r.messageChan <- msg } -func (r *room) in(conn *Richconn) { +func (r *room) in(conn *wsconn) { r.inChan <- conn } -func (r *room) out(conn *Richconn) { +func (r *room) out(conn *wsconn) { r.outChan <- conn } func (r *room) start(ctx context.Context) { go func(ctx context.Context) { - conns := make(map[string]*Richconn) + conns := make(map[string]*wsconn) normal := false for !normal { normal = r.loop(ctx, &conns) @@ -45,7 +45,7 @@ func (r *room) start(ctx context.Context) { }(ctx) } -func (r *room) loop(ctx context.Context, conns *map[string]*Richconn) (normalEnd bool) { +func (r *room) loop(ctx context.Context, conns *map[string]*wsconn) (normalEnd bool) { defer func() { s := recover() if s != nil { diff --git a/wshandler/wshandler.go b/wshandler/wshandler.go index 977ac58..6836b77 100644 --- a/wshandler/wshandler.go +++ b/wshandler/wshandler.go @@ -21,11 +21,10 @@ import ( var noSessionFlag = flagx.Bool("nosession", false, "nosession=[true|false]") -type Richconn struct { +type wsconn struct { *websocket.Conn - closeFuncLock sync.Mutex - alias string - onClose map[string]func() + alias string + accid primitive.ObjectID } type UpstreamMessage struct { @@ -43,8 +42,8 @@ type DownstreamMessage struct { type CommandType string const ( - CommandType_JoinChannel = CommandType("join_channel") - CommandType_LeaveChannel = CommandType("leave_channel") + CommandType_JoinRoom = CommandType("join_room") + CommandType_LeaveRoom = CommandType("leave_room") ) type CommandMessage struct { @@ -52,53 +51,6 @@ type CommandMessage struct { Args []string } -func (rc *Richconn) RegistOnCloseFunc(name string, f func()) { - rc.closeFuncLock.Lock() - defer rc.closeFuncLock.Unlock() - - if rc.onClose == nil { - f() - return - } - rc.onClose[name] = f -} - -func (rc *Richconn) HasOnCloseFunc(name string) bool { - rc.closeFuncLock.Lock() - defer rc.closeFuncLock.Unlock() - - if rc.onClose == nil { - return false - } - - _, ok := rc.onClose[name] - return ok -} - -func (rc *Richconn) UnregistOnCloseFunc(name string) (out func()) { - rc.closeFuncLock.Lock() - defer rc.closeFuncLock.Unlock() - - if rc.onClose == nil { - return - } - out = rc.onClose[name] - delete(rc.onClose, name) - return -} - -func (rc *Richconn) Closed() { - rc.closeFuncLock.Lock() - defer rc.closeFuncLock.Unlock() - for _, f := range rc.onClose { - f() - } -} - -func (rc *Richconn) WriteBytes(data []byte) error { - return rc.WriteMessage(websocket.TextMessage, data) -} - type subhandler struct { authCache *gocommon.AuthCollection redisMsgChanName string @@ -106,8 +58,10 @@ type subhandler struct { redisSync *redis.Client connsLock sync.Mutex connectedAlias map[string]bool - connInOutChan chan *Richconn + connInOutChan chan *wsconn deliveryChan chan any + + callReceiver func(primitive.ObjectID, string, io.Reader) } // WebsocketHandler : @@ -120,7 +74,7 @@ type wsConfig struct { SyncPipeline string `json:"ws_sync_pipeline"` } -func NewWebsocketHandler(authglobal gocommon.AuthCollectionGlobal) (wsh *WebsocketHandler) { +func NewWebsocketHandler[T any](authglobal gocommon.AuthCollectionGlobal, receiver func(primitive.ObjectID, string, *T)) (wsh *WebsocketHandler) { var config wsConfig gocommon.LoadConfig(&config) @@ -129,6 +83,21 @@ func NewWebsocketHandler(authglobal gocommon.AuthCollectionGlobal) (wsh *Websock panic(err) } + decoder := func(r io.Reader) *T { + if r == nil { + // 접속이 끊겼을 때. + return nil + } + var m T + dec := json.NewDecoder(r) + if err := dec.Decode(&m); err != nil { + logger.Println(err) + } + + // decoding 실패하더라도 빈 *T를 내보냄 + return &m + } + authCaches := make(map[string]*subhandler) for _, region := range authglobal.Regions() { sh := &subhandler{ @@ -137,8 +106,11 @@ func NewWebsocketHandler(authglobal gocommon.AuthCollectionGlobal) (wsh *Websock redisCmdChanName: fmt.Sprintf("_wsh_cmd_%s", region), redisSync: redisSync, connectedAlias: make(map[string]bool), - connInOutChan: make(chan *Richconn), + connInOutChan: make(chan *wsconn), deliveryChan: make(chan any, 1000), + callReceiver: func(accid primitive.ObjectID, alias string, r io.Reader) { + receiver(accid, alias, decoder(r)) + }, } authCaches[region] = sh @@ -205,6 +177,7 @@ func (sh *subhandler) mainLoop(ctx context.Context) { } }() + // redis channel에서 유저가 보낸 메시지를 읽는 go rountine go func() { var pubsub *redis.PubSub for { @@ -241,7 +214,7 @@ func (sh *subhandler) mainLoop(ctx context.Context) { } }() - entireConns := make(map[string]*Richconn) + entireConns := make(map[string]*wsconn) rooms := make(map[string]*room) findRoom := func(name string, create bool) *room { room := rooms[name] @@ -253,6 +226,7 @@ func (sh *subhandler) mainLoop(ctx context.Context) { return room } + // 유저에게서 온 메세지, 소켓 연결/해체 처리 for { select { case usermsg := <-sh.deliveryChan: @@ -270,7 +244,7 @@ func (sh *subhandler) mainLoop(ctx context.Context) { } case *CommandMessage: - if usermsg.Cmd == CommandType_JoinChannel && len(usermsg.Args) == 2 { + if usermsg.Cmd == CommandType_JoinRoom && len(usermsg.Args) == 2 { alias := usermsg.Args[0] roomName := usermsg.Args[1] @@ -278,7 +252,7 @@ func (sh *subhandler) mainLoop(ctx context.Context) { if conn != nil { findRoom(roomName, true).in(conn) } - } else if usermsg.Cmd == CommandType_JoinChannel && len(usermsg.Args) == 2 { + } else if usermsg.Cmd == CommandType_JoinRoom && len(usermsg.Args) == 2 { alias := usermsg.Args[0] roomName := usermsg.Args[1] @@ -301,7 +275,7 @@ func (sh *subhandler) mainLoop(ctx context.Context) { for _, room := range rooms { room.out(c) } - c.Closed() + sh.callReceiver(c.accid, c.alias, nil) } else { sh.setConnected(c.alias, true) entireConns[c.alias] = c @@ -311,18 +285,16 @@ func (sh *subhandler) mainLoop(ctx context.Context) { } func upgrade_core(sh *subhandler, conn *websocket.Conn, accid primitive.ObjectID, alias string) { - newconn := sh.makeRichConn(alias, conn) + newconn := &wsconn{ + Conn: conn, + alias: alias, + accid: accid, + } sh.connInOutChan <- newconn - go func(c *Richconn, accid primitive.ObjectID, deliveryChan chan<- any) { + go func(c *wsconn, accid primitive.ObjectID, deliveryChan chan<- any) { for { messageType, r, err := c.NextReader() - - // 웹소켓에서 직접 메시지를 받지 않는다. - if r != nil { - io.Copy(io.Discard, r) - } - if err != nil { c.Close() break @@ -331,6 +303,11 @@ func upgrade_core(sh *subhandler, conn *websocket.Conn, accid primitive.ObjectID if messageType == websocket.CloseMessage { break } + + if messageType == websocket.TextMessage { + // 유저가 직접 보낸 메시지 + sh.callReceiver(accid, c.alias, r) + } } c.Conn = nil @@ -432,12 +409,3 @@ func (sh *subhandler) upgrade(w http.ResponseWriter, r *http.Request) { upgrade_core(sh, conn, accid, alias) } - -func (sh *subhandler) makeRichConn(alias string, conn *websocket.Conn) *Richconn { - rc := Richconn{ - Conn: conn, - alias: alias, - onClose: make(map[string]func()), - } - return &rc -} From 7ef3e68d161e26cf2ec142c8b039f18afb081683 Mon Sep 17 00:00:00 2001 From: mountain Date: Thu, 6 Jul 2023 00:01:45 +0900 Subject: [PATCH 03/26] =?UTF-8?q?GetState=20=ED=95=A8=EC=88=98=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- wshandler/wshandler.go | 38 +++++++++----------------------------- 1 file changed, 9 insertions(+), 29 deletions(-) diff --git a/wshandler/wshandler.go b/wshandler/wshandler.go index 6836b77..ae73ddf 100644 --- a/wshandler/wshandler.go +++ b/wshandler/wshandler.go @@ -8,7 +8,6 @@ import ( "io" "net/http" "strings" - "sync" "go.mongodb.org/mongo-driver/bson/primitive" "repositories.action2quare.com/ayo/gocommon" @@ -52,12 +51,11 @@ type CommandMessage struct { } type subhandler struct { + name string authCache *gocommon.AuthCollection redisMsgChanName string redisCmdChanName string redisSync *redis.Client - connsLock sync.Mutex - connectedAlias map[string]bool connInOutChan chan *wsconn deliveryChan chan any @@ -101,11 +99,11 @@ func NewWebsocketHandler[T any](authglobal gocommon.AuthCollectionGlobal, receiv authCaches := make(map[string]*subhandler) for _, region := range authglobal.Regions() { sh := &subhandler{ + name: region, authCache: authglobal.Get(region), redisMsgChanName: fmt.Sprintf("_wsh_msg_%s", region), redisCmdChanName: fmt.Sprintf("_wsh_cmd_%s", region), redisSync: redisSync, - connectedAlias: make(map[string]bool), connInOutChan: make(chan *wsconn), deliveryChan: make(chan any, 1000), callReceiver: func(accid primitive.ObjectID, alias string, r io.Reader) { @@ -125,13 +123,6 @@ func NewWebsocketHandler[T any](authglobal gocommon.AuthCollectionGlobal, receiv func (ws *WebsocketHandler) Destructor() { } -func (ws *WebsocketHandler) IsConnected(region string, alias string) bool { - if sh := ws.authCaches[region]; sh != nil { - return sh.connected(alias) - } - return false -} - func (ws *WebsocketHandler) RegisterHandlers(ctx context.Context, serveMux *http.ServeMux, prefix string) error { for region, sh := range ws.authCaches { if region == "default" { @@ -150,23 +141,12 @@ func (ws *WebsocketHandler) RegisterHandlers(ctx context.Context, serveMux *http return nil } -func (sh *subhandler) setConnected(alias string, connected bool) { - sh.connsLock.Lock() - defer sh.connsLock.Unlock() - - if connected { - sh.connectedAlias[alias] = true - } else { - delete(sh.connectedAlias, alias) +func (ws *WebsocketHandler) GetState(region string, accid primitive.ObjectID) string { + state, err := ws.RedisSync.HGet(context.Background(), region, accid.Hex()).Result() + if err == redis.Nil { + return "" } -} - -func (sh *subhandler) connected(alias string) bool { - sh.connsLock.Lock() - defer sh.connsLock.Unlock() - - _, ok := sh.connectedAlias[alias] - return ok + return state } func (sh *subhandler) mainLoop(ctx context.Context) { @@ -271,13 +251,11 @@ func (sh *subhandler) mainLoop(ctx context.Context) { case c := <-sh.connInOutChan: if c.Conn == nil { delete(entireConns, c.alias) - sh.setConnected(c.alias, false) for _, room := range rooms { room.out(c) } sh.callReceiver(c.accid, c.alias, nil) } else { - sh.setConnected(c.alias, true) entireConns[c.alias] = c } } @@ -293,6 +271,7 @@ func upgrade_core(sh *subhandler, conn *websocket.Conn, accid primitive.ObjectID sh.connInOutChan <- newconn go func(c *wsconn, accid primitive.ObjectID, deliveryChan chan<- any) { + sh.redisSync.HSet(context.Background(), sh.name, accid.Hex(), "online") for { messageType, r, err := c.NextReader() if err != nil { @@ -309,6 +288,7 @@ func upgrade_core(sh *subhandler, conn *websocket.Conn, accid primitive.ObjectID sh.callReceiver(accid, c.alias, r) } } + sh.redisSync.HDel(context.Background(), sh.name, accid.Hex()) c.Conn = nil sh.connInOutChan <- c From 2b54d69b9e7055b0a336d7cdb3bbaecf5147bdea Mon Sep 17 00:00:00 2001 From: mountain Date: Thu, 6 Jul 2023 12:35:39 +0900 Subject: [PATCH 04/26] =?UTF-8?q?receiver=20=ED=83=80=EC=9E=85=20=EB=B3=80?= =?UTF-8?q?=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- wshandler/wshandler.go | 55 ++++++++++++++++++++++++++---------------- 1 file changed, 34 insertions(+), 21 deletions(-) diff --git a/wshandler/wshandler.go b/wshandler/wshandler.go index ae73ddf..e8597e3 100644 --- a/wshandler/wshandler.go +++ b/wshandler/wshandler.go @@ -50,6 +50,20 @@ type CommandMessage struct { Args []string } +type WebSocketMessageType int + +const ( + TextMessage = WebSocketMessageType(websocket.TextMessage) + BinaryMessage = WebSocketMessageType(websocket.BinaryMessage) + CloseMessage = WebSocketMessageType(websocket.CloseMessage) + PingMessage = WebSocketMessageType(websocket.PingMessage) + PongMessage = WebSocketMessageType(websocket.PongMessage) + Connected = WebSocketMessageType(100) + Disconnected = WebSocketMessageType(101) +) + +type WebSocketMessageReceiver func(accid primitive.ObjectID, alias string, messageType WebSocketMessageType, body io.Reader) + type subhandler struct { name string authCache *gocommon.AuthCollection @@ -58,8 +72,7 @@ type subhandler struct { redisSync *redis.Client connInOutChan chan *wsconn deliveryChan chan any - - callReceiver func(primitive.ObjectID, string, io.Reader) + callReceiver WebSocketMessageReceiver } // WebsocketHandler : @@ -72,7 +85,7 @@ type wsConfig struct { SyncPipeline string `json:"ws_sync_pipeline"` } -func NewWebsocketHandler[T any](authglobal gocommon.AuthCollectionGlobal, receiver func(primitive.ObjectID, string, *T)) (wsh *WebsocketHandler) { +func NewWebsocketHandler(authglobal gocommon.AuthCollectionGlobal, receiver WebSocketMessageReceiver) (wsh *WebsocketHandler) { var config wsConfig gocommon.LoadConfig(&config) @@ -81,20 +94,20 @@ func NewWebsocketHandler[T any](authglobal gocommon.AuthCollectionGlobal, receiv panic(err) } - decoder := func(r io.Reader) *T { - if r == nil { - // 접속이 끊겼을 때. - return nil - } - var m T - dec := json.NewDecoder(r) - if err := dec.Decode(&m); err != nil { - logger.Println(err) - } + // decoder := func(r io.Reader) *T { + // if r == nil { + // // 접속이 끊겼을 때. + // return nil + // } + // var m T + // dec := json.NewDecoder(r) + // if err := dec.Decode(&m); err != nil { + // logger.Println(err) + // } - // decoding 실패하더라도 빈 *T를 내보냄 - return &m - } + // // decoding 실패하더라도 빈 *T를 내보냄 + // return &m + // } authCaches := make(map[string]*subhandler) for _, region := range authglobal.Regions() { @@ -106,9 +119,7 @@ func NewWebsocketHandler[T any](authglobal gocommon.AuthCollectionGlobal, receiv redisSync: redisSync, connInOutChan: make(chan *wsconn), deliveryChan: make(chan any, 1000), - callReceiver: func(accid primitive.ObjectID, alias string, r io.Reader) { - receiver(accid, alias, decoder(r)) - }, + callReceiver: receiver, } authCaches[region] = sh @@ -254,7 +265,7 @@ func (sh *subhandler) mainLoop(ctx context.Context) { for _, room := range rooms { room.out(c) } - sh.callReceiver(c.accid, c.alias, nil) + sh.callReceiver(c.accid, c.alias, Connected, nil) } else { entireConns[c.alias] = c } @@ -285,7 +296,9 @@ func upgrade_core(sh *subhandler, conn *websocket.Conn, accid primitive.ObjectID if messageType == websocket.TextMessage { // 유저가 직접 보낸 메시지 - sh.callReceiver(accid, c.alias, r) + sh.callReceiver(accid, c.alias, TextMessage, r) + } else if messageType == websocket.BinaryMessage { + sh.callReceiver(accid, c.alias, BinaryMessage, r) } } sh.redisSync.HDel(context.Background(), sh.name, accid.Hex()) From 8c94fc6e292211680239908072b012a8fa4a1343 Mon Sep 17 00:00:00 2001 From: mountain Date: Thu, 6 Jul 2023 13:07:57 +0900 Subject: [PATCH 05/26] =?UTF-8?q?=EC=86=8C=EC=BC=93=20=EC=A2=85=EB=A3=8C?= =?UTF-8?q?=20=EC=B2=98=EB=A6=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- wshandler/wshandler.go | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/wshandler/wshandler.go b/wshandler/wshandler.go index e8597e3..381c5b4 100644 --- a/wshandler/wshandler.go +++ b/wshandler/wshandler.go @@ -8,6 +8,7 @@ import ( "io" "net/http" "strings" + "sync" "go.mongodb.org/mongo-driver/bson/primitive" "repositories.action2quare.com/ayo/gocommon" @@ -65,7 +66,6 @@ const ( type WebSocketMessageReceiver func(accid primitive.ObjectID, alias string, messageType WebSocketMessageType, body io.Reader) type subhandler struct { - name string authCache *gocommon.AuthCollection redisMsgChanName string redisCmdChanName string @@ -73,6 +73,7 @@ type subhandler struct { connInOutChan chan *wsconn deliveryChan chan any callReceiver WebSocketMessageReceiver + connWaitGroup sync.WaitGroup } // WebsocketHandler : @@ -112,7 +113,6 @@ func NewWebsocketHandler(authglobal gocommon.AuthCollectionGlobal, receiver WebS authCaches := make(map[string]*subhandler) for _, region := range authglobal.Regions() { sh := &subhandler{ - name: region, authCache: authglobal.Get(region), redisMsgChanName: fmt.Sprintf("_wsh_msg_%s", region), redisCmdChanName: fmt.Sprintf("_wsh_cmd_%s", region), @@ -131,7 +131,10 @@ func NewWebsocketHandler(authglobal gocommon.AuthCollectionGlobal, receiver WebS } } -func (ws *WebsocketHandler) Destructor() { +func (ws *WebsocketHandler) Cleanup() { + for _, sh := range ws.authCaches { + sh.connWaitGroup.Wait() + } } func (ws *WebsocketHandler) RegisterHandlers(ctx context.Context, serveMux *http.ServeMux, prefix string) error { @@ -153,13 +156,17 @@ func (ws *WebsocketHandler) RegisterHandlers(ctx context.Context, serveMux *http } func (ws *WebsocketHandler) GetState(region string, accid primitive.ObjectID) string { - state, err := ws.RedisSync.HGet(context.Background(), region, accid.Hex()).Result() + state, err := ws.RedisSync.Get(context.Background(), accid.Hex()).Result() if err == redis.Nil { return "" } return state } +func (ws *WebsocketHandler) SetState(region string, accid primitive.ObjectID, state string) { + ws.RedisSync.SetArgs(context.Background(), accid.Hex(), state, redis.SetArgs{Mode: "XX"}).Result() +} + func (sh *subhandler) mainLoop(ctx context.Context) { defer func() { s := recover() @@ -281,8 +288,9 @@ func upgrade_core(sh *subhandler, conn *websocket.Conn, accid primitive.ObjectID } sh.connInOutChan <- newconn + sh.connWaitGroup.Add(1) go func(c *wsconn, accid primitive.ObjectID, deliveryChan chan<- any) { - sh.redisSync.HSet(context.Background(), sh.name, accid.Hex(), "online") + sh.redisSync.Set(context.Background(), accid.Hex(), "online", 0) for { messageType, r, err := c.NextReader() if err != nil { @@ -301,7 +309,8 @@ func upgrade_core(sh *subhandler, conn *websocket.Conn, accid primitive.ObjectID sh.callReceiver(accid, c.alias, BinaryMessage, r) } } - sh.redisSync.HDel(context.Background(), sh.name, accid.Hex()) + sh.redisSync.Del(context.Background(), accid.Hex()) + sh.connWaitGroup.Done() c.Conn = nil sh.connInOutChan <- c From 4bdd72152e6e0395c155eb92e645476dfb5e0aab Mon Sep 17 00:00:00 2001 From: mountain Date: Thu, 6 Jul 2023 14:18:11 +0900 Subject: [PATCH 06/26] =?UTF-8?q?receiver=20=EC=9D=B4=EB=8F=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- wshandler/wshandler.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/wshandler/wshandler.go b/wshandler/wshandler.go index 381c5b4..144abbf 100644 --- a/wshandler/wshandler.go +++ b/wshandler/wshandler.go @@ -86,7 +86,7 @@ type wsConfig struct { SyncPipeline string `json:"ws_sync_pipeline"` } -func NewWebsocketHandler(authglobal gocommon.AuthCollectionGlobal, receiver WebSocketMessageReceiver) (wsh *WebsocketHandler) { +func NewWebsocketHandler(authglobal gocommon.AuthCollectionGlobal) (wsh *WebsocketHandler) { var config wsConfig gocommon.LoadConfig(&config) @@ -119,7 +119,6 @@ func NewWebsocketHandler(authglobal gocommon.AuthCollectionGlobal, receiver WebS redisSync: redisSync, connInOutChan: make(chan *wsconn), deliveryChan: make(chan any, 1000), - callReceiver: receiver, } authCaches[region] = sh @@ -137,7 +136,7 @@ func (ws *WebsocketHandler) Cleanup() { } } -func (ws *WebsocketHandler) RegisterHandlers(ctx context.Context, serveMux *http.ServeMux, prefix string) error { +func (ws *WebsocketHandler) RegisterHandlers(ctx context.Context, serveMux *http.ServeMux, prefix string, receiver WebSocketMessageReceiver) error { for region, sh := range ws.authCaches { if region == "default" { region = "" @@ -148,6 +147,7 @@ func (ws *WebsocketHandler) RegisterHandlers(ctx context.Context, serveMux *http } else { serveMux.HandleFunc(url, sh.upgrade) } + sh.callReceiver = receiver go sh.mainLoop(ctx) } From 5dc9d4dca4c39394f7dc4953ed307db90c4e513a Mon Sep 17 00:00:00 2001 From: mountain Date: Thu, 6 Jul 2023 14:29:53 +0900 Subject: [PATCH 07/26] =?UTF-8?q?=EC=86=8C=EC=BC=93=20=EB=A9=94=EC=84=B8?= =?UTF-8?q?=EC=A7=80=20=EC=88=98=EC=8B=A0=20=ED=95=A8=EC=88=98=20=EB=93=B1?= =?UTF-8?q?=EB=A1=9D=20=ED=83=80=EC=9D=B4=EB=B0=8D=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- wshandler/wshandler.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/wshandler/wshandler.go b/wshandler/wshandler.go index 144abbf..60eb526 100644 --- a/wshandler/wshandler.go +++ b/wshandler/wshandler.go @@ -78,8 +78,9 @@ type subhandler struct { // WebsocketHandler : type WebsocketHandler struct { - authCaches map[string]*subhandler - RedisSync *redis.Client + authCaches map[string]*subhandler + RedisSync *redis.Client + ClientMessageReceiver WebSocketMessageReceiver } type wsConfig struct { @@ -125,8 +126,9 @@ func NewWebsocketHandler(authglobal gocommon.AuthCollectionGlobal) (wsh *Websock } return &WebsocketHandler{ - authCaches: authCaches, - RedisSync: redisSync, + authCaches: authCaches, + RedisSync: redisSync, + ClientMessageReceiver: func(primitive.ObjectID, string, WebSocketMessageType, io.Reader) {}, } } @@ -136,7 +138,7 @@ func (ws *WebsocketHandler) Cleanup() { } } -func (ws *WebsocketHandler) RegisterHandlers(ctx context.Context, serveMux *http.ServeMux, prefix string, receiver WebSocketMessageReceiver) error { +func (ws *WebsocketHandler) RegisterHandlers(ctx context.Context, serveMux *http.ServeMux, prefix string) error { for region, sh := range ws.authCaches { if region == "default" { region = "" @@ -147,8 +149,7 @@ func (ws *WebsocketHandler) RegisterHandlers(ctx context.Context, serveMux *http } else { serveMux.HandleFunc(url, sh.upgrade) } - sh.callReceiver = receiver - + sh.callReceiver = ws.ClientMessageReceiver go sh.mainLoop(ctx) } From 20803c67edfc53e74414357157722f4e3adefd79 Mon Sep 17 00:00:00 2001 From: mountain Date: Thu, 6 Jul 2023 16:47:36 +0900 Subject: [PATCH 08/26] =?UTF-8?q?SendUpstreamMessage=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- wshandler/wshandler.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/wshandler/wshandler.go b/wshandler/wshandler.go index 60eb526..e606797 100644 --- a/wshandler/wshandler.go +++ b/wshandler/wshandler.go @@ -4,6 +4,7 @@ import ( "context" "encoding/hex" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -168,6 +169,23 @@ func (ws *WebsocketHandler) SetState(region string, accid primitive.ObjectID, st ws.RedisSync.SetArgs(context.Background(), accid.Hex(), state, redis.SetArgs{Mode: "XX"}).Result() } +var errNoRegion = errors.New("region is not valid") + +func (ws *WebsocketHandler) SendUpstreamMessage(region string, msg *UpstreamMessage) error { + sh := ws.authCaches[region] + if sh == nil { + return errNoRegion + } + + bt, err := json.Marshal(msg) + if err != nil { + return err + } + + _, err = sh.redisSync.Publish(context.Background(), sh.redisMsgChanName, bt).Result() + return err +} + func (sh *subhandler) mainLoop(ctx context.Context) { defer func() { s := recover() @@ -240,6 +258,10 @@ func (sh *subhandler) mainLoop(ctx context.Context) { } } else if target[0] == '@' { // TODO : 특정 유저에게만 + conn := entireConns[target[1:]] + if conn != nil { + conn.WriteMessage(websocket.TextMessage, usermsg.Body) + } } case *CommandMessage: From ba746d03fab656a00920698afe44536cbc3fa1b2 Mon Sep 17 00:00:00 2001 From: mountain Date: Thu, 6 Jul 2023 17:30:57 +0900 Subject: [PATCH 09/26] =?UTF-8?q?local=20channel=EB=A1=9C=20=EB=A8=BC?= =?UTF-8?q?=EC=A0=80=20message=EB=A5=BC=20=EB=B3=B4=EB=82=B4=EB=B4=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- wshandler/wshandler.go | 97 +++++++++++++++++++++++++++++------------- 1 file changed, 67 insertions(+), 30 deletions(-) diff --git a/wshandler/wshandler.go b/wshandler/wshandler.go index e606797..b62193b 100644 --- a/wshandler/wshandler.go +++ b/wshandler/wshandler.go @@ -4,7 +4,6 @@ import ( "context" "encoding/hex" "encoding/json" - "errors" "fmt" "io" "net/http" @@ -67,14 +66,15 @@ const ( type WebSocketMessageReceiver func(accid primitive.ObjectID, alias string, messageType WebSocketMessageType, body io.Reader) type subhandler struct { - authCache *gocommon.AuthCollection - redisMsgChanName string - redisCmdChanName string - redisSync *redis.Client - connInOutChan chan *wsconn - deliveryChan chan any - callReceiver WebSocketMessageReceiver - connWaitGroup sync.WaitGroup + authCache *gocommon.AuthCollection + redisMsgChanName string + redisCmdChanName string + redisSync *redis.Client + connInOutChan chan *wsconn + deliveryChan chan any + localDeliveryChan chan any + callReceiver WebSocketMessageReceiver + connWaitGroup sync.WaitGroup } // WebsocketHandler : @@ -115,12 +115,13 @@ func NewWebsocketHandler(authglobal gocommon.AuthCollectionGlobal) (wsh *Websock authCaches := make(map[string]*subhandler) for _, region := range authglobal.Regions() { sh := &subhandler{ - authCache: authglobal.Get(region), - redisMsgChanName: fmt.Sprintf("_wsh_msg_%s", region), - redisCmdChanName: fmt.Sprintf("_wsh_cmd_%s", region), - redisSync: redisSync, - connInOutChan: make(chan *wsconn), - deliveryChan: make(chan any, 1000), + authCache: authglobal.Get(region), + redisMsgChanName: fmt.Sprintf("_wsh_msg_%s", region), + redisCmdChanName: fmt.Sprintf("_wsh_cmd_%s", region), + redisSync: redisSync, + connInOutChan: make(chan *wsconn), + deliveryChan: make(chan any, 1000), + localDeliveryChan: make(chan any, 100), } authCaches[region] = sh @@ -169,21 +170,11 @@ func (ws *WebsocketHandler) SetState(region string, accid primitive.ObjectID, st ws.RedisSync.SetArgs(context.Background(), accid.Hex(), state, redis.SetArgs{Mode: "XX"}).Result() } -var errNoRegion = errors.New("region is not valid") - -func (ws *WebsocketHandler) SendUpstreamMessage(region string, msg *UpstreamMessage) error { +func (ws *WebsocketHandler) SendUpstreamMessage(region string, msg *UpstreamMessage) { sh := ws.authCaches[region] - if sh == nil { - return errNoRegion + if sh != nil { + sh.localDeliveryChan <- msg } - - bt, err := json.Marshal(msg) - if err != nil { - return err - } - - _, err = sh.redisSync.Publish(context.Background(), sh.redisMsgChanName, bt).Result() - return err } func (sh *subhandler) mainLoop(ctx context.Context) { @@ -246,6 +237,53 @@ func (sh *subhandler) mainLoop(ctx context.Context) { // 유저에게서 온 메세지, 소켓 연결/해체 처리 for { select { + case usermsg := <-sh.localDeliveryChan: + // 로컬에 connection이 있는지 먼저 확인해 보기 위한 채널 + // 없으면 publish한다. + switch usermsg := usermsg.(type) { + case *UpstreamMessage: + target := usermsg.Target + if target[0] == '@' { + conn := entireConns[target[1:]] + if conn != nil { + // 이 경우 아니면 publish 해야 함 + conn.WriteMessage(websocket.TextMessage, usermsg.Body) + break + } + } + if bt, err := json.Marshal(usermsg); err == nil { + sh.redisSync.Publish(context.Background(), sh.redisMsgChanName, bt).Result() + } + + case *CommandMessage: + if usermsg.Cmd == CommandType_JoinRoom && len(usermsg.Args) == 2 { + alias := usermsg.Args[0] + roomName := usermsg.Args[1] + + conn := entireConns[alias] + if conn != nil { + findRoom(roomName, true).in(conn) + break + } + } else if usermsg.Cmd == CommandType_LeaveRoom && len(usermsg.Args) == 2 { + alias := usermsg.Args[0] + roomName := usermsg.Args[1] + + conn := entireConns[alias] + if conn != nil { + if room := findRoom(roomName, false); room != nil { + room.out(conn) + break + } + } + } + + // 위에서 break 안걸리면 나한테 없으므로 publish를 해야 함. 그러면 다른 호스트가 deliveryChan으로 받는다 + if bt, err := json.Marshal(usermsg); err == nil { + sh.redisSync.Publish(context.Background(), sh.redisCmdChanName, bt).Result() + } + } + case usermsg := <-sh.deliveryChan: switch usermsg := usermsg.(type) { case *UpstreamMessage: @@ -257,7 +295,6 @@ func (sh *subhandler) mainLoop(ctx context.Context) { room.broadcast(usermsg) } } else if target[0] == '@' { - // TODO : 특정 유저에게만 conn := entireConns[target[1:]] if conn != nil { conn.WriteMessage(websocket.TextMessage, usermsg.Body) @@ -273,7 +310,7 @@ func (sh *subhandler) mainLoop(ctx context.Context) { if conn != nil { findRoom(roomName, true).in(conn) } - } else if usermsg.Cmd == CommandType_JoinRoom && len(usermsg.Args) == 2 { + } else if usermsg.Cmd == CommandType_LeaveRoom && len(usermsg.Args) == 2 { alias := usermsg.Args[0] roomName := usermsg.Args[1] From b85d271efe6849e48e0f9229086351324b087c4c Mon Sep 17 00:00:00 2001 From: mountain Date: Thu, 6 Jul 2023 20:37:32 +0900 Subject: [PATCH 10/26] =?UTF-8?q?SendCloseMessage=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- wshandler/wshandler.go | 44 +++++++++++++++++++++++++++++++----------- 1 file changed, 33 insertions(+), 11 deletions(-) diff --git a/wshandler/wshandler.go b/wshandler/wshandler.go index b62193b..cd270a5 100644 --- a/wshandler/wshandler.go +++ b/wshandler/wshandler.go @@ -9,6 +9,7 @@ import ( "net/http" "strings" "sync" + "time" "go.mongodb.org/mongo-driver/bson/primitive" "repositories.action2quare.com/ayo/gocommon" @@ -42,13 +43,14 @@ type DownstreamMessage struct { type CommandType string const ( - CommandType_JoinRoom = CommandType("join_room") - CommandType_LeaveRoom = CommandType("leave_room") + CommandType_JoinRoom = CommandType("join_room") + CommandType_LeaveRoom = CommandType("leave_room") + CommandType_WriteControl = CommandType("write_control") ) type CommandMessage struct { Cmd CommandType - Args []string + Args []any } type WebSocketMessageType int @@ -177,6 +179,20 @@ func (ws *WebsocketHandler) SendUpstreamMessage(region string, msg *UpstreamMess } } +func (ws *WebsocketHandler) SendCloseMessage(region string, target string, text string) { + sh := ws.authCaches[region] + if sh != nil { + sh.localDeliveryChan <- &CommandMessage{ + Cmd: CommandType_WriteControl, + Args: []any{ + target, + int(websocket.CloseMessage), + websocket.FormatCloseMessage(websocket.CloseNormalClosure, text), + }, + } + } +} + func (sh *subhandler) mainLoop(ctx context.Context) { defer func() { s := recover() @@ -257,8 +273,8 @@ func (sh *subhandler) mainLoop(ctx context.Context) { case *CommandMessage: if usermsg.Cmd == CommandType_JoinRoom && len(usermsg.Args) == 2 { - alias := usermsg.Args[0] - roomName := usermsg.Args[1] + alias := usermsg.Args[0].(string) + roomName := usermsg.Args[1].(string) conn := entireConns[alias] if conn != nil { @@ -266,8 +282,8 @@ func (sh *subhandler) mainLoop(ctx context.Context) { break } } else if usermsg.Cmd == CommandType_LeaveRoom && len(usermsg.Args) == 2 { - alias := usermsg.Args[0] - roomName := usermsg.Args[1] + alias := usermsg.Args[0].(string) + roomName := usermsg.Args[1].(string) conn := entireConns[alias] if conn != nil { @@ -276,6 +292,12 @@ func (sh *subhandler) mainLoop(ctx context.Context) { break } } + } else if usermsg.Cmd == CommandType_WriteControl && len(usermsg.Args) == 2 { + alias := usermsg.Args[0].(string) + conn := entireConns[alias] + if conn != nil { + conn.WriteControl(usermsg.Args[1].(int), usermsg.Args[2].([]byte), time.Time{}) + } } // 위에서 break 안걸리면 나한테 없으므로 publish를 해야 함. 그러면 다른 호스트가 deliveryChan으로 받는다 @@ -303,16 +325,16 @@ func (sh *subhandler) mainLoop(ctx context.Context) { case *CommandMessage: if usermsg.Cmd == CommandType_JoinRoom && len(usermsg.Args) == 2 { - alias := usermsg.Args[0] - roomName := usermsg.Args[1] + alias := usermsg.Args[0].(string) + roomName := usermsg.Args[1].(string) conn := entireConns[alias] if conn != nil { findRoom(roomName, true).in(conn) } } else if usermsg.Cmd == CommandType_LeaveRoom && len(usermsg.Args) == 2 { - alias := usermsg.Args[0] - roomName := usermsg.Args[1] + alias := usermsg.Args[0].(string) + roomName := usermsg.Args[1].(string) conn := entireConns[alias] if conn != nil { From 550374ef6f91e32c98d2069ba1e01898bd0d53f8 Mon Sep 17 00:00:00 2001 From: mountain Date: Mon, 10 Jul 2023 09:42:34 +0900 Subject: [PATCH 11/26] =?UTF-8?q?rpc=20=ED=8C=A8=ED=82=A4=EC=A7=80=20?= =?UTF-8?q?=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rpc/rpc.go | 190 +++++++++++++++++++++++++++++++++++++++++ wshandler/wshandler.go | 1 + 2 files changed, 191 insertions(+) create mode 100644 rpc/rpc.go diff --git a/rpc/rpc.go b/rpc/rpc.go new file mode 100644 index 0000000..69e3515 --- /dev/null +++ b/rpc/rpc.go @@ -0,0 +1,190 @@ +package rpc + +import ( + "bytes" + "context" + "encoding/gob" + "errors" + "fmt" + "reflect" + "runtime" + "strings" + "time" + + "github.com/go-redis/redis/v8" + "go.mongodb.org/mongo-driver/bson/primitive" + "repositories.action2quare.com/ayo/gocommon/logger" +) + +type Receiver interface { + TargetExists(primitive.ObjectID) bool +} + +type receiverManifest struct { + r Receiver + methods map[string]reflect.Method +} + +type rpcEngine struct { + receivers map[string]receiverManifest + publish func([]byte) error +} + +var engine = rpcEngine{ + receivers: make(map[string]receiverManifest), +} + +func RegistReceiver(ptr Receiver) { + rname := reflect.TypeOf(ptr).Elem().Name() + rname = fmt.Sprintf("(*%s)", rname) + + methods := make(map[string]reflect.Method) + for i := 0; i < reflect.TypeOf(ptr).NumMethod(); i++ { + method := reflect.TypeOf(ptr).Method(i) + methods[method.Name] = method + } + engine.receivers[rname] = receiverManifest{ + r: ptr, + methods: methods, + } +} + +func Start(ctx context.Context, redisClient *redis.Client) { + if engine.publish != nil { + return + } + + pubsubName := primitive.NewObjectID().Hex()[6:] + engine.publish = func(s []byte) error { + _, err := redisClient.Publish(ctx, pubsubName, s).Result() + return err + } + + go engine.loop(ctx, redisClient, pubsubName) +} + +func (re *rpcEngine) callFromMessage(msg *redis.Message) { + defer func() { + r := recover() + if r != nil { + logger.Error(r) + } + }() + + encoded := []byte(msg.Payload) + var target primitive.ObjectID + copy(target[:], encoded[:12]) + + encoded = encoded[12:] + for i, c := range encoded { + if c == ')' { + if manifest, ok := re.receivers[string(encoded[:i+1])]; ok { + // 리시버 찾음 + if manifest.r.TargetExists(target) { + // 이 리시버가 타겟을 가지고 있음 + encoded = encoded[i+1:] + decoder := gob.NewDecoder(bytes.NewBuffer(encoded)) + var params []any + if decoder.Decode(¶ms) == nil { + method := manifest.methods[params[0].(string)] + args := []reflect.Value{ + reflect.ValueOf(manifest.r), + } + for _, arg := range params[1:] { + args = append(args, reflect.ValueOf(arg)) + } + method.Func.Call(args) + } + } + } + } + } +} + +func (re *rpcEngine) loop(ctx context.Context, redisClient *redis.Client, chanName string) { + defer func() { + r := recover() + if r != nil { + logger.Error(r) + } + }() + + pubsub := redisClient.Subscribe(ctx, chanName) + for { + if ctx.Err() != nil { + return + } + + if pubsub == nil { + pubsub = redisClient.Subscribe(ctx, chanName) + } + + msg, err := pubsub.ReceiveMessage(ctx) + + if err != nil { + if err == redis.ErrClosed { + time.Sleep(time.Second) + } + pubsub = nil + } else { + re.callFromMessage(msg) + } + } +} + +var errNoReceiver = errors.New("no receiver") + +type RpcCallContext struct { + r Receiver + t primitive.ObjectID +} + +var ErrCanExecuteHere = errors.New("go ahead") + +func MakeCallContext(r Receiver) RpcCallContext { return RpcCallContext{r: r} } +func (c *RpcCallContext) Target(t primitive.ObjectID) { c.t = t } +func (c *RpcCallContext) Call(args ...any) error { + if c.r.TargetExists(c.t) { + // 여기 있네? + return ErrCanExecuteHere + } + + pc := make([]uintptr, 1) + n := runtime.Callers(2, pc[:]) + if n < 1 { + return errNoReceiver + } + + frame, _ := runtime.CallersFrames(pc).Next() + prf := strings.Split(frame.Function, ".") + rname := prf[1] + funcname := prf[2] + + serialized, err := encode(c.t, rname, funcname, args...) + if err != nil { + return err + } + + return engine.publish(serialized) +} + +func encode(target primitive.ObjectID, receiver string, funcname string, args ...any) ([]byte, error) { + buff := new(bytes.Buffer) + + // 타겟을 가장 먼저 기록 + buff.Write(target[:]) + + // receiver + buff.Write([]byte(receiver)) + + // 다음 call context 기록 + m := append([]any{funcname}, args...) + encoder := gob.NewEncoder(buff) + err := encoder.Encode(m) + if err != nil { + logger.Error("rpcCallContext.send err :", err) + return nil, err + } + + return buff.Bytes(), nil +} diff --git a/wshandler/wshandler.go b/wshandler/wshandler.go index cd270a5..23c9550 100644 --- a/wshandler/wshandler.go +++ b/wshandler/wshandler.go @@ -381,6 +381,7 @@ func upgrade_core(sh *subhandler, conn *websocket.Conn, accid primitive.ObjectID } if messageType == websocket.CloseMessage { + sh.callReceiver(accid, c.alias, CloseMessage, r) break } From e912c8899357d4907b1b825bcdfed752cbe14032 Mon Sep 17 00:00:00 2001 From: mountain Date: Mon, 10 Jul 2023 10:29:16 +0900 Subject: [PATCH 12/26] =?UTF-8?q?rpc.call=20helper=20=ED=95=A8=EC=88=98=20?= =?UTF-8?q?=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rpc/rpc.go | 14 ++++++++++---- rpc/rpc_test.go | 7 +++++++ 2 files changed, 17 insertions(+), 4 deletions(-) create mode 100644 rpc/rpc_test.go diff --git a/rpc/rpc.go b/rpc/rpc.go index 69e3515..304e88c 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -134,16 +134,14 @@ func (re *rpcEngine) loop(ctx context.Context, redisClient *redis.Client, chanNa var errNoReceiver = errors.New("no receiver") -type RpcCallContext struct { +type callContext struct { r Receiver t primitive.ObjectID } var ErrCanExecuteHere = errors.New("go ahead") -func MakeCallContext(r Receiver) RpcCallContext { return RpcCallContext{r: r} } -func (c *RpcCallContext) Target(t primitive.ObjectID) { c.t = t } -func (c *RpcCallContext) Call(args ...any) error { +func (c callContext) call(args ...any) error { if c.r.TargetExists(c.t) { // 여기 있네? return ErrCanExecuteHere @@ -168,6 +166,14 @@ func (c *RpcCallContext) Call(args ...any) error { return engine.publish(serialized) } +func CallOrGo(r Receiver, target primitive.ObjectID, args ...any) error { + cc := callContext{ + r: r, + t: target, + } + return cc.call(args...) +} + func encode(target primitive.ObjectID, receiver string, funcname string, args ...any) ([]byte, error) { buff := new(bytes.Buffer) diff --git a/rpc/rpc_test.go b/rpc/rpc_test.go new file mode 100644 index 0000000..941db5a --- /dev/null +++ b/rpc/rpc_test.go @@ -0,0 +1,7 @@ +package rpc + +import "testing" + +func TestRpc(t *testing.T) { + +} From 98efbc2875803239a31f92c2189cebb27d77dea4 Mon Sep 17 00:00:00 2001 From: mountain Date: Mon, 10 Jul 2023 10:53:49 +0900 Subject: [PATCH 13/26] =?UTF-8?q?callctx=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rpc/rpc.go | 10 ++++++---- rpc/rpc_test.go | 37 ++++++++++++++++++++++++++++++++++++- 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/rpc/rpc.go b/rpc/rpc.go index 304e88c..f239bd1 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -6,6 +6,7 @@ import ( "encoding/gob" "errors" "fmt" + "path" "reflect" "runtime" "strings" @@ -148,15 +149,16 @@ func (c callContext) call(args ...any) error { } pc := make([]uintptr, 1) - n := runtime.Callers(2, pc[:]) + n := runtime.Callers(3, pc[:]) if n < 1 { return errNoReceiver } frame, _ := runtime.CallersFrames(pc).Next() - prf := strings.Split(frame.Function, ".") - rname := prf[1] - funcname := prf[2] + prf := path.Base(frame.Function) + lastdot := strings.LastIndex(prf, ".") + rname := prf[:lastdot] + funcname := prf[lastdot+1:] serialized, err := encode(c.t, rname, funcname, args...) if err != nil { diff --git a/rpc/rpc_test.go b/rpc/rpc_test.go index 941db5a..dc8f521 100644 --- a/rpc/rpc_test.go +++ b/rpc/rpc_test.go @@ -1,7 +1,42 @@ package rpc -import "testing" +import ( + "context" + "testing" + + "go.mongodb.org/mongo-driver/bson/primitive" + "repositories.action2quare.com/ayo/gocommon" + "repositories.action2quare.com/ayo/gocommon/logger" +) + +type testReceiver struct { +} + +func (tr *testReceiver) TargetExists(tid primitive.ObjectID) bool { + return tid[0] >= 10 +} + +func (tr *testReceiver) TestFunc(a string, b string) { + logger.Println("TestFunc :", a, b) + + target := primitive.NewObjectID() + target[0] = 0 + if CallOrGo(tr, target, a, b) != ErrCanExecuteHere { + return + } + + logger.Println(a, b) +} func TestRpc(t *testing.T) { + var tr testReceiver + RegistReceiver(&tr) + myctx, cancel := context.WithCancel(context.Background()) + redisClient, _ := gocommon.NewRedisClient("redis://192.168.8.94:6379", 0) + Start(myctx, redisClient) + + tr.TestFunc("aaa", "bb") + <-myctx.Done() + cancel() } From 27a3f2f08c8766ca3eedf08e934fab62710e9173 Mon Sep 17 00:00:00 2001 From: mountain Date: Mon, 10 Jul 2023 11:12:45 +0900 Subject: [PATCH 14/26] =?UTF-8?q?funcname=20=EC=98=A4=EB=A5=98=20=EC=88=98?= =?UTF-8?q?=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rpc/rpc.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rpc/rpc.go b/rpc/rpc.go index f239bd1..d39ada6 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -155,10 +155,10 @@ func (c callContext) call(args ...any) error { } frame, _ := runtime.CallersFrames(pc).Next() - prf := path.Base(frame.Function) - lastdot := strings.LastIndex(prf, ".") - rname := prf[:lastdot] - funcname := prf[lastdot+1:] + fullname := path.Base(frame.Function) + prf := strings.Split(fullname, ".") + rname := prf[1] + funcname := prf[2] serialized, err := encode(c.t, rname, funcname, args...) if err != nil { From c859aeb75fe97b977092010a0f3f379f75a6e71d Mon Sep 17 00:00:00 2001 From: mountain Date: Mon, 10 Jul 2023 11:43:52 +0900 Subject: [PATCH 15/26] =?UTF-8?q?=EC=B1=84=EB=84=90=EC=9D=B4=EB=A6=84=20?= =?UTF-8?q?=EA=B3=A0=EC=A0=95=EC=9C=BC=EB=A1=9C=20=EA=B3=84=EC=82=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rpc/rpc.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/rpc/rpc.go b/rpc/rpc.go index d39ada6..08b49f6 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -3,7 +3,9 @@ package rpc import ( "bytes" "context" + "crypto/md5" "encoding/gob" + "encoding/hex" "errors" "fmt" "path" @@ -55,7 +57,12 @@ func Start(ctx context.Context, redisClient *redis.Client) { return } - pubsubName := primitive.NewObjectID().Hex()[6:] + hash := md5.New() + for k := range engine.receivers { + hash.Write([]byte(k)) + } + pubsubName := hex.EncodeToString(hash.Sum(nil))[:16] + engine.publish = func(s []byte) error { _, err := redisClient.Publish(ctx, pubsubName, s).Result() return err From b5a72aad05a770e6540e888e2eefc399a6c457a6 Mon Sep 17 00:00:00 2001 From: mountain Date: Mon, 10 Jul 2023 12:13:51 +0900 Subject: [PATCH 16/26] =?UTF-8?q?=EC=B1=84=EB=84=90=EC=9D=B4=EB=A6=84?= =?UTF-8?q?=EC=9D=84=20=EB=AA=A8=EB=93=A0=20public=20=ED=95=A8=EC=88=98?= =?UTF-8?q?=EC=9D=98=20signature=EB=A5=BC=20=EA=B3=A0=EB=A0=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rpc/rpc.go | 11 ++++++++++- rpc/rpc_test.go | 17 +++++++++++------ 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/rpc/rpc.go b/rpc/rpc.go index 08b49f6..fa6090a 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -58,9 +58,18 @@ func Start(ctx context.Context, redisClient *redis.Client) { } hash := md5.New() - for k := range engine.receivers { + for k, manifest := range engine.receivers { hash.Write([]byte(k)) + for m, r := range manifest.methods { + hash.Write([]byte(m)) + hash.Write([]byte(r.Name)) + for i := 0; i < r.Type.NumIn(); i++ { + inName := r.Type.In(i).Name() + hash.Write([]byte(inName)) + } + } } + pubsubName := hex.EncodeToString(hash.Sum(nil))[:16] engine.publish = func(s []byte) error { diff --git a/rpc/rpc_test.go b/rpc/rpc_test.go index dc8f521..2965c1b 100644 --- a/rpc/rpc_test.go +++ b/rpc/rpc_test.go @@ -3,6 +3,7 @@ package rpc import ( "context" "testing" + "time" "go.mongodb.org/mongo-driver/bson/primitive" "repositories.action2quare.com/ayo/gocommon" @@ -13,19 +14,18 @@ type testReceiver struct { } func (tr *testReceiver) TargetExists(tid primitive.ObjectID) bool { + logger.Println(tid.Hex()) return tid[0] >= 10 } -func (tr *testReceiver) TestFunc(a string, b string) { - logger.Println("TestFunc :", a, b) - +func (tr *testReceiver) TestFunc(a string, b string, c int) { target := primitive.NewObjectID() target[0] = 0 if CallOrGo(tr, target, a, b) != ErrCanExecuteHere { return } - logger.Println(a, b) + logger.Println(" ", a, b) } func TestRpc(t *testing.T) { @@ -34,9 +34,14 @@ func TestRpc(t *testing.T) { myctx, cancel := context.WithCancel(context.Background()) redisClient, _ := gocommon.NewRedisClient("redis://192.168.8.94:6379", 0) - Start(myctx, redisClient) + go func() { + for { + tr.TestFunc("aaaa", "bbbb", 333) + time.Sleep(time.Second) + } + }() - tr.TestFunc("aaa", "bb") + Start(myctx, redisClient) <-myctx.Done() cancel() } From bfebc67eb7d3db8f0b221444a8eacd16e65533d7 Mon Sep 17 00:00:00 2001 From: mountain Date: Mon, 10 Jul 2023 12:28:32 +0900 Subject: [PATCH 17/26] =?UTF-8?q?=ED=85=8C=EC=8A=A4=ED=8A=B8=20=ED=95=A8?= =?UTF-8?q?=EC=88=98=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rpc/rpc_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rpc/rpc_test.go b/rpc/rpc_test.go index 2965c1b..59479c8 100644 --- a/rpc/rpc_test.go +++ b/rpc/rpc_test.go @@ -2,6 +2,7 @@ package rpc import ( "context" + "math/rand" "testing" "time" @@ -20,12 +21,12 @@ func (tr *testReceiver) TargetExists(tid primitive.ObjectID) bool { func (tr *testReceiver) TestFunc(a string, b string, c int) { target := primitive.NewObjectID() - target[0] = 0 + target[0] = byte(rand.Intn(2) * 20) if CallOrGo(tr, target, a, b) != ErrCanExecuteHere { return } - logger.Println(" ", a, b) + logger.Println(" ", a, b, target[0]) } func TestRpc(t *testing.T) { From 1f668586f2dce94048e276401c4f136d2d278140 Mon Sep 17 00:00:00 2001 From: mountain Date: Mon, 10 Jul 2023 12:50:27 +0900 Subject: [PATCH 18/26] =?UTF-8?q?wshandler=20reciever=20=ED=95=A8=EC=88=98?= =?UTF-8?q?=20=EC=B6=94=EA=B0=80=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- wshandler/wshandler.go | 35 ++++++++++++++++++++++++++--------- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/wshandler/wshandler.go b/wshandler/wshandler.go index 23c9550..7170bbd 100644 --- a/wshandler/wshandler.go +++ b/wshandler/wshandler.go @@ -81,9 +81,9 @@ type subhandler struct { // WebsocketHandler : type WebsocketHandler struct { - authCaches map[string]*subhandler - RedisSync *redis.Client - ClientMessageReceiver WebSocketMessageReceiver + authCaches map[string]*subhandler + RedisSync *redis.Client + receiverChain []WebSocketMessageReceiver } type wsConfig struct { @@ -130,9 +130,28 @@ func NewWebsocketHandler(authglobal gocommon.AuthCollectionGlobal) (wsh *Websock } return &WebsocketHandler{ - authCaches: authCaches, - RedisSync: redisSync, - ClientMessageReceiver: func(primitive.ObjectID, string, WebSocketMessageType, io.Reader) {}, + authCaches: authCaches, + RedisSync: redisSync, + } +} + +func (ws *WebsocketHandler) RegisterReceiver(receiver WebSocketMessageReceiver) { + ws.receiverChain = append(ws.receiverChain, receiver) +} + +func (ws *WebsocketHandler) Start(ctx context.Context) { + for _, sh := range ws.authCaches { + if len(ws.receiverChain) == 1 { + sh.callReceiver = ws.receiverChain[0] + } else { + sh.callReceiver = func(accid primitive.ObjectID, alias string, messageType WebSocketMessageType, body io.Reader) { + for _, r := range ws.receiverChain { + r(accid, alias, messageType, body) + } + } + } + + go sh.mainLoop(ctx) } } @@ -142,7 +161,7 @@ func (ws *WebsocketHandler) Cleanup() { } } -func (ws *WebsocketHandler) RegisterHandlers(ctx context.Context, serveMux *http.ServeMux, prefix string) error { +func (ws *WebsocketHandler) RegisterHandlers(serveMux *http.ServeMux, prefix string) error { for region, sh := range ws.authCaches { if region == "default" { region = "" @@ -153,8 +172,6 @@ func (ws *WebsocketHandler) RegisterHandlers(ctx context.Context, serveMux *http } else { serveMux.HandleFunc(url, sh.upgrade) } - sh.callReceiver = ws.ClientMessageReceiver - go sh.mainLoop(ctx) } return nil From 8598d59ab46f0f7a295b10acb6629926fa10e74e Mon Sep 17 00:00:00 2001 From: mountain Date: Mon, 10 Jul 2023 13:04:45 +0900 Subject: [PATCH 19/26] =?UTF-8?q?receiver=20=ED=95=A8=EC=88=98=EB=A5=BC=20?= =?UTF-8?q?region=20=EB=B3=84=EB=A1=9C=20=EB=B6=84=EB=A6=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- wshandler/wshandler.go | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/wshandler/wshandler.go b/wshandler/wshandler.go index 7170bbd..9bb7acb 100644 --- a/wshandler/wshandler.go +++ b/wshandler/wshandler.go @@ -83,7 +83,7 @@ type subhandler struct { type WebsocketHandler struct { authCaches map[string]*subhandler RedisSync *redis.Client - receiverChain []WebSocketMessageReceiver + receiverChain map[string][]WebSocketMessageReceiver } type wsConfig struct { @@ -130,22 +130,26 @@ func NewWebsocketHandler(authglobal gocommon.AuthCollectionGlobal) (wsh *Websock } return &WebsocketHandler{ - authCaches: authCaches, - RedisSync: redisSync, + authCaches: authCaches, + RedisSync: redisSync, + receiverChain: make(map[string][]WebSocketMessageReceiver), } } -func (ws *WebsocketHandler) RegisterReceiver(receiver WebSocketMessageReceiver) { - ws.receiverChain = append(ws.receiverChain, receiver) +func (ws *WebsocketHandler) RegisterReceiver(region string, receiver WebSocketMessageReceiver) { + ws.receiverChain[region] = append(ws.receiverChain[region], receiver) } func (ws *WebsocketHandler) Start(ctx context.Context) { - for _, sh := range ws.authCaches { - if len(ws.receiverChain) == 1 { - sh.callReceiver = ws.receiverChain[0] + for region, sh := range ws.authCaches { + chain := ws.receiverChain[region] + if len(chain) == 0 { + sh.callReceiver = func(accid primitive.ObjectID, alias string, messageType WebSocketMessageType, body io.Reader) {} + } else if len(chain) == 1 { + sh.callReceiver = chain[0] } else { sh.callReceiver = func(accid primitive.ObjectID, alias string, messageType WebSocketMessageType, body io.Reader) { - for _, r := range ws.receiverChain { + for _, r := range chain { r(accid, alias, messageType, body) } } From a842845685ee7dfbc2e01b6dbdca8ec05faaeb99 Mon Sep 17 00:00:00 2001 From: mountain Date: Mon, 10 Jul 2023 14:30:24 +0900 Subject: [PATCH 20/26] =?UTF-8?q?parameter=EB=A5=BC=20=EC=A2=80=EB=8D=94?= =?UTF-8?q?=20=EB=AA=85=ED=99=95=ED=95=98=EA=B2=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rpc/rpc.go | 15 +++++++++------ rpc/rpc_test.go | 2 +- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/rpc/rpc.go b/rpc/rpc.go index fa6090a..3bcff82 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -151,14 +151,14 @@ func (re *rpcEngine) loop(ctx context.Context, redisClient *redis.Client, chanNa var errNoReceiver = errors.New("no receiver") -type callContext struct { +type CallContext struct { r Receiver t primitive.ObjectID } var ErrCanExecuteHere = errors.New("go ahead") -func (c callContext) call(args ...any) error { +func (c *CallContext) Call(args ...any) error { if c.r.TargetExists(c.t) { // 여기 있네? return ErrCanExecuteHere @@ -184,12 +184,15 @@ func (c callContext) call(args ...any) error { return engine.publish(serialized) } -func CallOrGo(r Receiver, target primitive.ObjectID, args ...any) error { - cc := callContext{ +func Make(r Receiver) *CallContext { + return &CallContext{ r: r, - t: target, } - return cc.call(args...) +} + +func (cc *CallContext) To(target primitive.ObjectID) *CallContext { + cc.t = target + return cc } func encode(target primitive.ObjectID, receiver string, funcname string, args ...any) ([]byte, error) { diff --git a/rpc/rpc_test.go b/rpc/rpc_test.go index 59479c8..ccc3071 100644 --- a/rpc/rpc_test.go +++ b/rpc/rpc_test.go @@ -22,7 +22,7 @@ func (tr *testReceiver) TargetExists(tid primitive.ObjectID) bool { func (tr *testReceiver) TestFunc(a string, b string, c int) { target := primitive.NewObjectID() target[0] = byte(rand.Intn(2) * 20) - if CallOrGo(tr, target, a, b) != ErrCanExecuteHere { + if Make(tr).To(target).Call(a, b, c) != ErrCanExecuteHere { return } From 3bb985d0b617c3864142bd96ce6eff536fd9641e Mon Sep 17 00:00:00 2001 From: mountain Date: Tue, 11 Jul 2023 09:36:21 +0900 Subject: [PATCH 21/26] =?UTF-8?q?MessageReceiver=20signature=20=EC=88=AE?= =?UTF-8?q?=EC=96=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- wshandler/room.go | 4 +-- wshandler/wshandler.go | 68 +++++++++++++++++++++++++----------------- 2 files changed, 43 insertions(+), 29 deletions(-) diff --git a/wshandler/room.go b/wshandler/room.go index f74fb45..8a1c046 100644 --- a/wshandler/room.go +++ b/wshandler/room.go @@ -62,10 +62,10 @@ func (r *room) loop(ctx context.Context, conns *map[string]*wsconn) (normalEnd b return true case conn := <-r.inChan: - (*conns)[conn.alias] = conn + (*conns)[conn.sender.Accid.Hex()] = conn case conn := <-r.outChan: - delete((*conns), conn.alias) + delete((*conns), conn.sender.Accid.Hex()) case msg := <-r.messageChan: for _, conn := range *conns { diff --git a/wshandler/wshandler.go b/wshandler/wshandler.go index 9bb7acb..d1bfc95 100644 --- a/wshandler/wshandler.go +++ b/wshandler/wshandler.go @@ -24,8 +24,7 @@ var noSessionFlag = flagx.Bool("nosession", false, "nosession=[true|false]") type wsconn struct { *websocket.Conn - alias string - accid primitive.ObjectID + sender *Sender } type UpstreamMessage struct { @@ -65,7 +64,13 @@ const ( Disconnected = WebSocketMessageType(101) ) -type WebSocketMessageReceiver func(accid primitive.ObjectID, alias string, messageType WebSocketMessageType, body io.Reader) +type Sender struct { + Region string + Accid primitive.ObjectID + Alias string +} + +type WebSocketMessageReceiver func(sender *Sender, messageType WebSocketMessageType, body io.Reader) type subhandler struct { authCache *gocommon.AuthCollection @@ -77,6 +82,7 @@ type subhandler struct { localDeliveryChan chan any callReceiver WebSocketMessageReceiver connWaitGroup sync.WaitGroup + region string } // WebsocketHandler : @@ -124,6 +130,7 @@ func NewWebsocketHandler(authglobal gocommon.AuthCollectionGlobal) (wsh *Websock connInOutChan: make(chan *wsconn), deliveryChan: make(chan any, 1000), localDeliveryChan: make(chan any, 100), + region: region, } authCaches[region] = sh @@ -144,13 +151,13 @@ func (ws *WebsocketHandler) Start(ctx context.Context) { for region, sh := range ws.authCaches { chain := ws.receiverChain[region] if len(chain) == 0 { - sh.callReceiver = func(accid primitive.ObjectID, alias string, messageType WebSocketMessageType, body io.Reader) {} + sh.callReceiver = func(sender *Sender, messageType WebSocketMessageType, body io.Reader) {} } else if len(chain) == 1 { sh.callReceiver = chain[0] } else { - sh.callReceiver = func(accid primitive.ObjectID, alias string, messageType WebSocketMessageType, body io.Reader) { + sh.callReceiver = func(sender *Sender, messageType WebSocketMessageType, body io.Reader) { for _, r := range chain { - r(accid, alias, messageType, body) + r(sender, messageType, body) } } } @@ -281,7 +288,8 @@ func (sh *subhandler) mainLoop(ctx context.Context) { case *UpstreamMessage: target := usermsg.Target if target[0] == '@' { - conn := entireConns[target[1:]] + accid := target[1:] + conn := entireConns[accid] if conn != nil { // 이 경우 아니면 publish 해야 함 conn.WriteMessage(websocket.TextMessage, usermsg.Body) @@ -294,19 +302,19 @@ func (sh *subhandler) mainLoop(ctx context.Context) { case *CommandMessage: if usermsg.Cmd == CommandType_JoinRoom && len(usermsg.Args) == 2 { - alias := usermsg.Args[0].(string) + accid := usermsg.Args[0].(string) roomName := usermsg.Args[1].(string) - conn := entireConns[alias] + conn := entireConns[accid] if conn != nil { findRoom(roomName, true).in(conn) break } } else if usermsg.Cmd == CommandType_LeaveRoom && len(usermsg.Args) == 2 { - alias := usermsg.Args[0].(string) + accid := usermsg.Args[0].(string) roomName := usermsg.Args[1].(string) - conn := entireConns[alias] + conn := entireConns[accid] if conn != nil { if room := findRoom(roomName, false); room != nil { room.out(conn) @@ -314,10 +322,11 @@ func (sh *subhandler) mainLoop(ctx context.Context) { } } } else if usermsg.Cmd == CommandType_WriteControl && len(usermsg.Args) == 2 { - alias := usermsg.Args[0].(string) - conn := entireConns[alias] + accid := usermsg.Args[0].(string) + conn := entireConns[accid] if conn != nil { conn.WriteControl(usermsg.Args[1].(int), usermsg.Args[2].([]byte), time.Time{}) + break } } @@ -338,7 +347,8 @@ func (sh *subhandler) mainLoop(ctx context.Context) { room.broadcast(usermsg) } } else if target[0] == '@' { - conn := entireConns[target[1:]] + accid := target[1:] + conn := entireConns[accid] if conn != nil { conn.WriteMessage(websocket.TextMessage, usermsg.Body) } @@ -346,18 +356,18 @@ func (sh *subhandler) mainLoop(ctx context.Context) { case *CommandMessage: if usermsg.Cmd == CommandType_JoinRoom && len(usermsg.Args) == 2 { - alias := usermsg.Args[0].(string) + accid := usermsg.Args[0].(string) roomName := usermsg.Args[1].(string) - conn := entireConns[alias] + conn := entireConns[accid] if conn != nil { findRoom(roomName, true).in(conn) } } else if usermsg.Cmd == CommandType_LeaveRoom && len(usermsg.Args) == 2 { - alias := usermsg.Args[0].(string) + accid := usermsg.Args[0].(string) roomName := usermsg.Args[1].(string) - conn := entireConns[alias] + conn := entireConns[accid] if conn != nil { if room := findRoom(roomName, false); room != nil { room.out(conn) @@ -371,13 +381,14 @@ func (sh *subhandler) mainLoop(ctx context.Context) { case c := <-sh.connInOutChan: if c.Conn == nil { - delete(entireConns, c.alias) + delete(entireConns, c.sender.Accid.Hex()) for _, room := range rooms { room.out(c) } - sh.callReceiver(c.accid, c.alias, Connected, nil) + sh.callReceiver(c.sender, Disconnected, nil) } else { - entireConns[c.alias] = c + entireConns[c.sender.Accid.Hex()] = c + sh.callReceiver(c.sender, Connected, nil) } } } @@ -385,9 +396,12 @@ func (sh *subhandler) mainLoop(ctx context.Context) { func upgrade_core(sh *subhandler, conn *websocket.Conn, accid primitive.ObjectID, alias string) { newconn := &wsconn{ - Conn: conn, - alias: alias, - accid: accid, + Conn: conn, + sender: &Sender{ + Region: sh.region, + Alias: alias, + Accid: accid, + }, } sh.connInOutChan <- newconn @@ -402,15 +416,15 @@ func upgrade_core(sh *subhandler, conn *websocket.Conn, accid primitive.ObjectID } if messageType == websocket.CloseMessage { - sh.callReceiver(accid, c.alias, CloseMessage, r) + sh.callReceiver(c.sender, CloseMessage, r) break } if messageType == websocket.TextMessage { // 유저가 직접 보낸 메시지 - sh.callReceiver(accid, c.alias, TextMessage, r) + sh.callReceiver(c.sender, TextMessage, r) } else if messageType == websocket.BinaryMessage { - sh.callReceiver(accid, c.alias, BinaryMessage, r) + sh.callReceiver(c.sender, BinaryMessage, r) } } sh.redisSync.Del(context.Background(), accid.Hex()) From a42eb2888e9768e3568118a947924c86e0d91f28 Mon Sep 17 00:00:00 2001 From: mountain Date: Tue, 11 Jul 2023 09:56:04 +0900 Subject: [PATCH 22/26] =?UTF-8?q?commandMessage=EB=A5=BC=20private?= =?UTF-8?q?=EC=9C=BC=EB=A1=9C=20=EB=A7=8C=EB=93=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- wshandler/wshandler.go | 52 +++++++++++++++++++++++++++++------------- 1 file changed, 36 insertions(+), 16 deletions(-) diff --git a/wshandler/wshandler.go b/wshandler/wshandler.go index d1bfc95..8233163 100644 --- a/wshandler/wshandler.go +++ b/wshandler/wshandler.go @@ -39,16 +39,16 @@ type DownstreamMessage struct { Body string } -type CommandType string +type commandType string const ( - CommandType_JoinRoom = CommandType("join_room") - CommandType_LeaveRoom = CommandType("leave_room") - CommandType_WriteControl = CommandType("write_control") + commandType_JoinRoom = commandType("join_room") + commandType_LeaveRoom = commandType("leave_room") + commandType_WriteControl = commandType("write_control") ) -type CommandMessage struct { - Cmd CommandType +type commandMessage struct { + Cmd commandType Args []any } @@ -210,8 +210,8 @@ func (ws *WebsocketHandler) SendUpstreamMessage(region string, msg *UpstreamMess func (ws *WebsocketHandler) SendCloseMessage(region string, target string, text string) { sh := ws.authCaches[region] if sh != nil { - sh.localDeliveryChan <- &CommandMessage{ - Cmd: CommandType_WriteControl, + sh.localDeliveryChan <- &commandMessage{ + Cmd: commandType_WriteControl, Args: []any{ target, int(websocket.CloseMessage), @@ -221,6 +221,26 @@ func (ws *WebsocketHandler) SendCloseMessage(region string, target string, text } } +func (ws *WebsocketHandler) EnterRoom(region string, room string, accid primitive.ObjectID) { + sh := ws.authCaches[region] + if sh != nil { + sh.localDeliveryChan <- &commandMessage{ + Cmd: commandType_JoinRoom, + Args: []any{room, accid}, + } + } +} + +func (ws *WebsocketHandler) LeaveRoom(region string, room string, accid primitive.ObjectID) { + sh := ws.authCaches[region] + if sh != nil { + sh.localDeliveryChan <- &commandMessage{ + Cmd: commandType_LeaveRoom, + Args: []any{room, accid}, + } + } +} + func (sh *subhandler) mainLoop(ctx context.Context) { defer func() { s := recover() @@ -247,7 +267,7 @@ func (sh *subhandler) mainLoop(ctx context.Context) { logger.Println("decode UpstreamMessage failed :", err) } } else if raw.Channel == sh.redisCmdChanName { - var cmd CommandMessage + var cmd commandMessage if err := json.Unmarshal([]byte(raw.Payload), &cmd); err == nil { sh.deliveryChan <- &cmd } else { @@ -300,8 +320,8 @@ func (sh *subhandler) mainLoop(ctx context.Context) { sh.redisSync.Publish(context.Background(), sh.redisMsgChanName, bt).Result() } - case *CommandMessage: - if usermsg.Cmd == CommandType_JoinRoom && len(usermsg.Args) == 2 { + case *commandMessage: + if usermsg.Cmd == commandType_JoinRoom && len(usermsg.Args) == 2 { accid := usermsg.Args[0].(string) roomName := usermsg.Args[1].(string) @@ -310,7 +330,7 @@ func (sh *subhandler) mainLoop(ctx context.Context) { findRoom(roomName, true).in(conn) break } - } else if usermsg.Cmd == CommandType_LeaveRoom && len(usermsg.Args) == 2 { + } else if usermsg.Cmd == commandType_LeaveRoom && len(usermsg.Args) == 2 { accid := usermsg.Args[0].(string) roomName := usermsg.Args[1].(string) @@ -321,7 +341,7 @@ func (sh *subhandler) mainLoop(ctx context.Context) { break } } - } else if usermsg.Cmd == CommandType_WriteControl && len(usermsg.Args) == 2 { + } else if usermsg.Cmd == commandType_WriteControl && len(usermsg.Args) == 2 { accid := usermsg.Args[0].(string) conn := entireConns[accid] if conn != nil { @@ -354,8 +374,8 @@ func (sh *subhandler) mainLoop(ctx context.Context) { } } - case *CommandMessage: - if usermsg.Cmd == CommandType_JoinRoom && len(usermsg.Args) == 2 { + case *commandMessage: + if usermsg.Cmd == commandType_JoinRoom && len(usermsg.Args) == 2 { accid := usermsg.Args[0].(string) roomName := usermsg.Args[1].(string) @@ -363,7 +383,7 @@ func (sh *subhandler) mainLoop(ctx context.Context) { if conn != nil { findRoom(roomName, true).in(conn) } - } else if usermsg.Cmd == CommandType_LeaveRoom && len(usermsg.Args) == 2 { + } else if usermsg.Cmd == commandType_LeaveRoom && len(usermsg.Args) == 2 { accid := usermsg.Args[0].(string) roomName := usermsg.Args[1].(string) From d7b26608df94abb6ec8276fa6c8c9890247772c9 Mon Sep 17 00:00:00 2001 From: mountain Date: Tue, 11 Jul 2023 12:31:35 +0900 Subject: [PATCH 23/26] =?UTF-8?q?=ED=83=80=EC=9E=85=20=EB=AF=B8=EC=8A=A4?= =?UTF-8?q?=EB=A7=A4=EC=B9=98=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- wshandler/wshandler.go | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/wshandler/wshandler.go b/wshandler/wshandler.go index 8233163..c51ebb4 100644 --- a/wshandler/wshandler.go +++ b/wshandler/wshandler.go @@ -322,19 +322,17 @@ func (sh *subhandler) mainLoop(ctx context.Context) { case *commandMessage: if usermsg.Cmd == commandType_JoinRoom && len(usermsg.Args) == 2 { - accid := usermsg.Args[0].(string) - roomName := usermsg.Args[1].(string) - - conn := entireConns[accid] + roomName := usermsg.Args[0].(string) + accid := usermsg.Args[1].(primitive.ObjectID) + conn := entireConns[accid.Hex()] if conn != nil { findRoom(roomName, true).in(conn) break } } else if usermsg.Cmd == commandType_LeaveRoom && len(usermsg.Args) == 2 { - accid := usermsg.Args[0].(string) - roomName := usermsg.Args[1].(string) - - conn := entireConns[accid] + roomName := usermsg.Args[0].(string) + accid := usermsg.Args[1].(primitive.ObjectID) + conn := entireConns[accid.Hex()] if conn != nil { if room := findRoom(roomName, false); room != nil { room.out(conn) @@ -376,18 +374,16 @@ func (sh *subhandler) mainLoop(ctx context.Context) { case *commandMessage: if usermsg.Cmd == commandType_JoinRoom && len(usermsg.Args) == 2 { - accid := usermsg.Args[0].(string) - roomName := usermsg.Args[1].(string) - - conn := entireConns[accid] + roomName := usermsg.Args[0].(string) + accid := usermsg.Args[1].(primitive.ObjectID) + conn := entireConns[accid.Hex()] if conn != nil { findRoom(roomName, true).in(conn) } } else if usermsg.Cmd == commandType_LeaveRoom && len(usermsg.Args) == 2 { - accid := usermsg.Args[0].(string) - roomName := usermsg.Args[1].(string) - - conn := entireConns[accid] + roomName := usermsg.Args[0].(string) + accid := usermsg.Args[1].(primitive.ObjectID) + conn := entireConns[accid.Hex()] if conn != nil { if room := findRoom(roomName, false); room != nil { room.out(conn) From 9fd0dd00cb7d97d54ad3c809e3c0dbf4e5692563 Mon Sep 17 00:00:00 2001 From: mountain Date: Tue, 11 Jul 2023 12:57:57 +0900 Subject: [PATCH 24/26] =?UTF-8?q?mesasge=EC=97=90=20=ED=83=9C=EA=B7=B8=20?= =?UTF-8?q?=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- wshandler/wshandler.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/wshandler/wshandler.go b/wshandler/wshandler.go index c51ebb4..58f54b8 100644 --- a/wshandler/wshandler.go +++ b/wshandler/wshandler.go @@ -32,11 +32,13 @@ type UpstreamMessage struct { Accid primitive.ObjectID Target string Body []byte + Tag []string } type DownstreamMessage struct { - Alias string - Body string + Alias string `json:",omitempty"` + Body string `json:",omitempty"` + Tag []string `json:",omitempty"` } type commandType string From 4acb81a20d9cd4eb0910e38b3da0bdaa081b38d0 Mon Sep 17 00:00:00 2001 From: mountain Date: Tue, 11 Jul 2023 14:30:10 +0900 Subject: [PATCH 25/26] =?UTF-8?q?message=20body=EB=A5=BC=20any=EB=A1=9C=20?= =?UTF-8?q?=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- wshandler/room.go | 16 +++++++++------- wshandler/wshandler.go | 19 +++++++++++++++---- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/wshandler/room.go b/wshandler/room.go index 8a1c046..c04a946 100644 --- a/wshandler/room.go +++ b/wshandler/room.go @@ -2,6 +2,7 @@ package wshandler import ( "context" + "encoding/json" "github.com/gorilla/websocket" "repositories.action2quare.com/ayo/gocommon/logger" @@ -54,8 +55,6 @@ func (r *room) loop(ctx context.Context, conns *map[string]*wsconn) (normalEnd b } }() - a, b, c := []byte(`{"alias":"`), []byte(`","body":"`), []byte(`"}`) - for { select { case <-ctx.Done(): @@ -68,13 +67,16 @@ func (r *room) loop(ctx context.Context, conns *map[string]*wsconn) (normalEnd b delete((*conns), conn.sender.Accid.Hex()) case msg := <-r.messageChan: + ds := DownstreamMessage{ + Alias: msg.Alias, + Body: msg.Body, + Tag: append(msg.Tag, r.name), + } + bt, _ := json.Marshal(ds) + for _, conn := range *conns { writer, _ := conn.NextWriter(websocket.TextMessage) - writer.Write(a) - writer.Write([]byte(msg.Alias)) - writer.Write(b) - writer.Write(msg.Body) - writer.Write(c) + writer.Write(bt) writer.Close() } } diff --git a/wshandler/wshandler.go b/wshandler/wshandler.go index 58f54b8..e4427c0 100644 --- a/wshandler/wshandler.go +++ b/wshandler/wshandler.go @@ -31,13 +31,13 @@ type UpstreamMessage struct { Alias string Accid primitive.ObjectID Target string - Body []byte + Body any Tag []string } type DownstreamMessage struct { Alias string `json:",omitempty"` - Body string `json:",omitempty"` + Body any `json:",omitempty"` Tag []string `json:",omitempty"` } @@ -314,7 +314,13 @@ func (sh *subhandler) mainLoop(ctx context.Context) { conn := entireConns[accid] if conn != nil { // 이 경우 아니면 publish 해야 함 - conn.WriteMessage(websocket.TextMessage, usermsg.Body) + ds, _ := json.Marshal(DownstreamMessage{ + Alias: usermsg.Alias, + Body: usermsg.Body, + Tag: usermsg.Tag, + }) + + conn.WriteMessage(websocket.TextMessage, ds) break } } @@ -370,7 +376,12 @@ func (sh *subhandler) mainLoop(ctx context.Context) { accid := target[1:] conn := entireConns[accid] if conn != nil { - conn.WriteMessage(websocket.TextMessage, usermsg.Body) + ds, _ := json.Marshal(DownstreamMessage{ + Alias: usermsg.Alias, + Body: usermsg.Body, + Tag: usermsg.Tag, + }) + conn.WriteMessage(websocket.TextMessage, ds) } } From 74829b93ac1b3e3ea4a51fbc5113800ad57006fc Mon Sep 17 00:00:00 2001 From: mountain Date: Tue, 11 Jul 2023 15:26:13 +0900 Subject: [PATCH 26/26] =?UTF-8?q?tag=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- wshandler/room.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/wshandler/room.go b/wshandler/room.go index c04a946..762e165 100644 --- a/wshandler/room.go +++ b/wshandler/room.go @@ -55,6 +55,7 @@ func (r *room) loop(ctx context.Context, conns *map[string]*wsconn) (normalEnd b } }() + tag := "#" + r.name for { select { case <-ctx.Done(): @@ -70,7 +71,7 @@ func (r *room) loop(ctx context.Context, conns *map[string]*wsconn) (normalEnd b ds := DownstreamMessage{ Alias: msg.Alias, Body: msg.Body, - Tag: append(msg.Tag, r.name), + Tag: append(msg.Tag, tag), } bt, _ := json.Marshal(ds)