package session

import (
	"context"
	"sync"
	"time"

	"github.com/go-redis/redis/v8"
	"go.mongodb.org/mongo-driver/bson"
	"repositories.action2quare.com/ayo/gocommon"
	"repositories.action2quare.com/ayo/gocommon/logger"
)

// innerSession pairs a cached session value with the local time after which
// the cached copy is no longer served by Query.
type innerSession[T any] struct {
	inner    *T
	expireAt time.Time
}

// cache_stage is one generation of the local cache. It holds the sessions
// cached during the generation and the keys that were explicitly deleted
// during it.
type cache_stage[T any] struct {
	cache   map[string]innerSession[T]
	deleted map[string]bool
}

// make_cache_stage returns an empty cache_stage with both maps initialised.
func make_cache_stage[T any]() *cache_stage[T] {
	return &cache_stage[T]{
		cache:   make(map[string]innerSession[T]),
		deleted: make(map[string]bool),
	}
}

// Consumer is a read-through local cache of sessions stored in Redis.
//
// It keeps two cache generations in stages: index 0 is the current one and
// index 1 the previous one. The background goroutine started by NewConsumer
// rotates them once per ttl, which drops every entry that has not been
// re-added within the last two generations. All access to stages is guarded
// by lock.
type Consumer[T any] struct {
	lock        sync.Mutex
	redisClient *redis.Client
	ttl         time.Duration
	ctx         context.Context
	stages      [2]*cache_stage[T]
	startTime   time.Time
}

// NewConsumer connects to the Redis instance at redisUrl and returns a
// Consumer whose cached entries live for ttl.
//
// It subscribes to the update and delete channels derived from
// communication_channel_name_prefix and starts a goroutine that applies those
// notifications to the local cache and rotates the cache stages every ttl.
// The goroutine stops, and the subscription is closed, when ctx is cancelled
// or the subscription's message channel is closed.
//
// An error is returned only when the Redis client cannot be created.
func NewConsumer[T any](ctx context.Context, redisUrl string, ttl time.Duration) (*Consumer[T], error) {
	redisClient, err := gocommon.NewRedisClient(redisUrl)
	if err != nil {
		return nil, err
	}

	consumer := &Consumer[T]{
		redisClient: redisClient,
		ttl:         ttl,
		ctx:         ctx,
		stages:      [2]*cache_stage[T]{make_cache_stage[T](), make_cache_stage[T]()},
		startTime:   time.Now(),
	}

	updateChannel := communication_channel_name_prefix + "_u"
	deleteChannel := communication_channel_name_prefix + "_d"
	sub := redisClient.Subscribe(ctx, updateChannel, deleteChannel)

	go func() {
		// Release the subscription when the loop ends; without this the
		// PubSub is never closed.
		defer sub.Close()

		messages := sub.Channel()

		// stageswitch tracks the ideal switch instants so that the time spent
		// handling messages does not accumulate as drift between rotations.
		stageswitch := time.Now().Add(ttl)
		tickTimer := time.After(ttl)

		for {
			select {
			case <-ctx.Done():
				return

			case <-tickTimer:
				consumer.changeStage()
				stageswitch = stageswitch.Add(ttl)
				tickTimer = time.After(time.Until(stageswitch))

			case msg := <-messages:
				if msg == nil {
					// The message channel was closed.
					return
				}
				if len(msg.Payload) == 0 {
					continue
				}

				// The payload of both channels is the session key.
				key := msg.Payload
				switch msg.Channel {
				case updateChannel:
					raw, err := redisClient.Get(ctx, key).Result()
					if err != nil {
						logger.Println(err)
						continue
					}
					if len(raw) == 0 {
						continue
					}

					var si T
					if err := bson.Unmarshal([]byte(raw), &si); err != nil {
						logger.Println("consumer update :", err)
						continue
					}
					consumer.add(key, &si)

				case deleteChannel:
					consumer.delete(key)
				}
			}
		}
	}()

	return consumer, nil
}

