로그 변경

peer handler 코드 정리
This commit is contained in:
2023-12-22 11:36:55 +09:00
parent 00f4cab992
commit 08802176cb
3 changed files with 141 additions and 192 deletions

View File

@ -1,126 +0,0 @@
package wshandler
import (
"encoding/json"
"fmt"
"io"
"reflect"
"strings"
"go.mongodb.org/mongo-driver/bson/primitive"
"repositories.action2quare.com/ayo/gocommon/logger"
)
type PeerInterface interface {
ClientDisconnected(string)
}
type peerApiFuncType[T PeerInterface] func(T, io.Reader) (any, error)
type WebsocketPeerApiHandler[T PeerInterface] struct {
methods map[string]peerApiFuncType[T]
originalReceiverName string
}
func MakeWebsocketPeerApiHandler[T PeerInterface](receiverName string) WebsocketPeerApiHandler[T] {
methods := make(map[string]peerApiFuncType[T])
var archetype T
tp := reflect.TypeOf(archetype)
if len(receiverName) == 0 {
receiverName = tp.Elem().Name()
}
for i := 0; i < tp.NumMethod(); i++ {
method := tp.Method(i)
if method.Type.In(0) != tp {
continue
}
if method.Name == ClientDisconnected {
continue
}
var intypes []reflect.Type
for i := 1; i < method.Type.NumIn(); i++ {
intypes = append(intypes, method.Type.In(i))
}
var outconv func([]reflect.Value) (any, error)
if method.Type.NumOut() == 0 {
outconv = func([]reflect.Value) (any, error) { return nil, nil }
} else if method.Type.NumOut() == 1 {
if method.Type.Out(0).Implements(reflect.TypeOf((*error)(nil)).Elem()) {
outconv = func(out []reflect.Value) (any, error) {
if out[0].Interface() == nil {
return nil, nil
}
return nil, out[0].Interface().(error)
}
} else {
outconv = func(out []reflect.Value) (any, error) {
return out[0].Interface(), nil
}
}
} else if method.Type.NumOut() == 2 && method.Type.Out(1).Implements(reflect.TypeOf((*error)(nil)).Elem()) {
outconv = func(out []reflect.Value) (any, error) {
if out[1].Interface() == nil {
return out[0].Interface(), nil
}
return out[0].Interface(), out[1].Interface().(error)
}
}
methods[receiverName+"."+method.Name] = func(recv T, r io.Reader) (any, error) {
decoder := json.NewDecoder(r)
inargs := make([]any, len(intypes))
for i, intype := range intypes {
zerovalueptr := reflect.New(intype)
inargs[i] = zerovalueptr.Interface()
}
err := decoder.Decode(&inargs)
if err != nil {
return nil, err
}
reflectargs := make([]reflect.Value, 0, len(inargs)+1)
reflectargs = append(reflectargs, reflect.ValueOf(recv))
for _, p := range inargs {
reflectargs = append(reflectargs, reflect.ValueOf(p).Elem())
}
return outconv(method.Func.Call(reflectargs))
}
}
return WebsocketPeerApiHandler[T]{
methods: methods,
originalReceiverName: tp.Elem().Name(),
}
}
type WebsocketPeerApiBroker[T PeerInterface] struct {
methods map[string]peerApiFuncType[T]
CreatePeer func(primitive.ObjectID) T
}
func (hc *WebsocketPeerApiBroker[T]) AddHandler(receiver WebsocketPeerApiHandler[T]) {
if hc.methods == nil {
hc.methods = make(map[string]peerApiFuncType[T])
}
for k, v := range receiver.methods {
ab := strings.Split(k, ".")
logger.Printf("ws api registered : %s.%s -> %s\n", receiver.originalReceiverName, ab[1], k)
hc.methods[k] = v
}
}
func (hc *WebsocketPeerApiBroker[T]) Call(recv T, funcname string, r io.Reader) (any, error) {
if found := hc.methods[funcname]; found != nil {
return found(recv, r)
}
return nil, fmt.Errorf("api is not found : %s", funcname)
}

View File

