파일 업로드 추가 / 화이트리스트에 권한 종류 추가
This commit is contained in:
@ -31,6 +31,10 @@ type servicePipelineDocument struct {
|
||||
Service *serviceDescription `bson:"fullDocument"`
|
||||
}
|
||||
|
||||
type filePipelineDocument struct {
|
||||
File *fileDocumentDesc `bson:"fullDocument"`
|
||||
}
|
||||
|
||||
type whilelistPipelineDocument struct {
|
||||
OperationType string `bson:"operationType"`
|
||||
DocumentKey struct {
|
||||
@ -123,6 +127,70 @@ func (mg *Maingate) watchWhitelistCollection(parentctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
func (mg *Maingate) watchFileCollection(parentctx context.Context, serveMux *http.ServeMux, prefix string) {
|
||||
defer func() {
|
||||
s := recover()
|
||||
if s != nil {
|
||||
logger.Error(s)
|
||||
}
|
||||
}()
|
||||
|
||||
matchStage := bson.D{
|
||||
{
|
||||
Key: "$match", Value: bson.D{
|
||||
{Key: "operationType", Value: bson.D{
|
||||
{Key: "$in", Value: bson.A{
|
||||
"insert",
|
||||
}},
|
||||
}},
|
||||
},
|
||||
}}
|
||||
projectStage := bson.D{
|
||||
{
|
||||
Key: "$project", Value: bson.D{
|
||||
{Key: "fullDocument", Value: 1},
|
||||
},
|
||||
},
|
||||
}
|
||||
var stream *mongo.ChangeStream
|
||||
var err error
|
||||
var ctx context.Context
|
||||
|
||||
for {
|
||||
if stream == nil {
|
||||
stream, err = mg.mongoClient.Watch(CollectionFile, mongo.Pipeline{matchStage, projectStage}, options.ChangeStream().SetFullDocument(options.UpdateLookup))
|
||||
if err != nil {
|
||||
logger.Error("watchFileCollection watch failed :", err)
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
}
|
||||
ctx = context.TODO()
|
||||
}
|
||||
|
||||
changed := stream.TryNext(ctx)
|
||||
if ctx.Err() != nil {
|
||||
logger.Error("watchFileCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error())
|
||||
break
|
||||
}
|
||||
|
||||
if !changed {
|
||||
if stream.Err() != nil || stream.ID() == 0 {
|
||||
logger.Error("watchServiceCollection stream error :", stream.Err())
|
||||
stream.Close(ctx)
|
||||
stream = nil
|
||||
} else {
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
var data filePipelineDocument
|
||||
if err := stream.Decode(&data); err == nil {
|
||||
data.File.save()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (mg *Maingate) watchServiceCollection(parentctx context.Context, serveMux *http.ServeMux, prefix string) {
|
||||
defer func() {
|
||||
s := recover()
|
||||
|
||||
Reference in New Issue
Block a user