diff --git a/wshandler/api_handler_peer.go b/wshandler/api_handler_peer.go new file mode 100644 index 0000000..8512d63 --- /dev/null +++ b/wshandler/api_handler_peer.go @@ -0,0 +1,187 @@ +package wshandler + +import ( + "bytes" + "encoding/json" + "reflect" + "strings" + "unsafe" + + "github.com/gorilla/websocket" + "go.mongodb.org/mongo-driver/bson/primitive" + "repositories.action2quare.com/ayo/gocommon/logger" +) + +type peerApiFuncType func(any, []byte) (any, error) +type peerConnFuncType func(any, *websocket.Conn) +type peerDisconnFuncType func(any, string) + +type WebsocketPeerApiHandler struct { + methods map[string]peerApiFuncType + connfunc peerConnFuncType + disconnfunc peerDisconnFuncType + originalReceiverName string +} + +func MakeWebsocketPeerApiHandler[T any](receiverName string) WebsocketPeerApiHandler { + methods := make(map[string]peerApiFuncType) + + var archetype *T + tp := reflect.TypeOf(archetype) + if len(receiverName) == 0 { + receiverName = tp.Elem().Name() + } + + var connfunc peerConnFuncType + var disconnfunc peerDisconnFuncType + + for i := 0; i < tp.NumMethod(); i++ { + method := tp.Method(i) + if method.Type.In(0) != tp { + continue + } + + if method.Name == ClientConnected { + if method.Type.NumIn() != 2 { + continue + } + if method.Type.In(1) != reflect.TypeOf((*websocket.Conn)(nil)) { + continue + } + funcptr := method.Func.Pointer() + p1 := unsafe.Pointer(&funcptr) + p2 := unsafe.Pointer(&p1) + connfuncptr := (*func(*T, *websocket.Conn))(p2) + connfunc = func(r any, c *websocket.Conn) { + (*connfuncptr)(r.(*T), c) + } + } else if method.Name == ClientDisconnected { + if method.Type.NumIn() != 2 { + continue + } + if method.Type.In(1) != reflect.TypeOf("") { + continue + } + funcptr := method.Func.Pointer() + p1 := unsafe.Pointer(&funcptr) + p2 := unsafe.Pointer(&p1) + disconnfuncptr := (*func(*T, string))(p2) + disconnfunc = func(r any, msg string) { + (*disconnfuncptr)(r.(*T), msg) + } + } else { + var intypes []reflect.Type + for i := 1; i < method.Type.NumIn(); i++ { + intypes = append(intypes, method.Type.In(i)) + } + + var outconv func([]reflect.Value) (any, error) + if method.Type.NumOut() == 0 { + outconv = func([]reflect.Value) (any, error) { return nil, nil } + } else if method.Type.NumOut() == 1 { + if method.Type.Out(0).Implements(reflect.TypeOf((*error)(nil)).Elem()) { + outconv = func(out []reflect.Value) (any, error) { + if out[0].Interface() == nil { + return nil, nil + } + return nil, out[0].Interface().(error) + } + } else { + outconv = func(out []reflect.Value) (any, error) { + return out[0].Interface(), nil + } + } + } else if method.Type.NumOut() == 2 && method.Type.Out(1).Implements(reflect.TypeOf((*error)(nil)).Elem()) { + outconv = func(out []reflect.Value) (any, error) { + if out[1].Interface() == nil { + return out[0].Interface(), nil + } + return out[0].Interface(), out[1].Interface().(error) + } + } + + methods[receiverName+"."+method.Name] = func(recv any, buff []byte) (any, error) { + decoder := json.NewDecoder(bytes.NewBuffer(buff)) + inargs := make([]any, len(intypes)) + + for i, intype := range intypes { + zerovalueptr := reflect.New(intype) + inargs[i] = zerovalueptr.Interface() + } + + err := decoder.Decode(&inargs) + if err != nil { + return nil, err + } + + reflectargs := make([]reflect.Value, 0, len(inargs)+1) + reflectargs = append(reflectargs, reflect.ValueOf(recv)) + for _, p := range inargs { + reflectargs = append(reflectargs, reflect.ValueOf(p).Elem()) + } + + return outconv(method.Func.Call(reflectargs)) + } + } + } + + return WebsocketPeerApiHandler{ + methods: methods, + connfunc: connfunc, + disconnfunc: disconnfunc, + originalReceiverName: tp.Elem().Name(), + } +} + +type WebsocketPeerApiBroker struct { + methods map[string]peerApiFuncType + connFuncs []peerConnFuncType + disconnFuncs []peerDisconnFuncType + CreatePeer func(primitive.ObjectID) any +} + +func (hc *WebsocketPeerApiBroker) AddHandler(receiver WebsocketPeerApiHandler) { + if hc.methods == nil { + hc.methods = make(map[string]peerApiFuncType) + } + + for k, v := range receiver.methods { + ab := strings.Split(k, ".") + logger.Printf("ws api registered : %s.%s -> %s\n", receiver.originalReceiverName, ab[1], k) + hc.methods[k] = v + } + + if receiver.connfunc != nil { + logger.Printf("ws api registered : %s.ClientConnected\n", receiver.originalReceiverName) + hc.connFuncs = append(hc.connFuncs, receiver.connfunc) + } + + if receiver.disconnfunc != nil { + // disconnfunc은 역순 + logger.Printf("ws api registered : %s.ClientDisconnected\n", receiver.originalReceiverName) + hc.disconnFuncs = append([]peerDisconnFuncType{receiver.disconnfunc}, hc.disconnFuncs...) + } +} + +func (hc *WebsocketPeerApiBroker) ClientConnected(recv any, c *wsconn) { + for _, v := range hc.connFuncs { + v(recv, c.Conn) + } +} + +func (hc *WebsocketPeerApiBroker) ClientDisconnected(recv any, c *wsconn) { + for _, v := range hc.disconnFuncs { + v(recv, c.closeMessage) + } +} + +func (hc *WebsocketPeerApiBroker) Call(recv any, funcname string, buff []byte) { + if found := hc.methods[funcname]; found != nil { + _, err := found(recv, buff) + if err != nil { + logger.Println("api call is failed. err :", err) + } + } else { + logger.Println("api is not found :", funcname) + } +} diff --git a/wshandler/api_handler_test.go b/wshandler/api_handler_test.go index fb0d04d..8986a73 100644 --- a/wshandler/api_handler_test.go +++ b/wshandler/api_handler_test.go @@ -2,8 +2,14 @@ package wshandler import ( + "bytes" + "encoding/json" + "errors" "fmt" + "reflect" "testing" + + "repositories.action2quare.com/ayo/gocommon/session" ) type TestReceiver struct { @@ -17,7 +23,32 @@ func (tr *TestReceiver) Func2(args []any) { fmt.Println(args...) } -func TestExpTable(t *testing.T) { +func TestUnmarshalToken(t *testing.T) { + // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 + // 012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890 + insrc := []byte(`["string value 1",200,["inner string value 1","inner string value 2"],{"inner map key 1":"inner map string value 1","inner map key 2":"inner map string value 2"},500.1]`) + + dec := json.NewDecoder(bytes.NewBuffer(insrc)) + dec.Token() + + for { + token_start := dec.InputOffset() + tok, _ := dec.Token() + token_end := dec.InputOffset() + + var string_val_1 string + stringtype := reflect.TypeOf(string_val_1) + stringvalue := reflect.New(stringtype) + castptr := stringvalue.Interface() + + err := json.Unmarshal(insrc[token_start:token_end], castptr) + fmt.Println(err) + + if tok == nil { + break + } + fmt.Println(tok, dec.InputOffset()) + } // src := []any{"a", 1, false} // payload, _ := json.Marshal(src) @@ -28,3 +59,53 @@ func TestExpTable(t *testing.T) { // con.AddHandler(receiver) } + +type peerHandler struct { + id string +} + +func (ph *peerHandler) ApiFunc1(arg1 string, arg2 int) error { + fmt.Println("ApiFunc1", ph.id, arg1, arg2) + return errors.New("fake") +} + +func (ph *peerHandler) ApiFunc2(arg1 string, arg2 map[string]int) (string, error) { + fmt.Println("ApiFunc2", ph.id, arg1, arg2) + return "success", nil +} + +func (ph *peerHandler) ApiFunc3(arg1 float64, arg2 []int) { + fmt.Println("ApiFunc3", ph.id, arg1, arg2) +} + +type dummySessionConsumer struct { +} + +func (dsc *dummySessionConsumer) Query(string) (session.Authorization, error) { + return session.Authorization{}, nil +} +func (dsc *dummySessionConsumer) Touch(string) (session.Authorization, error) { + return session.Authorization{}, nil +} + +func TestPeerApiBroker(t *testing.T) { + handler := MakeWebsocketPeerApiHandler[peerHandler]("test") + ws, err := NewWebsocketPeerHandler(&dummySessionConsumer{}, "redis://192.168.8.94:6380/4") + if err != nil { + t.Error(err) + } + + ws.AddHandler(handler) + + peer := &peerHandler{ + id: "onlyone", + } + func1args, _ := json.Marshal([]any{string("arg1"), int(100)}) + ws.Call(peer, "test.ApiFunc1", func1args) + + func1args, _ = json.Marshal([]any{string("arg1"), map[string]int{"arg2.key": 99}}) + ws.Call(peer, "test.ApiFunc2", func1args) + + func1args, _ = json.Marshal([]any{float64(111.1), []int{99, 98}}) + ws.Call(peer, "test.ApiFunc3", func1args) +} diff --git a/wshandler/config.json b/wshandler/config.json new file mode 100644 index 0000000..9e26dfe --- /dev/null +++ b/wshandler/config.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/wshandler/wshandler.go b/wshandler/wshandler.go index fcaf7bd..a6758a4 100644 --- a/wshandler/wshandler.go +++ b/wshandler/wshandler.go @@ -86,9 +86,7 @@ type send_msg_queue_elem struct { msg []byte } -type WebsocketHandler struct { - WebsocketApiBroker - +type websocketHandlerBase struct { redisMsgChanName string redisCmdChanName string redisSync *redis.Client @@ -101,6 +99,16 @@ type WebsocketHandler struct { sessionConsumer session.Consumer } +type WebsocketHandler struct { + WebsocketApiBroker + *websocketHandlerBase +} + +type WebsocketPeerHandler struct { + WebsocketPeerApiBroker + *websocketHandlerBase +} + type wsConfig struct { gocommon.StorageAddr `json:"storage"` } @@ -116,7 +124,7 @@ func init() { gob.Register([]any{}) } -func NewWebsocketHandler(consumer session.Consumer, redisUrl string) (*WebsocketHandler, error) { +func makeWebsocketHandlerBase(consumer session.Consumer, redisUrl string) (*websocketHandlerBase, error) { var config wsConfig if err := gocommon.LoadConfig(&config); err != nil { return nil, err @@ -151,7 +159,7 @@ func NewWebsocketHandler(consumer session.Consumer, redisUrl string) (*Websocket } }() - return &WebsocketHandler{ + return &websocketHandlerBase{ redisMsgChanName: fmt.Sprintf("_wsh_msg_%d", redisSync.Options().DB), redisCmdChanName: fmt.Sprintf("_wsh_cmd_%d", redisSync.Options().DB), redisSync: redisSync, @@ -163,6 +171,28 @@ func NewWebsocketHandler(consumer session.Consumer, redisUrl string) (*Websocket }, nil } +func NewWebsocketPeerHandler(consumer session.Consumer, redisUrl string) (*WebsocketPeerHandler, error) { + base, err := makeWebsocketHandlerBase(consumer, redisUrl) + if err != nil { + return nil, err + } + + return &WebsocketPeerHandler{ + websocketHandlerBase: base, + }, nil +} + +func NewWebsocketHandler(consumer session.Consumer, redisUrl string) (*WebsocketHandler, error) { + base, err := makeWebsocketHandlerBase(consumer, redisUrl) + if err != nil { + return nil, err + } + + return &WebsocketHandler{ + websocketHandlerBase: base, + }, nil +} + func (ws *WebsocketHandler) Start(ctx context.Context) { ws.connWaitGroup.Add(1) go ws.mainLoop(ctx)