InMemory 그룹을 redis로 변경

This commit is contained in:
2023-07-19 09:37:02 +09:00
parent 01da5bb3a4
commit 8e1b232d57
8 changed files with 209 additions and 448 deletions

View File

@ -54,36 +54,45 @@ type memberDoc struct {
type InvitationFail bson.M
type groupDoc struct {
Body bson.M `json:"_body"`
Members map[string]*memberDoc `json:"_members"`
InCharge string `json:"_incharge"`
Gid string `json:"_gid"`
Members map[string]any `json:"_members"`
InCharge string `json:"_incharge"`
Gid string `json:"_gid"`
rh *RedisonHandler
rh *gocommon.RedisonHandler
id groupID
}
func (p groupDoc) MarshalJSON() ([]byte, error) {
if len(p.Gid) == 0 {
p.Gid = p.id.Hex()
func (gd *groupDoc) loadMemberFull(tid string) (bson.M, error) {
full, err := gd.rh.JSONGet(gd.strid(), "$._members."+tid)
if err != nil {
return nil, err
}
// Turn p into a map
type groupDoc_ groupDoc // prevent recursion
b, _ := json.Marshal(groupDoc_(p))
bt := []byte(full.(string))
bt = bt[1 : len(bt)-1]
var m map[string]json.RawMessage
_ = json.Unmarshal(b, &m)
// Add tags to the map, possibly overriding struct fields
for k, v := range p.Body {
// if overriding struct fields is not acceptable:
// if _, ok := m[k]; ok { continue }
b, _ = json.Marshal(v)
m[k] = b
var doc bson.M
if err = json.Unmarshal(bt, &doc); err != nil {
return nil, err
}
return json.Marshal(m)
return doc, nil
}
func (gd *groupDoc) loadFull() (doc bson.M) {
// 새 멤버에 그룹 전체를 알림
full, err := gd.rh.JSONGet(gd.strid(), "$")
if err == nil {
bt := []byte(full.(string))
bt = bt[1 : len(bt)-1]
err = json.Unmarshal(bt, &doc)
if err != nil {
logger.Println("loadFull err :", err)
}
} else {
logger.Println("loadFull err :", err)
}
return
}
func (gd *groupDoc) strid() string {
@ -160,18 +169,19 @@ func (gd *groupDoc) addInvite(inviteeDoc bson.M, ttl time.Duration, max int) (*m
return newdoc, err
}
func (gd *groupDoc) addMember(mid accountID, doc bson.M) (*memberDoc, error) {
memdoc := &memberDoc{
Body: doc,
Invite: false,
InviteExpire: 0,
}
func (gd *groupDoc) addMember(mid accountID, doc bson.M) (bson.M, error) {
tid := gd.tid(mid)
prefix := "$._members." + tid
if _, err := gd.rh.JSONSet(gd.strid(), "$._members."+gd.tid(mid), memdoc, SetOptionXX); err != nil {
if _, err := gd.rh.JSONMerge(gd.strid(), prefix+"._body", doc, gocommon.RedisonSetOptionXX); err != nil {
return nil, err
}
return memdoc, nil
if err := gd.rh.JSONMDel(gd.strid(), []string{prefix + "._invite", prefix + "._invite_exp"}); err != nil {
return nil, err
}
return gd.loadMemberFull(tid)
}
func (gd *groupDoc) removeMember(mid accountID) error {
@ -179,21 +189,41 @@ func (gd *groupDoc) removeMember(mid accountID) error {
return err
}
func (gd *groupDoc) getMembers() (map[string]any, error) {
res, err := gd.rh.JSONGet(gd.strid(), "$._members")
if err != nil {
return nil, err
}
var temp []map[string]any
err = json.Unmarshal([]byte(res.(string)), &temp)
if err != nil {
return nil, err
}
out := make(map[string]any)
for k, v := range temp[0] {
body := v.(map[string]any)["_body"]
out[gd.mid(k).Hex()] = body
}
return out, nil
}
type groupInMemory struct {
*groupConfig
sendUpstreamMessage func(*wshandler.UpstreamMessage)
sendEnterRoomMessage func(groupID, accountID)
sendLeaveRoomMessage func(groupID, accountID)
rh *RedisonHandler
rh *gocommon.RedisonHandler
}
func (gm *groupInMemory) createGroup(newid groupID, charge accountID, chargeDoc bson.M) (*groupDoc, error) {
tid := makeTid(newid, charge)
gd := &groupDoc{
Body: bson.M{},
Members: map[string]*memberDoc{
tid: {
Members: map[string]any{
tid: &memberDoc{
Body: chargeDoc,
Invite: false,
InviteExpire: 0,
@ -205,7 +235,7 @@ func (gm *groupInMemory) createGroup(newid groupID, charge accountID, chargeDoc
id: newid,
}
_, err := gm.rh.JSONSet(gd.strid(), "$", gd, SetOptionNX)
_, err := gm.rh.JSONSet(gd.strid(), "$", gd, gocommon.RedisonSetOptionNX)
if err != nil {
return nil, err
}
@ -243,18 +273,36 @@ func (gm *groupInMemory) Candidate(gid groupID, mid accountID, doc bson.M) error
var errGroupNotExist = errors.New("group does not exist")
func (gm *groupInMemory) Join(gid groupID, mid accountID, doc bson.M) error {
group, err := gm.find(gid)
gd, err := gm.find(gid)
if err != nil {
return err
}
if group == nil {
if gd == nil {
// 그룹이 없다. 실패
return errGroupNotExist
}
// 내 정보 업데이트할 때에도 사용됨
_, err = group.addMember(mid, doc)
if memdoc, err := gd.addMember(mid, doc); err == nil {
// 기존 유저에게 새 유저 알림
gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "#" + gid.Hex(),
Body: map[string]any{
gd.tid(mid): memdoc,
},
Tag: []string{"MemberDocFull"},
})
gm.sendEnterRoomMessage(gid, mid)
// 새 멤버에 그룹 전체를 알림
gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "@" + mid.Hex(),
Body: gd.loadFull(),
Tag: []string{"GroupDocFull"},
})
}
return err
}
@ -269,7 +317,7 @@ func (gm *groupInMemory) Invite(gid groupID, mid accountID, inviterDoc bson.M, i
}
// targetid에 초대한 mid가 들어있다.
already, err := gm.rh.Get(gm.rh.ctx, targetid.Hex()).Result()
already, err := gm.rh.Get(context.Background(), targetid.Hex()).Result()
if err != nil && err != redis.Nil {
return "", err
}
@ -313,7 +361,7 @@ func (gm *groupInMemory) Invite(gid groupID, mid accountID, inviterDoc bson.M, i
}
// 초대 중 표시
_, err = gm.rh.SetNX(gm.rh.ctx, targetid.Hex(), mid.Hex(), time.Duration(gm.InviteExpire)*time.Second).Result()
_, err = gm.rh.SetNX(context.Background(), targetid.Hex(), mid.Hex(), time.Duration(gm.InviteExpire)*time.Second).Result()
if err != nil {
return "", err
}
@ -340,9 +388,7 @@ func (gm *groupInMemory) CancelInvitation(gid groupID, tid ticketID) error {
var errInvitationExpired = errors.New("invitation is already expired")
func (gm *groupInMemory) AcceptInvitation(gid groupID, mid accountID, member bson.M) error {
logger.Println("accept invitation key :", mid.Hex())
cnt, err := gm.rh.Del(gm.rh.ctx, mid.Hex()).Result()
cnt, err := gm.rh.Del(context.Background(), mid.Hex()).Result()
if err != nil {
return err
}
@ -351,7 +397,7 @@ func (gm *groupInMemory) AcceptInvitation(gid groupID, mid accountID, member bso
return errInvitationExpired
}
gd := groupDoc{
gd := &groupDoc{
id: gid,
rh: gm.rh,
}
@ -366,25 +412,13 @@ func (gm *groupInMemory) AcceptInvitation(gid groupID, mid accountID, member bso
},
Tag: []string{"MemberDocFull"},
})
gm.sendEnterRoomMessage(gid, mid)
// 새 멤버에 그룹 전체를 알림
full, err := gm.rh.JSONGet(gd.strid(), "$")
if err != nil {
return err
}
var temp []*groupDoc
err = json.Unmarshal([]byte(full.(string)), &temp)
if err != nil {
return err
}
test, _ := json.Marshal(temp[0])
logger.Println(string(test))
gm.sendUpstreamMessage(&wshandler.UpstreamMessage{
Target: "@" + mid.Hex(),
Body: temp[0],
Body: gd.loadFull(),
Tag: []string{"GroupDocFull"},
})
}
@ -393,8 +427,16 @@ func (gm *groupInMemory) AcceptInvitation(gid groupID, mid accountID, member bso
return err
}
func (gm *groupInMemory) QueryGroupMembers(gid groupID) (bson.M, error) {
gd := groupDoc{
id: gid,
rh: gm.rh,
}
return gd.getMembers()
}
func (gm *groupInMemory) DenyInvitation(gid groupID, mid accountID, tid ticketID) error {
gm.rh.Del(gm.rh.ctx, mid.Hex()).Result()
gm.rh.Del(context.Background(), mid.Hex()).Result()
gd := groupDoc{
id: gid,
rh: gm.rh,
@ -435,10 +477,11 @@ func (gm *groupInMemory) Leave(gid groupID, mid accountID) error {
}
func (gm *groupInMemory) UpdateMemberDocument(gid groupID, mid accountID, doc bson.M) error {
gd := groupDoc{
gd := &groupDoc{
id: gid,
rh: gm.rh,
}
prefixPath := fmt.Sprintf("$._members.%s.", gd.tid(mid))
err := gm.rh.JSONMSetRel(gd.strid(), prefixPath, doc)
if err != nil {
@ -494,7 +537,7 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, typename string, su
// 각 함수에서는 publish
gm := &groupInMemory{
groupConfig: cfg,
rh: NewRedisonHandler(ctx, redisClient),
rh: gocommon.NewRedisonHandler(ctx, redisClient),
sendUpstreamMessage: func(msg *wshandler.UpstreamMessage) {
wsh.SendUpstreamMessage(region, msg)
},