maingate 이전
This commit is contained in:
330
core/watch.go
Normal file
330
core/watch.go
Normal file
@ -0,0 +1,330 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"repositories.action2quare.com/ayo/go-ayo/logger"
|
||||
|
||||
"repositories.action2quare.com/ayo/go-ayo/common"
|
||||
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
)
|
||||
|
||||
type authPipelineDocument struct {
|
||||
OperationType string `bson:"operationType"`
|
||||
DocumentKey struct {
|
||||
Id primitive.ObjectID `bson:"_id"`
|
||||
} `bson:"documentKey"`
|
||||
Authinfo *common.Authinfo `bson:"fullDocument"`
|
||||
}
|
||||
|
||||
type servicePipelineDocument struct {
|
||||
OperationType string `bson:"operationType"`
|
||||
DocumentKey struct {
|
||||
Id primitive.ObjectID `bson:"_id"`
|
||||
} `bson:"documentKey"`
|
||||
Service *serviceDescription `bson:"fullDocument"`
|
||||
}
|
||||
|
||||
type whilelistPipelineDocument struct {
|
||||
OperationType string `bson:"operationType"`
|
||||
DocumentKey struct {
|
||||
Id primitive.ObjectID `bson:"_id"`
|
||||
} `bson:"documentKey"`
|
||||
Member *whitelistmember `bson:"fullDocument"`
|
||||
}
|
||||
|
||||
func (mg *Maingate) watchWhitelistCollection(parentctx context.Context) {
|
||||
defer func() {
|
||||
s := recover()
|
||||
if s != nil {
|
||||
logger.Error(s)
|
||||
}
|
||||
}()
|
||||
|
||||
matchStage := bson.D{
|
||||
{
|
||||
Key: "$match", Value: bson.D{
|
||||
{Key: "operationType", Value: bson.D{
|
||||
{Key: "$in", Value: bson.A{
|
||||
"update",
|
||||
"insert",
|
||||
}},
|
||||
}},
|
||||
},
|
||||
}}
|
||||
projectStage := bson.D{
|
||||
{
|
||||
Key: "$project", Value: bson.D{
|
||||
{Key: "documentKey", Value: 1},
|
||||
{Key: "operationType", Value: 1},
|
||||
{Key: "fullDocument", Value: 1},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
var stream *mongo.ChangeStream
|
||||
var err error
|
||||
var ctx context.Context
|
||||
|
||||
for {
|
||||
if stream == nil {
|
||||
stream, err = mg.mongoClient.Watch(CollectionWhitelist, mongo.Pipeline{matchStage, projectStage})
|
||||
if err != nil {
|
||||
logger.Error("watchWhitelistCollection watch failed :", err)
|
||||
time.Sleep(time.Minute)
|
||||
continue
|
||||
}
|
||||
ctx = context.TODO()
|
||||
}
|
||||
|
||||
changed := stream.TryNext(ctx)
|
||||
if ctx.Err() != nil {
|
||||
logger.Error("watchServiceCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error())
|
||||
break
|
||||
}
|
||||
|
||||
if changed {
|
||||
var data whilelistPipelineDocument
|
||||
if err := stream.Decode(&data); err == nil {
|
||||
ot := data.OperationType
|
||||
switch ot {
|
||||
case "insert":
|
||||
// 새 화이트리스트 멤버
|
||||
if svc := mg.services.get(data.Member.Service); svc != nil {
|
||||
svc.wl.add(data.Member)
|
||||
}
|
||||
case "update":
|
||||
if svc := mg.services.get(data.Member.Service); svc != nil {
|
||||
if data.Member.Expired != 0 {
|
||||
logger.Println("whitelist member is removed :", *data.Member)
|
||||
svc.wl.remove(data.Member.Email)
|
||||
} else {
|
||||
logger.Println("whitelist member is updated :", *data.Member)
|
||||
svc.wl.add(data.Member)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
logger.Error("watchServiceCollection stream.Decode failed :", err)
|
||||
}
|
||||
} else if stream.Err() != nil || stream.ID() == 0 {
|
||||
logger.Error("watchServiceCollection stream error :", stream.Err())
|
||||
stream.Close(ctx)
|
||||
stream = nil
|
||||
} else {
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (mg *Maingate) watchServiceCollection(parentctx context.Context, serveMux *http.ServeMux, prefix string) {
|
||||
defer func() {
|
||||
s := recover()
|
||||
if s != nil {
|
||||
logger.Error(s)
|
||||
}
|
||||
}()
|
||||
|
||||
matchStage := bson.D{
|
||||
{
|
||||
Key: "$match", Value: bson.D{
|
||||
{Key: "operationType", Value: bson.D{
|
||||
{Key: "$in", Value: bson.A{
|
||||
"delete",
|
||||
"insert",
|
||||
"update",
|
||||
"replace",
|
||||
}},
|
||||
}},
|
||||
},
|
||||
}}
|
||||
projectStage := bson.D{
|
||||
{
|
||||
Key: "$project", Value: bson.D{
|
||||
{Key: "operationType", Value: 1},
|
||||
{Key: "fullDocument", Value: 1},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
var stream *mongo.ChangeStream
|
||||
var err error
|
||||
var ctx context.Context
|
||||
|
||||
for {
|
||||
if stream == nil {
|
||||
stream, err = mg.mongoClient.Watch(CollectionService, mongo.Pipeline{matchStage, projectStage}, options.ChangeStream().SetFullDocument(options.UpdateLookup))
|
||||
if err != nil {
|
||||
logger.Error("watchServiceCollection watch failed :", err)
|
||||
time.Sleep(time.Minute)
|
||||
continue
|
||||
}
|
||||
ctx = context.TODO()
|
||||
}
|
||||
|
||||
changed := stream.TryNext(ctx)
|
||||
if ctx.Err() != nil {
|
||||
logger.Error("watchServiceCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error())
|
||||
break
|
||||
}
|
||||
|
||||
if changed {
|
||||
var data servicePipelineDocument
|
||||
if err := stream.Decode(&data); err == nil {
|
||||
ot := data.OperationType
|
||||
switch ot {
|
||||
case "insert":
|
||||
// 새 서비스 추가됨
|
||||
if err := data.Service.prepare(mg); err != nil {
|
||||
logger.Error("service cannot be prepared :", data.Service, err)
|
||||
} else {
|
||||
logger.Println("service is on the board! :", data.Service)
|
||||
mg.services.add(data.Service)
|
||||
serveMux.Handle(common.MakeHttpHandlerPattern(prefix, data.Service.ServiceCode, "/"), data.Service)
|
||||
}
|
||||
|
||||
case "replace":
|
||||
fallthrough
|
||||
|
||||
case "update":
|
||||
data.Service.prepare(mg)
|
||||
if old := mg.services.get(data.Service.ServiceName); old != nil {
|
||||
logger.Printf("service is changed : %v", data.Service)
|
||||
|
||||
atomic.SwapPointer(&old.divisionsSerialized, data.Service.divisionsSerialized)
|
||||
atomic.SwapPointer(&old.apiUsers, data.Service.apiUsers)
|
||||
atomic.SwapPointer(&old.serviceSerialized, data.Service.serviceSerialized)
|
||||
|
||||
for _, token := range old.ServerApiTokens {
|
||||
mg.apiTokenToService.remove(token.Hex())
|
||||
}
|
||||
|
||||
for _, token := range data.Service.ServerApiTokens {
|
||||
mg.apiTokenToService.add(token.Hex(), data.Service.ServiceCode)
|
||||
}
|
||||
|
||||
if data.Service.UseWhitelist {
|
||||
atomic.StoreInt32(&old.wl.working, 1)
|
||||
} else {
|
||||
atomic.StoreInt32(&old.wl.working, 0)
|
||||
}
|
||||
|
||||
old.Closed = data.Service.Closed
|
||||
if old.Closed {
|
||||
atomic.StoreInt32(&old.closed, 1)
|
||||
} else {
|
||||
atomic.StoreInt32(&old.closed, 0)
|
||||
}
|
||||
atomic.SwapPointer(&old.wl.emailptr, data.Service.wl.emailptr)
|
||||
|
||||
old.Divisions = data.Service.Divisions
|
||||
} else if !data.Service.Closed {
|
||||
if err := data.Service.prepare(mg); err != nil {
|
||||
logger.Error("service cannot be prepared :", data.Service, err)
|
||||
} else {
|
||||
logger.Println("service is on the board! :", data.Service)
|
||||
mg.services.add(data.Service)
|
||||
serveMux.Handle(common.MakeHttpHandlerPattern(prefix, data.Service.ServiceCode, "/"), data.Service)
|
||||
}
|
||||
}
|
||||
case "delete":
|
||||
if deleted := mg.services.remove(data.DocumentKey.Id); deleted != nil {
|
||||
logger.Println("service is closed :", data.Service)
|
||||
atomic.AddInt32(&deleted.closed, 1)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
logger.Error("watchServiceCollection stream.Decode failed :", err)
|
||||
}
|
||||
} else if stream.Err() != nil || stream.ID() == 0 {
|
||||
logger.Error("watchServiceCollection stream error :", stream.Err())
|
||||
stream.Close(ctx)
|
||||
stream = nil
|
||||
} else {
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func watchAuthCollection(parentctx context.Context, ac *common.AuthCollection, mongoClient common.MongoClient) {
|
||||
defer func() {
|
||||
s := recover()
|
||||
if s != nil {
|
||||
logger.Error(s)
|
||||
}
|
||||
}()
|
||||
|
||||
matchStage := bson.D{
|
||||
{
|
||||
Key: "$match", Value: bson.D{
|
||||
{Key: "operationType", Value: bson.D{
|
||||
{Key: "$in", Value: bson.A{
|
||||
"delete",
|
||||
"insert",
|
||||
"update",
|
||||
}},
|
||||
}},
|
||||
},
|
||||
}}
|
||||
projectStage := bson.D{
|
||||
{
|
||||
Key: "$project", Value: bson.D{
|
||||
{Key: "documentKey", Value: 1},
|
||||
{Key: "operationType", Value: 1},
|
||||
{Key: "fullDocument", Value: 1},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
var stream *mongo.ChangeStream
|
||||
var err error
|
||||
var ctx context.Context
|
||||
|
||||
for {
|
||||
if stream == nil {
|
||||
stream, err = mongoClient.Watch(CollectionAuth, mongo.Pipeline{matchStage, projectStage})
|
||||
if err != nil {
|
||||
logger.Error("watchAuthCollection watch failed :", err)
|
||||
time.Sleep(time.Minute)
|
||||
continue
|
||||
}
|
||||
ctx = context.TODO()
|
||||
}
|
||||
|
||||
changed := stream.TryNext(ctx)
|
||||
if ctx.Err() != nil {
|
||||
logger.Error("watchAuthCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error())
|
||||
break
|
||||
}
|
||||
|
||||
if changed {
|
||||
var data authPipelineDocument
|
||||
if err := stream.Decode(&data); err == nil {
|
||||
ot := data.OperationType
|
||||
switch ot {
|
||||
case "insert":
|
||||
ac.AddRaw(&mongoAuthCell{src: data.Authinfo})
|
||||
case "update":
|
||||
ac.AddRaw(&mongoAuthCell{src: data.Authinfo})
|
||||
case "delete":
|
||||
ac.RemoveByAccId(data.DocumentKey.Id)
|
||||
}
|
||||
} else {
|
||||
logger.Error("watchAuthCollection stream.Decode failed :", err)
|
||||
}
|
||||
} else if stream.Err() != nil || stream.ID() == 0 {
|
||||
logger.Error("watchAuthCollection stream error :", stream.Err())
|
||||
stream.Close(ctx)
|
||||
stream = nil
|
||||
} else {
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user