From 76923f8ecfb5e8184c62e22ce6dee3707bbea410 Mon Sep 17 00:00:00 2001 From: mountain Date: Fri, 22 Dec 2023 11:36:55 +0900 Subject: [PATCH] =?UTF-8?q?Peer=20Interface=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit peer handler 코드 정리 --- wshandler/api_handler_peer.go | 126 --------------------------- wshandler/api_handler_test.go | 56 ++---------- wshandler/wshandler_peer.go | 155 ++++++++++++++++++++++++++++++---- 3 files changed, 145 insertions(+), 192 deletions(-) delete mode 100644 wshandler/api_handler_peer.go diff --git a/wshandler/api_handler_peer.go b/wshandler/api_handler_peer.go deleted file mode 100644 index 0d782ec..0000000 --- a/wshandler/api_handler_peer.go +++ /dev/null @@ -1,126 +0,0 @@ -package wshandler - -import ( - "encoding/json" - "fmt" - "io" - "reflect" - "strings" - - "go.mongodb.org/mongo-driver/bson/primitive" - "repositories.action2quare.com/ayo/gocommon/logger" -) - -type PeerInterface interface { - ClientDisconnected(string) -} -type peerApiFuncType[T PeerInterface] func(T, io.Reader) (any, error) - -type WebsocketPeerApiHandler[T PeerInterface] struct { - methods map[string]peerApiFuncType[T] - originalReceiverName string -} - -func MakeWebsocketPeerApiHandler[T PeerInterface](receiverName string) WebsocketPeerApiHandler[T] { - methods := make(map[string]peerApiFuncType[T]) - - var archetype T - tp := reflect.TypeOf(archetype) - if len(receiverName) == 0 { - receiverName = tp.Elem().Name() - } - - for i := 0; i < tp.NumMethod(); i++ { - method := tp.Method(i) - if method.Type.In(0) != tp { - continue - } - - if method.Name == ClientDisconnected { - continue - } - - var intypes []reflect.Type - for i := 1; i < method.Type.NumIn(); i++ { - intypes = append(intypes, method.Type.In(i)) - } - - var outconv func([]reflect.Value) (any, error) - if method.Type.NumOut() == 0 { - outconv = func([]reflect.Value) (any, error) { return nil, nil } - } else if method.Type.NumOut() == 1 { - if method.Type.Out(0).Implements(reflect.TypeOf((*error)(nil)).Elem()) { - outconv = func(out []reflect.Value) (any, error) { - if out[0].Interface() == nil { - return nil, nil - } - return nil, out[0].Interface().(error) - } - } else { - outconv = func(out []reflect.Value) (any, error) { - return out[0].Interface(), nil - } - } - } else if method.Type.NumOut() == 2 && method.Type.Out(1).Implements(reflect.TypeOf((*error)(nil)).Elem()) { - outconv = func(out []reflect.Value) (any, error) { - if out[1].Interface() == nil { - return out[0].Interface(), nil - } - return out[0].Interface(), out[1].Interface().(error) - } - } - - methods[receiverName+"."+method.Name] = func(recv T, r io.Reader) (any, error) { - decoder := json.NewDecoder(r) - inargs := make([]any, len(intypes)) - - for i, intype := range intypes { - zerovalueptr := reflect.New(intype) - inargs[i] = zerovalueptr.Interface() - } - - err := decoder.Decode(&inargs) - if err != nil { - return nil, err - } - - reflectargs := make([]reflect.Value, 0, len(inargs)+1) - reflectargs = append(reflectargs, reflect.ValueOf(recv)) - for _, p := range inargs { - reflectargs = append(reflectargs, reflect.ValueOf(p).Elem()) - } - - return outconv(method.Func.Call(reflectargs)) - } - } - - return WebsocketPeerApiHandler[T]{ - methods: methods, - originalReceiverName: tp.Elem().Name(), - } -} - -type WebsocketPeerApiBroker[T PeerInterface] struct { - methods map[string]peerApiFuncType[T] - CreatePeer func(primitive.ObjectID) T -} - -func (hc *WebsocketPeerApiBroker[T]) AddHandler(receiver WebsocketPeerApiHandler[T]) { - if hc.methods == nil { - hc.methods = make(map[string]peerApiFuncType[T]) - } - - for k, v := range receiver.methods { - ab := strings.Split(k, ".") - logger.Printf("ws api registered : %s.%s -> %s\n", receiver.originalReceiverName, ab[1], k) - hc.methods[k] = v - } -} - -func (hc *WebsocketPeerApiBroker[T]) Call(recv T, funcname string, r io.Reader) (any, error) { - if found := hc.methods[funcname]; found != nil { - return found(recv, r) - } - - return nil, fmt.Errorf("api is not found : %s", funcname) -} diff --git a/wshandler/api_handler_test.go b/wshandler/api_handler_test.go index 3c35820..2c2447b 100644 --- a/wshandler/api_handler_test.go +++ b/wshandler/api_handler_test.go @@ -4,12 +4,9 @@ package wshandler import ( "bytes" "encoding/json" - "errors" "fmt" "reflect" "testing" - - "repositories.action2quare.com/ayo/gocommon/session" ) type TestReceiver struct { @@ -59,52 +56,11 @@ func TestUnmarshalToken(t *testing.T) { // con.AddHandler(receiver) } - -type testpeer struct { - id string -} - -func (ph *testpeer) ApiFunc1(arg1 string, arg2 int) error { - fmt.Println("ApiFunc1", ph.id, arg1, arg2) - return errors.New("fake") -} - -func (ph *testpeer) ApiFunc2(arg1 string, arg2 map[string]int) (string, error) { - fmt.Println("ApiFunc2", ph.id, arg1, arg2) - return "success", nil -} - -func (ph *testpeer) ApiFunc3(arg1 float64, arg2 []int) { - fmt.Println("ApiFunc3", ph.id, arg1, arg2) -} - -func (ph *testpeer) ClientDisconnected(reason string) { -} - -type dummySessionConsumer struct { -} - -func (dsc *dummySessionConsumer) Query(string) (session.Authorization, error) { - return session.Authorization{}, nil -} -func (dsc *dummySessionConsumer) Touch(string) (session.Authorization, error) { - return session.Authorization{}, nil -} - -func TestPeerApiBroker(t *testing.T) { - handler := MakeWebsocketPeerApiHandler[*testpeer]("test") - ws := NewWebsocketPeerHandler[*testpeer](&dummySessionConsumer{}) - ws.AddHandler(handler) - - tp := &testpeer{ - id: "onlyone", +func TestUnmarshal(t *testing.T) { + src := []byte(`{"123" :"str1", "456": "str2"}`) + var test map[int]string + err := json.Unmarshal(src, &test) + if err != nil { + t.Error(err) } - func1args, _ := json.Marshal([]any{string("arg1"), int(100)}) - ws.Call(tp, "test.ApiFunc1", bytes.NewBuffer(func1args)) - - func1args, _ = json.Marshal([]any{string("arg1"), map[string]int{"arg2.key": 99}}) - ws.Call(tp, "test.ApiFunc2", bytes.NewBuffer(func1args)) - - func1args, _ = json.Marshal([]any{float64(111.1), []int{99, 98}}) - ws.Call(tp, "test.ApiFunc3", bytes.NewBuffer(func1args)) } diff --git a/wshandler/wshandler_peer.go b/wshandler/wshandler_peer.go index 3af127b..4d00945 100644 --- a/wshandler/wshandler_peer.go +++ b/wshandler/wshandler_peer.go @@ -7,44 +7,167 @@ import ( "io" "math/rand" "net/http" + "reflect" "strings" "time" "go.mongodb.org/mongo-driver/bson/primitive" - "repositories.action2quare.com/ayo/gocommon" "repositories.action2quare.com/ayo/gocommon/logger" "repositories.action2quare.com/ayo/gocommon/session" "github.com/gorilla/websocket" ) -type WebsocketPeerHandler[T PeerInterface] struct { - WebsocketPeerApiBroker[T] +type WebsocketPeerHandler interface { + RegisterHandlers(serveMux *http.ServeMux, prefix string) error +} + +type websocketPeerHandler[T PeerInterface] struct { + methods map[string]peerApiFuncType[T] + createPeer func(primitive.ObjectID) T sessionConsumer session.Consumer } -func NewWebsocketPeerHandler[T PeerInterface](consumer session.Consumer) WebsocketPeerHandler[T] { - return WebsocketPeerHandler[T]{ - sessionConsumer: consumer, +type PeerInterface interface { + ClientDisconnected(string) + ClientConnected(*websocket.Conn) +} +type peerApiFuncType[T PeerInterface] func(T, io.Reader) (any, error) + +type websocketPeerApiHandler[T PeerInterface] struct { + methods map[string]peerApiFuncType[T] + originalReceiverName string +} + +func (hc *websocketPeerHandler[T]) call(recv T, funcname string, r io.Reader) (v any, e error) { + defer func() { + r := recover() + if r != nil { + logger.Error(r) + e = fmt.Errorf("%v", r) + } + }() + + if found := hc.methods[funcname]; found != nil { + return found(recv, r) + } + + return nil, fmt.Errorf("api is not found : %s", funcname) +} + +func makeWebsocketPeerApiHandler[T PeerInterface](receiverName string) websocketPeerApiHandler[T] { + methods := make(map[string]peerApiFuncType[T]) + + var archetype T + tp := reflect.TypeOf(archetype) + if len(receiverName) == 0 { + receiverName = tp.Elem().Name() + } + + for i := 0; i < tp.NumMethod(); i++ { + method := tp.Method(i) + if method.Type.In(0) != tp { + continue + } + + if method.Name == ClientDisconnected { + continue + } + + var intypes []reflect.Type + for i := 1; i < method.Type.NumIn(); i++ { + intypes = append(intypes, method.Type.In(i)) + } + + var outconv func([]reflect.Value) (any, error) + if method.Type.NumOut() == 0 { + outconv = func([]reflect.Value) (any, error) { return nil, nil } + } else if method.Type.NumOut() == 1 { + if method.Type.Out(0).Implements(reflect.TypeOf((*error)(nil)).Elem()) { + outconv = func(out []reflect.Value) (any, error) { + if out[0].Interface() == nil { + return nil, nil + } + return nil, out[0].Interface().(error) + } + } else { + outconv = func(out []reflect.Value) (any, error) { + return out[0].Interface(), nil + } + } + } else if method.Type.NumOut() == 2 && method.Type.Out(1).Implements(reflect.TypeOf((*error)(nil)).Elem()) { + outconv = func(out []reflect.Value) (any, error) { + if out[1].Interface() == nil { + return out[0].Interface(), nil + } + return out[0].Interface(), out[1].Interface().(error) + } + } + + methods[receiverName+"."+method.Name] = func(recv T, r io.Reader) (any, error) { + decoder := json.NewDecoder(r) + inargs := make([]any, len(intypes)) + + for i, intype := range intypes { + zerovalueptr := reflect.New(intype) + inargs[i] = zerovalueptr.Interface() + } + + err := decoder.Decode(&inargs) + if err != nil { + return nil, err + } + + reflectargs := make([]reflect.Value, 0, len(inargs)+1) + reflectargs = append(reflectargs, reflect.ValueOf(recv)) + for _, p := range inargs { + reflectargs = append(reflectargs, reflect.ValueOf(p).Elem()) + } + + return outconv(method.Func.Call(reflectargs)) + } + } + + return websocketPeerApiHandler[T]{ + methods: methods, + originalReceiverName: tp.Elem().Name(), } } -func (ws *WebsocketPeerHandler[T]) RegisterHandlers(serveMux *http.ServeMux, prefix string) error { - url := gocommon.MakeHttpHandlerPattern(prefix, "ws") +func NewWebsocketPeerHandler[T PeerInterface](consumer session.Consumer, receiverName string, creator func(primitive.ObjectID) T) WebsocketPeerHandler { + methods := make(map[string]peerApiFuncType[T]) + receiver := makeWebsocketPeerApiHandler[T](receiverName) + + for k, v := range receiver.methods { + ab := strings.Split(k, ".") + logger.Printf("ws api registered : %s.%s -> %s\n", receiver.originalReceiverName, ab[1], k) + methods[k] = v + } + + return &websocketPeerHandler[T]{ + sessionConsumer: consumer, + methods: methods, + createPeer: creator, + } +} + +func (ws *websocketPeerHandler[T]) RegisterHandlers(serveMux *http.ServeMux, prefix string) error { if *noAuthFlag { - serveMux.HandleFunc(url, ws.upgrade_nosession) + serveMux.HandleFunc(prefix, ws.upgrade_nosession) } else { - serveMux.HandleFunc(url, ws.upgrade) + serveMux.HandleFunc(prefix, ws.upgrade) } return nil } -func (ws *WebsocketPeerHandler[T]) upgrade_core(conn *websocket.Conn, accid primitive.ObjectID, nonce uint32) { +func (ws *websocketPeerHandler[T]) upgrade_core(conn *websocket.Conn, accid primitive.ObjectID, nonce uint32) { go func(c *websocket.Conn, accid primitive.ObjectID) { - peer := ws.CreatePeer(accid) + peer := ws.createPeer(accid) var closeReason string + peer.ClientConnected(conn) + defer func() { peer.ClientDisconnected(closeReason) }() @@ -80,7 +203,7 @@ func (ws *WebsocketPeerHandler[T]) upgrade_core(conn *websocket.Conn, accid prim cmd := make([]byte, size[0]) r.Read(cmd) - result, err := ws.Call(peer, string(cmd), r) + result, err := ws.call(peer, string(cmd), r) if err != nil { response[0] = 21 // 21 : Negative Ack @@ -110,14 +233,14 @@ func (ws *WebsocketPeerHandler[T]) upgrade_core(conn *websocket.Conn, accid prim } else { cmd := make([]byte, flag[0]) r.Read(cmd) - ws.Call(peer, string(cmd), r) + ws.call(peer, string(cmd), r) } } } }(conn, accid) } -func (ws *WebsocketPeerHandler[T]) upgrade_nosession(w http.ResponseWriter, r *http.Request) { +func (ws *websocketPeerHandler[T]) upgrade_nosession(w http.ResponseWriter, r *http.Request) { // 클라이언트 접속 defer func() { s := recover() @@ -167,7 +290,7 @@ func (ws *WebsocketPeerHandler[T]) upgrade_nosession(w http.ResponseWriter, r *h ws.upgrade_core(conn, accid, nonce) } -func (ws *WebsocketPeerHandler[T]) upgrade(w http.ResponseWriter, r *http.Request) { +func (ws *websocketPeerHandler[T]) upgrade(w http.ResponseWriter, r *http.Request) { // 클라이언트 접속 defer func() { s := recover()