코드 정리 및 websocket도 http와 비슷하게 api handler로 통일

This commit is contained in:
2023-09-08 11:35:57 +09:00
parent eb54fa2e3a
commit 6cbf32c386
10 changed files with 216 additions and 130 deletions

2
go.mod
View File

@ -2,8 +2,6 @@ module repositories.action2quare.com/ayo/gocommon
go 1.20 go 1.20
replace repositories.action2quare.com/ayo/gocommon => ./
require ( require (
github.com/go-redis/redis/v8 v8.11.5 github.com/go-redis/redis/v8 v8.11.5
github.com/golang-jwt/jwt v3.2.2+incompatible github.com/golang-jwt/jwt v3.2.2+incompatible

View File

@ -195,6 +195,17 @@ func (rh *RedisonHandler) JSONGetString(key, path string) ([]string, error) {
return respToArray[string](rh.JSONResp(key, path)) return respToArray[string](rh.JSONResp(key, path))
} }
func (rh *RedisonHandler) JSONGetDocuments(key, path string) ([]map[string]any, error) {
resp, err := rh.JSONGet(key, path)
if err != nil && err != redis.Nil {
return nil, err
}
var objs []map[string]any
err = json.Unmarshal([]byte(resp.(string)), &objs)
return objs, err
}
func (rh *RedisonHandler) JSONGetInt64(key, path string) ([]int64, error) { func (rh *RedisonHandler) JSONGetInt64(key, path string) ([]int64, error) {
return respToArray[int64](rh.JSONResp(key, path)) return respToArray[int64](rh.JSONResp(key, path))
} }

View File

@ -618,11 +618,11 @@ func MakeHttpRequestForLogging(r *http.Request) *http.Request {
} }
type apiFuncType func(http.ResponseWriter, *http.Request) type apiFuncType func(http.ResponseWriter, *http.Request)
type HttpApiReceiver struct { type HttpApiHandler struct {
methods map[string]apiFuncType methods map[string]apiFuncType
} }
func MakeHttpApiReceiver[T any](receiver *T, receiverName string) HttpApiReceiver { func MakeHttpApiHandler[T any](receiver *T, receiverName string) HttpApiHandler {
methods := make(map[string]apiFuncType) methods := make(map[string]apiFuncType)
tp := reflect.TypeOf(receiver) tp := reflect.TypeOf(receiver)
@ -662,7 +662,7 @@ func MakeHttpApiReceiver[T any](receiver *T, receiverName string) HttpApiReceive
} }
} }
return HttpApiReceiver{ return HttpApiHandler{
methods: methods, methods: methods,
} }
} }
@ -671,7 +671,7 @@ type HttpApiHandlerContainer struct {
methods map[string]apiFuncType methods map[string]apiFuncType
} }
func (hc *HttpApiHandlerContainer) RegistReceiver(receiver HttpApiReceiver) { func (hc *HttpApiHandlerContainer) RegisterApiHandler(receiver HttpApiHandler) {
if hc.methods == nil { if hc.methods == nil {
hc.methods = make(map[string]apiFuncType) hc.methods = make(map[string]apiFuncType)
} }

View File

@ -10,12 +10,6 @@ import (
"time" "time"
"go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/bson/primitive"
"repositories.action2quare.com/ayo/gocommon"
)
const (
communication_channel_name_prefix = "_sess_comm_chan_name"
session_collection_name = gocommon.CollectionName("session")
) )
type Authorization struct { type Authorization struct {

View File

@ -33,13 +33,6 @@ func (c *consumer_common[T]) add_internal(sk storagekey, si T) {
delete(c.stages[1].deleted, sk) delete(c.stages[1].deleted, sk)
} }
func (c *consumer_common[T]) add(sk storagekey, si T) {
c.lock.Lock()
defer c.lock.Unlock()
c.add_internal(sk, si)
}
func (c *consumer_common[T]) delete_internal(sk storagekey) { func (c *consumer_common[T]) delete_internal(sk storagekey) {
delete(c.stages[0].cache, sk) delete(c.stages[0].cache, sk)
c.stages[0].deleted[sk] = true c.stages[0].deleted[sk] = true

View File

@ -12,6 +12,10 @@ import (
"repositories.action2quare.com/ayo/gocommon/logger" "repositories.action2quare.com/ayo/gocommon/logger"
) )
const (
session_collection_name = gocommon.CollectionName("session")
)
type provider_mongo struct { type provider_mongo struct {
mongoClient gocommon.MongoClient mongoClient gocommon.MongoClient
} }

View File

@ -2,6 +2,7 @@ package session
import ( import (
"context" "context"
"fmt"
"time" "time"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
@ -11,6 +12,10 @@ import (
"repositories.action2quare.com/ayo/gocommon/logger" "repositories.action2quare.com/ayo/gocommon/logger"
) )
const (
communication_channel_name_prefix = "_sess_comm_chan_name"
)
type sessionRedis struct { type sessionRedis struct {
*Authorization *Authorization
expireAt time.Time expireAt time.Time
@ -18,7 +23,6 @@ type sessionRedis struct {
type provider_redis struct { type provider_redis struct {
redisClient *redis.Client redisClient *redis.Client
updateChannel string
deleteChannel string deleteChannel string
ttl time.Duration ttl time.Duration
ctx context.Context ctx context.Context
@ -32,8 +36,7 @@ func newProviderWithRedis(ctx context.Context, redisUrl string, ttl time.Duratio
return &provider_redis{ return &provider_redis{
redisClient: redisClient, redisClient: redisClient,
updateChannel: communication_channel_name_prefix + "_u", deleteChannel: fmt.Sprintf("%s_%d", communication_channel_name_prefix, redisClient.Options().DB),
deleteChannel: communication_channel_name_prefix + "_d",
ttl: ttl, ttl: ttl,
ctx: ctx, ctx: ctx,
}, nil }, nil
@ -51,7 +54,6 @@ func (p *provider_redis) New(input *Authorization) (string, error) {
return "", err return "", err
} }
_, err = p.redisClient.Publish(p.ctx, p.updateChannel, string(sk)).Result()
return string(storagekey_to_publickey(sk)), err return string(storagekey_to_publickey(sk)), err
} }
@ -122,9 +124,8 @@ func newConsumerWithRedis(ctx context.Context, redisUrl string, ttl time.Duratio
redisClient: redisClient, redisClient: redisClient,
} }
updateChannel := communication_channel_name_prefix + "_u"
deleteChannel := communication_channel_name_prefix + "_d" deleteChannel := communication_channel_name_prefix + "_d"
sub := redisClient.Subscribe(ctx, updateChannel, deleteChannel) sub := redisClient.Subscribe(ctx, deleteChannel)
go func() { go func() {
stageswitch := time.Now().Add(ttl) stageswitch := time.Now().Add(ttl)
@ -151,20 +152,6 @@ func newConsumerWithRedis(ctx context.Context, redisUrl string, ttl time.Duratio
} }
switch msg.Channel { switch msg.Channel {
case updateChannel:
sk := storagekey(msg.Payload)
raw, err := redisClient.Get(ctx, string(sk)).Result()
if err != nil {
logger.Println(err)
} else if len(raw) > 0 {
var si Authorization
if bson.Unmarshal([]byte(raw), &si) == nil {
consumer.add(sk, &sessionRedis{
Authorization: &si,
expireAt: time.Now().Add(consumer.ttl),
})
}
}
case deleteChannel: case deleteChannel:
sk := storagekey(msg.Payload) sk := storagekey(msg.Payload)
consumer.delete(sk) consumer.delete(sk)

139
wshandler/api_handler.go Normal file
View File

@ -0,0 +1,139 @@
package wshandler
import (
"encoding/json"
"io"
"reflect"
"unsafe"
"repositories.action2quare.com/ayo/gocommon/logger"
)
const (
ClientConnected = "ClientConnected"
ClientDisconnected = "ClientDisconnected"
)
type apiFuncType func(ApiCallContext)
type WebsocketApiHandler struct {
methods map[string]apiFuncType
connfunc apiFuncType
disconnfunc apiFuncType
}
type ApiCallContext struct {
CallBy *Sender
Arguments []any
}
func MakeWebsocketApiHandler[T any](receiver *T, receiverName string) WebsocketApiHandler {
methods := make(map[string]apiFuncType)
tp := reflect.TypeOf(receiver)
if len(receiverName) == 0 {
receiverName = tp.Elem().Name()
}
var connfunc apiFuncType
var disconnfunc apiFuncType
for i := 0; i < tp.NumMethod(); i++ {
method := tp.Method(i)
if method.Type.NumIn() != 2 {
continue
}
if method.Type.In(0) != tp {
continue
}
if method.Type.In(1) != reflect.TypeOf((*ApiCallContext)(nil)).Elem() {
continue
}
funcptr := method.Func.Pointer()
p1 := unsafe.Pointer(&funcptr)
p2 := unsafe.Pointer(&p1)
testfunc := (*func(*T, ApiCallContext))(p2)
if method.Name == ClientConnected {
connfunc = func(ctx ApiCallContext) {
(*testfunc)(receiver, ctx)
}
} else if method.Name == ClientDisconnected {
disconnfunc = func(ctx ApiCallContext) {
(*testfunc)(receiver, ctx)
}
} else {
methods[receiverName+"."+method.Name] = func(ctx ApiCallContext) {
(*testfunc)(receiver, ctx)
}
}
}
return WebsocketApiHandler{
methods: methods,
connfunc: connfunc,
disconnfunc: disconnfunc,
}
}
type WebsocketApiBroker struct {
methods map[string]apiFuncType
connFuncs []apiFuncType
disconnFuncs []apiFuncType
}
func (hc *WebsocketApiBroker) AddHandler(receiver WebsocketApiHandler) {
if hc.methods == nil {
hc.methods = make(map[string]apiFuncType)
}
for k, v := range receiver.methods {
logger.Println("http api registered :", k)
hc.methods[k] = v
}
if receiver.connfunc != nil {
hc.connFuncs = append(hc.connFuncs, receiver.connfunc)
}
if receiver.disconnfunc != nil {
// disconnfunc은 역순
hc.disconnFuncs = append([]apiFuncType{receiver.disconnfunc}, hc.disconnFuncs...)
}
}
func (hc *WebsocketApiBroker) Call(callby *Sender, funcname string, r io.Reader) {
if funcname == ClientConnected {
for _, v := range hc.connFuncs {
v(ApiCallContext{
CallBy: callby,
Arguments: nil,
})
}
} else if funcname == ClientDisconnected {
for _, v := range hc.disconnFuncs {
v(ApiCallContext{
CallBy: callby,
Arguments: nil,
})
}
} else if found := hc.methods[funcname]; found != nil {
var args []any
if r != nil {
dec := json.NewDecoder(r)
if err := dec.Decode(&args); err != nil {
logger.Println("WebsocketApiBroker.Call failed. decode returns err :", err)
}
}
found(ApiCallContext{
CallBy: callby,
Arguments: args,
})
} else {
logger.Println("api is not found :", funcname)
}
}

View File

@ -0,0 +1,30 @@
// package main ...
package wshandler
import (
"fmt"
"testing"
)
type TestReceiver struct {
}
func (tr *TestReceiver) Func1([]any) {
}
func (tr *TestReceiver) Func2(args []any) {
fmt.Println(args...)
}
func TestExpTable(t *testing.T) {
// src := []any{"a", 1, false}
// payload, _ := json.Marshal(src)
// tr := new(TestReceiver)
// receiver := MakeWebsocketApiHandler(tr, "test")
// var con WebsocketApiBroker
// con.AddHandler(receiver)
}

View File

@ -90,30 +90,11 @@ const (
CloseMessage = WebSocketMessageType(websocket.CloseMessage) CloseMessage = WebSocketMessageType(websocket.CloseMessage)
PingMessage = WebSocketMessageType(websocket.PingMessage) PingMessage = WebSocketMessageType(websocket.PingMessage)
PongMessage = WebSocketMessageType(websocket.PongMessage) PongMessage = WebSocketMessageType(websocket.PongMessage)
Connected = WebSocketMessageType(100)
Disconnected = WebSocketMessageType(101)
) )
type Sender struct { type Sender struct {
Accid primitive.ObjectID Accid primitive.ObjectID
Alias string Alias string
disconnectedCallbacks map[string]func()
}
func (s *Sender) RegistDisconnectedCallback(name string, f func()) (old func()) {
if s.disconnectedCallbacks == nil {
s.disconnectedCallbacks = make(map[string]func())
}
old = s.disconnectedCallbacks[name]
s.disconnectedCallbacks[name] = f
return
}
func (s *Sender) PopDisconnectedCallback(name string) func() {
old := s.disconnectedCallbacks[name]
delete(s.disconnectedCallbacks, name)
return old
} }
type EventReceiver interface { type EventReceiver interface {
@ -136,9 +117,9 @@ type WebsocketHandler struct {
deliveryChan chan any deliveryChan chan any
localDeliveryChan chan any localDeliveryChan chan any
sendMsgChan chan send_msg_queue_elem sendMsgChan chan send_msg_queue_elem
callReceiver EventReceiver
wsApiBroker WebsocketApiBroker
connWaitGroup sync.WaitGroup connWaitGroup sync.WaitGroup
receiverChain []EventReceiver
sessionConsumer session.Consumer sessionConsumer session.Consumer
} }
@ -156,13 +137,13 @@ func init() {
gob.Register([]any{}) gob.Register([]any{})
} }
func NewWebsocketHandler(consumer session.Consumer) (*WebsocketHandler, error) { func NewWebsocketHandler(consumer session.Consumer, redisUrl string) (*WebsocketHandler, error) {
var config wsConfig var config wsConfig
if err := gocommon.LoadConfig(&config); err != nil { if err := gocommon.LoadConfig(&config); err != nil {
return nil, err return nil, err
} }
redisSync, err := gocommon.NewRedisClient(config.Redis["wshandler"]) redisSync, err := gocommon.NewRedisClient(redisUrl)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -196,49 +177,11 @@ func NewWebsocketHandler(consumer session.Consumer) (*WebsocketHandler, error) {
}, nil }, nil
} }
func (ws *WebsocketHandler) RegisterReceiver(receiver EventReceiver) { func (ws *WebsocketHandler) RegisterApiHandler(handler WebsocketApiHandler) {
ws.receiverChain = append(ws.receiverChain, receiver) ws.wsApiBroker.AddHandler(handler)
}
type nilReceiver struct{}
func (r *nilReceiver) OnClientMessageReceived(sender *Sender, messageType WebSocketMessageType, body io.Reader) {
}
func (r *nilReceiver) OnRoomCreated(name string) {}
func (r *nilReceiver) OnRoomDestroyed(name string) {}
type chainReceiver struct {
chain []EventReceiver
}
func (r *chainReceiver) OnClientMessageReceived(sender *Sender, messageType WebSocketMessageType, body io.Reader) {
for _, cr := range r.chain {
cr.OnClientMessageReceived(sender, messageType, body)
}
}
func (r *chainReceiver) OnRoomCreated(name string) {
for _, cr := range r.chain {
cr.OnRoomCreated(name)
}
}
func (r *chainReceiver) OnRoomDestroyed(name string) {
for _, cr := range r.chain {
cr.OnRoomDestroyed(name)
}
} }
func (ws *WebsocketHandler) Start(ctx context.Context) { func (ws *WebsocketHandler) Start(ctx context.Context) {
chain := ws.receiverChain
if len(chain) == 0 {
ws.callReceiver = &nilReceiver{}
} else if len(chain) == 1 {
ws.callReceiver = chain[0]
} else {
ws.callReceiver = &chainReceiver{chain: ws.receiverChain}
}
ws.connWaitGroup.Add(1) ws.connWaitGroup.Add(1)
go ws.mainLoop(ctx) go ws.mainLoop(ctx)
} }
@ -334,19 +277,14 @@ func (ws *WebsocketHandler) mainLoop(ctx context.Context) {
room = makeRoom(name, roomDestroyChan, ws.sendMsgChan) room = makeRoom(name, roomDestroyChan, ws.sendMsgChan)
rooms[name] = room rooms[name] = room
room.start(ctx) room.start(ctx)
go ws.callReceiver.OnRoomCreated(name) //go ws.callReceiver.OnRoomCreated(name)
} }
return room return room
} }
defer func() { defer func() {
for _, conn := range entireConns { for _, conn := range entireConns {
var roomnames []string ws.wsApiBroker.Call(conn.sender, ClientDisconnected, nil)
for _, room := range conn.joinedRooms {
roomnames = append(roomnames, room.name)
}
bt, _ := json.Marshal(roomnames)
ws.callReceiver.OnClientMessageReceived(conn.sender, Disconnected, bytes.NewBuffer(bt))
conn.Close() conn.Close()
} }
}() }()
@ -442,7 +380,7 @@ func (ws *WebsocketHandler) mainLoop(ctx context.Context) {
case destroyedRoom := <-roomDestroyChan: case destroyedRoom := <-roomDestroyChan:
delete(rooms, destroyedRoom) delete(rooms, destroyedRoom)
go ws.callReceiver.OnRoomDestroyed(destroyedRoom) //go ws.callReceiver.OnRoomDestroyed(destroyedRoom)
case usermsg := <-ws.localDeliveryChan: case usermsg := <-ws.localDeliveryChan:
// 로컬에 connection이 있는지 먼저 확인해 보기 위한 채널 // 로컬에 connection이 있는지 먼저 확인해 보기 위한 채널
@ -509,18 +447,14 @@ func (ws *WebsocketHandler) mainLoop(ctx context.Context) {
case c := <-ws.connInOutChan: case c := <-ws.connInOutChan:
if c.Conn == nil { if c.Conn == nil {
delete(entireConns, c.sender.Accid.Hex()) delete(entireConns, c.sender.Accid.Hex())
var roomnames []string
for _, room := range c.joinedRooms { for _, room := range c.joinedRooms {
roomnames = append(roomnames, room.name)
room.out(c) room.out(c)
} }
c.joinedRooms = nil c.joinedRooms = nil
go ws.wsApiBroker.Call(c.sender, ClientDisconnected, nil)
bt, _ := json.Marshal(roomnames)
go ws.callReceiver.OnClientMessageReceived(c.sender, Disconnected, bytes.NewBuffer(bt))
} else { } else {
entireConns[c.sender.Accid.Hex()] = c entireConns[c.sender.Accid.Hex()] = c
go ws.callReceiver.OnClientMessageReceived(c.sender, Connected, nil) go ws.wsApiBroker.Call(c.sender, ClientConnected, nil)
} }
} }
} }
@ -549,16 +483,12 @@ func upgrade_core(ws *WebsocketHandler, conn *websocket.Conn, accid primitive.Ob
break break
} }
if messageType == websocket.TextMessage { if messageType == websocket.BinaryMessage {
// 유저가 직접 보낸 메시지 var size [1]byte
ws.callReceiver.OnClientMessageReceived(c.sender, TextMessage, r) r.Read(size[:])
} else if messageType == websocket.BinaryMessage { cmd := make([]byte, size[0])
ws.callReceiver.OnClientMessageReceived(c.sender, BinaryMessage, r) r.Read(cmd)
} ws.wsApiBroker.Call(newconn.sender, string(cmd), r)
}
if c.sender.disconnectedCallbacks != nil {
for _, f := range c.sender.disconnectedCallbacks {
f()
} }
} }
ws.connWaitGroup.Done() ws.connWaitGroup.Done()