package session import ( "context" "time" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "repositories.action2quare.com/ayo/gocommon" "repositories.action2quare.com/ayo/gocommon/logger" ) type sessionMongo struct { *Authorization `bson:",inline"` Key string `bson:"key"` Ts primitive.DateTime `bson:"_ts"` } type consumer_mongo struct { consumer_common[sessionMongo] ids map[primitive.ObjectID]string mongoClient gocommon.MongoClient ttl time.Duration } type sessionPipelineDocument struct { OperationType string `bson:"operationType"` DocumentKey struct { Id primitive.ObjectID `bson:"_id"` } `bson:"documentKey"` Session sessionMongo `bson:"fullDocument"` } func NewConsumerWithMongo(ctx context.Context, mongoUrl string, dbname string, ttl time.Duration) (Consumer, error) { mc, err := gocommon.NewMongoClient(ctx, mongoUrl, dbname) if err != nil { return nil, err } consumer := &consumer_mongo{ consumer_common: consumer_common[sessionMongo]{ ttl: ttl, ctx: ctx, stages: [2]*cache_stage[sessionMongo]{make_cache_stage[sessionMongo](), make_cache_stage[sessionMongo]()}, startTime: time.Now(), }, ids: make(map[primitive.ObjectID]string), ttl: ttl, mongoClient: mc, } go func() { matchStage := bson.D{ { Key: "$match", Value: bson.D{ {Key: "operationType", Value: bson.D{ {Key: "$in", Value: bson.A{ "delete", "insert", "update", }}, }}, }, }} projectStage := bson.D{ { Key: "$project", Value: bson.D{ {Key: "documentKey", Value: 1}, {Key: "operationType", Value: 1}, {Key: "fullDocument", Value: 1}, }, }, } var stream *mongo.ChangeStream nextswitch := time.Now().Add(ttl) for { if stream == nil { stream, err = mc.Watch(session_collection_name, mongo.Pipeline{matchStage, projectStage}) if err != nil { logger.Error("watchAuthCollection watch failed :", err) time.Sleep(time.Minute) continue } } changed := stream.TryNext(ctx) if ctx.Err() != nil { logger.Error("watchAuthCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error()) break } if changed { var data sessionPipelineDocument if err := stream.Decode(&data); err == nil { ot := data.OperationType switch ot { case "insert": consumer.add(data.Session.Key, data.DocumentKey.Id, data.Session) case "update": consumer.add(data.Session.Key, data.DocumentKey.Id, data.Session) case "delete": consumer.deleteById(data.DocumentKey.Id) } } else { logger.Error("watchAuthCollection stream.Decode failed :", err) } } else if stream.Err() != nil || stream.ID() == 0 { select { case <-ctx.Done(): logger.Println("watchAuthCollection is done") stream.Close(ctx) return case <-time.After(time.Second): logger.Error("watchAuthCollection stream error :", stream.Err()) stream.Close(ctx) stream = nil } } else { time.Sleep(time.Second) } now := time.Now() for now.After(nextswitch) { consumer.changeStage() nextswitch = nextswitch.Add(ttl) } } }() return consumer, nil } func (c *consumer_mongo) Query(key string) *Authorization { c.lock.Lock() defer c.lock.Unlock() if _, deleted := c.stages[0].deleted[key]; deleted { return nil } if _, deleted := c.stages[1].deleted[key]; deleted { return nil } found, ok := c.stages[0].cache[key] if !ok { found, ok = c.stages[1].cache[key] } now := time.Now().UTC() if ok { if now.Before(found.Ts.Time().Add(c.ttl)) { return found.Authorization } } var si sessionMongo err := c.mongoClient.FindOneAs(session_collection_name, bson.M{ "key": key, }, &si) if err != nil { logger.Println("consumer Query :", err) return nil } if len(si.Key) > 0 { c.add_internal(key, si) return si.Authorization } return nil } func (c *consumer_mongo) Touch(key string) bool { c.lock.Lock() defer c.lock.Unlock() _, _, err := c.mongoClient.Update(session_collection_name, bson.M{ "key": key, }, bson.M{ "$currentDate": bson.M{ "_ts": bson.M{"$type": "date"}, }, }, options.Update().SetUpsert(false)) if err != nil { logger.Println("consumer Touch :", err) return false } return true } func (c *consumer_mongo) add(key string, id primitive.ObjectID, si sessionMongo) { c.lock.Lock() defer c.lock.Unlock() c.consumer_common.add_internal(key, si) c.ids[id] = key } func (c *consumer_mongo) deleteById(id primitive.ObjectID) { c.lock.Lock() defer c.lock.Unlock() if key, ok := c.ids[id]; ok { c.consumer_common.delete_internal(key) delete(c.ids, id) } }