채널간 publish marshalling을 gob으로 변경
This commit is contained in:
@ -1,7 +1,9 @@
|
|||||||
package wshandler
|
package wshandler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/gob"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
@ -263,16 +265,19 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
|
|||||||
|
|
||||||
raw, err := pubsub.ReceiveMessage(ctx)
|
raw, err := pubsub.ReceiveMessage(ctx)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
buffer := bytes.NewBuffer([]byte(raw.Payload))
|
||||||
|
dec := gob.NewDecoder(buffer)
|
||||||
|
|
||||||
if raw.Channel == sh.redisMsgChanName {
|
if raw.Channel == sh.redisMsgChanName {
|
||||||
var msg UpstreamMessage
|
var msg UpstreamMessage
|
||||||
if err := json.Unmarshal([]byte(raw.Payload), &msg); err == nil {
|
if err := dec.Decode(&msg); err == nil {
|
||||||
sh.deliveryChan <- &msg
|
sh.deliveryChan <- &msg
|
||||||
} else {
|
} else {
|
||||||
logger.Println("decode UpstreamMessage failed :", err)
|
logger.Println("decode UpstreamMessage failed :", err)
|
||||||
}
|
}
|
||||||
} else if raw.Channel == sh.redisCmdChanName {
|
} else if raw.Channel == sh.redisCmdChanName {
|
||||||
var cmd commandMessage
|
var cmd commandMessage
|
||||||
if err := json.Unmarshal([]byte(raw.Payload), &cmd); err == nil {
|
if err := dec.Decode(&cmd); err == nil {
|
||||||
sh.deliveryChan <- &cmd
|
sh.deliveryChan <- &cmd
|
||||||
} else {
|
} else {
|
||||||
logger.Println("decode UpstreamMessage failed :", err)
|
logger.Println("decode UpstreamMessage failed :", err)
|
||||||
@ -305,6 +310,9 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
|
|||||||
|
|
||||||
// 유저에게서 온 메세지, 소켓 연결/해체 처리
|
// 유저에게서 온 메세지, 소켓 연결/해체 처리
|
||||||
for {
|
for {
|
||||||
|
buffer := bytes.NewBuffer(make([]byte, 0, 1024))
|
||||||
|
buffer.Reset()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case destroyedRoom := <-roomDestroyChan:
|
case destroyedRoom := <-roomDestroyChan:
|
||||||
delete(rooms, destroyedRoom)
|
delete(rooms, destroyedRoom)
|
||||||
@ -330,8 +338,10 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if bt, err := json.Marshal(usermsg); err == nil {
|
|
||||||
sh.redisSync.Publish(context.Background(), sh.redisMsgChanName, bt).Result()
|
enc := gob.NewEncoder(buffer)
|
||||||
|
if err := enc.Encode(usermsg); err == nil {
|
||||||
|
sh.redisSync.Publish(context.Background(), sh.redisMsgChanName, buffer).Result()
|
||||||
}
|
}
|
||||||
|
|
||||||
case *commandMessage:
|
case *commandMessage:
|
||||||
@ -358,8 +368,9 @@ func (sh *subhandler) mainLoop(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 위에서 break 안걸리면 나한테 없으므로 publish를 해야 함. 그러면 다른 호스트가 deliveryChan으로 받는다
|
// 위에서 break 안걸리면 나한테 없으므로 publish를 해야 함. 그러면 다른 호스트가 deliveryChan으로 받는다
|
||||||
if bt, err := json.Marshal(usermsg); err == nil {
|
enc := gob.NewEncoder(buffer)
|
||||||
sh.redisSync.Publish(context.Background(), sh.redisCmdChanName, bt).Result()
|
if err := enc.Encode(usermsg); err == nil {
|
||||||
|
sh.redisSync.Publish(context.Background(), sh.redisCmdChanName, buffer).Result()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user