// add_internal stores si under key in both stages with a local lifetime of
// ttl and clears any deletion mark for key. The caller must hold c.lock.
func (c *Consumer[T]) add_internal(key string, si *T, ttl time.Duration) {
	inner := innerSession[T]{
		inner:    si,
		expireAt: time.Now().Add(ttl),
	}

	for _, stage := range c.stages {
		stage.cache[key] = inner
		delete(stage.deleted, key)
	}

	logger.Printf("add : %v, %v\n", *c.stages[0], *c.stages[1])
}

// add caches si under key for the consumer's configured ttl.
func (c *Consumer[T]) add(key string, si *T) {
	c.lock.Lock()
	defer c.lock.Unlock()

	c.add_internal(key, si, c.ttl)
}

// delete removes key from both stages and marks it as deleted in both, so
// that Query returns nil for it without consulting Redis until the marks are
// cleared by a later add or rotated away by changeStage.
func (c *Consumer[T]) delete(key string) {
	c.lock.Lock()
	defer c.lock.Unlock()

	for _, stage := range c.stages {
		delete(stage.cache, key)
		stage.deleted[key] = true
	}

	logger.Printf("delete : %v, %v\n", *c.stages[0], *c.stages[1])
}

// changeStage rotates the cache generations: the current stage becomes the
// previous one, the old previous stage is dropped, and a fresh empty stage
// becomes current.
func (c *Consumer[T]) changeStage() {
	c.lock.Lock()
	defer c.lock.Unlock()

	logger.Printf("changeStage : %v, %v\n", *c.stages[0], *c.stages[1])

	c.stages[1] = c.stages[0]
	c.stages[0] = make_cache_stage[T]()

	logger.Printf("---> : %v, %v\n", *c.stages[0], *c.stages[1])
}

// Query returns the session stored under key, or nil when it is unknown.
//
// Keys marked as deleted in either stage yield nil immediately. Otherwise an
// unexpired locally cached value is returned; failing that, the value is
// loaded from Redis, cached with the key's remaining Redis TTL, and returned.
// Redis and decoding errors are logged and reported as nil.
//
// The lock is held for the whole call, including the Redis round trips.
func (c *Consumer[T]) Query(key string) *T {
	c.lock.Lock()
	defer c.lock.Unlock()

	for _, stage := range c.stages {
		if _, deleted := stage.deleted[key]; deleted {
			return nil
		}
	}

	// Prefer the current stage and fall back to the previous one.
	found, ok := c.stages[0].cache[key]
	if !ok {
		found, ok = c.stages[1].cache[key]
	}
	if ok && found.expireAt.After(time.Now()) {
		return found.inner
	}

	payload, err := c.redisClient.Get(c.ctx, key).Result()
	if err == redis.Nil {
		return nil
	} else if err != nil {
		logger.Println("consumer Query :", err)
		return nil
	}
	if len(payload) == 0 {
		return nil
	}

	var si T
	if err := bson.Unmarshal([]byte(payload), &si); err != nil {
		logger.Println("consumer Query :", err)
		return nil
	}

	ttl, err := c.redisClient.TTL(c.ctx, key).Result()
	if err != nil {
		logger.Println("consumer Query :", err)
		return nil
	}

	// NOTE(review): a non-positive ttl stores an entry that is already
	// expired locally, so the next Query for this key goes to Redis again.
	// Presumably Redis reports a negative TTL for keys without an expiry;
	// confirm whether those should be cached with c.ttl instead.
	c.add_internal(key, &si, ttl)
	return &si
}

// Touch resets the Redis expiry of key to the consumer's ttl and, on success,
// extends the expiry of any locally cached copy to match.
//
// It returns true when Redis reports that the expiry was set, and false when
// it was not or when the Redis call failed (failures other than redis.Nil are
// logged).
func (c *Consumer[T]) Touch(key string) bool {
	c.lock.Lock()
	defer c.lock.Unlock()

	ok, err := c.redisClient.Expire(c.ctx, key, c.ttl).Result()
	if err == redis.Nil {
		return false
	} else if err != nil {
		logger.Println("consumer Touch :", err)
		return false
	}
	if !ok {
		return false
	}

	newexpire := time.Now().Add(c.ttl)
	for _, stage := range c.stages {
		if found, cached := stage.cache[key]; cached {
			// Map elements are struct values, so the modified copy has to be
			// stored back for the new expiry to take effect.
			found.expireAt = newexpire
			stage.cache[key] = found
		}
	}
	return true
}