Compare commits
121 Commits
kd-live
...
7e7d023252
| Author | SHA1 | Date | |
|---|---|---|---|
| 7e7d023252 | |||
| f7ff7ae13b | |||
| 59b0a975f7 | |||
| d672b5dd90 | |||
| 8adef2adb8 | |||
| 9688731b3e | |||
| 64dece4f7e | |||
| 343754ba7f | |||
| 6ad727c728 | |||
| 49dabb35f0 | |||
| aa7a9661b9 | |||
| c34cb485bd | |||
| a5d9809db7 | |||
| 6c5a82cc9b | |||
| 72a62b678f | |||
| 770356d830 | |||
| 81ce0dd9b6 | |||
| 2e48ff0ca4 | |||
| 60d0c19256 | |||
| ebd1a89a58 | |||
| 4fd7d303bf | |||
| 0f30243725 | |||
| 0fb806bfda | |||
| 9c3d761d97 | |||
| 06ca411e03 | |||
| dc54082a42 | |||
| 3f29e0be86 | |||
| 2cec794483 | |||
| 49a3722a7e | |||
| 1af5d72819 | |||
| 23231dc6d7 | |||
| 46ce5f0989 | |||
| 3603c0386b | |||
| 6cbf32c386 | |||
| eb54fa2e3a | |||
| a7ff64fdbd | |||
| db3b0c8d08 | |||
| 25fef67c07 | |||
| 7951814f12 | |||
| 3a3839a461 | |||
| 7489fa657a | |||
| 0762d9311b | |||
| 57464c6cf0 | |||
| 8877bf88c0 | |||
| def9494dd3 | |||
| 5117847740 | |||
| 84f56dfc50 | |||
| 821dc5c639 | |||
| 0eefb438a6 | |||
| d396a35713 | |||
| 26e968f7c9 | |||
| 68e40d580f | |||
| 9e9d91b5a3 | |||
| 90502c3029 | |||
| a011588509 | |||
| 319dbf5a54 | |||
| 888abef7c0 | |||
| 58de9a3f0c | |||
| 57b518562e | |||
| cde46e6a5f | |||
| e88df26ed7 | |||
| 021f183157 | |||
| 66a191f494 | |||
| d8458662fd | |||
| a5d66a0249 | |||
| e4527aa5b3 | |||
| 6bae78a282 | |||
| 400c7f6443 | |||
| c34045e215 | |||
| 2addec5adf | |||
| 81d069cddf | |||
| a5b7e11964 | |||
| 4e69b5b9fb | |||
| 05478f54ae | |||
| b88bcff389 | |||
| a57de9715c | |||
| e4ac505928 | |||
| b32858cb88 | |||
| 11aabe0216 | |||
| 0f1aeccde0 | |||
| bdfc8082a2 | |||
| 1a7df89c47 | |||
| 6f9f791f02 | |||
| d6738b2b70 | |||
| 165d75a21f | |||
| f6441a9b4c | |||
| b501160efc | |||
| 1dff16a86e | |||
| 8ea2a698ce | |||
| 854084d819 | |||
| 112e037f4d | |||
| e0e911f9e7 | |||
| 269fa0f870 | |||
| 522bd4a597 | |||
| d48d4c0f21 | |||
| 74829b93ac | |||
| 4acb81a20d | |||
| 9fd0dd00cb | |||
| d7b26608df | |||
| a42eb2888e | |||
| 3bb985d0b6 | |||
| a842845685 | |||
| 8598d59ab4 | |||
| 1f668586f2 | |||
| bfebc67eb7 | |||
| b5a72aad05 | |||
| c859aeb75f | |||
| 27a3f2f08c | |||
| 98efbc2875 | |||
| e912c88993 | |||
| 550374ef6f | |||
| b85d271efe | |||
| ba746d03fa | |||
| 20803c67ed | |||
| 5dc9d4dca4 | |||
| 4bdd72152e | |||
| 8c94fc6e29 | |||
| 2b54d69b9e | |||
| 7ef3e68d16 | |||
| 822681bf74 | |||
| edd2f7aab5 |
@ -30,7 +30,7 @@ type Authinfo struct {
|
||||
}
|
||||
|
||||
const (
|
||||
sessionSyncChannelName = "session-sync-channel2"
|
||||
sessionSyncChannelNamePrefix = "session-sync-channel2"
|
||||
)
|
||||
|
||||
type AuthinfoCell interface {
|
||||
@ -99,6 +99,8 @@ func (ac *redisAuthCell) ToBytes() []byte {
|
||||
func newAuthCollectionWithRedis(redisClient *redis.Client, subctx context.Context, maingateURL string, apiToken string) *AuthCollection {
|
||||
sessionTTL := int64(3600)
|
||||
ac := MakeAuthCollection(time.Duration(sessionTTL * int64(time.Second)))
|
||||
|
||||
sessionSyncChannelName := fmt.Sprintf("%s-%d", sessionSyncChannelNamePrefix, redisClient.Options().DB)
|
||||
pubsub := redisClient.Subscribe(subctx, sessionSyncChannelName)
|
||||
ctx, cancel := context.WithCancel(context.TODO())
|
||||
go func(ctx context.Context, sub *redis.PubSub, authCache *AuthCollection) {
|
||||
@ -205,7 +207,7 @@ func (acg *AuthCollectionGlobal) Reload(context context.Context) error {
|
||||
for r, url := range config.RegionStorage {
|
||||
if _, ok := oldval[r]; !ok {
|
||||
// 새로 생겼네
|
||||
redisClient, err := NewRedisClient(url.Redis.URL, url.Redis.Offset["session"])
|
||||
redisClient, err := NewRedisClient(url.Redis["session"])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -228,7 +230,7 @@ func NewAuthCollectionGlobal(context context.Context, apiToken string) (AuthColl
|
||||
|
||||
output := make(map[string]*AuthCollection)
|
||||
for region, url := range config.RegionStorage {
|
||||
redisClient, err := NewRedisClient(url.Redis.URL, url.Redis.Offset["session"])
|
||||
redisClient, err := NewRedisClient(url.Redis["session"])
|
||||
if err != nil {
|
||||
return AuthCollectionGlobal{}, err
|
||||
}
|
||||
|
||||
@ -18,8 +18,6 @@ import (
|
||||
|
||||
"repositories.action2quare.com/ayo/gocommon/flagx"
|
||||
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
)
|
||||
|
||||
var linkupdate = flagx.String("updatelink", "", "")
|
||||
@ -239,17 +237,3 @@ func ReplyUpdateComplete() {
|
||||
|
||||
// return (uint32(b[0]) << 0) | (uint32(b[1]) << 8) | (uint32(b[2]) << 16) | (uint32(b[3]) << 24)
|
||||
// }
|
||||
|
||||
type BsonMarshaler[T any] struct {
|
||||
val T
|
||||
}
|
||||
|
||||
func NewBsonMarshaler[T any](val T) *BsonMarshaler[T] {
|
||||
return &BsonMarshaler[T]{
|
||||
val: val,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *BsonMarshaler[T]) MarshalBinary() (data []byte, err error) {
|
||||
return bson.Marshal(m.val)
|
||||
}
|
||||
|
||||
@ -1,7 +1,6 @@
|
||||
package coupon
|
||||
|
||||
import (
|
||||
"crypto/md5"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"math/rand"
|
||||
@ -42,12 +41,8 @@ func DisolveCouponCode(code string) (round string, key string) {
|
||||
}
|
||||
|
||||
func MakeCouponRoundHash(name string) (hash string, roundNumber uint32) {
|
||||
m5 := md5.New()
|
||||
m5.Write([]byte(name))
|
||||
hashbt := m5.Sum(nil)
|
||||
|
||||
roundbt := make([]byte, 8)
|
||||
copy(roundbt, hashbt[:8])
|
||||
copy(roundbt, []byte(strings.ToLower(name)))
|
||||
|
||||
roundseed := int64(binary.BigEndian.Uint64(roundbt))
|
||||
roundhash := make([]byte, 4)
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package flagx
|
||||
|
||||
import (
|
||||
"encoding"
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
@ -124,6 +125,9 @@ func Duration(name string, value time.Duration, usage string) *time.Duration {
|
||||
return findProperFlagSet(name).Duration(name, value, usage)
|
||||
}
|
||||
|
||||
func TextVar(p encoding.TextUnmarshaler, name string, value encoding.TextMarshaler, usage string) {
|
||||
findProperFlagSet(name).TextVar(p, name, value, usage)
|
||||
}
|
||||
func Func(name, usage string, fn func(string) error) {
|
||||
findProperFlagSet(name).Func(name, usage, fn)
|
||||
}
|
||||
|
||||
4
go.mod
4
go.mod
@ -1,8 +1,6 @@
|
||||
module repositories.action2quare.com/ayo/gocommon
|
||||
|
||||
go 1.18
|
||||
|
||||
replace repositories.action2quare.com/ayo/gocommon => ./
|
||||
go 1.20
|
||||
|
||||
require (
|
||||
github.com/go-redis/redis/v8 v8.11.5
|
||||
|
||||
@ -16,7 +16,7 @@ var txSetArgs redis.SetArgs = redis.SetArgs{
|
||||
}
|
||||
|
||||
type LockerWithRedis struct {
|
||||
key string
|
||||
locked_key string
|
||||
}
|
||||
|
||||
var ErrTransactionLocked = errors.New("transaction is already locked")
|
||||
@ -34,10 +34,10 @@ func (locker *LockerWithRedis) Lock(rc *redis.Client, key string) error {
|
||||
return ErrTransactionLocked
|
||||
}
|
||||
|
||||
locker.key = key
|
||||
locker.locked_key = key
|
||||
return nil
|
||||
}
|
||||
|
||||
func (locker *LockerWithRedis) Unlock(rc *redis.Client) {
|
||||
rc.Del(context.Background(), locker.key).Result()
|
||||
rc.Del(context.Background(), locker.locked_key).Result()
|
||||
}
|
||||
|
||||
@ -7,12 +7,12 @@ import (
|
||||
"log"
|
||||
"os"
|
||||
"path"
|
||||
"runtime"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
)
|
||||
|
||||
var stdlogger *log.Logger
|
||||
var errlogger *log.Logger
|
||||
var _ = flag.Bool("logfile", false, "")
|
||||
|
||||
func init() {
|
||||
@ -20,9 +20,7 @@ func init() {
|
||||
binname := path.Base(strings.ReplaceAll(binpath, "\\", "/"))
|
||||
|
||||
var outWriter io.Writer
|
||||
var errWriter io.Writer
|
||||
outWriter = os.Stdout
|
||||
errWriter = os.Stderr
|
||||
|
||||
args := os.Args
|
||||
useLogFile := false
|
||||
@ -46,11 +44,9 @@ func init() {
|
||||
}
|
||||
|
||||
outWriter = io.MultiWriter(outWriter, logFile)
|
||||
errWriter = io.MultiWriter(errWriter, logFile)
|
||||
}
|
||||
|
||||
stdlogger = log.New(outWriter, "", log.LstdFlags)
|
||||
errlogger = log.New(errWriter, "", log.LstdFlags)
|
||||
}
|
||||
|
||||
func Println(v ...interface{}) {
|
||||
@ -62,42 +58,83 @@ func Printf(format string, v ...interface{}) {
|
||||
}
|
||||
|
||||
func Error(v ...interface{}) {
|
||||
errlogger.Output(2, fmt.Sprintln(v...))
|
||||
errlogger.Output(2, string(debug.Stack()))
|
||||
stdlogger.Output(2, fmt.Sprintln(v...))
|
||||
stdlogger.Output(2, string(debug.Stack()))
|
||||
}
|
||||
|
||||
func Errorf(format string, v ...interface{}) {
|
||||
errlogger.Output(2, fmt.Sprintf(format, v...))
|
||||
errlogger.Output(2, string(debug.Stack()))
|
||||
stdlogger.Output(2, fmt.Sprintf(format, v...))
|
||||
stdlogger.Output(2, string(debug.Stack()))
|
||||
}
|
||||
|
||||
func Fatal(v ...interface{}) {
|
||||
errlogger.Output(2, fmt.Sprint(v...))
|
||||
errlogger.Output(2, string(debug.Stack()))
|
||||
stdlogger.Output(2, fmt.Sprint(v...))
|
||||
stdlogger.Output(2, string(debug.Stack()))
|
||||
os.Exit(1)
|
||||
}
|
||||
func Fatalln(v ...interface{}) {
|
||||
errlogger.Output(2, fmt.Sprintln(v...))
|
||||
errlogger.Output(2, string(debug.Stack()))
|
||||
stdlogger.Output(2, fmt.Sprintln(v...))
|
||||
stdlogger.Output(2, string(debug.Stack()))
|
||||
os.Exit(1)
|
||||
}
|
||||
func Panic(v ...interface{}) {
|
||||
s := fmt.Sprint(v...)
|
||||
errlogger.Output(2, s)
|
||||
errlogger.Output(2, string(debug.Stack()))
|
||||
stdlogger.Output(2, s)
|
||||
stdlogger.Output(2, string(debug.Stack()))
|
||||
panic(s)
|
||||
}
|
||||
|
||||
func Panicf(format string, v ...interface{}) {
|
||||
s := fmt.Sprintf(format, v...)
|
||||
errlogger.Output(2, s)
|
||||
errlogger.Output(2, string(debug.Stack()))
|
||||
stdlogger.Output(2, s)
|
||||
stdlogger.Output(2, string(debug.Stack()))
|
||||
panic(s)
|
||||
}
|
||||
|
||||
func Panicln(v ...interface{}) {
|
||||
s := fmt.Sprintln(v...)
|
||||
errlogger.Output(2, s)
|
||||
errlogger.Output(2, string(debug.Stack()))
|
||||
stdlogger.Output(2, s)
|
||||
stdlogger.Output(2, string(debug.Stack()))
|
||||
panic(s)
|
||||
}
|
||||
|
||||
type errWithCallstack struct {
|
||||
inner error
|
||||
frames []*runtime.Frame
|
||||
}
|
||||
|
||||
func (ecs *errWithCallstack) Error() string {
|
||||
if ecs.frames == nil {
|
||||
return ecs.inner.Error()
|
||||
}
|
||||
|
||||
out := make([]string, 0, len(ecs.frames)+1)
|
||||
out = append(out, ecs.inner.Error())
|
||||
for i := len(ecs.frames) - 1; i >= 0; i-- {
|
||||
frame := ecs.frames[i]
|
||||
out = append(out, fmt.Sprintf("%s\n\t%s:%d", frame.Function, frame.File, frame.Line))
|
||||
}
|
||||
|
||||
return strings.Join(out, "\n")
|
||||
}
|
||||
|
||||
func ErrorWithCallStack(err error) error {
|
||||
var frames []*runtime.Frame
|
||||
|
||||
if recur, ok := err.(*errWithCallstack); ok {
|
||||
err = recur.inner
|
||||
frames = recur.frames
|
||||
}
|
||||
|
||||
pc, _, _, ok := runtime.Caller(1)
|
||||
if ok {
|
||||
curframes := runtime.CallersFrames([]uintptr{pc})
|
||||
f, _ := curframes.Next()
|
||||
frames = append(frames, &f)
|
||||
}
|
||||
|
||||
return &errWithCallstack{
|
||||
inner: err,
|
||||
frames: frames,
|
||||
}
|
||||
}
|
||||
|
||||
179
metric/metric.go
Normal file
179
metric/metric.go
Normal file
@ -0,0 +1,179 @@
|
||||
package metric
|
||||
|
||||
import (
|
||||
"crypto/md5"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
|
||||
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
METRIC_HEAD_INLINE = byte(14)
|
||||
METRIC_TAIL_INLINE = byte(15)
|
||||
)
|
||||
|
||||
type MetricType int
|
||||
|
||||
const (
|
||||
MetricCounter = MetricType(1)
|
||||
MetricGuage = MetricType(2)
|
||||
metric_key_size = 8
|
||||
)
|
||||
|
||||
type MetricDescription struct {
|
||||
Key string
|
||||
Type MetricType
|
||||
Name string `json:",omitempty"`
|
||||
Help string `json:",omitempty"`
|
||||
ConstLabels map[string]string `json:",omitempty"`
|
||||
}
|
||||
|
||||
type writeRequest struct {
|
||||
key string
|
||||
valfunc func() float64
|
||||
}
|
||||
|
||||
type metricCollection struct {
|
||||
writerChan chan *writeRequest
|
||||
}
|
||||
|
||||
var mc = metricCollection{
|
||||
writerChan: make(chan *writeRequest, 100),
|
||||
}
|
||||
|
||||
type MetricWriter interface {
|
||||
Add(int64)
|
||||
Set(int64)
|
||||
}
|
||||
|
||||
type metric_empty struct{}
|
||||
|
||||
func (mw *metric_empty) Set(newval int64) {}
|
||||
func (mw *metric_empty) Add(inc int64) {}
|
||||
|
||||
var MetricWriterNil = MetricWriter(&metric_empty{})
|
||||
|
||||
type metric_int64 struct {
|
||||
valptr *int64
|
||||
key string
|
||||
writerChan chan *writeRequest
|
||||
}
|
||||
|
||||
func (mw *metric_int64) requestMetricWrite() {
|
||||
mw.writerChan <- &writeRequest{
|
||||
key: mw.key,
|
||||
valfunc: func() float64 { return float64(atomic.LoadInt64(mw.valptr)) },
|
||||
}
|
||||
}
|
||||
|
||||
func (mw *metric_int64) Set(newval int64) {
|
||||
atomic.StoreInt64(mw.valptr, newval)
|
||||
mw.requestMetricWrite()
|
||||
}
|
||||
|
||||
func (mw *metric_int64) Add(inc int64) {
|
||||
atomic.AddInt64(mw.valptr, inc)
|
||||
mw.requestMetricWrite()
|
||||
}
|
||||
|
||||
func (mc *metricCollection) metricWriter() {
|
||||
defer func() {
|
||||
r := recover()
|
||||
if r != nil {
|
||||
logger.Error(r)
|
||||
}
|
||||
}()
|
||||
|
||||
// head + metric_key_size + 8byte + tail + cr = 19
|
||||
var buff [19]byte
|
||||
buff[0] = METRIC_HEAD_INLINE
|
||||
buff[17] = METRIC_TAIL_INLINE
|
||||
buff[18] = '\n'
|
||||
|
||||
for req := range mc.writerChan {
|
||||
copy(buff[1:], []byte(req.key))
|
||||
binary.BigEndian.PutUint64(buff[9:], math.Float64bits(req.valfunc()))
|
||||
os.Stdout.Write(buff[:])
|
||||
}
|
||||
}
|
||||
|
||||
var NewMetric func(MetricType, string, string, map[string]string) MetricWriter
|
||||
var ConcurrentUser = MetricWriterNil
|
||||
|
||||
func init() {
|
||||
NewMetric = func(MetricType, string, string, map[string]string) MetricWriter {
|
||||
return &metric_empty{}
|
||||
}
|
||||
|
||||
if path.Base(os.Args[0]) == "houston" {
|
||||
logger.Println("metrics are going to be generated for myself(houston)")
|
||||
go mc.metricWriter()
|
||||
NewMetric = newMetricImpl
|
||||
return
|
||||
}
|
||||
|
||||
ppid := os.Getppid()
|
||||
if parent, _ := os.FindProcess(ppid); parent != nil {
|
||||
filename := fmt.Sprintf(`/proc/%d/stat`, os.Getppid())
|
||||
if fn, err := os.ReadFile(filename); err == nil {
|
||||
stats := strings.SplitN(string(fn), " ", 3)
|
||||
parentname := strings.Trim(stats[1], "()")
|
||||
|
||||
if path.Base(parentname) == "houston" {
|
||||
logger.Println("metrics are going to be generated for houston")
|
||||
go mc.metricWriter()
|
||||
NewMetric = newMetricImpl
|
||||
ConcurrentUser = NewMetric(MetricGuage, "concurrent_user", "concurrent user count", map[string]string{"location": "lobby"})
|
||||
} else {
|
||||
logger.Println("metrics are NOT going to be generated. parent is not houston :", filename, string(fn))
|
||||
}
|
||||
} else {
|
||||
logger.Println("metrics are NOT going to be generated. ppid proc is missing :", filename)
|
||||
}
|
||||
} else {
|
||||
logger.Println("metrics are NOT going to be generated. parent process is missing. ppid :", ppid)
|
||||
}
|
||||
}
|
||||
|
||||
func newMetricImpl(mt MetricType, name string, help string, constLabels map[string]string) (writer MetricWriter) {
|
||||
hash := md5.New()
|
||||
hash.Write([]byte(name))
|
||||
|
||||
if constLabels == nil {
|
||||
constLabels = make(map[string]string)
|
||||
}
|
||||
|
||||
hn, _ := os.Hostname()
|
||||
sn := path.Base(os.Args[0])
|
||||
constLabels["hostname"] = hn
|
||||
constLabels["service"] = sn
|
||||
|
||||
key := hex.EncodeToString(hash.Sum(nil))[:metric_key_size]
|
||||
temp, _ := json.Marshal(MetricDescription{
|
||||
Key: key,
|
||||
Type: mt,
|
||||
Name: name,
|
||||
Help: help,
|
||||
ConstLabels: constLabels,
|
||||
})
|
||||
|
||||
writer = &metric_int64{
|
||||
valptr: new(int64),
|
||||
key: key,
|
||||
writerChan: mc.writerChan,
|
||||
}
|
||||
|
||||
output := append([]byte{METRIC_HEAD_INLINE}, temp...)
|
||||
output = append(output, METRIC_TAIL_INLINE, '\n')
|
||||
os.Stdout.Write(output)
|
||||
|
||||
return
|
||||
}
|
||||
68
mongo.go
68
mongo.go
@ -15,6 +15,7 @@ import (
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"go.mongodb.org/mongo-driver/mongo/readpref"
|
||||
"go.mongodb.org/mongo-driver/x/mongo/driver/connstring"
|
||||
)
|
||||
|
||||
type MongoClient struct {
|
||||
@ -60,8 +61,19 @@ func (ci *ConnectionInfo) SetDatabase(dbname string) *ConnectionInfo {
|
||||
return ci
|
||||
}
|
||||
|
||||
func NewMongoClient(ctx context.Context, url string, dbname string) (MongoClient, error) {
|
||||
return newMongoClient(ctx, NewMongoConnectionInfo(url, dbname))
|
||||
var errNoDatabaseNameInMongoUri = errors.New("mongo uri has no database name")
|
||||
|
||||
func NewMongoClient(ctx context.Context, url string) (MongoClient, error) {
|
||||
connstr, err := connstring.ParseAndValidate(url)
|
||||
if err != nil {
|
||||
return MongoClient{}, err
|
||||
}
|
||||
|
||||
if len(connstr.Database) == 0 {
|
||||
return MongoClient{}, errNoDatabaseNameInMongoUri
|
||||
}
|
||||
|
||||
return newMongoClient(ctx, NewMongoConnectionInfo(url, connstr.Database))
|
||||
}
|
||||
|
||||
func newMongoClient(ctx context.Context, ci *ConnectionInfo) (MongoClient, error) {
|
||||
@ -94,21 +106,21 @@ func newMongoClient(ctx context.Context, ci *ConnectionInfo) (MongoClient, error
|
||||
return MongoClient{}, err
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
if err := client.Ping(ctx, nil); err != nil {
|
||||
logger.Error("mongo client ping err :", err)
|
||||
}
|
||||
// go func() {
|
||||
// for {
|
||||
// if err := client.Ping(ctx, nil); err != nil {
|
||||
// logger.Error("mongo client ping err :", err)
|
||||
// }
|
||||
|
||||
select {
|
||||
case <-time.After(10 * time.Second):
|
||||
continue
|
||||
// select {
|
||||
// case <-time.After(10 * time.Second):
|
||||
// continue
|
||||
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
// case <-ctx.Done():
|
||||
// return
|
||||
// }
|
||||
// }
|
||||
// }()
|
||||
|
||||
mdb := client.Database(ci.Database, nil)
|
||||
return MongoClient{c: client, db: mdb}, nil
|
||||
@ -124,6 +136,18 @@ func (mc MongoClient) Close() {
|
||||
}
|
||||
}
|
||||
|
||||
func (mc MongoClient) DropIndex(coll CollectionName, name string) error {
|
||||
matchcoll := mc.Collection(coll)
|
||||
_, err := matchcoll.Indexes().DropOne(context.Background(), name)
|
||||
if commanderr, ok := err.(mongo.CommandError); ok {
|
||||
if commanderr.Code == 27 {
|
||||
// 인덱스가 없는 것이므로 그냥 성공
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (mc MongoClient) Watch(coll CollectionName, pipeline mongo.Pipeline, opts ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error) {
|
||||
if len(opts) == 0 {
|
||||
opts = []*options.ChangeStreamOptions{options.ChangeStream().SetFullDocument(options.UpdateLookup).SetMaxAwaitTime(0)}
|
||||
@ -432,7 +456,7 @@ IndexSearchLabel:
|
||||
return err
|
||||
}
|
||||
|
||||
func (mc MongoClient) makeIndicesWithOption(coll CollectionName, indices map[string]bson.D, option *options.IndexOptions) error {
|
||||
func (mc MongoClient) makeIndicesWithOption(coll CollectionName, indices map[string]bson.D, opts ...*options.IndexOptions) error {
|
||||
collection := mc.Collection(coll)
|
||||
cursor, err := collection.Indexes().List(context.Background(), options.ListIndexes().SetMaxTime(time.Second))
|
||||
if err != nil {
|
||||
@ -460,12 +484,12 @@ func (mc MongoClient) makeIndicesWithOption(coll CollectionName, indices map[str
|
||||
if len(v) == 1 {
|
||||
mod = mongo.IndexModel{
|
||||
Keys: primitive.M{v[0].Key: v[0].Value},
|
||||
Options: options.MergeIndexOptions(options.Index().SetName(name), option),
|
||||
Options: options.MergeIndexOptions(append(opts, options.Index().SetName(name))...),
|
||||
}
|
||||
} else {
|
||||
mod = mongo.IndexModel{
|
||||
Keys: indices[name],
|
||||
Options: options.MergeIndexOptions(options.Index().SetName(name), option),
|
||||
Options: options.MergeIndexOptions(append(opts, options.Index().SetName(name))...),
|
||||
}
|
||||
}
|
||||
|
||||
@ -478,10 +502,10 @@ func (mc MongoClient) makeIndicesWithOption(coll CollectionName, indices map[str
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mc MongoClient) MakeUniqueIndices(coll CollectionName, indices map[string]bson.D) error {
|
||||
return mc.makeIndicesWithOption(coll, indices, options.Index().SetUnique(true))
|
||||
func (mc MongoClient) MakeUniqueIndices(coll CollectionName, indices map[string]bson.D, opts ...*options.IndexOptions) error {
|
||||
return mc.makeIndicesWithOption(coll, indices, append(opts, options.Index().SetUnique(true))...)
|
||||
}
|
||||
|
||||
func (mc MongoClient) MakeIndices(coll CollectionName, indices map[string]bson.D) error {
|
||||
return mc.makeIndicesWithOption(coll, indices, options.Index())
|
||||
func (mc MongoClient) MakeIndices(coll CollectionName, indices map[string]bson.D, opts ...*options.IndexOptions) error {
|
||||
return mc.makeIndicesWithOption(coll, indices, opts...)
|
||||
}
|
||||
|
||||
439
redis.go
439
redis.go
@ -2,9 +2,11 @@ package gocommon
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/url"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
)
|
||||
@ -19,23 +21,30 @@ func newRedisClient(uri string, dbidxoffset int) *redis.Client {
|
||||
return redis.NewClient(option)
|
||||
}
|
||||
|
||||
func NewRedisClient(uri string, dbidx int) (*redis.Client, error) {
|
||||
func NewRedisClient(uri string) (*redis.Client, error) {
|
||||
if !*devflag {
|
||||
return newRedisClient(uri, dbidx), nil
|
||||
return newRedisClient(uri, 0), nil
|
||||
}
|
||||
|
||||
rdb := newRedisClient(uri, 0)
|
||||
devUrl, _ := url.Parse(uri)
|
||||
option, err := redis.ParseURL(uri)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
zero := *option
|
||||
zero.DB = 0
|
||||
rdb := redis.NewClient(&zero)
|
||||
defer rdb.Close()
|
||||
|
||||
hostname, _ := os.Hostname()
|
||||
myidx, _ := rdb.HGet(context.Background(), "private_db", hostname).Result()
|
||||
if len(myidx) > 0 {
|
||||
devUrl.Path = "/" + myidx
|
||||
return newRedisClient(devUrl.String(), dbidx), nil
|
||||
offset, _ := strconv.Atoi(myidx)
|
||||
option.DB += offset
|
||||
return redis.NewClient(option), nil
|
||||
}
|
||||
|
||||
alldbs, err := rdb.HGetAll(context.Background(), "private_db").Result()
|
||||
if err != nil {
|
||||
rdb.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -53,6 +62,416 @@ func NewRedisClient(uri string, dbidx int) (*redis.Client, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
devUrl.Path = "/" + strconv.Itoa(newidx)
|
||||
return newRedisClient(devUrl.String(), dbidx), nil
|
||||
option.DB += newidx
|
||||
return redis.NewClient(option), nil
|
||||
}
|
||||
|
||||
type RedisonSetOption = string
|
||||
type RedisonGetOption = [2]any
|
||||
|
||||
const (
|
||||
// JSONSET command Options
|
||||
RedisonSetOptionNX RedisonSetOption = "NX"
|
||||
RedisonSetOptionXX RedisonSetOption = "XX"
|
||||
)
|
||||
|
||||
var (
|
||||
RedisonGetOptionSPACE = RedisonGetOption{"SPACE", " "}
|
||||
RedisonGetOptionINDENT = RedisonGetOption{"INDENT", "\t"}
|
||||
RedisonGetOptionNEWLINE = RedisonGetOption{"NEWLINE", "\n"}
|
||||
RedisonGetOptionNOESCAPE = RedisonGetOption{"NOESCAPE", ""}
|
||||
)
|
||||
|
||||
// gocommon으로 옮길 거
|
||||
type RedisonHandler struct {
|
||||
*redis.Client
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func NewRedisonHandler(ctx context.Context, redisClient *redis.Client) *RedisonHandler {
|
||||
return &RedisonHandler{
|
||||
Client: redisClient,
|
||||
ctx: ctx,
|
||||
}
|
||||
}
|
||||
|
||||
func respToArray[T any](resp any, err error) ([]T, error) {
|
||||
if err != nil {
|
||||
if err == redis.Nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resArr := resp.([]any)
|
||||
v := make([]T, len(resArr))
|
||||
for i, e := range resArr {
|
||||
v[i] = e.(T)
|
||||
}
|
||||
return v, nil
|
||||
}
|
||||
|
||||
func appendArgs[T any](args []any, ext ...T) []any {
|
||||
for _, e := range ext {
|
||||
args = append(args, e)
|
||||
}
|
||||
return args
|
||||
}
|
||||
|
||||
func (rh *RedisonHandler) JSONMSetRel(key string, prefixPath string, kv map[string]any) error {
|
||||
if len(prefixPath) > 0 && !strings.HasSuffix(prefixPath, ".") {
|
||||
prefixPath += "."
|
||||
}
|
||||
|
||||
pl := rh.Pipeline()
|
||||
for path, obj := range kv {
|
||||
b, err := json.Marshal(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pl.Do(rh.ctx, "JSON.SET", key, prefixPath+path, b)
|
||||
}
|
||||
|
||||
cmders, err := pl.Exec(rh.ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, cmder := range cmders {
|
||||
if cmder.Err() != nil {
|
||||
return cmder.Err()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rh *RedisonHandler) JSONMSet(key string, kv map[string]any) error {
|
||||
return rh.JSONMSetRel(key, "", kv)
|
||||
}
|
||||
|
||||
func (rh *RedisonHandler) jsonSetMergeJSONSet(cmd, key, path string, obj any, opts ...RedisonSetOption) (bool, error) {
|
||||
b, err := json.Marshal(obj)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
args := []any{
|
||||
"JSON.SET",
|
||||
key,
|
||||
path,
|
||||
b,
|
||||
}
|
||||
if len(opts) > 0 {
|
||||
args = append(args, opts[0])
|
||||
}
|
||||
|
||||
res, err := rh.Do(rh.ctx, args...).Result()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return res.(string) == "OK", nil
|
||||
}
|
||||
|
||||
func (rh *RedisonHandler) JSONSet(key, path string, obj any, opts ...RedisonSetOption) (bool, error) {
|
||||
return rh.jsonSetMergeJSONSet("JSON.SET", key, path, obj, opts...)
|
||||
}
|
||||
|
||||
func (rh *RedisonHandler) JSONMerge(key, path string, obj any, opts ...RedisonSetOption) (bool, error) {
|
||||
return rh.jsonSetMergeJSONSet("JSON.MERGE", key, path, obj, opts...)
|
||||
}
|
||||
|
||||
func (rh *RedisonHandler) JSONGet(key, path string, opts ...RedisonGetOption) (res any, err error) {
|
||||
args := appendArgs[string]([]any{
|
||||
"JSON.GET",
|
||||
key,
|
||||
}, strings.Split(path, " ")...)
|
||||
|
||||
for _, opt := range opts {
|
||||
args = append(args, opt[:]...)
|
||||
}
|
||||
|
||||
return rh.Do(rh.ctx, args...).Result()
|
||||
}
|
||||
|
||||
func (rh *RedisonHandler) JSONGetString(key, path string) ([]string, error) {
|
||||
return respToArray[string](rh.JSONResp(key, path))
|
||||
}
|
||||
|
||||
func (rh *RedisonHandler) JSONGetDocuments(key, path string) ([]map[string]any, error) {
|
||||
resp, err := rh.JSONGet(key, path)
|
||||
|
||||
if err != nil {
|
||||
if err == redis.Nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var objs []map[string]any
|
||||
err = json.Unmarshal([]byte(resp.(string)), &objs)
|
||||
return objs, err
|
||||
}
|
||||
|
||||
func (rh *RedisonHandler) JSONGetInt64(key, path string) ([]int64, error) {
|
||||
return respToArray[int64](rh.JSONResp(key, path))
|
||||
}
|
||||
|
||||
func (rh *RedisonHandler) JSONMGet(path string, keys ...string) (res any, err error) {
|
||||
args := appendArgs[string]([]any{
|
||||
"JSON.MGET",
|
||||
path,
|
||||
}, keys...)
|
||||
return rh.Do(rh.ctx, args...).Result()
|
||||
}
|
||||
|
||||
func (rh *RedisonHandler) JSONMDel(key string, paths []string) error {
|
||||
pl := rh.Pipeline()
|
||||
for _, path := range paths {
|
||||
args := []any{
|
||||
"JSON.DEL",
|
||||
key,
|
||||
path,
|
||||
}
|
||||
pl.Do(rh.ctx, args...)
|
||||
}
|
||||
_, err := pl.Exec(rh.ctx)
|
||||
return err
|
||||
}
|
||||
|
||||
func (rh *RedisonHandler) JSONDel(key, path string) (int64, error) {
|
||||
args := []any{
|
||||
"JSON.DEL",
|
||||
key,
|
||||
path,
|
||||
}
|
||||
resp, err := rh.Do(rh.ctx, args...).Result()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return resp.(int64), nil
|
||||
}
|
||||
|
||||
func (rh *RedisonHandler) JSONType(key, path string) ([]string, error) {
|
||||
args := []any{
|
||||
"JSON.TYPE",
|
||||
key,
|
||||
path,
|
||||
}
|
||||
return respToArray[string](rh.Do(rh.ctx, args...).Result())
|
||||
}
|
||||
|
||||
func (rh *RedisonHandler) JSONNumIncrBy(key, path string, number int) ([]int64, error) {
|
||||
args := []any{
|
||||
"JSON.NUMINCRBY",
|
||||
key,
|
||||
path,
|
||||
number,
|
||||
}
|
||||
resp, err := rh.Do(rh.ctx, args...).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var cnts []int64
|
||||
err = json.Unmarshal([]byte(resp.(string)), &cnts)
|
||||
return cnts, err
|
||||
}
|
||||
|
||||
func (rh *RedisonHandler) JSONNumMultBy(key, path string, number int) (res any, err error) {
|
||||
args := []any{
|
||||
"JSON.NUMMULTBY",
|
||||
key,
|
||||
path,
|
||||
number,
|
||||
}
|
||||
resp, err := rh.Do(rh.ctx, args...).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp.([]any), nil
|
||||
}
|
||||
|
||||
func (rh *RedisonHandler) JSONStrAppend(key, path string, jsonstring string) ([]int64, error) {
|
||||
args := []any{
|
||||
"JSON.STRAPPEND",
|
||||
key,
|
||||
path,
|
||||
fmt.Sprintf(`'"%s"'`, jsonstring),
|
||||
}
|
||||
return respToArray[int64](rh.Do(rh.ctx, args...).Result())
|
||||
}
|
||||
|
||||
func (rh *RedisonHandler) JSONStrLen(key, path string) (res []int64, err error) {
|
||||
args := []any{
|
||||
"JSON.STRLEN",
|
||||
key,
|
||||
path,
|
||||
}
|
||||
return respToArray[int64](rh.Do(rh.ctx, args...).Result())
|
||||
}
|
||||
|
||||
func (rh *RedisonHandler) JSONArrAppend(key, path string, values ...any) (int64, error) {
|
||||
args := appendValues([]any{
|
||||
"JSON.ARRAPPEND",
|
||||
key,
|
||||
path,
|
||||
}, values...)
|
||||
|
||||
resp, err := rh.Do(rh.ctx, args...).Result()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return resp.(int64), nil
|
||||
}
|
||||
|
||||
func (rh *RedisonHandler) JSONArrLen(key, path string) ([]int64, error) {
|
||||
args := []any{
|
||||
"JSON.ARRLEN",
|
||||
key,
|
||||
path,
|
||||
}
|
||||
return respToArray[int64](rh.Do(rh.ctx, args...).Result())
|
||||
}
|
||||
|
||||
func (rh *RedisonHandler) JSONArrPop(key, path string, index int) (res any, err error) {
|
||||
args := []any{
|
||||
"JSON.ARRPOP",
|
||||
key,
|
||||
path,
|
||||
index,
|
||||
}
|
||||
resp, err := rh.Do(rh.ctx, args...).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp.([]any)[0], nil
|
||||
}
|
||||
|
||||
func appendValues(args []any, values ...any) []any {
|
||||
for _, jsonValue := range values {
|
||||
switch jsonValue := jsonValue.(type) {
|
||||
case string:
|
||||
args = append(args, fmt.Sprintf(`'"%s"'`, jsonValue))
|
||||
case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64:
|
||||
args = append(args, jsonValue)
|
||||
default:
|
||||
bt, _ := json.Marshal(jsonValue)
|
||||
args = append(args, bt)
|
||||
}
|
||||
}
|
||||
return args
|
||||
}
|
||||
|
||||
func (rh *RedisonHandler) JSONArrIndex(key, path string, jsonValue any, optionalRange ...int) ([]int64, error) {
|
||||
args := appendValues([]any{
|
||||
"JSON.ARRINDEX",
|
||||
key,
|
||||
path,
|
||||
}, jsonValue)
|
||||
|
||||
return respToArray[int64](rh.Do(rh.ctx, args...).Result())
|
||||
}
|
||||
|
||||
func (rh *RedisonHandler) JSONArrTrim(key, path string, start, end int) (res any, err error) {
|
||||
args := []any{
|
||||
"JSON.ARRTRIM",
|
||||
key,
|
||||
path,
|
||||
start,
|
||||
end,
|
||||
}
|
||||
return respToArray[int64](rh.Do(rh.ctx, args...).Result())
|
||||
}
|
||||
|
||||
func (rh *RedisonHandler) JSONArrInsert(key, path string, index int, values ...any) (res any, err error) {
|
||||
args := appendValues([]any{
|
||||
"JSON.ARRINSERT",
|
||||
key,
|
||||
path,
|
||||
index,
|
||||
}, values...)
|
||||
|
||||
return respToArray[int64](rh.Do(rh.ctx, args...).Result())
|
||||
}
|
||||
|
||||
func (rh *RedisonHandler) JSONObjKeys(key, path string) ([]string, error) {
|
||||
args := []any{
|
||||
"JSON.OBJKEYS",
|
||||
key,
|
||||
path,
|
||||
}
|
||||
|
||||
res, err := rh.Do(rh.ctx, args...).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resArr := res.([]any)
|
||||
if len(resArr) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
resArr = resArr[0].([]any)
|
||||
slc := make([]string, len(resArr))
|
||||
|
||||
for i, r := range resArr {
|
||||
slc[i] = r.(string)
|
||||
}
|
||||
|
||||
return slc, nil
|
||||
}
|
||||
|
||||
func (rh *RedisonHandler) JSONObjLen(key, path string) ([]int64, error) {
|
||||
args := []any{
|
||||
"JSON.OBJLEN",
|
||||
key,
|
||||
}
|
||||
|
||||
if path != "$" {
|
||||
args = append(args, path)
|
||||
}
|
||||
|
||||
resp, err := rh.Do(rh.ctx, args...).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch resp := resp.(type) {
|
||||
case []any:
|
||||
return respToArray[int64](resp, nil)
|
||||
|
||||
case int64:
|
||||
return []int64{resp}, nil
|
||||
}
|
||||
|
||||
return []int64{0}, nil
|
||||
}
|
||||
|
||||
func (rh *RedisonHandler) JSONDebug(key, path string) (res any, err error) {
|
||||
args := []any{
|
||||
"JSON.DEBUG",
|
||||
"MEMORY",
|
||||
key,
|
||||
path,
|
||||
}
|
||||
return rh.Do(rh.ctx, args...).Result()
|
||||
}
|
||||
|
||||
func (rh *RedisonHandler) JSONForget(key, path string) (int64, error) {
|
||||
return rh.JSONDel(key, path)
|
||||
}
|
||||
|
||||
func (rh *RedisonHandler) JSONResp(key, path string) (res any, err error) {
|
||||
args := []any{
|
||||
"JSON.RESP",
|
||||
key,
|
||||
path,
|
||||
}
|
||||
return rh.Do(rh.ctx, args...).Result()
|
||||
}
|
||||
|
||||
@ -65,10 +65,7 @@ func LoadConfig[T any](outptr *T) error {
|
||||
|
||||
type StorageAddr struct {
|
||||
Mongo string
|
||||
Redis struct {
|
||||
URL string
|
||||
Offset map[string]int
|
||||
}
|
||||
Redis map[string]string
|
||||
}
|
||||
|
||||
type RegionStorageConfig struct {
|
||||
|
||||
315
server.go
315
server.go
@ -1,7 +1,9 @@
|
||||
package gocommon
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/gob"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
@ -20,6 +22,7 @@ import (
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"repositories.action2quare.com/ayo/gocommon/flagx"
|
||||
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||
@ -28,6 +31,15 @@ import (
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
)
|
||||
|
||||
func init() {
|
||||
gob.Register(map[string]any{})
|
||||
gob.Register(primitive.A{})
|
||||
gob.Register(primitive.M{})
|
||||
gob.Register(primitive.D{})
|
||||
gob.Register(primitive.ObjectID{})
|
||||
gob.Register([]any{})
|
||||
}
|
||||
|
||||
const (
|
||||
// HTTPStatusReloginRequired : http status를 이걸 받으면 클라이언트는 로그아웃하고 로그인 화면으로 돌아가야 한다.
|
||||
HTTPStatusReloginRequired = 599
|
||||
@ -46,6 +58,14 @@ const (
|
||||
ShutdownFlagIdle = ShutdownFlag(3)
|
||||
)
|
||||
|
||||
type ErrorWithStatus struct {
|
||||
StatusCode int
|
||||
}
|
||||
|
||||
func (e ErrorWithStatus) Error() string {
|
||||
return fmt.Sprintf("%d", e.StatusCode)
|
||||
}
|
||||
|
||||
type RpcReturnTypeInterface interface {
|
||||
Value() any
|
||||
Error() error
|
||||
@ -97,24 +117,18 @@ func NewHTTPServerWithPort(serveMux *http.ServeMux, port int) *Server {
|
||||
addr := fmt.Sprintf(":%d", port)
|
||||
serveMux.HandleFunc(MakeHttpHandlerPattern("welcome"), welcomeHandler)
|
||||
serveMux.HandleFunc(MakeHttpHandlerPattern("lb_health_chceck"), healthCheckHandler)
|
||||
serveMux.HandleFunc(MakeHttpHandlerPattern("lb_health_check"), healthCheckHandler)
|
||||
|
||||
server := &Server{
|
||||
httpserver: &http.Server{Addr: addr, Handler: serveMux},
|
||||
interrupt: make(chan os.Signal, 1),
|
||||
}
|
||||
server.httpserver.SetKeepAlivesEnabled(true)
|
||||
|
||||
signal.Notify(server.interrupt, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
|
||||
go func() {
|
||||
c := <-server.interrupt
|
||||
logger.Println("interrupt!!!!!!!! :", c.String())
|
||||
server.shutdown()
|
||||
}()
|
||||
|
||||
return server
|
||||
}
|
||||
|
||||
func NewHTTPServer(serveMux *http.ServeMux) *Server {
|
||||
|
||||
// 시작시 자동으로 enable됨
|
||||
if len(*tls) > 0 && *portptr == 80 {
|
||||
*portptr = 443
|
||||
@ -132,8 +146,7 @@ func (server *Server) shutdown() {
|
||||
logger.Println("http server shutdown. healthcheckcounter :", t)
|
||||
atomic.StoreInt64(&healthcheckcounter, math.MinInt64)
|
||||
|
||||
timer := 600 // 0.1 * 600 = 1분
|
||||
for cnt := 0; cnt < 100 && timer > 0; {
|
||||
for cnt := 0; cnt < 100; {
|
||||
next := atomic.LoadInt64(&healthcheckcounter)
|
||||
if next == t {
|
||||
cnt++
|
||||
@ -142,7 +155,6 @@ func (server *Server) shutdown() {
|
||||
cnt = 0
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
timer--
|
||||
}
|
||||
logger.Println("http server shutdown. healthcheck completed")
|
||||
} else {
|
||||
@ -155,6 +167,14 @@ func (server *Server) shutdown() {
|
||||
}
|
||||
}
|
||||
|
||||
func (server *Server) Stop() {
|
||||
if server.interrupt != nil {
|
||||
server.interrupt <- os.Interrupt
|
||||
} else {
|
||||
server.shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
// Start :
|
||||
func (server *Server) Start() error {
|
||||
if server.httpserver != nil {
|
||||
@ -162,6 +182,15 @@ func (server *Server) Start() error {
|
||||
if r != nil {
|
||||
return r
|
||||
}
|
||||
server.interrupt = make(chan os.Signal, 1)
|
||||
signal.Notify(server.interrupt, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
|
||||
|
||||
go func() {
|
||||
c := <-server.interrupt
|
||||
logger.Println("interrupt!!!!!!!! :", c.String())
|
||||
server.shutdown()
|
||||
}()
|
||||
|
||||
proxyListener := &proxyproto.Listener{
|
||||
Listener: ln,
|
||||
ReadHeaderTimeout: 10 * time.Second,
|
||||
@ -421,6 +450,46 @@ func ReadStringFormValue(r url.Values, key string) (string, bool) {
|
||||
return strval, len(strval) > 0
|
||||
}
|
||||
|
||||
type encoder interface {
|
||||
Encode(any) error
|
||||
}
|
||||
|
||||
type nilEncoder struct{}
|
||||
|
||||
func (ne *nilEncoder) Encode(any) error { return nil }
|
||||
|
||||
type decoder interface {
|
||||
Decode(any) error
|
||||
}
|
||||
|
||||
type nilDecoder struct{}
|
||||
|
||||
func (nd *nilDecoder) Decode(any) error { return nil }
|
||||
|
||||
func MakeDecoder(r *http.Request) decoder {
|
||||
ct := r.Header.Get("Content-Type")
|
||||
if ct == "application/gob" {
|
||||
return gob.NewDecoder(r.Body)
|
||||
} else if ct == "application/json" {
|
||||
return json.NewDecoder(r.Body)
|
||||
}
|
||||
|
||||
logger.Error("Content-Type is not supported :", ct)
|
||||
return &nilDecoder{}
|
||||
}
|
||||
|
||||
func MakeEncoder(w http.ResponseWriter, r *http.Request) encoder {
|
||||
ct := r.Header.Get("Content-Type")
|
||||
if ct == "application/gob" {
|
||||
return gob.NewEncoder(w)
|
||||
} else if ct == "application/json" {
|
||||
return json.NewEncoder(w)
|
||||
}
|
||||
|
||||
logger.Error("Content-Type is not supported :", ct)
|
||||
return &nilEncoder{}
|
||||
}
|
||||
|
||||
func DotStringToTimestamp(tv string) primitive.Timestamp {
|
||||
if len(tv) == 0 {
|
||||
return primitive.Timestamp{T: 0, I: 0}
|
||||
@ -599,3 +668,227 @@ func MakeHttpRequestForLogging(r *http.Request) *http.Request {
|
||||
r.Body = ib
|
||||
return r
|
||||
}
|
||||
|
||||
type apiFuncType func(http.ResponseWriter, *http.Request)
|
||||
type HttpApiHandler struct {
|
||||
methods map[string]apiFuncType
|
||||
originalReceiverName string
|
||||
}
|
||||
|
||||
func MakeHttpApiHandler[T any](receiver *T, receiverName string) HttpApiHandler {
|
||||
methods := make(map[string]apiFuncType)
|
||||
|
||||
tp := reflect.TypeOf(receiver)
|
||||
if len(receiverName) == 0 {
|
||||
receiverName = tp.Elem().Name()
|
||||
}
|
||||
writerType := reflect.TypeOf((*http.ResponseWriter)(nil)).Elem()
|
||||
for i := 0; i < tp.NumMethod(); i++ {
|
||||
method := tp.Method(i)
|
||||
if method.Type.NumIn() != 3 {
|
||||
continue
|
||||
}
|
||||
|
||||
if method.Type.In(0) != tp {
|
||||
continue
|
||||
}
|
||||
|
||||
if !method.Type.In(1).Implements(writerType) {
|
||||
continue
|
||||
}
|
||||
|
||||
var r http.Request
|
||||
if method.Type.In(2) != reflect.TypeOf(&r) {
|
||||
continue
|
||||
}
|
||||
|
||||
if method.Name == "ServeHTTP" {
|
||||
continue
|
||||
}
|
||||
|
||||
funcptr := method.Func.Pointer()
|
||||
p1 := unsafe.Pointer(&funcptr)
|
||||
p2 := unsafe.Pointer(&p1)
|
||||
testfunc := (*func(*T, http.ResponseWriter, *http.Request))(p2)
|
||||
methods[receiverName+"."+method.Name] = func(w http.ResponseWriter, r *http.Request) {
|
||||
(*testfunc)(receiver, w, r)
|
||||
}
|
||||
}
|
||||
|
||||
return HttpApiHandler{
|
||||
methods: methods,
|
||||
originalReceiverName: tp.Elem().Name(),
|
||||
}
|
||||
}
|
||||
|
||||
type HttpApiBroker struct {
|
||||
methods map[string]apiFuncType
|
||||
methods_dup map[string][]apiFuncType
|
||||
}
|
||||
|
||||
type bufferReadCloser struct {
|
||||
*bytes.Reader
|
||||
}
|
||||
|
||||
func (buff *bufferReadCloser) Close() error { return nil }
|
||||
|
||||
type readOnlyResponseWriter struct {
|
||||
inner http.ResponseWriter
|
||||
statusCode int
|
||||
}
|
||||
|
||||
func (w *readOnlyResponseWriter) Header() http.Header {
|
||||
return w.inner.Header()
|
||||
}
|
||||
|
||||
func (w *readOnlyResponseWriter) Write(in []byte) (int, error) {
|
||||
logger.Println("readOnlyResponseWriter cannot write")
|
||||
return len(in), nil
|
||||
}
|
||||
|
||||
func (w *readOnlyResponseWriter) WriteHeader(statusCode int) {
|
||||
w.statusCode = statusCode
|
||||
}
|
||||
|
||||
func (hc *HttpApiBroker) AddHandler(receiver HttpApiHandler) {
|
||||
if hc.methods == nil {
|
||||
hc.methods = make(map[string]apiFuncType)
|
||||
hc.methods_dup = make(map[string][]apiFuncType)
|
||||
}
|
||||
|
||||
for k, v := range receiver.methods {
|
||||
ab := strings.Split(k, ".")
|
||||
logger.Printf("http api registered : %s.%s -> %s\n", receiver.originalReceiverName, ab[1], k)
|
||||
|
||||
hc.methods_dup[k] = append(hc.methods_dup[k], v)
|
||||
if len(hc.methods_dup[k]) > 1 {
|
||||
chain := hc.methods_dup[k]
|
||||
hc.methods[k] = func(w http.ResponseWriter, r *http.Request) {
|
||||
body, _ := io.ReadAll(r.Body)
|
||||
defer r.Body.Close()
|
||||
|
||||
wrap := &readOnlyResponseWriter{inner: w, statusCode: 200}
|
||||
for _, f := range chain {
|
||||
r.Body = &bufferReadCloser{bytes.NewReader(body)}
|
||||
f(wrap, r)
|
||||
}
|
||||
|
||||
if wrap.statusCode != 200 {
|
||||
w.WriteHeader(wrap.statusCode)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
hc.methods[k] = v
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (hc *HttpApiBroker) Call(w http.ResponseWriter, r *http.Request) {
|
||||
funcname := r.URL.Query().Get("call")
|
||||
if len(funcname) == 0 {
|
||||
logger.Println("query param 'call' is missing")
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
|
||||
}
|
||||
hc.call(funcname, w, r)
|
||||
}
|
||||
|
||||
func (hc *HttpApiBroker) call(funcname string, w http.ResponseWriter, r *http.Request) {
|
||||
if found := hc.methods[funcname]; found != nil {
|
||||
found(w, r)
|
||||
} else {
|
||||
logger.Println("api is not found :", funcname)
|
||||
}
|
||||
}
|
||||
|
||||
func CallInternalServiceAPI[T any](url string, apitoken string, method string, data T, headers ...string) error {
|
||||
tempHeader := make(http.Header)
|
||||
tempHeader.Set("MG-X-API-TOKEN", apitoken)
|
||||
tempHeader.Set("Content-Type", "application/gob")
|
||||
|
||||
for i := 1; i < len(headers); i += 2 {
|
||||
tempHeader.Set(headers[i-1], headers[i])
|
||||
}
|
||||
|
||||
buff := new(bytes.Buffer)
|
||||
ct := tempHeader.Get("Content-Type")
|
||||
if ct == "application/gob" {
|
||||
enc := gob.NewEncoder(buff)
|
||||
enc.Encode(data)
|
||||
} else if ct == "application/json" {
|
||||
enc := json.NewEncoder(buff)
|
||||
enc.Encode(data)
|
||||
}
|
||||
|
||||
reqURL := fmt.Sprintf("%s/api?call=%s", url, method)
|
||||
req, err := http.NewRequest("POST", reqURL, buff)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req.Header = tempHeader
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
|
||||
defer func() {
|
||||
if resp != nil && resp.Body != nil {
|
||||
resp.Body.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
if err == nil {
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return &ErrorWithStatus{StatusCode: resp.StatusCode}
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func CallInternalServiceAPIAs[Tin any, Tout any](url string, apitoken string, method string, data Tin, out *Tout, headers ...string) error {
|
||||
tempHeader := make(http.Header)
|
||||
tempHeader.Set("MG-X-API-TOKEN", apitoken)
|
||||
tempHeader.Set("Content-Type", "application/gob")
|
||||
|
||||
for i := 1; i < len(headers); i += 2 {
|
||||
tempHeader.Set(headers[i-1], headers[i])
|
||||
}
|
||||
|
||||
buff := new(bytes.Buffer)
|
||||
ct := tempHeader.Get("Content-Type")
|
||||
if ct == "application/gob" {
|
||||
enc := gob.NewEncoder(buff)
|
||||
enc.Encode(data)
|
||||
} else if ct == "application/json" {
|
||||
enc := json.NewEncoder(buff)
|
||||
enc.Encode(data)
|
||||
}
|
||||
|
||||
reqURL := fmt.Sprintf("%s/api?call=%s", url, method)
|
||||
req, err := http.NewRequest("POST", reqURL, buff)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req.Header = tempHeader
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return &ErrorWithStatus{StatusCode: resp.StatusCode}
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if resp != nil && resp.Body != nil {
|
||||
resp.Body.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
if out != nil && resp.Body != nil {
|
||||
dec := gob.NewDecoder(resp.Body)
|
||||
return dec.Decode(out)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
132
session/common.go
Normal file
132
session/common.go
Normal file
@ -0,0 +1,132 @@
|
||||
package session
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"math/rand"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
)
|
||||
|
||||
type Authorization struct {
|
||||
Account primitive.ObjectID `bson:"a" json:"a"`
|
||||
|
||||
// by authorization provider
|
||||
Platform string `bson:"p" json:"p"`
|
||||
Uid string `bson:"u" json:"u"`
|
||||
Email string `bson:"em" json:"em"`
|
||||
}
|
||||
|
||||
type Provider interface {
|
||||
New(*Authorization) (string, error)
|
||||
Delete(primitive.ObjectID) error
|
||||
Query(string) (Authorization, error)
|
||||
Touch(string) (bool, error)
|
||||
}
|
||||
|
||||
type Consumer interface {
|
||||
Query(string) (Authorization, error)
|
||||
Touch(string) (Authorization, error)
|
||||
}
|
||||
|
||||
type storagekey string
|
||||
type publickey string
|
||||
|
||||
var r = rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
|
||||
func make_storagekey(acc primitive.ObjectID) storagekey {
|
||||
bs := [4]byte{}
|
||||
binary.LittleEndian.PutUint32(bs[:], r.Uint32())
|
||||
|
||||
return storagekey(acc.Hex() + hex.EncodeToString(bs[2:]))
|
||||
}
|
||||
|
||||
func storagekey_to_publickey(sk storagekey) publickey {
|
||||
bs, _ := hex.DecodeString(string(sk))
|
||||
|
||||
acc := bs[:12]
|
||||
cs := bs[12:]
|
||||
|
||||
encoded := [14]byte{}
|
||||
for i, v := range acc[:] {
|
||||
encoded[i] = (v ^ cs[0]) ^ cs[1]
|
||||
}
|
||||
encoded[12] = cs[0]
|
||||
encoded[13] = cs[1]
|
||||
|
||||
return publickey(hex.EncodeToString(encoded[:]))
|
||||
}
|
||||
|
||||
func publickey_to_storagekey(pk publickey) storagekey {
|
||||
bs, _ := hex.DecodeString(string(pk))
|
||||
|
||||
acc := bs[:12]
|
||||
cs := bs[12:]
|
||||
|
||||
decoded := [14]byte{}
|
||||
for i, v := range acc[:] {
|
||||
decoded[i] = (v ^ cs[1]) ^ cs[0]
|
||||
}
|
||||
decoded[12] = cs[0]
|
||||
decoded[13] = cs[1]
|
||||
|
||||
return storagekey(hex.EncodeToString(decoded[:]))
|
||||
}
|
||||
|
||||
type SessionConfig struct {
|
||||
SessionTTL int64 `json:"session_ttl"`
|
||||
SessionStorage string `json:"session_storage"`
|
||||
}
|
||||
|
||||
var errInvalidScheme = errors.New("storageAddr is not valid scheme")
|
||||
var errSessionStorageMissing = errors.New("session_storageis missing")
|
||||
|
||||
func NewConsumer(ctx context.Context, storageAddr string, ttl time.Duration) (Consumer, error) {
|
||||
if strings.HasPrefix(storageAddr, "mongodb") {
|
||||
return newConsumerWithMongo(ctx, storageAddr, ttl)
|
||||
}
|
||||
|
||||
if strings.HasPrefix(storageAddr, "redis") {
|
||||
return newConsumerWithRedis(ctx, storageAddr, ttl)
|
||||
}
|
||||
|
||||
return nil, errInvalidScheme
|
||||
}
|
||||
|
||||
func NewConsumerWithConfig(ctx context.Context, cfg SessionConfig) (Consumer, error) {
|
||||
if len(cfg.SessionStorage) == 0 {
|
||||
return nil, errSessionStorageMissing
|
||||
}
|
||||
|
||||
if cfg.SessionTTL == 0 {
|
||||
cfg.SessionTTL = 3600
|
||||
}
|
||||
return NewConsumer(ctx, cfg.SessionStorage, time.Duration(cfg.SessionTTL)*time.Second)
|
||||
}
|
||||
|
||||
func NewProvider(ctx context.Context, storageAddr string, ttl time.Duration) (Provider, error) {
|
||||
if strings.HasPrefix(storageAddr, "mongodb") {
|
||||
return newProviderWithMongo(ctx, storageAddr, ttl)
|
||||
}
|
||||
|
||||
if strings.HasPrefix(storageAddr, "redis") {
|
||||
return newProviderWithRedis(ctx, storageAddr, ttl)
|
||||
}
|
||||
|
||||
return nil, errInvalidScheme
|
||||
}
|
||||
|
||||
func NewProviderWithConfig(ctx context.Context, cfg SessionConfig) (Provider, error) {
|
||||
if len(cfg.SessionStorage) == 0 {
|
||||
return nil, errSessionStorageMissing
|
||||
}
|
||||
|
||||
if cfg.SessionTTL == 0 {
|
||||
cfg.SessionTTL = 3600
|
||||
}
|
||||
return NewProvider(ctx, cfg.SessionStorage, time.Duration(cfg.SessionTTL)*time.Second)
|
||||
}
|
||||
56
session/consumer_common.go
Normal file
56
session/consumer_common.go
Normal file
@ -0,0 +1,56 @@
|
||||
package session
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type cache_stage[T any] struct {
|
||||
cache map[storagekey]T
|
||||
deleted map[storagekey]bool
|
||||
}
|
||||
|
||||
func make_cache_stage[T any]() *cache_stage[T] {
|
||||
return &cache_stage[T]{
|
||||
cache: make(map[storagekey]T),
|
||||
deleted: make(map[storagekey]bool),
|
||||
}
|
||||
}
|
||||
|
||||
type consumer_common[T any] struct {
|
||||
lock sync.Mutex
|
||||
ttl time.Duration
|
||||
ctx context.Context
|
||||
stages [2]*cache_stage[T]
|
||||
startTime time.Time
|
||||
}
|
||||
|
||||
func (c *consumer_common[T]) add_internal(sk storagekey, si T) {
|
||||
c.stages[0].cache[sk] = si
|
||||
delete(c.stages[0].deleted, sk)
|
||||
c.stages[1].cache[sk] = si
|
||||
delete(c.stages[1].deleted, sk)
|
||||
}
|
||||
|
||||
func (c *consumer_common[T]) delete_internal(sk storagekey) {
|
||||
delete(c.stages[0].cache, sk)
|
||||
c.stages[0].deleted[sk] = true
|
||||
delete(c.stages[1].cache, sk)
|
||||
c.stages[1].deleted[sk] = true
|
||||
}
|
||||
|
||||
func (c *consumer_common[T]) delete(sk storagekey) {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
|
||||
c.delete_internal(sk)
|
||||
}
|
||||
|
||||
func (c *consumer_common[T]) changeStage() {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
|
||||
c.stages[1] = c.stages[0]
|
||||
c.stages[0] = make_cache_stage[T]()
|
||||
}
|
||||
349
session/impl_mongo.go
Normal file
349
session/impl_mongo.go
Normal file
@ -0,0 +1,349 @@
|
||||
package session
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"repositories.action2quare.com/ayo/gocommon"
|
||||
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
session_collection_name = gocommon.CollectionName("session")
|
||||
)
|
||||
|
||||
type provider_mongo struct {
|
||||
mongoClient gocommon.MongoClient
|
||||
}
|
||||
|
||||
type sessionMongo struct {
|
||||
Id primitive.ObjectID `bson:"_id,omitempty"`
|
||||
Auth *Authorization `bson:"auth"`
|
||||
Key storagekey `bson:"key"`
|
||||
Ts primitive.DateTime `bson:"_ts"`
|
||||
}
|
||||
|
||||
func newProviderWithMongo(ctx context.Context, mongoUrl string, ttl time.Duration) (Provider, error) {
|
||||
mc, err := gocommon.NewMongoClient(ctx, mongoUrl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = mc.MakeUniqueIndices(session_collection_name, map[string]bson.D{
|
||||
"key": {{Key: "key", Value: 1}},
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := mc.MakeExpireIndex(session_collection_name, int32(ttl.Seconds())); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &provider_mongo{
|
||||
mongoClient: mc,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (p *provider_mongo) New(input *Authorization) (string, error) {
|
||||
sk := make_storagekey(input.Account)
|
||||
|
||||
_, _, err := p.mongoClient.Update(session_collection_name, bson.M{
|
||||
"_id": input.Account,
|
||||
}, bson.M{
|
||||
"$set": sessionMongo{
|
||||
Auth: input,
|
||||
Key: sk,
|
||||
Ts: primitive.NewDateTimeFromTime(time.Now().UTC()),
|
||||
},
|
||||
}, options.Update().SetUpsert(true))
|
||||
|
||||
return string(storagekey_to_publickey(sk)), err
|
||||
}
|
||||
|
||||
func (p *provider_mongo) Delete(acc primitive.ObjectID) error {
|
||||
_, err := p.mongoClient.Delete(session_collection_name, bson.M{
|
||||
"_id": acc,
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (p *provider_mongo) Query(pk string) (Authorization, error) {
|
||||
sk := publickey_to_storagekey(publickey(pk))
|
||||
var auth Authorization
|
||||
err := p.mongoClient.FindOneAs(session_collection_name, bson.M{
|
||||
"key": sk,
|
||||
}, &auth)
|
||||
|
||||
return auth, err
|
||||
}
|
||||
|
||||
func (p *provider_mongo) Touch(pk string) (bool, error) {
|
||||
sk := publickey_to_storagekey(publickey(pk))
|
||||
worked, _, err := p.mongoClient.Update(session_collection_name, bson.M{
|
||||
"key": sk,
|
||||
}, bson.M{
|
||||
"$currentDate": bson.M{
|
||||
"_ts": bson.M{"$type": "date"},
|
||||
},
|
||||
}, options.Update().SetUpsert(false))
|
||||
|
||||
if err != nil {
|
||||
logger.Println("provider Touch :", err)
|
||||
return false, err
|
||||
}
|
||||
|
||||
return worked, nil
|
||||
}
|
||||
|
||||
type consumer_mongo struct {
|
||||
consumer_common[*sessionMongo]
|
||||
ids map[primitive.ObjectID]storagekey
|
||||
mongoClient gocommon.MongoClient
|
||||
ttl time.Duration
|
||||
}
|
||||
|
||||
type sessionPipelineDocument struct {
|
||||
OperationType string `bson:"operationType"`
|
||||
DocumentKey struct {
|
||||
Id primitive.ObjectID `bson:"_id"`
|
||||
} `bson:"documentKey"`
|
||||
Session *sessionMongo `bson:"fullDocument"`
|
||||
}
|
||||
|
||||
func newConsumerWithMongo(ctx context.Context, mongoUrl string, ttl time.Duration) (Consumer, error) {
|
||||
mc, err := gocommon.NewMongoClient(ctx, mongoUrl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
consumer := &consumer_mongo{
|
||||
consumer_common: consumer_common[*sessionMongo]{
|
||||
ttl: ttl,
|
||||
ctx: ctx,
|
||||
stages: [2]*cache_stage[*sessionMongo]{make_cache_stage[*sessionMongo](), make_cache_stage[*sessionMongo]()},
|
||||
startTime: time.Now(),
|
||||
},
|
||||
ids: make(map[primitive.ObjectID]storagekey),
|
||||
ttl: ttl,
|
||||
mongoClient: mc,
|
||||
}
|
||||
|
||||
go func() {
|
||||
matchStage := bson.D{
|
||||
{
|
||||
Key: "$match", Value: bson.D{
|
||||
{Key: "operationType", Value: bson.D{
|
||||
{Key: "$in", Value: bson.A{
|
||||
"delete",
|
||||
"insert",
|
||||
"update",
|
||||
}},
|
||||
}},
|
||||
},
|
||||
}}
|
||||
projectStage := bson.D{
|
||||
{
|
||||
Key: "$project", Value: bson.D{
|
||||
{Key: "documentKey", Value: 1},
|
||||
{Key: "operationType", Value: 1},
|
||||
{Key: "fullDocument", Value: 1},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
var stream *mongo.ChangeStream
|
||||
nextswitch := time.Now().Add(ttl)
|
||||
for {
|
||||
if stream == nil {
|
||||
stream, err = mc.Watch(session_collection_name, mongo.Pipeline{matchStage, projectStage})
|
||||
if err != nil {
|
||||
logger.Error("watchAuthCollection watch failed :", err)
|
||||
time.Sleep(time.Minute)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
changed := stream.TryNext(ctx)
|
||||
if ctx.Err() != nil {
|
||||
logger.Error("watchAuthCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error())
|
||||
break
|
||||
}
|
||||
|
||||
if changed {
|
||||
var data sessionPipelineDocument
|
||||
if err := stream.Decode(&data); err == nil {
|
||||
ot := data.OperationType
|
||||
switch ot {
|
||||
case "insert":
|
||||
consumer.add(data.Session.Key, data.DocumentKey.Id, data.Session)
|
||||
case "update":
|
||||
if data.Session == nil {
|
||||
consumer.deleteById(data.DocumentKey.Id)
|
||||
} else {
|
||||
consumer.add(data.Session.Key, data.DocumentKey.Id, data.Session)
|
||||
}
|
||||
case "delete":
|
||||
consumer.deleteById(data.DocumentKey.Id)
|
||||
}
|
||||
} else {
|
||||
logger.Error("watchAuthCollection stream.Decode failed :", err)
|
||||
}
|
||||
} else if stream.Err() != nil || stream.ID() == 0 {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Println("watchAuthCollection is done")
|
||||
stream.Close(ctx)
|
||||
return
|
||||
|
||||
case <-time.After(time.Second):
|
||||
logger.Error("watchAuthCollection stream error :", stream.Err())
|
||||
stream.Close(ctx)
|
||||
stream = nil
|
||||
}
|
||||
} else {
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
for now.After(nextswitch) {
|
||||
consumer.changeStage()
|
||||
nextswitch = nextswitch.Add(ttl)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return consumer, nil
|
||||
}
|
||||
|
||||
func (c *consumer_mongo) query_internal(sk storagekey) (*sessionMongo, bool, error) {
|
||||
if _, deleted := c.stages[0].deleted[sk]; deleted {
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
if _, deleted := c.stages[1].deleted[sk]; deleted {
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
found, ok := c.stages[0].cache[sk]
|
||||
if !ok {
|
||||
found, ok = c.stages[1].cache[sk]
|
||||
}
|
||||
|
||||
if ok {
|
||||
return found, false, nil
|
||||
}
|
||||
|
||||
var si sessionMongo
|
||||
err := c.mongoClient.FindOneAs(session_collection_name, bson.M{
|
||||
"key": sk,
|
||||
}, &si)
|
||||
|
||||
if err != nil {
|
||||
logger.Println("consumer Query :", err)
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
if len(si.Key) > 0 {
|
||||
siptr := &si
|
||||
c.add_internal(sk, siptr)
|
||||
return siptr, true, nil
|
||||
}
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
func (c *consumer_mongo) Query(pk string) (Authorization, error) {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
|
||||
sk := publickey_to_storagekey(publickey(pk))
|
||||
si, _, err := c.query_internal(sk)
|
||||
if err != nil {
|
||||
return Authorization{}, err
|
||||
}
|
||||
|
||||
if si == nil {
|
||||
return Authorization{}, nil
|
||||
}
|
||||
|
||||
if time.Now().After(si.Ts.Time().Add(c.ttl)) {
|
||||
return Authorization{}, nil
|
||||
}
|
||||
|
||||
return *si.Auth, nil
|
||||
}
|
||||
|
||||
func (c *consumer_mongo) Touch(pk string) (Authorization, error) {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
|
||||
sk := publickey_to_storagekey(publickey(pk))
|
||||
worked, _, err := c.mongoClient.Update(session_collection_name, bson.M{
|
||||
"key": sk,
|
||||
}, bson.M{
|
||||
"$currentDate": bson.M{
|
||||
"_ts": bson.M{"$type": "date"},
|
||||
},
|
||||
}, options.Update().SetUpsert(false))
|
||||
|
||||
if err != nil {
|
||||
logger.Println("consumer Touch :", err)
|
||||
return Authorization{}, err
|
||||
}
|
||||
|
||||
if !worked {
|
||||
// 이미 만료되서 사라짐
|
||||
return Authorization{}, nil
|
||||
}
|
||||
|
||||
si, added, err := c.query_internal(sk)
|
||||
if err != nil {
|
||||
return Authorization{}, err
|
||||
}
|
||||
|
||||
if si == nil {
|
||||
return Authorization{}, nil
|
||||
}
|
||||
|
||||
if !added {
|
||||
var doc sessionMongo
|
||||
err := c.mongoClient.FindOneAs(session_collection_name, bson.M{
|
||||
"key": sk,
|
||||
}, &doc)
|
||||
|
||||
if err != nil {
|
||||
logger.Println("consumer Query :", err)
|
||||
return Authorization{}, err
|
||||
}
|
||||
|
||||
if len(si.Key) > 0 {
|
||||
c.add_internal(sk, &doc)
|
||||
c.ids[doc.Id] = sk
|
||||
|
||||
return *doc.Auth, nil
|
||||
}
|
||||
}
|
||||
|
||||
return *si.Auth, nil
|
||||
}
|
||||
|
||||
func (c *consumer_mongo) add(sk storagekey, id primitive.ObjectID, si *sessionMongo) {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
|
||||
c.consumer_common.add_internal(sk, si)
|
||||
c.ids[id] = sk
|
||||
}
|
||||
|
||||
func (c *consumer_mongo) deleteById(id primitive.ObjectID) {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
|
||||
if sk, ok := c.ids[id]; ok {
|
||||
c.consumer_common.delete_internal(sk)
|
||||
delete(c.ids, id)
|
||||
}
|
||||
}
|
||||
297
session/impl_redis.go
Normal file
297
session/impl_redis.go
Normal file
@ -0,0 +1,297 @@
|
||||
package session
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"repositories.action2quare.com/ayo/gocommon"
|
||||
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
communication_channel_name_prefix = "_sess_comm_chan_name"
|
||||
)
|
||||
|
||||
type sessionRedis struct {
|
||||
*Authorization
|
||||
expireAt time.Time
|
||||
}
|
||||
|
||||
type provider_redis struct {
|
||||
redisClient *redis.Client
|
||||
deleteChannel string
|
||||
ttl time.Duration
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func newProviderWithRedis(ctx context.Context, redisUrl string, ttl time.Duration) (Provider, error) {
|
||||
redisClient, err := gocommon.NewRedisClient(redisUrl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &provider_redis{
|
||||
redisClient: redisClient,
|
||||
deleteChannel: fmt.Sprintf("%s_%d", communication_channel_name_prefix, redisClient.Options().DB),
|
||||
ttl: ttl,
|
||||
ctx: ctx,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (p *provider_redis) New(input *Authorization) (string, error) {
|
||||
bt, err := json.Marshal(input)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
sk := make_storagekey(input.Account)
|
||||
_, err = p.redisClient.SetEX(p.ctx, string(sk), bt, p.ttl).Result()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
pk := storagekey_to_publickey(sk)
|
||||
|
||||
logger.Println("session provider new :", sk, pk)
|
||||
|
||||
return string(pk), err
|
||||
}
|
||||
|
||||
func (p *provider_redis) Delete(account primitive.ObjectID) error {
|
||||
prefix := account.Hex()
|
||||
sks, err := p.redisClient.Keys(p.ctx, prefix+"*").Result()
|
||||
if err != nil {
|
||||
logger.Println("session provider delete :", sks, err)
|
||||
return err
|
||||
}
|
||||
|
||||
for _, sk := range sks {
|
||||
logger.Println("session provider delete :", sk)
|
||||
p.redisClient.Del(p.ctx, sk).Result()
|
||||
p.redisClient.Publish(p.ctx, p.deleteChannel, sk).Result()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *provider_redis) Query(pk string) (Authorization, error) {
|
||||
sk := publickey_to_storagekey(publickey(pk))
|
||||
payload, err := p.redisClient.Get(p.ctx, string(sk)).Result()
|
||||
if err == redis.Nil {
|
||||
logger.Println("session provider query :", pk, err)
|
||||
return Authorization{}, nil
|
||||
} else if err != nil {
|
||||
logger.Println("session provider query :", pk, err)
|
||||
return Authorization{}, err
|
||||
}
|
||||
|
||||
var auth Authorization
|
||||
if err := json.Unmarshal([]byte(payload), &auth); err != nil {
|
||||
logger.Println("session provider query :", pk, err)
|
||||
return Authorization{}, err
|
||||
}
|
||||
|
||||
logger.Println("session provider query :", pk, auth)
|
||||
|
||||
return auth, nil
|
||||
}
|
||||
|
||||
func (p *provider_redis) Touch(pk string) (bool, error) {
|
||||
sk := publickey_to_storagekey(publickey(pk))
|
||||
ok, err := p.redisClient.Expire(p.ctx, string(sk), p.ttl).Result()
|
||||
logger.Println("session provider touch :", pk)
|
||||
|
||||
if err == redis.Nil {
|
||||
// 이미 만료됨
|
||||
logger.Println("session consumer touch :", pk, err)
|
||||
return false, nil
|
||||
} else if err != nil {
|
||||
logger.Println("session consumer touch :", pk, err)
|
||||
return false, err
|
||||
}
|
||||
logger.Println("session consumer touch :", pk)
|
||||
|
||||
return ok, nil
|
||||
}
|
||||
|
||||
type consumer_redis struct {
|
||||
consumer_common[*sessionRedis]
|
||||
redisClient *redis.Client
|
||||
}
|
||||
|
||||
func newConsumerWithRedis(ctx context.Context, redisUrl string, ttl time.Duration) (Consumer, error) {
|
||||
redisClient, err := gocommon.NewRedisClient(redisUrl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
consumer := &consumer_redis{
|
||||
consumer_common: consumer_common[*sessionRedis]{
|
||||
ttl: ttl,
|
||||
ctx: ctx,
|
||||
stages: [2]*cache_stage[*sessionRedis]{make_cache_stage[*sessionRedis](), make_cache_stage[*sessionRedis]()},
|
||||
startTime: time.Now(),
|
||||
},
|
||||
redisClient: redisClient,
|
||||
}
|
||||
|
||||
deleteChannel := communication_channel_name_prefix + "_d"
|
||||
sub := redisClient.Subscribe(ctx, deleteChannel)
|
||||
|
||||
go func() {
|
||||
stageswitch := time.Now().Add(ttl)
|
||||
tickTimer := time.After(ttl)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case <-tickTimer:
|
||||
consumer.changeStage()
|
||||
stageswitch = stageswitch.Add(ttl)
|
||||
tempttl := time.Until(stageswitch)
|
||||
tickTimer = time.After(tempttl)
|
||||
|
||||
case msg := <-sub.Channel():
|
||||
if msg == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if len(msg.Payload) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
switch msg.Channel {
|
||||
case deleteChannel:
|
||||
sk := storagekey(msg.Payload)
|
||||
consumer.delete(sk)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return consumer, nil
|
||||
}
|
||||
|
||||
func (c *consumer_redis) query_internal(sk storagekey) (*sessionRedis, bool, error) {
|
||||
if _, deleted := c.stages[0].deleted[sk]; deleted {
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
if _, deleted := c.stages[1].deleted[sk]; deleted {
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
found, ok := c.stages[0].cache[sk]
|
||||
if !ok {
|
||||
found, ok = c.stages[1].cache[sk]
|
||||
}
|
||||
|
||||
if ok {
|
||||
if time.Now().Before(found.expireAt) {
|
||||
// 만료전 세션
|
||||
return found, false, nil
|
||||
}
|
||||
|
||||
// 다른 Consumer가 Touch했을 수도 있으므로 redis에서 읽어본다.
|
||||
}
|
||||
|
||||
payload, err := c.redisClient.Get(c.ctx, string(sk)).Result()
|
||||
if err != nil && err != redis.Nil {
|
||||
logger.Println("consumer Query :", err)
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
if len(payload) == 0 {
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
var auth Authorization
|
||||
if err := json.Unmarshal([]byte(payload), &auth); err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
ttl, err := c.redisClient.TTL(c.ctx, string(sk)).Result()
|
||||
if err != nil {
|
||||
logger.Println("consumer Query :", err)
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
si := &sessionRedis{
|
||||
Authorization: &auth,
|
||||
expireAt: time.Now().Add(ttl),
|
||||
}
|
||||
c.add_internal(sk, si)
|
||||
|
||||
return si, true, nil
|
||||
}
|
||||
|
||||
func (c *consumer_redis) Query(pk string) (Authorization, error) {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
|
||||
sk := publickey_to_storagekey(publickey(pk))
|
||||
si, _, err := c.query_internal(sk)
|
||||
if err != nil {
|
||||
logger.Println("session consumer query :", pk, err)
|
||||
return Authorization{}, err
|
||||
}
|
||||
|
||||
if si == nil {
|
||||
logger.Println("session consumer query(si nil) :", pk, nil)
|
||||
return Authorization{}, nil
|
||||
}
|
||||
|
||||
if time.Now().After(si.expireAt) {
|
||||
logger.Println("session consumer query(expired):", pk, nil)
|
||||
return Authorization{}, nil
|
||||
}
|
||||
|
||||
return *si.Authorization, nil
|
||||
}
|
||||
|
||||
func (c *consumer_redis) Touch(pk string) (Authorization, error) {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
|
||||
sk := publickey_to_storagekey(publickey(pk))
|
||||
ok, err := c.redisClient.Expire(c.ctx, string(sk), c.ttl).Result()
|
||||
if err == redis.Nil {
|
||||
logger.Println("session consumer touch :", pk, err)
|
||||
|
||||
return Authorization{}, nil
|
||||
} else if err != nil {
|
||||
logger.Println("session consumer touch :", pk, err)
|
||||
return Authorization{}, err
|
||||
}
|
||||
|
||||
if ok {
|
||||
// redis에 살아있다.
|
||||
si, added, err := c.query_internal(sk)
|
||||
if err != nil {
|
||||
logger.Println("session consumer touch(ok) :", pk, err)
|
||||
return Authorization{}, err
|
||||
}
|
||||
|
||||
if si == nil {
|
||||
logger.Println("session consumer touch(ok, si nil) :", pk)
|
||||
return Authorization{}, nil
|
||||
}
|
||||
|
||||
if !added {
|
||||
si.expireAt = time.Now().Add(c.ttl)
|
||||
// stage 0으로 옮기기 위해 add_internal을 다시 부름
|
||||
c.add_internal(sk, si)
|
||||
}
|
||||
|
||||
logger.Println("session consumer touch(ok) :", pk)
|
||||
return *si.Authorization, nil
|
||||
}
|
||||
|
||||
logger.Println("session consumer touch(!ok) :", pk)
|
||||
return Authorization{}, nil
|
||||
}
|
||||
93
session/session_test.go
Normal file
93
session/session_test.go
Normal file
@ -0,0 +1,93 @@
|
||||
// package main ...
|
||||
package session
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||
)
|
||||
|
||||
func TestExpTable(t *testing.T) {
|
||||
// pv, err := NewProvider(context.Background(), "redis://192.168.8.94:6380/1", 10*time.Second)
|
||||
// if err != nil {
|
||||
// t.Error(err)
|
||||
// }
|
||||
|
||||
// cs, err := NewConsumer(context.Background(), "redis://192.168.8.94:6380/1", 10*time.Second)
|
||||
// if err != nil {
|
||||
// t.Error(err)
|
||||
// }
|
||||
|
||||
pv, err := NewProvider(context.Background(), "mongodb://192.168.8.94:27017/maingate?replicaSet=repl01&retrywrites=false", 10*time.Second)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
cs, err := NewConsumer(context.Background(), "mongodb://192.168.8.94:27017/maingate?replicaSet=repl01&retrywrites=false", 10*time.Second)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
test := primitive.NewObjectID()
|
||||
sk := make_storagekey(test)
|
||||
pk := storagekey_to_publickey(sk)
|
||||
if publickey_to_storagekey(pk) != sk {
|
||||
t.Errorf("pk != sk : %s, %s", pk, sk)
|
||||
}
|
||||
|
||||
au1 := &Authorization{
|
||||
Account: primitive.NewObjectID(),
|
||||
Platform: "editor",
|
||||
Uid: "uid-1",
|
||||
}
|
||||
sk1, err := pv.New(au1)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
au2 := &Authorization{
|
||||
Account: primitive.NewObjectID(),
|
||||
Platform: "editor",
|
||||
Uid: "uid-2",
|
||||
}
|
||||
sk2, err := pv.New(au2)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
q1, err := cs.Query(sk1)
|
||||
logger.Println("query :", q1, err)
|
||||
|
||||
q2, err := cs.Query(sk2)
|
||||
logger.Println("query :", q2, err)
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}()
|
||||
|
||||
cs.Touch(sk1)
|
||||
time.Sleep(2 * time.Second)
|
||||
cs.Touch(sk2)
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
pv.Delete(au1.Account)
|
||||
|
||||
cs.Touch(sk1)
|
||||
time.Sleep(2 * time.Second)
|
||||
cs.Touch(sk2)
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
cs2, err := NewConsumer(context.Background(), "redis://192.168.8.94:6380/1", 10*time.Second)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
q2, err := cs2.Query(sk2)
|
||||
logger.Println("queryf :", q2, err)
|
||||
time.Sleep(20 * time.Second)
|
||||
}
|
||||
185
wshandler/api_handler.go
Normal file
185
wshandler/api_handler.go
Normal file
@ -0,0 +1,185 @@
|
||||
package wshandler
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"reflect"
|
||||
"strings"
|
||||
"unsafe"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
ClientConnected = "ClientConnected"
|
||||
ClientDisconnected = "ClientDisconnected"
|
||||
)
|
||||
|
||||
type apiFuncType func(ApiCallContext)
|
||||
type connFuncType func(*websocket.Conn, *Sender)
|
||||
type disconnFuncType func(string, *Sender)
|
||||
|
||||
type WebsocketApiHandler struct {
|
||||
methods map[string]apiFuncType
|
||||
connfunc connFuncType
|
||||
disconnfunc disconnFuncType
|
||||
originalReceiverName string
|
||||
}
|
||||
|
||||
type ApiCallContext struct {
|
||||
CallBy *Sender
|
||||
Arguments []any
|
||||
}
|
||||
|
||||
func MakeWebsocketApiHandler[T any](receiver *T, receiverName string) WebsocketApiHandler {
|
||||
methods := make(map[string]apiFuncType)
|
||||
|
||||
tp := reflect.TypeOf(receiver)
|
||||
if len(receiverName) == 0 {
|
||||
receiverName = tp.Elem().Name()
|
||||
}
|
||||
|
||||
var connfunc connFuncType
|
||||
var disconnfunc disconnFuncType
|
||||
|
||||
for i := 0; i < tp.NumMethod(); i++ {
|
||||
method := tp.Method(i)
|
||||
if method.Type.In(0) != tp {
|
||||
continue
|
||||
}
|
||||
|
||||
if method.Name == ClientConnected {
|
||||
if method.Type.NumIn() != 3 {
|
||||
continue
|
||||
}
|
||||
if method.Type.In(1) != reflect.TypeOf((*websocket.Conn)(nil)) {
|
||||
continue
|
||||
}
|
||||
if method.Type.In(2) != reflect.TypeOf((*Sender)(nil)) {
|
||||
continue
|
||||
}
|
||||
funcptr := method.Func.Pointer()
|
||||
p1 := unsafe.Pointer(&funcptr)
|
||||
p2 := unsafe.Pointer(&p1)
|
||||
connfuncptr := (*func(*T, *websocket.Conn, *Sender))(p2)
|
||||
|
||||
connfunc = func(c *websocket.Conn, s *Sender) {
|
||||
(*connfuncptr)(receiver, c, s)
|
||||
}
|
||||
} else if method.Name == ClientDisconnected {
|
||||
if method.Type.NumIn() != 3 {
|
||||
continue
|
||||
}
|
||||
if method.Type.In(1) != reflect.TypeOf("") {
|
||||
continue
|
||||
}
|
||||
if method.Type.In(2) != reflect.TypeOf((*Sender)(nil)) {
|
||||
continue
|
||||
}
|
||||
funcptr := method.Func.Pointer()
|
||||
p1 := unsafe.Pointer(&funcptr)
|
||||
p2 := unsafe.Pointer(&p1)
|
||||
disconnfuncptr := (*func(*T, string, *Sender))(p2)
|
||||
|
||||
disconnfunc = func(msg string, s *Sender) {
|
||||
(*disconnfuncptr)(receiver, msg, s)
|
||||
}
|
||||
} else {
|
||||
if method.Type.NumIn() != 2 {
|
||||
continue
|
||||
}
|
||||
if method.Type.In(1) != reflect.TypeOf((*ApiCallContext)(nil)).Elem() {
|
||||
continue
|
||||
}
|
||||
|
||||
funcptr := method.Func.Pointer()
|
||||
p1 := unsafe.Pointer(&funcptr)
|
||||
p2 := unsafe.Pointer(&p1)
|
||||
apifuncptr := (*func(*T, ApiCallContext))(p2)
|
||||
methods[receiverName+"."+method.Name] = func(ctx ApiCallContext) {
|
||||
(*apifuncptr)(receiver, ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return WebsocketApiHandler{
|
||||
methods: methods,
|
||||
connfunc: connfunc,
|
||||
disconnfunc: disconnfunc,
|
||||
originalReceiverName: tp.Elem().Name(),
|
||||
}
|
||||
}
|
||||
|
||||
type WebsocketApiBroker struct {
|
||||
methods map[string]apiFuncType
|
||||
methods_dup map[string][]apiFuncType
|
||||
connFuncs []connFuncType
|
||||
disconnFuncs []disconnFuncType
|
||||
}
|
||||
|
||||
func (hc *WebsocketApiBroker) AddHandler(receiver WebsocketApiHandler) {
|
||||
if hc.methods == nil {
|
||||
hc.methods = make(map[string]apiFuncType)
|
||||
hc.methods_dup = make(map[string][]apiFuncType)
|
||||
}
|
||||
|
||||
for k, v := range receiver.methods {
|
||||
ab := strings.Split(k, ".")
|
||||
logger.Printf("ws api registered : %s.%s -> %s\n", receiver.originalReceiverName, ab[1], k)
|
||||
|
||||
hc.methods_dup[k] = append(hc.methods_dup[k], v)
|
||||
if len(hc.methods_dup[k]) > 1 {
|
||||
chain := hc.methods_dup[k]
|
||||
hc.methods[k] = func(ctx ApiCallContext) {
|
||||
for _, f := range chain {
|
||||
f(ctx)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
hc.methods[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
if receiver.connfunc != nil {
|
||||
logger.Printf("ws api registered : %s.ClientConnected\n", receiver.originalReceiverName)
|
||||
hc.connFuncs = append(hc.connFuncs, receiver.connfunc)
|
||||
}
|
||||
|
||||
if receiver.disconnfunc != nil {
|
||||
// disconnfunc은 역순
|
||||
logger.Printf("ws api registered : %s.ClientDisconnected\n", receiver.originalReceiverName)
|
||||
hc.disconnFuncs = append([]disconnFuncType{receiver.disconnfunc}, hc.disconnFuncs...)
|
||||
}
|
||||
}
|
||||
|
||||
func (hc *WebsocketApiBroker) ClientConnected(c *wsconn) {
|
||||
for _, v := range hc.connFuncs {
|
||||
v(c.Conn, c.sender)
|
||||
}
|
||||
}
|
||||
|
||||
func (hc *WebsocketApiBroker) ClientDisconnected(c *wsconn) {
|
||||
for _, v := range hc.disconnFuncs {
|
||||
v(c.closeMessage, c.sender)
|
||||
}
|
||||
}
|
||||
|
||||
func (hc *WebsocketApiBroker) Call(callby *Sender, funcname string, r io.Reader) {
|
||||
if found := hc.methods[funcname]; found != nil {
|
||||
var args []any
|
||||
if r != nil {
|
||||
dec := json.NewDecoder(r)
|
||||
if err := dec.Decode(&args); err != nil {
|
||||
logger.Println("WebsocketApiBroker.Call failed. decode returns err :", err)
|
||||
}
|
||||
}
|
||||
|
||||
found(ApiCallContext{
|
||||
CallBy: callby,
|
||||
Arguments: args,
|
||||
})
|
||||
} else {
|
||||
logger.Println("api is not found :", funcname)
|
||||
}
|
||||
}
|
||||
30
wshandler/api_handler_test.go
Normal file
30
wshandler/api_handler_test.go
Normal file
@ -0,0 +1,30 @@
|
||||
// package main ...
|
||||
package wshandler
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
)
|
||||
|
||||
type TestReceiver struct {
|
||||
}
|
||||
|
||||
func (tr *TestReceiver) Func1([]any) {
|
||||
|
||||
}
|
||||
|
||||
func (tr *TestReceiver) Func2(args []any) {
|
||||
fmt.Println(args...)
|
||||
}
|
||||
|
||||
func TestExpTable(t *testing.T) {
|
||||
// src := []any{"a", 1, false}
|
||||
// payload, _ := json.Marshal(src)
|
||||
|
||||
// tr := new(TestReceiver)
|
||||
// receiver := MakeWebsocketApiHandler(tr, "test")
|
||||
|
||||
// var con WebsocketApiBroker
|
||||
// con.AddHandler(receiver)
|
||||
|
||||
}
|
||||
104
wshandler/room.go
Normal file
104
wshandler/room.go
Normal file
@ -0,0 +1,104 @@
|
||||
package wshandler
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"repositories.action2quare.com/ayo/gocommon/logger"
|
||||
)
|
||||
|
||||
type room struct {
|
||||
inChan chan *wsconn
|
||||
outChan chan primitive.ObjectID
|
||||
messageChan chan *UpstreamMessage
|
||||
name string
|
||||
destroyChan chan<- string
|
||||
sendMsgChan chan<- send_msg_queue_elem
|
||||
}
|
||||
|
||||
// 만약 destroyChan가 nil이면 room이 비어도 파괴되지 않는다. 영구 유지되는 room
|
||||
func makeRoom(name string, destroyChan chan<- string, sendMsgChan chan<- send_msg_queue_elem) *room {
|
||||
return &room{
|
||||
inChan: make(chan *wsconn, 10),
|
||||
outChan: make(chan primitive.ObjectID, 10),
|
||||
messageChan: make(chan *UpstreamMessage, 100),
|
||||
name: name,
|
||||
destroyChan: destroyChan,
|
||||
sendMsgChan: sendMsgChan,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *room) broadcast(msg *UpstreamMessage) {
|
||||
r.messageChan <- msg
|
||||
}
|
||||
|
||||
func (r *room) in(conn *wsconn) *room {
|
||||
r.inChan <- conn
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *room) out(accid primitive.ObjectID) *room {
|
||||
r.outChan <- accid
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *room) start(ctx context.Context) {
|
||||
go func(ctx context.Context) {
|
||||
conns := make(map[string]*wsconn)
|
||||
normal := false
|
||||
for !normal {
|
||||
normal = r.loop(ctx, &conns)
|
||||
}
|
||||
}(ctx)
|
||||
}
|
||||
|
||||
func (r *room) loop(ctx context.Context, conns *map[string]*wsconn) (normalEnd bool) {
|
||||
defer func() {
|
||||
s := recover()
|
||||
if s != nil {
|
||||
logger.Error(s)
|
||||
normalEnd = false
|
||||
}
|
||||
}()
|
||||
|
||||
tag := "#" + r.name
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return true
|
||||
|
||||
case conn := <-r.inChan:
|
||||
(*conns)[conn.sender.Accid.Hex()] = conn
|
||||
|
||||
case accid := <-r.outChan:
|
||||
delete((*conns), accid.Hex())
|
||||
if len(*conns) == 0 && r.destroyChan != nil {
|
||||
r.destroyChan <- r.name
|
||||
return true
|
||||
}
|
||||
|
||||
case msg := <-r.messageChan:
|
||||
ds := DownstreamMessage{
|
||||
Alias: msg.Alias,
|
||||
Body: msg.Body,
|
||||
Tag: append(msg.Tag, tag),
|
||||
}
|
||||
|
||||
buff := new(bytes.Buffer)
|
||||
enc := json.NewEncoder(buff)
|
||||
enc.SetEscapeHTML(false)
|
||||
enc.Encode(ds)
|
||||
|
||||
for _, conn := range *conns {
|
||||
r.sendMsgChan <- send_msg_queue_elem{
|
||||
to: conn,
|
||||
mt: websocket.TextMessage,
|
||||
msg: buff.Bytes(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user