@ -4,12 +4,9 @@ package wshandler
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"reflect" "reflect"
"testing" "testing"
"repositories.action2quare.com/ayo/gocommon/session"
) )
type TestReceiver struct { type TestReceiver struct {
@ -59,52 +56,11 @@ func TestUnmarshalToken(t *testing.T) {
// con.AddHandler(receiver) // con.AddHandler(receiver)
} }
func TestUnmarshal(t *testing.T) {
type testpeer struct { src := []byte(`{"123" :"str1", "456": "str2"}`)
id string var test map[int]string
} err := json.Unmarshal(src, &test)
if err != nil {
func (ph *testpeer) ApiFunc1(arg1 string, arg2 int) error { t.Error(err)
fmt.Println("ApiFunc1", ph.id, arg1, arg2)
return errors.New("fake")
}
func (ph *testpeer) ApiFunc2(arg1 string, arg2 map[string]int) (string, error) {
fmt.Println("ApiFunc2", ph.id, arg1, arg2)
return "success", nil
}
func (ph *testpeer) ApiFunc3(arg1 float64, arg2 []int) {
fmt.Println("ApiFunc3", ph.id, arg1, arg2)
}
func (ph *testpeer) ClientDisconnected(reason string) {
}
type dummySessionConsumer struct {
}
func (dsc *dummySessionConsumer) Query(string) (session.Authorization, error) {
return session.Authorization{}, nil
}
func (dsc *dummySessionConsumer) Touch(string) (session.Authorization, error) {
return session.Authorization{}, nil
}
func TestPeerApiBroker(t *testing.T) {
handler := MakeWebsocketPeerApiHandler[*testpeer]("test")
ws := NewWebsocketPeerHandler[*testpeer](&dummySessionConsumer{})
ws.AddHandler(handler)
tp := &testpeer{
id: "onlyone",
} }
func1args, _ := json.Marshal([]any{string("arg1"), int(100)})
ws.Call(tp, "test.ApiFunc1", bytes.NewBuffer(func1args))
func1args, _ = json.Marshal([]any{string("arg1"), map[string]int{"arg2.key": 99}})
ws.Call(tp, "test.ApiFunc2", bytes.NewBuffer(func1args))
func1args, _ = json.Marshal([]any{float64(111.1), []int{99, 98}})
ws.Call(tp, "test.ApiFunc3", bytes.NewBuffer(func1args))
} }

View File

@ -7,44 +7,163 @@ import (
"io" "io"
"math/rand" "math/rand"
"net/http" "net/http"
"reflect"
"strings" "strings"
"time" "time"
"go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/bson/primitive"
"repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/logger" "repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/session" "repositories.action2quare.com/ayo/gocommon/session"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )
type WebsocketPeerHandler[T PeerInterface] struct { type WebsocketPeerHandler interface {
WebsocketPeerApiBroker[T] RegisterHandlers(serveMux *http.ServeMux, prefix string) error
}
type websocketPeerHandler[T PeerInterface] struct {
methods map[string]peerApiFuncType[T]
createPeer func(primitive.ObjectID) T
sessionConsumer session.Consumer sessionConsumer session.Consumer
} }
func NewWebsocketPeerHandler[T PeerInterface](consumer session.Consumer) WebsocketPeerHandler[T] { type PeerInterface interface {
return WebsocketPeerHandler[T]{ ClientDisconnected(string)
sessionConsumer: consumer, ClientConnected(*websocket.Conn)
}
type peerApiFuncType[T PeerInterface] func(T, io.Reader) (any, error)
type websocketPeerApiHandler[T PeerInterface] struct {
methods map[string]peerApiFuncType[T]
originalReceiverName string
}
func (hc *websocketPeerHandler[T]) call(recv T, funcname string, r io.Reader) (v any, e error) {
defer func() {
r := recover()
if r != nil {
logger.Error(r)
e = fmt.Errorf("%v", r)
}
}()
if found := hc.methods[funcname]; found != nil {
return found(recv, r)
}
return nil, fmt.Errorf("api is not found : %s", funcname)
}
func makeWebsocketPeerApiHandler[T PeerInterface]() websocketPeerApiHandler[T] {
methods := make(map[string]peerApiFuncType[T])
var archetype T
tp := reflect.TypeOf(archetype)
for i := 0; i < tp.NumMethod(); i++ {
method := tp.Method(i)
if method.Type.In(0) != tp {
continue
}
if method.Name == ClientDisconnected {
continue
}
var intypes []reflect.Type
for i := 1; i < method.Type.NumIn(); i++ {
intypes = append(intypes, method.Type.In(i))
}
var outconv func([]reflect.Value) (any, error)
if method.Type.NumOut() == 0 {
outconv = func([]reflect.Value) (any, error) { return nil, nil }
} else if method.Type.NumOut() == 1 {
if method.Type.Out(0).Implements(reflect.TypeOf((*error)(nil)).Elem()) {
outconv = func(out []reflect.Value) (any, error) {
if out[0].Interface() == nil {
return nil, nil
}
return nil, out[0].Interface().(error)
}
} else {
outconv = func(out []reflect.Value) (any, error) {
return out[0].Interface(), nil
}
}
} else if method.Type.NumOut() == 2 && method.Type.Out(1).Implements(reflect.TypeOf((*error)(nil)).Elem()) {
outconv = func(out []reflect.Value) (any, error) {
if out[1].Interface() == nil {
return out[0].Interface(), nil
}
return out[0].Interface(), out[1].Interface().(error)
}
}
methods[method.Name] = func(recv T, r io.Reader) (any, error) {
decoder := json.NewDecoder(r)
inargs := make([]any, len(intypes))
for i, intype := range intypes {
zerovalueptr := reflect.New(intype)
inargs[i] = zerovalueptr.Interface()
}
err := decoder.Decode(&inargs)
if err != nil {
return nil, err
}
reflectargs := make([]reflect.Value, 0, len(inargs)+1)
reflectargs = append(reflectargs, reflect.ValueOf(recv))
for _, p := range inargs {
reflectargs = append(reflectargs, reflect.ValueOf(p).Elem())
}
return outconv(method.Func.Call(reflectargs))
}
}
return websocketPeerApiHandler[T]{
methods: methods,
originalReceiverName: tp.Elem().Name(),
} }
} }
func (ws *WebsocketPeerHandler[T]) RegisterHandlers(serveMux *http.ServeMux, prefix string) error { func NewWebsocketPeerHandler[T PeerInterface](consumer session.Consumer, creator func(primitive.ObjectID) T) WebsocketPeerHandler {
url := gocommon.MakeHttpHandlerPattern(prefix, "ws") methods := make(map[string]peerApiFuncType[T])
receiver := makeWebsocketPeerApiHandler[T]()
for k, v := range receiver.methods {
logger.Printf("ws api registered : %s.%s\n", receiver.originalReceiverName, k)
methods[k] = v
}
return &websocketPeerHandler[T]{
sessionConsumer: consumer,
methods: methods,
createPeer: creator,
}
}
func (ws *websocketPeerHandler[T]) RegisterHandlers(serveMux *http.ServeMux, prefix string) error {
if *noAuthFlag { if *noAuthFlag {
serveMux.HandleFunc(url, ws.upgrade_nosession) serveMux.HandleFunc(prefix, ws.upgrade_nosession)
} else { } else {
serveMux.HandleFunc(url, ws.upgrade) serveMux.HandleFunc(prefix, ws.upgrade)
} }
return nil return nil
} }
func (ws *WebsocketPeerHandler[T]) upgrade_core(conn *websocket.Conn, accid primitive.ObjectID, nonce uint32) { func (ws *websocketPeerHandler[T]) upgrade_core(conn *websocket.Conn, accid primitive.ObjectID, nonce uint32) {
go func(c *websocket.Conn, accid primitive.ObjectID) { go func(c *websocket.Conn, accid primitive.ObjectID) {
peer := ws.CreatePeer(accid) peer := ws.createPeer(accid)
var closeReason string var closeReason string
peer.ClientConnected(conn)
defer func() { defer func() {
peer.ClientDisconnected(closeReason) peer.ClientDisconnected(closeReason)
}() }()
@ -80,7 +199,7 @@ func (ws *WebsocketPeerHandler[T]) upgrade_core(conn *websocket.Conn, accid prim
cmd := make([]byte, size[0]) cmd := make([]byte, size[0])
r.Read(cmd) r.Read(cmd)
result, err := ws.Call(peer, string(cmd), r) result, err := ws.call(peer, string(cmd), r)
if err != nil { if err != nil {
response[0] = 21 // 21 : Negative Ack response[0] = 21 // 21 : Negative Ack
@ -110,14 +229,14 @@ func (ws *WebsocketPeerHandler[T]) upgrade_core(conn *websocket.Conn, accid prim
} else { } else {
cmd := make([]byte, flag[0]) cmd := make([]byte, flag[0])
r.Read(cmd) r.Read(cmd)
ws.Call(peer, string(cmd), r) ws.call(peer, string(cmd), r)
} }
} }
} }
}(conn, accid) }(conn, accid)
} }
func (ws *WebsocketPeerHandler[T]) upgrade_nosession(w http.ResponseWriter, r *http.Request) { func (ws *websocketPeerHandler[T]) upgrade_nosession(w http.ResponseWriter, r *http.Request) {
// 클라이언트 접속 // 클라이언트 접속
defer func() { defer func() {
s := recover() s := recover()
@ -167,7 +286,7 @@ func (ws *WebsocketPeerHandler[T]) upgrade_nosession(w http.ResponseWriter, r *h
ws.upgrade_core(conn, accid, nonce) ws.upgrade_core(conn, accid, nonce)
} }
func (ws *WebsocketPeerHandler[T]) upgrade(w http.ResponseWriter, r *http.Request) { func (ws *websocketPeerHandler[T]) upgrade(w http.ResponseWriter, r *http.Request) {
// 클라이언트 접속 // 클라이언트 접속
defer func() { defer func() {
s := recover() s := recover()