From 165d75a21f0f93d90ab0fd4649594299bc6f49ba Mon Sep 17 00:00:00 2001 From: mountain Date: Thu, 3 Aug 2023 10:13:51 +0900 Subject: [PATCH] Update wshandler.go --- wshandler/wshandler.go | 202 ++++++++++++++++++++++++----------------- 1 file changed, 117 insertions(+), 85 deletions(-) diff --git a/wshandler/wshandler.go b/wshandler/wshandler.go index c552e37..6fb7a8e 100644 --- a/wshandler/wshandler.go +++ b/wshandler/wshandler.go @@ -7,6 +7,7 @@ import ( "encoding/gob" "encoding/hex" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -45,6 +46,15 @@ func (conn *wsconn) pushRoom(r *room) { conn.joinedRooms = append(conn.joinedRooms, r) } +func (conn *wsconn) isInRoom(roomname string) bool { + for _, r := range conn.joinedRooms { + if r.name == roomname { + return true + } + } + return false +} + type UpstreamMessage struct { Alias string Accid primitive.ObjectID @@ -385,6 +395,85 @@ func (sh *subhandler) mainLoop(ctx context.Context) { } }() + errProcessFailed_NotInRoom := errors.New("not in room") + processPrivateUpstreamMessage := func(usermsg *UpstreamMessage) (bool, error) { + target := usermsg.Target + if target[0] == '#' { + return false, nil + } + roomindex := strings.IndexByte(target, '@') + var accid string + var roomid string + if roomindex > 0 { + accid = target[:roomindex] + roomid = target[roomindex+1:] + } else { + accid = target + } + conn := entireConns[accid] + if conn == nil { + return false, nil + } + + if len(roomid) > 0 { + if !conn.isInRoom(roomid) { + return false, errProcessFailed_NotInRoom + } + } + + // conn에게 메시지 보내면 처리 완료 + ds, _ := json.Marshal(DownstreamMessage{ + Alias: usermsg.Alias, + Body: usermsg.Body, + Tag: usermsg.Tag, + }) + sh.sendMsgChan <- send_msg_queue_elem{ + to: conn, + mt: websocket.TextMessage, + msg: ds, + } + + return true, nil + } + errCommandArgumentNotCorrect := errors.New("command arguments are not correct") + processCommandMessage := func(usermsg *commandMessage) (bool, error) { + switch usermsg.Cmd { + case commandType_EnterRoom: + if len(usermsg.Args) != 2 { + return false, errCommandArgumentNotCorrect + } + roomName := usermsg.Args[0].(string) + accid := usermsg.Args[1].(primitive.ObjectID) + conn := entireConns[accid.Hex()] + if conn == nil { + return false, nil + } + conn.pushRoom(findRoom(roomName, true).in(conn)) + + case commandType_LeaveRoom: + if len(usermsg.Args) != 2 { + return false, errCommandArgumentNotCorrect + } + roomName := usermsg.Args[0].(string) + accid := usermsg.Args[1].(primitive.ObjectID) + conn := entireConns[accid.Hex()] + if conn == nil { + return false, nil + } + + room := findRoom(roomName, false) + if room == nil { + return false, errProcessFailed_NotInRoom + } + + if conn.popRoom(room.out(conn)) == 0 { + closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "") + conn.WriteControl(websocket.CloseMessage, closeMsg, time.Time{}) + } + } + return true, nil + } + // 유저에게서 온 메세지, 소켓 연결/해체 처리 for { buffer := bytes.NewBuffer(make([]byte, 0, 1024)) @@ -403,30 +492,17 @@ func (sh *subhandler) mainLoop(ctx context.Context) { // 없으면 publish한다. switch usermsg := usermsg.(type) { case *UpstreamMessage: - target := usermsg.Target - if target[0] == '@' { - accid := target[1:] - conn := entireConns[accid] - if conn != nil { - // 이 경우 아니면 publish 해야 함 - ds, _ := json.Marshal(DownstreamMessage{ - Alias: usermsg.Alias, - Body: usermsg.Body, - Tag: usermsg.Tag, - }) - sh.sendMsgChan <- send_msg_queue_elem{ - to: conn, - mt: websocket.TextMessage, - msg: ds, - } - break - } + processed, err := processPrivateUpstreamMessage(usermsg) + if err != nil { + logger.Println("processUpstreamMessage returns err :", err) } - var err error - enc := gob.NewEncoder(buffer) - if err = enc.Encode(usermsg); err == nil { - _, err = sh.redisSync.Publish(context.Background(), sh.redisMsgChanName, buffer.Bytes()).Result() + if !processed { + // private메시지가 처리가 안됐으므로 publish + enc := gob.NewEncoder(buffer) + if err = enc.Encode(usermsg); err == nil { + _, err = sh.redisSync.Publish(context.Background(), sh.redisMsgChanName, buffer.Bytes()).Result() + } } if err != nil { @@ -434,37 +510,21 @@ func (sh *subhandler) mainLoop(ctx context.Context) { } case *commandMessage: - if usermsg.Cmd == commandType_EnterRoom && len(usermsg.Args) == 2 { - roomName := usermsg.Args[0].(string) - accid := usermsg.Args[1].(primitive.ObjectID) - conn := entireConns[accid.Hex()] - if conn != nil { - conn.pushRoom(findRoom(roomName, true).in(conn)) - break - } - } else if usermsg.Cmd == commandType_LeaveRoom && len(usermsg.Args) == 2 { - roomName := usermsg.Args[0].(string) - accid := usermsg.Args[1].(primitive.ObjectID) - conn := entireConns[accid.Hex()] - if conn != nil { - if room := findRoom(roomName, false); room != nil { - if conn.popRoom(room.out(conn)) == 0 { - closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "") - conn.WriteControl(websocket.CloseMessage, closeMsg, time.Time{}) - } - break - } - } + processed, err := processCommandMessage(usermsg) + if err != nil { + logger.Println("processCommandMessage returns err :", err) + break } - // 위에서 break 안걸리면 나한테 없으므로 publish를 해야 함. 그러면 다른 호스트가 deliveryChan으로 받는다 - var err error - enc := gob.NewEncoder(buffer) - if err = enc.Encode(usermsg); err == nil { - _, err = sh.redisSync.Publish(context.Background(), sh.redisCmdChanName, buffer.Bytes()).Result() - } - if err != nil { - logger.Println("gob.Encode or Publish failed :", err) + if !processed { + var err error + enc := gob.NewEncoder(buffer) + if err = enc.Encode(usermsg); err == nil { + _, err = sh.redisSync.Publish(context.Background(), sh.redisCmdChanName, buffer.Bytes()).Result() + } + if err != nil { + logger.Println("gob.Encode or Publish failed :", err) + } } } @@ -478,43 +538,15 @@ func (sh *subhandler) mainLoop(ctx context.Context) { if room := findRoom(roomName, false); room != nil { room.broadcast(usermsg) } - } else if target[0] == '@' { - accid := target[1:] - conn := entireConns[accid] - if conn != nil { - ds, _ := json.Marshal(DownstreamMessage{ - Alias: usermsg.Alias, - Body: usermsg.Body, - Tag: usermsg.Tag, - }) - sh.sendMsgChan <- send_msg_queue_elem{ - to: conn, - mt: websocket.TextMessage, - msg: ds, - } - } + } else if _, err := processPrivateUpstreamMessage(usermsg); err != nil { + logger.Println("processPrivateUpstreamMessage returns err :", err) } case *commandMessage: - if usermsg.Cmd == commandType_EnterRoom && len(usermsg.Args) == 2 { - roomName := usermsg.Args[0].(string) - accid := usermsg.Args[1].(primitive.ObjectID) - conn := entireConns[accid.Hex()] - if conn != nil { - conn.pushRoom(findRoom(roomName, true).in(conn)) - } - } else if usermsg.Cmd == commandType_LeaveRoom && len(usermsg.Args) == 2 { - roomName := usermsg.Args[0].(string) - accid := usermsg.Args[1].(primitive.ObjectID) - conn := entireConns[accid.Hex()] - if conn != nil { - if room := findRoom(roomName, false); room != nil { - if conn.popRoom(room.out(conn)) == 0 { - closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "") - conn.WriteControl(websocket.CloseMessage, closeMsg, time.Time{}) - } - } - } + _, err := processCommandMessage(usermsg) + if err != nil { + logger.Println("processCommandMessage returns err :", err) + break } default: