commit 1517f59fc3c73336dfec24f03458850ab67f52c2 Author: mountain Date: Wed May 24 16:10:00 2023 +0900 tavern 분리 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e899854 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.vscode/ +__debug_bin.exe +*.log diff --git a/config_template.json b/config_template.json new file mode 100644 index 0000000..261b84c --- /dev/null +++ b/config_template.json @@ -0,0 +1,66 @@ +{ + "region_storage" : { + "default" : { + "mongo" : "mongodb://192.168.8.94:27017/?replicaSet=repl01&retrywrites=false", + "redis" : { + "url" : "redis://192.168.8.94:6379", + "offset" : { + "cache" : 0, + "session" : 1, + "ranking" : 2 + } + } + }, + "dev" : { + "mongo" : "mongodb://192.168.8.94:27017/?replicaSet=repl01&retrywrites=false", + "redis" : { + "url" : "redis://192.168.8.94:6379", + "offset" : { + "cache" : 0, + "session" : 1, + "ranking" : 2 + } + } + } + }, + "maingate_mongodb_url" : "mongodb://192.168.8.94:27017/?replicaSet=repl01&retrywrites=false", + "maingate_service_url" : "http://localhost/maingate", + "maingate_api_token" : "63d08aa34f0162622c11284b", + + "tavern_service_url" : "http://localhost/tavern", + "tavern_group_types" : { + "subjugate" : { + "text_search_field" : ["name"], + "unique_index" : ["name,_id", "_id,members", "name,hidden"], + "search_index" : ["rules"], + "member_index" : ["_gid,candidate,luts","_gid,luts","_gid,expiring"], + "invite_ttl" : 30, + "candidate_ttl" : 3600, + "invitee_exlusive" : true, + "invitee_is_member" : true, + "max_member" : 4 + }, + "lobby" : { + "max_member" : 3, + "invitee_exlusive" : true, + "invitee_is_member" : true, + "transient" : true, + "invite_ttl" : 30 + } + }, + + "ws_sync_pipeline" : "redis://192.168.8.94:6379/3", + + "services" : { + "kingdom" : { + "개발중" : { + "url" :"http://localhost/warehouse/dev", + "development" : true + }, + "개인서버" : { + "url" : "http://localhost/warehouse/private", + "development" : false + } + } + } +} \ No newline at end of file diff --git a/core/apiimpl.go b/core/apiimpl.go new file mode 100644 index 0000000..3ab6061 --- /dev/null +++ b/core/apiimpl.go @@ -0,0 +1,1067 @@ +package core + +import ( + "context" + "encoding/json" + "io" + "net/http" + "strings" + + common "repositories.action2quare.com/ayo/gocommon" + "repositories.action2quare.com/ayo/gocommon/logger" + "repositories.action2quare.com/ayo/gocommon/wshandler" + + "github.com/go-redis/redis/v8" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" +) + +func splitDocument(doc bson.M) bson.M { + setdoc := bson.M{} + unsetdoc := bson.M{} + findoc := bson.M{} + + for k, v := range doc { + if k == "$set" { + setdoc = v.(bson.M) + } else if k == "$unset" { + unsetdoc = v.(bson.M) + } + } + + for k, v := range doc { + if v == nil { + unsetdoc[k] = 1 + } else if k[0] != '$' { + setdoc[k] = v + } + } + + if len(setdoc) > 0 { + findoc["$set"] = setdoc + } + if len(unsetdoc) > 0 { + findoc["$unset"] = unsetdoc + } + + return findoc +} + +// CreateGroup : 그룹 생성 +// - 그룹 : 멤버와 권한을 관리할 수 있다. 그룹 타입에 따라 디비에 저장되거나 메모리에만 존재한다. +// - 생성 요청이 오면 파티를 만든다. 파티을 만들 수 있는지 여부는 서비스에서 결정할 것이고, 이 요청을 호출했다는 것은 서비스가 결정한 그룹 생성 조건을 다 통과했다는 의미이다. +// - parameter : +// - type : 그룹 종류. 그룹 종류에 따라 인덱스와 쿼리 가능 field가 다르다. +func (sub *subTavern) CreateGroup(w http.ResponseWriter, r *http.Request) { + typename, _ := common.ReadStringFormValue(r.Form, "type") + grouptype := sub.groups[typename] + if grouptype == nil { + logger.Println("CreateGroup failed. group type is missing :", r.Form) + w.WriteHeader(http.StatusBadRequest) + return + } + + doc := bson.M{} + if err := readBsonDoc(r.Body, &doc); err != nil { + logger.Error("CreateGroup failed. readBsonDoc returns err :", err) + w.WriteHeader(http.StatusBadRequest) + return + } + + inserted, err := grouptype.Create(r.Form, doc) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + w.Write(inserted[:]) +} + +// JoinGroup : 그룹에 참가 +// - type : 그룹 타입 +// - 그룹 타입에 맞는 키(주로 _id) +// - member_id : 참가 멤버의 아이디 +// - body : 멤버의 속성 bson document +func (sub *subTavern) JoinGroup(w http.ResponseWriter, r *http.Request) { + typename, _ := common.ReadStringFormValue(r.Form, "type") + group := sub.groups[typename] + if group == nil { + logger.Println("JoinGroup failed. group type is missing :", r.Form) + w.WriteHeader(http.StatusBadRequest) + return + } + + doc := bson.M{} + if err := readBsonDoc(r.Body, &doc); err != nil { + logger.Error("JoinGroup failed. readBsonDoc returns err :", err) + w.WriteHeader(http.StatusBadRequest) + return + } + gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid") + if !ok { + logger.Println("JoinGroup failed. gid is missing :", r.Form) + w.WriteHeader(http.StatusBadRequest) + return + } + midobj, midok := common.ReadObjectIDFormValue(r.Form, "mid") + tidobj, tidok := common.ReadObjectIDFormValue(r.Form, "tid") + if !midok && !tidok { + // 둘다 없네? + logger.Println("JoinGroup failed. tid or mid should be exist") + w.WriteHeader(http.StatusBadRequest) + return + } + + var err error + if candidate, ok := common.ReadBoolFormValue(r.Form, "candidate"); ok && candidate { + err = group.Candidate(gidobj, midobj, doc) + } else { + tidobj, err = group.Join(gidobj, midobj, tidobj, doc) + } + + if err == nil { + json.NewEncoder(w).Encode(map[string]string{ + "gid": gidobj.Hex(), + "tid": tidobj.Hex(), + }) + } else if err == errGroupNotExist { + w.Write([]byte("{}")) + } else if err != nil { + logger.Error("JoinGroup failed :", err) + w.WriteHeader(http.StatusInternalServerError) + } +} + +func (sub *subTavern) EnterCandidateChannel(w http.ResponseWriter, r *http.Request) { + typename, _ := common.ReadStringFormValue(r.Form, "type") + if _, ok := sub.groups[typename]; !ok { + logger.Println("EnterCandidateChannel failed. group type is missing :", r) + w.WriteHeader(http.StatusBadRequest) + return + } + + midobj, ok := common.ReadObjectIDFormValue(r.Form, "mid") + if !ok { + logger.Println("EnterCandidateChannel failed. mid is missing :", r) + w.WriteHeader(http.StatusBadRequest) + return + } + gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid") + if !ok { + logger.Println("EnterCandidateChannel failed. gid is missing :", r) + w.WriteHeader(http.StatusBadRequest) + return + } + + // candidate channel은 big endian 최상위 비트가 1 + gidobj[0] |= 0x80 + + if conn := sub.wsh.Conn(sub.region, midobj); conn != nil { + richConnOuter{wsh: sub.wsh, rc: conn}.JoinTag(sub.region, gidobj, midobj, typename) + } else { + sub.wshRpc.caller.One(midobj).JoinTag(sub.region, gidobj, midobj, typename) + } +} + +func (sub *subTavern) LeaveCandidateChannel(w http.ResponseWriter, r *http.Request) { + typename, _ := common.ReadStringFormValue(r.Form, "type") + if _, ok := sub.groups[typename]; !ok { + logger.Println("EnterCandidateChannel failed. group type is missing :", r) + w.WriteHeader(http.StatusBadRequest) + return + } + + midobj, ok := common.ReadObjectIDFormValue(r.Form, "mid") + if !ok { + logger.Println("EnterCandidateChannel failed. mid is missing :", r) + w.WriteHeader(http.StatusBadRequest) + return + } + gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid") + if !ok { + logger.Println("EnterCandidateChannel failed. gid is missing :", r) + w.WriteHeader(http.StatusBadRequest) + return + } + + // candidate channel은 big endian 최상위 비트가 1 + gidobj[0] |= 0x80 + + if conn := sub.wsh.Conn(sub.region, midobj); conn != nil { + richConnOuter{wsh: sub.wsh, rc: conn}.LeaveTag(sub.region, gidobj, midobj, typename) + } else { + sub.wshRpc.caller.One(midobj).LeaveTag(sub.region, gidobj, midobj, typename) + } +} + +func (sub *subTavern) EnterGroupChannel(w http.ResponseWriter, r *http.Request) { + typename, _ := common.ReadStringFormValue(r.Form, "type") + group := sub.groups[typename] + if group == nil { + logger.Println("EnterGroupChannel failed. group type is missing :", r) + w.WriteHeader(http.StatusBadRequest) + return + } + + midobj, ok := common.ReadObjectIDFormValue(r.Form, "mid") + if !ok { + logger.Println("EnterGroupChannel failed. mid is missing :", r) + w.WriteHeader(http.StatusBadRequest) + return + } + gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid") + if !ok { + logger.Println("EnterGroupChannel failed. gid is missing :", r) + w.WriteHeader(http.StatusBadRequest) + return + } + + tid := group.FindTicketID(gidobj, midobj) + if tid.IsZero() { + logger.Println("EnterGroupChannel failed. tid is zero") + w.WriteHeader(http.StatusBadRequest) + return + } + + if conn := sub.wsh.Conn(sub.region, midobj); conn != nil { + richConnOuter{wsh: sub.wsh, rc: conn}.JoinTag(sub.region, gidobj, tid, typename) + } else { + sub.wshRpc.caller.One(midobj).JoinTag(sub.region, gidobj, tid, typename) + } + writeBsonDoc(w, primitive.M{"_id": tid}) +} + +func (sub *subTavern) SetStateInGroup(w http.ResponseWriter, r *http.Request) { + gid, ok := common.ReadObjectIDFormValue(r.Form, "gid") + if !ok { + logger.Println("SetStateInGroup failed. tag is missing :", r) + w.WriteHeader(http.StatusBadRequest) + return + } + mid, ok := common.ReadObjectIDFormValue(r.Form, "mid") + if !ok { + logger.Println("SetStateInGroup failed. mid form value is missing") + w.WriteHeader(http.StatusBadRequest) + return + } + + state, ok := common.ReadStringFormValue(r.Form, "state") + if !ok { + logger.Println("SetStateInGroup failed. state is missing :", r) + w.WriteHeader(http.StatusBadRequest) + return + } + + typename, ok := common.ReadStringFormValue(r.Form, "type") + if !ok { + logger.Println("SetStateInGroup failed. type is missing :", r) + w.WriteHeader(http.StatusBadRequest) + return + } + var doc bson.M + if err := readBsonDoc(r.Body, &doc); err != nil { + logger.Error("SetStateInGroup failed. readBsonDoc err :", err) + w.WriteHeader(http.StatusBadRequest) + return + } + + tid := doc["_id"].(primitive.ObjectID) + if conn := sub.wsh.Conn(sub.region, mid); conn != nil { + richConnOuter{wsh: sub.wsh, rc: conn}.SetStateInTag(sub.region, gid, tid, state, typename) + } else { + sub.wshRpc.caller.One(mid).SetStateInTag(sub.region, gid, tid, state, typename) + } +} + +// Invite : 초대 +// - type : 초대 타입 (required) +// - from : 초대하는 자 (required) +// - to : 초대받는 자 (required) +// - timeout : 초대 유지시간(optional. 없으면 config 기본 값) +// - (body) : 검색시 노출되는 document +func (sub *subTavern) Invite(w http.ResponseWriter, r *http.Request) { + typename, _ := common.ReadStringFormValue(r.Form, "type") + group := sub.groups[typename] + if group == nil { + logger.Println("Invite failed. group type is missing :", r) + w.WriteHeader(http.StatusBadRequest) + return + } + gid, ok := common.ReadObjectIDFormValue(r.Form, "gid") + if !ok { + logger.Println("Invite failed. gid is missing :", r) + w.WriteHeader(http.StatusBadRequest) + return + } + mid, ok := common.ReadObjectIDFormValue(r.Form, "mid") + if !ok { + logger.Println("Invite failed. mid is missing :", r) + w.WriteHeader(http.StatusBadRequest) + return + } + + var reqdoc struct { + Inviter bson.M `bson:"inviter"` + Invitee bson.M `bson:"invitee"` + } + if err := readBsonDoc(r.Body, &reqdoc); err != nil { + logger.Error("Invite failed. readBsonDoc returns err :", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + result, err := group.Invite(gid, mid, reqdoc.Inviter, reqdoc.Invitee) + if err != nil { + logger.Error("Invite failed. group.Invite returns err :", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.Write([]byte(result)) +} + +func (sub *subTavern) UpdateGroupMember(w http.ResponseWriter, r *http.Request) { + typename, _ := common.ReadStringFormValue(r.Form, "type") + group := sub.groups[typename] + if group == nil { + logger.Println("UpdateGroupMember failed. group type is missing :", r.Form) + w.WriteHeader(http.StatusBadRequest) + return + } + + gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid") + if !ok { + logger.Println("UpdateGroupMember failed. gid is missing :", r.Form) + w.WriteHeader(http.StatusBadRequest) + return + } + + midobj, midok := common.ReadObjectIDFormValue(r.Form, "mid") + tidobj, tidok := common.ReadObjectIDFormValue(r.Form, "tid") + if !midok && !tidok { + // 둘다 없네? + logger.Println("JoinGroup failed. tid or mid should be exist") + w.WriteHeader(http.StatusBadRequest) + return + } + + var err error + delete, _ := common.ReadBoolFormValue(r.Form, "delete") + if delete { + err = group.UpdateGroupMember(gidobj, midobj, tidobj, nil) + } else { + var doc bson.M + if err := readBsonDoc(r.Body, &doc); err != nil { + logger.Error("UpdateGroupMember failed. readBsonDoc returns err :", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + err = group.UpdateGroupMember(gidobj, midobj, tidobj, doc) + } + + if err != nil { + logger.Println("UpdateGroupMember failed. Update returns err :", err) + w.WriteHeader(http.StatusInternalServerError) + return + } +} + +func (sub *subTavern) CancelInvitation(w http.ResponseWriter, r *http.Request) { + typename, _ := common.ReadStringFormValue(r.Form, "type") + group := sub.groups[typename] + if group == nil { + logger.Println("CancelInvitation failed. group type is missing :", r) + w.WriteHeader(http.StatusBadRequest) + return + } + tid, ok := common.ReadObjectIDFormValue(r.Form, "tid") + if !ok { + logger.Println("CancelInvitation failed. form value 'tid' is missing") + w.WriteHeader(http.StatusBadRequest) + return + } + gid, ok := common.ReadObjectIDFormValue(r.Form, "gid") + if !ok { + logger.Println("CancelInvitation failed. form value 'gid' is missing") + w.WriteHeader(http.StatusBadRequest) + return + } + + if err := group.CancelInvitation(gid, tid); err != nil { + logger.Println("CancelInvitation failed. group.CancelInvitation returns err :", err) + w.WriteHeader(http.StatusInternalServerError) + } +} + +func (sub *subTavern) AcceptInvitation(w http.ResponseWriter, r *http.Request) { + typename, _ := common.ReadStringFormValue(r.Form, "type") + group := sub.groups[typename] + if group == nil { + logger.Println("CancelInvitation failed. group type is missing :", r) + w.WriteHeader(http.StatusBadRequest) + return + } + + gid, _ := common.ReadObjectIDFormValue(r.Form, "gid") + mid, _ := common.ReadObjectIDFormValue(r.Form, "mid") + tid, ok := common.ReadObjectIDFormValue(r.Form, "tid") + if !ok { + logger.Println("CancelInvitation failed. form value 'tid' is missing") + w.WriteHeader(http.StatusBadRequest) + return + } + + var member bson.M + if err := readBsonDoc(r.Body, &member); err != nil { + logger.Error("AcceptInvitation failed. readBsonDoc returns err :", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + gidbytes, err := group.AcceptInvitation(gid, mid, tid, member) + if err != nil { + logger.Error("AcceptInvitation failed. group.AcceptInvitation returns err :", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.Write([]byte(gidbytes.Hex())) +} + +func (sub *subTavern) DenyInvitation(w http.ResponseWriter, r *http.Request) { + typename, _ := common.ReadStringFormValue(r.Form, "type") + group := sub.groups[typename] + if group == nil { + logger.Println("DenyInvitation failed. group type is missing") + w.WriteHeader(http.StatusBadRequest) + return + } + + gid, _ := common.ReadObjectIDFormValue(r.Form, "gid") + mid, _ := common.ReadObjectIDFormValue(r.Form, "mid") + tid, ok := common.ReadObjectIDFormValue(r.Form, "tid") + if !ok { + logger.Println("DenyInvitation failed. tid is missing") + w.WriteHeader(http.StatusBadRequest) + return + } + + err := group.DenyInvitation(gid, mid, tid) + if err != nil { + logger.Error("DenyInvitation failed. group.DenyInvitation returns err :", err) + w.WriteHeader(http.StatusInternalServerError) + return + } +} + +func (sub *subTavern) QueryInvitations(w http.ResponseWriter, r *http.Request) { + typename, _ := common.ReadStringFormValue(r.Form, "type") + group := sub.groups[typename] + if group == nil { + logger.Println("QueryInvitations failed. group type is missing :", r.Form) + w.WriteHeader(http.StatusBadRequest) + return + } + + mid, ok := common.ReadObjectIDFormValue(r.Form, "mid") + if !ok { + logger.Println("QueryInvitations failed. mid is missing :", r.Form) + w.WriteHeader(http.StatusBadRequest) + return + } + + var after primitive.Timestamp + if v, ok := common.ReadStringFormValue(r.Form, "after"); ok && v != "0.0" { + after = common.DotStringToTimestamp(v) + } + + result, err := group.QueryInvitations(mid, after) + + if err != nil { + logger.Println("QueryInvitations failed. group.QueryInvitations returns err :", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + if err := writeBsonArr(w, result); err != nil { + logger.Println("QueryInvitations failed. Encode returns err :", err) + w.WriteHeader(http.StatusInternalServerError) + return + } +} + +func (sub *subTavern) TurnGroupOnline(w http.ResponseWriter, r *http.Request) { + // group을 online 상태로 만든다. + // 요청을 보내는 클라이언트의 conn이 끊이면 online에서 제거한다. + // online인 group을 가지고 뭘 할지는 게임이 알아서... + typename, _ := common.ReadStringFormValue(r.Form, "type") + group := sub.groups[typename] + if group == nil { + logger.Println("TurnGroupOnline failed. group type is missing :", r.Form) + w.WriteHeader(http.StatusBadRequest) + return + } + + gid, ok := common.ReadObjectIDFormValue(r.Form, "_id") + if !ok { + logger.Println("TurnGroupOnline failed. group id '_id' form value is missing :", r.Form) + w.WriteHeader(http.StatusBadRequest) + return + } + + mid, ok := common.ReadObjectIDFormValue(r.Form, "mid") + if !ok { + logger.Println("TurnGroupOnline failed. mid form value is missing") + w.WriteHeader(http.StatusBadRequest) + return + } + + var filter bson.M + if err := readBsonDoc(r.Body, &filter); err != nil { + logger.Error("TurnGroupOnline failed. readBsonDoc return err :", err) + w.WriteHeader(http.StatusBadRequest) + return + } + + exist, err := group.Exist(gid, filter) + if err != nil { + logger.Error("TurnGroupOnline failed. FindOne return err :", err) + w.WriteHeader(http.StatusBadRequest) + return + } + + if !exist { + logger.Println("TurnGroupOnline failed. filter not match", filter) + w.WriteHeader(http.StatusBadRequest) + return + } + + score, ok := common.ReadFloatFormValue(r.Form, "score") + if !ok { + score = 100 + } + + if conn := sub.wsh.Conn(sub.region, mid); conn != nil { + err = richConnOuter{wsh: sub.wsh, rc: conn}.TurnGroupOnline(onlineGroupQueryKey(typename), gid, score) + } else { + err = sub.wshRpc.caller.One(mid).TurnGroupOnline(onlineGroupQueryKey(typename), gid, score) + } + if err != nil { + logger.Error("TurnGroupOnline failed. TurnGroupOnline err :", err) + w.WriteHeader(http.StatusBadRequest) + return + } +} + +func (sub *subTavern) TurnGroupOffline(w http.ResponseWriter, r *http.Request) { + // group을 offline 상태로 만든다. + // 요청을 보내는 클라이언트의 conn이 끊이면 online에서 제거한다. + // online인 group을 가지고 뭘 할지는 게임이 알아서... + typename, _ := common.ReadStringFormValue(r.Form, "type") + group := sub.groups[typename] + if group == nil { + logger.Println("TurnGroupOffline failed. group type is missing :", r.Form) + w.WriteHeader(http.StatusBadRequest) + return + } + + gid, ok := common.ReadObjectIDFormValue(r.Form, "_id") + if !ok { + logger.Println("TurnGroupOffline failed. group id '_id' form value is missing :", r.Form) + w.WriteHeader(http.StatusBadRequest) + return + } + + mid, ok := common.ReadObjectIDFormValue(r.Form, "mid") + if !ok { + logger.Println("TurnGroupOffline failed. mid form value is missing") + w.WriteHeader(http.StatusBadRequest) + return + } + + // onlinename := onlineGroupQueryKey(typename) + // if onClose := conn.UnregistOnCloseFunc(onlinename); onClose != nil { + // onClose() + // } else { + // gid, ok := common.ReadStringFormValue(form, "_id") + // if ok { + // sub.redisSync.ZRem(context.Background(), onlinename, gid) + // } + // } + + var err error + if conn := sub.wsh.Conn(sub.region, mid); conn != nil { + err = richConnOuter{wsh: sub.wsh, rc: conn}.TurnGroupOffline(onlineGroupQueryKey(typename), gid) + } else { + err = sub.wshRpc.caller.One(mid).TurnGroupOffline(onlineGroupQueryKey(typename), gid) + } + + if err != nil { + logger.Error("TurnGroupOffline failed. TurnGroupOnline err :", err) + w.WriteHeader(http.StatusBadRequest) + return + } +} + +func (sub *subTavern) QueryOnlineGroup(w http.ResponseWriter, r *http.Request) { + typename, _ := common.ReadStringFormValue(r.Form, "type") + group := sub.groups[typename] + if group == nil { + logger.Println("QueryOnlineGroup failed. group type is missing :", r.Form) + w.WriteHeader(http.StatusBadRequest) + return + } + + var cmd *redis.StringSliceCmd + scoreStart, _ := common.ReadStringFormValue(r.Form, "score_start") + scoreStop, _ := common.ReadStringFormValue(r.Form, "score_stop") + + if len(scoreStart) > 0 || len(scoreStop) > 0 { + if len(scoreStart) == 0 { + scoreStart = "-inf" + } + if len(scoreStop) == 0 { + scoreStop = "+inf" + } + cmd = sub.wsh.RedisSync.ZRangeArgs(context.Background(), redis.ZRangeArgs{ + Key: onlineGroupQueryKey(typename), + ByScore: true, + Start: scoreStart, + Stop: scoreStop, + Rev: true, + Count: 1, + }) + } else { + // 아무거나 + cmd = sub.wsh.RedisSync.ZRandMember(context.Background(), onlineGroupQueryKey(typename), 1, false) + } + + result, err := cmd.Result() + if err != nil { + logger.Error("QueryOnlineGroup failed. redid.ZRandMember returns err :", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + writeBsonDoc(w, bson.M{"r": result}) +} + +func (sub *subTavern) SearchGroup(w http.ResponseWriter, r *http.Request) { + typename, _ := common.ReadStringFormValue(r.Form, "type") + group := sub.groups[typename] + if group == nil { + logger.Println("SearchGroup failed. group type is missing :", r.Form) + w.WriteHeader(http.StatusBadRequest) + return + } + + projection, _ := common.ReadStringFormValue(r.Form, "projection") + + var filter bson.M + if err := readBsonDoc(r.Body, &filter); err != nil { + logger.Error("SearchGroup failed. readBsonDoc returns err :", err) + w.WriteHeader(http.StatusBadRequest) + return + } + + result, err := group.FindAll(filter, projection, primitive.Timestamp{}) + if err != nil { + logger.Error("SearchGroup failed. FindAll err :", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + if result == nil { + return + } + + if err := writeBsonArr(w, result); err != nil { + logger.Error("json marshal failed :", err) + w.WriteHeader(http.StatusInternalServerError) + return + } +} + +func (sub *subTavern) QueryOnlineState(w http.ResponseWriter, r *http.Request) { + mid, ok := common.ReadObjectIDFormValue(r.Form, "mid") + if !ok { + logger.Println("IsOnline failed. mid is missing :", r.Form) + w.WriteHeader(http.StatusBadRequest) + return + } + + state, err := sub.wsh.GetState(mid) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.Write([]byte(state)) +} + +func (sub *subTavern) IsOnline(w http.ResponseWriter, r *http.Request) { + mid, ok := common.ReadObjectIDFormValue(r.Form, "mid") + if !ok { + logger.Println("IsOnline failed. mid is missing :", r.Form) + w.WriteHeader(http.StatusBadRequest) + return + } + + ok, err := sub.wsh.IsOnline(mid) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + if ok { + w.Write([]byte("true")) + } else { + w.Write([]byte("false")) + } +} + +// QueryGroup : 그룹조회 +// - type : 그룹 타입 +// - 그룹 타입에 맞는 키(주로 _id) +// - projection : select할 필드. ,로 구분 +func (sub *subTavern) QueryGroup(w http.ResponseWriter, r *http.Request) { + typename, _ := common.ReadStringFormValue(r.Form, "type") + group := sub.groups[typename] + if group == nil { + logger.Println("QueryGroup failed. group type is missing :", r.Form) + w.WriteHeader(http.StatusBadRequest) + return + } + + gid, ok := common.ReadObjectIDFormValue(r.Form, "_id") + if !ok { + logger.Println("QueryGroup failed. _id is missing :", r.Form) + w.WriteHeader(http.StatusBadRequest) + return + } + projection, _ := common.ReadStringFormValue(r.Form, "projection") + + after, _ := common.ReadStringFormValue(r.Form, "after") + if after != "0.0" { + projection += ",+luts" + } + + result, err := group.FindOne(gid, projection) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + if result == nil { + return + } + if len(after) > 0 { + if luts, ok := result["luts"].(primitive.Timestamp); ok { + afterts := common.DotStringToTimestamp(after) + if primitive.CompareTimestamp(luts, afterts) < 0 { + return + } + } + } + + if err := writeBsonDoc(w, result); err != nil { + logger.Error("json marshal failed :", err) + w.WriteHeader(http.StatusInternalServerError) + return + } +} + +// QueryGroupMembers : 그룹내 멤버 조회 +// - type : 그룹 타입 +// - 그룹 타입에 맞는 키(주로 _id) +// - projection : select할 필드. ,로 구분 +func (sub *subTavern) QueryGroupMembers(w http.ResponseWriter, r *http.Request) { + typename, _ := common.ReadStringFormValue(r.Form, "type") + group := sub.groups[typename] + if group == nil { + logger.Println("QueryGroupMembers failed. group type is missing :", r.Form) + w.WriteHeader(http.StatusBadRequest) + return + } + + gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid") + if !ok { + logger.Println("QueryGroupMembers failed. _id is missing :", r.Form) + w.WriteHeader(http.StatusBadRequest) + return + } + + midobj, _ := common.ReadObjectIDFormValue(r.Form, "mid") + var after primitive.Timestamp + if ts, ok := common.ReadStringFormValue(r.Form, "after"); ok && ts != "0.0" { + after = common.DotStringToTimestamp(ts) + } + projection, _ := common.ReadStringFormValue(r.Form, "projection") + + result, err := group.QueryMembers(gidobj, midobj, projection, after) + if err != nil { + logger.Error("QueryGroupMembers failed. FindAll err :", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + if result == nil { + return + } + + if err := writeBsonDoc(w, result); err != nil { + logger.Error("QueryGroupMembers failed. writeBsonArr err :", err) + w.WriteHeader(http.StatusInternalServerError) + return + } +} + +func (sub *subTavern) QueryGroupMember(w http.ResponseWriter, r *http.Request) { + typename, _ := common.ReadStringFormValue(r.Form, "type") + group := sub.groups[typename] + if group == nil { + logger.Println("QueryGroupMember failed. group type is missing :", r.Form) + w.WriteHeader(http.StatusBadRequest) + return + } + + gid, ok := common.ReadObjectIDFormValue(r.Form, "gid") + if !ok { + logger.Println("QueryGroupMember failed. gid is missing :", r.Form) + w.WriteHeader(http.StatusBadRequest) + return + } + + mid, midok := common.ReadObjectIDFormValue(r.Form, "mid") + tid, tidok := common.ReadObjectIDFormValue(r.Form, "tid") + if !midok && !tidok { + // 둘 중 하나는 있어야지 + logger.Println("QueryGroupMember failed. tid and mid are both missing") + w.WriteHeader(http.StatusBadRequest) + return + } + + projection, _ := common.ReadStringFormValue(r.Form, "projection") + result, err := group.QueryMember(gid, mid, tid, projection) + if err != nil { + logger.Println("QueryGroupMember failed. group.QueryMember returns err :", err) + w.WriteHeader(http.StatusBadRequest) + return + } + + if result == nil { + return + } + + if err := writeBsonDoc(w, result); err != nil { + logger.Error("QueryGroupMember failed. writeBsonDoc err :", err) + w.WriteHeader(http.StatusInternalServerError) + return + } +} + +// LeaveGroup : 그룹에서 나감 or 내보냄 +// - type : 그룹 타입 +// - 그룹 타입에 맞는 키(주로 _id) +// - member_id : 나갈 멤버의 아이디 +func (sub *subTavern) LeaveGroup(w http.ResponseWriter, r *http.Request) { + typename, _ := common.ReadStringFormValue(r.Form, "type") + group := sub.groups[typename] + if group == nil { + logger.Println("LeaveGroup failed. group type is missing :", r.Form) + w.WriteHeader(http.StatusBadRequest) + return + } + + gid, ok := common.ReadObjectIDFormValue(r.Form, "gid") + if !ok { + logger.Println("LeaveGroup failed. gid is missing :", r.Form) + w.WriteHeader(http.StatusBadRequest) + return + } + mid, midok := common.ReadObjectIDFormValue(r.Form, "mid") + tid, tidok := common.ReadObjectIDFormValue(r.Form, "tid") + if !midok && !tidok { + // 둘 중 하나는 있어야지 + logger.Println("LeaveGroup failed. tid and mid are both missing") + w.WriteHeader(http.StatusBadRequest) + return + } + + if err := group.Leave(gid, mid, tid); err != nil { + // 둘 중 하나는 있어야지 + logger.Println("LeaveGroup failed. group.Leave returns err :", err) + w.WriteHeader(http.StatusBadRequest) + } +} + +func (sub *subTavern) UpdateMemberDocument(w http.ResponseWriter, r *http.Request) { + typename, _ := common.ReadStringFormValue(r.Form, "type") + group := sub.groups[typename] + if group == nil { + logger.Println("DismissGroup failed. type is missing") + w.WriteHeader(http.StatusBadRequest) + return + } + + midobj, ok := common.ReadObjectIDFormValue(r.Form, "mid") + if !ok { + logger.Println("UpdateMemberDocument failed. member_id is missing") + w.WriteHeader(http.StatusBadRequest) + return + } + + gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid") + if !ok { + logger.Println("UpdateMemberDocument failed. _id is missing") + w.WriteHeader(http.StatusBadRequest) + return + } + + var updatedoc bson.M + if err := readBsonDoc(r.Body, &updatedoc); err != nil { + logger.Error("UpdateMemberDocument failed. body decoding error :", err) + w.WriteHeader(http.StatusBadRequest) + return + } + + if err := group.UpdateMemberDocument(gidobj, midobj, updatedoc); err != nil { + logger.Println("UpdateMemberDocument failed :", err) + w.WriteHeader(http.StatusInternalServerError) + return + } +} + +func (sub *subTavern) DismissGroup(w http.ResponseWriter, r *http.Request) { + typename, _ := common.ReadStringFormValue(r.Form, "type") + group := sub.groups[typename] + if group == nil { + logger.Println("DismissGroup failed. type is missing") + w.WriteHeader(http.StatusBadRequest) + return + } + + gid, ok := common.ReadObjectIDFormValue(r.Form, "gid") + if !ok { + logger.Println("DismissGroup failed. gid is missing :") + w.WriteHeader(http.StatusBadRequest) + return + } + + if err := group.Dismiss(gid); err != nil { + logger.Error("DismissGroup failed. group.Dismiss returns err :", err) + w.WriteHeader(http.StatusInternalServerError) + return + } +} + +func (sub *subTavern) UpdateGroupDocument(w http.ResponseWriter, r *http.Request) { + typename, _ := common.ReadStringFormValue(r.Form, "type") + group := sub.groups[typename] + if group == nil { + logger.Println("UpdateGroupDocument failed. type is missing") + w.WriteHeader(http.StatusBadRequest) + return + } + + gid, ok := common.ReadObjectIDFormValue(r.Form, "gid") + if !ok { + logger.Println("UpdateGroupDocument failed. gid is missing") + w.WriteHeader(http.StatusBadRequest) + return + } + + body, err := io.ReadAll(r.Body) + if err != nil { + logger.Error("UpdateGroupDocument failed. readBsonDoc err :", err) + w.WriteHeader(http.StatusBadRequest) + return + } + + if err := group.UpdateGroupDocument(gid, body); err != nil { + logger.Error("UpdateGroupDocument failed. group.UpdateGroupDocument returns err :", err) + w.WriteHeader(http.StatusBadRequest) + return + } +} + +func (sub *subTavern) DropPausedMember(w http.ResponseWriter, r *http.Request) { + typename, _ := common.ReadStringFormValue(r.Form, "type") + group := sub.groups[typename] + if group == nil { + logger.Println("DropDeadMember failed. type is missing") + w.WriteHeader(http.StatusBadRequest) + return + } + + gid, ok := common.ReadObjectIDFormValue(r.Form, "gid") + if !ok { + logger.Println("DropDeadMember failed. gid is missing") + w.WriteHeader(http.StatusBadRequest) + return + } + + mid, ok := common.ReadObjectIDFormValue(r.Form, "mid") + if !ok { + logger.Println("DropDeadMember failed. mid is missing") + w.WriteHeader(http.StatusBadRequest) + return + } + + if err := group.DropPausedMember(gid, mid); err != nil { + logger.Error("DropDeadMember failed. group.DropDeadMember returns err :", err) + w.WriteHeader(http.StatusBadRequest) + return + } +} + +func (sub *subTavern) deliveryMessageHandler(deliveryChan <-chan wshandler.DeliveryMessage) { + defer func() { + r := recover() + if r != nil { + logger.Error(r) + } + }() + + redisSync := sub.wsh.RedisSync + for msg := range deliveryChan { + mid := msg.Alias + if msg.Body != nil { + buffer := msg.Body + + var channame string + for i, ch := range buffer { + if ch == 0 { + channame = string(buffer[:i]) + buffer = buffer[i+1:] + break + } + } + + if len(channame) == 0 { + continue + } + + buffer = append(mid[:], buffer...) + _, err := redisSync.Publish(context.Background(), channame, buffer).Result() + if err != nil { + logger.Error(err) + } + } + + if len(msg.Command) > 0 { + switch msg.Command { + case "pause": + gidtype := msg.Conn.GetTag("gid") + if len(gidtype) > 0 { + tokens := strings.SplitN(gidtype, "@", 2) + gidobj, _ := primitive.ObjectIDFromHex(tokens[0]) + gtype := tokens[1] + group := sub.groups[gtype] + if group != nil { + group.PauseMember(gidobj, msg.Alias, msg.Conn) + } + } + } + } + } + logger.Println("delivery chan fin") +} diff --git a/core/group.go b/core/group.go new file mode 100644 index 0000000..0c5a738 --- /dev/null +++ b/core/group.go @@ -0,0 +1,49 @@ +package core + +import ( + "net/url" + + "repositories.action2quare.com/ayo/gocommon/wshandler" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" +) + +type groupConfig struct { + UniqueIndex []string `json:"unique_index"` + SearchIndex []string `json:"search_index"` + MemberIndex []string `json:"member_index"` + TextSearchFields []string `json:"text_search_field"` + InviteExpire int32 `json:"invite_ttl"` // 그룹이 개인에게 보낸 초대장 만료 기한 + CandidateExpire int32 `json:"candidate_ttl"` // 개인이 그룹에게 보낸 신청서 만료 기한 + InviteeExlusive bool `json:"invitee_exlusive"` + InviteeIsMember bool `json:"invitee_is_member"` + MaxMember int `json:"max_member"` + Transient bool `json:"transient"` + + Name string +} + +type group interface { + Create(form url.Values, doc bson.M) (primitive.ObjectID, error) + Candidate(groupID primitive.ObjectID, memberID primitive.ObjectID, doc bson.M) error + Join(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, doc bson.M) (newTicketID primitive.ObjectID, err error) + FindTicketID(groupID primitive.ObjectID, memberID primitive.ObjectID) primitive.ObjectID + Invite(groupID primitive.ObjectID, memberID primitive.ObjectID, inviterDoc bson.M, inviteeDoc bson.M) (string, error) + UpdateGroupMember(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, doc bson.M) error + CancelInvitation(groupID primitive.ObjectID, ticketID primitive.ObjectID) error + AcceptInvitation(groupID primitive.ObjectID, mid primitive.ObjectID, ticketID primitive.ObjectID, member bson.M) (primitive.ObjectID, error) + DenyInvitation(groupID primitive.ObjectID, mid primitive.ObjectID, ticketID primitive.ObjectID) error + QueryInvitations(memberID primitive.ObjectID, after primitive.Timestamp) ([]bson.M, error) + Exist(groupID primitive.ObjectID, filter bson.M) (bool, error) + FindAll(filter bson.M, projection string, after primitive.Timestamp) ([]bson.M, error) + FindOne(groupID primitive.ObjectID, projection string) (bson.M, error) + QueryMembers(groupID primitive.ObjectID, requesterID primitive.ObjectID, projection string, after primitive.Timestamp) (map[string]bson.M, error) + QueryMember(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, projection string) (bson.M, error) + Leave(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID) error + DropPausedMember(groupID primitive.ObjectID, memberID primitive.ObjectID) error + PauseMember(groupID primitive.ObjectID, memberID primitive.ObjectID, conn *wshandler.Richconn) error + UpdateMemberDocument(groupID primitive.ObjectID, memberID primitive.ObjectID, doc bson.M) error + Dismiss(groupID primitive.ObjectID) error + UpdateGroupDocument(groupID primitive.ObjectID, body []byte) error +} diff --git a/core/group_memory.go b/core/group_memory.go new file mode 100644 index 0000000..33caff2 --- /dev/null +++ b/core/group_memory.go @@ -0,0 +1,1221 @@ +package core + +import ( + "context" + "crypto/md5" + "encoding/gob" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "net/url" + "os" + "path" + "reflect" + "runtime" + "strings" + "sync" + "time" + + common "repositories.action2quare.com/ayo/gocommon" + "repositories.action2quare.com/ayo/gocommon/logger" + "repositories.action2quare.com/ayo/gocommon/wshandler" + "repositories.action2quare.com/ayo/tavern/core/rpc" + + "github.com/go-redis/redis/v8" + "github.com/gorilla/websocket" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" +) + +type accountID = primitive.ObjectID +type ticketID = primitive.ObjectID +type groupID = primitive.ObjectID + +var everyHost, _ = primitive.ObjectIDFromHex("010203040506070809101112") + +type Invitation struct { + GroupID groupID `json:"gid"` + TicketID ticketID `json:"tid"` + Inviter bson.M `json:"inviter"` + ExpireAtUTC int64 `json:"expire_at_utc"` +} + +type memberDocCommon struct { + Body bson.M + Invite bool + InviteExpire time.Time + JoinTime int64 +} + +// 플레이어한테 공유하는 멤버 정보 +type PublicMemberDoc struct { + memberDocCommon `json:",inline"` + Tid ticketID +} + +type FullGroupDoc struct { + Gid groupID + DM string + AllMembers []*PublicMemberDoc `json:",omitempty"` + Body GroupDocBody `json:",omitempty"` +} + +type GroupDocBody bson.M +type InvitationFail bson.M + +type memberDoc struct { + memberDocCommon `json:",inline"` + + // underscore keys in Hidden + Hidden bson.M + rconn *wshandler.Richconn + Mid accountID +} + +type groupDoc struct { + sync.Mutex + + Body GroupDocBody + + InCharge accountID + tickets map[ticketID]*memberDoc + createTime time.Time +} + +func init() { + gob.Register(PublicMemberDoc{}) +} + +func (gd *groupDoc) updateBodyWithBson(src []byte) ([]byte, error) { + gd.Lock() + defer gd.Unlock() + + err := bson.Unmarshal(src, &gd.Body) + if err != nil { + return nil, err + } + + return bson.Marshal(gd.Body) +} + +func (gd *groupDoc) updateBodyWithJson(src []byte) ([]byte, error) { + gd.Lock() + defer gd.Unlock() + + err := json.Unmarshal(src, &gd.Body) + if err != nil { + return nil, err + } + + return json.Marshal(makeTypeMessage(gd.Body)) +} + +func (gd *groupDoc) updateBodyBsonToJson(bsonSrc []byte) (jsonBt []byte, err error) { + gd.Lock() + defer gd.Unlock() + + err = bson.Unmarshal(bsonSrc, &gd.Body) + if err != nil { + return nil, err + } + + return json.Marshal(makeTypeMessage(gd.Body)) +} + +func (gd *groupDoc) updateBody(bsonSrc []byte) error { + gd.Lock() + defer gd.Unlock() + + return bson.Unmarshal(bsonSrc, &gd.Body) +} + +func (gd *groupDoc) addInvite(inviteeDoc bson.M, rconn *wshandler.Richconn, ttl time.Duration, max int) (ticketID, *memberDoc) { + gd.Lock() + defer gd.Unlock() + + mid := inviteeDoc["_mid"].(accountID) + body := inviteeDoc["body"].(bson.M) + + // 초대 가능한 빈 자리가 있나 + now := time.Now().UTC() + if len(gd.tickets) < max { + tid := primitive.NewObjectID() + newdoc := &memberDoc{ + memberDocCommon: memberDocCommon{ + Body: body, + Invite: true, + InviteExpire: now.Add(ttl), + }, + rconn: rconn, + Mid: mid, + } + gd.tickets[tid] = newdoc + return tid, newdoc + } + + for oldtid, mem := range gd.tickets { + if !mem.Invite { + continue + } + if mem.InviteExpire.Before(now) { + delete(gd.tickets, oldtid) + tid := primitive.NewObjectID() + newdoc := &memberDoc{ + memberDocCommon: memberDocCommon{ + Body: body, + Invite: true, + InviteExpire: now.Add(ttl), + }, + rconn: rconn, + Mid: mid, + } + gd.tickets[tid] = newdoc + return tid, newdoc + } + } + + return primitive.NilObjectID, nil +} + +func seperateHidden(in bson.M) (public bson.M, hidden bson.M) { + for k, v := range in { + if k[0] == '_' { + if hidden == nil { + hidden = make(bson.M) + } + hidden[k] = v + } + } + + for k := range hidden { + delete(in, k) + } + return in, hidden +} + +func (gd *groupDoc) addInCharge(mid accountID, rconn *wshandler.Richconn, doc bson.M) (ticketID, *memberDoc) { + gd.Lock() + defer gd.Unlock() + + if !gd.InCharge.IsZero() { + return primitive.NilObjectID, nil + } + + gd.InCharge = mid + newtid := primitive.NewObjectID() + doc, hidden := seperateHidden(doc) + newdoc := &memberDoc{ + memberDocCommon: memberDocCommon{ + Body: doc, + Invite: false, + JoinTime: time.Now().UTC().Unix(), + }, + rconn: rconn, + Mid: mid, + Hidden: hidden, + } + + gd.tickets[newtid] = newdoc + if gd.Body == nil { + gd.Body = GroupDocBody(make(bson.M)) + } + gd.Body["incharge"] = newtid.Hex() + return newtid, newdoc +} + +func (gd *groupDoc) addMember(mid accountID, tid *ticketID, doc bson.M) (*memberDoc, bool) { + gd.Lock() + defer gd.Unlock() + + var memdoc *memberDoc + isNew := false + if tid.IsZero() { + for oldtid, d := range gd.tickets { + if d.Mid == mid { + memdoc = d + *tid = oldtid + isNew = true + break + } + } + } else { + var ok bool + memdoc, ok = gd.tickets[*tid] + if !ok { + // 티켓이 업네? + return nil, false + } + + if memdoc.Mid != mid { + // 내 티켓이 아니네? + return nil, false + } + } + + doc, hidden := seperateHidden(doc) + if memdoc != nil { + memdoc.Body = doc + memdoc.Hidden = hidden + + if memdoc.Invite { + isNew = true + memdoc.Invite = false + } + + if memdoc.JoinTime == 0 { + memdoc.JoinTime = time.Now().UTC().Unix() + } + } + return memdoc, isNew +} + +func (gd *groupDoc) removeMember(mid accountID, tid *ticketID) { + gd.Lock() + defer gd.Unlock() + + if tid.IsZero() { + for t, mem := range gd.tickets { + if mem.Mid == mid { + *tid = t + delete(gd.tickets, t) + return + } + } + } + + delete(gd.tickets, *tid) + + if gd.InCharge == mid { + gd.InCharge = primitive.NilObjectID + } +} + +func (gd *groupDoc) conns(includeInvitee bool) (out []*wshandler.Richconn) { + gd.Lock() + defer gd.Unlock() + + for _, mem := range gd.tickets { + if mem.rconn != nil { + if !includeInvitee && mem.Invite { + continue + } + out = append(out, mem.rconn) + } + } + return +} + +func (gd *groupDoc) ticket(mid accountID) ticketID { + gd.Lock() + defer gd.Unlock() + + if mid.IsZero() { + return primitive.NilObjectID + } + + for t, m := range gd.tickets { + if m.Mid == mid { + return t + } + } + return primitive.NilObjectID +} + +func (gd *groupDoc) member(tid ticketID) *memberDoc { + gd.Lock() + defer gd.Unlock() + + return gd.tickets[tid] +} + +func (gd *groupDoc) memberByAccount(mid accountID) (ticketID, *memberDoc) { + gd.Lock() + defer gd.Unlock() + + for tid, doc := range gd.tickets { + if doc.Mid == mid { + return tid, doc + } + } + return primitive.NilObjectID, nil +} + +func (gd *groupDoc) modifyMemberDocument(mid accountID, tid *ticketID, cb func(b *memberDoc)) *memberDoc { + gd.Lock() + defer gd.Unlock() + + if tid.IsZero() { + for t, mem := range gd.tickets { + if mem.Mid == mid { + *tid = t + break + } + } + } + + if tid.IsZero() { + return nil + } + + if mem := gd.tickets[*tid]; mem != nil { + cb(mem) + return mem + } + + return nil +} + +func (gd *groupDoc) overwriteMemberDocument(mid accountID, tid *ticketID, raw []byte) *memberDoc { + gd.Lock() + defer gd.Unlock() + + if tid.IsZero() { + for t, mem := range gd.tickets { + if mem.Mid == mid { + *tid = t + json.Unmarshal(raw, &mem.Body) + return mem + } + } + } + + if mem := gd.tickets[*tid]; mem != nil { + var newbody primitive.M + json.Unmarshal(raw, &newbody) + mem.Body = newbody + return mem + } + + return nil +} + +func (gd *groupDoc) iterateMembers(cb func(ticketID, *memberDoc)) { + gd.Lock() + defer gd.Unlock() + + for k, v := range gd.tickets { + cb(k, v) + } +} + +func (gd *groupDoc) serializeFull(gid groupID, directMessageChanName string) []byte { + gd.Lock() + defer gd.Unlock() + + var output []*PublicMemberDoc + for k, v := range gd.tickets { + if v.Invite { + // 아직 초대 중인 대상. 패스 + continue + } + + output = append(output, &PublicMemberDoc{ + memberDocCommon: v.memberDocCommon, + Tid: k, + }) + } + + bt, _ := json.Marshal(makeTypeMessage(FullGroupDoc{ + Gid: gid, + DM: directMessageChanName, + AllMembers: output, + Body: gd.Body, + })) + + return bt +} + +type groupContainer struct { + sync.Mutex + groupDocs map[groupID]*groupDoc +} + +type groupInMemory struct { + *groupConfig + groupDocSync func(groupID, []byte) error + memberSync func(groupID, accountID, ticketID, *memberDoc, bool) error + rpcCall func([]byte) error + hasConn func(accountID) *wshandler.Richconn + groups groupContainer +} + +func (gc *groupContainer) add(id groupID, doc *groupDoc) { + gc.Lock() + defer gc.Unlock() + + gc.groupDocs[id] = doc +} + +func (gc *groupContainer) createWithID(newid groupID, base bson.M) (groupID, *groupDoc) { + gc.Lock() + defer gc.Unlock() + + if _, ok := gc.groupDocs[newid]; ok { + return primitive.NilObjectID, nil + } + + newdoc := newGroupDoc(base) + gc.groupDocs[newid] = newdoc + + return newid, newdoc +} + +func (gc *groupContainer) delete(gid groupID) { + gc.Lock() + defer gc.Unlock() + + delete(gc.groupDocs, gid) +} + +func (gc *groupContainer) find(id groupID) *groupDoc { + gc.Lock() + defer gc.Unlock() + + if found, ok := gc.groupDocs[id]; ok { + return found + } + return nil +} + +func newGroupDoc(base bson.M) *groupDoc { + return &groupDoc{ + Body: GroupDocBody(base), + createTime: time.Now().UTC(), + tickets: make(map[ticketID]*memberDoc), + } +} + +func (gm *groupInMemory) Create(form url.Values, base bson.M) (groupID, error) { + return primitive.NilObjectID, nil +} + +func (gm *groupInMemory) Candidate(gid groupID, mid accountID, doc bson.M) error { + logger.Error("not implemented func : Canidate") + return nil +} + +var errGroupNotExist = errors.New("group does not exist") +var errFuncNameIsMissing = errors.New("how func name is missin") + +func (gm *groupInMemory) callProxyRpc(target accountID, name string, args ...any) error { + bt, err := rpc.Encode(target, name, args...) + if err != nil { + return err + } + + return gm.rpcCall(bt) +} + +type rpcTarget struct { + gm *groupInMemory + target accountID +} + +func (rt rpcTarget) call(args ...any) error { + pc := make([]uintptr, 1) + n := runtime.Callers(2, pc[:]) + if n < 1 { + return nil + } + + frame, _ := runtime.CallersFrames(pc).Next() + funcname := path.Ext(frame.Func.Name()) + if len(funcname) > 0 { + funcname = funcname[1:] + return rt.gm.callProxyRpc(rt.target, funcname, args...) + } + + return errFuncNameIsMissing +} + +func (gm *groupInMemory) rpc(target accountID) rpcTarget { + return rpcTarget{ + gm: gm, + target: target, + } +} + +var errNoEmptySlot = errors.New("no more seat in group") + +func (gm *groupInMemory) Join(gid groupID, mid accountID, tid ticketID, doc bson.M) (ticketID, error) { + group := gm.groups.find(gid) + if group == nil { + // 그룹이 없다. 실패 + return primitive.NilObjectID, errGroupNotExist + } + + // 내 정보 업데이트할 때에도 사용됨 + // 굳이 InCharge가 있는 호스트가 아니어도 가능 + if memdoc, isNew := group.addMember(mid, &tid, doc); memdoc != nil { + gm.memberSync(gid, mid, tid, memdoc, isNew) + } + + return tid, nil +} + +func (gm *groupInMemory) FindTicketID(gid groupID, mid groupID) ticketID { + return primitive.NilObjectID +} + +func makeTypeMessage[T any](msg T) bson.M { + var ptr *T + name := reflect.TypeOf(ptr).Elem().Name() + return bson.M{name: msg} +} + +func sendTypedMessageDirect[T any](rconn *wshandler.Richconn, msg T) { + bt, _ := json.Marshal(makeTypeMessage(msg)) + rconn.WriteBytes(bt) +} + +func sendTypedMessage[T any](gm *groupInMemory, target accountID, msg T) { + bt, _ := json.Marshal(makeTypeMessage(msg)) + gm.SendMessage(target, bt) +} + +func (gm *groupInMemory) SendMessage(target accountID, msg []byte) { + rconn := gm.hasConn(target) + if rconn != nil { + rconn.WriteBytes(msg) + } else { + gm.rpc(target).call(target, msg) + } +} + +func multicast(conns []*wshandler.Richconn, raw []byte) { + for _, rconn := range conns { + rconn.WriteBytes(raw) + } +} +func multicastTyped[T any](conns []*wshandler.Richconn, msg T) { + bt, _ := json.Marshal(makeTypeMessage(msg)) + go multicast(conns, bt) +} + +func broadcastTypedMessage[T any](gm *groupInMemory, gid groupID, msg T) { + if gd := gm.groups.find(gid); gd != nil { + bt, _ := json.Marshal(makeTypeMessage(msg)) + go multicast(gd.conns(false), bt) + } +} + +var errInviteeDocMidMissing = errors.New("inviteeDoc must have '_mid' field") + +func (gm *groupInMemory) SendInvitationFailed(mid accountID, inviteeDoc bson.M) error { + delete(inviteeDoc, "_mid") + rconn := gm.hasConn(mid) + if rconn == nil { + return gm.rpc(mid).call(mid, inviteeDoc) + } + + sendTypedMessage(gm, mid, InvitationFail(inviteeDoc)) + + return nil +} + +func (gm *groupInMemory) InviteImplement(gid groupID, mid accountID, inviteeDoc bson.M, inviterDoc bson.M) error { + targetid := inviteeDoc["_mid"].(accountID) + + // invitee에게 알림 + // invitee의 rconn이 종료될 때 그룹에 반영해야 하므로 rconn을 찾자 + rconn := gm.hasConn(targetid) + if rconn == nil { + return gm.rpc(targetid).call(gid, inviteeDoc, inviterDoc) + } + + gd := gm.groups.find(gid) + if gd == nil { + return errGroupNotExist + } + + if rconn.HasOnCloseFunc("member_remove_invite") { + // 이미 초대 중이다. + // inviter한테 알려줘야 한다. + return gm.SendInvitationFailed(mid, inviteeDoc) + } + + tid, newdoc := gd.addInvite(inviteeDoc, rconn, time.Duration(gm.InviteExpire)*time.Second, gm.MaxMember) + if newdoc == nil { + return errNoEmptySlot + } + + rconn.RegistOnCloseFunc("member_remove_invite", func() { + gd.removeMember(targetid, &tid) + gm.memberSync(gid, targetid, tid, nil, false) + }) + + gm.memberSync(gid, targetid, tid, newdoc, false) + sendTypedMessage(gm, targetid, Invitation{ + GroupID: gid, + TicketID: tid, + Inviter: inviterDoc, + ExpireAtUTC: newdoc.InviteExpire.Unix(), + }) + + return nil +} + +var errAlreayMember = errors.New("this target is already member") + +func (gm *groupInMemory) Invite(gid groupID, mid accountID, inviterDoc bson.M, inviteeDoc bson.M) (string, error) { + targetid, ok := inviteeDoc["_mid"].(accountID) + if !ok { + return "", errInviteeDocMidMissing + } + + if !gid.IsZero() { + if gd := gm.groups.find(gid); gd != nil { + if gd.InCharge != mid { + // 이러면 안된다. + // 초대는 InCharge만 할 수 있음 + return "", nil + } + } + } + + // gid는 미리 만들어 놔야함. + // 초대하는 클라이언트가 아직 group을 소유하지 않고 있을 수 있다. + // mid의 rconn이 이 호스트에 없더라도 gid는 이 request를 보낸 클라이언트가 받아야 하기 떄문 + if gid.IsZero() { + gid = primitive.NewObjectID() + } + + rconn := gm.hasConn(mid) + if rconn == nil { + // mid가 있는 곳에서 처리를 해야 접속 끊겼을 때 콜백을 먼저 등록할 수 있다. + // 콜백이 rconn에 먼저 등록되지 않으면 좀비 group이 생길 가능성이 생긴다. + return gid.Hex(), gm.rpc(mid).call(gid, mid, inviteeDoc, inviteeDoc) + } + + // 이제 여기는 mid가 InCharge이면서 rconn이 존재 + gd := gm.groups.find(gid) + if gd == nil { + _, gd = gm.groups.createWithID(gid, bson.M{}) + tid, newdoc := gd.addInCharge(mid, rconn, inviterDoc) + rconn.RegistOnCloseFunc("member_remove", func() { + // 내가 InCharge이므로 접속이 종료될 때 그룹을 해체한다. + gm.groupDocSync(gid, nil) + }) + bt, err := bson.Marshal(gd.Body) + if err != nil { + return "", err + } + gm.groupDocSync(gid, bt) + gm.memberSync(gid, mid, tid, newdoc, true) + } else { + // targetid가 이미 멤버인지 미리 확인 가능 + if !gd.ticket(targetid).IsZero() { + // 이미 멤버네 + return "", errAlreayMember + } + } + + return gid.Hex(), gm.InviteImplement(gid, mid, inviteeDoc, inviterDoc) +} + +func (gm *groupInMemory) UpdateGroupMember(gid groupID, mid accountID, tid ticketID, doc bson.M) error { + return nil +} +func (gm *groupInMemory) CancelInvitation(gid groupID, tid ticketID) error { + return nil +} + +func (gm *groupInMemory) AcceptInvitation(gid groupID, mid accountID, tid ticketID, member bson.M) (groupID, error) { + gd := gm.groups.find(gid) + if gd == nil { + return primitive.NilObjectID, errGroupNotExist + } + + rconn := gm.hasConn(mid) + if rconn == nil { + return gid, gm.rpc(mid).call(gid, mid, tid, member) + } + + oldFunc := rconn.UnregistOnCloseFunc("member_remove") + if oldFunc != nil { + // 기존 멤버였으면 탈퇴 처리 + oldFunc() + } + + inviteFunc := rconn.UnregistOnCloseFunc("member_remove_invite") + rconn.RegistOnCloseFunc("member_remove", inviteFunc) + + result, isNew := gd.addMember(mid, &tid, member) + if result != nil { + return gid, gm.memberSync(gid, mid, tid, result, isNew) + } + + // 실패 + return primitive.NilObjectID, nil +} + +func (gm *groupInMemory) DenyInvitation(gid groupID, mid accountID, tid ticketID) error { + gd := gm.groups.find(gid) + if gd == nil { + return errGroupNotExist + } + + rconn := gm.hasConn(mid) + if rconn == nil { + return gm.rpc(mid).call(gid, mid, tid) + } + + inviteFunc := rconn.UnregistOnCloseFunc("member_remove_invite") + if inviteFunc != nil { + inviteFunc() // removeMember는 여기에 들어있다. + return nil + } + + gd.removeMember(mid, &tid) + return gm.memberSync(gid, mid, tid, nil, false) +} + +func (gm *groupInMemory) QueryInvitations(mid accountID, after primitive.Timestamp) ([]bson.M, error) { + return nil, nil +} +func (gm *groupInMemory) Exist(gid groupID, filter bson.M) (bool, error) { + return false, nil +} +func (gm *groupInMemory) FindAll(filter bson.M, projection string, after primitive.Timestamp) ([]bson.M, error) { + return nil, nil +} +func (gm *groupInMemory) FindOne(gid groupID, projection string) (bson.M, error) { + return nil, nil +} + +func (gm *groupInMemory) DropPausedMember(gid primitive.ObjectID, mid primitive.ObjectID) error { + gd := gm.groups.find(gid) + if gd == nil { + return errGroupNotExist + } + + tid, memdoc := gd.memberByAccount(mid) + if memdoc == nil { + return errNotMember + } + + if _, ok := memdoc.Body["paused"]; ok { + // 드랍해야 한다. + if gd.InCharge == mid { + // 내가 방장인 경우 + gm.groupDocSync(gid, nil) + } else { + // 내가 방장이 아닌 경우 + gd.removeMember(mid, &tid) + gm.memberSync(gid, mid, tid, nil, false) + } + } + + return nil +} + +func (gm *groupInMemory) PauseMember(gid primitive.ObjectID, mid primitive.ObjectID, rconn *wshandler.Richconn) error { + rconn.UnregistOnCloseFunc("member_remove") + rconn.UnregistOnCloseFunc("member_remove_invite") + rconn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "pause"), time.Time{}) + + gd := gm.groups.find(gid) + if gd == nil { + return errGroupNotExist + } + + tid := primitive.NilObjectID + newdoc := gd.modifyMemberDocument(mid, &tid, func(memdoc *memberDoc) { + memdoc.Body["paused"] = true + memdoc.rconn = nil + }) + + return gm.memberSync(gid, mid, tid, newdoc, false) +} + +func (gm *groupInMemory) QueryMembers(gid groupID, reqID accountID, projection string, after primitive.Timestamp) (map[string]bson.M, error) { + gd := gm.groups.find(gid) + if gd == nil { + return nil, errGroupNotExist + } + + if gd.InCharge != reqID { + return nil, errGroupNotExist + } + + outdocs := make(map[string]bson.M) + if len(projection) > 0 { + projkeys := map[string]bool{} + for _, p := range strings.Split(projection, ",") { + if p[0] == '+' { + projkeys[strings.TrimSpace(p[1:])] = true + } else { + projkeys[strings.TrimSpace(p)] = true + } + } + + gd.iterateMembers(func(tid ticketID, memdoc *memberDoc) { + outdoc := bson.M{} + for k := range projkeys { + if k[0] == '_' { + outdoc[k] = memdoc.Hidden[k] + } else { + outdoc[k] = memdoc.Body[k] + } + } + outdocs[memdoc.Mid.Hex()] = outdoc + }) + } else { + gd.iterateMembers(func(tid ticketID, memdoc *memberDoc) { + outdoc := bson.M{} + for k, v := range memdoc.Hidden { + outdoc[k] = v + } + for k, v := range memdoc.Body { + outdoc[k] = v + } + outdocs[memdoc.Mid.Hex()] = outdoc + }) + } + + return outdocs, nil +} + +func (gm *groupInMemory) QueryMember(gid groupID, mid accountID, tid ticketID, projection string) (bson.M, error) { + return nil, nil +} + +var errHaveNoAuthority = errors.New("cannot kick other member") +var errNotMember = errors.New("ticket is not in this group") + +func (gm *groupInMemory) Leave(gid groupID, mid accountID, tid ticketID) error { + gd := gm.groups.find(gid) + if gd == nil { + return errGroupNotExist + } + + // mid가 InCharge인 경우에는 tid가 누구든 내쫓고, + // mid가 InCharge가 아닌 경우는 tid가 mid일 경우에만 나갈 수 있다. + memdoc := gd.member(tid) + if memdoc == nil { + return errNotMember + } + targetmid := memdoc.Mid + + // 내가 방장이면 아무나 내보낼 수 있다. + if gd.InCharge != mid && targetmid != mid { + // targetmid와 mid가 같아야 한다. 방장이 아니므로 나는 나만 내보낼 수 있다. + return errHaveNoAuthority + } + + // targetmid의 "member_remove" 함수를 등록 해제해야 하므로 rconn이 있는 곳에서 하자 + rconn := gm.hasConn(targetmid) + if rconn == nil { + return gm.rpc(targetmid).call(gid, mid, tid) + } + + if oldfunc := rconn.UnregistOnCloseFunc("member_remove"); oldfunc != nil { + oldfunc() // 이 안에 다 있다. + } + + // 나한테는 빈 FullGroupDoc을 보낸다. + sendTypedMessageDirect(rconn, FullGroupDoc{ + Gid: gid, + }) + + return nil +} + +func (gm *groupInMemory) UpdateMemberDocument(gid groupID, mid accountID, doc bson.M) error { + return nil +} +func (gm *groupInMemory) Dismiss(gid groupID) error { + return nil +} +func (gm *groupInMemory) UpdateGroupDocument(gid groupID, body []byte) error { + gd := gm.groups.find(gid) + if gd == nil { + return errGroupNotExist + } + + newbody, err := gd.updateBodyWithBson(body) + if err != nil { + return err + } + + return gm.groupDocSync(gid, newbody) +} + +func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, typename string, wsh *wshandler.WebsocketHandler) (group, error) { + // group document + // member document + groupDocSyncChanName := fmt.Sprintf("d_mgc_%s_%s", region, typename) + memberSyncChanName := fmt.Sprintf("m_mgc_%s_%s", region, typename) + rpcChanName := fmt.Sprintf("r_mgc_%s_%s", region, typename) + clientMessageChanName := fmt.Sprintf("c_mgc_%s_%s", region, typename) + + toHashHex := func(name string) string { + hash := md5.New() + hash.Write([]byte(name)) + if *common.Devflag { + hn, _ := os.Hostname() + hash.Write([]byte(hn)) + } + + return hex.EncodeToString(hash.Sum(nil)[:8]) + } + + groupDocSyncChanName = toHashHex(groupDocSyncChanName) + memberSyncChanName = toHashHex(memberSyncChanName) + rpcChanName = toHashHex(rpcChanName) + clientMessageChanName = toHashHex(clientMessageChanName) + + // 여기서는 subscribe channel + // 각 함수에서는 publish + gm := &groupInMemory{ + groupConfig: cfg, + groupDocSync: func(gid groupID, newbody []byte) error { + bt := []byte(fmt.Sprintf("%s%s", config.macAddr, gid.Hex())) + bt = append(bt, newbody...) + _, err := wsh.RedisSync.Publish(ctx, groupDocSyncChanName, bt).Result() + return err + }, + memberSync: func(gid groupID, mid accountID, tid ticketID, doc *memberDoc, newmember bool) error { + var payload string + if doc != nil { + bt, _ := json.Marshal(doc) + newmemberflag := func() string { + if newmember { + return "t" + } else { + return "f" + } + }() + payload = fmt.Sprintf("%s%s%s%s%s%s", config.macAddr, gid.Hex(), mid.Hex(), tid.Hex(), newmemberflag, string(bt)) + } else { + payload = fmt.Sprintf("%s%s%s%s", config.macAddr, gid.Hex(), mid.Hex(), tid.Hex()) + } + _, err := wsh.RedisSync.Publish(ctx, memberSyncChanName, payload).Result() + return err + }, + rpcCall: func(bt []byte) error { + _, err := wsh.RedisSync.Publish(ctx, rpcChanName, bt).Result() + return err + }, + hasConn: func(t accountID) *wshandler.Richconn { + return wsh.Conn(region, t) + }, + groups: groupContainer{ + groupDocs: make(map[groupID]*groupDoc), + }, + } + + // TODO : processChannelMessage 스레드 분리해보자 + processChannelMessage := func(gm *groupInMemory, pubsub *redis.PubSub) *redis.PubSub { + defer func() { + r := recover() + if r != nil { + logger.Error(r) + } + }() + + for msg := range pubsub.Channel() { + if msg == nil { + pubsub = nil + break + } + + switch msg.Channel { + case clientMessageChanName: + bt := []byte(msg.Payload) + if len(bt) < 24 { + break + } + + // 주!! mid가 먼저 + var mid groupID + copy(mid[:], bt[:12]) + bt = bt[12:] + + var gid groupID + copy(gid[:], bt[:12]) + bt = bt[12:] + + gd := gm.groups.find(gid) + if gd == nil { + break + } + tid, _ := gd.memberByAccount(mid) + if !tid.IsZero() { + personalized := []byte(fmt.Sprintf(`{"%s":%s}`, tid.Hex(), string(bt))) + if after, err := gd.updateBodyWithJson(personalized); err == nil { + go multicast(gd.conns(false), after) + } + } + + case groupDocSyncChanName: + payload := []byte(msg.Payload) + if len(payload) < len(config.macAddr) { + break + } + + senderHost, remain := payload[:len(config.macAddr)], payload[len(config.macAddr):] + if len(remain) < 24 { + break + } + + idstr, remain := remain[:24], remain[24:] + gid, _ := primitive.ObjectIDFromHex(string(idstr)) + gd := gm.groups.find(gid) + if gd != nil { + if len(remain) == 0 { + // gid 그룹 삭제 + // 그룹 안에 있는 멤버에게 알림 + bt, _ := json.Marshal(makeTypeMessage(FullGroupDoc{ + Gid: gid, + })) + go multicast(gd.conns(true), bt) + gm.groups.delete(gid) + } else if string(senderHost) != config.macAddr { + if r, err := gd.updateBodyBsonToJson(remain); err != nil { + logger.Error("groupDocSyncChanName message decode failed :", remain, err) + } else { + go multicast(gd.conns(true), r) + } + } + } else if string(senderHost) != config.macAddr { + var newDoc groupDoc + if err := newDoc.updateBody(remain); err != nil { + logger.Error("groupDocSyncChanName message decode failed :", remain, err) + } else { + gm.groups.add(gid, &newDoc) + } + } + + case memberSyncChanName: + if len(msg.Payload) < len(config.macAddr) { + break + } + + senderHost, remain := msg.Payload[:len(config.macAddr)], msg.Payload[len(config.macAddr):] + if len(remain) < 24 { + break + } + + idstr, remain := remain[:24], remain[24:] + gid, _ := primitive.ObjectIDFromHex(idstr) + gd := gm.groups.find(gid) + if gd == nil { + // 미리 그룹을 없애고 싱크 메시지를 보낸후 받은 것일 수 있다. + break + } + + idstr, remain = remain[:24], remain[24:] + mid, _ := primitive.ObjectIDFromHex(idstr) + idstr, remain = remain[:24], remain[24:] + tid, _ := primitive.ObjectIDFromHex(idstr) + + isNewMember := false + if len(remain) > 0 { + idstr, remain = remain[:1], remain[1:] + isNewMember = idstr == "t" + } + + var updated *memberDoc + rconn := wsh.Conn(region, mid) + + if senderHost != config.macAddr { + // 내가 보낸 메시지가 아니면 멤버 도큐먼트 업데이트 하고 브로드캐스팅 + if len(remain) == 0 { + // mid 삭제 + gd.removeMember(mid, &tid) + updated = nil + } else { + updated = gd.overwriteMemberDocument(mid, &tid, []byte(remain)) + } + } else { + updated = gd.member(tid) + } + + if updated == nil { + // 멤버 삭제 알림 + if rconn != nil { + // gid에 이미 다른 값이 있을 수 있다. + // 정확하게 이 값이면 제거하고, 아니면 넘어간다. + rconn.RemoveTag("gid", fmt.Sprintf("%s@%s", gid.Hex(), gm.Name)) + } + broadcastTypedMessage(gm, gid, PublicMemberDoc{Tid: tid}) + } else { + if isNewMember && updated.rconn == nil && rconn != nil { + updated.rconn = rconn + } + // 업데이트 된 플레이어(새로 들어온 플레이어 포함)를 모두에게 알려준다. 본인 포함, invitee 제외 + broadcastTypedMessage(gm, gid, PublicMemberDoc{ + Tid: tid, + memberDocCommon: updated.memberDocCommon, + }) + } + + if isNewMember { + if rconn != nil { + // 새 멤버이므로 기존 멤버를 다 보내준다. + rconn.AddTag("gid", fmt.Sprintf("%s@%s", gid.Hex(), gm.Name)) + rconn.WriteBytes(gd.serializeFull(gid, clientMessageChanName)) + } + } + + case rpcChanName: + targetbt, fn, params, err := rpc.Decode[accountID]([]byte(msg.Payload)) + if err != nil { + logger.Error("rpcChanName message decode failed :", msg.Payload, err) + break + } + + call := func() { + method, ok := reflect.TypeOf(gm).MethodByName(fn) + if !ok { + logger.Println("%s message decode failed :", targetbt, msg.Payload, err) + } + + args := []reflect.Value{ + reflect.ValueOf(gm), + } + for _, arg := range params { + args = append(args, reflect.ValueOf(arg)) + } + + method.Func.Call(args) + } + + if *targetbt == everyHost { + call() + } else if rconn := wsh.Conn(region, *targetbt); rconn != nil { + call() + } + default: + logger.Println("unknown channel") + } + } + return pubsub + } + + go func() { + defer func() { + r := recover() + if r != nil { + logger.Error(r) + } + }() + + var pubsub *redis.PubSub + for { + if pubsub == nil { + pubsub = wsh.RedisSync.Subscribe(ctx, groupDocSyncChanName, memberSyncChanName, rpcChanName, clientMessageChanName) + } + + if pubsub == nil { + time.Sleep(time.Second) + continue + } + pubsub = processChannelMessage(gm, pubsub) + } + }() + + return gm, nil +} diff --git a/core/group_mongo.go b/core/group_mongo.go new file mode 100644 index 0000000..5d3cb48 --- /dev/null +++ b/core/group_mongo.go @@ -0,0 +1,1050 @@ +package core + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/url" + "strings" + "time" + + common "repositories.action2quare.com/ayo/gocommon" + "repositories.action2quare.com/ayo/gocommon/logger" + "repositories.action2quare.com/ayo/gocommon/wshandler" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/bsonrw" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +type groupMongo struct { + *groupConfig + + mongoClient common.MongoClient + hints map[string][]string + collectionName common.CollectionName + memberCollectionName common.CollectionName + inviteCollectionName common.CollectionName +} + +func (gm *groupMongo) Create(r url.Values, doc bson.M) (primitive.ObjectID, error) { + var filter bson.M + var hint string + + for h, fields := range gm.hints { + candidate := bson.M{} + for _, f := range fields { + if fv := r.Get(f); len(fv) == 0 { + break + } else if f == "_id" { + candidate["_id"], _ = primitive.ObjectIDFromHex(fv) + } else if f == "after" { + candidate["luts"] = bson.M{ + "$gt": common.DotStringToTimestamp(fv), + } + } else { + candidate[f] = fv + } + } + + if len(filter) < len(candidate) { + filter = candidate + hint = h + } + } + + if len(filter) == 1 { + if _, ok := filter["_id"]; ok { + hint = "_id_" + } + } + + if len(filter) == 0 { + return primitive.NilObjectID, fmt.Errorf("CreateGroup failed. filter is missing : %v", r) + } + + for f := range filter { + delete(doc, f) + } + + doc["members"] = []primitive.ObjectID{} + _, inserted, err := gm.mongoClient.Update(gm.collectionName, filter, bson.M{ + "$setOnInsert": doc, + "$currentDate": bson.M{ + "luts": bson.M{"$type": "timestamp"}, + }, + }, options.Update().SetUpsert(true).SetHint(hint)) + + if err != nil { + return primitive.NilObjectID, err + } + + if inserted == nil { + return primitive.NilObjectID, errors.New("name is duplicated") + } + + return inserted.(primitive.ObjectID), nil +} + +var errAlreadyMemberOrDeletedMember = errors.New("JoinGroup failed. already member or deleting member") + +func (gm *groupMongo) Candidate(groupID primitive.ObjectID, memberID primitive.ObjectID, doc bson.M) error { + expireAt := time.Now().UTC().Add(time.Second * time.Duration(gm.CandidateExpire)).Unix() + doc["expiring"] = expireAt + doc["_candidate"] = true + + success, _, err := gm.mongoClient.Update(gm.memberCollectionName, bson.M{ + "_gid": groupID, + "_mid": memberID, + }, bson.M{ + "$setOnInsert": doc, + "$currentDate": bson.M{ + "luts": bson.M{"$type": "timestamp"}, + }, + }, options.Update().SetUpsert(true)) + if err != nil { + logger.Error("JoinGroup failed. update candidate member collection err :", groupID, memberID, err) + return err + } + + if !success { + // 중복해서 보내지 말아라 + // 거절된 candidate가 또 요청을 보내더라도 expire될 때까지 계속 거절된다. + logger.Println("JoinGroup failed. already member or deleting member :", groupID, memberID) + return errAlreadyMemberOrDeletedMember + } + + return nil +} + +func (gm *groupMongo) Join(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, doc bson.M) (primitive.ObjectID, error) { + if ticketID == primitive.NilObjectID { + ticketID = primitive.NewObjectID() + doc["_gid"] = groupID + doc["_mid"] = memberID + } + + // member collection에 추가. 추가된 _id를 group Member에 push한다. + // 이렇게 하는 이유는 member document가 delete될때 _id만 알 수 있기 때문. + // 클라이언트는 _id와 member id와의 관계를 알 수 있어야 한다. + filter := bson.M{"_id": groupID} + + if gm.MaxMember > 0 { + // 풀방 플래그 + filter[fmt.Sprintf("members.%d", gm.MaxMember-1)] = bson.M{"$exists": false} + } + + success, _, err := gm.mongoClient.Update(gm.collectionName, filter, bson.M{ + "$push": bson.M{ + "members": ticketID, + }, + "$currentDate": bson.M{ + "luts": bson.M{"$type": "timestamp"}, + }, + }, options.Update().SetUpsert(false).SetHint("_idmembers")) + + if err != nil { + logger.Error("JoinGroup failed :", err) + return primitive.NilObjectID, err + } + + if !success { + // 갑자기 풀방이 되었거나 이미 멤버다 + logger.Println("JoinGroup failed. push member failed :", groupID, memberID) + return primitive.NilObjectID, errAlreadyMemberOrDeletedMember + } + + doc["_ts"] = nil + doc["expiring"] = nil + doc["_candidate"] = nil + findoc := splitDocument(doc) + findoc["$currentDate"] = bson.M{ + "luts": bson.M{"$type": "timestamp"}, + } + + success, _, err = gm.mongoClient.Update(gm.memberCollectionName, bson.M{ + "_id": ticketID, + }, findoc, options.Update().SetUpsert(true)) + if err != nil || !success { + gm.mongoClient.Update(gm.collectionName, bson.M{ + "_id": groupID, + }, bson.M{ + "$pull": bson.M{ + "members": ticketID, + }, + "$currentDate": bson.M{ + "luts": bson.M{"$type": "timestamp"}, + }, + }, options.Update().SetUpsert(false)) + + logger.Error("JoinGroup failed. update member collection err :", err) + return primitive.NilObjectID, err + } + return ticketID, nil +} + +func (gm *groupMongo) FindTicketID(groupID primitive.ObjectID, memberID primitive.ObjectID) primitive.ObjectID { + tid, err := gm.mongoClient.FindOne(gm.memberCollectionName, bson.M{ + "_gid": groupID, + "_mid": memberID, + }, options.FindOne().SetHint("gidmid").SetProjection(bson.M{"_id": 1})) + + if err != nil { + logger.Error("FindTicketID failed :", err) + return primitive.NilObjectID + } + + if tid == nil { + logger.Error("FindTicketID failed. tid not found :", groupID, memberID) + return primitive.NilObjectID + } + + return tid["_id"].(primitive.ObjectID) +} + +var errAlradyInvited = errors.New("already invited user") + +func (gm *groupMongo) Invite(groupID primitive.ObjectID, memberID primitive.ObjectID, inviterDoc bson.M, inviteeDoc bson.M) (string, error) { + if gm.InviteeIsMember && gm.MaxMember > 0 { + vacant, err := gm.mongoClient.FindOne(gm.collectionName, bson.M{ + "_id": groupID, + fmt.Sprintf("members.%d", gm.MaxMember-1): bson.M{"$exists": false}, + }, options.FindOne().SetProjection(bson.M{"_id": 1})) + + if err != nil { + return "", err + } + + if vacant == nil { + // 빈 자리가 없다 + return "failed:full", nil + } + } + + expireAt := time.Now().UTC().Add(time.Second * time.Duration(gm.InviteExpire)).Unix() + + var success bool + var err error + tid := primitive.NewObjectID() + // invitee에게 초대장 보내기 + inviterDoc["_gid"] = groupID + inviterDoc["_mid"] = memberID + inviterDoc["expiring"] = expireAt + success, _, err = gm.mongoClient.Update(gm.inviteCollectionName, bson.M{ + "_id": tid, + }, bson.M{ + "$setOnInsert": inviterDoc, + "$currentDate": bson.M{ + "luts": bson.M{"$type": "timestamp"}, + "_ts": bson.M{"$type": "date"}, + }, + }, options.Update().SetUpsert(true)) + if err != nil { + return "", err + } + if !success { + return "", errAlradyInvited + } + + inviteeDoc["expiring"] = expireAt + inviteeDoc["_gid"] = groupID + inviteeDoc["_mid"] = memberID + success, _, err = gm.mongoClient.Update(gm.memberCollectionName, bson.M{ + "_id": tid, // inviteCollectionName에 추가된 _id와 동일하게 맞춘다. 검색에 용이 + }, bson.M{ + "$setOnInsert": inviteeDoc, + "$currentDate": bson.M{ + "luts": bson.M{"$type": "timestamp"}, + "_ts": bson.M{"$type": "date"}, + }, + }, options.Update().SetHint("gidmid").SetUpsert(true)) + if err != nil { + return "", err + } + if !success { + return "", errAlradyInvited + } + + if gm.InviteeIsMember { + // 멤버로도 추가 + pushfilter := bson.M{"_id": groupID} + if gm.MaxMember > 0 { + pushfilter[fmt.Sprintf("members.%d", gm.MaxMember-1)] = bson.M{"$exists": false} + } + success, _, err = gm.mongoClient.Update(gm.collectionName, pushfilter, bson.M{ + "$push": bson.M{ + "members": tid, + }, + "$currentDate": bson.M{ + "luts": bson.M{"$type": "timestamp"}, + }, + }, options.Update().SetUpsert(false)) + if err != nil { + return "", err + } + if !success { + return "", errAlradyInvited + } + + if !success { + // 이미 풀방.아래 Delete/Update는 실패해도 괜찮다. + gm.mongoClient.Delete(gm.memberCollectionName, bson.M{"_id": tid}) + gm.mongoClient.Update(gm.inviteCollectionName, bson.M{ + "_id": tid, + }, bson.M{ + "$set": bson.M{"name": ""}, + "$currentDate": bson.M{ + "luts": bson.M{"$type": "timestamp"}, + }, + }, options.Update().SetUpsert(false)) + return "failed:full", nil + } + } + + return "success", nil +} + +func (gm *groupMongo) UpdateGroupMember(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, doc bson.M) (err error) { + var findoc bson.M + if doc == nil { + findoc = bson.M{ + "$set": bson.M{ + "_delete": true, + }, + "$currentDate": bson.M{ + "luts": bson.M{"$type": "timestamp"}, + "_ts": bson.M{"$type": "date"}, + }, + } + } else { + findoc = splitDocument(doc) + findoc["$currentDate"] = bson.M{ + "luts": bson.M{"$type": "timestamp"}, + } + } + + if ticketID.IsZero() { + _, _, err = gm.mongoClient.Update(gm.memberCollectionName, bson.M{ + "_gid": groupID, + "_mid": memberID, + }, findoc, options.Update().SetHint("gidmid").SetUpsert(false)) + } else { + _, _, err = gm.mongoClient.Update(gm.memberCollectionName, bson.M{ + "_id": ticketID, + }, findoc, options.Update().SetUpsert(false)) + } + + return +} + +func (gm *groupMongo) CancelInvitation(groupID primitive.ObjectID, ticketID primitive.ObjectID) error { + if gm.InviteeIsMember { + pulled, _, err := gm.mongoClient.Update(gm.collectionName, bson.M{ + "_id": groupID, + }, bson.M{ + "$pull": bson.M{ + "members": ticketID, + }, + "$currentDate": bson.M{ + "luts": bson.M{"$type": "timestamp"}, + }, + }, options.Update().SetUpsert(false)) + if err != nil { + return err + } + if !pulled { + return nil + } + } + + // member collection 삭제 + _, err := gm.mongoClient.Delete(gm.memberCollectionName, bson.M{ + "_id": ticketID, + }) + if err != nil { + return err + } + + // 초대를 삭제하면 안된다. + // expiring될 때까지 냅두고, 클라이언트가 expiring을 보고 알아서 지우게 한다. + _, _, err = gm.mongoClient.Update(gm.inviteCollectionName, bson.M{ + "_id": ticketID, + }, bson.M{ + "$set": bson.M{"expiring": -1}, + "$currentDate": bson.M{ + "luts": bson.M{"$type": "timestamp"}, + }, + }, options.Update().SetUpsert(false)) + + return err +} + +var errInvitationExpired = errors.New("invitation is expired") + +func (gm *groupMongo) AcceptInvitation(groupID primitive.ObjectID, mid primitive.ObjectID, ticketID primitive.ObjectID, member bson.M) (primitive.ObjectID, error) { + gdoc, err := gm.mongoClient.FindOneAndUpdate(gm.memberCollectionName, bson.M{ + "_id": ticketID, + }, bson.M{ + "$set": member, + "$unset": bson.M{ + "expiring": 1, + "_ts": 1, + }, + "$currentDate": bson.M{ + "luts": bson.M{"$type": "timestamp"}, + }, + }, options.FindOneAndUpdate().SetProjection(bson.M{"_gid": 1})) + if err != nil { + return primitive.NilObjectID, err + } + + if gdoc == nil { + // 만료되었다. + return primitive.NilObjectID, errInvitationExpired + } + + // 여기서는 삭제해도 된다. + gm.mongoClient.Delete(gm.inviteCollectionName, bson.M{ + "_id": ticketID, + }) + + gidbytes := gdoc["_gid"].(primitive.ObjectID) + return gidbytes, nil +} + +var errNotInvited = errors.New("invitation is not mine") + +func (gm *groupMongo) DenyInvitation(groupID primitive.ObjectID, mid primitive.ObjectID, ticketID primitive.ObjectID) error { + // 여기서는 삭제해도 된다. + invdoc, err := gm.mongoClient.FindOne(gm.inviteCollectionName, bson.M{ + "_id": ticketID, + }, options.FindOne().SetProjection("_gid")) + if err != nil { + return err + } + + gid := invdoc["_gid"].(primitive.ObjectID) + success, _, err := gm.mongoClient.Update(gm.collectionName, bson.M{ + "_id": gid, + }, bson.M{ + "$pull": bson.M{ + "members": ticketID, + }, + "$currentDate": bson.M{ + "luts": bson.M{"$type": "timestamp"}, + }, + }) + if err != nil { + return err + } + if !success { + return errNotInvited + } + + gm.mongoClient.Update(gm.memberCollectionName, bson.M{ + "_id": ticketID, + }, bson.M{ + "_delete": true, + }, options.Update().SetUpsert(false)) + gm.mongoClient.Delete(gm.inviteCollectionName, bson.M{ + "_id": ticketID, + }) + return nil +} + +func (gm *groupMongo) QueryInvitations(memberID primitive.ObjectID, after primitive.Timestamp) ([]bson.M, error) { + filter := bson.M{"_mid": memberID} + if !after.IsZero() { + filter["luts"] = bson.M{"$gt": after} + } + + return gm.mongoClient.FindAll(gm.inviteCollectionName, filter, options.Find().SetLimit(20).SetHint("mid")) +} + +func (gm *groupMongo) Exist(groupID primitive.ObjectID, filter bson.M) (bool, error) { + if filter == nil { + filter = bson.M{"_id": groupID} + } else { + filter["_id"] = groupID + } + found, err := gm.mongoClient.FindOne(gm.collectionName, filter, options.FindOne().SetProjection(bson.M{"_id": 1})) + if err != nil { + return false, err + } + + if found == nil { + return false, nil + } + return true, nil +} + +func (gm *groupMongo) FindAll(filter bson.M, projection string, after primitive.Timestamp) ([]bson.M, error) { + opt := options.Find().SetBatchSize(10) + if len(projection) > 0 { + projM := bson.M{} + for _, proj := range strings.Split(projection, ",") { + projM[proj] = 1 + } + } + + if !after.IsZero() { + filter["luts"] = bson.M{"$gt": after} + } + + return gm.mongoClient.FindAll(gm.collectionName, filter, opt) +} + +func (gm *groupMongo) FindOne(groupID primitive.ObjectID, projection string) (bson.M, error) { + op := options.FindOne() + if len(projection) > 0 { + proj := bson.M{} + for _, p := range strings.Split(projection, ",") { + if p[0] == '-' { + proj[strings.TrimSpace(p[1:])] = 0 + } else if p[0] == '+' { + proj[strings.TrimSpace(p[1:])] = 1 + } else { + proj[strings.TrimSpace(p)] = 1 + } + } + op = op.SetProjection(proj) + } + + return gm.mongoClient.FindOne(gm.collectionName, bson.M{"_id": groupID}, op) +} + +func (gm *groupMongo) QueryMembers(groupID primitive.ObjectID, reqID primitive.ObjectID, projection string, after primitive.Timestamp) (map[string]bson.M, error) { + op := options.Find() + if len(projection) > 0 { + proj := bson.M{} + for _, p := range strings.Split(projection, ",") { + if p[0] == '-' { + proj[strings.TrimSpace(p[1:])] = 0 + } else if p[0] == '+' { + proj[strings.TrimSpace(p[1:])] = 1 + } else { + proj[strings.TrimSpace(p)] = 1 + } + } + op = op.SetProjection(proj) + } + + filter := bson.M{"_gid": groupID} + + if after.IsZero() { + gm.mongoClient.Delete(gm.memberCollectionName, bson.M{ + "_gid": groupID, + "expiring": bson.M{"$lt": time.Now().UTC().Unix()}, + }) + } else { + filter["luts"] = bson.M{"$gt": after} + } + + all, err := gm.mongoClient.FindAll(gm.memberCollectionName, filter, op) + if err != nil { + return nil, err + } + + output := make(map[string]bson.M) + for _, m := range all { + output[m["_mid"].(primitive.ObjectID).Hex()] = m + } + return output, nil +} + +func (gm *groupMongo) QueryMember(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, projection string) (bson.M, error) { + filter := bson.M{"_gid": groupID} + if !ticketID.IsZero() { + filter["_id"] = ticketID + } else { + filter["_mid"] = memberID + } + + op := options.FindOne().SetHint("gidmid") + var projdoc bson.M + if len(projection) > 0 { + projdoc = bson.M{ + "_delete": 1, + } + for _, proj := range strings.Split(projection, ",") { + projdoc[proj] = 1 + } + } else { + projdoc = bson.M{ + "_ts": 0, + "_gid": 0, + "_mid": 0, + } + } + + op.SetProjection(projdoc) + + return gm.mongoClient.FindOne(gm.memberCollectionName, filter, op) +} + +func (gm *groupMongo) Leave(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID) error { + if ticketID.IsZero() { + poptarget, err := gm.mongoClient.FindOne(gm.memberCollectionName, bson.M{ + "_gid": groupID, + "_mid": memberID, + }, options.FindOne().SetProjection(bson.M{"_id": 1}).SetHint("gidmid")) + if err != nil { + return err + } + + // Find와 Delete를 나눠야 한다. + // pull 하는 것이 더 중요하기 때문. + // pull에 실패하더라도 _id가 남아있어야 다시 시도가 가능하다 + if poptarget == nil { + // 왠지 만료되었거나 문제가 잇다 + return nil + } + + if _, ok := poptarget["_id"]; !ok { + return nil + } + + ticketID = poptarget["_id"].(primitive.ObjectID) + } + + _, _, err := gm.mongoClient.Update(gm.collectionName, bson.M{ + "_id": groupID, + }, bson.M{ + "$pull": bson.M{ + "members": ticketID, + }, + "$currentDate": bson.M{ + "luts": bson.M{"$type": "timestamp"}, + }, + }) + if err != nil { + return err + } + + // Delete는 실패해도 넘어간다. + gm.mongoClient.Update(gm.memberCollectionName, bson.M{ + "_id": ticketID, + }, bson.M{ + "$set": bson.M{ + "_delete": true, + }, + "$currentDate": bson.M{ + "_ts": bson.M{"$type": "date"}, + }, + }) + return nil +} + +func (gm *groupMongo) UpdateMemberDocument(groupID primitive.ObjectID, memberID primitive.ObjectID, doc bson.M) error { + _, _, err := gm.mongoClient.Update(gm.memberCollectionName, bson.M{ + "_gid": groupID, + "_mid": memberID, + }, bson.M{ + "$set": doc, + "$currentDate": bson.M{ + "luts": bson.M{"$type": "timestamp"}, + }, + }, options.Update().SetUpsert(false).SetHint("gidmid")) + + return err +} + +func (gm *groupMongo) Dismiss(groupID primitive.ObjectID) error { + _, err := gm.mongoClient.Delete(gm.collectionName, bson.M{ + "_id": groupID, + }) + if err != nil { + return err + } + + gm.mongoClient.Delete(gm.memberCollectionName, bson.M{ + "_gid": groupID, + }) + return nil +} + +var errUpdateGroupDocumentFailed = errors.New("update group document failed") + +func (gm *groupMongo) UpdateGroupDocument(groupID primitive.ObjectID, body []byte) error { + groupDoc, err := gm.mongoClient.FindOne(gm.collectionName, bson.M{ + "_id": groupID, + }, nil) + if err != nil { + return err + } + + decoder, err := bson.NewDecoder(bsonrw.NewBSONDocumentReader(body)) + if err != nil { + return err + } + + if err := decoder.Decode(&groupDoc); err != nil { + return err + } + + success, _, err := gm.mongoClient.Update(gm.collectionName, bson.M{ + "_id": groupID, + }, bson.M{ + "$set": groupDoc, + }, options.Update().SetUpsert(false)) + + if err != nil { + return err + } + + if !success { + return errUpdateGroupDocumentFailed + } + + return nil +} + +func (gm *groupMongo) DropPausedMember(gid primitive.ObjectID, mid primitive.ObjectID) error { + return nil +} + +func (gm *groupMongo) PauseMember(gid primitive.ObjectID, mid primitive.ObjectID, conn *wshandler.Richconn) error { + return nil +} + +type mongowatcher struct { + collection common.CollectionName + pipeline mongo.Pipeline + op options.FullDocument + onChanged func(string, *groupPipelineDocument) +} + +func (w *mongowatcher) callOnChanged(region string, c *groupPipelineDocument) { + defer func() { + r := recover() + if r != nil { + logger.Error(r) + } + }() + w.onChanged(region, c) +} + +func (w *mongowatcher) monitorfunc(parentctx context.Context, region string, mongoClient common.MongoClient) { + defer func() { + r := recover() + if r != nil { + logger.Error(r) + } + logger.Println("watcher.,monitorfunc finished") + }() + + var groupstream *mongo.ChangeStream + var err error + var ctx context.Context + + defer func() { + if groupstream != nil { + groupstream.Close(ctx) + } + }() + + for { + err = nil + + if groupstream == nil { + groupstream, err = mongoClient.Watch(w.collection, w.pipeline, options.ChangeStream().SetFullDocument(w.op)) + if err != nil { + logger.Error(err) + time.Sleep(time.Minute) + } + ctx = context.TODO() + } + + if groupstream != nil { + changed := groupstream.TryNext(ctx) + if ctx.Err() != nil { + logger.Error("tavern monitorfunc TryNext error") + logger.Error(ctx.Err()) + groupstream.Close(ctx) + groupstream = nil + continue + } + + if changed { + var data groupPipelineDocument + if err := groupstream.Decode(&data); err == nil { + w.callOnChanged(region, &data) + } + } else if groupstream.Err() != nil || groupstream.ID() == 0 { + logger.Error(groupstream.Err()) + groupstream.Close(ctx) + groupstream = nil + } + } + } +} + +func (w mongowatcher) start(ctx context.Context, region string, mongoClient common.MongoClient) { + go w.monitorfunc(ctx, region, mongoClient) +} + +func (cfg *groupConfig) preparePersistent(ctx context.Context, region string, dbconn common.MongoClient, wsh *wshandler.WebsocketHandler) (group, error) { + uniqueindices := map[string]bson.D{} + hints := map[string][]string{} + + for _, ui := range cfg.UniqueIndex { + indexname := strings.ReplaceAll(ui, ",", "") + keys := strings.Split(ui, ",") + keydef := bson.D{} + for _, k := range keys { + keydef = append(keydef, bson.E{Key: strings.TrimSpace(k), Value: 1}) + } + + uniqueindices[indexname] = keydef + hints[indexname] = keys + } + + collectionName := common.CollectionName(cfg.Name) + memberCollectionName := common.CollectionName(cfg.Name + "-members") + inviteCollectionName := common.CollectionName(cfg.Name + "-invites") + + err := dbconn.MakeUniqueIndices(collectionName, uniqueindices) + if err != nil { + return nil, err + } + + indices := map[string]bson.D{} + for _, ui := range cfg.SearchIndex { + indexname := strings.ReplaceAll(ui, ",", "") + keys := strings.Split(ui, ",") + keydef := bson.D{} + for _, k := range keys { + keydef = append(keydef, bson.E{Key: strings.TrimSpace(k), Value: 1}) + } + + indices[indexname] = keydef + hints[indexname] = keys + } + + if _, ok := indices["_idmembers"]; !ok { + indices["_idmembers"] = bson.D{ + {Key: "_id", Value: 1}, + {Key: "members", Value: 1}, + } + } + + if len(cfg.TextSearchFields) > 0 { + var tsi bson.D + for _, stf := range cfg.TextSearchFields { + tsi = append(tsi, bson.E{Key: stf, Value: "text"}) + } + indices[cfg.TextSearchFields[0]+"_text"] = tsi + } + + err = dbconn.MakeIndices(collectionName, indices) + if err != nil { + return nil, err + } + + dbconn.MakeUniqueIndices(inviteCollectionName, map[string]bson.D{ + "gidmid": {bson.E{Key: "_gid", Value: 1}, bson.E{Key: "_mid", Value: 1}, bson.E{Key: "_ts", Value: -1}}, + }) + + if cfg.InviteeExlusive { + err = dbconn.MakeUniqueIndices(inviteCollectionName, map[string]bson.D{ + "mid": {bson.E{Key: "_mid", Value: 1}, bson.E{Key: "_ts", Value: -1}}, + }) + } else { + err = dbconn.MakeIndices(inviteCollectionName, map[string]bson.D{ + "mid": {bson.E{Key: "_mid", Value: 1}, bson.E{Key: "_ts", Value: -1}}, + }) + } + if err != nil { + return nil, err + } + + if cfg.InviteExpire > 0 { + err = dbconn.MakeExpireIndex(inviteCollectionName, cfg.InviteExpire) + if err != nil { + return nil, err + } + } + + err = dbconn.MakeUniqueIndices(memberCollectionName, map[string]primitive.D{ + "gidmid": {{Key: "_gid", Value: 1}, {Key: "_mid", Value: 1}}, + }) + if err != nil { + return nil, err + } + + for _, mi := range cfg.MemberIndex { + indexname := strings.ReplaceAll(mi, ",", "") + keys := strings.Split(mi, ",") + keydef := bson.D{} + for _, k := range keys { + keydef = append(keydef, bson.E{Key: strings.TrimSpace(k), Value: 1}) + } + err = dbconn.MakeIndices(memberCollectionName, map[string]bson.D{ + indexname: keydef, + }) + if err != nil { + return nil, err + } + } + + if cfg.InviteExpire > 0 { + err = dbconn.MakeExpireIndex(memberCollectionName, cfg.InviteExpire) + if err != nil { + return nil, err + } + } + + groupwatcher := mongowatcher{ + collection: collectionName, + op: options.Default, + pipeline: mongo.Pipeline{ + bson.D{ + { + Key: "$match", Value: bson.D{ + {Key: "operationType", Value: bson.D{ + {Key: "$in", Value: bson.A{"update"}}, + }}, + }, + }, + }, + }, + onChanged: func(r string, data *groupPipelineDocument) { + updates := data.UpdateDescription.UpdatedFields + gid := data.DocumentKey.Id + updates["_id"] = gid + updates["_hint"] = cfg.Name + wsh.Broadcast(r, gid, updates) + }, + } + groupwatcher.start(ctx, region, dbconn) + + m1 := &mongowatcher{ + collection: memberCollectionName, + op: options.Default, + pipeline: mongo.Pipeline{ + bson.D{ + { + Key: "$match", Value: bson.D{ + {Key: "operationType", Value: bson.D{ + {Key: "$in", Value: bson.A{"insert"}}, + }}, + }, + }, + }, + }, + onChanged: func(r string, data *groupPipelineDocument) { + gid := data.FullDocument["_gid"].(primitive.ObjectID) + delete(data.FullDocument, "_gid") + delete(data.FullDocument, "_mid") + delete(data.FullDocument, "_ts") + data.FullDocument["_hint"] = cfg.Name + data.FullDocument["_fullDocument"] = true + if _, candidate := data.FullDocument["_candidate"]; candidate { + gid[0] |= 0x80 + delete(data.FullDocument, "_candidate") + } + wsh.Broadcast(r, gid, data.FullDocument) + }, + } + m1.start(ctx, region, dbconn) + + m2 := &mongowatcher{ + collection: memberCollectionName, + op: options.UpdateLookup, + pipeline: mongo.Pipeline{ + bson.D{ + { + Key: "$match", Value: bson.D{ + {Key: "operationType", Value: bson.D{ + {Key: "$in", Value: bson.A{"update"}}, + }}, + }, + }, + }, + bson.D{ + { + Key: "$project", Value: bson.M{ + "fullDocument._id": 1, + "fullDocument._gid": 1, + "fullDocument._mid": 1, + "fullDocument._candidate": 1, + "updateDescription": 1, + }, + }, + }, + }, + onChanged: func(r string, data *groupPipelineDocument) { + gid := data.FullDocument["_gid"].(primitive.ObjectID) + updates := data.UpdateDescription.UpdatedFields + updates["_id"] = data.FullDocument["_id"] + updates["_hint"] = cfg.Name + for _, r := range data.UpdateDescription.RemovedFileds { + updates[r.(string)] = nil + } + + if _, candidate := data.FullDocument["_candidate"]; candidate { + // _candidate는 candidate 채널로 broadcast + gid[0] |= 0x80 + delete(data.FullDocument, "_candidate") + } + + if _, ok := updates["_delete"]; ok { + mid := data.FullDocument["_mid"].(primitive.ObjectID) + if conn := wsh.Conn(r, mid); conn != nil { + conn.Close() + } + } + + if v, ok := updates["_candidate"]; ok && v == nil { + // candidate에서 벗어났네? 접속을 끊고 재접속 유도 + mid := data.FullDocument["_mid"].(primitive.ObjectID) + if conn := wsh.Conn(r, mid); conn != nil { + conn.Close() + } + } + wsh.Broadcast(r, gid, updates) + }, + } + m2.start(ctx, region, dbconn) + + i1 := &mongowatcher{ + collection: inviteCollectionName, + op: options.UpdateLookup, + pipeline: mongo.Pipeline{ + bson.D{ + { + Key: "$match", Value: bson.D{ + {Key: "operationType", Value: bson.D{ + {Key: "$in", Value: bson.A{"insert", "update"}}, + }}, + }, + }, + }, + }, + onChanged: func(r string, data *groupPipelineDocument) { + alias := data.FullDocument["_mid"].(primitive.ObjectID) + conn := wsh.Conn(r, alias) + if conn != nil { + delete(data.FullDocument, "_ts") + delete(data.FullDocument, "_mid") + data.FullDocument["_fullDocument"] = true + data.FullDocument["_hint"] = inviteCollectionName + bt, _ := json.Marshal(data.FullDocument) + conn.WriteBytes(bt) + } + }, + } + i1.start(ctx, region, dbconn) + + return &groupMongo{ + groupConfig: cfg, + mongoClient: dbconn, + hints: hints, + collectionName: collectionName, + memberCollectionName: memberCollectionName, + inviteCollectionName: inviteCollectionName, + }, nil +} diff --git a/core/richconn.go b/core/richconn.go new file mode 100644 index 0000000..a359c56 --- /dev/null +++ b/core/richconn.go @@ -0,0 +1,73 @@ +package core + +import ( + "context" + + "repositories.action2quare.com/ayo/gocommon/logger" + "repositories.action2quare.com/ayo/gocommon/wshandler" + + "github.com/go-redis/redis/v8" + "go.mongodb.org/mongo-driver/bson/primitive" +) + +type richConnOuter struct { + wsh *wshandler.WebsocketHandler + rc *wshandler.Richconn +} + +func (sub richConnOuter) JoinTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, hint string) error { + sub.wsh.JoinTag(region, tag, tid, sub.rc, hint) + + wsh := sub.wsh + sub.rc.RegistOnCloseFunc(tag.Hex(), func() { + wsh.LeaveTag(region, tag, tid) + }) + return nil +} + +func (sub richConnOuter) LeaveTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, hint string) error { + sub.SetStateInTag(region, tag, tid, "", hint) + return sub.wsh.LeaveTag(region, tag, tid) +} + +func (sub richConnOuter) SetStateInTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, state string, hint string) error { + return sub.wsh.SetStateInTag(region, tag, tid, state, hint) +} + +func (sub richConnOuter) TurnGroupOnline(key string, gid primitive.ObjectID, score float64) error { + gidhex := gid.Hex() + _, err := sub.wsh.RedisSync.ZAdd(context.Background(), key, &redis.Z{Score: score, Member: gidhex}).Result() + if err != nil { + logger.Error("TurnGroupOnline failed. redis.ZAdd return err :", err) + return err + } + + sub.rc.RegistOnCloseFunc(key, func() { + sub.wsh.RedisSync.ZRem(context.Background(), key, gidhex) + }) + + return nil +} + +func (sub richConnOuter) TurnGroupOffline(key string, gid primitive.ObjectID) error { + f := sub.rc.UnregistOnCloseFunc(key) + if f != nil { + f() + } else { + sub.wsh.RedisSync.ZRem(context.Background(), key, gid.Hex()) + } + return nil +} + +func (sub richConnOuter) SendMessage(doc []byte) error { + return sub.rc.WriteBytes(doc) +} + +func (sub richConnOuter) SendMessageToTag(region string, tag primitive.ObjectID, msg []byte) error { + sub.wsh.BroadcastRaw(region, tag, msg) + return nil +} + +func (sub richConnOuter) CloseOnPurpose() error { + return sub.rc.Close() +} diff --git a/core/rpc/connrpc.go b/core/rpc/connrpc.go new file mode 100644 index 0000000..75b02f4 --- /dev/null +++ b/core/rpc/connrpc.go @@ -0,0 +1,195 @@ +package rpc + +import ( + "bytes" + "encoding/gob" + "fmt" + "reflect" + + "repositories.action2quare.com/ayo/gocommon/logger" + "repositories.action2quare.com/ayo/gocommon/wshandler" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" +) + +var Everybody = primitive.ObjectID([12]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}) + +func init() { + gob.Register(bson.M{}) + gob.Register(primitive.ObjectID{}) + gob.Register(primitive.Timestamp{}) +} + +type RpcCaller struct { + publish func(bt []byte) error +} + +func NewRpcCaller(f func(bt []byte) error) RpcCaller { + return RpcCaller{ + publish: f, + } +} + +type rpcCallContext struct { + alias primitive.ObjectID + publish func(bt []byte) error +} + +func (c *RpcCaller) One(alias primitive.ObjectID) rpcCallContext { + return rpcCallContext{ + alias: alias, + publish: c.publish, + } +} + +func (c *RpcCaller) Everybody() rpcCallContext { + return rpcCallContext{ + alias: Everybody, + publish: c.publish, + } +} + +func IsCallerCalleeMethodMatch[Callee any]() error { + var caller rpcCallContext + var callee Callee + + callerType := reflect.TypeOf(caller) + calleeType := reflect.TypeOf(callee) + for i := 0; i < callerType.NumMethod(); i++ { + callerMethod := callerType.Method(i) + calleeMethod, ok := calleeType.MethodByName(callerMethod.Name) + if !ok { + return fmt.Errorf("method '%s' of '%s' is missing", callerMethod.Name, calleeType.Name()) + } + + if calleeMethod.Func.Type().NumIn() != callerMethod.Func.Type().NumIn() { + return fmt.Errorf("method '%s' argument num is not match", callerMethod.Name) + } + + if calleeMethod.Func.Type().NumOut() != callerMethod.Func.Type().NumOut() { + return fmt.Errorf("method '%s' out num is not match", callerMethod.Name) + } + + for i := 1; i < calleeMethod.Func.Type().NumIn(); i++ { + if calleeMethod.Func.Type().In(i) != callerMethod.Func.Type().In(i) { + return fmt.Errorf("method '%s' argument is not match. %s-%s", callerMethod.Name, calleeMethod.Func.Type().In(i).Name(), callerMethod.Func.Type().In(i).Name()) + } + } + + } + + return nil +} + +type fnsig struct { + FunctionName string `bson:"fn"` + Args []any `bson:"args"` +} + +func Encode[T any](prefix T, fn string, args ...any) ([]byte, error) { + m := append([]any{ + prefix, + fn, + }, args...) + + buff := new(bytes.Buffer) + encoder := gob.NewEncoder(buff) + err := encoder.Encode(m) + if err != nil { + logger.Error("rpcCallContext.send err :", err) + return nil, err + } + + return buff.Bytes(), nil +} + +func Decode[T any](src []byte) (*T, string, []any, error) { + var m []any + decoder := gob.NewDecoder(bytes.NewReader(src)) + if err := decoder.Decode(&m); err != nil { + logger.Error("RpcCallee.Call err :", err) + return nil, "", nil, err + } + + prfix := m[0].(T) + fn := m[1].(string) + + return &prfix, fn, m[2:], nil +} + +func decode(src []byte) (string, []any, error) { + var sig fnsig + decoder := gob.NewDecoder(bytes.NewReader(src)) + if err := decoder.Decode(&sig); err != nil { + logger.Error("RpcCallee.Call err :", err) + return "", nil, err + } + + return sig.FunctionName, sig.Args, nil +} + +func (c *rpcCallContext) send(fn string, args ...any) error { + bt, err := Encode(c.alias, fn, args...) + if err != nil { + return err + } + return c.publish(bt) +} + +type RpcCallee[T any] struct { + methods map[string]reflect.Method + create func(*wshandler.Richconn) *T +} + +func NewRpcCallee[T any](createReceiverFunc func(*wshandler.Richconn) *T) RpcCallee[T] { + out := RpcCallee[T]{ + methods: make(map[string]reflect.Method), + create: createReceiverFunc, + } + + var tmp *T + + tp := reflect.TypeOf(tmp) + for i := 0; i < tp.NumMethod(); i++ { + method := tp.Method(i) + out.methods[method.Name] = method + } + return out +} + +func (r RpcCallee[T]) Call(rc *wshandler.Richconn, src []byte) error { + defer func() { + s := recover() + if s != nil { + logger.Error(s) + } + }() + + fn, params, err := decode(src) + if err != nil { + logger.Error("RpcCallee.Call err :", err) + return err + } + + method, ok := r.methods[fn] + if !ok { + err := fmt.Errorf("method '%s' is missing", fn) + logger.Error("RpcCallee.Call err :", err) + return err + } + + receiver := r.create(rc) + args := []reflect.Value{ + reflect.ValueOf(receiver), + } + for _, arg := range params { + args = append(args, reflect.ValueOf(arg)) + } + + rets := method.Func.Call(args) + if len(rets) > 0 && rets[len(rets)-1].Interface() != nil { + return rets[len(rets)-1].Interface().(error) + } + return nil +} diff --git a/core/rpc/proxy.go b/core/rpc/proxy.go new file mode 100644 index 0000000..8996b4b --- /dev/null +++ b/core/rpc/proxy.go @@ -0,0 +1,33 @@ +package rpc + +import ( + "go.mongodb.org/mongo-driver/bson/primitive" +) + +func (c rpcCallContext) JoinTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, hint string) error { + return c.send("JoinTag", region, tag, tid, hint) +} + +func (c rpcCallContext) LeaveTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, hint string) error { + return c.send("LeaveTag", region, tag, tid, hint) +} + +func (c rpcCallContext) TurnGroupOnline(key string, gid primitive.ObjectID, score float64) error { + return c.send("TurnGroupOnline", key, gid, score) +} + +func (c rpcCallContext) TurnGroupOffline(key string, gid primitive.ObjectID) error { + return c.send("TurnGroupOffline", key, gid) +} +func (c rpcCallContext) SetStateInTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, state string, hint string) error { + return c.send("SetStateInTag", region, tag, tid, state, hint) +} +func (c rpcCallContext) SendMessage(doc []byte) error { + return c.send("SendMessage", doc) +} +func (c rpcCallContext) SendMessageToTag(region string, gid primitive.ObjectID, msg []byte) error { + return c.send("SendMessageToTag", region, gid, msg) +} +func (c rpcCallContext) CloseOnPurpose() error { + return c.send("CloseOnPurpose") +} diff --git a/core/tavern.go b/core/tavern.go new file mode 100644 index 0000000..46581f6 --- /dev/null +++ b/core/tavern.go @@ -0,0 +1,317 @@ +package core + +import ( + "context" + "errors" + "flag" + "io" + "net" + "net/http" + "reflect" + "strings" + + common "repositories.action2quare.com/ayo/gocommon" + "repositories.action2quare.com/ayo/gocommon/logger" + "repositories.action2quare.com/ayo/gocommon/wshandler" + "repositories.action2quare.com/ayo/tavern/core/rpc" + + "github.com/go-redis/redis/v8" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/bsonrw" + "go.mongodb.org/mongo-driver/bson/primitive" +) + +const ( + defaultMaxMemory = 32 << 10 // 32 KB +) + +func writeBsonArr(w io.Writer, src []bson.M) error { + return writeBsonDoc(w, bson.M{ + "r": src, + }) +} + +func onlineGroupQueryKey(prefix string) string { + return prefix + "_olg" +} + +func writeBsonDoc[T any](w io.Writer, src T) error { + rw, err := bsonrw.NewBSONValueWriter(w) + if err != nil { + return err + } + + enc, err := bson.NewEncoder(rw) + if err != nil { + return err + } + + return enc.Encode(src) +} + +func readBsonDoc(r io.Reader, src any) error { + body, err := io.ReadAll(r) + if err != nil { + return err + } + + if len(body) == 0 { + return nil + } + + decoder, err := bson.NewDecoder(bsonrw.NewBSONDocumentReader(body)) + if err != nil { + return err + } + + err = decoder.Decode(src) + if err != nil { + return err + } + + return nil +} + +type rpcCallDomain[T any] struct { + rpcCallChanName string + caller rpc.RpcCaller + callee rpc.RpcCallee[T] + methods map[string]reflect.Method +} + +func createRpcCallDomain[CalleeType any](syncConn *redis.Client, creator func(*wshandler.Richconn) *CalleeType) rpcCallDomain[CalleeType] { + var tmp *CalleeType + methods := make(map[string]reflect.Method) + tp := reflect.TypeOf(tmp) + for i := 0; i < tp.NumMethod(); i++ { + method := tp.Method(i) + methods[method.Name] = method + } + + rpcChanName := "conn_rpc_channel_" + tp.Name() + publishFunc := func(bt []byte) error { + _, err := syncConn.Publish(context.Background(), rpcChanName, bt).Result() + return err + } + + return rpcCallDomain[CalleeType]{ + rpcCallChanName: rpcChanName, + caller: rpc.NewRpcCaller(publishFunc), + callee: rpc.NewRpcCallee(creator), + methods: methods, + } +} + +type TavernConfig struct { + common.RegionStorageConfig `json:",inline"` + + GroupTypes map[string]*groupConfig `json:"tavern_group_types"` + MaingateApiToken string `json:"maingate_api_token"` + macAddr string +} + +var config TavernConfig + +type Tavern struct { + subTaverns []*subTavern + wsh *wshandler.WebsocketHandler +} + +type subTavern struct { + mongoClient common.MongoClient + wsh *wshandler.WebsocketHandler + region string + groups map[string]group + methods map[string]reflect.Method + wshRpc rpcCallDomain[richConnOuter] +} + +func getMacAddr() (string, error) { + ifas, err := net.Interfaces() + if err != nil { + return "", err + } + + for _, ifa := range ifas { + a := ifa.HardwareAddr.String() + if a != "" { + a = strings.ReplaceAll(a, ":", "") + return a, nil + } + } + return "", errors.New("no net interface") +} + +// New : +func New(context context.Context, wsh *wshandler.WebsocketHandler, inconfig *TavernConfig) (*Tavern, error) { + if !flag.Parsed() { + flag.Parse() + } + + if inconfig == nil { + var loaded TavernConfig + if err := common.LoadConfig(&loaded); err != nil { + return nil, err + } + inconfig = &loaded + } + + config = *inconfig + macaddr, err := getMacAddr() + if err != nil { + return nil, err + } + config.macAddr = macaddr + tv := Tavern{ + wsh: wsh, + } + + if err = tv.prepare(context); err != nil { + logger.Println("tavern prepare() failed :", err) + return nil, err + } + + return &tv, nil +} + +func (tv *Tavern) Destructor() { + tv.wsh.Destructor() + for _, st := range tv.subTaverns { + st.mongoClient.Close() + } +} + +type groupPipelineDocument struct { + OperationType string `bson:"operationType"` + FullDocument map[string]any `bson:"fullDocument"` + DocumentKey struct { + Id primitive.ObjectID `bson:"_id"` + } `bson:"documentKey"` + UpdateDescription struct { + UpdatedFields bson.M `bson:"updatedFields"` + RemovedFileds bson.A `bson:"removedFields"` + TruncatedArrays bson.A `bson:"truncatedArrays"` + } `bson:"updateDescription"` +} + +func (tv *Tavern) prepare(ctx context.Context) error { + for region, url := range config.RegionStorage { + var dbconn common.MongoClient + var err error + var groupinstance group + + if err := rpc.IsCallerCalleeMethodMatch[richConnOuter](); err != nil { + return err + } + + var tmp *subTavern + methods := make(map[string]reflect.Method) + tp := reflect.TypeOf(tmp) + for i := 0; i < tp.NumMethod(); i++ { + method := tp.Method(i) + methods[method.Name] = method + } + + sub := &subTavern{ + wsh: tv.wsh, + mongoClient: dbconn, + region: region, + methods: methods, + } + + sub.wshRpc = createRpcCallDomain(tv.wsh.RedisSync, func(rc *wshandler.Richconn) *richConnOuter { + return &richConnOuter{wsh: sub.wsh, rc: rc} + }) + + groups := make(map[string]group) + for typename, cfg := range config.GroupTypes { + cfg.Name = typename + if cfg.Transient { + groupinstance, err = cfg.prepareInMemory(ctx, region, typename, tv.wsh) + } else { + if !dbconn.Connected() { + dbconn, err = common.NewMongoClient(ctx, url.Mongo, region) + if err != nil { + return err + } + } + groupinstance, err = cfg.preparePersistent(ctx, region, dbconn, tv.wsh) + } + if err != nil { + return err + } + groups[typename] = groupinstance + } + sub.groups = groups + + tv.subTaverns = append(tv.subTaverns, sub) + } + + return nil +} + +func (tv *Tavern) RegisterHandlers(ctx context.Context, serveMux *http.ServeMux, prefix string) error { + // request는 항상 서비스 서버를 거쳐서 들어온다. [client] <--tls--> [service server] <--http--> tavern + // 클라이언트는 tavern으로부터 메시지를 수신할 뿐, 송신하지 못한다. + // 단, 요청은 https 서비스 서버를 통해 들어오고 클라이언트는 ws으로 수신만 한다는 원칙이 유지되어야 한다.(채팅 메시지는 예외?) + for _, sub := range tv.subTaverns { + var pattern string + if sub.region == "default" { + pattern = common.MakeHttpHandlerPattern(prefix, "api") + } else { + pattern = common.MakeHttpHandlerPattern(prefix, sub.region, "api") + } + serveMux.HandleFunc(pattern, sub.api) + + deliveryChan := tv.wsh.DeliveryChannel(sub.region) + go sub.deliveryMessageHandler(deliveryChan) + } + + return nil +} + +func (sub *subTavern) api(w http.ResponseWriter, r *http.Request) { + defer func() { + s := recover() + if s != nil { + logger.Error(s) + } + io.Copy(io.Discard, r.Body) + r.Body.Close() + }() + + // 서버에서 오는 요청만 처리 + apitoken := r.Header.Get("MG-X-API-TOKEN") + if apitoken != config.MaingateApiToken { + // 서버가 보내는 쿼리만 허용 + logger.Println("MG-X-API-TOKEN is missing") + w.WriteHeader(http.StatusBadRequest) + return + } + + operation := r.URL.Query().Get("operation") + if len(operation) == 0 { + w.WriteHeader(http.StatusBadRequest) + return + } + + method, ok := sub.methods[operation] + if !ok { + // 없는 operation + logger.Println("fail to call api. operation is not valid :", operation) + w.WriteHeader(http.StatusBadRequest) + return + } + + if r.PostForm == nil { + r.ParseMultipartForm(defaultMaxMemory) + } + + args := []reflect.Value{ + reflect.ValueOf(sub), + reflect.ValueOf(w), + reflect.ValueOf(r), + } + + method.Func.Call(args) +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..6cd9319 --- /dev/null +++ b/go.mod @@ -0,0 +1,27 @@ +module repositories.action2quare.com/ayo/tavern + +go 1.19 + +require ( + github.com/go-redis/redis/v8 v8.11.5 + github.com/gorilla/websocket v1.5.0 + go.mongodb.org/mongo-driver v1.11.6 + repositories.action2quare.com/ayo/gocommon v0.0.0-20230524065958-abddf26379d1 +) + +require ( + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/golang/snappy v0.0.1 // indirect + github.com/klauspost/compress v1.13.6 // indirect + github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect + github.com/pires/go-proxyproto v0.7.0 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.1 // indirect + github.com/xdg-go/stringprep v1.0.3 // indirect + github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect + golang.org/x/crypto v0.9.0 // indirect + golang.org/x/sync v0.1.0 // indirect + golang.org/x/text v0.9.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..aea4837 --- /dev/null +++ b/go.sum @@ -0,0 +1,78 @@ +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM= +github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc= +github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0= +github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= +github.com/pires/go-proxyproto v0.7.0 h1:IukmRewDQFWC7kfnb66CSomk2q/seBuilHBYFwyq0Hs= +github.com/pires/go-proxyproto v0.7.0/go.mod h1:Vz/1JPY/OACxWGQNIRY2BeyDmpoaWmEP40O9LbuiFR4= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= +github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.1 h1:VOMT+81stJgXW3CpHyqHN3AXDYIMsx56mEFrB37Mb/E= +github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= +github.com/xdg-go/stringprep v1.0.3 h1:kdwGpVNwPFtjs98xCGkHjQtGKh86rDcRZN17QEMCOIs= +github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= +go.mongodb.org/mongo-driver v1.11.6 h1:XM7G6PjiGAO5betLF13BIa5TlLUUE3uJ/2Ox3Lz1K+o= +go.mongodb.org/mongo-driver v1.11.6/go.mod h1:G9TgswdsWjX4tmDA5zfs2+6AEPpYJwqblyjsfuh8oXY= +golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g= +golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230524065958-abddf26379d1 h1:0OInnPqpzQhRf1zwVTbWqWByWs8MTYbZc8c95099bSM= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230524065958-abddf26379d1/go.mod h1:5RmALPCFGFmqXa+AAPLsQaSlBVBafwX1H2CnIhsCM50= diff --git a/main.go b/main.go new file mode 100644 index 0000000..543b13a --- /dev/null +++ b/main.go @@ -0,0 +1,46 @@ +// warroom project main.go +package main + +import ( + "context" + "flag" + "net/http" + + "repositories.action2quare.com/ayo/gocommon/wshandler" + "repositories.action2quare.com/ayo/tavern/core" + + common "repositories.action2quare.com/ayo/gocommon" + "repositories.action2quare.com/ayo/gocommon/logger" +) + +func main() { + if !flag.Parsed() { + flag.Parse() + } + + ctx, cancel := context.WithCancel(context.Background()) + var config core.TavernConfig + if err := common.LoadConfig(&config); err != nil { + panic(err) + } + + authcache, err := common.NewAuthCollectionGlobal(ctx, config.MaingateApiToken) + if err != nil { + panic(err) + } + + wsh := wshandler.NewWebsocketHandler(authcache) + if tv, err := core.New(ctx, wsh, &config); err != nil { + panic(err) + } else { + serveMux := http.NewServeMux() + wsh.RegisterHandlers(ctx, serveMux, *common.PrefixPtr) + tv.RegisterHandlers(ctx, serveMux, *common.PrefixPtr) + server := common.NewHTTPServer(serveMux) + logger.Println("tavern is started") + server.Start() + cancel() + tv.Destructor() + wsh.Destructor() + } +}