From 66a191f4944ffa09289a482e77d7eb0ddaa6b4be Mon Sep 17 00:00:00 2001 From: mountain Date: Wed, 30 Aug 2023 15:43:26 +0900 Subject: [PATCH] =?UTF-8?q?mongodb=EC=9A=A9=20=EC=84=B8=EC=85=98=20?= =?UTF-8?q?=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- session/common.go | 8 +- session/consumer_common.go | 78 ++++++++ session/consumer_mongo.go | 214 +++++++++++++++++++++ session/{consumer.go => consumer_redis.go} | 100 +++------- session/provider.go | 61 +++++- session/session_test.go | 44 +++-- 6 files changed, 405 insertions(+), 100 deletions(-) create mode 100644 session/consumer_common.go create mode 100644 session/consumer_mongo.go rename session/{consumer.go => consumer_redis.go} (57%) diff --git a/session/common.go b/session/common.go index da279cf..9dee8e9 100644 --- a/session/common.go +++ b/session/common.go @@ -1,11 +1,17 @@ package session +import ( + "go.mongodb.org/mongo-driver/bson/primitive" + "repositories.action2quare.com/ayo/gocommon" +) + const ( communication_channel_name_prefix = "_sess_comm_chan_name" + session_collection_name = gocommon.CollectionName("session") ) type Authorization struct { - Account string `bson:"a" json:"a"` + Account primitive.ObjectID `bson:"a" json:"a"` // by authorization provider Platform string `bson:"p" json:"p"` diff --git a/session/consumer_common.go b/session/consumer_common.go new file mode 100644 index 0000000..5d6f5f3 --- /dev/null +++ b/session/consumer_common.go @@ -0,0 +1,78 @@ +package session + +import ( + "context" + "sync" + "time" + + "repositories.action2quare.com/ayo/gocommon/logger" +) + +type cache_stage[T any] struct { + cache map[string]T + deleted map[string]bool +} + +func make_cache_stage[T any]() *cache_stage[T] { + return &cache_stage[T]{ + cache: make(map[string]T), + deleted: make(map[string]bool), + } +} + +type Consumer interface { + Query(key string) *Authorization + Touch(key string) bool +} + +type consumer_common[T any] struct { + lock sync.Mutex + ttl time.Duration + ctx context.Context + stages [2]*cache_stage[T] + startTime time.Time +} + +func (c *consumer_common[T]) add_internal(key string, si T) { + c.stages[0].cache[key] = si + delete(c.stages[0].deleted, key) + c.stages[1].cache[key] = si + delete(c.stages[1].deleted, key) + + logger.Printf("add : %v, %v\n", *c.stages[0], *c.stages[1]) +} + +func (c *consumer_common[T]) add(key string, si T) { + c.lock.Lock() + defer c.lock.Unlock() + + c.add_internal(key, si) +} + +func (c *consumer_common[T]) delete_internal(key string) { + delete(c.stages[0].cache, key) + c.stages[0].deleted[key] = true + delete(c.stages[1].cache, key) + c.stages[1].deleted[key] = true + + logger.Printf("delete : %v, %v\n", *c.stages[0], *c.stages[1]) +} + +func (c *consumer_common[T]) delete(key string) { + c.lock.Lock() + defer c.lock.Unlock() + + c.delete_internal(key) +} + +func (c *consumer_common[T]) changeStage() { + c.lock.Lock() + defer c.lock.Unlock() + + logger.Printf("changeStage : %v, %v\n", *c.stages[0], *c.stages[1]) + + c.stages[1] = c.stages[0] + c.stages[0] = make_cache_stage[T]() + + logger.Printf("---> : %v, %v\n", *c.stages[0], *c.stages[1]) +} diff --git a/session/consumer_mongo.go b/session/consumer_mongo.go new file mode 100644 index 0000000..b34b1a3 --- /dev/null +++ b/session/consumer_mongo.go @@ -0,0 +1,214 @@ +package session + +import ( + "context" + "time" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "repositories.action2quare.com/ayo/gocommon" + "repositories.action2quare.com/ayo/gocommon/logger" +) + +type sessionMongo struct { + *Authorization `bson:",inline"` + Key string `bson:"key"` + Ts primitive.DateTime `bson:"_ts"` +} + +type consumer_mongo struct { + consumer_common[sessionMongo] + ids map[primitive.ObjectID]string + mongoClient gocommon.MongoClient + ttl time.Duration +} + +type sessionPipelineDocument struct { + OperationType string `bson:"operationType"` + DocumentKey struct { + Id primitive.ObjectID `bson:"_id"` + } `bson:"documentKey"` + Session sessionMongo `bson:"fullDocument"` +} + +func NewConsumerWithMongo(ctx context.Context, mongoUrl string, dbname string, ttl time.Duration) (Consumer, error) { + mc, err := gocommon.NewMongoClient(ctx, mongoUrl, dbname) + if err != nil { + return nil, err + } + + consumer := &consumer_mongo{ + consumer_common: consumer_common[sessionMongo]{ + ttl: ttl, + ctx: ctx, + stages: [2]*cache_stage[sessionMongo]{make_cache_stage[sessionMongo](), make_cache_stage[sessionMongo]()}, + startTime: time.Now(), + }, + ids: make(map[primitive.ObjectID]string), + ttl: ttl, + mongoClient: mc, + } + + go func() { + matchStage := bson.D{ + { + Key: "$match", Value: bson.D{ + {Key: "operationType", Value: bson.D{ + {Key: "$in", Value: bson.A{ + "delete", + "insert", + "update", + }}, + }}, + }, + }} + projectStage := bson.D{ + { + Key: "$project", Value: bson.D{ + {Key: "documentKey", Value: 1}, + {Key: "operationType", Value: 1}, + {Key: "fullDocument", Value: 1}, + }, + }, + } + + var stream *mongo.ChangeStream + nextswitch := time.Now().Add(ttl) + for { + if stream == nil { + stream, err = mc.Watch(session_collection_name, mongo.Pipeline{matchStage, projectStage}) + if err != nil { + logger.Error("watchAuthCollection watch failed :", err) + time.Sleep(time.Minute) + continue + } + } + + changed := stream.TryNext(ctx) + if ctx.Err() != nil { + logger.Error("watchAuthCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error()) + break + } + + if changed { + var data sessionPipelineDocument + if err := stream.Decode(&data); err == nil { + ot := data.OperationType + switch ot { + case "insert": + consumer.add(data.Session.Key, data.DocumentKey.Id, data.Session) + case "update": + consumer.add(data.Session.Key, data.DocumentKey.Id, data.Session) + case "delete": + consumer.deleteById(data.DocumentKey.Id) + } + } else { + logger.Error("watchAuthCollection stream.Decode failed :", err) + } + } else if stream.Err() != nil || stream.ID() == 0 { + select { + case <-ctx.Done(): + logger.Println("watchAuthCollection is done") + stream.Close(ctx) + return + + case <-time.After(time.Second): + logger.Error("watchAuthCollection stream error :", stream.Err()) + stream.Close(ctx) + stream = nil + } + } else { + time.Sleep(time.Second) + } + + now := time.Now() + for now.After(nextswitch) { + consumer.changeStage() + nextswitch = nextswitch.Add(ttl) + } + } + }() + + return consumer, nil +} + +func (c *consumer_mongo) Query(key string) *Authorization { + c.lock.Lock() + defer c.lock.Unlock() + + if _, deleted := c.stages[0].deleted[key]; deleted { + return nil + } + + if _, deleted := c.stages[1].deleted[key]; deleted { + return nil + } + + found, ok := c.stages[0].cache[key] + if !ok { + found, ok = c.stages[1].cache[key] + } + + now := time.Now().UTC() + if ok { + if now.Before(found.Ts.Time().Add(c.ttl)) { + return found.Authorization + } + } + + var si sessionMongo + err := c.mongoClient.FindOneAs(session_collection_name, bson.M{ + "key": key, + }, &si) + + if err != nil { + logger.Println("consumer Query :", err) + return nil + } + + if len(si.Key) > 0 { + c.add_internal(key, si) + return si.Authorization + } + return nil +} + +func (c *consumer_mongo) Touch(key string) bool { + c.lock.Lock() + defer c.lock.Unlock() + + _, _, err := c.mongoClient.Update(session_collection_name, bson.M{ + "key": key, + }, bson.M{ + "$currentDate": bson.M{ + "_ts": bson.M{"$type": "date"}, + }, + }, options.Update().SetUpsert(false)) + + if err != nil { + logger.Println("consumer Touch :", err) + return false + } + + return true +} + +func (c *consumer_mongo) add(key string, id primitive.ObjectID, si sessionMongo) { + c.lock.Lock() + defer c.lock.Unlock() + + c.consumer_common.add_internal(key, si) + c.ids[id] = key +} + +func (c *consumer_mongo) deleteById(id primitive.ObjectID) { + c.lock.Lock() + defer c.lock.Unlock() + + if key, ok := c.ids[id]; ok { + c.consumer_common.delete_internal(key) + delete(c.ids, id) + } +} diff --git a/session/consumer.go b/session/consumer_redis.go similarity index 57% rename from session/consumer.go rename to session/consumer_redis.go index faec072..e77de0c 100644 --- a/session/consumer.go +++ b/session/consumer_redis.go @@ -2,7 +2,6 @@ package session import ( "context" - "sync" "time" "github.com/go-redis/redis/v8" @@ -11,44 +10,30 @@ import ( "repositories.action2quare.com/ayo/gocommon/logger" ) -type innerSession struct { - inner *Authorization +type sessionRedis struct { + *Authorization expireAt time.Time } -type cache_stage struct { - cache map[string]innerSession - deleted map[string]bool -} - -func make_cache_stage() *cache_stage { - return &cache_stage{ - cache: make(map[string]innerSession), - deleted: make(map[string]bool), - } -} - -type Consumer struct { - lock sync.Mutex +type consumer_redis struct { + consumer_common[sessionRedis] redisClient *redis.Client - ttl time.Duration - ctx context.Context - stages [2]*cache_stage - startTime time.Time } -func NewConsumer(ctx context.Context, redisUrl string, ttl time.Duration) (*Consumer, error) { +func NewConsumerWithRedis(ctx context.Context, redisUrl string, ttl time.Duration) (Consumer, error) { redisClient, err := gocommon.NewRedisClient(redisUrl) if err != nil { return nil, err } - consumer := &Consumer{ + consumer := &consumer_redis{ + consumer_common: consumer_common[sessionRedis]{ + ttl: ttl, + ctx: ctx, + stages: [2]*cache_stage[sessionRedis]{make_cache_stage[sessionRedis](), make_cache_stage[sessionRedis]()}, + startTime: time.Now(), + }, redisClient: redisClient, - ttl: ttl, - ctx: ctx, - stages: [2]*cache_stage{make_cache_stage(), make_cache_stage()}, - startTime: time.Now(), } updateChannel := communication_channel_name_prefix + "_u" @@ -88,7 +73,10 @@ func NewConsumer(ctx context.Context, redisUrl string, ttl time.Duration) (*Cons } else if len(raw) > 0 { var si Authorization if bson.Unmarshal([]byte(raw), &si) == nil { - consumer.add(key, &si) + consumer.add(key, sessionRedis{ + Authorization: &si, + expireAt: time.Now().Add(consumer.ttl), + }) } } case deleteChannel: @@ -102,52 +90,7 @@ func NewConsumer(ctx context.Context, redisUrl string, ttl time.Duration) (*Cons return consumer, nil } -func (c *Consumer) add_internal(key string, si *Authorization, ttl time.Duration) { - inner := innerSession{ - inner: si, - expireAt: time.Now().Add(ttl), - } - - c.stages[0].cache[key] = inner - delete(c.stages[0].deleted, key) - c.stages[1].cache[key] = inner - delete(c.stages[1].deleted, key) - - logger.Printf("add : %v, %v\n", *c.stages[0], *c.stages[1]) -} - -func (c *Consumer) add(key string, si *Authorization) { - c.lock.Lock() - defer c.lock.Unlock() - - c.add_internal(key, si, c.ttl) -} - -func (c *Consumer) delete(key string) { - c.lock.Lock() - defer c.lock.Unlock() - - delete(c.stages[0].cache, key) - c.stages[0].deleted[key] = true - delete(c.stages[1].cache, key) - c.stages[1].deleted[key] = true - - logger.Printf("delete : %v, %v\n", *c.stages[0], *c.stages[1]) -} - -func (c *Consumer) changeStage() { - c.lock.Lock() - defer c.lock.Unlock() - - logger.Printf("changeStage : %v, %v\n", *c.stages[0], *c.stages[1]) - - c.stages[1] = c.stages[0] - c.stages[0] = make_cache_stage() - - logger.Printf("---> : %v, %v\n", *c.stages[0], *c.stages[1]) -} - -func (c *Consumer) Query(key string) *Authorization { +func (c *consumer_redis) Query(key string) *Authorization { c.lock.Lock() defer c.lock.Unlock() @@ -166,7 +109,7 @@ func (c *Consumer) Query(key string) *Authorization { if ok { if found.expireAt.After(time.Now()) { - return found.inner + return found.Authorization } } @@ -187,14 +130,17 @@ func (c *Consumer) Query(key string) *Authorization { return nil } - c.add_internal(key, &si, ttl) + c.add_internal(key, sessionRedis{ + Authorization: &si, + expireAt: time.Now().Add(ttl), + }) return &si } } return nil } -func (c *Consumer) Touch(key string) bool { +func (c *consumer_redis) Touch(key string) bool { c.lock.Lock() defer c.lock.Unlock() diff --git a/session/provider.go b/session/provider.go index 3ea2375..7946153 100644 --- a/session/provider.go +++ b/session/provider.go @@ -6,10 +6,16 @@ import ( "github.com/go-redis/redis/v8" "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo/options" "repositories.action2quare.com/ayo/gocommon" ) -type Provider struct { +type Provider interface { + Update(key string, input *Authorization) error + Delete(key string) error +} + +type provider_redis struct { redisClient *redis.Client updateChannel string deleteChannel string @@ -17,13 +23,38 @@ type Provider struct { ctx context.Context } -func NewProvider(ctx context.Context, redisUrl string, ttl time.Duration) (*Provider, error) { +type provider_mongo struct { + mongoClient gocommon.MongoClient +} + +func NewProviderWithMongo(ctx context.Context, mongoUrl string, dbname string, ttl time.Duration) (Provider, error) { + mc, err := gocommon.NewMongoClient(ctx, mongoUrl, dbname) + if err != nil { + return nil, err + } + + if err = mc.MakeUniqueIndices(session_collection_name, map[string]bson.D{ + "key": {{Key: "key", Value: 1}}, + }); err != nil { + return nil, err + } + + if err := mc.MakeExpireIndex(session_collection_name, int32(ttl.Seconds())); err != nil { + return nil, err + } + + return &provider_mongo{ + mongoClient: mc, + }, nil +} + +func NewProviderWithRedis(ctx context.Context, redisUrl string, ttl time.Duration) (Provider, error) { redisClient, err := gocommon.NewRedisClient(redisUrl) if err != nil { return nil, err } - return &Provider{ + return &provider_redis{ redisClient: redisClient, updateChannel: communication_channel_name_prefix + "_u", deleteChannel: communication_channel_name_prefix + "_d", @@ -32,7 +63,7 @@ func NewProvider(ctx context.Context, redisUrl string, ttl time.Duration) (*Prov }, nil } -func (p *Provider) Update(key string, input *Authorization) error { +func (p *provider_redis) Update(key string, input *Authorization) error { bt, err := bson.Marshal(input) if err != nil { return err @@ -47,7 +78,7 @@ func (p *Provider) Update(key string, input *Authorization) error { return err } -func (p *Provider) Delete(key string) error { +func (p *provider_redis) Delete(key string) error { cnt, err := p.redisClient.Del(p.ctx, key).Result() if err != nil { return err @@ -59,3 +90,23 @@ func (p *Provider) Delete(key string) error { return err } + +func (p *provider_mongo) Update(key string, input *Authorization) error { + _, _, err := p.mongoClient.Update(session_collection_name, bson.M{ + "key": key, + }, bson.M{ + "$set": input, + "$currentDate": bson.M{ + "_ts": bson.M{"$type": "date"}, + }, + }, options.Update().SetUpsert(true)) + + return err +} + +func (p *provider_mongo) Delete(key string) error { + _, err := p.mongoClient.Delete(session_collection_name, bson.M{ + "key": key, + }) + return err +} diff --git a/session/session_test.go b/session/session_test.go index a0cd0ef..60f2c60 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -11,58 +11,68 @@ import ( ) func TestExpTable(t *testing.T) { - pv, err := NewProvider(context.Background(), "redis://192.168.8.94:6380/1", 10*time.Second) + // pv, err := NewProviderWithRedis(context.Background(), "redis://192.168.8.94:6380/1", 10*time.Second) + // if err != nil { + // t.Error(err) + // } + + // cs, err := NewConsumerWithRedis(context.Background(), "redis://192.168.8.94:6380/1", 10*time.Second) + // if err != nil { + // t.Error(err) + // } + + pv, err := NewProviderWithMongo(context.Background(), "mongodb://192.168.8.94:27017/?replicaSet=repl01&retrywrites=false", "maingate", 10*time.Second) if err != nil { t.Error(err) } - cs, err := NewConsumer(context.Background(), "redis://192.168.8.94:6380/1", 10*time.Second) + cs, err := NewConsumerWithMongo(context.Background(), "mongodb://192.168.8.94:27017/?replicaSet=repl01&retrywrites=false", "maingate", 10*time.Second) if err != nil { t.Error(err) } - newid1 := primitive.NewObjectID() - newid2 := primitive.NewObjectID() + sk1 := primitive.NewObjectID().Hex() + sk2 := primitive.NewObjectID().Hex() go func() { for { - logger.Println("query :", cs.Query(newid1.Hex())) - logger.Println("query :", cs.Query(newid2.Hex())) + logger.Println("query :", cs.Query(sk1)) + logger.Println("query :", cs.Query(sk2)) time.Sleep(time.Second) } }() time.Sleep(2 * time.Second) - pv.Update(newid1.Hex(), &Authorization{ - Account: newid1.Hex(), + pv.Update(sk1, &Authorization{ + Account: primitive.NewObjectID(), Platform: "editor", Uid: "uid-1", }) time.Sleep(2 * time.Second) - pv.Update(newid2.Hex(), &Authorization{ - Account: newid2.Hex(), + pv.Update(sk2, &Authorization{ + Account: primitive.NewObjectID(), Platform: "editor", Uid: "uid-2", }) - cs.Touch(newid1.Hex()) + cs.Touch(sk1) time.Sleep(2 * time.Second) - cs.Touch(newid2.Hex()) + cs.Touch(sk2) time.Sleep(2 * time.Second) time.Sleep(2 * time.Second) - pv.Delete(newid1.Hex()) + pv.Delete(sk1) - cs.Touch(newid1.Hex()) + cs.Touch(sk1) time.Sleep(2 * time.Second) - cs.Touch(newid2.Hex()) + cs.Touch(sk2) time.Sleep(2 * time.Second) - cs2, err := NewConsumer(context.Background(), "redis://192.168.8.94:6380/1", 10*time.Second) + cs2, err := NewConsumerWithRedis(context.Background(), "redis://192.168.8.94:6380/1", 10*time.Second) if err != nil { t.Error(err) } - logger.Println("queryf :", cs2.Query(newid2.Hex())) + logger.Println("queryf :", cs2.Query(sk2)) time.Sleep(20 * time.Second) }