Files
tavern/core/group_chat.go
2023-08-31 21:02:19 +09:00

358 lines
9.9 KiB
Go

package core
import (
"encoding/json"
"fmt"
"io"
"net/http"
"reflect"
"strings"
"time"
"github.com/go-redis/redis/v8"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/wshandler"
)
func init() {
groupTypeContainer()["chat"] = reflect.TypeOf(&groupChat{})
}
// channelID identifies a chatting channel. It is a plain alias of string, so
// channel names can be passed wherever a string is expected.
type channelID = string
// channelConfig describes one public chatting channel. It is both read from
// the chat configuration and stored as the channel's JSON document in redis.
type channelConfig struct {
// Capacity is the member limit checked before a client may enter the
// channel. A value of 0 is replaced by chatConfig.DefaultCapacity during
// Initialize.
Capacity int64 `json:"capacity"`
// Size is the current member count. Initialize resets it to 0; afterwards
// the "$.size" path of the redis document is incremented and decremented.
Size int64 `json:"size"`
// Key is the channel name; Initialize sets it to the map key of the channel.
Key string `json:"key"`
// emptyJson is the JSON encoding of this config wrapped in "[...]", built
// during Initialize. FetchChattingChannels uses it as a fallback row when
// redis returns nothing for the channel. Unexported, so it is never encoded.
emptyJson string
}
// chatConfig is the configuration section of the chat group.
type chatConfig struct {
// DefaultCapacity is applied to every channel whose own capacity is 0.
DefaultCapacity int64 `json:"default_capacity"`
// Channels maps a channel name to its configuration.
Channels map[string]*channelConfig `json:"channels"`
}
// groupChat implements the "chat" group type: public and private chatting
// channels backed by redis and relayed through the websocket handler.
type groupChat struct {
// chatConfig is the embedded configuration, populated by Initialize.
chatConfig
// rh is the redis handler used for channel documents and per-account hashes.
rh *gocommon.RedisonHandler
// enterRoom and leaveRoom add/remove an account to/from a websocket room.
// They are set in Initialize and forward to the Tavern websocket handler.
enterRoom func(channelID, accountID)
leaveRoom func(channelID, accountID)
// sendUpstreamMessage forwards a message to the Tavern websocket handler.
sendUpstreamMessage func(msg *wshandler.UpstreamMessage)
}
// accidHex returns the redis key of the per-account hash for an account id.
// It is assigned in groupChat.Initialize and is nil before that call; when
// *devflag is set the key is prefixed with a hex timestamp taken at
// initialization.
var accidHex func(primitive.ObjectID) string
// accidstrHex is the counterpart of accidHex for an account id that is already
// a string. It is also assigned in groupChat.Initialize.
var accidstrHex func(string) string
// Initialize prepares the chat group for use.
//
// It decodes cfg into the embedded chatConfig (by round-tripping it through
// JSON), wires the room and upstream-message helpers to tv's websocket
// handler, registers the chat HTTP API functions, writes one JSON document per
// configured channel to redis (size reset to 0) and installs the accidHex /
// accidstrHex key builders.
//
// It returns the first error from encoding/decoding the configuration or from
// writing a channel document to redis.
func (gc *groupChat) Initialize(tv *Tavern, cfg configDocument) error {
	raw, err := json.Marshal(cfg)
	if err != nil {
		return err
	}
	if err := json.Unmarshal(raw, &gc.chatConfig); err != nil {
		return err
	}

	gc.enterRoom = func(chanid channelID, accid accountID) {
		tv.wsh.EnterRoom(string(chanid), accid)
	}
	gc.leaveRoom = func(chanid channelID, accid accountID) {
		tv.wsh.LeaveRoom(string(chanid), accid)
	}
	gc.sendUpstreamMessage = func(msg *wshandler.UpstreamMessage) {
		tv.wsh.SendUpstreamMessage(msg)
	}
	gc.rh = tv.redison

	tv.apiFuncs.registApiFunction("FetchChattingChannels", gc.FetchChattingChannels)
	tv.apiFuncs.registApiFunction("BroadcastMessageOnChannel", gc.BroadcastMessageOnChannel)
	tv.apiFuncs.registApiFunction("QueryPlayerChattingChannel", gc.QueryPlayerChattingChannel)
	tv.apiFuncs.registApiFunction("SendMessageOnChannel", gc.SendMessageOnChannel)

	for name, chcfg := range gc.chatConfig.Channels {
		if chcfg == nil {
			// A channel declared as null in the configuration decodes to a nil
			// pointer; treat it as a channel that uses all defaults instead of
			// dereferencing nil below.
			chcfg = &channelConfig{}
			gc.chatConfig.Channels[name] = chcfg
		}
		if chcfg.Capacity == 0 {
			chcfg.Capacity = gc.chatConfig.DefaultCapacity
		}
		chcfg.Key = name
		chcfg.Size = 0

		encoded, err := json.Marshal(chcfg)
		if err != nil {
			return err
		}
		// Stored wrapped in brackets so it has the same shape as the rows that
		// FetchChattingChannels reads back from redis.
		chcfg.emptyJson = fmt.Sprintf("[%s]", string(encoded))

		_, err = gc.rh.JSONSet(name, "$", chcfg)
		if *devflag && err != nil {
			// Development mode only: drop whatever is stored under the key and
			// retry once. The result of Del is intentionally ignored; the
			// retried JSONSet reports the outcome.
			gc.rh.Del(gc.rh.Context(), name).Result()
			_, err = gc.rh.JSONSet(name, "$", chcfg)
		}
		if err != nil {
			return err
		}
	}

	if *devflag {
		// Development mode prefixes every per-account key with the hex
		// initialization timestamp.
		ts := fmt.Sprintf("%x-", time.Now().Unix())
		accidHex = func(accid primitive.ObjectID) string {
			return ts + accid.Hex()
		}
		accidstrHex = func(accid string) string {
			return ts + accid
		}
	} else {
		accidHex = func(accid primitive.ObjectID) string {
			return accid.Hex()
		}
		accidstrHex = func(accid string) string {
			return accid
		}
	}
	return nil
}
// ClientMessageReceived handles one websocket event from a connected client.
//
// On wshandler.Disconnected the per-account redis hash (the key built by
// accidHex) is deleted. On wshandler.BinaryMessage the payload is expected to
// be a command line of the form [command, args...]; the handled commands are
// EnterPublicChannel, LeavePublicChannel, TextMessage, EnterPrivateChannel and
// LeavePrivateChannel. Any other message type or command is ignored.
//
// The command line is supplied by the client, so every element is checked for
// presence and type before use; a malformed command line is logged and dropped
// instead of panicking inside the handler.
func (gc *groupChat) ClientMessageReceived(sender *wshandler.Sender, mt wshandler.WebSocketMessageType, message any) {
	if mt == wshandler.Disconnected {
		if _, err := gc.rh.Del(gc.rh.Context(), accidHex(sender.Accid)).Result(); err != nil {
			logger.Println(err)
		}
		return
	}
	if mt != wshandler.BinaryMessage {
		return
	}

	commandline, ok := message.([]any)
	if !ok || len(commandline) == 0 {
		logger.Println("ClientMessageReceived failed. command line is malformed :", message)
		return
	}
	cmd, ok := commandline[0].(string)
	if !ok {
		logger.Println("ClientMessageReceived failed. command is not a string :", commandline[0])
		return
	}
	args := commandline[1:]

	// stringArg returns args[i] when it exists and holds a string.
	stringArg := func(i int) (string, bool) {
		if i < 0 || i >= len(args) {
			return "", false
		}
		s, isString := args[i].(string)
		return s, isString
	}

	switch cmd {
	case "EnterPublicChannel":
		chanid, ok := stringArg(0)
		if !ok {
			logger.Println("EnterPublicChannel failed. channel id is missing or not a string")
			return
		}
		cfg, ok := gc.chatConfig.Channels[chanid]
		if !ok {
			logger.Println("chatting channel not valid :", chanid)
			return
		}
		size, err := gc.rh.JSONGetInt64(chanid, "$.size")
		if err != nil || len(size) == 0 {
			logger.Println("JSONGetInt64 failed :", chanid, err)
			return
		}
		if size[0] >= cfg.Capacity {
			// The channel is full.
			logger.Println("chatting channel is full :", chanid, size, cfg.Capacity)
			return
		}

		// Enter the channel.
		// NOTE(review): the size check above and this increment are two
		// separate redis calls, so concurrent entrants may exceed Capacity.
		newsize, err := gc.rh.JSONNumIncrBy(chanid, "$.size", 1)
		if err != nil || len(newsize) == 0 {
			logger.Println("JSONNumIncrBy failed :", chanid, err)
			return
		}
		gc.enterRoom(chanid, sender.Accid)
		sender.RegistDisconnectedCallback(chanid, func() {
			size, err := gc.rh.JSONNumIncrBy(chanid, "$.size", -1)
			if err != nil {
				return
			}
			// NOTE(review): this sends the whole JSONNumIncrBy result while the
			// enter path below sends its first element; kept as-is because
			// clients may depend on it - confirm which shape is intended.
			gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
				Target: "#" + chanid,
				Body:   map[string]any{"size": size},
				Tag:    []string{"ChattingChannelProperties"},
			})
		})
		// Best-effort bookkeeping of the account's current public channel.
		gc.rh.HSet(gc.rh.Context(), accidHex(sender.Accid), "cc_pub", chanid)
		gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
			Target: "#" + chanid,
			Body:   map[string]any{"size": newsize[0]},
			Tag:    []string{"ChattingChannelProperties"},
		})

	case "LeavePublicChannel":
		chanid, ok := stringArg(0)
		if !ok {
			logger.Println("LeavePublicChannel failed. channel id is missing or not a string")
			return
		}
		gc.rh.HDel(gc.rh.Context(), accidHex(sender.Accid), "cc_pub")
		gc.leaveRoom(chanid, sender.Accid)
		// Run the cleanup registered on enter now, since the client left
		// without disconnecting.
		if f := sender.PopDisconnectedCallback(chanid); f != nil {
			f()
		}

	case "TextMessage":
		chanid, okChan := stringArg(0)
		msg, okMsg := stringArg(1)
		if !okChan || !okMsg {
			logger.Println("TextMessage failed. channel id or message is missing or not a string")
			return
		}
		gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
			Target: "#" + chanid,
			Body:   map[string]any{"sender": sender.Alias, "msg": msg},
			Tag:    []string{"TextMessage"},
		})

	case "EnterPrivateChannel":
		typename, okType := stringArg(0)
		channel, okChan := stringArg(1)
		if !okType || !okChan {
			logger.Println("EnterPrivateChannel failed. typename or channel is missing or not a string")
			return
		}
		// The third argument is optional.
		var reason string
		if len(args) > 2 {
			var okReason bool
			if reason, okReason = stringArg(2); !okReason {
				logger.Println("EnterPrivateChannel failed. reason is not a string")
				return
			}
		}

		if len(reason) > 0 {
			// Accepted: actually join the channel.
			set, err := gc.rh.HSetNX(gc.rh.Context(), accidHex(sender.Accid), "cc_"+typename, channel).Result()
			if err != nil || !set {
				// Already participating in another private channel of this type.
				logger.Println("EnterPrivateChannel failed. HSetNX return err :", err, sender.Accid.Hex(), typename, channel)
				return
			}
			gc.enterRoom(channel, sender.Accid)
			sender.RegistDisconnectedCallback(channel, func() {
				gc.rh.JSONDel(channel, "$."+sender.Accid.Hex())
				cnt, _ := gc.rh.HDel(gc.rh.Context(), accidHex(sender.Accid), "cc_"+typename).Result()
				if cnt > 0 {
					gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
						Target: "#" + channel,
						Body:   map[string]any{"sender": sender.Alias, "typename": typename},
						Tag:    []string{"LeavePrivateChannel"},
					})
				}
			})
		}
		// Announced to the channel whether or not the sender joined above.
		gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
			Target: "#" + channel,
			Body: map[string]any{
				"sender":   sender.Alias,
				"msg":      reason,
				"typename": typename,
			},
			Tag: []string{"EnterPrivateChannel"},
		})

	case "LeavePrivateChannel":
		// The channel is the second argument; the first one is not read here.
		channel, ok := stringArg(1)
		if !ok {
			logger.Println("LeavePrivateChannel failed. channel is missing or not a string")
			return
		}
		gc.leaveRoom(channel, sender.Accid)
		if f := sender.PopDisconnectedCallback(channel); f != nil {
			f()
		}
	}
}
// FetchChattingChannels writes a JSON array with the redis documents of every
// configured channel whose name starts with the "prefix" form value.
//
// It responds 400 when prefix is missing and 500 when redis fails or returns
// an unexpected value. A channel with no document in redis contributes its
// configured default (channelConfig.emptyJson). Channels are visited in map
// iteration order, so the order of the array elements is not stable.
func (gc *groupChat) FetchChattingChannels(w http.ResponseWriter, r *http.Request) {
	prefix, _ := gocommon.ReadStringFormValue(r.Form, "prefix")
	if len(prefix) == 0 {
		logger.Println("FetchChattingChannel failed. prefix is missing")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	elements := make([]string, 0, len(gc.chatConfig.Channels))
	for name, cfg := range gc.chatConfig.Channels {
		if !strings.HasPrefix(name, prefix) {
			continue
		}

		onechan, err := gc.rh.JSONGet(name, "$")
		if err != nil && err != redis.Nil {
			logger.Println("FetchChattingChannel failed. JSONGet return err :", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		row := cfg.emptyJson
		if err != redis.Nil && onechan != nil {
			s, ok := onechan.(string)
			if !ok {
				logger.Println("FetchChattingChannel failed. JSONGet return unexpected type :", name)
				w.WriteHeader(http.StatusInternalServerError)
				return
			}
			row = s
		}

		// Every row is itself a JSON array ("[...]"). Strip the outer brackets
		// so the rows can be merged into one array, and skip rows that are
		// empty so the output never contains a dangling comma.
		inner := strings.TrimSpace(row)
		inner = strings.TrimSuffix(strings.TrimPrefix(inner, "["), "]")
		if len(strings.TrimSpace(inner)) == 0 {
			continue
		}
		elements = append(elements, inner)
	}

	w.Write([]byte("[" + strings.Join(elements, ",") + "]"))
}
// QueryPlayerChattingChannel writes, as a JSON object, the channels the
// account given by the "accid" form value is currently recorded in.
//
// The public channel is reported under the key "public". When the "typename"
// form value is present, the private channel of that type is reported under
// the typename itself. Fields that are not set in redis are omitted. It
// responds 500 when the redis lookup fails.
func (gc *groupChat) QueryPlayerChattingChannel(w http.ResponseWriter, r *http.Request) {
	accid, _ := gocommon.ReadStringFormValue(r.Form, "accid")
	typename, _ := gocommon.ReadStringFormValue(r.Form, "typename")

	fields := []string{"cc_pub"}
	if len(typename) > 0 {
		fields = append(fields, "cc_"+typename)
	}

	chans, err := gc.rh.HMGet(gc.rh.Context(), accidstrHex(accid), fields...).Result()
	if err != nil {
		logger.Println("QueryPlayerChattingChannel failed. HMGet return err :", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	// HMGet yields one entry per requested field, nil for a missing field; the
	// checked assertion skips both nil and any non-string value.
	output := make(map[string]string, len(fields))
	if len(chans) > 0 {
		if pub, ok := chans[0].(string); ok {
			output["public"] = pub
		}
	}
	if len(chans) > 1 {
		if priv, ok := chans[1].(string); ok {
			output[typename] = priv
		}
	}

	if err := json.NewEncoder(w).Encode(output); err != nil {
		logger.Println("QueryPlayerChattingChannel failed. encode returns err :", err)
	}
}
// SendMessageOnChannel forwards a message to a single target on a channel.
//
// The "channel", "target" and "tag" form values are all required (400
// otherwise). The request body, when not empty, must be a BSON document and
// becomes the message body; an unreadable body yields 500 and an undecodable
// one 400. The message is addressed to "<target>@<channel>".
func (gc *groupChat) SendMessageOnChannel(w http.ResponseWriter, r *http.Request) {
	chanName, _ := gocommon.ReadStringFormValue(r.Form, "channel")
	recipient, _ := gocommon.ReadStringFormValue(r.Form, "target")
	msgTag, _ := gocommon.ReadStringFormValue(r.Form, "tag")

	switch "" {
	case chanName, recipient, msgTag:
		logger.Println("SendMessageOnChannel failed. channel or target or tag is empty")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	payload, readErr := io.ReadAll(r.Body)
	if readErr != nil {
		logger.Println("SendMessageOnChannel failed. io.ReadAll returns err :", readErr)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	// An empty body is allowed and leaves the message body nil.
	var body map[string]any
	if len(payload) != 0 {
		decodeErr := bson.Unmarshal(payload, &body)
		if decodeErr != nil {
			logger.Println("SendMessageOnChannel failed. decode returns err :", decodeErr)
			w.WriteHeader(http.StatusBadRequest)
			return
		}
	}

	upstream := &wshandler.UpstreamMessage{
		Target: recipient + "@" + chanName,
		Body:   body,
		Tag:    []string{msgTag},
	}
	gc.sendUpstreamMessage(upstream)
}
// BroadcastMessageOnChannel sends a message to everyone on a channel.
//
// Form values: "channel" (required), "tag" and "nickname" (optional). With a
// tag, the request body must be a BSON document and is broadcast as-is under
// that tag. Without a tag, the body is treated as plain text and broadcast as
// a "TextMessage" with "nickname" as the sender.
//
// It responds 400 when channel is empty or the BSON body cannot be decoded,
// and 500 when the body cannot be read, matching SendMessageOnChannel.
func (gc *groupChat) BroadcastMessageOnChannel(w http.ResponseWriter, r *http.Request) {
	nickname, _ := gocommon.ReadStringFormValue(r.Form, "nickname")
	channel, _ := gocommon.ReadStringFormValue(r.Form, "channel")
	tag, _ := gocommon.ReadStringFormValue(r.Form, "tag")

	if len(channel) == 0 {
		logger.Println("BroadcastMessageOnChannel failed. channel is empty")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	text, err := io.ReadAll(r.Body)
	if err != nil {
		logger.Println("BroadcastMessageOnChannel failed. io.ReadAll returns err :", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	if len(tag) == 0 {
		gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
			Target: "#" + channel,
			Body:   map[string]any{"sender": nickname, "msg": string(text)},
			Tag:    []string{"TextMessage"},
		})
		return
	}

	var doc map[string]any
	if err := bson.Unmarshal(text, &doc); err != nil {
		// Report the failure to the caller instead of answering 200 for a
		// message that was never sent.
		logger.Println("BroadcastMessageOnChannel failed :", err)
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	gc.sendUpstreamMessage(&wshandler.UpstreamMessage{
		Target: "#" + channel,
		Body:   doc,
		Tag:    []string{tag},
	})
}