session.provider로 교체
This commit is contained in:
@ -10,7 +10,6 @@ import (
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"repositories.action2quare.com/ayo/gocommon"
|
||||
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
@ -19,14 +18,6 @@ import (
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
)
|
||||
|
||||
type authPipelineDocument struct {
|
||||
OperationType string `bson:"operationType"`
|
||||
DocumentKey struct {
|
||||
Id primitive.ObjectID `bson:"_id"`
|
||||
} `bson:"documentKey"`
|
||||
Authinfo *gocommon.Authinfo `bson:"fullDocument"`
|
||||
}
|
||||
|
||||
type servicePipelineDocument struct {
|
||||
OperationType string `bson:"operationType"`
|
||||
DocumentKey struct {
|
||||
@ -222,87 +213,3 @@ func (mg *Maingate) watchServiceCollection(parentctx context.Context, serveMux *
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func watchAuthCollection(parentctx context.Context, ac *gocommon.AuthCollection, mongoClient gocommon.MongoClient) {
|
||||
defer func() {
|
||||
s := recover()
|
||||
if s != nil {
|
||||
logger.Error(s)
|
||||
}
|
||||
}()
|
||||
|
||||
matchStage := bson.D{
|
||||
{
|
||||
Key: "$match", Value: bson.D{
|
||||
{Key: "operationType", Value: bson.D{
|
||||
{Key: "$in", Value: bson.A{
|
||||
"delete",
|
||||
"insert",
|
||||
"update",
|
||||
}},
|
||||
}},
|
||||
},
|
||||
}}
|
||||
projectStage := bson.D{
|
||||
{
|
||||
Key: "$project", Value: bson.D{
|
||||
{Key: "documentKey", Value: 1},
|
||||
{Key: "operationType", Value: 1},
|
||||
{Key: "fullDocument", Value: 1},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
var stream *mongo.ChangeStream
|
||||
var err error
|
||||
var ctx context.Context
|
||||
|
||||
for {
|
||||
if stream == nil {
|
||||
stream, err = mongoClient.Watch(CollectionAuth, mongo.Pipeline{matchStage, projectStage})
|
||||
if err != nil {
|
||||
logger.Error("watchAuthCollection watch failed :", err)
|
||||
time.Sleep(time.Minute)
|
||||
continue
|
||||
}
|
||||
ctx = context.TODO()
|
||||
}
|
||||
|
||||
changed := stream.TryNext(ctx)
|
||||
if ctx.Err() != nil {
|
||||
logger.Error("watchAuthCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error())
|
||||
break
|
||||
}
|
||||
|
||||
if changed {
|
||||
var data authPipelineDocument
|
||||
if err := stream.Decode(&data); err == nil {
|
||||
ot := data.OperationType
|
||||
switch ot {
|
||||
case "insert":
|
||||
ac.AddRaw(&mongoAuthCell{src: data.Authinfo})
|
||||
case "update":
|
||||
ac.AddRaw(&mongoAuthCell{src: data.Authinfo})
|
||||
case "delete":
|
||||
ac.RemoveByAccId(data.DocumentKey.Id)
|
||||
}
|
||||
} else {
|
||||
logger.Error("watchAuthCollection stream.Decode failed :", err)
|
||||
}
|
||||
} else if stream.Err() != nil || stream.ID() == 0 {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Println("watchAuthCollection is done")
|
||||
stream.Close(ctx)
|
||||
return
|
||||
|
||||
case <-time.After(time.Second):
|
||||
logger.Error("watchAuthCollection stream error :", stream.Err())
|
||||
stream.Close(ctx)
|
||||
stream = nil
|
||||
}
|
||||
} else {
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user