diff --git a/mongo.go b/mongo.go index 2a016c9..31a0b15 100644 --- a/mongo.go +++ b/mongo.go @@ -154,10 +154,36 @@ func (mc *MongoClient) DropIndex(coll CollectionName, name string) error { } func (mc *MongoClient) Watch(coll CollectionName, pipeline mongo.Pipeline, opts ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error) { + // mc.db.RunCommand() if len(opts) == 0 { opts = []*options.ChangeStreamOptions{options.ChangeStream().SetFullDocument(options.UpdateLookup).SetMaxAwaitTime(0)} } - return mc.Collection(coll).Watch(mc.ctx, pipeline, opts...) + + stream, err := mc.Collection(coll).Watch(mc.ctx, pipeline, opts...) + if err != nil { + if mongoErr, ok := err.(mongo.CommandError); ok { + logger.Println("MongoClient Watch return err code :", mongoErr, mongoErr.Code) + if mongoErr.Code == 40573 { + adminDb := mc.db.Client().Database("admin") + result := adminDb.RunCommand(mc.ctx, bson.D{ + {Key: "modifyChangeStreams", Value: 1}, + {Key: "database", Value: mc.db.Name()}, + {Key: "collection", Value: coll}, + {Key: "enable", Value: true}, + }) + + if result.Err() != nil { + logger.Println("mc.db.RunCommand failed :", result.Err(), mc.db.Name(), coll) + } else { + return mc.Collection(coll).Watch(mc.ctx, pipeline, opts...) + } + } + } + + logger.Fatal(err) + } + + return stream, err } func (mc *MongoClient) Collection(collname CollectionName) *mongo.Collection {