diff --git a/core/apiimpl.go b/core/apiimpl.go index 8232570..260fab2 100644 --- a/core/apiimpl.go +++ b/core/apiimpl.go @@ -5,48 +5,15 @@ import ( "encoding/json" "io" "net/http" - "strings" common "repositories.action2quare.com/ayo/gocommon" "repositories.action2quare.com/ayo/gocommon/logger" - "repositories.action2quare.com/ayo/gocommon/wshandler" "github.com/go-redis/redis/v8" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" ) -func splitDocument(doc bson.M) bson.M { - setdoc := bson.M{} - unsetdoc := bson.M{} - findoc := bson.M{} - - for k, v := range doc { - if k == "$set" { - setdoc = v.(bson.M) - } else if k == "$unset" { - unsetdoc = v.(bson.M) - } - } - - for k, v := range doc { - if v == nil { - unsetdoc[k] = 1 - } else if k[0] != '$' { - setdoc[k] = v - } - } - - if len(setdoc) > 0 { - findoc["$set"] = setdoc - } - if len(unsetdoc) > 0 { - findoc["$unset"] = unsetdoc - } - - return findoc -} - // CreateGroup : 그룹 생성 // - 그룹 : 멤버와 권한을 관리할 수 있다. 그룹 타입에 따라 디비에 저장되거나 메모리에만 존재한다. // - 생성 요청이 오면 파티를 만든다. 파티을 만들 수 있는지 여부는 서비스에서 결정할 것이고, 이 요청을 호출했다는 것은 서비스가 결정한 그룹 생성 조건을 다 통과했다는 의미이다. @@ -179,52 +146,6 @@ func (sub *subTavern) Invite(w http.ResponseWriter, r *http.Request) { w.Write([]byte(result)) } -func (sub *subTavern) UpdateGroupMember(w http.ResponseWriter, r *http.Request) { - typename, _ := common.ReadStringFormValue(r.Form, "type") - group := sub.groups[typename] - if group == nil { - logger.Println("UpdateGroupMember failed. group type is missing :", r.Form) - w.WriteHeader(http.StatusBadRequest) - return - } - - gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid") - if !ok { - logger.Println("UpdateGroupMember failed. gid is missing :", r.Form) - w.WriteHeader(http.StatusBadRequest) - return - } - - midobj, midok := common.ReadObjectIDFormValue(r.Form, "mid") - tidobj, tidok := common.ReadObjectIDFormValue(r.Form, "tid") - if !midok && !tidok { - // 둘다 없네? - logger.Println("JoinGroup failed. tid or mid should be exist") - w.WriteHeader(http.StatusBadRequest) - return - } - - var err error - delete, _ := common.ReadBoolFormValue(r.Form, "delete") - if delete { - err = group.UpdateGroupMember(gidobj, midobj, tidobj, nil) - } else { - var doc bson.M - if err := readBsonDoc(r.Body, &doc); err != nil { - logger.Error("UpdateGroupMember failed. readBsonDoc returns err :", err) - w.WriteHeader(http.StatusInternalServerError) - return - } - err = group.UpdateGroupMember(gidobj, midobj, tidobj, doc) - } - - if err != nil { - logger.Println("UpdateGroupMember failed. Update returns err :", err) - w.WriteHeader(http.StatusInternalServerError) - return - } -} - func (sub *subTavern) CancelInvitation(w http.ResponseWriter, r *http.Request) { typename, _ := common.ReadStringFormValue(r.Form, "type") group := sub.groups[typename] @@ -719,6 +640,32 @@ func (sub *subTavern) UpdateGroupDocument(w http.ResponseWriter, r *http.Request } } +func (sub *subTavern) PauseGroupMember(w http.ResponseWriter, r *http.Request) { + typename, _ := common.ReadStringFormValue(r.Form, "type") + group := sub.groups[typename] + if group == nil { + logger.Println("DismissGroup failed. type is missing") + w.WriteHeader(http.StatusBadRequest) + return + } + + midobj, ok := common.ReadObjectIDFormValue(r.Form, "mid") + if !ok { + logger.Println("UpdateMemberDocument failed. member_id is missing") + w.WriteHeader(http.StatusBadRequest) + return + } + + gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid") + if !ok { + logger.Println("UpdateMemberDocument failed. _id is missing") + w.WriteHeader(http.StatusBadRequest) + return + } + + group.PauseMember(gidobj, midobj) +} + func (sub *subTavern) DropPausedMember(w http.ResponseWriter, r *http.Request) { typename, _ := common.ReadStringFormValue(r.Form, "type") group := sub.groups[typename] @@ -749,55 +696,55 @@ func (sub *subTavern) DropPausedMember(w http.ResponseWriter, r *http.Request) { } } -func (sub *subTavern) deliveryMessageHandler(deliveryChan <-chan wshandler.DeliveryMessage) { - defer func() { - r := recover() - if r != nil { - logger.Error(r) - } - }() +// func (sub *subTavern) deliveryMessageHandler(deliveryChan <-chan wshandler.DeliveryMessage) { +// defer func() { +// r := recover() +// if r != nil { +// logger.Error(r) +// } +// }() - redisSync := sub.wsh.RedisSync - for msg := range deliveryChan { - mid := msg.Alias - if msg.Body != nil { - buffer := msg.Body +// redisSync := sub.wsh.RedisSync +// for msg := range deliveryChan { +// mid := msg.Alias +// if msg.Body != nil { +// buffer := msg.Body - var channame string - for i, ch := range buffer { - if ch == 0 { - channame = string(buffer[:i]) - buffer = buffer[i+1:] - break - } - } +// var channame string +// for i, ch := range buffer { +// if ch == 0 { +// channame = string(buffer[:i]) +// buffer = buffer[i+1:] +// break +// } +// } - if len(channame) == 0 { - continue - } +// if len(channame) == 0 { +// continue +// } - buffer = append(mid[:], buffer...) - _, err := redisSync.Publish(context.Background(), channame, buffer).Result() - if err != nil { - logger.Error(err) - } - } +// buffer = append(mid[:], buffer...) +// _, err := redisSync.Publish(context.Background(), channame, buffer).Result() +// if err != nil { +// logger.Error(err) +// } +// } - if len(msg.Command) > 0 { - switch msg.Command { - case "pause": - gidtype := msg.Conn.GetTag("gid") - if len(gidtype) > 0 { - tokens := strings.SplitN(gidtype, "@", 2) - gidobj, _ := primitive.ObjectIDFromHex(tokens[0]) - gtype := tokens[1] - group := sub.groups[gtype] - if group != nil { - group.PauseMember(gidobj, msg.Alias, msg.Conn) - } - } - } - } - } - logger.Println("delivery chan fin") -} +// if len(msg.Command) > 0 { +// switch msg.Command { +// case "pause": +// gidtype := msg.Conn.GetTag("gid") +// if len(gidtype) > 0 { +// tokens := strings.SplitN(gidtype, "@", 2) +// gidobj, _ := primitive.ObjectIDFromHex(tokens[0]) +// gtype := tokens[1] +// group := sub.groups[gtype] +// if group != nil { +// group.PauseMember(gidobj, msg.Alias, msg.Conn) +// } +// } +// } +// } +// } +// logger.Println("delivery chan fin") +// } diff --git a/core/group.go b/core/group.go index 0c5a738..dc1cf2e 100644 --- a/core/group.go +++ b/core/group.go @@ -3,8 +3,6 @@ package core import ( "net/url" - "repositories.action2quare.com/ayo/gocommon/wshandler" - "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" ) @@ -30,7 +28,6 @@ type group interface { Join(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, doc bson.M) (newTicketID primitive.ObjectID, err error) FindTicketID(groupID primitive.ObjectID, memberID primitive.ObjectID) primitive.ObjectID Invite(groupID primitive.ObjectID, memberID primitive.ObjectID, inviterDoc bson.M, inviteeDoc bson.M) (string, error) - UpdateGroupMember(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, doc bson.M) error CancelInvitation(groupID primitive.ObjectID, ticketID primitive.ObjectID) error AcceptInvitation(groupID primitive.ObjectID, mid primitive.ObjectID, ticketID primitive.ObjectID, member bson.M) (primitive.ObjectID, error) DenyInvitation(groupID primitive.ObjectID, mid primitive.ObjectID, ticketID primitive.ObjectID) error @@ -42,7 +39,7 @@ type group interface { QueryMember(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, projection string) (bson.M, error) Leave(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID) error DropPausedMember(groupID primitive.ObjectID, memberID primitive.ObjectID) error - PauseMember(groupID primitive.ObjectID, memberID primitive.ObjectID, conn *wshandler.Richconn) error + PauseMember(groupID primitive.ObjectID, memberID primitive.ObjectID) error UpdateMemberDocument(groupID primitive.ObjectID, memberID primitive.ObjectID, doc bson.M) error Dismiss(groupID primitive.ObjectID) error UpdateGroupDocument(groupID primitive.ObjectID, body []byte) error diff --git a/core/group_memory.go b/core/group_memory.go index a1e976b..d289c64 100644 --- a/core/group_memory.go +++ b/core/group_memory.go @@ -10,20 +10,17 @@ import ( "fmt" "net/url" "os" - "path" "reflect" - "runtime" "strings" "sync" "time" "repositories.action2quare.com/ayo/gocommon/flagx" "repositories.action2quare.com/ayo/gocommon/logger" + "repositories.action2quare.com/ayo/gocommon/rpc" "repositories.action2quare.com/ayo/gocommon/wshandler" - "repositories.action2quare.com/ayo/tavern/core/rpc" "github.com/go-redis/redis/v8" - "github.com/gorilla/websocket" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" ) @@ -32,8 +29,6 @@ type accountID = primitive.ObjectID type ticketID = primitive.ObjectID type groupID = primitive.ObjectID -var everyHost, _ = primitive.ObjectIDFromHex("010203040506070809101112") - type Invitation struct { GroupID groupID `json:"gid"` TicketID ticketID `json:"tid"` @@ -56,7 +51,6 @@ type PublicMemberDoc struct { type FullGroupDoc struct { Gid groupID - DM string AllMembers []*PublicMemberDoc `json:",omitempty"` Body GroupDocBody `json:",omitempty"` } @@ -69,7 +63,7 @@ type memberDoc struct { // underscore keys in Hidden Hidden bson.M - rconn *wshandler.Richconn + rconn *connection Mid accountID } @@ -99,28 +93,28 @@ func (gd *groupDoc) updateBodyWithBson(src []byte) ([]byte, error) { return bson.Marshal(gd.Body) } -func (gd *groupDoc) updateBodyWithJson(src []byte) ([]byte, error) { +func (gd *groupDoc) updateBodyWithJson(src []byte) []byte { gd.Lock() defer gd.Unlock() err := json.Unmarshal(src, &gd.Body) if err != nil { - return nil, err + return nil } - return json.Marshal(makeTypeMessage(gd.Body)) + return makeTypeMessage(gd.Body) } -func (gd *groupDoc) updateBodyBsonToJson(bsonSrc []byte) (jsonBt []byte, err error) { +func (gd *groupDoc) updateBodyBsonToJson(bsonSrc []byte) []byte { gd.Lock() defer gd.Unlock() - err = bson.Unmarshal(bsonSrc, &gd.Body) + err := bson.Unmarshal(bsonSrc, &gd.Body) if err != nil { - return nil, err + return nil } - return json.Marshal(makeTypeMessage(gd.Body)) + return makeTypeMessage(gd.Body) } func (gd *groupDoc) updateBody(bsonSrc []byte) error { @@ -130,7 +124,7 @@ func (gd *groupDoc) updateBody(bsonSrc []byte) error { return bson.Unmarshal(bsonSrc, &gd.Body) } -func (gd *groupDoc) addInvite(inviteeDoc bson.M, rconn *wshandler.Richconn, ttl time.Duration, max int) (ticketID, *memberDoc) { +func (gd *groupDoc) addInvite(inviteeDoc bson.M, rconn *connection, ttl time.Duration, max int) (ticketID, *memberDoc) { gd.Lock() defer gd.Unlock() @@ -194,7 +188,7 @@ func seperateHidden(in bson.M) (public bson.M, hidden bson.M) { return in, hidden } -func (gd *groupDoc) addInCharge(mid accountID, rconn *wshandler.Richconn, doc bson.M) (ticketID, *memberDoc) { +func (gd *groupDoc) addInCharge(mid accountID, rconn *connection, doc bson.M) (ticketID, *memberDoc) { gd.Lock() defer gd.Unlock() @@ -291,21 +285,6 @@ func (gd *groupDoc) removeMember(mid accountID, tid *ticketID) { } } -func (gd *groupDoc) conns(includeInvitee bool) (out []*wshandler.Richconn) { - gd.Lock() - defer gd.Unlock() - - for _, mem := range gd.tickets { - if mem.rconn != nil { - if !includeInvitee && mem.Invite { - continue - } - out = append(out, mem.rconn) - } - } - return -} - func (gd *groupDoc) ticket(mid accountID) ticketID { gd.Lock() defer gd.Unlock() @@ -399,7 +378,7 @@ func (gd *groupDoc) iterateMembers(cb func(ticketID, *memberDoc)) { } } -func (gd *groupDoc) serializeFull(gid groupID, directMessageChanName string) []byte { +func (gd *groupDoc) serializeFull(gid groupID) []byte { gd.Lock() defer gd.Unlock() @@ -416,14 +395,11 @@ func (gd *groupDoc) serializeFull(gid groupID, directMessageChanName string) []b }) } - bt, _ := json.Marshal(makeTypeMessage(FullGroupDoc{ + return makeTypeMessage(FullGroupDoc{ Gid: gid, - DM: directMessageChanName, AllMembers: output, Body: gd.Body, - })) - - return bt + }) } type groupContainer struct { @@ -433,11 +409,13 @@ type groupContainer struct { type groupInMemory struct { *groupConfig - groupDocSync func(groupID, []byte) error - memberSync func(groupID, accountID, ticketID, *memberDoc, bool) error - rpcCall func([]byte) error - hasConn func(accountID) *wshandler.Richconn - groups groupContainer + groupDocSync func(groupID, []byte) error + memberSync func(groupID, accountID, ticketID, *memberDoc, bool) error + rpcCall func([]byte) error + hasConn func(accountID) *connection + sendUpstreamMessage func(*wshandler.UpstreamMessage) + sendCloseMessage func(accountID, string) + groups groupContainer } func (gc *groupContainer) add(id groupID, doc *groupDoc) { @@ -496,46 +474,6 @@ func (gm *groupInMemory) Candidate(gid groupID, mid accountID, doc bson.M) error } var errGroupNotExist = errors.New("group does not exist") -var errFuncNameIsMissing = errors.New("how func name is missin") - -func (gm *groupInMemory) callProxyRpc(target accountID, name string, args ...any) error { - bt, err := rpc.Encode(target, name, args...) - if err != nil { - return err - } - - return gm.rpcCall(bt) -} - -type rpcTarget struct { - gm *groupInMemory - target accountID -} - -func (rt rpcTarget) call(args ...any) error { - pc := make([]uintptr, 1) - n := runtime.Callers(2, pc[:]) - if n < 1 { - return nil - } - - frame, _ := runtime.CallersFrames(pc).Next() - funcname := path.Ext(frame.Func.Name()) - if len(funcname) > 0 { - funcname = funcname[1:] - return rt.gm.callProxyRpc(rt.target, funcname, args...) - } - - return errFuncNameIsMissing -} - -func (gm *groupInMemory) rpc(target accountID) rpcTarget { - return rpcTarget{ - gm: gm, - target: target, - } -} - var errNoEmptySlot = errors.New("no more seat in group") func (gm *groupInMemory) Join(gid groupID, mid accountID, tid ticketID, doc bson.M) (ticketID, error) { @@ -558,58 +496,47 @@ func (gm *groupInMemory) FindTicketID(gid groupID, mid groupID) ticketID { return primitive.NilObjectID } -func makeTypeMessage[T any](msg T) bson.M { +func makeTypeMessage[T any](msg T) []byte { var ptr *T name := reflect.TypeOf(ptr).Elem().Name() - return bson.M{name: msg} + bt, _ := json.Marshal(bson.M{name: msg}) + return bt } -func sendTypedMessageDirect[T any](rconn *wshandler.Richconn, msg T) { - bt, _ := json.Marshal(makeTypeMessage(msg)) - rconn.WriteBytes(bt) -} +// func sendTypedMessageDirect[T any](rconn *wshandler.Richconn, msg T) { +// bt, _ := json.Marshal(makeTypeMessage(msg)) +// rconn.WriteBytes(bt) +// } -func sendTypedMessage[T any](gm *groupInMemory, target accountID, msg T) { - bt, _ := json.Marshal(makeTypeMessage(msg)) - gm.SendMessage(target, bt) -} +// func sendTypedMessage[T any](gm *groupInMemory, target accountID, msg T) { +// bt, _ := json.Marshal(makeTypeMessage(msg)) +// gm.SendMessage(target, bt) +// } -func (gm *groupInMemory) SendMessage(target accountID, msg []byte) { - rconn := gm.hasConn(target) - if rconn != nil { - rconn.WriteBytes(msg) - } else { - gm.rpc(target).call(target, msg) - } -} +// func (gm *groupInMemory) SendMessage(target accountID, msg []byte) { +// rconn := gm.hasConn(target) +// if rconn != nil { +// rconn.WriteBytes(msg) +// } else { +// gm.rpc(target).call(target, msg) +// } +// } -func multicast(conns []*wshandler.Richconn, raw []byte) { - for _, rconn := range conns { - rconn.WriteBytes(raw) - } -} +// func multicast(conns []*wshandler.Richconn, raw []byte) { +// for _, rconn := range conns { +// rconn.WriteBytes(raw) +// } +// } -func broadcastTypedMessage[T any](gm *groupInMemory, gid groupID, msg T) { - if gd := gm.groups.find(gid); gd != nil { - bt, _ := json.Marshal(makeTypeMessage(msg)) - go multicast(gd.conns(false), bt) - } -} +// func broadcastTypedMessage[T any](gm *groupInMemory, gid groupID, msg T) { +// if gd := gm.groups.find(gid); gd != nil { +// bt, _ := json.Marshal(makeTypeMessage(msg)) +// go multicast(gd.conns(false), bt) +// } +// } var errInviteeDocMidMissing = errors.New("inviteeDoc must have '_mid' field") -func (gm *groupInMemory) SendInvitationFailed(mid accountID, inviteeDoc bson.M) error { - delete(inviteeDoc, "_mid") - rconn := gm.hasConn(mid) - if rconn == nil { - return gm.rpc(mid).call(mid, inviteeDoc) - } - - sendTypedMessage(gm, mid, InvitationFail(inviteeDoc)) - - return nil -} - func (gm *groupInMemory) InviteImplement(gid groupID, mid accountID, inviteeDoc bson.M, inviterDoc bson.M) error { targetid := inviteeDoc["_mid"].(accountID) @@ -617,7 +544,7 @@ func (gm *groupInMemory) InviteImplement(gid groupID, mid accountID, inviteeDoc // invitee의 rconn이 종료될 때 그룹에 반영해야 하므로 rconn을 찾자 rconn := gm.hasConn(targetid) if rconn == nil { - return gm.rpc(targetid).call(gid, inviteeDoc, inviterDoc) + return rpc.Make(gm).To(targetid).Call(gid, mid, inviteeDoc, inviterDoc) } gd := gm.groups.find(gid) @@ -625,10 +552,14 @@ func (gm *groupInMemory) InviteImplement(gid groupID, mid accountID, inviteeDoc return errGroupNotExist } - if rconn.HasOnCloseFunc("member_remove_invite") { + if rconn.hasOnCloseFunc("member_remove_invite") { // 이미 초대 중이다. // inviter한테 알려줘야 한다. - return gm.SendInvitationFailed(mid, inviteeDoc) + gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ + Target: "@" + mid.Hex(), + Body: makeTypeMessage(InvitationFail(inviteeDoc)), + }) + return nil } tid, newdoc := gd.addInvite(inviteeDoc, rconn, time.Duration(gm.InviteExpire)*time.Second, gm.MaxMember) @@ -636,17 +567,20 @@ func (gm *groupInMemory) InviteImplement(gid groupID, mid accountID, inviteeDoc return errNoEmptySlot } - rconn.RegistOnCloseFunc("member_remove_invite", func() { + rconn.registOnCloseFunc("member_remove_invite", func() { gd.removeMember(targetid, &tid) gm.memberSync(gid, targetid, tid, nil, false) }) gm.memberSync(gid, targetid, tid, newdoc, false) - sendTypedMessage(gm, targetid, Invitation{ - GroupID: gid, - TicketID: tid, - Inviter: inviterDoc, - ExpireAtUTC: newdoc.InviteExpire.Unix(), + gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ + Target: "@" + targetid.Hex(), + Body: makeTypeMessage(Invitation{ + GroupID: gid, + TicketID: tid, + Inviter: inviterDoc, + ExpireAtUTC: newdoc.InviteExpire.Unix(), + }), }) return nil @@ -681,7 +615,7 @@ func (gm *groupInMemory) Invite(gid groupID, mid accountID, inviterDoc bson.M, i if rconn == nil { // mid가 있는 곳에서 처리를 해야 접속 끊겼을 때 콜백을 먼저 등록할 수 있다. // 콜백이 rconn에 먼저 등록되지 않으면 좀비 group이 생길 가능성이 생긴다. - return gid.Hex(), gm.rpc(mid).call(gid, mid, inviteeDoc, inviteeDoc) + return "", rpc.Make(gm).To(targetid).Call(gid, mid, inviteeDoc, inviterDoc) } // 이제 여기는 mid가 InCharge이면서 rconn이 존재 @@ -689,7 +623,7 @@ func (gm *groupInMemory) Invite(gid groupID, mid accountID, inviterDoc bson.M, i if gd == nil { _, gd = gm.groups.createWithID(gid, bson.M{}) tid, newdoc := gd.addInCharge(mid, rconn, inviterDoc) - rconn.RegistOnCloseFunc("member_remove", func() { + rconn.registOnCloseFunc("member_remove", func() { // 내가 InCharge이므로 접속이 종료될 때 그룹을 해체한다. gm.groupDocSync(gid, nil) }) @@ -710,9 +644,6 @@ func (gm *groupInMemory) Invite(gid groupID, mid accountID, inviterDoc bson.M, i return gid.Hex(), gm.InviteImplement(gid, mid, inviteeDoc, inviterDoc) } -func (gm *groupInMemory) UpdateGroupMember(gid groupID, mid accountID, tid ticketID, doc bson.M) error { - return nil -} func (gm *groupInMemory) CancelInvitation(gid groupID, tid ticketID) error { return nil } @@ -725,17 +656,17 @@ func (gm *groupInMemory) AcceptInvitation(gid groupID, mid accountID, tid ticket rconn := gm.hasConn(mid) if rconn == nil { - return gid, gm.rpc(mid).call(gid, mid, tid, member) + return gid, rpc.Make(gm).To(mid).Call(gid, mid, tid, member) } - oldFunc := rconn.UnregistOnCloseFunc("member_remove") + oldFunc := rconn.unregistOnCloseFunc("member_remove") if oldFunc != nil { // 기존 멤버였으면 탈퇴 처리 oldFunc() } - inviteFunc := rconn.UnregistOnCloseFunc("member_remove_invite") - rconn.RegistOnCloseFunc("member_remove", inviteFunc) + inviteFunc := rconn.unregistOnCloseFunc("member_remove_invite") + rconn.registOnCloseFunc("member_remove", inviteFunc) result, isNew := gd.addMember(mid, &tid, member) if result != nil { @@ -754,10 +685,10 @@ func (gm *groupInMemory) DenyInvitation(gid groupID, mid accountID, tid ticketID rconn := gm.hasConn(mid) if rconn == nil { - return gm.rpc(mid).call(gid, mid, tid) + return rpc.Make(gm).To(mid).Call(gid, mid, tid) } - inviteFunc := rconn.UnregistOnCloseFunc("member_remove_invite") + inviteFunc := rconn.unregistOnCloseFunc("member_remove_invite") if inviteFunc != nil { inviteFunc() // removeMember는 여기에 들어있다. return nil @@ -806,10 +737,16 @@ func (gm *groupInMemory) DropPausedMember(gid primitive.ObjectID, mid primitive. return nil } -func (gm *groupInMemory) PauseMember(gid primitive.ObjectID, mid primitive.ObjectID, rconn *wshandler.Richconn) error { - rconn.UnregistOnCloseFunc("member_remove") - rconn.UnregistOnCloseFunc("member_remove_invite") - rconn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "pause"), time.Time{}) +func (gm *groupInMemory) PauseMember(gid primitive.ObjectID, mid primitive.ObjectID) error { + rconn := gm.hasConn(mid) + if rconn == nil { + return rpc.Make(gm).To(mid).Call(gid, mid) + } + + // 접속은 끊기지만 그룹에서 제거하지는 않는 상태 + rconn.unregistOnCloseFunc("member_remove") + rconn.unregistOnCloseFunc("member_remove_invite") + gm.sendCloseMessage(mid, "pause") gd := gm.groups.find(gid) if gd == nil { @@ -903,24 +840,40 @@ func (gm *groupInMemory) Leave(gid groupID, mid accountID, tid ticketID) error { // targetmid의 "member_remove" 함수를 등록 해제해야 하므로 rconn이 있는 곳에서 하자 rconn := gm.hasConn(targetmid) if rconn == nil { - return gm.rpc(targetmid).call(gid, mid, tid) + return rpc.Make(gm).To(targetmid).Call(gid, mid, tid) } - if oldfunc := rconn.UnregistOnCloseFunc("member_remove"); oldfunc != nil { + if oldfunc := rconn.unregistOnCloseFunc("member_remove"); oldfunc != nil { oldfunc() // 이 안에 다 있다. } // 나한테는 빈 FullGroupDoc을 보낸다. - sendTypedMessageDirect(rconn, FullGroupDoc{ - Gid: gid, + gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ + Target: "@" + mid.Hex(), + Body: makeTypeMessage(FullGroupDoc{Gid: gid}), }) return nil } func (gm *groupInMemory) UpdateMemberDocument(gid groupID, mid accountID, doc bson.M) error { + gd := gm.groups.find(gid) + if gd == nil { + return errGroupNotExist + } + + tid := gd.ticket(mid) + bt, _ := json.Marshal(doc) + + personalized := []byte(fmt.Sprintf(`{"%s":%s}`, tid.Hex(), string(bt))) + gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ + Target: "#" + gid.Hex(), + Body: gd.updateBodyWithJson(personalized), + }) + return nil } + func (gm *groupInMemory) Dismiss(gid groupID) error { return nil } @@ -938,15 +891,21 @@ func (gm *groupInMemory) UpdateGroupDocument(gid groupID, body []byte) error { return gm.groupDocSync(gid, newbody) } +func (gm *groupInMemory) TargetExists(target primitive.ObjectID) bool { + return gm.hasConn(target) != nil +} + var devflag = flagx.Bool("dev", false, "") -func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, typename string, wsh *wshandler.WebsocketHandler) (group, error) { +func (cfg *groupConfig) prepareInMemory(ctx context.Context, typename string, sub *subTavern) (group, error) { // group document // member document + region := sub.region + wsh := sub.wsh + groupDocSyncChanName := fmt.Sprintf("d_mgc_%s_%s", region, typename) memberSyncChanName := fmt.Sprintf("m_mgc_%s_%s", region, typename) rpcChanName := fmt.Sprintf("r_mgc_%s_%s", region, typename) - clientMessageChanName := fmt.Sprintf("c_mgc_%s_%s", region, typename) toHashHex := func(name string) string { hash := md5.New() @@ -962,7 +921,6 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type groupDocSyncChanName = toHashHex(groupDocSyncChanName) memberSyncChanName = toHashHex(memberSyncChanName) rpcChanName = toHashHex(rpcChanName) - clientMessageChanName = toHashHex(clientMessageChanName) // 여기서는 subscribe channel // 각 함수에서는 publish @@ -996,15 +954,22 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type _, err := wsh.RedisSync.Publish(ctx, rpcChanName, bt).Result() return err }, - hasConn: func(t accountID) *wshandler.Richconn { - return wsh.Conn(region, t) + hasConn: func(t accountID) *connection { + return sub.cm.get(t) }, groups: groupContainer{ groupDocs: make(map[groupID]*groupDoc), }, + sendUpstreamMessage: func(msg *wshandler.UpstreamMessage) { + wsh.SendUpstreamMessage(region, msg) + }, + sendCloseMessage: func(target accountID, text string) { + wsh.SendCloseMessage(region, target.Hex(), text) + }, } - // TODO : processChannelMessage 스레드 분리해보자 + rpc.RegistReceiver(gm) + processChannelMessage := func(gm *groupInMemory, pubsub *redis.PubSub) *redis.PubSub { defer func() { r := recover() @@ -1020,34 +985,7 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type } switch msg.Channel { - case clientMessageChanName: - bt := []byte(msg.Payload) - if len(bt) < 24 { - break - } - - // 주!! mid가 먼저 - var mid groupID - copy(mid[:], bt[:12]) - bt = bt[12:] - - var gid groupID - copy(gid[:], bt[:12]) - bt = bt[12:] - - gd := gm.groups.find(gid) - if gd == nil { - break - } - tid, _ := gd.memberByAccount(mid) - if !tid.IsZero() { - personalized := []byte(fmt.Sprintf(`{"%s":%s}`, tid.Hex(), string(bt))) - if after, err := gd.updateBodyWithJson(personalized); err == nil { - go multicast(gd.conns(false), after) - } - } - - case groupDocSyncChanName: + case groupDocSyncChanName: // 호스트들간 그룹 정보 동기화 채널 payload := []byte(msg.Payload) if len(payload) < len(config.macAddr) { break @@ -1065,17 +1003,17 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type if len(remain) == 0 { // gid 그룹 삭제 // 그룹 안에 있는 멤버에게 알림 - bt, _ := json.Marshal(makeTypeMessage(FullGroupDoc{ - Gid: gid, - })) - go multicast(gd.conns(true), bt) + bt := makeTypeMessage(FullGroupDoc{Gid: gid}) + gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ + Target: "#" + gid.Hex(), + Body: bt, + }) gm.groups.delete(gid) } else if string(senderHost) != config.macAddr { - if r, err := gd.updateBodyBsonToJson(remain); err != nil { - logger.Error("groupDocSyncChanName message decode failed :", remain, err) - } else { - go multicast(gd.conns(true), r) - } + gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ + Target: "#" + gid.Hex(), + Body: gd.updateBodyBsonToJson(remain), + }) } } else if string(senderHost) != config.macAddr { var newDoc groupDoc @@ -1086,7 +1024,7 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type } } - case memberSyncChanName: + case memberSyncChanName: // 호스트들간 멤버 정보 동기화 채널 if len(msg.Payload) < len(config.macAddr) { break } @@ -1116,7 +1054,7 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type } var updated *memberDoc - rconn := wsh.Conn(region, mid) + rconn := gm.hasConn(mid) if senderHost != config.macAddr { // 내가 보낸 메시지가 아니면 멤버 도큐먼트 업데이트 하고 브로드캐스팅 @@ -1136,56 +1074,37 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type if rconn != nil { // gid에 이미 다른 값이 있을 수 있다. // 정확하게 이 값이면 제거하고, 아니면 넘어간다. - rconn.RemoveTag("gid", fmt.Sprintf("%s@%s", gid.Hex(), gm.Name)) + rconn.removeTag("gid", fmt.Sprintf("%s@%s", gid.Hex(), gm.Name)) } - broadcastTypedMessage(gm, gid, PublicMemberDoc{Tid: tid}) + gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ + Target: "#" + gid.Hex(), + Body: makeTypeMessage(PublicMemberDoc{Tid: tid}), + }) } else { if isNewMember && updated.rconn == nil && rconn != nil { updated.rconn = rconn } // 업데이트 된 플레이어(새로 들어온 플레이어 포함)를 모두에게 알려준다. 본인 포함, invitee 제외 - broadcastTypedMessage(gm, gid, PublicMemberDoc{ - Tid: tid, - memberDocCommon: updated.memberDocCommon, + gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ + Target: "#" + gid.Hex(), + Body: makeTypeMessage(PublicMemberDoc{ + Tid: tid, + memberDocCommon: updated.memberDocCommon, + }), }) } if isNewMember { if rconn != nil { // 새 멤버이므로 기존 멤버를 다 보내준다. - rconn.AddTag("gid", fmt.Sprintf("%s@%s", gid.Hex(), gm.Name)) - rconn.WriteBytes(gd.serializeFull(gid, clientMessageChanName)) + rconn.addTag("gid", fmt.Sprintf("%s@%s", gid.Hex(), gm.Name)) + gm.sendUpstreamMessage(&wshandler.UpstreamMessage{ + Target: "@" + mid.Hex(), + Body: gd.serializeFull(gid), + }) } } - case rpcChanName: - targetbt, fn, params, err := rpc.Decode[accountID]([]byte(msg.Payload)) - if err != nil { - logger.Error("rpcChanName message decode failed :", msg.Payload, err) - break - } - - call := func() { - method, ok := reflect.TypeOf(gm).MethodByName(fn) - if !ok { - logger.Println("message decode failed :", err, targetbt, msg.Payload) - } - - args := []reflect.Value{ - reflect.ValueOf(gm), - } - for _, arg := range params { - args = append(args, reflect.ValueOf(arg)) - } - - method.Func.Call(args) - } - - if *targetbt == everyHost { - call() - } else if rconn := wsh.Conn(region, *targetbt); rconn != nil { - call() - } default: logger.Println("unknown channel") } @@ -1204,7 +1123,7 @@ func (cfg *groupConfig) prepareInMemory(ctx context.Context, region string, type var pubsub *redis.PubSub for { if pubsub == nil { - pubsub = wsh.RedisSync.Subscribe(ctx, groupDocSyncChanName, memberSyncChanName, rpcChanName, clientMessageChanName) + pubsub = wsh.RedisSync.Subscribe(ctx, groupDocSyncChanName, memberSyncChanName, rpcChanName) } if pubsub == nil { diff --git a/core/richconn.go b/core/richconn.go index 8af6653..218b5e5 100644 --- a/core/richconn.go +++ b/core/richconn.go @@ -27,19 +27,6 @@ func (rc *connection) addTag(name, val string) { rc.tags = append(rc.tags, prefix+val) } -func (rc *connection) getTag(name string) string { - rc.locker.Lock() - defer rc.locker.Unlock() - - prefix := name + "=" - for _, tag := range rc.tags { - if strings.HasPrefix(tag, prefix) { - return tag[len(prefix):] - } - } - return "" -} - func (rc *connection) removeTag(name string, val string) { rc.locker.Lock() defer rc.locker.Unlock() @@ -95,3 +82,16 @@ func (rc *connection) unregistOnCloseFunc(name string) (out func()) { delete(rc.onClose, name) return } + +func (rc *connection) cleanup() { + rc.locker.Lock() + defer rc.locker.Unlock() + + cp := rc.onClose + rc.onClose = nil + go func() { + for _, f := range cp { + f() + } + }() +} diff --git a/core/rpc/connrpc.go b/core/rpc/connrpc.go deleted file mode 100644 index 75b02f4..0000000 --- a/core/rpc/connrpc.go +++ /dev/null @@ -1,195 +0,0 @@ -package rpc - -import ( - "bytes" - "encoding/gob" - "fmt" - "reflect" - - "repositories.action2quare.com/ayo/gocommon/logger" - "repositories.action2quare.com/ayo/gocommon/wshandler" - - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/bson/primitive" -) - -var Everybody = primitive.ObjectID([12]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}) - -func init() { - gob.Register(bson.M{}) - gob.Register(primitive.ObjectID{}) - gob.Register(primitive.Timestamp{}) -} - -type RpcCaller struct { - publish func(bt []byte) error -} - -func NewRpcCaller(f func(bt []byte) error) RpcCaller { - return RpcCaller{ - publish: f, - } -} - -type rpcCallContext struct { - alias primitive.ObjectID - publish func(bt []byte) error -} - -func (c *RpcCaller) One(alias primitive.ObjectID) rpcCallContext { - return rpcCallContext{ - alias: alias, - publish: c.publish, - } -} - -func (c *RpcCaller) Everybody() rpcCallContext { - return rpcCallContext{ - alias: Everybody, - publish: c.publish, - } -} - -func IsCallerCalleeMethodMatch[Callee any]() error { - var caller rpcCallContext - var callee Callee - - callerType := reflect.TypeOf(caller) - calleeType := reflect.TypeOf(callee) - for i := 0; i < callerType.NumMethod(); i++ { - callerMethod := callerType.Method(i) - calleeMethod, ok := calleeType.MethodByName(callerMethod.Name) - if !ok { - return fmt.Errorf("method '%s' of '%s' is missing", callerMethod.Name, calleeType.Name()) - } - - if calleeMethod.Func.Type().NumIn() != callerMethod.Func.Type().NumIn() { - return fmt.Errorf("method '%s' argument num is not match", callerMethod.Name) - } - - if calleeMethod.Func.Type().NumOut() != callerMethod.Func.Type().NumOut() { - return fmt.Errorf("method '%s' out num is not match", callerMethod.Name) - } - - for i := 1; i < calleeMethod.Func.Type().NumIn(); i++ { - if calleeMethod.Func.Type().In(i) != callerMethod.Func.Type().In(i) { - return fmt.Errorf("method '%s' argument is not match. %s-%s", callerMethod.Name, calleeMethod.Func.Type().In(i).Name(), callerMethod.Func.Type().In(i).Name()) - } - } - - } - - return nil -} - -type fnsig struct { - FunctionName string `bson:"fn"` - Args []any `bson:"args"` -} - -func Encode[T any](prefix T, fn string, args ...any) ([]byte, error) { - m := append([]any{ - prefix, - fn, - }, args...) - - buff := new(bytes.Buffer) - encoder := gob.NewEncoder(buff) - err := encoder.Encode(m) - if err != nil { - logger.Error("rpcCallContext.send err :", err) - return nil, err - } - - return buff.Bytes(), nil -} - -func Decode[T any](src []byte) (*T, string, []any, error) { - var m []any - decoder := gob.NewDecoder(bytes.NewReader(src)) - if err := decoder.Decode(&m); err != nil { - logger.Error("RpcCallee.Call err :", err) - return nil, "", nil, err - } - - prfix := m[0].(T) - fn := m[1].(string) - - return &prfix, fn, m[2:], nil -} - -func decode(src []byte) (string, []any, error) { - var sig fnsig - decoder := gob.NewDecoder(bytes.NewReader(src)) - if err := decoder.Decode(&sig); err != nil { - logger.Error("RpcCallee.Call err :", err) - return "", nil, err - } - - return sig.FunctionName, sig.Args, nil -} - -func (c *rpcCallContext) send(fn string, args ...any) error { - bt, err := Encode(c.alias, fn, args...) - if err != nil { - return err - } - return c.publish(bt) -} - -type RpcCallee[T any] struct { - methods map[string]reflect.Method - create func(*wshandler.Richconn) *T -} - -func NewRpcCallee[T any](createReceiverFunc func(*wshandler.Richconn) *T) RpcCallee[T] { - out := RpcCallee[T]{ - methods: make(map[string]reflect.Method), - create: createReceiverFunc, - } - - var tmp *T - - tp := reflect.TypeOf(tmp) - for i := 0; i < tp.NumMethod(); i++ { - method := tp.Method(i) - out.methods[method.Name] = method - } - return out -} - -func (r RpcCallee[T]) Call(rc *wshandler.Richconn, src []byte) error { - defer func() { - s := recover() - if s != nil { - logger.Error(s) - } - }() - - fn, params, err := decode(src) - if err != nil { - logger.Error("RpcCallee.Call err :", err) - return err - } - - method, ok := r.methods[fn] - if !ok { - err := fmt.Errorf("method '%s' is missing", fn) - logger.Error("RpcCallee.Call err :", err) - return err - } - - receiver := r.create(rc) - args := []reflect.Value{ - reflect.ValueOf(receiver), - } - for _, arg := range params { - args = append(args, reflect.ValueOf(arg)) - } - - rets := method.Func.Call(args) - if len(rets) > 0 && rets[len(rets)-1].Interface() != nil { - return rets[len(rets)-1].Interface().(error) - } - return nil -} diff --git a/core/rpc/proxy.go b/core/rpc/proxy.go deleted file mode 100644 index 8996b4b..0000000 --- a/core/rpc/proxy.go +++ /dev/null @@ -1,33 +0,0 @@ -package rpc - -import ( - "go.mongodb.org/mongo-driver/bson/primitive" -) - -func (c rpcCallContext) JoinTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, hint string) error { - return c.send("JoinTag", region, tag, tid, hint) -} - -func (c rpcCallContext) LeaveTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, hint string) error { - return c.send("LeaveTag", region, tag, tid, hint) -} - -func (c rpcCallContext) TurnGroupOnline(key string, gid primitive.ObjectID, score float64) error { - return c.send("TurnGroupOnline", key, gid, score) -} - -func (c rpcCallContext) TurnGroupOffline(key string, gid primitive.ObjectID) error { - return c.send("TurnGroupOffline", key, gid) -} -func (c rpcCallContext) SetStateInTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, state string, hint string) error { - return c.send("SetStateInTag", region, tag, tid, state, hint) -} -func (c rpcCallContext) SendMessage(doc []byte) error { - return c.send("SendMessage", doc) -} -func (c rpcCallContext) SendMessageToTag(region string, gid primitive.ObjectID, msg []byte) error { - return c.send("SendMessageToTag", region, gid, msg) -} -func (c rpcCallContext) CloseOnPurpose() error { - return c.send("CloseOnPurpose") -} diff --git a/core/tavern.go b/core/tavern.go index d250693..1379120 100644 --- a/core/tavern.go +++ b/core/tavern.go @@ -8,12 +8,11 @@ import ( "net/http" "reflect" "strings" + "sync" "repositories.action2quare.com/ayo/gocommon" - common "repositories.action2quare.com/ayo/gocommon" "repositories.action2quare.com/ayo/gocommon/logger" "repositories.action2quare.com/ayo/gocommon/wshandler" - "repositories.action2quare.com/ayo/tavern/core/rpc" "github.com/go-redis/redis/v8" "go.mongodb.org/mongo-driver/bson" @@ -73,7 +72,7 @@ func readBsonDoc(r io.Reader, src any) error { } type TavernConfig struct { - common.RegionStorageConfig `json:",inline"` + gocommon.RegionStorageConfig `json:",inline"` GroupTypes map[string]*groupConfig `json:"tavern_group_types"` MaingateApiToken string `json:"maingate_api_token"` @@ -83,17 +82,55 @@ type TavernConfig struct { var config TavernConfig +type connectionMap struct { + sync.Mutex + conns map[primitive.ObjectID]*connection +} + +func (cm *connectionMap) add(accid accountID, alias string) { + cm.Lock() + defer cm.Unlock() + + old := cm.conns[accid] + if old != nil { + old.cleanup() + } + cm.conns[accid] = &connection{ + alias: alias, + onClose: make(map[string]func()), + } +} + +func (cm *connectionMap) remove(accid accountID) { + cm.Lock() + defer cm.Unlock() + + old := cm.conns[accid] + if old != nil { + delete(cm.conns, accid) + old.cleanup() + } +} + +func (cm *connectionMap) get(accid accountID) *connection { + cm.Lock() + defer cm.Unlock() + + return cm.conns[accid] +} + type Tavern struct { subTaverns []*subTavern wsh *wshandler.WebsocketHandler } type subTavern struct { - mongoClient common.MongoClient + mongoClient gocommon.MongoClient wsh *wshandler.WebsocketHandler region string groups map[string]group methods map[string]reflect.Method + cm connectionMap redisClient *redis.Client } @@ -118,7 +155,7 @@ func getMacAddr() (string, error) { func New(context context.Context, wsh *wshandler.WebsocketHandler, inconfig *TavernConfig) (*Tavern, error) { if inconfig == nil { var loaded TavernConfig - if err := common.LoadConfig(&loaded); err != nil { + if err := gocommon.LoadConfig(&loaded); err != nil { return nil, err } inconfig = &loaded @@ -130,7 +167,7 @@ func New(context context.Context, wsh *wshandler.WebsocketHandler, inconfig *Tav return nil, err } config.macAddr = macaddr - tv := Tavern{ + tv := &Tavern{ wsh: wsh, } @@ -139,27 +176,29 @@ func New(context context.Context, wsh *wshandler.WebsocketHandler, inconfig *Tav return nil, err } - return &tv, nil + return tv, nil } -func (tv *Tavern) Destructor() { - tv.wsh.Destructor() +func (tv *Tavern) Cleanup() { for _, st := range tv.subTaverns { st.mongoClient.Close() } } -type groupPipelineDocument struct { - OperationType string `bson:"operationType"` - FullDocument map[string]any `bson:"fullDocument"` - DocumentKey struct { - Id primitive.ObjectID `bson:"_id"` - } `bson:"documentKey"` - UpdateDescription struct { - UpdatedFields bson.M `bson:"updatedFields"` - RemovedFileds bson.A `bson:"removedFields"` - TruncatedArrays bson.A `bson:"truncatedArrays"` - } `bson:"updateDescription"` +func (tv *Tavern) SocketMessageReceived(accid primitive.ObjectID, alias string, messageType wshandler.WebSocketMessageType, body io.Reader) { + switch messageType { + case wshandler.Connected: + case wshandler.Disconnected: + } + // gidtype := msg.Conn.GetTag("gid") + // if len(gidtype) > 0 { + // tokens := strings.SplitN(gidtype, "@", 2) + // gidobj, _ := primitive.ObjectIDFromHex(tokens[0]) + // gtype := tokens[1] + // group := sub.groups[gtype] + // if group != nil { + // group.PauseMember(gidobj, msg.Alias, msg.Conn) + // } } func (tv *Tavern) prepare(ctx context.Context) error { @@ -169,15 +208,11 @@ func (tv *Tavern) prepare(ctx context.Context) error { return err } - for region, url := range config.RegionStorage { - var dbconn common.MongoClient + for region := range config.RegionStorage { + var dbconn gocommon.MongoClient var err error var groupinstance group - if err := rpc.IsCallerCalleeMethodMatch[connection](); err != nil { - return err - } - var tmp *subTavern methods := make(map[string]reflect.Method) tp := reflect.TypeOf(tmp) @@ -192,17 +227,20 @@ func (tv *Tavern) prepare(ctx context.Context) error { region: region, methods: methods, redisClient: redisClient, + cm: connectionMap{ + conns: make(map[primitive.ObjectID]*connection), + }, } groups := make(map[string]group) for typename, cfg := range config.GroupTypes { cfg.Name = typename if cfg.Transient { - groupinstance, err = cfg.prepareInMemory(ctx, region, typename, tv.wsh) - } else { + groupinstance, err = cfg.prepareInMemory(ctx, typename, sub) + //} else { // TODO : db // if !dbconn.Connected() { - // dbconn, err = common.NewMongoClient(ctx, url.Mongo, region) + // dbconn, err = gocommon.NewMongoClient(ctx, url.Mongo, region) // if err != nil { // return err // } @@ -224,21 +262,27 @@ func (tv *Tavern) prepare(ctx context.Context) error { func (tv *Tavern) RegisterHandlers(ctx context.Context, serveMux *http.ServeMux, prefix string) error { for _, sub := range tv.subTaverns { + tv.wsh.RegisterReceiver(sub.region, sub.clientMessageReceived) var pattern string if sub.region == "default" { - pattern = common.MakeHttpHandlerPattern(prefix, "api") + pattern = gocommon.MakeHttpHandlerPattern(prefix, "api") } else { - pattern = common.MakeHttpHandlerPattern(prefix, sub.region, "api") + pattern = gocommon.MakeHttpHandlerPattern(prefix, sub.region, "api") } serveMux.HandleFunc(pattern, sub.api) - - deliveryChan := tv.wsh.DeliveryChannel(sub.region) - go sub.deliveryMessageHandler(deliveryChan) } return nil } +func (sub *subTavern) clientMessageReceived(accid primitive.ObjectID, alias string, messageType wshandler.WebSocketMessageType, body io.Reader) { + if messageType == wshandler.Connected { + sub.cm.add(accid, alias) + } else if messageType == wshandler.Disconnected { + sub.cm.remove(accid) + } +} + func (sub *subTavern) api(w http.ResponseWriter, r *http.Request) { defer func() { s := recover() diff --git a/go.mod b/go.mod index 1cafdd6..5ef6a2f 100644 --- a/go.mod +++ b/go.mod @@ -4,15 +4,15 @@ go 1.19 require ( github.com/go-redis/redis/v8 v8.11.5 - github.com/gorilla/websocket v1.5.0 go.mongodb.org/mongo-driver v1.11.7 - repositories.action2quare.com/ayo/gocommon v0.0.0-20230705150145-7ef3e68d161e + repositories.action2quare.com/ayo/gocommon v0.0.0-20230710053024-a842845685ee ) require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/golang/snappy v0.0.4 // indirect + github.com/gorilla/websocket v1.5.0 // indirect github.com/klauspost/compress v1.16.6 // indirect github.com/montanaflynn/stats v0.7.1 // indirect github.com/pires/go-proxyproto v0.7.0 // indirect diff --git a/go.sum b/go.sum index 190b6ba..cf031a8 100644 --- a/go.sum +++ b/go.sum @@ -102,11 +102,5 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -repositories.action2quare.com/ayo/gocommon v0.0.0-20230621052811-06ef97f11d22 h1:DImSGNxZrc+Q4WlS1OKMsLAScEfDYLX4XMJdjAaVnXc= -repositories.action2quare.com/ayo/gocommon v0.0.0-20230621052811-06ef97f11d22/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns= -repositories.action2quare.com/ayo/gocommon v0.0.0-20230705022744-edd2f7aab52e h1:l9aqmNpEF8V1o0b3eCT/nhC+O1dXMUcPzBPewbshuDI= -repositories.action2quare.com/ayo/gocommon v0.0.0-20230705022744-edd2f7aab52e/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns= -repositories.action2quare.com/ayo/gocommon v0.0.0-20230705132657-822681bf74cf h1:VQ78wRZaKHnWOM+Y2ZxB/EVNopzC4DNbwihledqjwy8= -repositories.action2quare.com/ayo/gocommon v0.0.0-20230705132657-822681bf74cf/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns= -repositories.action2quare.com/ayo/gocommon v0.0.0-20230705150145-7ef3e68d161e h1:LNzK2Fhl1X8JQfn7XsoQwz2H/LY7YmMehEPqCyXgV1U= -repositories.action2quare.com/ayo/gocommon v0.0.0-20230705150145-7ef3e68d161e/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230710053024-a842845685ee h1:Aau1j/b9wI4nyvrM7m1Q+2xkcW1Qo7i3q+QBD4Umnzg= +repositories.action2quare.com/ayo/gocommon v0.0.0-20230710053024-a842845685ee/go.mod h1:ng62uGMGXyQSeuxePG5gJAMtip4Rnspu5Tu7hgvaXns= diff --git a/main.go b/main.go index a8b3d41..cd81f4c 100644 --- a/main.go +++ b/main.go @@ -34,13 +34,14 @@ func main() { panic(err) } else { serveMux := http.NewServeMux() - wsh.RegisterHandlers(ctx, serveMux, *prefix) + wsh.RegisterHandlers(serveMux, *prefix) tv.RegisterHandlers(ctx, serveMux, *prefix) server := common.NewHTTPServer(serveMux) logger.Println("tavern is started") + wsh.Start(ctx) server.Start() cancel() - tv.Destructor() - wsh.Destructor() + tv.Cleanup() + wsh.Cleanup() } }