peer 코드 정리

This commit is contained in:
2023-12-23 15:35:10 +09:00
parent c281bf1cfa
commit 02512a48cf
2 changed files with 14 additions and 65 deletions

View File

@ -4,13 +4,9 @@ package wshandler
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"reflect" "reflect"
"testing" "testing"
"github.com/gorilla/websocket"
"repositories.action2quare.com/ayo/gocommon/session"
) )
type TestReceiver struct { type TestReceiver struct {
@ -60,57 +56,6 @@ func TestUnmarshalToken(t *testing.T) {
// con.AddHandler(receiver) // con.AddHandler(receiver)
} }
type testpeer struct {
id string
}
func (ph *testpeer) ApiFunc1(arg1 string, arg2 int) error {
fmt.Println("ApiFunc1", ph.id, arg1, arg2)
return errors.New("fake")
}
func (ph *testpeer) ApiFunc2(arg1 string, arg2 map[string]int) (string, error) {
fmt.Println("ApiFunc2", ph.id, arg1, arg2)
return "success", nil
}
func (ph *testpeer) ApiFunc3(arg1 float64, arg2 []int) {
fmt.Println("ApiFunc3", ph.id, arg1, arg2)
}
func (ph *testpeer) ClientDisconnected(reason string) {
}
func (ph *testpeer) ClientConnected(*websocket.Conn) {
}
type dummySessionConsumer struct {
}
func (dsc *dummySessionConsumer) Query(string) (session.Authorization, error) {
return session.Authorization{}, nil
}
func (dsc *dummySessionConsumer) Touch(string) (session.Authorization, error) {
return session.Authorization{}, nil
}
func TestPeerApiBroker(t *testing.T) {
ws := NewWebsocketPeerHandler[*testpeer](&dummySessionConsumer{}, "test")
tp := &testpeer{
id: "onlyone",
}
func1args, _ := json.Marshal([]any{string("arg1"), int(100)})
ws.Call(tp, "test.ApiFunc1", bytes.NewBuffer(func1args))
func1args, _ = json.Marshal([]any{string("arg1"), map[string]int{"arg2.key": 99}})
ws.Call(tp, "test.ApiFunc2", bytes.NewBuffer(func1args))
func1args, _ = json.Marshal([]any{float64(111.1), []int{99, 98}})
ws.Call(tp, "test.ApiFunc3", bytes.NewBuffer(func1args))
}
func TestUnmarshal(t *testing.T) { func TestUnmarshal(t *testing.T) {
src := []byte(`{"123" :"str1", "456": "str2"}`) src := []byte(`{"123" :"str1", "456": "str2"}`)
var test map[int]string var test map[int]string

View File

@ -19,7 +19,11 @@ import (
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )
type WebsocketPeerHandler[T PeerInterface] struct { type WebsocketPeerHandler interface {
RegisterHandlers(serveMux *http.ServeMux, prefix string) error
}
type websocketPeerHandler[T PeerInterface] struct {
methods map[string]peerApiFuncType[T] methods map[string]peerApiFuncType[T]
CreatePeer func(primitive.ObjectID) T CreatePeer func(primitive.ObjectID) T
sessionConsumer session.Consumer sessionConsumer session.Consumer
@ -36,7 +40,7 @@ type websocketPeerApiHandler[T PeerInterface] struct {
originalReceiverName string originalReceiverName string
} }
func (hc *WebsocketPeerHandler[T]) Call(recv T, funcname string, r io.Reader) (v any, e error) { func (hc *websocketPeerHandler[T]) call(recv T, funcname string, r io.Reader) (v any, e error) {
defer func() { defer func() {
r := recover() r := recover()
if r != nil { if r != nil {
@ -131,7 +135,7 @@ func makeWebsocketPeerApiHandler[T PeerInterface](receiverName string) websocket
} }
} }
func NewWebsocketPeerHandler[T PeerInterface](consumer session.Consumer, receiverName string) WebsocketPeerHandler[T] { func NewWebsocketPeerHandler[T PeerInterface](consumer session.Consumer, receiverName string) WebsocketPeerHandler {
methods := make(map[string]peerApiFuncType[T]) methods := make(map[string]peerApiFuncType[T])
receiver := makeWebsocketPeerApiHandler[T](receiverName) receiver := makeWebsocketPeerApiHandler[T](receiverName)
@ -141,13 +145,13 @@ func NewWebsocketPeerHandler[T PeerInterface](consumer session.Consumer, receive
methods[k] = v methods[k] = v
} }
return WebsocketPeerHandler[T]{ return &websocketPeerHandler[T]{
sessionConsumer: consumer, sessionConsumer: consumer,
methods: methods, methods: methods,
} }
} }
func (ws *WebsocketPeerHandler[T]) RegisterHandlers(serveMux *http.ServeMux, prefix string) error { func (ws *websocketPeerHandler[T]) RegisterHandlers(serveMux *http.ServeMux, prefix string) error {
url := gocommon.MakeHttpHandlerPattern(prefix, "ws") url := gocommon.MakeHttpHandlerPattern(prefix, "ws")
if *noAuthFlag { if *noAuthFlag {
serveMux.HandleFunc(url, ws.upgrade_nosession) serveMux.HandleFunc(url, ws.upgrade_nosession)
@ -158,7 +162,7 @@ func (ws *WebsocketPeerHandler[T]) RegisterHandlers(serveMux *http.ServeMux, pre
return nil return nil
} }
func (ws *WebsocketPeerHandler[T]) upgrade_core(conn *websocket.Conn, accid primitive.ObjectID, nonce uint32) { func (ws *websocketPeerHandler[T]) upgrade_core(conn *websocket.Conn, accid primitive.ObjectID, nonce uint32) {
go func(c *websocket.Conn, accid primitive.ObjectID) { go func(c *websocket.Conn, accid primitive.ObjectID) {
peer := ws.CreatePeer(accid) peer := ws.CreatePeer(accid)
var closeReason string var closeReason string
@ -200,7 +204,7 @@ func (ws *WebsocketPeerHandler[T]) upgrade_core(conn *websocket.Conn, accid prim
cmd := make([]byte, size[0]) cmd := make([]byte, size[0])
r.Read(cmd) r.Read(cmd)
result, err := ws.Call(peer, string(cmd), r) result, err := ws.call(peer, string(cmd), r)
if err != nil { if err != nil {
response[0] = 21 // 21 : Negative Ack response[0] = 21 // 21 : Negative Ack
@ -230,14 +234,14 @@ func (ws *WebsocketPeerHandler[T]) upgrade_core(conn *websocket.Conn, accid prim
} else { } else {
cmd := make([]byte, flag[0]) cmd := make([]byte, flag[0])
r.Read(cmd) r.Read(cmd)
ws.Call(peer, string(cmd), r) ws.call(peer, string(cmd), r)
} }
} }
} }
}(conn, accid) }(conn, accid)
} }
func (ws *WebsocketPeerHandler[T]) upgrade_nosession(w http.ResponseWriter, r *http.Request) { func (ws *websocketPeerHandler[T]) upgrade_nosession(w http.ResponseWriter, r *http.Request) {
// 클라이언트 접속 // 클라이언트 접속
defer func() { defer func() {
s := recover() s := recover()
@ -287,7 +291,7 @@ func (ws *WebsocketPeerHandler[T]) upgrade_nosession(w http.ResponseWriter, r *h
ws.upgrade_core(conn, accid, nonce) ws.upgrade_core(conn, accid, nonce)
} }
func (ws *WebsocketPeerHandler[T]) upgrade(w http.ResponseWriter, r *http.Request) { func (ws *websocketPeerHandler[T]) upgrade(w http.ResponseWriter, r *http.Request) {
// 클라이언트 접속 // 클라이언트 접속
defer func() { defer func() {
s := recover() s := recover()