계정 제재 api추가
This commit is contained in:
169
core/member_container.go
Normal file
169
core/member_container.go
Normal file
@ -0,0 +1,169 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"repositories.action2quare.com/ayo/gocommon"
|
||||
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||
)
|
||||
|
||||
type memberContraints[K comparable] interface {
|
||||
Key() K
|
||||
Expired() bool
|
||||
}
|
||||
|
||||
type memberContainerPtr[K comparable, T memberContraints[K]] struct {
|
||||
ptr unsafe.Pointer
|
||||
}
|
||||
|
||||
func (p *memberContainerPtr[K, T]) init(ms []T) {
|
||||
next := map[K]T{}
|
||||
for _, m := range ms {
|
||||
next[m.Key()] = m
|
||||
}
|
||||
atomic.StorePointer(&p.ptr, unsafe.Pointer(&next))
|
||||
}
|
||||
|
||||
func (p *memberContainerPtr[K, T]) add(m T) {
|
||||
ptr := atomic.LoadPointer(&p.ptr)
|
||||
src := (*map[K]T)(ptr)
|
||||
|
||||
next := map[K]T{}
|
||||
for k, v := range *src {
|
||||
next[k] = v
|
||||
}
|
||||
next[m.Key()] = m
|
||||
atomic.StorePointer(&p.ptr, unsafe.Pointer(&next))
|
||||
}
|
||||
|
||||
func (p *memberContainerPtr[K, T]) remove(key K) {
|
||||
ptr := atomic.LoadPointer(&p.ptr)
|
||||
src := (*map[K]T)(ptr)
|
||||
|
||||
next := map[K]T{}
|
||||
for k, v := range *src {
|
||||
next[k] = v
|
||||
}
|
||||
delete(next, key)
|
||||
atomic.StorePointer(&p.ptr, unsafe.Pointer(&next))
|
||||
}
|
||||
|
||||
type memberPipelineDocument[K comparable, T memberContraints[K]] struct {
|
||||
OperationType string `bson:"operationType"`
|
||||
DocumentKey struct {
|
||||
Id primitive.ObjectID `bson:"_id"`
|
||||
} `bson:"documentKey"`
|
||||
Member T `bson:"fullDocument"`
|
||||
}
|
||||
|
||||
func (p *memberContainerPtr[K, T]) all() []T {
|
||||
ptr := atomic.LoadPointer(&p.ptr)
|
||||
src := (*map[K]T)(ptr)
|
||||
|
||||
out := make([]T, 0, len(*src))
|
||||
for _, m := range *src {
|
||||
if m.Expired() {
|
||||
continue
|
||||
}
|
||||
out = append(out, m)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func (p *memberContainerPtr[K, T]) contains(key K, out *T) bool {
|
||||
ptr := atomic.LoadPointer(&p.ptr)
|
||||
src := (*map[K]T)(ptr)
|
||||
|
||||
found, exists := (*src)[key]
|
||||
if exists {
|
||||
if found.Expired() {
|
||||
p.remove(key)
|
||||
return false
|
||||
}
|
||||
if out != nil {
|
||||
out = &found
|
||||
}
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (p *memberContainerPtr[K, T]) watchCollection(parentctx context.Context, coll gocommon.CollectionName, mc gocommon.MongoClient) {
|
||||
defer func() {
|
||||
s := recover()
|
||||
if s != nil {
|
||||
logger.Error(s)
|
||||
}
|
||||
}()
|
||||
|
||||
matchStage := bson.D{
|
||||
{
|
||||
Key: "$match", Value: bson.D{
|
||||
{Key: "operationType", Value: bson.D{
|
||||
{Key: "$in", Value: bson.A{
|
||||
"update",
|
||||
"insert",
|
||||
}},
|
||||
}},
|
||||
},
|
||||
}}
|
||||
projectStage := bson.D{
|
||||
{
|
||||
Key: "$project", Value: bson.D{
|
||||
{Key: "documentKey", Value: 1},
|
||||
{Key: "fullDocument", Value: 1},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
var stream *mongo.ChangeStream
|
||||
var err error
|
||||
var ctx context.Context
|
||||
|
||||
for {
|
||||
if stream == nil {
|
||||
stream, err = mc.Watch(coll, mongo.Pipeline{matchStage, projectStage})
|
||||
if err != nil {
|
||||
logger.Error("watchCollection watch failed :", err)
|
||||
time.Sleep(time.Minute)
|
||||
continue
|
||||
}
|
||||
ctx = context.TODO()
|
||||
}
|
||||
|
||||
changed := stream.TryNext(ctx)
|
||||
if ctx.Err() != nil {
|
||||
logger.Error("watchCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error())
|
||||
break
|
||||
}
|
||||
|
||||
if changed {
|
||||
var data memberPipelineDocument[K, T]
|
||||
if err := stream.Decode(&data); err == nil {
|
||||
p.add(data.Member)
|
||||
} else {
|
||||
logger.Error("watchCollection stream.Decode failed :", err)
|
||||
}
|
||||
} else if stream.Err() != nil || stream.ID() == 0 {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Println("watchCollection is done")
|
||||
stream.Close(ctx)
|
||||
return
|
||||
|
||||
case <-time.After(time.Second):
|
||||
logger.Error("watchCollection stream error :", stream.Err())
|
||||
stream.Close(ctx)
|
||||
stream = nil
|
||||
}
|
||||
} else {
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user