package shared import ( "context" "errors" "flag" "os" "time" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) type MongoClient struct { db *mongo.Database c *mongo.Client } type ConnectionInfo struct { Url string Database string } var Devflag = flag.Bool("dev", false, "") type CollectionName string const ( CollectionCoupon = CollectionName("coupon") CollectionCouponUse = CollectionName("coupon_use") CollectionAccount = CollectionName("account") CollectionAuth = CollectionName("auth") CollectionMatch = CollectionName("match") ) func mongourl() string { v := os.Getenv("MONGO_URL") if len(v) > 0 { return v } return "mongodb://redis-dev.actionsquare.corp:27017/?replicaSet=repl01" } func NewMongoConnectionInfo() *ConnectionInfo { if !flag.Parsed() { flag.Parse() } dbname := "anvil" if *Devflag { dbname, _ = os.Hostname() } return &ConnectionInfo{ Url: mongourl(), Database: dbname, } } func (ci *ConnectionInfo) SetURL(url string) *ConnectionInfo { ci.Url = url return ci } func (ci *ConnectionInfo) SetDatabase(dbname string) *ConnectionInfo { ci.Database = dbname return ci } func NewMongoClient(ci *ConnectionInfo) (MongoClient, error) { if len(ci.Url) == 0 { return MongoClient{}, errors.New("mongo connection string is empty") } client, err := mongo.NewClient(options.Client().ApplyURI(ci.Url)) if err != nil { return MongoClient{}, err } ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() err = client.Connect(ctx) if err != nil { return MongoClient{}, err } err = client.Ping(ctx, nil) if err != nil { return MongoClient{}, err } anvildb := client.Database(ci.Database, nil) makeExpiredIndex := func(collname CollectionName, expireSeconds int32) error { matchcoll := anvildb.Collection(string(collname)) indices, err := matchcoll.Indexes().List(ctx, options.ListIndexes().SetMaxTime(time.Second)) if err != nil { return err } allindices := make([]interface{}, 0) err = indices.All(context.Background(), &allindices) if err != nil { return err } tsfound := false var tsname string var exp int32 for _, index := range allindices { d := index.(bson.D) key := d.Map()["key"].(bson.D) for _, kd := range key { if kd.Key == "_ts" { tsfound = true } } if v, ok := d.Map()["name"]; ok { tsname = v.(string) } if v, ok := d.Map()["expireAfterSeconds"]; ok { exp = v.(int32) } } if tsfound { if exp == expireSeconds { return nil } _, err = matchcoll.Indexes().DropOne(ctx, tsname) if err != nil { return err } } mod := mongo.IndexModel{ Keys: primitive.M{"_ts": 1}, Options: options.Index().SetExpireAfterSeconds(expireSeconds), } _, err = matchcoll.Indexes().CreateOne(ctx, mod) return err } if err = makeExpiredIndex(CollectionMatch, 30); err != nil { return MongoClient{}, err } if err = makeExpiredIndex(CollectionAuth, 300); err != nil { return MongoClient{}, err } return MongoClient{c: client, db: anvildb}, nil } func (mc MongoClient) Close() { if mc.c != nil { mc.c.Disconnect(context.Background()) } } func (mc MongoClient) Watch(coll CollectionName, pipeline mongo.Pipeline) (*mongo.ChangeStream, error) { return mc.Collection(coll).Watch(context.Background(), pipeline, options.ChangeStream().SetFullDocument(options.UpdateLookup).SetMaxAwaitTime(0)) } func (mc MongoClient) Collection(collname CollectionName) *mongo.Collection { return mc.db.Collection(string(collname)) } func (mc MongoClient) All(coll CollectionName, opts ...*options.FindOptions) ([]bson.M, error) { cursor, err := mc.Collection(coll).Find(context.Background(), bson.D{}, opts...) if err != nil { return nil, err } var all []bson.M err = cursor.All(context.Background(), &all) if err != nil { return nil, err } return all, nil } func (mc MongoClient) FindOneAndDelete(coll CollectionName, filter bson.M) (bson.M, error) { result := mc.Collection(coll).FindOneAndDelete(context.Background(), filter) err := result.Err() if err != nil { if err == mongo.ErrNoDocuments { return nil, nil } return nil, err } tmp := make(map[string]interface{}) err = result.Decode(&tmp) if err != nil { return nil, err } return bson.M(tmp), nil } func (mc MongoClient) Delete(coll CollectionName, filter bson.M) (bool, error) { r, err := mc.Collection(coll).DeleteOne(context.Background(), filter) if err != nil { return false, err } return r.DeletedCount > 0, nil } func (mc MongoClient) UnsetField(coll CollectionName, filter bson.M, doc bson.M) error { _, err := mc.Collection(coll).UpdateOne(context.Background(), filter, bson.M{ "$unset": doc, }) return err } func (mc MongoClient) DeleteMany(coll CollectionName, filters bson.M, opts ...*options.DeleteOptions) (int, error) { result, err := mc.Collection(coll).DeleteMany(context.Background(), filters, opts...) if err != nil { return 0, err } return int(result.DeletedCount), nil } func (mc MongoClient) InsertMany(coll CollectionName, documents []interface{}, opts ...*options.InsertManyOptions) (int, error) { result, err := mc.Collection(coll).InsertMany(context.Background(), documents, opts...) if err != nil { return 0, err } return len(result.InsertedIDs), nil } func (mc MongoClient) UpdateMany(coll CollectionName, filter bson.M, doc bson.M, opts ...*options.UpdateOptions) (count int, err error) { result, e := mc.Collection(coll).UpdateMany(context.Background(), filter, doc, opts...) if e != nil { return 0, e } err = nil count = int(result.UpsertedCount + result.ModifiedCount) return } func (mc MongoClient) Update(coll CollectionName, filter bson.M, doc bson.M, opts ...*options.UpdateOptions) (worked bool, newid interface{}, err error) { result, e := mc.Collection(coll).UpdateOne(context.Background(), filter, doc, opts...) if e != nil { return false, "", e } err = nil worked = result.UpsertedCount > 0 || result.ModifiedCount > 0 newid = result.UpsertedID return } func (mc MongoClient) UpsertOne(coll CollectionName, filter bson.M, doc bson.M) (worked bool, newid interface{}, err error) { return mc.Update(coll, filter, bson.M{ "$set": doc, }, options.Update().SetUpsert(true)) } func (mc MongoClient) FindOne(coll CollectionName, filter bson.M, opts ...*options.FindOneOptions) (doc bson.M, err error) { result := mc.Collection(coll).FindOne(context.Background(), filter, opts...) tmp := make(map[string]interface{}) err = result.Decode(&tmp) if err == nil { doc = bson.M(tmp) } else if err == mongo.ErrNoDocuments { err = nil } return } func (mc MongoClient) FindOneAndUpdate(coll CollectionName, filter bson.M, doc bson.M, opts ...*options.FindOneAndUpdateOptions) (olddoc bson.M, err error) { result := mc.Collection(coll).FindOneAndUpdate(context.Background(), filter, doc, opts...) tmp := make(map[string]interface{}) err = result.Decode(&tmp) if err == nil { olddoc = bson.M(tmp) } else if err == mongo.ErrNoDocuments { err = nil } return } func (mc MongoClient) Exists(coll CollectionName, filter bson.M) (bool, error) { cnt, err := mc.Collection(coll).CountDocuments(context.Background(), filter, options.Count().SetLimit(1)) if err != nil { return false, err } return cnt > 0, nil } func (mc MongoClient) FindAll(coll CollectionName, filter bson.M, opts ...*options.FindOptions) ([]bson.M, error) { cursor, err := mc.Collection(coll).Find(context.Background(), filter, opts...) if err != nil { return nil, err } output := make([]interface{}, 0) err = cursor.All(context.Background(), &output) if err != nil { return nil, err } docs := make([]bson.M, 0, len(output)) for _, doc := range output { one := make(bson.M) for _, kv := range doc.(bson.D) { one[kv.Key] = kv.Value } docs = append(docs, one) } return docs, nil }