From 8d0f21077d8701fb6eba39ca035823fcd1e6ab89 Mon Sep 17 00:00:00 2001 From: mountain Date: Thu, 6 Jul 2023 00:53:53 +0900 Subject: [PATCH] =?UTF-8?q?wshandler=EC=99=80=20=EB=B6=84=EB=A6=AC=20?= =?UTF-8?q?=EC=A4=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/apiimpl.go | 268 +---------- core/group_memory.go | 6 +- core/group_mongo.go | 1050 ------------------------------------------ core/richconn.go | 132 +++--- core/tavern.go | 66 +-- go.mod | 2 +- go.sum | 6 + 7 files changed, 108 insertions(+), 1422 deletions(-) delete mode 100644 core/group_mongo.go diff --git a/core/apiimpl.go b/core/apiimpl.go index 3ab6061..8232570 100644 --- a/core/apiimpl.go +++ b/core/apiimpl.go @@ -132,147 +132,6 @@ func (sub *subTavern) JoinGroup(w http.ResponseWriter, r *http.Request) { } } -func (sub *subTavern) EnterCandidateChannel(w http.ResponseWriter, r *http.Request) { - typename, _ := common.ReadStringFormValue(r.Form, "type") - if _, ok := sub.groups[typename]; !ok { - logger.Println("EnterCandidateChannel failed. group type is missing :", r) - w.WriteHeader(http.StatusBadRequest) - return - } - - midobj, ok := common.ReadObjectIDFormValue(r.Form, "mid") - if !ok { - logger.Println("EnterCandidateChannel failed. mid is missing :", r) - w.WriteHeader(http.StatusBadRequest) - return - } - gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid") - if !ok { - logger.Println("EnterCandidateChannel failed. gid is missing :", r) - w.WriteHeader(http.StatusBadRequest) - return - } - - // candidate channel은 big endian 최상위 비트가 1 - gidobj[0] |= 0x80 - - if conn := sub.wsh.Conn(sub.region, midobj); conn != nil { - richConnOuter{wsh: sub.wsh, rc: conn}.JoinTag(sub.region, gidobj, midobj, typename) - } else { - sub.wshRpc.caller.One(midobj).JoinTag(sub.region, gidobj, midobj, typename) - } -} - -func (sub *subTavern) LeaveCandidateChannel(w http.ResponseWriter, r *http.Request) { - typename, _ := common.ReadStringFormValue(r.Form, "type") - if _, ok := sub.groups[typename]; !ok { - logger.Println("EnterCandidateChannel failed. group type is missing :", r) - w.WriteHeader(http.StatusBadRequest) - return - } - - midobj, ok := common.ReadObjectIDFormValue(r.Form, "mid") - if !ok { - logger.Println("EnterCandidateChannel failed. mid is missing :", r) - w.WriteHeader(http.StatusBadRequest) - return - } - gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid") - if !ok { - logger.Println("EnterCandidateChannel failed. gid is missing :", r) - w.WriteHeader(http.StatusBadRequest) - return - } - - // candidate channel은 big endian 최상위 비트가 1 - gidobj[0] |= 0x80 - - if conn := sub.wsh.Conn(sub.region, midobj); conn != nil { - richConnOuter{wsh: sub.wsh, rc: conn}.LeaveTag(sub.region, gidobj, midobj, typename) - } else { - sub.wshRpc.caller.One(midobj).LeaveTag(sub.region, gidobj, midobj, typename) - } -} - -func (sub *subTavern) EnterGroupChannel(w http.ResponseWriter, r *http.Request) { - typename, _ := common.ReadStringFormValue(r.Form, "type") - group := sub.groups[typename] - if group == nil { - logger.Println("EnterGroupChannel failed. group type is missing :", r) - w.WriteHeader(http.StatusBadRequest) - return - } - - midobj, ok := common.ReadObjectIDFormValue(r.Form, "mid") - if !ok { - logger.Println("EnterGroupChannel failed. mid is missing :", r) - w.WriteHeader(http.StatusBadRequest) - return - } - gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid") - if !ok { - logger.Println("EnterGroupChannel failed. gid is missing :", r) - w.WriteHeader(http.StatusBadRequest) - return - } - - tid := group.FindTicketID(gidobj, midobj) - if tid.IsZero() { - logger.Println("EnterGroupChannel failed. tid is zero") - w.WriteHeader(http.StatusBadRequest) - return - } - - if conn := sub.wsh.Conn(sub.region, midobj); conn != nil { - richConnOuter{wsh: sub.wsh, rc: conn}.JoinTag(sub.region, gidobj, tid, typename) - } else { - sub.wshRpc.caller.One(midobj).JoinTag(sub.region, gidobj, tid, typename) - } - writeBsonDoc(w, primitive.M{"_id": tid}) -} - -func (sub *subTavern) SetStateInGroup(w http.ResponseWriter, r *http.Request) { - gid, ok := common.ReadObjectIDFormValue(r.Form, "gid") - if !ok { - logger.Println("SetStateInGroup failed. tag is missing :", r) - w.WriteHeader(http.StatusBadRequest) - return - } - mid, ok := common.ReadObjectIDFormValue(r.Form, "mid") - if !ok { - logger.Println("SetStateInGroup failed. mid form value is missing") - w.WriteHeader(http.StatusBadRequest) - return - } - - state, ok := common.ReadStringFormValue(r.Form, "state") - if !ok { - logger.Println("SetStateInGroup failed. state is missing :", r) - w.WriteHeader(http.StatusBadRequest) - return - } - - typename, ok := common.ReadStringFormValue(r.Form, "type") - if !ok { - logger.Println("SetStateInGroup failed. type is missing :", r) - w.WriteHeader(http.StatusBadRequest) - return - } - var doc bson.M - if err := readBsonDoc(r.Body, &doc); err != nil { - logger.Error("SetStateInGroup failed. readBsonDoc err :", err) - w.WriteHeader(http.StatusBadRequest) - return - } - - tid := doc["_id"].(primitive.ObjectID) - if conn := sub.wsh.Conn(sub.region, mid); conn != nil { - richConnOuter{wsh: sub.wsh, rc: conn}.SetStateInTag(sub.region, gid, tid, state, typename) - } else { - sub.wshRpc.caller.One(mid).SetStateInTag(sub.region, gid, tid, state, typename) - } -} - // Invite : 초대 // - type : 초대 타입 (required) // - from : 초대하는 자 (required) @@ -490,119 +349,6 @@ func (sub *subTavern) QueryInvitations(w http.ResponseWriter, r *http.Request) { } } -func (sub *subTavern) TurnGroupOnline(w http.ResponseWriter, r *http.Request) { - // group을 online 상태로 만든다. - // 요청을 보내는 클라이언트의 conn이 끊이면 online에서 제거한다. - // online인 group을 가지고 뭘 할지는 게임이 알아서... - typename, _ := common.ReadStringFormValue(r.Form, "type") - group := sub.groups[typename] - if group == nil { - logger.Println("TurnGroupOnline failed. group type is missing :", r.Form) - w.WriteHeader(http.StatusBadRequest) - return - } - - gid, ok := common.ReadObjectIDFormValue(r.Form, "_id") - if !ok { - logger.Println("TurnGroupOnline failed. group id '_id' form value is missing :", r.Form) - w.WriteHeader(http.StatusBadRequest) - return - } - - mid, ok := common.ReadObjectIDFormValue(r.Form, "mid") - if !ok { - logger.Println("TurnGroupOnline failed. mid form value is missing") - w.WriteHeader(http.StatusBadRequest) - return - } - - var filter bson.M - if err := readBsonDoc(r.Body, &filter); err != nil { - logger.Error("TurnGroupOnline failed. readBsonDoc return err :", err) - w.WriteHeader(http.StatusBadRequest) - return - } - - exist, err := group.Exist(gid, filter) - if err != nil { - logger.Error("TurnGroupOnline failed. FindOne return err :", err) - w.WriteHeader(http.StatusBadRequest) - return - } - - if !exist { - logger.Println("TurnGroupOnline failed. filter not match", filter) - w.WriteHeader(http.StatusBadRequest) - return - } - - score, ok := common.ReadFloatFormValue(r.Form, "score") - if !ok { - score = 100 - } - - if conn := sub.wsh.Conn(sub.region, mid); conn != nil { - err = richConnOuter{wsh: sub.wsh, rc: conn}.TurnGroupOnline(onlineGroupQueryKey(typename), gid, score) - } else { - err = sub.wshRpc.caller.One(mid).TurnGroupOnline(onlineGroupQueryKey(typename), gid, score) - } - if err != nil { - logger.Error("TurnGroupOnline failed. TurnGroupOnline err :", err) - w.WriteHeader(http.StatusBadRequest) - return - } -} - -func (sub *subTavern) TurnGroupOffline(w http.ResponseWriter, r *http.Request) { - // group을 offline 상태로 만든다. - // 요청을 보내는 클라이언트의 conn이 끊이면 online에서 제거한다. - // online인 group을 가지고 뭘 할지는 게임이 알아서... - typename, _ := common.ReadStringFormValue(r.Form, "type") - group := sub.groups[typename] - if group == nil { - logger.Println("TurnGroupOffline failed. group type is missing :", r.Form) - w.WriteHeader(http.StatusBadRequest) - return - } - - gid, ok := common.ReadObjectIDFormValue(r.Form, "_id") - if !ok { - logger.Println("TurnGroupOffline failed. group id '_id' form value is missing :", r.Form) - w.WriteHeader(http.StatusBadRequest) - return - } - - mid, ok := common.ReadObjectIDFormValue(r.Form, "mid") - if !ok { - logger.Println("TurnGroupOffline failed. mid form value is missing") - w.WriteHeader(http.StatusBadRequest) - return - } - - // onlinename := onlineGroupQueryKey(typename) - // if onClose := conn.UnregistOnCloseFunc(onlinename); onClose != nil { - // onClose() - // } else { - // gid, ok := common.ReadStringFormValue(form, "_id") - // if ok { - // sub.redisSync.ZRem(context.Background(), onlinename, gid) - // } - // } - - var err error - if conn := sub.wsh.Conn(sub.region, mid); conn != nil { - err = richConnOuter{wsh: sub.wsh, rc: conn}.TurnGroupOffline(onlineGroupQueryKey(typename), gid) - } else { - err = sub.wshRpc.caller.One(mid).TurnGroupOffline(onlineGroupQueryKey(typename), gid) - } - - if err != nil { - logger.Error("TurnGroupOffline failed. TurnGroupOnline err :", err) - w.WriteHeader(http.StatusBadRequest) - return - } -} - func (sub *subTavern) QueryOnlineGroup(w http.ResponseWriter, r *http.Request) { typename, _ := common.ReadStringFormValue(r.Form, "type") group := sub.groups[typename] @@ -690,11 +436,7 @@ func (sub *subTavern) QueryOnlineState(w http.ResponseWriter, r *http.Request) { return } - state, err := sub.wsh.GetState(mid) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } + state := sub.wsh.GetState(sub.region, mid) w.Write([]byte(state)) } @@ -706,13 +448,7 @@ func (sub *subTavern) IsOnline(w http.ResponseWriter, r *http.Request) { return } - ok, err := sub.wsh.IsOnline(mid) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - - if ok { + if state := sub.wsh.GetState(sub.region, mid); len(state) > 0 { w.Write([]byte("true")) } else { w.Write([]byte("false")) diff --git a/core/group_memory.go b/core/group_memory.go index def455f..a1e976b 100644 --- a/core/group_memory.go +++ b/core/group_memory.go @@ -588,10 +588,6 @@ func multicast(conns []*wshandler.Richconn, raw []byte) { rconn.WriteBytes(raw) } } -func multicastTyped[T any](conns []*wshandler.Richconn, msg T) { - bt, _ := json.Marshal(makeTypeMessage(msg)) - go multicast(conns, bt) -} func broadcastTypedMessage[T any](gm *groupInMemory, gid groupID, msg T) { if gd := gm.groups.find(gid); gd != nil { @@ -1172,7 +1168,7 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type call := func() { method, ok := reflect.TypeOf(gm).MethodByName(fn) if !ok { - logger.Printf("%s message decode failed :", targetbt, msg.Payload, err) + logger.Println("message decode failed :", err, targetbt, msg.Payload) } args := []reflect.Value{ diff --git a/core/group_mongo.go b/core/group_mongo.go deleted file mode 100644 index 5d3cb48..0000000 --- a/core/group_mongo.go +++ /dev/null @@ -1,1050 +0,0 @@ -package core - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "net/url" - "strings" - "time" - - common "repositories.action2quare.com/ayo/gocommon" - "repositories.action2quare.com/ayo/gocommon/logger" - "repositories.action2quare.com/ayo/gocommon/wshandler" - - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/bson/bsonrw" - "go.mongodb.org/mongo-driver/bson/primitive" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" -) - -type groupMongo struct { - *groupConfig - - mongoClient common.MongoClient - hints map[string][]string - collectionName common.CollectionName - memberCollectionName common.CollectionName - inviteCollectionName common.CollectionName -} - -func (gm *groupMongo) Create(r url.Values, doc bson.M) (primitive.ObjectID, error) { - var filter bson.M - var hint string - - for h, fields := range gm.hints { - candidate := bson.M{} - for _, f := range fields { - if fv := r.Get(f); len(fv) == 0 { - break - } else if f == "_id" { - candidate["_id"], _ = primitive.ObjectIDFromHex(fv) - } else if f == "after" { - candidate["luts"] = bson.M{ - "$gt": common.DotStringToTimestamp(fv), - } - } else { - candidate[f] = fv - } - } - - if len(filter) < len(candidate) { - filter = candidate - hint = h - } - } - - if len(filter) == 1 { - if _, ok := filter["_id"]; ok { - hint = "_id_" - } - } - - if len(filter) == 0 { - return primitive.NilObjectID, fmt.Errorf("CreateGroup failed. filter is missing : %v", r) - } - - for f := range filter { - delete(doc, f) - } - - doc["members"] = []primitive.ObjectID{} - _, inserted, err := gm.mongoClient.Update(gm.collectionName, filter, bson.M{ - "$setOnInsert": doc, - "$currentDate": bson.M{ - "luts": bson.M{"$type": "timestamp"}, - }, - }, options.Update().SetUpsert(true).SetHint(hint)) - - if err != nil { - return primitive.NilObjectID, err - } - - if inserted == nil { - return primitive.NilObjectID, errors.New("name is duplicated") - } - - return inserted.(primitive.ObjectID), nil -} - -var errAlreadyMemberOrDeletedMember = errors.New("JoinGroup failed. already member or deleting member") - -func (gm *groupMongo) Candidate(groupID primitive.ObjectID, memberID primitive.ObjectID, doc bson.M) error { - expireAt := time.Now().UTC().Add(time.Second * time.Duration(gm.CandidateExpire)).Unix() - doc["expiring"] = expireAt - doc["_candidate"] = true - - success, _, err := gm.mongoClient.Update(gm.memberCollectionName, bson.M{ - "_gid": groupID, - "_mid": memberID, - }, bson.M{ - "$setOnInsert": doc, - "$currentDate": bson.M{ - "luts": bson.M{"$type": "timestamp"}, - }, - }, options.Update().SetUpsert(true)) - if err != nil { - logger.Error("JoinGroup failed. update candidate member collection err :", groupID, memberID, err) - return err - } - - if !success { - // 중복해서 보내지 말아라 - // 거절된 candidate가 또 요청을 보내더라도 expire될 때까지 계속 거절된다. - logger.Println("JoinGroup failed. already member or deleting member :", groupID, memberID) - return errAlreadyMemberOrDeletedMember - } - - return nil -} - -func (gm *groupMongo) Join(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, doc bson.M) (primitive.ObjectID, error) { - if ticketID == primitive.NilObjectID { - ticketID = primitive.NewObjectID() - doc["_gid"] = groupID - doc["_mid"] = memberID - } - - // member collection에 추가. 추가된 _id를 group Member에 push한다. - // 이렇게 하는 이유는 member document가 delete될때 _id만 알 수 있기 때문. - // 클라이언트는 _id와 member id와의 관계를 알 수 있어야 한다. - filter := bson.M{"_id": groupID} - - if gm.MaxMember > 0 { - // 풀방 플래그 - filter[fmt.Sprintf("members.%d", gm.MaxMember-1)] = bson.M{"$exists": false} - } - - success, _, err := gm.mongoClient.Update(gm.collectionName, filter, bson.M{ - "$push": bson.M{ - "members": ticketID, - }, - "$currentDate": bson.M{ - "luts": bson.M{"$type": "timestamp"}, - }, - }, options.Update().SetUpsert(false).SetHint("_idmembers")) - - if err != nil { - logger.Error("JoinGroup failed :", err) - return primitive.NilObjectID, err - } - - if !success { - // 갑자기 풀방이 되었거나 이미 멤버다 - logger.Println("JoinGroup failed. push member failed :", groupID, memberID) - return primitive.NilObjectID, errAlreadyMemberOrDeletedMember - } - - doc["_ts"] = nil - doc["expiring"] = nil - doc["_candidate"] = nil - findoc := splitDocument(doc) - findoc["$currentDate"] = bson.M{ - "luts": bson.M{"$type": "timestamp"}, - } - - success, _, err = gm.mongoClient.Update(gm.memberCollectionName, bson.M{ - "_id": ticketID, - }, findoc, options.Update().SetUpsert(true)) - if err != nil || !success { - gm.mongoClient.Update(gm.collectionName, bson.M{ - "_id": groupID, - }, bson.M{ - "$pull": bson.M{ - "members": ticketID, - }, - "$currentDate": bson.M{ - "luts": bson.M{"$type": "timestamp"}, - }, - }, options.Update().SetUpsert(false)) - - logger.Error("JoinGroup failed. update member collection err :", err) - return primitive.NilObjectID, err - } - return ticketID, nil -} - -func (gm *groupMongo) FindTicketID(groupID primitive.ObjectID, memberID primitive.ObjectID) primitive.ObjectID { - tid, err := gm.mongoClient.FindOne(gm.memberCollectionName, bson.M{ - "_gid": groupID, - "_mid": memberID, - }, options.FindOne().SetHint("gidmid").SetProjection(bson.M{"_id": 1})) - - if err != nil { - logger.Error("FindTicketID failed :", err) - return primitive.NilObjectID - } - - if tid == nil { - logger.Error("FindTicketID failed. tid not found :", groupID, memberID) - return primitive.NilObjectID - } - - return tid["_id"].(primitive.ObjectID) -} - -var errAlradyInvited = errors.New("already invited user") - -func (gm *groupMongo) Invite(groupID primitive.ObjectID, memberID primitive.ObjectID, inviterDoc bson.M, inviteeDoc bson.M) (string, error) { - if gm.InviteeIsMember && gm.MaxMember > 0 { - vacant, err := gm.mongoClient.FindOne(gm.collectionName, bson.M{ - "_id": groupID, - fmt.Sprintf("members.%d", gm.MaxMember-1): bson.M{"$exists": false}, - }, options.FindOne().SetProjection(bson.M{"_id": 1})) - - if err != nil { - return "", err - } - - if vacant == nil { - // 빈 자리가 없다 - return "failed:full", nil - } - } - - expireAt := time.Now().UTC().Add(time.Second * time.Duration(gm.InviteExpire)).Unix() - - var success bool - var err error - tid := primitive.NewObjectID() - // invitee에게 초대장 보내기 - inviterDoc["_gid"] = groupID - inviterDoc["_mid"] = memberID - inviterDoc["expiring"] = expireAt - success, _, err = gm.mongoClient.Update(gm.inviteCollectionName, bson.M{ - "_id": tid, - }, bson.M{ - "$setOnInsert": inviterDoc, - "$currentDate": bson.M{ - "luts": bson.M{"$type": "timestamp"}, - "_ts": bson.M{"$type": "date"}, - }, - }, options.Update().SetUpsert(true)) - if err != nil { - return "", err - } - if !success { - return "", errAlradyInvited - } - - inviteeDoc["expiring"] = expireAt - inviteeDoc["_gid"] = groupID - inviteeDoc["_mid"] = memberID - success, _, err = gm.mongoClient.Update(gm.memberCollectionName, bson.M{ - "_id": tid, // inviteCollectionName에 추가된 _id와 동일하게 맞춘다. 검색에 용이 - }, bson.M{ - "$setOnInsert": inviteeDoc, - "$currentDate": bson.M{ - "luts": bson.M{"$type": "timestamp"}, - "_ts": bson.M{"$type": "date"}, - }, - }, options.Update().SetHint("gidmid").SetUpsert(true)) - if err != nil { - return "", err - } - if !success { - return "", errAlradyInvited - } - - if gm.InviteeIsMember { - // 멤버로도 추가 - pushfilter := bson.M{"_id": groupID} - if gm.MaxMember > 0 { - pushfilter[fmt.Sprintf("members.%d", gm.MaxMember-1)] = bson.M{"$exists": false} - } - success, _, err = gm.mongoClient.Update(gm.collectionName, pushfilter, bson.M{ - "$push": bson.M{ - "members": tid, - }, - "$currentDate": bson.M{ - "luts": bson.M{"$type": "timestamp"}, - }, - }, options.Update().SetUpsert(false)) - if err != nil { - return "", err - } - if !success { - return "", errAlradyInvited - } - - if !success { - // 이미 풀방.아래 Delete/Update는 실패해도 괜찮다. - gm.mongoClient.Delete(gm.memberCollectionName, bson.M{"_id": tid}) - gm.mongoClient.Update(gm.inviteCollectionName, bson.M{ - "_id": tid, - }, bson.M{ - "$set": bson.M{"name": ""}, - "$currentDate": bson.M{ - "luts": bson.M{"$type": "timestamp"}, - }, - }, options.Update().SetUpsert(false)) - return "failed:full", nil - } - } - - return "success", nil -} - -func (gm *groupMongo) UpdateGroupMember(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, doc bson.M) (err error) { - var findoc bson.M - if doc == nil { - findoc = bson.M{ - "$set": bson.M{ - "_delete": true, - }, - "$currentDate": bson.M{ - "luts": bson.M{"$type": "timestamp"}, - "_ts": bson.M{"$type": "date"}, - }, - } - } else { - findoc = splitDocument(doc) - findoc["$currentDate"] = bson.M{ - "luts": bson.M{"$type": "timestamp"}, - } - } - - if ticketID.IsZero() { - _, _, err = gm.mongoClient.Update(gm.memberCollectionName, bson.M{ - "_gid": groupID, - "_mid": memberID, - }, findoc, options.Update().SetHint("gidmid").SetUpsert(false)) - } else { - _, _, err = gm.mongoClient.Update(gm.memberCollectionName, bson.M{ - "_id": ticketID, - }, findoc, options.Update().SetUpsert(false)) - } - - return -} - -func (gm *groupMongo) CancelInvitation(groupID primitive.ObjectID, ticketID primitive.ObjectID) error { - if gm.InviteeIsMember { - pulled, _, err := gm.mongoClient.Update(gm.collectionName, bson.M{ - "_id": groupID, - }, bson.M{ - "$pull": bson.M{ - "members": ticketID, - }, - "$currentDate": bson.M{ - "luts": bson.M{"$type": "timestamp"}, - }, - }, options.Update().SetUpsert(false)) - if err != nil { - return err - } - if !pulled { - return nil - } - } - - // member collection 삭제 - _, err := gm.mongoClient.Delete(gm.memberCollectionName, bson.M{ - "_id": ticketID, - }) - if err != nil { - return err - } - - // 초대를 삭제하면 안된다. - // expiring될 때까지 냅두고, 클라이언트가 expiring을 보고 알아서 지우게 한다. - _, _, err = gm.mongoClient.Update(gm.inviteCollectionName, bson.M{ - "_id": ticketID, - }, bson.M{ - "$set": bson.M{"expiring": -1}, - "$currentDate": bson.M{ - "luts": bson.M{"$type": "timestamp"}, - }, - }, options.Update().SetUpsert(false)) - - return err -} - -var errInvitationExpired = errors.New("invitation is expired") - -func (gm *groupMongo) AcceptInvitation(groupID primitive.ObjectID, mid primitive.ObjectID, ticketID primitive.ObjectID, member bson.M) (primitive.ObjectID, error) { - gdoc, err := gm.mongoClient.FindOneAndUpdate(gm.memberCollectionName, bson.M{ - "_id": ticketID, - }, bson.M{ - "$set": member, - "$unset": bson.M{ - "expiring": 1, - "_ts": 1, - }, - "$currentDate": bson.M{ - "luts": bson.M{"$type": "timestamp"}, - }, - }, options.FindOneAndUpdate().SetProjection(bson.M{"_gid": 1})) - if err != nil { - return primitive.NilObjectID, err - } - - if gdoc == nil { - // 만료되었다. - return primitive.NilObjectID, errInvitationExpired - } - - // 여기서는 삭제해도 된다. - gm.mongoClient.Delete(gm.inviteCollectionName, bson.M{ - "_id": ticketID, - }) - - gidbytes := gdoc["_gid"].(primitive.ObjectID) - return gidbytes, nil -} - -var errNotInvited = errors.New("invitation is not mine") - -func (gm *groupMongo) DenyInvitation(groupID primitive.ObjectID, mid primitive.ObjectID, ticketID primitive.ObjectID) error { - // 여기서는 삭제해도 된다. - invdoc, err := gm.mongoClient.FindOne(gm.inviteCollectionName, bson.M{ - "_id": ticketID, - }, options.FindOne().SetProjection("_gid")) - if err != nil { - return err - } - - gid := invdoc["_gid"].(primitive.ObjectID) - success, _, err := gm.mongoClient.Update(gm.collectionName, bson.M{ - "_id": gid, - }, bson.M{ - "$pull": bson.M{ - "members": ticketID, - }, - "$currentDate": bson.M{ - "luts": bson.M{"$type": "timestamp"}, - }, - }) - if err != nil { - return err - } - if !success { - return errNotInvited - } - - gm.mongoClient.Update(gm.memberCollectionName, bson.M{ - "_id": ticketID, - }, bson.M{ - "_delete": true, - }, options.Update().SetUpsert(false)) - gm.mongoClient.Delete(gm.inviteCollectionName, bson.M{ - "_id": ticketID, - }) - return nil -} - -func (gm *groupMongo) QueryInvitations(memberID primitive.ObjectID, after primitive.Timestamp) ([]bson.M, error) { - filter := bson.M{"_mid": memberID} - if !after.IsZero() { - filter["luts"] = bson.M{"$gt": after} - } - - return gm.mongoClient.FindAll(gm.inviteCollectionName, filter, options.Find().SetLimit(20).SetHint("mid")) -} - -func (gm *groupMongo) Exist(groupID primitive.ObjectID, filter bson.M) (bool, error) { - if filter == nil { - filter = bson.M{"_id": groupID} - } else { - filter["_id"] = groupID - } - found, err := gm.mongoClient.FindOne(gm.collectionName, filter, options.FindOne().SetProjection(bson.M{"_id": 1})) - if err != nil { - return false, err - } - - if found == nil { - return false, nil - } - return true, nil -} - -func (gm *groupMongo) FindAll(filter bson.M, projection string, after primitive.Timestamp) ([]bson.M, error) { - opt := options.Find().SetBatchSize(10) - if len(projection) > 0 { - projM := bson.M{} - for _, proj := range strings.Split(projection, ",") { - projM[proj] = 1 - } - } - - if !after.IsZero() { - filter["luts"] = bson.M{"$gt": after} - } - - return gm.mongoClient.FindAll(gm.collectionName, filter, opt) -} - -func (gm *groupMongo) FindOne(groupID primitive.ObjectID, projection string) (bson.M, error) { - op := options.FindOne() - if len(projection) > 0 { - proj := bson.M{} - for _, p := range strings.Split(projection, ",") { - if p[0] == '-' { - proj[strings.TrimSpace(p[1:])] = 0 - } else if p[0] == '+' { - proj[strings.TrimSpace(p[1:])] = 1 - } else { - proj[strings.TrimSpace(p)] = 1 - } - } - op = op.SetProjection(proj) - } - - return gm.mongoClient.FindOne(gm.collectionName, bson.M{"_id": groupID}, op) -} - -func (gm *groupMongo) QueryMembers(groupID primitive.ObjectID, reqID primitive.ObjectID, projection string, after primitive.Timestamp) (map[string]bson.M, error) { - op := options.Find() - if len(projection) > 0 { - proj := bson.M{} - for _, p := range strings.Split(projection, ",") { - if p[0] == '-' { - proj[strings.TrimSpace(p[1:])] = 0 - } else if p[0] == '+' { - proj[strings.TrimSpace(p[1:])] = 1 - } else { - proj[strings.TrimSpace(p)] = 1 - } - } - op = op.SetProjection(proj) - } - - filter := bson.M{"_gid": groupID} - - if after.IsZero() { - gm.mongoClient.Delete(gm.memberCollectionName, bson.M{ - "_gid": groupID, - "expiring": bson.M{"$lt": time.Now().UTC().Unix()}, - }) - } else { - filter["luts"] = bson.M{"$gt": after} - } - - all, err := gm.mongoClient.FindAll(gm.memberCollectionName, filter, op) - if err != nil { - return nil, err - } - - output := make(map[string]bson.M) - for _, m := range all { - output[m["_mid"].(primitive.ObjectID).Hex()] = m - } - return output, nil -} - -func (gm *groupMongo) QueryMember(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, projection string) (bson.M, error) { - filter := bson.M{"_gid": groupID} - if !ticketID.IsZero() { - filter["_id"] = ticketID - } else { - filter["_mid"] = memberID - } - - op := options.FindOne().SetHint("gidmid") - var projdoc bson.M - if len(projection) > 0 { - projdoc = bson.M{ - "_delete": 1, - } - for _, proj := range strings.Split(projection, ",") { - projdoc[proj] = 1 - } - } else { - projdoc = bson.M{ - "_ts": 0, - "_gid": 0, - "_mid": 0, - } - } - - op.SetProjection(projdoc) - - return gm.mongoClient.FindOne(gm.memberCollectionName, filter, op) -} - -func (gm *groupMongo) Leave(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID) error { - if ticketID.IsZero() { - poptarget, err := gm.mongoClient.FindOne(gm.memberCollectionName, bson.M{ - "_gid": groupID, - "_mid": memberID, - }, options.FindOne().SetProjection(bson.M{"_id": 1}).SetHint("gidmid")) - if err != nil { - return err - } - - // Find와 Delete를 나눠야 한다. - // pull 하는 것이 더 중요하기 때문. - // pull에 실패하더라도 _id가 남아있어야 다시 시도가 가능하다 - if poptarget == nil { - // 왠지 만료되었거나 문제가 잇다 - return nil - } - - if _, ok := poptarget["_id"]; !ok { - return nil - } - - ticketID = poptarget["_id"].(primitive.ObjectID) - } - - _, _, err := gm.mongoClient.Update(gm.collectionName, bson.M{ - "_id": groupID, - }, bson.M{ - "$pull": bson.M{ - "members": ticketID, - }, - "$currentDate": bson.M{ - "luts": bson.M{"$type": "timestamp"}, - }, - }) - if err != nil { - return err - } - - // Delete는 실패해도 넘어간다. - gm.mongoClient.Update(gm.memberCollectionName, bson.M{ - "_id": ticketID, - }, bson.M{ - "$set": bson.M{ - "_delete": true, - }, - "$currentDate": bson.M{ - "_ts": bson.M{"$type": "date"}, - }, - }) - return nil -} - -func (gm *groupMongo) UpdateMemberDocument(groupID primitive.ObjectID, memberID primitive.ObjectID, doc bson.M) error { - _, _, err := gm.mongoClient.Update(gm.memberCollectionName, bson.M{ - "_gid": groupID, - "_mid": memberID, - }, bson.M{ - "$set": doc, - "$currentDate": bson.M{ - "luts": bson.M{"$type": "timestamp"}, - }, - }, options.Update().SetUpsert(false).SetHint("gidmid")) - - return err -} - -func (gm *groupMongo) Dismiss(groupID primitive.ObjectID) error { - _, err := gm.mongoClient.Delete(gm.collectionName, bson.M{ - "_id": groupID, - }) - if err != nil { - return err - } - - gm.mongoClient.Delete(gm.memberCollectionName, bson.M{ - "_gid": groupID, - }) - return nil -} - -var errUpdateGroupDocumentFailed = errors.New("update group document failed") - -func (gm *groupMongo) UpdateGroupDocument(groupID primitive.ObjectID, body []byte) error { - groupDoc, err := gm.mongoClient.FindOne(gm.collectionName, bson.M{ - "_id": groupID, - }, nil) - if err != nil { - return err - } - - decoder, err := bson.NewDecoder(bsonrw.NewBSONDocumentReader(body)) - if err != nil { - return err - } - - if err := decoder.Decode(&groupDoc); err != nil { - return err - } - - success, _, err := gm.mongoClient.Update(gm.collectionName, bson.M{ - "_id": groupID, - }, bson.M{ - "$set": groupDoc, - }, options.Update().SetUpsert(false)) - - if err != nil { - return err - } - - if !success { - return errUpdateGroupDocumentFailed - } - - return nil -} - -func (gm *groupMongo) DropPausedMember(gid primitive.ObjectID, mid primitive.ObjectID) error { - return nil -} - -func (gm *groupMongo) PauseMember(gid primitive.ObjectID, mid primitive.ObjectID, conn *wshandler.Richconn) error { - return nil -} - -type mongowatcher struct { - collection common.CollectionName - pipeline mongo.Pipeline - op options.FullDocument - onChanged func(string, *groupPipelineDocument) -} - -func (w *mongowatcher) callOnChanged(region string, c *groupPipelineDocument) { - defer func() { - r := recover() - if r != nil { - logger.Error(r) - } - }() - w.onChanged(region, c) -} - -func (w *mongowatcher) monitorfunc(parentctx context.Context, region string, mongoClient common.MongoClient) { - defer func() { - r := recover() - if r != nil { - logger.Error(r) - } - logger.Println("watcher.,monitorfunc finished") - }() - - var groupstream *mongo.ChangeStream - var err error - var ctx context.Context - - defer func() { - if groupstream != nil { - groupstream.Close(ctx) - } - }() - - for { - err = nil - - if groupstream == nil { - groupstream, err = mongoClient.Watch(w.collection, w.pipeline, options.ChangeStream().SetFullDocument(w.op)) - if err != nil { - logger.Error(err) - time.Sleep(time.Minute) - } - ctx = context.TODO() - } - - if groupstream != nil { - changed := groupstream.TryNext(ctx) - if ctx.Err() != nil { - logger.Error("tavern monitorfunc TryNext error") - logger.Error(ctx.Err()) - groupstream.Close(ctx) - groupstream = nil - continue - } - - if changed { - var data groupPipelineDocument - if err := groupstream.Decode(&data); err == nil { - w.callOnChanged(region, &data) - } - } else if groupstream.Err() != nil || groupstream.ID() == 0 { - logger.Error(groupstream.Err()) - groupstream.Close(ctx) - groupstream = nil - } - } - } -} - -func (w mongowatcher) start(ctx context.Context, region string, mongoClient common.MongoClient) { - go w.monitorfunc(ctx, region, mongoClient) -} - -func (cfg *groupConfig) preparePersistent(ctx context.Context, region string, dbconn common.MongoClient, wsh *wshandler.WebsocketHandler) (group, error) { - uniqueindices := map[string]bson.D{} - hints := map[string][]string{} - - for _, ui := range cfg.UniqueIndex { - indexname := strings.ReplaceAll(ui, ",", "") - keys := strings.Split(ui, ",") - keydef := bson.D{} - for _, k := range keys { - keydef = append(keydef, bson.E{Key: strings.TrimSpace(k), Value: 1}) - } - - uniqueindices[indexname] = keydef - hints[indexname] = keys - } - - collectionName := common.CollectionName(cfg.Name) - memberCollectionName := common.CollectionName(cfg.Name + "-members") - inviteCollectionName := common.CollectionName(cfg.Name + "-invites") - - err := dbconn.MakeUniqueIndices(collectionName, uniqueindices) - if err != nil { - return nil, err - } - - indices := map[string]bson.D{} - for _, ui := range cfg.SearchIndex { - indexname := strings.ReplaceAll(ui, ",", "") - keys := strings.Split(ui, ",") - keydef := bson.D{} - for _, k := range keys { - keydef = append(keydef, bson.E{Key: strings.TrimSpace(k), Value: 1}) - } - - indices[indexname] = keydef - hints[indexname] = keys - } - - if _, ok := indices["_idmembers"]; !ok { - indices["_idmembers"] = bson.D{ - {Key: "_id", Value: 1}, - {Key: "members", Value: 1}, - } - } - - if len(cfg.TextSearchFields) > 0 { - var tsi bson.D - for _, stf := range cfg.TextSearchFields { - tsi = append(tsi, bson.E{Key: stf, Value: "text"}) - } - indices[cfg.TextSearchFields[0]+"_text"] = tsi - } - - err = dbconn.MakeIndices(collectionName, indices) - if err != nil { - return nil, err - } - - dbconn.MakeUniqueIndices(inviteCollectionName, map[string]bson.D{ - "gidmid": {bson.E{Key: "_gid", Value: 1}, bson.E{Key: "_mid", Value: 1}, bson.E{Key: "_ts", Value: -1}}, - }) - - if cfg.InviteeExlusive { - err = dbconn.MakeUniqueIndices(inviteCollectionName, map[string]bson.D{ - "mid": {bson.E{Key: "_mid", Value: 1}, bson.E{Key: "_ts", Value: -1}}, - }) - } else { - err = dbconn.MakeIndices(inviteCollectionName, map[string]bson.D{ - "mid": {bson.E{Key: "_mid", Value: 1}, bson.E{Key: "_ts", Value: -1}}, - }) - } - if err != nil { - return nil, err - } - - if cfg.InviteExpire > 0 { - err = dbconn.MakeExpireIndex(inviteCollectionName, cfg.InviteExpire) - if err != nil { - return nil, err - } - } - - err = dbconn.MakeUniqueIndices(memberCollectionName, map[string]primitive.D{ - "gidmid": {{Key: "_gid", Value: 1}, {Key: "_mid", Value: 1}}, - }) - if err != nil { - return nil, err - } - - for _, mi := range cfg.MemberIndex { - indexname := strings.ReplaceAll(mi, ",", "") - keys := strings.Split(mi, ",") - keydef := bson.D{} - for _, k := range keys { - keydef = append(keydef, bson.E{Key: strings.TrimSpace(k), Value: 1}) - } - err = dbconn.MakeIndices(memberCollectionName, map[string]bson.D{ - indexname: keydef, - }) - if err != nil { - return nil, err - } - } - - if cfg.InviteExpire > 0 { - err = dbconn.MakeExpireIndex(memberCollectionName, cfg.InviteExpire) - if err != nil { - return nil, err - } - } - - groupwatcher := mongowatcher{ - collection: collectionName, - op: options.Default, - pipeline: mongo.Pipeline{ - bson.D{ - { - Key: "$match", Value: bson.D{ - {Key: "operationType", Value: bson.D{ - {Key: "$in", Value: bson.A{"update"}}, - }}, - }, - }, - }, - }, - onChanged: func(r string, data *groupPipelineDocument) { - updates := data.UpdateDescription.UpdatedFields - gid := data.DocumentKey.Id - updates["_id"] = gid - updates["_hint"] = cfg.Name - wsh.Broadcast(r, gid, updates) - }, - } - groupwatcher.start(ctx, region, dbconn) - - m1 := &mongowatcher{ - collection: memberCollectionName, - op: options.Default, - pipeline: mongo.Pipeline{ - bson.D{ - { - Key: "$match", Value: bson.D{ - {Key: "operationType", Value: bson.D{ - {Key: "$in", Value: bson.A{"insert"}}, - }}, - }, - }, - }, - }, - onChanged: func(r string, data *groupPipelineDocument) { - gid := data.FullDocument["_gid"].(primitive.ObjectID) - delete(data.FullDocument, "_gid") - delete(data.FullDocument, "_mid") - delete(data.FullDocument, "_ts") - data.FullDocument["_hint"] = cfg.Name - data.FullDocument["_fullDocument"] = true - if _, candidate := data.FullDocument["_candidate"]; candidate { - gid[0] |= 0x80 - delete(data.FullDocument, "_candidate") - } - wsh.Broadcast(r, gid, data.FullDocument) - }, - } - m1.start(ctx, region, dbconn) - - m2 := &mongowatcher{ - collection: memberCollectionName, - op: options.UpdateLookup, - pipeline: mongo.Pipeline{ - bson.D{ - { - Key: "$match", Value: bson.D{ - {Key: "operationType", Value: bson.D{ - {Key: "$in", Value: bson.A{"update"}}, - }}, - }, - }, - }, - bson.D{ - { - Key: "$project", Value: bson.M{ - "fullDocument._id": 1, - "fullDocument._gid": 1, - "fullDocument._mid": 1, - "fullDocument._candidate": 1, - "updateDescription": 1, - }, - }, - }, - }, - onChanged: func(r string, data *groupPipelineDocument) { - gid := data.FullDocument["_gid"].(primitive.ObjectID) - updates := data.UpdateDescription.UpdatedFields - updates["_id"] = data.FullDocument["_id"] - updates["_hint"] = cfg.Name - for _, r := range data.UpdateDescription.RemovedFileds { - updates[r.(string)] = nil - } - - if _, candidate := data.FullDocument["_candidate"]; candidate { - // _candidate는 candidate 채널로 broadcast - gid[0] |= 0x80 - delete(data.FullDocument, "_candidate") - } - - if _, ok := updates["_delete"]; ok { - mid := data.FullDocument["_mid"].(primitive.ObjectID) - if conn := wsh.Conn(r, mid); conn != nil { - conn.Close() - } - } - - if v, ok := updates["_candidate"]; ok && v == nil { - // candidate에서 벗어났네? 접속을 끊고 재접속 유도 - mid := data.FullDocument["_mid"].(primitive.ObjectID) - if conn := wsh.Conn(r, mid); conn != nil { - conn.Close() - } - } - wsh.Broadcast(r, gid, updates) - }, - } - m2.start(ctx, region, dbconn) - - i1 := &mongowatcher{ - collection: inviteCollectionName, - op: options.UpdateLookup, - pipeline: mongo.Pipeline{ - bson.D{ - { - Key: "$match", Value: bson.D{ - {Key: "operationType", Value: bson.D{ - {Key: "$in", Value: bson.A{"insert", "update"}}, - }}, - }, - }, - }, - }, - onChanged: func(r string, data *groupPipelineDocument) { - alias := data.FullDocument["_mid"].(primitive.ObjectID) - conn := wsh.Conn(r, alias) - if conn != nil { - delete(data.FullDocument, "_ts") - delete(data.FullDocument, "_mid") - data.FullDocument["_fullDocument"] = true - data.FullDocument["_hint"] = inviteCollectionName - bt, _ := json.Marshal(data.FullDocument) - conn.WriteBytes(bt) - } - }, - } - i1.start(ctx, region, dbconn) - - return &groupMongo{ - groupConfig: cfg, - mongoClient: dbconn, - hints: hints, - collectionName: collectionName, - memberCollectionName: memberCollectionName, - inviteCollectionName: inviteCollectionName, - }, nil -} diff --git a/core/richconn.go b/core/richconn.go index a359c56..8af6653 100644 --- a/core/richconn.go +++ b/core/richconn.go @@ -1,73 +1,97 @@ package core import ( - "context" - - "repositories.action2quare.com/ayo/gocommon/logger" - "repositories.action2quare.com/ayo/gocommon/wshandler" - - "github.com/go-redis/redis/v8" - "go.mongodb.org/mongo-driver/bson/primitive" + "fmt" + "strings" + "sync" ) -type richConnOuter struct { - wsh *wshandler.WebsocketHandler - rc *wshandler.Richconn +type connection struct { + locker sync.Mutex + alias string + tags []string + onClose map[string]func() } -func (sub richConnOuter) JoinTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, hint string) error { - sub.wsh.JoinTag(region, tag, tid, sub.rc, hint) +func (rc *connection) addTag(name, val string) { + rc.locker.Lock() + defer rc.locker.Unlock() - wsh := sub.wsh - sub.rc.RegistOnCloseFunc(tag.Hex(), func() { - wsh.LeaveTag(region, tag, tid) - }) - return nil -} - -func (sub richConnOuter) LeaveTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, hint string) error { - sub.SetStateInTag(region, tag, tid, "", hint) - return sub.wsh.LeaveTag(region, tag, tid) -} - -func (sub richConnOuter) SetStateInTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, state string, hint string) error { - return sub.wsh.SetStateInTag(region, tag, tid, state, hint) -} - -func (sub richConnOuter) TurnGroupOnline(key string, gid primitive.ObjectID, score float64) error { - gidhex := gid.Hex() - _, err := sub.wsh.RedisSync.ZAdd(context.Background(), key, &redis.Z{Score: score, Member: gidhex}).Result() - if err != nil { - logger.Error("TurnGroupOnline failed. redis.ZAdd return err :", err) - return err + prefix := name + "=" + for i, tag := range rc.tags { + if strings.HasPrefix(tag, prefix) { + rc.tags[i] = prefix + val + return + } } - - sub.rc.RegistOnCloseFunc(key, func() { - sub.wsh.RedisSync.ZRem(context.Background(), key, gidhex) - }) - - return nil + rc.tags = append(rc.tags, prefix+val) } -func (sub richConnOuter) TurnGroupOffline(key string, gid primitive.ObjectID) error { - f := sub.rc.UnregistOnCloseFunc(key) - if f != nil { +func (rc *connection) getTag(name string) string { + rc.locker.Lock() + defer rc.locker.Unlock() + + prefix := name + "=" + for _, tag := range rc.tags { + if strings.HasPrefix(tag, prefix) { + return tag[len(prefix):] + } + } + return "" +} + +func (rc *connection) removeTag(name string, val string) { + rc.locker.Lock() + defer rc.locker.Unlock() + + whole := fmt.Sprintf("%s=%s", name, val) + for i, tag := range rc.tags { + if tag == whole { + if i == 0 && len(rc.tags) == 1 { + rc.tags = nil + } else { + lastidx := len(rc.tags) - 1 + if i < lastidx { + rc.tags[i] = rc.tags[lastidx] + } + rc.tags = rc.tags[:lastidx] + } + return + } + } +} + +func (rc *connection) registOnCloseFunc(name string, f func()) { + rc.locker.Lock() + defer rc.locker.Unlock() + + if rc.onClose == nil { f() - } else { - sub.wsh.RedisSync.ZRem(context.Background(), key, gid.Hex()) + return } - return nil + rc.onClose[name] = f } -func (sub richConnOuter) SendMessage(doc []byte) error { - return sub.rc.WriteBytes(doc) +func (rc *connection) hasOnCloseFunc(name string) bool { + rc.locker.Lock() + defer rc.locker.Unlock() + + if rc.onClose == nil { + return false + } + + _, ok := rc.onClose[name] + return ok } -func (sub richConnOuter) SendMessageToTag(region string, tag primitive.ObjectID, msg []byte) error { - sub.wsh.BroadcastRaw(region, tag, msg) - return nil -} +func (rc *connection) unregistOnCloseFunc(name string) (out func()) { + rc.locker.Lock() + defer rc.locker.Unlock() -func (sub richConnOuter) CloseOnPurpose() error { - return sub.rc.Close() + if rc.onClose == nil { + return + } + out = rc.onClose[name] + delete(rc.onClose, name) + return } diff --git a/core/tavern.go b/core/tavern.go index 03850e9..d250693 100644 --- a/core/tavern.go +++ b/core/tavern.go @@ -9,6 +9,7 @@ import ( "reflect" "strings" + "repositories.action2quare.com/ayo/gocommon" common "repositories.action2quare.com/ayo/gocommon" "repositories.action2quare.com/ayo/gocommon/logger" "repositories.action2quare.com/ayo/gocommon/wshandler" @@ -71,41 +72,12 @@ func readBsonDoc(r io.Reader, src any) error { return nil } -type rpcCallDomain[T any] struct { - rpcCallChanName string - caller rpc.RpcCaller - callee rpc.RpcCallee[T] - methods map[string]reflect.Method -} - -func createRpcCallDomain[CalleeType any](syncConn *redis.Client, creator func(*wshandler.Richconn) *CalleeType) rpcCallDomain[CalleeType] { - var tmp *CalleeType - methods := make(map[string]reflect.Method) - tp := reflect.TypeOf(tmp) - for i := 0; i < tp.NumMethod(); i++ { - method := tp.Method(i) - methods[method.Name] = method - } - - rpcChanName := "conn_rpc_channel_" + tp.Name() - publishFunc := func(bt []byte) error { - _, err := syncConn.Publish(context.Background(), rpcChanName, bt).Result() - return err - } - - return rpcCallDomain[CalleeType]{ - rpcCallChanName: rpcChanName, - caller: rpc.NewRpcCaller(publishFunc), - callee: rpc.NewRpcCallee(creator), - methods: methods, - } -} - type TavernConfig struct { common.RegionStorageConfig `json:",inline"` GroupTypes map[string]*groupConfig `json:"tavern_group_types"` MaingateApiToken string `json:"maingate_api_token"` + RedisURL string `json:"tavern_redis_url"` macAddr string } @@ -122,7 +94,8 @@ type subTavern struct { region string groups map[string]group methods map[string]reflect.Method - wshRpc rpcCallDomain[richConnOuter] + + redisClient *redis.Client } func getMacAddr() (string, error) { @@ -190,12 +163,18 @@ type groupPipelineDocument struct { } func (tv *Tavern) prepare(ctx context.Context) error { + redisClient, err := gocommon.NewRedisClient(config.RedisURL, 0) + if err != nil { + logger.Error("config tavern_redis_url is not valid or missing") + return err + } + for region, url := range config.RegionStorage { var dbconn common.MongoClient var err error var groupinstance group - if err := rpc.IsCallerCalleeMethodMatch[richConnOuter](); err != nil { + if err := rpc.IsCallerCalleeMethodMatch[connection](); err != nil { return err } @@ -212,25 +191,23 @@ func (tv *Tavern) prepare(ctx context.Context) error { mongoClient: dbconn, region: region, methods: methods, + redisClient: redisClient, } - sub.wshRpc = createRpcCallDomain(tv.wsh.RedisSync, func(rc *wshandler.Richconn) *richConnOuter { - return &richConnOuter{wsh: sub.wsh, rc: rc} - }) - groups := make(map[string]group) for typename, cfg := range config.GroupTypes { cfg.Name = typename if cfg.Transient { groupinstance, err = cfg.prepareInMemory(ctx, region, typename, tv.wsh) } else { - if !dbconn.Connected() { - dbconn, err = common.NewMongoClient(ctx, url.Mongo, region) - if err != nil { - return err - } - } - groupinstance, err = cfg.preparePersistent(ctx, region, dbconn, tv.wsh) + // TODO : db + // if !dbconn.Connected() { + // dbconn, err = common.NewMongoClient(ctx, url.Mongo, region) + // if err != nil { + // return err + // } + // } + // groupinstance, err = cfg.preparePersistent(ctx, region, dbconn, tv.wsh) } if err != nil { return err @@ -246,9 +223,6 @@ func (tv *Tavern) prepare(ctx context.Context) error { } func (tv *Tavern) RegisterHandlers(ctx context.Context, serveMux *http.ServeMux, prefix string) error { - // request는 항상 서비스 서버를 거쳐서 들어온다. [client] <--tls--> [service server] <--http--> tavern - // 클라이언트는 tavern으로부터 메시지를 수신할 뿐, 송신하지 못한다. - // 단, 요청은 https 서비스 서버를 통해 들어오고 클라이언트는 ws으로 수신만 한다는 원칙이 유지되어야 한다.(채팅 메시지는 예외?) for _, sub := range tv.subTaverns { var pattern string if sub.region == "default" { diff --git a/go.mod b/go.mod index 8e7d3c6..1cafdd6 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/go-redis/redis/v8 v8.11.5 github.com/gorilla/websocket v1.5.0 go.mongodb.org/mongo-driver v1.11.7 - repositories.action2quare.com/ayo/gocommon v0.0.0-20230621052811-06ef97f11d22 + repositories.action2quare.com/ayo/gocommon v0.0.0-20230705150145-7ef3e68d161e ) require ( diff --git a/go.sum b/go.sum index 907558a..190b6ba 100644 --- a/go.sum +++ b/go.sum @@ -104,3 +104,9 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= repositories.action2quare.com/ayo/gocommon v0.0.0-20230621052811-06ef97f11d22 h1:DImSGNxZrc+Q4WlS1OKMsLAScEfDYLX4XMJdjAaVnXc= repositories.action2quare.com/ayo/gocommon v0.0.0-20230621052811-06ef97f11d22/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230705022744-edd2f7aab52e h1:l9aqmNpEF8V1o0b3eCT/nhC+O1dXMUcPzBPewbshuDI= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230705022744-edd2f7aab52e/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230705132657-822681bf74cf h1:VQ78wRZaKHnWOM+Y2ZxB/EVNopzC4DNbwihledqjwy8= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230705132657-822681bf74cf/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230705150145-7ef3e68d161e h1:LNzK2Fhl1X8JQfn7XsoQwz2H/LY7YmMehEPqCyXgV1U= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230705150145-7ef3e68d161e/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns=