diff --git a/wshandler/api_handler_peer.go b/wshandler/api_handler_peer.go index 8512d63..29ddbbb 100644 --- a/wshandler/api_handler_peer.go +++ b/wshandler/api_handler_peer.go @@ -1,8 +1,8 @@ package wshandler import ( - "bytes" "encoding/json" + "io" "reflect" "strings" "unsafe" @@ -12,7 +12,7 @@ import ( "repositories.action2quare.com/ayo/gocommon/logger" ) -type peerApiFuncType func(any, []byte) (any, error) +type peerApiFuncType func(any, io.Reader) (any, error) type peerConnFuncType func(any, *websocket.Conn) type peerDisconnFuncType func(any, string) @@ -100,8 +100,8 @@ func MakeWebsocketPeerApiHandler[T any](receiverName string) WebsocketPeerApiHan } } - methods[receiverName+"."+method.Name] = func(recv any, buff []byte) (any, error) { - decoder := json.NewDecoder(bytes.NewBuffer(buff)) + methods[receiverName+"."+method.Name] = func(recv any, r io.Reader) (any, error) { + decoder := json.NewDecoder(r) inargs := make([]any, len(intypes)) for i, intype := range intypes { @@ -163,21 +163,21 @@ func (hc *WebsocketPeerApiBroker) AddHandler(receiver WebsocketPeerApiHandler) { } } -func (hc *WebsocketPeerApiBroker) ClientConnected(recv any, c *wsconn) { +func (hc *WebsocketPeerApiBroker) ClientConnected(recv any, c *websocket.Conn) { for _, v := range hc.connFuncs { - v(recv, c.Conn) + v(recv, c) } } -func (hc *WebsocketPeerApiBroker) ClientDisconnected(recv any, c *wsconn) { +func (hc *WebsocketPeerApiBroker) ClientDisconnected(recv any, reason string) { for _, v := range hc.disconnFuncs { - v(recv, c.closeMessage) + v(recv, reason) } } -func (hc *WebsocketPeerApiBroker) Call(recv any, funcname string, buff []byte) { +func (hc *WebsocketPeerApiBroker) Call(recv any, funcname string, r io.Reader) { if found := hc.methods[funcname]; found != nil { - _, err := found(recv, buff) + _, err := found(recv, r) if err != nil { logger.Println("api call is failed. err :", err) } diff --git a/wshandler/api_handler_test.go b/wshandler/api_handler_test.go index 8986a73..33d4b77 100644 --- a/wshandler/api_handler_test.go +++ b/wshandler/api_handler_test.go @@ -90,22 +90,18 @@ func (dsc *dummySessionConsumer) Touch(string) (session.Authorization, error) { func TestPeerApiBroker(t *testing.T) { handler := MakeWebsocketPeerApiHandler[peerHandler]("test") - ws, err := NewWebsocketPeerHandler(&dummySessionConsumer{}, "redis://192.168.8.94:6380/4") - if err != nil { - t.Error(err) - } - + ws := NewWebsocketPeerHandler(&dummySessionConsumer{}) ws.AddHandler(handler) peer := &peerHandler{ id: "onlyone", } func1args, _ := json.Marshal([]any{string("arg1"), int(100)}) - ws.Call(peer, "test.ApiFunc1", func1args) + ws.Call(peer, "test.ApiFunc1", bytes.NewBuffer(func1args)) func1args, _ = json.Marshal([]any{string("arg1"), map[string]int{"arg2.key": 99}}) - ws.Call(peer, "test.ApiFunc2", func1args) + ws.Call(peer, "test.ApiFunc2", bytes.NewBuffer(func1args)) func1args, _ = json.Marshal([]any{float64(111.1), []int{99, 98}}) - ws.Call(peer, "test.ApiFunc3", func1args) + ws.Call(peer, "test.ApiFunc3", bytes.NewBuffer(func1args)) } diff --git a/wshandler/wshandler.go b/wshandler/wshandler.go index a6758a4..fcaf7bd 100644 --- a/wshandler/wshandler.go +++ b/wshandler/wshandler.go @@ -86,7 +86,9 @@ type send_msg_queue_elem struct { msg []byte } -type websocketHandlerBase struct { +type WebsocketHandler struct { + WebsocketApiBroker + redisMsgChanName string redisCmdChanName string redisSync *redis.Client @@ -99,16 +101,6 @@ type websocketHandlerBase struct { sessionConsumer session.Consumer } -type WebsocketHandler struct { - WebsocketApiBroker - *websocketHandlerBase -} - -type WebsocketPeerHandler struct { - WebsocketPeerApiBroker - *websocketHandlerBase -} - type wsConfig struct { gocommon.StorageAddr `json:"storage"` } @@ -124,7 +116,7 @@ func init() { gob.Register([]any{}) } -func makeWebsocketHandlerBase(consumer session.Consumer, redisUrl string) (*websocketHandlerBase, error) { +func NewWebsocketHandler(consumer session.Consumer, redisUrl string) (*WebsocketHandler, error) { var config wsConfig if err := gocommon.LoadConfig(&config); err != nil { return nil, err @@ -159,7 +151,7 @@ func makeWebsocketHandlerBase(consumer session.Consumer, redisUrl string) (*webs } }() - return &websocketHandlerBase{ + return &WebsocketHandler{ redisMsgChanName: fmt.Sprintf("_wsh_msg_%d", redisSync.Options().DB), redisCmdChanName: fmt.Sprintf("_wsh_cmd_%d", redisSync.Options().DB), redisSync: redisSync, @@ -171,28 +163,6 @@ func makeWebsocketHandlerBase(consumer session.Consumer, redisUrl string) (*webs }, nil } -func NewWebsocketPeerHandler(consumer session.Consumer, redisUrl string) (*WebsocketPeerHandler, error) { - base, err := makeWebsocketHandlerBase(consumer, redisUrl) - if err != nil { - return nil, err - } - - return &WebsocketPeerHandler{ - websocketHandlerBase: base, - }, nil -} - -func NewWebsocketHandler(consumer session.Consumer, redisUrl string) (*WebsocketHandler, error) { - base, err := makeWebsocketHandlerBase(consumer, redisUrl) - if err != nil { - return nil, err - } - - return &WebsocketHandler{ - websocketHandlerBase: base, - }, nil -} - func (ws *WebsocketHandler) Start(ctx context.Context) { ws.connWaitGroup.Add(1) go ws.mainLoop(ctx) diff --git a/wshandler/wshandler_peer.go b/wshandler/wshandler_peer.go new file mode 100644 index 0000000..d71aa86 --- /dev/null +++ b/wshandler/wshandler_peer.go @@ -0,0 +1,159 @@ +package wshandler + +import ( + "encoding/base64" + "encoding/hex" + "io" + "net/http" + "strings" + + "go.mongodb.org/mongo-driver/bson/primitive" + "repositories.action2quare.com/ayo/gocommon" + "repositories.action2quare.com/ayo/gocommon/logger" + "repositories.action2quare.com/ayo/gocommon/session" + + "github.com/gorilla/websocket" +) + +type WebsocketPeerHandler struct { + WebsocketPeerApiBroker + sessionConsumer session.Consumer +} + +func NewWebsocketPeerHandler(consumer session.Consumer) WebsocketPeerHandler { + return WebsocketPeerHandler{ + sessionConsumer: consumer, + } +} + +func (ws *WebsocketPeerHandler) RegisterHandlers(serveMux *http.ServeMux, prefix string) error { + url := gocommon.MakeHttpHandlerPattern(prefix, "ws") + if *noAuthFlag { + serveMux.HandleFunc(url, ws.upgrade_nosession) + } else { + serveMux.HandleFunc(url, ws.upgrade) + } + + return nil +} + +func (ws *WebsocketPeerHandler) upgrade_core(conn *websocket.Conn, accid primitive.ObjectID, alias string) { + go func(c *websocket.Conn, accid primitive.ObjectID) { + peer := ws.CreatePeer(accid) + ws.ClientConnected(peer, c) + + var closeReason string + for { + messageType, r, err := c.NextReader() + if err != nil { + if ce, ok := err.(*websocket.CloseError); ok { + closeReason = ce.Text + } + c.Close() + break + } + + if messageType == websocket.CloseMessage { + closeMsg, _ := io.ReadAll(r) + closeReason = string(closeMsg) + break + } + + if messageType == websocket.BinaryMessage { + var size [1]byte + r.Read(size[:]) + cmd := make([]byte, size[0]) + r.Read(cmd) + ws.Call(peer, string(cmd), r) + } + } + ws.ClientDisconnected(peer, closeReason) + }(conn, accid) +} + +func (ws *WebsocketPeerHandler) upgrade_nosession(w http.ResponseWriter, r *http.Request) { + // 클라이언트 접속 + defer func() { + s := recover() + if s != nil { + logger.Error(s) + } + io.Copy(io.Discard, r.Body) + r.Body.Close() + }() + + auth := strings.Split(r.Header.Get("Authorization"), " ") + if len(auth) != 2 { + w.WriteHeader(http.StatusBadRequest) + return + } + + temp, err := hex.DecodeString(auth[1]) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + if len(temp) != len(primitive.NilObjectID) { + w.WriteHeader(http.StatusBadRequest) + return + } + + raw := (*[12]byte)(temp) + accid := primitive.ObjectID(*raw) + + var upgrader = websocket.Upgrader{} // use default options + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + var alias string + if v := r.Header.Get("AS-X-ALIAS"); len(v) > 0 { + vt, _ := base64.StdEncoding.DecodeString(v) + alias = string(vt) + } else { + alias = accid.Hex() + } + + ws.upgrade_core(conn, accid, alias) +} + +func (ws *WebsocketPeerHandler) upgrade(w http.ResponseWriter, r *http.Request) { + // 클라이언트 접속 + defer func() { + s := recover() + if s != nil { + logger.Error(s) + } + io.Copy(io.Discard, r.Body) + r.Body.Close() + }() + + sk := r.Header.Get("AS-X-SESSION") + logger.Println("WebsocketHandler.upgrade sk :", sk) + authinfo, err := ws.sessionConsumer.Query(sk) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + logger.Error("authorize query failed :", err) + return + } + + var upgrader = websocket.Upgrader{} // use default options + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + var alias string + if v := r.Header.Get("AS-X-ALIAS"); len(v) > 0 { + vt, _ := base64.StdEncoding.DecodeString(v) + alias = string(vt) + } else { + alias = authinfo.Account.Hex() + } + + ws.upgrade_core(conn, authinfo.Account, alias) +}