go-ayo/common을 gocommon으로 분리
This commit is contained in:
485
mongo.go
Normal file
485
mongo.go
Normal file
@ -0,0 +1,485 @@
|
||||
package common
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"flag"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"go.mongodb.org/mongo-driver/event"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"go.mongodb.org/mongo-driver/mongo/readpref"
|
||||
)
|
||||
|
||||
type MongoClient struct {
|
||||
db *mongo.Database
|
||||
c *mongo.Client
|
||||
}
|
||||
|
||||
type ConnectionInfo struct {
|
||||
Url string
|
||||
Database string
|
||||
}
|
||||
|
||||
var Devflag = flag.Bool("dev", false, "")
|
||||
|
||||
type CollectionName string
|
||||
|
||||
func ParseObjectID(hexstr string) (out primitive.ObjectID) {
|
||||
out, _ = primitive.ObjectIDFromHex(hexstr)
|
||||
return
|
||||
}
|
||||
|
||||
func NewMongoConnectionInfo(url string, dbname string) *ConnectionInfo {
|
||||
if len(dbname) == 0 {
|
||||
panic("dbname is empty")
|
||||
}
|
||||
|
||||
if *Devflag {
|
||||
hostname, _ := os.Hostname()
|
||||
dbname = hostname + "-" + dbname
|
||||
}
|
||||
|
||||
return &ConnectionInfo{
|
||||
Url: url,
|
||||
Database: dbname,
|
||||
}
|
||||
}
|
||||
|
||||
func (ci *ConnectionInfo) SetURL(url string) *ConnectionInfo {
|
||||
ci.Url = url
|
||||
return ci
|
||||
}
|
||||
|
||||
func (ci *ConnectionInfo) SetDatabase(dbname string) *ConnectionInfo {
|
||||
ci.Database = dbname
|
||||
return ci
|
||||
}
|
||||
|
||||
func NewMongoClient(ctx context.Context, url string, dbname string) (MongoClient, error) {
|
||||
return newMongoClient(ctx, NewMongoConnectionInfo(url, dbname))
|
||||
}
|
||||
|
||||
func newMongoClient(ctx context.Context, ci *ConnectionInfo) (MongoClient, error) {
|
||||
if len(ci.Url) == 0 {
|
||||
return MongoClient{}, errors.New("mongo connection string is empty")
|
||||
}
|
||||
|
||||
secondaryPref := readpref.SecondaryPreferred()
|
||||
//client, err := mongo.NewClient(options.Client().ApplyURI(ci.Url).SetReadPreference(secondaryPref))
|
||||
client, err := mongo.NewClient(options.Client().ApplyURI(ci.Url).SetReadPreference(secondaryPref).SetServerMonitor(&event.ServerMonitor{
|
||||
ServerOpening: func(evt *event.ServerOpeningEvent) {
|
||||
logger.Println("mongodb ServerOpening :", *evt)
|
||||
},
|
||||
ServerClosed: func(evt *event.ServerClosedEvent) {
|
||||
logger.Println("mongodb ServerClosed :", *evt)
|
||||
},
|
||||
TopologyOpening: func(evt *event.TopologyOpeningEvent) {
|
||||
logger.Println("mongodb TopologyOpening :", *evt)
|
||||
},
|
||||
TopologyClosed: func(evt *event.TopologyClosedEvent) {
|
||||
logger.Println("mongodb TopologyClosed :", *evt)
|
||||
},
|
||||
}))
|
||||
if err != nil {
|
||||
return MongoClient{}, err
|
||||
}
|
||||
|
||||
err = client.Connect(ctx)
|
||||
if err != nil {
|
||||
return MongoClient{}, err
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
if err := client.Ping(ctx, nil); err != nil {
|
||||
logger.Error("mongo client ping err :", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-time.After(10 * time.Second):
|
||||
continue
|
||||
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
mdb := client.Database(ci.Database, nil)
|
||||
return MongoClient{c: client, db: mdb}, nil
|
||||
}
|
||||
|
||||
func (mc MongoClient) Connected() bool {
|
||||
return mc.db != nil && mc.c != nil
|
||||
}
|
||||
|
||||
func (mc MongoClient) Close() {
|
||||
if mc.c != nil {
|
||||
mc.c.Disconnect(context.Background())
|
||||
}
|
||||
}
|
||||
|
||||
func (mc MongoClient) Watch(coll CollectionName, pipeline mongo.Pipeline, opts ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error) {
|
||||
if len(opts) == 0 {
|
||||
opts = []*options.ChangeStreamOptions{options.ChangeStream().SetFullDocument(options.UpdateLookup).SetMaxAwaitTime(0)}
|
||||
}
|
||||
return mc.Collection(coll).Watch(context.Background(), pipeline, opts...)
|
||||
}
|
||||
|
||||
func (mc MongoClient) Collection(collname CollectionName) *mongo.Collection {
|
||||
return mc.db.Collection(string(collname))
|
||||
}
|
||||
|
||||
func (mc MongoClient) All(coll CollectionName, opts ...*options.FindOptions) ([]bson.M, error) {
|
||||
cursor, err := mc.Collection(coll).Find(context.Background(), bson.D{}, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer cursor.Close(context.Background())
|
||||
|
||||
var all []bson.M
|
||||
err = cursor.All(context.Background(), &all)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return all, nil
|
||||
}
|
||||
|
||||
func (mc MongoClient) FindOneAndDelete(coll CollectionName, filter bson.M, opts ...*options.FindOneAndDeleteOptions) (bson.M, error) {
|
||||
result := mc.Collection(coll).FindOneAndDelete(context.Background(), filter, opts...)
|
||||
err := result.Err()
|
||||
if err != nil {
|
||||
if err == mongo.ErrNoDocuments {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tmp := make(map[string]interface{})
|
||||
err = result.Decode(&tmp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return bson.M(tmp), nil
|
||||
}
|
||||
|
||||
func (mc MongoClient) Delete(coll CollectionName, filter bson.M, opts ...*options.DeleteOptions) (bool, error) {
|
||||
r, err := mc.Collection(coll).DeleteOne(context.Background(), filter, opts...)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return r.DeletedCount > 0, nil
|
||||
}
|
||||
|
||||
func (mc MongoClient) UnsetField(coll CollectionName, filter bson.M, doc bson.M) error {
|
||||
_, err := mc.Collection(coll).UpdateOne(context.Background(), filter, bson.M{
|
||||
"$unset": doc,
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (mc MongoClient) DeleteMany(coll CollectionName, filters bson.D, opts ...*options.DeleteOptions) (int, error) {
|
||||
if len(filters) == 0 {
|
||||
// 큰일난다
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
result, err := mc.Collection(coll).DeleteMany(context.Background(), filters, opts...)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return int(result.DeletedCount), nil
|
||||
}
|
||||
|
||||
type CommandInsertMany[T any] struct {
|
||||
MongoClient
|
||||
Collection CollectionName
|
||||
Documents []T
|
||||
}
|
||||
|
||||
func (c *CommandInsertMany[T]) Exec(opts ...*options.InsertManyOptions) (int, error) {
|
||||
conv := make([]any, len(c.Documents))
|
||||
for i, v := range c.Documents {
|
||||
conv[i] = v
|
||||
}
|
||||
return c.InsertMany(c.Collection, conv, opts...)
|
||||
}
|
||||
|
||||
func (mc MongoClient) InsertMany(coll CollectionName, documents []interface{}, opts ...*options.InsertManyOptions) (int, error) {
|
||||
result, err := mc.Collection(coll).InsertMany(context.Background(), documents, opts...)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return len(result.InsertedIDs), nil
|
||||
}
|
||||
|
||||
func (mc MongoClient) UpdateMany(coll CollectionName, filter bson.M, doc bson.M, opts ...*options.UpdateOptions) (count int, err error) {
|
||||
result, e := mc.Collection(coll).UpdateMany(context.Background(), filter, doc, opts...)
|
||||
|
||||
if e != nil {
|
||||
return 0, e
|
||||
}
|
||||
|
||||
err = nil
|
||||
count = int(result.UpsertedCount + result.ModifiedCount)
|
||||
return
|
||||
}
|
||||
|
||||
type Marshaler interface {
|
||||
MarshalBSON() ([]byte, error)
|
||||
}
|
||||
|
||||
type JsonDefaultMashaller struct {
|
||||
doc *bson.M
|
||||
}
|
||||
|
||||
func (m *JsonDefaultMashaller) MarshalBSON() ([]byte, error) {
|
||||
return json.Marshal(m.doc)
|
||||
}
|
||||
|
||||
func (mc MongoClient) Update(coll CollectionName, filter bson.M, doc interface{}, opts ...*options.UpdateOptions) (worked bool, newid interface{}, err error) {
|
||||
result, e := mc.Collection(coll).UpdateOne(context.Background(), filter, doc, opts...)
|
||||
|
||||
if e != nil {
|
||||
return false, "", e
|
||||
}
|
||||
|
||||
err = nil
|
||||
worked = result.MatchedCount > 0 || result.UpsertedCount > 0 || result.ModifiedCount > 0
|
||||
newid = result.UpsertedID
|
||||
return
|
||||
}
|
||||
|
||||
func (mc MongoClient) UpsertOne(coll CollectionName, filter bson.M, doc interface{}) (worked bool, newid interface{}, err error) {
|
||||
return mc.Update(coll, filter, bson.M{
|
||||
"$set": doc,
|
||||
}, options.Update().SetUpsert(true))
|
||||
|
||||
// return mc.Update(coll, filter, &JsonDefaultMashaller{doc: &bson.M{
|
||||
// "$set": doc,
|
||||
// }}, options.Update().SetUpsert(true))
|
||||
}
|
||||
|
||||
func (mc MongoClient) FindOneAs(coll CollectionName, filter bson.M, out interface{}, opts ...*options.FindOneOptions) error {
|
||||
err := mc.Collection(coll).FindOne(context.Background(), filter, opts...).Decode(out)
|
||||
if err == mongo.ErrNoDocuments {
|
||||
err = nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (mc MongoClient) FindOne(coll CollectionName, filter bson.M, opts ...*options.FindOneOptions) (doc bson.M, err error) {
|
||||
result := mc.Collection(coll).FindOne(context.Background(), filter, opts...)
|
||||
tmp := make(map[string]interface{})
|
||||
err = result.Decode(&tmp)
|
||||
if err == nil {
|
||||
doc = bson.M(tmp)
|
||||
} else if err == mongo.ErrNoDocuments {
|
||||
err = nil
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (mc MongoClient) FindOneAndUpdateAs(coll CollectionName, filter bson.M, doc bson.M, out interface{}, opts ...*options.FindOneAndUpdateOptions) error {
|
||||
result := mc.Collection(coll).FindOneAndUpdate(context.Background(), filter, doc, opts...)
|
||||
err := result.Decode(out)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err == mongo.ErrNoDocuments {
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (mc MongoClient) FindOneAndUpdate(coll CollectionName, filter bson.M, doc bson.M, opts ...*options.FindOneAndUpdateOptions) (olddoc bson.M, err error) {
|
||||
result := mc.Collection(coll).FindOneAndUpdate(context.Background(), filter, doc, opts...)
|
||||
tmp := make(map[string]interface{})
|
||||
err = result.Decode(&tmp)
|
||||
if err == nil {
|
||||
olddoc = bson.M(tmp)
|
||||
} else if err == mongo.ErrNoDocuments {
|
||||
err = nil
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (mc MongoClient) Exists(coll CollectionName, filter bson.M) (bool, error) {
|
||||
cnt, err := mc.Collection(coll).CountDocuments(context.Background(), filter, options.Count().SetLimit(1))
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return cnt > 0, nil
|
||||
}
|
||||
|
||||
func (mc MongoClient) SearchText(coll CollectionName, text string, opts ...*options.FindOptions) ([]bson.M, error) {
|
||||
cursor, err := mc.Collection(coll).Find(context.Background(), bson.M{"$text": bson.M{"$search": text}}, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer cursor.Close(context.Background())
|
||||
|
||||
var output []bson.M
|
||||
err = cursor.All(context.Background(), &output)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return output, nil
|
||||
}
|
||||
|
||||
func (mc MongoClient) FindAll(coll CollectionName, filter bson.M, opts ...*options.FindOptions) ([]bson.M, error) {
|
||||
cursor, err := mc.Collection(coll).Find(context.Background(), filter, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer cursor.Close(context.Background())
|
||||
|
||||
var output []bson.M
|
||||
err = cursor.All(context.Background(), &output)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return output, nil
|
||||
}
|
||||
|
||||
func (mc MongoClient) FindAllAs(coll CollectionName, filter bson.M, output interface{}, opts ...*options.FindOptions) error {
|
||||
cursor, err := mc.Collection(coll).Find(context.Background(), filter, opts...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer cursor.Close(context.Background())
|
||||
|
||||
err = cursor.All(context.Background(), output)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mc MongoClient) MakeExpireIndex(coll CollectionName, expireSeconds int32) error {
|
||||
matchcoll := mc.Collection(coll)
|
||||
indices, err := matchcoll.Indexes().List(context.Background(), options.ListIndexes().SetMaxTime(time.Second))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
allindices := make([]interface{}, 0)
|
||||
err = indices.All(context.Background(), &allindices)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tsfound := false
|
||||
var tsname string
|
||||
var exp int32
|
||||
|
||||
IndexSearchLabel:
|
||||
for _, index := range allindices {
|
||||
d := index.(bson.D)
|
||||
key := d.Map()["key"].(bson.D)
|
||||
for _, kd := range key {
|
||||
if kd.Key == "_ts" {
|
||||
tsfound = true
|
||||
|
||||
if v, ok := d.Map()["name"]; ok {
|
||||
tsname = v.(string)
|
||||
}
|
||||
if v, ok := d.Map()["expireAfterSeconds"]; ok {
|
||||
exp = v.(int32)
|
||||
}
|
||||
break IndexSearchLabel
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if tsfound {
|
||||
if exp == expireSeconds {
|
||||
return nil
|
||||
}
|
||||
_, err = matchcoll.Indexes().DropOne(context.Background(), tsname)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
mod := mongo.IndexModel{
|
||||
Keys: primitive.M{"_ts": 1},
|
||||
Options: options.Index().SetExpireAfterSeconds(expireSeconds),
|
||||
}
|
||||
|
||||
_, err = matchcoll.Indexes().CreateOne(context.Background(), mod)
|
||||
return err
|
||||
}
|
||||
|
||||
func (mc MongoClient) makeIndicesWithOption(coll CollectionName, indices map[string]bson.D, option *options.IndexOptions) error {
|
||||
collection := mc.Collection(coll)
|
||||
cursor, err := collection.Indexes().List(context.Background(), options.ListIndexes().SetMaxTime(time.Second))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer cursor.Close(context.Background())
|
||||
|
||||
found := make(map[string]bool)
|
||||
for k := range indices {
|
||||
found[k] = false
|
||||
}
|
||||
|
||||
for cursor.TryNext(context.Background()) {
|
||||
rawval := cursor.Current
|
||||
name := rawval.Lookup("name").StringValue()
|
||||
if _, ok := indices[name]; ok {
|
||||
found[name] = true
|
||||
}
|
||||
}
|
||||
|
||||
for name, exist := range found {
|
||||
if !exist {
|
||||
v := indices[name]
|
||||
var mod mongo.IndexModel
|
||||
if len(v) == 1 {
|
||||
mod = mongo.IndexModel{
|
||||
Keys: primitive.M{v[0].Key: v[0].Value},
|
||||
Options: options.MergeIndexOptions(options.Index().SetName(name), option),
|
||||
}
|
||||
} else {
|
||||
mod = mongo.IndexModel{
|
||||
Keys: indices[name],
|
||||
Options: options.MergeIndexOptions(options.Index().SetName(name), option),
|
||||
}
|
||||
}
|
||||
|
||||
_, err = collection.Indexes().CreateOne(context.Background(), mod)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mc MongoClient) MakeUniqueIndices(coll CollectionName, indices map[string]bson.D) error {
|
||||
return mc.makeIndicesWithOption(coll, indices, options.Index().SetUnique(true))
|
||||
}
|
||||
|
||||
func (mc MongoClient) MakeIndices(coll CollectionName, indices map[string]bson.D) error {
|
||||
return mc.makeIndicesWithOption(coll, indices, options.Index())
|
||||
}
|
||||
Reference in New Issue
Block a user