redis stack 사용
This commit is contained in:
265
core/apiimpl.go
265
core/apiimpl.go
@ -1,7 +1,6 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
@ -9,7 +8,6 @@ import (
|
||||
common "repositories.action2quare.com/ayo/gocommon"
|
||||
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
)
|
||||
@ -83,7 +81,7 @@ func (sub *subTavern) JoinGroup(w http.ResponseWriter, r *http.Request) {
|
||||
if candidate, ok := common.ReadBoolFormValue(r.Form, "candidate"); ok && candidate {
|
||||
err = group.Candidate(gidobj, midobj, doc)
|
||||
} else {
|
||||
tidobj, err = group.Join(gidobj, midobj, tidobj, doc)
|
||||
err = group.Join(gidobj, midobj, doc)
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
@ -184,12 +182,6 @@ func (sub *subTavern) AcceptInvitation(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
gid, _ := common.ReadObjectIDFormValue(r.Form, "gid")
|
||||
mid, _ := common.ReadObjectIDFormValue(r.Form, "mid")
|
||||
tid, ok := common.ReadObjectIDFormValue(r.Form, "tid")
|
||||
if !ok {
|
||||
logger.Println("CancelInvitation failed. form value 'tid' is missing")
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
var member bson.M
|
||||
if err := readBsonDoc(r.Body, &member); err != nil {
|
||||
@ -198,14 +190,15 @@ func (sub *subTavern) AcceptInvitation(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
gidbytes, err := group.AcceptInvitation(gid, mid, tid, member)
|
||||
err := group.AcceptInvitation(gid, mid, member)
|
||||
if err != nil {
|
||||
logger.Error("AcceptInvitation failed. group.AcceptInvitation returns err :", err)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.Write([]byte(gidbytes.Hex()))
|
||||
// TODO : full group doc을 내려보냄
|
||||
// w.Write([]byte(gidbytes.Hex()))
|
||||
}
|
||||
|
||||
func (sub *subTavern) DenyInvitation(w http.ResponseWriter, r *http.Request) {
|
||||
@ -270,49 +263,6 @@ func (sub *subTavern) QueryInvitations(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
func (sub *subTavern) QueryOnlineGroup(w http.ResponseWriter, r *http.Request) {
|
||||
typename, _ := common.ReadStringFormValue(r.Form, "type")
|
||||
group := sub.groups[typename]
|
||||
if group == nil {
|
||||
logger.Println("QueryOnlineGroup failed. group type is missing :", r.Form)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
var cmd *redis.StringSliceCmd
|
||||
scoreStart, _ := common.ReadStringFormValue(r.Form, "score_start")
|
||||
scoreStop, _ := common.ReadStringFormValue(r.Form, "score_stop")
|
||||
|
||||
if len(scoreStart) > 0 || len(scoreStop) > 0 {
|
||||
if len(scoreStart) == 0 {
|
||||
scoreStart = "-inf"
|
||||
}
|
||||
if len(scoreStop) == 0 {
|
||||
scoreStop = "+inf"
|
||||
}
|
||||
cmd = sub.wsh.RedisSync.ZRangeArgs(context.Background(), redis.ZRangeArgs{
|
||||
Key: onlineGroupQueryKey(typename),
|
||||
ByScore: true,
|
||||
Start: scoreStart,
|
||||
Stop: scoreStop,
|
||||
Rev: true,
|
||||
Count: 1,
|
||||
})
|
||||
} else {
|
||||
// 아무거나
|
||||
cmd = sub.wsh.RedisSync.ZRandMember(context.Background(), onlineGroupQueryKey(typename), 1, false)
|
||||
}
|
||||
|
||||
result, err := cmd.Result()
|
||||
if err != nil {
|
||||
logger.Error("QueryOnlineGroup failed. redid.ZRandMember returns err :", err)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
writeBsonDoc(w, bson.M{"r": result})
|
||||
}
|
||||
|
||||
func (sub *subTavern) SearchGroup(w http.ResponseWriter, r *http.Request) {
|
||||
typename, _ := common.ReadStringFormValue(r.Form, "type")
|
||||
group := sub.groups[typename]
|
||||
@ -426,95 +376,6 @@ func (sub *subTavern) QueryGroup(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
// QueryGroupMembers : 그룹내 멤버 조회
|
||||
// - type : 그룹 타입
|
||||
// - 그룹 타입에 맞는 키(주로 _id)
|
||||
// - projection : select할 필드. ,로 구분
|
||||
func (sub *subTavern) QueryGroupMembers(w http.ResponseWriter, r *http.Request) {
|
||||
typename, _ := common.ReadStringFormValue(r.Form, "type")
|
||||
group := sub.groups[typename]
|
||||
if group == nil {
|
||||
logger.Println("QueryGroupMembers failed. group type is missing :", r.Form)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid")
|
||||
if !ok {
|
||||
logger.Println("QueryGroupMembers failed. _id is missing :", r.Form)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
midobj, _ := common.ReadObjectIDFormValue(r.Form, "mid")
|
||||
var after primitive.Timestamp
|
||||
if ts, ok := common.ReadStringFormValue(r.Form, "after"); ok && ts != "0.0" {
|
||||
after = common.DotStringToTimestamp(ts)
|
||||
}
|
||||
projection, _ := common.ReadStringFormValue(r.Form, "projection")
|
||||
|
||||
result, err := group.QueryMembers(gidobj, midobj, projection, after)
|
||||
if err != nil {
|
||||
logger.Error("QueryGroupMembers failed. FindAll err :", err)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
if result == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if err := writeBsonDoc(w, result); err != nil {
|
||||
logger.Error("QueryGroupMembers failed. writeBsonArr err :", err)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (sub *subTavern) QueryGroupMember(w http.ResponseWriter, r *http.Request) {
|
||||
typename, _ := common.ReadStringFormValue(r.Form, "type")
|
||||
group := sub.groups[typename]
|
||||
if group == nil {
|
||||
logger.Println("QueryGroupMember failed. group type is missing :", r.Form)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
gid, ok := common.ReadObjectIDFormValue(r.Form, "gid")
|
||||
if !ok {
|
||||
logger.Println("QueryGroupMember failed. gid is missing :", r.Form)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
mid, midok := common.ReadObjectIDFormValue(r.Form, "mid")
|
||||
tid, tidok := common.ReadObjectIDFormValue(r.Form, "tid")
|
||||
if !midok && !tidok {
|
||||
// 둘 중 하나는 있어야지
|
||||
logger.Println("QueryGroupMember failed. tid and mid are both missing")
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
projection, _ := common.ReadStringFormValue(r.Form, "projection")
|
||||
result, err := group.QueryMember(gid, mid, tid, projection)
|
||||
if err != nil {
|
||||
logger.Println("QueryGroupMember failed. group.QueryMember returns err :", err)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if result == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if err := writeBsonDoc(w, result); err != nil {
|
||||
logger.Error("QueryGroupMember failed. writeBsonDoc err :", err)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// LeaveGroup : 그룹에서 나감 or 내보냄
|
||||
// - type : 그룹 타입
|
||||
// - 그룹 타입에 맞는 키(주로 _id)
|
||||
@ -535,15 +396,14 @@ func (sub *subTavern) LeaveGroup(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
mid, midok := common.ReadObjectIDFormValue(r.Form, "mid")
|
||||
tid, tidok := common.ReadObjectIDFormValue(r.Form, "tid")
|
||||
if !midok && !tidok {
|
||||
// 둘 중 하나는 있어야지
|
||||
logger.Println("LeaveGroup failed. tid and mid are both missing")
|
||||
|
||||
if !midok {
|
||||
logger.Println("LeaveGroup failed. mid is missing")
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if err := group.Leave(gid, mid, tid); err != nil {
|
||||
if err := group.Leave(gid, mid); err != nil {
|
||||
// 둘 중 하나는 있어야지
|
||||
logger.Println("LeaveGroup failed. group.Leave returns err :", err)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
@ -639,112 +499,3 @@ func (sub *subTavern) UpdateGroupDocument(w http.ResponseWriter, r *http.Request
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (sub *subTavern) PauseGroupMember(w http.ResponseWriter, r *http.Request) {
|
||||
typename, _ := common.ReadStringFormValue(r.Form, "type")
|
||||
group := sub.groups[typename]
|
||||
if group == nil {
|
||||
logger.Println("DismissGroup failed. type is missing")
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
midobj, ok := common.ReadObjectIDFormValue(r.Form, "mid")
|
||||
if !ok {
|
||||
logger.Println("UpdateMemberDocument failed. member_id is missing")
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
gidobj, ok := common.ReadObjectIDFormValue(r.Form, "gid")
|
||||
if !ok {
|
||||
logger.Println("UpdateMemberDocument failed. _id is missing")
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
group.PauseMember(gidobj, midobj)
|
||||
}
|
||||
|
||||
func (sub *subTavern) DropPausedMember(w http.ResponseWriter, r *http.Request) {
|
||||
typename, _ := common.ReadStringFormValue(r.Form, "type")
|
||||
group := sub.groups[typename]
|
||||
if group == nil {
|
||||
logger.Println("DropDeadMember failed. type is missing")
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
gid, ok := common.ReadObjectIDFormValue(r.Form, "gid")
|
||||
if !ok {
|
||||
logger.Println("DropDeadMember failed. gid is missing")
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
mid, ok := common.ReadObjectIDFormValue(r.Form, "mid")
|
||||
if !ok {
|
||||
logger.Println("DropDeadMember failed. mid is missing")
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if err := group.DropPausedMember(gid, mid); err != nil {
|
||||
logger.Error("DropDeadMember failed. group.DropDeadMember returns err :", err)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// func (sub *subTavern) deliveryMessageHandler(deliveryChan <-chan wshandler.DeliveryMessage) {
|
||||
// defer func() {
|
||||
// r := recover()
|
||||
// if r != nil {
|
||||
// logger.Error(r)
|
||||
// }
|
||||
// }()
|
||||
|
||||
// redisSync := sub.wsh.RedisSync
|
||||
// for msg := range deliveryChan {
|
||||
// mid := msg.Alias
|
||||
// if msg.Body != nil {
|
||||
// buffer := msg.Body
|
||||
|
||||
// var channame string
|
||||
// for i, ch := range buffer {
|
||||
// if ch == 0 {
|
||||
// channame = string(buffer[:i])
|
||||
// buffer = buffer[i+1:]
|
||||
// break
|
||||
// }
|
||||
// }
|
||||
|
||||
// if len(channame) == 0 {
|
||||
// continue
|
||||
// }
|
||||
|
||||
// buffer = append(mid[:], buffer...)
|
||||
// _, err := redisSync.Publish(context.Background(), channame, buffer).Result()
|
||||
// if err != nil {
|
||||
// logger.Error(err)
|
||||
// }
|
||||
// }
|
||||
|
||||
// if len(msg.Command) > 0 {
|
||||
// switch msg.Command {
|
||||
// case "pause":
|
||||
// gidtype := msg.Conn.GetTag("gid")
|
||||
// if len(gidtype) > 0 {
|
||||
// tokens := strings.SplitN(gidtype, "@", 2)
|
||||
// gidobj, _ := primitive.ObjectIDFromHex(tokens[0])
|
||||
// gtype := tokens[1]
|
||||
// group := sub.groups[gtype]
|
||||
// if group != nil {
|
||||
// group.PauseMember(gidobj, msg.Alias, msg.Conn)
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// logger.Println("delivery chan fin")
|
||||
// }
|
||||
|
||||
1
core/config.json
Normal file
1
core/config.json
Normal file
@ -0,0 +1 @@
|
||||
{}
|
||||
@ -25,21 +25,16 @@ type groupConfig struct {
|
||||
type group interface {
|
||||
Create(form url.Values, doc bson.M) (primitive.ObjectID, error)
|
||||
Candidate(groupID primitive.ObjectID, memberID primitive.ObjectID, doc bson.M) error
|
||||
Join(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, doc bson.M) (newTicketID primitive.ObjectID, err error)
|
||||
FindTicketID(groupID primitive.ObjectID, memberID primitive.ObjectID) primitive.ObjectID
|
||||
Join(groupID primitive.ObjectID, memberID primitive.ObjectID, doc bson.M) error
|
||||
Invite(groupID primitive.ObjectID, memberID primitive.ObjectID, inviterDoc bson.M, inviteeDoc bson.M) (string, error)
|
||||
CancelInvitation(groupID primitive.ObjectID, ticketID primitive.ObjectID) error
|
||||
AcceptInvitation(groupID primitive.ObjectID, mid primitive.ObjectID, ticketID primitive.ObjectID, member bson.M) (primitive.ObjectID, error)
|
||||
AcceptInvitation(groupID primitive.ObjectID, mid primitive.ObjectID, member bson.M) error
|
||||
DenyInvitation(groupID primitive.ObjectID, mid primitive.ObjectID, ticketID primitive.ObjectID) error
|
||||
QueryInvitations(memberID primitive.ObjectID, after primitive.Timestamp) ([]bson.M, error)
|
||||
Exist(groupID primitive.ObjectID, filter bson.M) (bool, error)
|
||||
FindAll(filter bson.M, projection string, after primitive.Timestamp) ([]bson.M, error)
|
||||
FindOne(groupID primitive.ObjectID, projection string) (bson.M, error)
|
||||
QueryMembers(groupID primitive.ObjectID, requesterID primitive.ObjectID, projection string, after primitive.Timestamp) (map[string]bson.M, error)
|
||||
QueryMember(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID, projection string) (bson.M, error)
|
||||
Leave(groupID primitive.ObjectID, memberID primitive.ObjectID, ticketID primitive.ObjectID) error
|
||||
DropPausedMember(groupID primitive.ObjectID, memberID primitive.ObjectID) error
|
||||
PauseMember(groupID primitive.ObjectID, memberID primitive.ObjectID) error
|
||||
Leave(groupID primitive.ObjectID, memberID primitive.ObjectID) error
|
||||
UpdateMemberDocument(groupID primitive.ObjectID, memberID primitive.ObjectID, doc bson.M) error
|
||||
Dismiss(groupID primitive.ObjectID) error
|
||||
UpdateGroupDocument(groupID primitive.ObjectID, body []byte) error
|
||||
|
||||
1180
core/group_memory.go
1180
core/group_memory.go
File diff suppressed because it is too large
Load Diff
210
core/rejson_handler.go
Normal file
210
core/rejson_handler.go
Normal file
@ -0,0 +1,210 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/nitishm/go-rejson/v4"
|
||||
"github.com/nitishm/go-rejson/v4/rjs"
|
||||
)
|
||||
|
||||
// gocommon으로 옮길 거
|
||||
type RejsonHandler struct {
|
||||
*redis.Client
|
||||
inner *rejson.Handler
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func NewReJSONHandler(ctx context.Context, redisClient *redis.Client) *RejsonHandler {
|
||||
inner := rejson.NewReJSONHandler()
|
||||
inner.SetGoRedisClientWithContext(ctx, redisClient)
|
||||
return &RejsonHandler{
|
||||
Client: redisClient,
|
||||
inner: inner,
|
||||
ctx: ctx,
|
||||
}
|
||||
}
|
||||
|
||||
func respToArray[T any](resp any) []T {
|
||||
resArr := resp.([]any)
|
||||
v := make([]T, len(resArr))
|
||||
for i, e := range resArr {
|
||||
v[i] = e.(T)
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
func (rh *RejsonHandler) JSONMSet(key string, kv map[string]any) error {
|
||||
pl := rh.Pipeline()
|
||||
for path, obj := range kv {
|
||||
b, err := json.Marshal(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pl.Do(rh.ctx, []any{"JSON.SET"}, key, path, b)
|
||||
}
|
||||
|
||||
cmders, err := pl.Exec(rh.ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, cmder := range cmders {
|
||||
if cmder.Err() != nil {
|
||||
return cmder.Err()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rh *RejsonHandler) JSONSet(key, path string, obj any, opts ...rjs.SetOption) (res interface{}, err error) {
|
||||
b, err := json.Marshal(obj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
args := []any{
|
||||
[]any{"JSON.SET"},
|
||||
key,
|
||||
path,
|
||||
b,
|
||||
}
|
||||
if len(opts) > 0 {
|
||||
args = append(args, opts[0])
|
||||
}
|
||||
|
||||
return rh.Do(rh.ctx, args...).Result()
|
||||
}
|
||||
|
||||
func (rh *RejsonHandler) JSONGet(key, path string, opts ...rjs.GetOption) (res interface{}, err error) {
|
||||
return rh.inner.JSONGet(key, path, opts...)
|
||||
}
|
||||
|
||||
func (rh *RejsonHandler) JSONGetString(key, path string) ([]string, error) {
|
||||
res, err := rh.inner.JSONResp(key, path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return respToArray[string](res), nil
|
||||
}
|
||||
|
||||
func (rh *RejsonHandler) JSONGetInt64(key, path string) ([]int64, error) {
|
||||
res, err := rh.inner.JSONResp(key, path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return respToArray[int64](res), nil
|
||||
}
|
||||
|
||||
func (rh *RejsonHandler) JSONMGet(path string, keys ...string) (res interface{}, err error) {
|
||||
return rh.inner.JSONMGet(path, keys...)
|
||||
}
|
||||
|
||||
func (rh *RejsonHandler) JSONMDel(key string, paths []string) error {
|
||||
pl := rh.Pipeline()
|
||||
for _, path := range paths {
|
||||
name, args, err := rjs.CommandBuilder(rjs.ReJSONCommandDEL, key, path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
args = append([]interface{}{name}, args...)
|
||||
pl.Do(rh.ctx, args...)
|
||||
}
|
||||
_, err := pl.Exec(rh.ctx)
|
||||
return err
|
||||
}
|
||||
|
||||
func (rh *RejsonHandler) JSONDel(key, path string) (res interface{}, err error) {
|
||||
return rh.inner.JSONDel(key, path)
|
||||
}
|
||||
|
||||
func (rh *RejsonHandler) JSONType(key, path string) (res interface{}, err error) {
|
||||
return rh.inner.JSONType(key, path)
|
||||
}
|
||||
|
||||
func (rh *RejsonHandler) JSONNumIncrBy(key, path string, number int) (res interface{}, err error) {
|
||||
return rh.inner.JSONNumIncrBy(key, path, number)
|
||||
}
|
||||
|
||||
func (rh *RejsonHandler) JSONNumMultBy(key, path string, number int) (res interface{}, err error) {
|
||||
return rh.inner.JSONNumMultBy(key, path, number)
|
||||
}
|
||||
|
||||
func (rh *RejsonHandler) JSONStrAppend(key, path string, jsonstring string) (res interface{}, err error) {
|
||||
return rh.inner.JSONStrAppend(key, path, jsonstring)
|
||||
}
|
||||
|
||||
func (rh *RejsonHandler) JSONStrLen(key, path string) (res interface{}, err error) {
|
||||
return rh.inner.JSONStrLen(key, path)
|
||||
}
|
||||
|
||||
func (rh *RejsonHandler) JSONArrAppend(key, path string, values ...interface{}) (res interface{}, err error) {
|
||||
return rh.inner.JSONArrAppend(key, path, values...)
|
||||
}
|
||||
|
||||
func (rh *RejsonHandler) JSONArrLen(key, path string) (res interface{}, err error) {
|
||||
return rh.inner.JSONArrLen(key, path)
|
||||
}
|
||||
|
||||
func (rh *RejsonHandler) JSONArrPop(key, path string, index int) (res interface{}, err error) {
|
||||
return rh.inner.JSONArrPop(key, path, index)
|
||||
}
|
||||
|
||||
func (rh *RejsonHandler) JSONArrIndex(key, path string, jsonValue interface{}, optionalRange ...int) (res interface{}, err error) {
|
||||
return rh.inner.JSONArrIndex(key, path, jsonValue, optionalRange...)
|
||||
}
|
||||
|
||||
func (rh *RejsonHandler) JSONArrTrim(key, path string, start, end int) (res interface{}, err error) {
|
||||
return rh.inner.JSONArrTrim(key, path, start, end)
|
||||
}
|
||||
|
||||
func (rh *RejsonHandler) JSONArrInsert(key, path string, index int, values ...interface{}) (res interface{}, err error) {
|
||||
return rh.inner.JSONArrInsert(key, path, index, values...)
|
||||
}
|
||||
|
||||
func (rh *RejsonHandler) JSONObjKeys(key, path string) ([]string, error) {
|
||||
name, args, err := rjs.CommandBuilder(rjs.ReJSONCommandOBJKEYS, key, path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
args = append([]interface{}{name}, args...)
|
||||
res, err := rh.Do(rh.ctx, args...).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resArr := res.([]any)
|
||||
resArr = resArr[0].([]any)
|
||||
slc := make([]string, len(resArr))
|
||||
|
||||
for i, r := range resArr {
|
||||
slc[i] = r.(string)
|
||||
}
|
||||
|
||||
return slc, nil
|
||||
}
|
||||
|
||||
func (rh *RejsonHandler) JSONObjLen(key, path string) (res interface{}, err error) {
|
||||
return rh.inner.JSONObjLen(key, path)
|
||||
}
|
||||
|
||||
func (rh *RejsonHandler) JSONObjLenInt64(key, path string) (int64, error) {
|
||||
res, err := rh.inner.JSONObjLen(key, path)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return res.([]any)[0].(int64), nil
|
||||
}
|
||||
|
||||
func (rh *RejsonHandler) JSONDebug(subCmd rjs.DebugSubCommand, key, path string) (res interface{}, err error) {
|
||||
return rh.inner.JSONDebug(subCmd, key, path)
|
||||
}
|
||||
|
||||
func (rh *RejsonHandler) JSONForget(key, path string) (res interface{}, err error) {
|
||||
return rh.inner.JSONForget(key, path)
|
||||
}
|
||||
|
||||
func (rh *RejsonHandler) JSONResp(key, path string) (res interface{}, err error) {
|
||||
return rh.inner.JSONResp(key, path)
|
||||
}
|
||||
@ -1,97 +0,0 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type connection struct {
|
||||
locker sync.Mutex
|
||||
alias string
|
||||
tags []string
|
||||
onClose map[string]func()
|
||||
}
|
||||
|
||||
func (rc *connection) addTag(name, val string) {
|
||||
rc.locker.Lock()
|
||||
defer rc.locker.Unlock()
|
||||
|
||||
prefix := name + "="
|
||||
for i, tag := range rc.tags {
|
||||
if strings.HasPrefix(tag, prefix) {
|
||||
rc.tags[i] = prefix + val
|
||||
return
|
||||
}
|
||||
}
|
||||
rc.tags = append(rc.tags, prefix+val)
|
||||
}
|
||||
|
||||
func (rc *connection) removeTag(name string, val string) {
|
||||
rc.locker.Lock()
|
||||
defer rc.locker.Unlock()
|
||||
|
||||
whole := fmt.Sprintf("%s=%s", name, val)
|
||||
for i, tag := range rc.tags {
|
||||
if tag == whole {
|
||||
if i == 0 && len(rc.tags) == 1 {
|
||||
rc.tags = nil
|
||||
} else {
|
||||
lastidx := len(rc.tags) - 1
|
||||
if i < lastidx {
|
||||
rc.tags[i] = rc.tags[lastidx]
|
||||
}
|
||||
rc.tags = rc.tags[:lastidx]
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (rc *connection) registOnCloseFunc(name string, f func()) {
|
||||
rc.locker.Lock()
|
||||
defer rc.locker.Unlock()
|
||||
|
||||
if rc.onClose == nil {
|
||||
f()
|
||||
return
|
||||
}
|
||||
rc.onClose[name] = f
|
||||
}
|
||||
|
||||
func (rc *connection) hasOnCloseFunc(name string) bool {
|
||||
rc.locker.Lock()
|
||||
defer rc.locker.Unlock()
|
||||
|
||||
if rc.onClose == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
_, ok := rc.onClose[name]
|
||||
return ok
|
||||
}
|
||||
|
||||
func (rc *connection) unregistOnCloseFunc(name string) (out func()) {
|
||||
rc.locker.Lock()
|
||||
defer rc.locker.Unlock()
|
||||
|
||||
if rc.onClose == nil {
|
||||
return
|
||||
}
|
||||
out = rc.onClose[name]
|
||||
delete(rc.onClose, name)
|
||||
return
|
||||
}
|
||||
|
||||
func (rc *connection) cleanup() {
|
||||
rc.locker.Lock()
|
||||
defer rc.locker.Unlock()
|
||||
|
||||
cp := rc.onClose
|
||||
rc.onClose = nil
|
||||
go func() {
|
||||
for _, f := range cp {
|
||||
f()
|
||||
}
|
||||
}()
|
||||
}
|
||||
@ -9,7 +9,6 @@ import (
|
||||
"net/http"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"repositories.action2quare.com/ayo/gocommon"
|
||||
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||
@ -30,10 +29,6 @@ func writeBsonArr(w io.Writer, src []bson.M) error {
|
||||
})
|
||||
}
|
||||
|
||||
func onlineGroupQueryKey(prefix string) string {
|
||||
return prefix + "_olg"
|
||||
}
|
||||
|
||||
func writeBsonDoc[T any](w io.Writer, src T) error {
|
||||
rw, err := bsonrw.NewBSONValueWriter(w)
|
||||
if err != nil {
|
||||
@ -82,43 +77,6 @@ type TavernConfig struct {
|
||||
|
||||
var config TavernConfig
|
||||
|
||||
type connectionMap struct {
|
||||
sync.Mutex
|
||||
conns map[primitive.ObjectID]*connection
|
||||
}
|
||||
|
||||
func (cm *connectionMap) add(accid accountID, alias string) {
|
||||
cm.Lock()
|
||||
defer cm.Unlock()
|
||||
|
||||
old := cm.conns[accid]
|
||||
if old != nil {
|
||||
old.cleanup()
|
||||
}
|
||||
cm.conns[accid] = &connection{
|
||||
alias: alias,
|
||||
onClose: make(map[string]func()),
|
||||
}
|
||||
}
|
||||
|
||||
func (cm *connectionMap) remove(accid accountID) {
|
||||
cm.Lock()
|
||||
defer cm.Unlock()
|
||||
|
||||
old := cm.conns[accid]
|
||||
if old != nil {
|
||||
delete(cm.conns, accid)
|
||||
old.cleanup()
|
||||
}
|
||||
}
|
||||
|
||||
func (cm *connectionMap) get(accid accountID) *connection {
|
||||
cm.Lock()
|
||||
defer cm.Unlock()
|
||||
|
||||
return cm.conns[accid]
|
||||
}
|
||||
|
||||
type Tavern struct {
|
||||
subTaverns []*subTavern
|
||||
wsh *wshandler.WebsocketHandler
|
||||
@ -130,7 +88,6 @@ type subTavern struct {
|
||||
region string
|
||||
groups map[string]group
|
||||
methods map[string]reflect.Method
|
||||
cm connectionMap
|
||||
}
|
||||
|
||||
func getMacAddr() (string, error) {
|
||||
@ -183,23 +140,6 @@ func (tv *Tavern) Cleanup() {
|
||||
}
|
||||
}
|
||||
|
||||
// func (tv *Tavern) SocketMessageReceived(accid primitive.ObjectID, alias string, messageType wshandler.WebSocketMessageType, body io.Reader) {
|
||||
// switch messageType {
|
||||
// case wshandler.Connected:
|
||||
|
||||
// case wshandler.Disconnected:
|
||||
// }
|
||||
// // gidtype := msg.Conn.GetTag("gid")
|
||||
// // if len(gidtype) > 0 {
|
||||
// // tokens := strings.SplitN(gidtype, "@", 2)
|
||||
// // gidobj, _ := primitive.ObjectIDFromHex(tokens[0])
|
||||
// // gtype := tokens[1]
|
||||
// // group := sub.groups[gtype]
|
||||
// // if group != nil {
|
||||
// // group.PauseMember(gidobj, msg.Alias, msg.Conn)
|
||||
// // }
|
||||
// }
|
||||
|
||||
func (tv *Tavern) prepare(ctx context.Context) error {
|
||||
for region := range config.RegionStorage {
|
||||
var dbconn gocommon.MongoClient
|
||||
@ -219,9 +159,6 @@ func (tv *Tavern) prepare(ctx context.Context) error {
|
||||
mongoClient: dbconn,
|
||||
region: region,
|
||||
methods: methods,
|
||||
cm: connectionMap{
|
||||
conns: make(map[primitive.ObjectID]*connection),
|
||||
},
|
||||
}
|
||||
|
||||
groups := make(map[string]group)
|
||||
@ -269,9 +206,9 @@ func (tv *Tavern) RegisterHandlers(ctx context.Context, serveMux *http.ServeMux,
|
||||
|
||||
func (sub *subTavern) clientMessageReceived(sender *wshandler.Sender, messageType wshandler.WebSocketMessageType, body io.Reader) {
|
||||
if messageType == wshandler.Connected {
|
||||
sub.cm.add(sender.Accid, sender.Alias)
|
||||
|
||||
} else if messageType == wshandler.Disconnected {
|
||||
sub.cm.remove(sender.Accid)
|
||||
|
||||
} else if messageType == wshandler.BinaryMessage {
|
||||
var msg map[string][]any
|
||||
dec := json.NewDecoder(body)
|
||||
|
||||
74
core/tavern_test.go
Normal file
74
core/tavern_test.go
Normal file
@ -0,0 +1,74 @@
|
||||
// warroom project main.go
|
||||
package core
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||
)
|
||||
|
||||
func TestReJSON(t *testing.T) {
|
||||
rc := redis.NewClient(&redis.Options{Addr: "192.168.8.94:6380"})
|
||||
rh := NewReJSONHandler(context.Background(), rc)
|
||||
|
||||
testDoc := map[string]any{
|
||||
"members": map[string]any{
|
||||
"mid2": map[string]any{
|
||||
"key": "val",
|
||||
"exp": 20202020,
|
||||
},
|
||||
"mid1": map[string]any{
|
||||
"key": "val",
|
||||
"exp": 10101010,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
gd := groupDoc{
|
||||
id: primitive.NewObjectID(),
|
||||
}
|
||||
|
||||
midin := primitive.NewObjectID()
|
||||
tid := gd.tid(midin)
|
||||
midout := gd.mid(tid)
|
||||
logger.Println(midin, tid, midout)
|
||||
|
||||
res, err := rh.JSONSet("jsontest", "$", testDoc)
|
||||
logger.Println(res, err)
|
||||
|
||||
res, err = rh.JSONResp("jsontest", "$.members")
|
||||
vars := res.([]any)
|
||||
logger.Println(vars, err)
|
||||
|
||||
res, err = rh.JSONGetString("jsontest", "$.members..key")
|
||||
logger.Println(res, err)
|
||||
|
||||
res, err = rh.JSONGetInt64("jsontest", "$.members..exp")
|
||||
logger.Println(res, err)
|
||||
|
||||
res, err = rh.JSONObjKeys("jsontest", "$.members")
|
||||
logger.Println(res, err)
|
||||
|
||||
err = rh.JSONMSet("jsontest", map[string]any{
|
||||
"$.members.mid1.key": "newval",
|
||||
"$.members.mid2.key": "newval",
|
||||
})
|
||||
logger.Println(err)
|
||||
|
||||
res, err = rh.JSONGet("jsontest", "$")
|
||||
logger.Println(string(res.([]byte)), err)
|
||||
|
||||
err = rh.JSONMDel("jsontest", []string{"$.members.mid1", "$.members.mid2"})
|
||||
logger.Println(err)
|
||||
|
||||
res, err = rh.JSONGet("jsontest", "$")
|
||||
logger.Println(string(res.([]byte)), err)
|
||||
|
||||
res, err = rh.JSONObjLen("jsontest", "$.members")
|
||||
count := res.([]any)[0].(int64)
|
||||
logger.Println(count, err)
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user