Compare commits

12 Commits

21 changed files with 551 additions and 2681 deletions

View File

@ -30,7 +30,7 @@ type Authinfo struct {
} }
const ( const (
sessionSyncChannelNamePrefix = "session-sync-channel2" sessionSyncChannelName = "session-sync-channel2"
) )
type AuthinfoCell interface { type AuthinfoCell interface {
@ -99,8 +99,6 @@ func (ac *redisAuthCell) ToBytes() []byte {
func newAuthCollectionWithRedis(redisClient *redis.Client, subctx context.Context, maingateURL string, apiToken string) *AuthCollection { func newAuthCollectionWithRedis(redisClient *redis.Client, subctx context.Context, maingateURL string, apiToken string) *AuthCollection {
sessionTTL := int64(3600) sessionTTL := int64(3600)
ac := MakeAuthCollection(time.Duration(sessionTTL * int64(time.Second))) ac := MakeAuthCollection(time.Duration(sessionTTL * int64(time.Second)))
sessionSyncChannelName := fmt.Sprintf("%s-%d", sessionSyncChannelNamePrefix, redisClient.Options().DB)
pubsub := redisClient.Subscribe(subctx, sessionSyncChannelName) pubsub := redisClient.Subscribe(subctx, sessionSyncChannelName)
ctx, cancel := context.WithCancel(context.TODO()) ctx, cancel := context.WithCancel(context.TODO())
go func(ctx context.Context, sub *redis.PubSub, authCache *AuthCollection) { go func(ctx context.Context, sub *redis.PubSub, authCache *AuthCollection) {
@ -207,7 +205,7 @@ func (acg *AuthCollectionGlobal) Reload(context context.Context) error {
for r, url := range config.RegionStorage { for r, url := range config.RegionStorage {
if _, ok := oldval[r]; !ok { if _, ok := oldval[r]; !ok {
// 새로 생겼네 // 새로 생겼네
redisClient, err := NewRedisClient(url.Redis["session"]) redisClient, err := NewRedisClient(url.Redis.URL, url.Redis.Offset["session"])
if err != nil { if err != nil {
return err return err
} }
@ -230,7 +228,7 @@ func NewAuthCollectionGlobal(context context.Context, apiToken string) (AuthColl
output := make(map[string]*AuthCollection) output := make(map[string]*AuthCollection)
for region, url := range config.RegionStorage { for region, url := range config.RegionStorage {
redisClient, err := NewRedisClient(url.Redis["session"]) redisClient, err := NewRedisClient(url.Redis.URL, url.Redis.Offset["session"])
if err != nil { if err != nil {
return AuthCollectionGlobal{}, err return AuthCollectionGlobal{}, err
} }

View File

@ -18,6 +18,8 @@ import (
"repositories.action2quare.com/ayo/gocommon/flagx" "repositories.action2quare.com/ayo/gocommon/flagx"
"repositories.action2quare.com/ayo/gocommon/logger" "repositories.action2quare.com/ayo/gocommon/logger"
"go.mongodb.org/mongo-driver/bson"
) )
var linkupdate = flagx.String("updatelink", "", "") var linkupdate = flagx.String("updatelink", "", "")
@ -237,3 +239,17 @@ func ReplyUpdateComplete() {
// return (uint32(b[0]) << 0) | (uint32(b[1]) << 8) | (uint32(b[2]) << 16) | (uint32(b[3]) << 24) // return (uint32(b[0]) << 0) | (uint32(b[1]) << 8) | (uint32(b[2]) << 16) | (uint32(b[3]) << 24)
// } // }
type BsonMarshaler[T any] struct {
val T
}
func NewBsonMarshaler[T any](val T) *BsonMarshaler[T] {
return &BsonMarshaler[T]{
val: val,
}
}
func (m *BsonMarshaler[T]) MarshalBinary() (data []byte, err error) {
return bson.Marshal(m.val)
}

View File

@ -1,6 +1,7 @@
package coupon package coupon
import ( import (
"crypto/md5"
"encoding/binary" "encoding/binary"
"encoding/hex" "encoding/hex"
"math/rand" "math/rand"
@ -41,8 +42,12 @@ func DisolveCouponCode(code string) (round string, key string) {
} }
func MakeCouponRoundHash(name string) (hash string, roundNumber uint32) { func MakeCouponRoundHash(name string) (hash string, roundNumber uint32) {
m5 := md5.New()
m5.Write([]byte(name))
hashbt := m5.Sum(nil)
roundbt := make([]byte, 8) roundbt := make([]byte, 8)
copy(roundbt, []byte(strings.ToLower(name))) copy(roundbt, hashbt[:8])
roundseed := int64(binary.BigEndian.Uint64(roundbt)) roundseed := int64(binary.BigEndian.Uint64(roundbt))
roundhash := make([]byte, 4) roundhash := make([]byte, 4)

View File

@ -1,7 +1,6 @@
package flagx package flagx
import ( import (
"encoding"
"flag" "flag"
"fmt" "fmt"
"os" "os"
@ -125,9 +124,6 @@ func Duration(name string, value time.Duration, usage string) *time.Duration {
return findProperFlagSet(name).Duration(name, value, usage) return findProperFlagSet(name).Duration(name, value, usage)
} }
func TextVar(p encoding.TextUnmarshaler, name string, value encoding.TextMarshaler, usage string) {
findProperFlagSet(name).TextVar(p, name, value, usage)
}
func Func(name, usage string, fn func(string) error) { func Func(name, usage string, fn func(string) error) {
findProperFlagSet(name).Func(name, usage, fn) findProperFlagSet(name).Func(name, usage, fn)
} }

4
go.mod
View File

@ -1,6 +1,8 @@
module repositories.action2quare.com/ayo/gocommon module repositories.action2quare.com/ayo/gocommon
go 1.20 go 1.18
replace repositories.action2quare.com/ayo/gocommon => ./
require ( require (
github.com/go-redis/redis/v8 v8.11.5 github.com/go-redis/redis/v8 v8.11.5

View File

@ -16,7 +16,7 @@ var txSetArgs redis.SetArgs = redis.SetArgs{
} }
type LockerWithRedis struct { type LockerWithRedis struct {
locked_key string key string
} }
var ErrTransactionLocked = errors.New("transaction is already locked") var ErrTransactionLocked = errors.New("transaction is already locked")
@ -34,10 +34,10 @@ func (locker *LockerWithRedis) Lock(rc *redis.Client, key string) error {
return ErrTransactionLocked return ErrTransactionLocked
} }
locker.locked_key = key locker.key = key
return nil return nil
} }
func (locker *LockerWithRedis) Unlock(rc *redis.Client) { func (locker *LockerWithRedis) Unlock(rc *redis.Client) {
rc.Del(context.Background(), locker.locked_key).Result() rc.Del(context.Background(), locker.key).Result()
} }

View File

@ -7,12 +7,12 @@ import (
"log" "log"
"os" "os"
"path" "path"
"runtime"
"runtime/debug" "runtime/debug"
"strings" "strings"
) )
var stdlogger *log.Logger var stdlogger *log.Logger
var errlogger *log.Logger
var _ = flag.Bool("logfile", false, "") var _ = flag.Bool("logfile", false, "")
func init() { func init() {
@ -20,7 +20,9 @@ func init() {
binname := path.Base(strings.ReplaceAll(binpath, "\\", "/")) binname := path.Base(strings.ReplaceAll(binpath, "\\", "/"))
var outWriter io.Writer var outWriter io.Writer
var errWriter io.Writer
outWriter = os.Stdout outWriter = os.Stdout
errWriter = os.Stderr
args := os.Args args := os.Args
useLogFile := false useLogFile := false
@ -44,9 +46,11 @@ func init() {
} }
outWriter = io.MultiWriter(outWriter, logFile) outWriter = io.MultiWriter(outWriter, logFile)
errWriter = io.MultiWriter(errWriter, logFile)
} }
stdlogger = log.New(outWriter, "", log.LstdFlags) stdlogger = log.New(outWriter, "", log.LstdFlags)
errlogger = log.New(errWriter, "", log.LstdFlags)
} }
func Println(v ...interface{}) { func Println(v ...interface{}) {
@ -58,83 +62,42 @@ func Printf(format string, v ...interface{}) {
} }
func Error(v ...interface{}) { func Error(v ...interface{}) {
stdlogger.Output(2, fmt.Sprintln(v...)) errlogger.Output(2, fmt.Sprintln(v...))
stdlogger.Output(2, string(debug.Stack())) errlogger.Output(2, string(debug.Stack()))
} }
func Errorf(format string, v ...interface{}) { func Errorf(format string, v ...interface{}) {
stdlogger.Output(2, fmt.Sprintf(format, v...)) errlogger.Output(2, fmt.Sprintf(format, v...))
stdlogger.Output(2, string(debug.Stack())) errlogger.Output(2, string(debug.Stack()))
} }
func Fatal(v ...interface{}) { func Fatal(v ...interface{}) {
stdlogger.Output(2, fmt.Sprint(v...)) errlogger.Output(2, fmt.Sprint(v...))
stdlogger.Output(2, string(debug.Stack())) errlogger.Output(2, string(debug.Stack()))
os.Exit(1) os.Exit(1)
} }
func Fatalln(v ...interface{}) { func Fatalln(v ...interface{}) {
stdlogger.Output(2, fmt.Sprintln(v...)) errlogger.Output(2, fmt.Sprintln(v...))
stdlogger.Output(2, string(debug.Stack())) errlogger.Output(2, string(debug.Stack()))
os.Exit(1) os.Exit(1)
} }
func Panic(v ...interface{}) { func Panic(v ...interface{}) {
s := fmt.Sprint(v...) s := fmt.Sprint(v...)
stdlogger.Output(2, s) errlogger.Output(2, s)
stdlogger.Output(2, string(debug.Stack())) errlogger.Output(2, string(debug.Stack()))
panic(s) panic(s)
} }
func Panicf(format string, v ...interface{}) { func Panicf(format string, v ...interface{}) {
s := fmt.Sprintf(format, v...) s := fmt.Sprintf(format, v...)
stdlogger.Output(2, s) errlogger.Output(2, s)
stdlogger.Output(2, string(debug.Stack())) errlogger.Output(2, string(debug.Stack()))
panic(s) panic(s)
} }
func Panicln(v ...interface{}) { func Panicln(v ...interface{}) {
s := fmt.Sprintln(v...) s := fmt.Sprintln(v...)
stdlogger.Output(2, s) errlogger.Output(2, s)
stdlogger.Output(2, string(debug.Stack())) errlogger.Output(2, string(debug.Stack()))
panic(s) panic(s)
} }
type errWithCallstack struct {
inner error
frames []*runtime.Frame
}
func (ecs *errWithCallstack) Error() string {
if ecs.frames == nil {
return ecs.inner.Error()
}
out := make([]string, 0, len(ecs.frames)+1)
out = append(out, ecs.inner.Error())
for i := len(ecs.frames) - 1; i >= 0; i-- {
frame := ecs.frames[i]
out = append(out, fmt.Sprintf("%s\n\t%s:%d", frame.Function, frame.File, frame.Line))
}
return strings.Join(out, "\n")
}
func ErrorWithCallStack(err error) error {
var frames []*runtime.Frame
if recur, ok := err.(*errWithCallstack); ok {
err = recur.inner
frames = recur.frames
}
pc, _, _, ok := runtime.Caller(1)
if ok {
curframes := runtime.CallersFrames([]uintptr{pc})
f, _ := curframes.Next()
frames = append(frames, &f)
}
return &errWithCallstack{
inner: err,
frames: frames,
}
}

View File

@ -1,179 +0,0 @@
package metric
import (
"crypto/md5"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"math"
"os"
"path"
"strings"
"sync/atomic"
"repositories.action2quare.com/ayo/gocommon/logger"
)
const (
METRIC_HEAD_INLINE = byte(14)
METRIC_TAIL_INLINE = byte(15)
)
type MetricType int
const (
MetricCounter = MetricType(1)
MetricGuage = MetricType(2)
metric_key_size = 8
)
type MetricDescription struct {
Key string
Type MetricType
Name string `json:",omitempty"`
Help string `json:",omitempty"`
ConstLabels map[string]string `json:",omitempty"`
}
type writeRequest struct {
key string
valfunc func() float64
}
type metricCollection struct {
writerChan chan *writeRequest
}
var mc = metricCollection{
writerChan: make(chan *writeRequest, 100),
}
type MetricWriter interface {
Add(int64)
Set(int64)
}
type metric_empty struct{}
func (mw *metric_empty) Set(newval int64) {}
func (mw *metric_empty) Add(inc int64) {}
var MetricWriterNil = MetricWriter(&metric_empty{})
type metric_int64 struct {
valptr *int64
key string
writerChan chan *writeRequest
}
func (mw *metric_int64) requestMetricWrite() {
mw.writerChan <- &writeRequest{
key: mw.key,
valfunc: func() float64 { return float64(atomic.LoadInt64(mw.valptr)) },
}
}
func (mw *metric_int64) Set(newval int64) {
atomic.StoreInt64(mw.valptr, newval)
mw.requestMetricWrite()
}
func (mw *metric_int64) Add(inc int64) {
atomic.AddInt64(mw.valptr, inc)
mw.requestMetricWrite()
}
func (mc *metricCollection) metricWriter() {
defer func() {
r := recover()
if r != nil {
logger.Error(r)
}
}()
// head + metric_key_size + 8byte + tail + cr = 19
var buff [19]byte
buff[0] = METRIC_HEAD_INLINE
buff[17] = METRIC_TAIL_INLINE
buff[18] = '\n'
for req := range mc.writerChan {
copy(buff[1:], []byte(req.key))
binary.BigEndian.PutUint64(buff[9:], math.Float64bits(req.valfunc()))
os.Stdout.Write(buff[:])
}
}
var NewMetric func(MetricType, string, string, map[string]string) MetricWriter
var ConcurrentUser = MetricWriterNil
func init() {
NewMetric = func(MetricType, string, string, map[string]string) MetricWriter {
return &metric_empty{}
}
if path.Base(os.Args[0]) == "houston" {
logger.Println("metrics are going to be generated for myself(houston)")
go mc.metricWriter()
NewMetric = newMetricImpl
return
}
ppid := os.Getppid()
if parent, _ := os.FindProcess(ppid); parent != nil {
filename := fmt.Sprintf(`/proc/%d/stat`, os.Getppid())
if fn, err := os.ReadFile(filename); err == nil {
stats := strings.SplitN(string(fn), " ", 3)
parentname := strings.Trim(stats[1], "()")
if path.Base(parentname) == "houston" {
logger.Println("metrics are going to be generated for houston")
go mc.metricWriter()
NewMetric = newMetricImpl
ConcurrentUser = NewMetric(MetricGuage, "concurrent_user", "concurrent user count", map[string]string{"location": "lobby"})
} else {
logger.Println("metrics are NOT going to be generated. parent is not houston :", filename, string(fn))
}
} else {
logger.Println("metrics are NOT going to be generated. ppid proc is missing :", filename)
}
} else {
logger.Println("metrics are NOT going to be generated. parent process is missing. ppid :", ppid)
}
}
func newMetricImpl(mt MetricType, name string, help string, constLabels map[string]string) (writer MetricWriter) {
hash := md5.New()
hash.Write([]byte(name))
if constLabels == nil {
constLabels = make(map[string]string)
}
hn, _ := os.Hostname()
sn := path.Base(os.Args[0])
constLabels["hostname"] = hn
constLabels["service"] = sn
key := hex.EncodeToString(hash.Sum(nil))[:metric_key_size]
temp, _ := json.Marshal(MetricDescription{
Key: key,
Type: mt,
Name: name,
Help: help,
ConstLabels: constLabels,
})
writer = &metric_int64{
valptr: new(int64),
key: key,
writerChan: mc.writerChan,
}
output := append([]byte{METRIC_HEAD_INLINE}, temp...)
output = append(output, METRIC_TAIL_INLINE, '\n')
os.Stdout.Write(output)
return
}

View File

@ -15,7 +15,6 @@ import (
"go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref" "go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/x/mongo/driver/connstring"
) )
type MongoClient struct { type MongoClient struct {
@ -61,19 +60,8 @@ func (ci *ConnectionInfo) SetDatabase(dbname string) *ConnectionInfo {
return ci return ci
} }
var errNoDatabaseNameInMongoUri = errors.New("mongo uri has no database name") func NewMongoClient(ctx context.Context, url string, dbname string) (MongoClient, error) {
return newMongoClient(ctx, NewMongoConnectionInfo(url, dbname))
func NewMongoClient(ctx context.Context, url string) (MongoClient, error) {
connstr, err := connstring.ParseAndValidate(url)
if err != nil {
return MongoClient{}, err
}
if len(connstr.Database) == 0 {
return MongoClient{}, errNoDatabaseNameInMongoUri
}
return newMongoClient(ctx, NewMongoConnectionInfo(url, connstr.Database))
} }
func newMongoClient(ctx context.Context, ci *ConnectionInfo) (MongoClient, error) { func newMongoClient(ctx context.Context, ci *ConnectionInfo) (MongoClient, error) {
@ -106,21 +94,21 @@ func newMongoClient(ctx context.Context, ci *ConnectionInfo) (MongoClient, error
return MongoClient{}, err return MongoClient{}, err
} }
// go func() { go func() {
// for { for {
// if err := client.Ping(ctx, nil); err != nil { if err := client.Ping(ctx, nil); err != nil {
// logger.Error("mongo client ping err :", err) logger.Error("mongo client ping err :", err)
// } }
// select { select {
// case <-time.After(10 * time.Second): case <-time.After(10 * time.Second):
// continue continue
// case <-ctx.Done(): case <-ctx.Done():
// return return
// } }
// } }
// }() }()
mdb := client.Database(ci.Database, nil) mdb := client.Database(ci.Database, nil)
return MongoClient{c: client, db: mdb}, nil return MongoClient{c: client, db: mdb}, nil
@ -136,18 +124,6 @@ func (mc MongoClient) Close() {
} }
} }
func (mc MongoClient) DropIndex(coll CollectionName, name string) error {
matchcoll := mc.Collection(coll)
_, err := matchcoll.Indexes().DropOne(context.Background(), name)
if commanderr, ok := err.(mongo.CommandError); ok {
if commanderr.Code == 27 {
// 인덱스가 없는 것이므로 그냥 성공
return nil
}
}
return err
}
func (mc MongoClient) Watch(coll CollectionName, pipeline mongo.Pipeline, opts ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error) { func (mc MongoClient) Watch(coll CollectionName, pipeline mongo.Pipeline, opts ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error) {
if len(opts) == 0 { if len(opts) == 0 {
opts = []*options.ChangeStreamOptions{options.ChangeStream().SetFullDocument(options.UpdateLookup).SetMaxAwaitTime(0)} opts = []*options.ChangeStreamOptions{options.ChangeStream().SetFullDocument(options.UpdateLookup).SetMaxAwaitTime(0)}
@ -456,7 +432,7 @@ IndexSearchLabel:
return err return err
} }
func (mc MongoClient) makeIndicesWithOption(coll CollectionName, indices map[string]bson.D, opts ...*options.IndexOptions) error { func (mc MongoClient) makeIndicesWithOption(coll CollectionName, indices map[string]bson.D, option *options.IndexOptions) error {
collection := mc.Collection(coll) collection := mc.Collection(coll)
cursor, err := collection.Indexes().List(context.Background(), options.ListIndexes().SetMaxTime(time.Second)) cursor, err := collection.Indexes().List(context.Background(), options.ListIndexes().SetMaxTime(time.Second))
if err != nil { if err != nil {
@ -484,12 +460,12 @@ func (mc MongoClient) makeIndicesWithOption(coll CollectionName, indices map[str
if len(v) == 1 { if len(v) == 1 {
mod = mongo.IndexModel{ mod = mongo.IndexModel{
Keys: primitive.M{v[0].Key: v[0].Value}, Keys: primitive.M{v[0].Key: v[0].Value},
Options: options.MergeIndexOptions(append(opts, options.Index().SetName(name))...), Options: options.MergeIndexOptions(options.Index().SetName(name), option),
} }
} else { } else {
mod = mongo.IndexModel{ mod = mongo.IndexModel{
Keys: indices[name], Keys: indices[name],
Options: options.MergeIndexOptions(append(opts, options.Index().SetName(name))...), Options: options.MergeIndexOptions(options.Index().SetName(name), option),
} }
} }
@ -502,10 +478,10 @@ func (mc MongoClient) makeIndicesWithOption(coll CollectionName, indices map[str
return nil return nil
} }
func (mc MongoClient) MakeUniqueIndices(coll CollectionName, indices map[string]bson.D, opts ...*options.IndexOptions) error { func (mc MongoClient) MakeUniqueIndices(coll CollectionName, indices map[string]bson.D) error {
return mc.makeIndicesWithOption(coll, indices, append(opts, options.Index().SetUnique(true))...) return mc.makeIndicesWithOption(coll, indices, options.Index().SetUnique(true))
} }
func (mc MongoClient) MakeIndices(coll CollectionName, indices map[string]bson.D, opts ...*options.IndexOptions) error { func (mc MongoClient) MakeIndices(coll CollectionName, indices map[string]bson.D) error {
return mc.makeIndicesWithOption(coll, indices, opts...) return mc.makeIndicesWithOption(coll, indices, options.Index())
} }

439
redis.go
View File

@ -2,11 +2,9 @@ package gocommon
import ( import (
"context" "context"
"encoding/json" "net/url"
"fmt"
"os" "os"
"strconv" "strconv"
"strings"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
) )
@ -21,30 +19,23 @@ func newRedisClient(uri string, dbidxoffset int) *redis.Client {
return redis.NewClient(option) return redis.NewClient(option)
} }
func NewRedisClient(uri string) (*redis.Client, error) { func NewRedisClient(uri string, dbidx int) (*redis.Client, error) {
if !*devflag { if !*devflag {
return newRedisClient(uri, 0), nil return newRedisClient(uri, dbidx), nil
} }
option, err := redis.ParseURL(uri) rdb := newRedisClient(uri, 0)
if err != nil { devUrl, _ := url.Parse(uri)
return nil, err
}
zero := *option
zero.DB = 0
rdb := redis.NewClient(&zero)
defer rdb.Close()
hostname, _ := os.Hostname() hostname, _ := os.Hostname()
myidx, _ := rdb.HGet(context.Background(), "private_db", hostname).Result() myidx, _ := rdb.HGet(context.Background(), "private_db", hostname).Result()
if len(myidx) > 0 { if len(myidx) > 0 {
offset, _ := strconv.Atoi(myidx) devUrl.Path = "/" + myidx
option.DB += offset return newRedisClient(devUrl.String(), dbidx), nil
return redis.NewClient(option), nil
} }
alldbs, err := rdb.HGetAll(context.Background(), "private_db").Result() alldbs, err := rdb.HGetAll(context.Background(), "private_db").Result()
if err != nil { if err != nil {
rdb.Close()
return nil, err return nil, err
} }
@ -62,416 +53,6 @@ func NewRedisClient(uri string) (*redis.Client, error) {
return nil, err return nil, err
} }
option.DB += newidx devUrl.Path = "/" + strconv.Itoa(newidx)
return redis.NewClient(option), nil return newRedisClient(devUrl.String(), dbidx), nil
}
type RedisonSetOption = string
type RedisonGetOption = [2]any
const (
// JSONSET command Options
RedisonSetOptionNX RedisonSetOption = "NX"
RedisonSetOptionXX RedisonSetOption = "XX"
)
var (
RedisonGetOptionSPACE = RedisonGetOption{"SPACE", " "}
RedisonGetOptionINDENT = RedisonGetOption{"INDENT", "\t"}
RedisonGetOptionNEWLINE = RedisonGetOption{"NEWLINE", "\n"}
RedisonGetOptionNOESCAPE = RedisonGetOption{"NOESCAPE", ""}
)
// gocommon으로 옮길 거
type RedisonHandler struct {
*redis.Client
ctx context.Context
}
func NewRedisonHandler(ctx context.Context, redisClient *redis.Client) *RedisonHandler {
return &RedisonHandler{
Client: redisClient,
ctx: ctx,
}
}
func respToArray[T any](resp any, err error) ([]T, error) {
if err != nil {
if err == redis.Nil {
return nil, nil
}
return nil, err
}
resArr := resp.([]any)
v := make([]T, len(resArr))
for i, e := range resArr {
v[i] = e.(T)
}
return v, nil
}
func appendArgs[T any](args []any, ext ...T) []any {
for _, e := range ext {
args = append(args, e)
}
return args
}
func (rh *RedisonHandler) JSONMSetRel(key string, prefixPath string, kv map[string]any) error {
if len(prefixPath) > 0 && !strings.HasSuffix(prefixPath, ".") {
prefixPath += "."
}
pl := rh.Pipeline()
for path, obj := range kv {
b, err := json.Marshal(obj)
if err != nil {
return err
}
pl.Do(rh.ctx, "JSON.SET", key, prefixPath+path, b)
}
cmders, err := pl.Exec(rh.ctx)
if err != nil {
return err
}
for _, cmder := range cmders {
if cmder.Err() != nil {
return cmder.Err()
}
}
return nil
}
func (rh *RedisonHandler) JSONMSet(key string, kv map[string]any) error {
return rh.JSONMSetRel(key, "", kv)
}
func (rh *RedisonHandler) jsonSetMergeJSONSet(cmd, key, path string, obj any, opts ...RedisonSetOption) (bool, error) {
b, err := json.Marshal(obj)
if err != nil {
return false, err
}
args := []any{
"JSON.SET",
key,
path,
b,
}
if len(opts) > 0 {
args = append(args, opts[0])
}
res, err := rh.Do(rh.ctx, args...).Result()
if err != nil {
return false, err
}
return res.(string) == "OK", nil
}
func (rh *RedisonHandler) JSONSet(key, path string, obj any, opts ...RedisonSetOption) (bool, error) {
return rh.jsonSetMergeJSONSet("JSON.SET", key, path, obj, opts...)
}
func (rh *RedisonHandler) JSONMerge(key, path string, obj any, opts ...RedisonSetOption) (bool, error) {
return rh.jsonSetMergeJSONSet("JSON.MERGE", key, path, obj, opts...)
}
func (rh *RedisonHandler) JSONGet(key, path string, opts ...RedisonGetOption) (res any, err error) {
args := appendArgs[string]([]any{
"JSON.GET",
key,
}, strings.Split(path, " ")...)
for _, opt := range opts {
args = append(args, opt[:]...)
}
return rh.Do(rh.ctx, args...).Result()
}
func (rh *RedisonHandler) JSONGetString(key, path string) ([]string, error) {
return respToArray[string](rh.JSONResp(key, path))
}
func (rh *RedisonHandler) JSONGetDocuments(key, path string) ([]map[string]any, error) {
resp, err := rh.JSONGet(key, path)
if err != nil {
if err == redis.Nil {
return nil, nil
}
return nil, err
}
var objs []map[string]any
err = json.Unmarshal([]byte(resp.(string)), &objs)
return objs, err
}
func (rh *RedisonHandler) JSONGetInt64(key, path string) ([]int64, error) {
return respToArray[int64](rh.JSONResp(key, path))
}
func (rh *RedisonHandler) JSONMGet(path string, keys ...string) (res any, err error) {
args := appendArgs[string]([]any{
"JSON.MGET",
path,
}, keys...)
return rh.Do(rh.ctx, args...).Result()
}
func (rh *RedisonHandler) JSONMDel(key string, paths []string) error {
pl := rh.Pipeline()
for _, path := range paths {
args := []any{
"JSON.DEL",
key,
path,
}
pl.Do(rh.ctx, args...)
}
_, err := pl.Exec(rh.ctx)
return err
}
func (rh *RedisonHandler) JSONDel(key, path string) (int64, error) {
args := []any{
"JSON.DEL",
key,
path,
}
resp, err := rh.Do(rh.ctx, args...).Result()
if err != nil {
return 0, err
}
return resp.(int64), nil
}
func (rh *RedisonHandler) JSONType(key, path string) ([]string, error) {
args := []any{
"JSON.TYPE",
key,
path,
}
return respToArray[string](rh.Do(rh.ctx, args...).Result())
}
func (rh *RedisonHandler) JSONNumIncrBy(key, path string, number int) ([]int64, error) {
args := []any{
"JSON.NUMINCRBY",
key,
path,
number,
}
resp, err := rh.Do(rh.ctx, args...).Result()
if err != nil {
return nil, err
}
var cnts []int64
err = json.Unmarshal([]byte(resp.(string)), &cnts)
return cnts, err
}
func (rh *RedisonHandler) JSONNumMultBy(key, path string, number int) (res any, err error) {
args := []any{
"JSON.NUMMULTBY",
key,
path,
number,
}
resp, err := rh.Do(rh.ctx, args...).Result()
if err != nil {
return nil, err
}
return resp.([]any), nil
}
func (rh *RedisonHandler) JSONStrAppend(key, path string, jsonstring string) ([]int64, error) {
args := []any{
"JSON.STRAPPEND",
key,
path,
fmt.Sprintf(`'"%s"'`, jsonstring),
}
return respToArray[int64](rh.Do(rh.ctx, args...).Result())
}
func (rh *RedisonHandler) JSONStrLen(key, path string) (res []int64, err error) {
args := []any{
"JSON.STRLEN",
key,
path,
}
return respToArray[int64](rh.Do(rh.ctx, args...).Result())
}
func (rh *RedisonHandler) JSONArrAppend(key, path string, values ...any) (int64, error) {
args := appendValues([]any{
"JSON.ARRAPPEND",
key,
path,
}, values...)
resp, err := rh.Do(rh.ctx, args...).Result()
if err != nil {
return 0, err
}
return resp.(int64), nil
}
func (rh *RedisonHandler) JSONArrLen(key, path string) ([]int64, error) {
args := []any{
"JSON.ARRLEN",
key,
path,
}
return respToArray[int64](rh.Do(rh.ctx, args...).Result())
}
func (rh *RedisonHandler) JSONArrPop(key, path string, index int) (res any, err error) {
args := []any{
"JSON.ARRPOP",
key,
path,
index,
}
resp, err := rh.Do(rh.ctx, args...).Result()
if err != nil {
return nil, err
}
return resp.([]any)[0], nil
}
func appendValues(args []any, values ...any) []any {
for _, jsonValue := range values {
switch jsonValue := jsonValue.(type) {
case string:
args = append(args, fmt.Sprintf(`'"%s"'`, jsonValue))
case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64:
args = append(args, jsonValue)
default:
bt, _ := json.Marshal(jsonValue)
args = append(args, bt)
}
}
return args
}
func (rh *RedisonHandler) JSONArrIndex(key, path string, jsonValue any, optionalRange ...int) ([]int64, error) {
args := appendValues([]any{
"JSON.ARRINDEX",
key,
path,
}, jsonValue)
return respToArray[int64](rh.Do(rh.ctx, args...).Result())
}
func (rh *RedisonHandler) JSONArrTrim(key, path string, start, end int) (res any, err error) {
args := []any{
"JSON.ARRTRIM",
key,
path,
start,
end,
}
return respToArray[int64](rh.Do(rh.ctx, args...).Result())
}
func (rh *RedisonHandler) JSONArrInsert(key, path string, index int, values ...any) (res any, err error) {
args := appendValues([]any{
"JSON.ARRINSERT",
key,
path,
index,
}, values...)
return respToArray[int64](rh.Do(rh.ctx, args...).Result())
}
func (rh *RedisonHandler) JSONObjKeys(key, path string) ([]string, error) {
args := []any{
"JSON.OBJKEYS",
key,
path,
}
res, err := rh.Do(rh.ctx, args...).Result()
if err != nil {
return nil, err
}
resArr := res.([]any)
if len(resArr) == 0 {
return nil, nil
}
resArr = resArr[0].([]any)
slc := make([]string, len(resArr))
for i, r := range resArr {
slc[i] = r.(string)
}
return slc, nil
}
func (rh *RedisonHandler) JSONObjLen(key, path string) ([]int64, error) {
args := []any{
"JSON.OBJLEN",
key,
}
if path != "$" {
args = append(args, path)
}
resp, err := rh.Do(rh.ctx, args...).Result()
if err != nil {
return nil, err
}
switch resp := resp.(type) {
case []any:
return respToArray[int64](resp, nil)
case int64:
return []int64{resp}, nil
}
return []int64{0}, nil
}
func (rh *RedisonHandler) JSONDebug(key, path string) (res any, err error) {
args := []any{
"JSON.DEBUG",
"MEMORY",
key,
path,
}
return rh.Do(rh.ctx, args...).Result()
}
func (rh *RedisonHandler) JSONForget(key, path string) (int64, error) {
return rh.JSONDel(key, path)
}
func (rh *RedisonHandler) JSONResp(key, path string) (res any, err error) {
args := []any{
"JSON.RESP",
key,
path,
}
return rh.Do(rh.ctx, args...).Result()
} }

View File

@ -65,7 +65,10 @@ func LoadConfig[T any](outptr *T) error {
type StorageAddr struct { type StorageAddr struct {
Mongo string Mongo string
Redis map[string]string Redis struct {
URL string
Offset map[string]int
}
} }
type RegionStorageConfig struct { type RegionStorageConfig struct {

315
server.go
View File

@ -1,9 +1,7 @@
package gocommon package gocommon
import ( import (
"bytes"
"context" "context"
"encoding/gob"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -22,7 +20,6 @@ import (
"sync/atomic" "sync/atomic"
"syscall" "syscall"
"time" "time"
"unsafe"
"repositories.action2quare.com/ayo/gocommon/flagx" "repositories.action2quare.com/ayo/gocommon/flagx"
"repositories.action2quare.com/ayo/gocommon/logger" "repositories.action2quare.com/ayo/gocommon/logger"
@ -31,15 +28,6 @@ import (
"go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/bson/primitive"
) )
func init() {
gob.Register(map[string]any{})
gob.Register(primitive.A{})
gob.Register(primitive.M{})
gob.Register(primitive.D{})
gob.Register(primitive.ObjectID{})
gob.Register([]any{})
}
const ( const (
// HTTPStatusReloginRequired : http status를 이걸 받으면 클라이언트는 로그아웃하고 로그인 화면으로 돌아가야 한다. // HTTPStatusReloginRequired : http status를 이걸 받으면 클라이언트는 로그아웃하고 로그인 화면으로 돌아가야 한다.
HTTPStatusReloginRequired = 599 HTTPStatusReloginRequired = 599
@ -58,14 +46,6 @@ const (
ShutdownFlagIdle = ShutdownFlag(3) ShutdownFlagIdle = ShutdownFlag(3)
) )
type ErrorWithStatus struct {
StatusCode int
}
func (e ErrorWithStatus) Error() string {
return fmt.Sprintf("%d", e.StatusCode)
}
type RpcReturnTypeInterface interface { type RpcReturnTypeInterface interface {
Value() any Value() any
Error() error Error() error
@ -117,18 +97,24 @@ func NewHTTPServerWithPort(serveMux *http.ServeMux, port int) *Server {
addr := fmt.Sprintf(":%d", port) addr := fmt.Sprintf(":%d", port)
serveMux.HandleFunc(MakeHttpHandlerPattern("welcome"), welcomeHandler) serveMux.HandleFunc(MakeHttpHandlerPattern("welcome"), welcomeHandler)
serveMux.HandleFunc(MakeHttpHandlerPattern("lb_health_chceck"), healthCheckHandler) serveMux.HandleFunc(MakeHttpHandlerPattern("lb_health_chceck"), healthCheckHandler)
serveMux.HandleFunc(MakeHttpHandlerPattern("lb_health_check"), healthCheckHandler)
server := &Server{ server := &Server{
httpserver: &http.Server{Addr: addr, Handler: serveMux}, httpserver: &http.Server{Addr: addr, Handler: serveMux},
interrupt: make(chan os.Signal, 1),
} }
server.httpserver.SetKeepAlivesEnabled(true) server.httpserver.SetKeepAlivesEnabled(true)
signal.Notify(server.interrupt, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
go func() {
c := <-server.interrupt
logger.Println("interrupt!!!!!!!! :", c.String())
server.shutdown()
}()
return server return server
} }
func NewHTTPServer(serveMux *http.ServeMux) *Server { func NewHTTPServer(serveMux *http.ServeMux) *Server {
// 시작시 자동으로 enable됨 // 시작시 자동으로 enable됨
if len(*tls) > 0 && *portptr == 80 { if len(*tls) > 0 && *portptr == 80 {
*portptr = 443 *portptr = 443
@ -146,7 +132,8 @@ func (server *Server) shutdown() {
logger.Println("http server shutdown. healthcheckcounter :", t) logger.Println("http server shutdown. healthcheckcounter :", t)
atomic.StoreInt64(&healthcheckcounter, math.MinInt64) atomic.StoreInt64(&healthcheckcounter, math.MinInt64)
for cnt := 0; cnt < 100; { timer := 600 // 0.1 * 600 = 1분
for cnt := 0; cnt < 100 && timer > 0; {
next := atomic.LoadInt64(&healthcheckcounter) next := atomic.LoadInt64(&healthcheckcounter)
if next == t { if next == t {
cnt++ cnt++
@ -155,6 +142,7 @@ func (server *Server) shutdown() {
cnt = 0 cnt = 0
} }
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
timer--
} }
logger.Println("http server shutdown. healthcheck completed") logger.Println("http server shutdown. healthcheck completed")
} else { } else {
@ -167,14 +155,6 @@ func (server *Server) shutdown() {
} }
} }
func (server *Server) Stop() {
if server.interrupt != nil {
server.interrupt <- os.Interrupt
} else {
server.shutdown()
}
}
// Start : // Start :
func (server *Server) Start() error { func (server *Server) Start() error {
if server.httpserver != nil { if server.httpserver != nil {
@ -182,15 +162,6 @@ func (server *Server) Start() error {
if r != nil { if r != nil {
return r return r
} }
server.interrupt = make(chan os.Signal, 1)
signal.Notify(server.interrupt, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
go func() {
c := <-server.interrupt
logger.Println("interrupt!!!!!!!! :", c.String())
server.shutdown()
}()
proxyListener := &proxyproto.Listener{ proxyListener := &proxyproto.Listener{
Listener: ln, Listener: ln,
ReadHeaderTimeout: 10 * time.Second, ReadHeaderTimeout: 10 * time.Second,
@ -450,46 +421,6 @@ func ReadStringFormValue(r url.Values, key string) (string, bool) {
return strval, len(strval) > 0 return strval, len(strval) > 0
} }
type encoder interface {
Encode(any) error
}
type nilEncoder struct{}
func (ne *nilEncoder) Encode(any) error { return nil }
type decoder interface {
Decode(any) error
}
type nilDecoder struct{}
func (nd *nilDecoder) Decode(any) error { return nil }
func MakeDecoder(r *http.Request) decoder {
ct := r.Header.Get("Content-Type")
if ct == "application/gob" {
return gob.NewDecoder(r.Body)
} else if ct == "application/json" {
return json.NewDecoder(r.Body)
}
logger.Error("Content-Type is not supported :", ct)
return &nilDecoder{}
}
func MakeEncoder(w http.ResponseWriter, r *http.Request) encoder {
ct := r.Header.Get("Content-Type")
if ct == "application/gob" {
return gob.NewEncoder(w)
} else if ct == "application/json" {
return json.NewEncoder(w)
}
logger.Error("Content-Type is not supported :", ct)
return &nilEncoder{}
}
func DotStringToTimestamp(tv string) primitive.Timestamp { func DotStringToTimestamp(tv string) primitive.Timestamp {
if len(tv) == 0 { if len(tv) == 0 {
return primitive.Timestamp{T: 0, I: 0} return primitive.Timestamp{T: 0, I: 0}
@ -668,227 +599,3 @@ func MakeHttpRequestForLogging(r *http.Request) *http.Request {
r.Body = ib r.Body = ib
return r return r
} }
type apiFuncType func(http.ResponseWriter, *http.Request)
type HttpApiHandler struct {
methods map[string]apiFuncType
originalReceiverName string
}
func MakeHttpApiHandler[T any](receiver *T, receiverName string) HttpApiHandler {
methods := make(map[string]apiFuncType)
tp := reflect.TypeOf(receiver)
if len(receiverName) == 0 {
receiverName = tp.Elem().Name()
}
writerType := reflect.TypeOf((*http.ResponseWriter)(nil)).Elem()
for i := 0; i < tp.NumMethod(); i++ {
method := tp.Method(i)
if method.Type.NumIn() != 3 {
continue
}
if method.Type.In(0) != tp {
continue
}
if !method.Type.In(1).Implements(writerType) {
continue
}
var r http.Request
if method.Type.In(2) != reflect.TypeOf(&r) {
continue
}
if method.Name == "ServeHTTP" {
continue
}
funcptr := method.Func.Pointer()
p1 := unsafe.Pointer(&funcptr)
p2 := unsafe.Pointer(&p1)
testfunc := (*func(*T, http.ResponseWriter, *http.Request))(p2)
methods[receiverName+"."+method.Name] = func(w http.ResponseWriter, r *http.Request) {
(*testfunc)(receiver, w, r)
}
}
return HttpApiHandler{
methods: methods,
originalReceiverName: tp.Elem().Name(),
}
}
type HttpApiBroker struct {
methods map[string]apiFuncType
methods_dup map[string][]apiFuncType
}
type bufferReadCloser struct {
*bytes.Reader
}
func (buff *bufferReadCloser) Close() error { return nil }
type readOnlyResponseWriter struct {
inner http.ResponseWriter
statusCode int
}
func (w *readOnlyResponseWriter) Header() http.Header {
return w.inner.Header()
}
func (w *readOnlyResponseWriter) Write(in []byte) (int, error) {
logger.Println("readOnlyResponseWriter cannot write")
return len(in), nil
}
func (w *readOnlyResponseWriter) WriteHeader(statusCode int) {
w.statusCode = statusCode
}
func (hc *HttpApiBroker) AddHandler(receiver HttpApiHandler) {
if hc.methods == nil {
hc.methods = make(map[string]apiFuncType)
hc.methods_dup = make(map[string][]apiFuncType)
}
for k, v := range receiver.methods {
ab := strings.Split(k, ".")
logger.Printf("http api registered : %s.%s -> %s\n", receiver.originalReceiverName, ab[1], k)
hc.methods_dup[k] = append(hc.methods_dup[k], v)
if len(hc.methods_dup[k]) > 1 {
chain := hc.methods_dup[k]
hc.methods[k] = func(w http.ResponseWriter, r *http.Request) {
body, _ := io.ReadAll(r.Body)
defer r.Body.Close()
wrap := &readOnlyResponseWriter{inner: w, statusCode: 200}
for _, f := range chain {
r.Body = &bufferReadCloser{bytes.NewReader(body)}
f(wrap, r)
}
if wrap.statusCode != 200 {
w.WriteHeader(wrap.statusCode)
}
}
} else {
hc.methods[k] = v
}
}
}
func (hc *HttpApiBroker) Call(w http.ResponseWriter, r *http.Request) {
funcname := r.URL.Query().Get("call")
if len(funcname) == 0 {
logger.Println("query param 'call' is missing")
w.WriteHeader(http.StatusBadRequest)
return
}
hc.call(funcname, w, r)
}
func (hc *HttpApiBroker) call(funcname string, w http.ResponseWriter, r *http.Request) {
if found := hc.methods[funcname]; found != nil {
found(w, r)
} else {
logger.Println("api is not found :", funcname)
}
}
func CallInternalServiceAPI[T any](url string, apitoken string, method string, data T, headers ...string) error {
tempHeader := make(http.Header)
tempHeader.Set("MG-X-API-TOKEN", apitoken)
tempHeader.Set("Content-Type", "application/gob")
for i := 1; i < len(headers); i += 2 {
tempHeader.Set(headers[i-1], headers[i])
}
buff := new(bytes.Buffer)
ct := tempHeader.Get("Content-Type")
if ct == "application/gob" {
enc := gob.NewEncoder(buff)
enc.Encode(data)
} else if ct == "application/json" {
enc := json.NewEncoder(buff)
enc.Encode(data)
}
reqURL := fmt.Sprintf("%s/api?call=%s", url, method)
req, err := http.NewRequest("POST", reqURL, buff)
if err != nil {
return err
}
req.Header = tempHeader
resp, err := http.DefaultClient.Do(req)
defer func() {
if resp != nil && resp.Body != nil {
resp.Body.Close()
}
}()
if err == nil {
if resp.StatusCode != http.StatusOK {
return &ErrorWithStatus{StatusCode: resp.StatusCode}
}
}
return err
}
func CallInternalServiceAPIAs[Tin any, Tout any](url string, apitoken string, method string, data Tin, out *Tout, headers ...string) error {
tempHeader := make(http.Header)
tempHeader.Set("MG-X-API-TOKEN", apitoken)
tempHeader.Set("Content-Type", "application/gob")
for i := 1; i < len(headers); i += 2 {
tempHeader.Set(headers[i-1], headers[i])
}
buff := new(bytes.Buffer)
ct := tempHeader.Get("Content-Type")
if ct == "application/gob" {
enc := gob.NewEncoder(buff)
enc.Encode(data)
} else if ct == "application/json" {
enc := json.NewEncoder(buff)
enc.Encode(data)
}
reqURL := fmt.Sprintf("%s/api?call=%s", url, method)
req, err := http.NewRequest("POST", reqURL, buff)
if err != nil {
return err
}
req.Header = tempHeader
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return &ErrorWithStatus{StatusCode: resp.StatusCode}
}
defer func() {
if resp != nil && resp.Body != nil {
resp.Body.Close()
}
}()
if out != nil && resp.Body != nil {
dec := gob.NewDecoder(resp.Body)
return dec.Decode(out)
}
return nil
}

View File

@ -1,132 +0,0 @@
package session
import (
"context"
"encoding/binary"
"encoding/hex"
"errors"
"math/rand"
"strings"
"time"
"go.mongodb.org/mongo-driver/bson/primitive"
)
type Authorization struct {
Account primitive.ObjectID `bson:"a" json:"a"`
// by authorization provider
Platform string `bson:"p" json:"p"`
Uid string `bson:"u" json:"u"`
Email string `bson:"em" json:"em"`
}
type Provider interface {
New(*Authorization) (string, error)
Delete(primitive.ObjectID) error
Query(string) (Authorization, error)
Touch(string) (bool, error)
}
type Consumer interface {
Query(string) (Authorization, error)
Touch(string) (Authorization, error)
}
type storagekey string
type publickey string
var r = rand.New(rand.NewSource(time.Now().UnixNano()))
func make_storagekey(acc primitive.ObjectID) storagekey {
bs := [4]byte{}
binary.LittleEndian.PutUint32(bs[:], r.Uint32())
return storagekey(acc.Hex() + hex.EncodeToString(bs[2:]))
}
func storagekey_to_publickey(sk storagekey) publickey {
bs, _ := hex.DecodeString(string(sk))
acc := bs[:12]
cs := bs[12:]
encoded := [14]byte{}
for i, v := range acc[:] {
encoded[i] = (v ^ cs[0]) ^ cs[1]
}
encoded[12] = cs[0]
encoded[13] = cs[1]
return publickey(hex.EncodeToString(encoded[:]))
}
func publickey_to_storagekey(pk publickey) storagekey {
bs, _ := hex.DecodeString(string(pk))
acc := bs[:12]
cs := bs[12:]
decoded := [14]byte{}
for i, v := range acc[:] {
decoded[i] = (v ^ cs[1]) ^ cs[0]
}
decoded[12] = cs[0]
decoded[13] = cs[1]
return storagekey(hex.EncodeToString(decoded[:]))
}
type SessionConfig struct {
SessionTTL int64 `json:"session_ttl"`
SessionStorage string `json:"session_storage"`
}
var errInvalidScheme = errors.New("storageAddr is not valid scheme")
var errSessionStorageMissing = errors.New("session_storageis missing")
func NewConsumer(ctx context.Context, storageAddr string, ttl time.Duration) (Consumer, error) {
if strings.HasPrefix(storageAddr, "mongodb") {
return newConsumerWithMongo(ctx, storageAddr, ttl)
}
if strings.HasPrefix(storageAddr, "redis") {
return newConsumerWithRedis(ctx, storageAddr, ttl)
}
return nil, errInvalidScheme
}
func NewConsumerWithConfig(ctx context.Context, cfg SessionConfig) (Consumer, error) {
if len(cfg.SessionStorage) == 0 {
return nil, errSessionStorageMissing
}
if cfg.SessionTTL == 0 {
cfg.SessionTTL = 3600
}
return NewConsumer(ctx, cfg.SessionStorage, time.Duration(cfg.SessionTTL)*time.Second)
}
func NewProvider(ctx context.Context, storageAddr string, ttl time.Duration) (Provider, error) {
if strings.HasPrefix(storageAddr, "mongodb") {
return newProviderWithMongo(ctx, storageAddr, ttl)
}
if strings.HasPrefix(storageAddr, "redis") {
return newProviderWithRedis(ctx, storageAddr, ttl)
}
return nil, errInvalidScheme
}
func NewProviderWithConfig(ctx context.Context, cfg SessionConfig) (Provider, error) {
if len(cfg.SessionStorage) == 0 {
return nil, errSessionStorageMissing
}
if cfg.SessionTTL == 0 {
cfg.SessionTTL = 3600
}
return NewProvider(ctx, cfg.SessionStorage, time.Duration(cfg.SessionTTL)*time.Second)
}

View File

@ -1,56 +0,0 @@
package session
import (
"context"
"sync"
"time"
)
type cache_stage[T any] struct {
cache map[storagekey]T
deleted map[storagekey]bool
}
func make_cache_stage[T any]() *cache_stage[T] {
return &cache_stage[T]{
cache: make(map[storagekey]T),
deleted: make(map[storagekey]bool),
}
}
type consumer_common[T any] struct {
lock sync.Mutex
ttl time.Duration
ctx context.Context
stages [2]*cache_stage[T]
startTime time.Time
}
func (c *consumer_common[T]) add_internal(sk storagekey, si T) {
c.stages[0].cache[sk] = si
delete(c.stages[0].deleted, sk)
c.stages[1].cache[sk] = si
delete(c.stages[1].deleted, sk)
}
func (c *consumer_common[T]) delete_internal(sk storagekey) {
delete(c.stages[0].cache, sk)
c.stages[0].deleted[sk] = true
delete(c.stages[1].cache, sk)
c.stages[1].deleted[sk] = true
}
func (c *consumer_common[T]) delete(sk storagekey) {
c.lock.Lock()
defer c.lock.Unlock()
c.delete_internal(sk)
}
func (c *consumer_common[T]) changeStage() {
c.lock.Lock()
defer c.lock.Unlock()
c.stages[1] = c.stages[0]
c.stages[0] = make_cache_stage[T]()
}

View File

@ -1,349 +0,0 @@
package session
import (
"context"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/logger"
)
const (
session_collection_name = gocommon.CollectionName("session")
)
type provider_mongo struct {
mongoClient gocommon.MongoClient
}
type sessionMongo struct {
Id primitive.ObjectID `bson:"_id,omitempty"`
Auth *Authorization `bson:"auth"`
Key storagekey `bson:"key"`
Ts primitive.DateTime `bson:"_ts"`
}
func newProviderWithMongo(ctx context.Context, mongoUrl string, ttl time.Duration) (Provider, error) {
mc, err := gocommon.NewMongoClient(ctx, mongoUrl)
if err != nil {
return nil, err
}
if err = mc.MakeUniqueIndices(session_collection_name, map[string]bson.D{
"key": {{Key: "key", Value: 1}},
}); err != nil {
return nil, err
}
if err := mc.MakeExpireIndex(session_collection_name, int32(ttl.Seconds())); err != nil {
return nil, err
}
return &provider_mongo{
mongoClient: mc,
}, nil
}
func (p *provider_mongo) New(input *Authorization) (string, error) {
sk := make_storagekey(input.Account)
_, _, err := p.mongoClient.Update(session_collection_name, bson.M{
"_id": input.Account,
}, bson.M{
"$set": sessionMongo{
Auth: input,
Key: sk,
Ts: primitive.NewDateTimeFromTime(time.Now().UTC()),
},
}, options.Update().SetUpsert(true))
return string(storagekey_to_publickey(sk)), err
}
func (p *provider_mongo) Delete(acc primitive.ObjectID) error {
_, err := p.mongoClient.Delete(session_collection_name, bson.M{
"_id": acc,
})
return err
}
func (p *provider_mongo) Query(pk string) (Authorization, error) {
sk := publickey_to_storagekey(publickey(pk))
var auth Authorization
err := p.mongoClient.FindOneAs(session_collection_name, bson.M{
"key": sk,
}, &auth)
return auth, err
}
func (p *provider_mongo) Touch(pk string) (bool, error) {
sk := publickey_to_storagekey(publickey(pk))
worked, _, err := p.mongoClient.Update(session_collection_name, bson.M{
"key": sk,
}, bson.M{
"$currentDate": bson.M{
"_ts": bson.M{"$type": "date"},
},
}, options.Update().SetUpsert(false))
if err != nil {
logger.Println("provider Touch :", err)
return false, err
}
return worked, nil
}
type consumer_mongo struct {
consumer_common[*sessionMongo]
ids map[primitive.ObjectID]storagekey
mongoClient gocommon.MongoClient
ttl time.Duration
}
type sessionPipelineDocument struct {
OperationType string `bson:"operationType"`
DocumentKey struct {
Id primitive.ObjectID `bson:"_id"`
} `bson:"documentKey"`
Session *sessionMongo `bson:"fullDocument"`
}
func newConsumerWithMongo(ctx context.Context, mongoUrl string, ttl time.Duration) (Consumer, error) {
mc, err := gocommon.NewMongoClient(ctx, mongoUrl)
if err != nil {
return nil, err
}
consumer := &consumer_mongo{
consumer_common: consumer_common[*sessionMongo]{
ttl: ttl,
ctx: ctx,
stages: [2]*cache_stage[*sessionMongo]{make_cache_stage[*sessionMongo](), make_cache_stage[*sessionMongo]()},
startTime: time.Now(),
},
ids: make(map[primitive.ObjectID]storagekey),
ttl: ttl,
mongoClient: mc,
}
go func() {
matchStage := bson.D{
{
Key: "$match", Value: bson.D{
{Key: "operationType", Value: bson.D{
{Key: "$in", Value: bson.A{
"delete",
"insert",
"update",
}},
}},
},
}}
projectStage := bson.D{
{
Key: "$project", Value: bson.D{
{Key: "documentKey", Value: 1},
{Key: "operationType", Value: 1},
{Key: "fullDocument", Value: 1},
},
},
}
var stream *mongo.ChangeStream
nextswitch := time.Now().Add(ttl)
for {
if stream == nil {
stream, err = mc.Watch(session_collection_name, mongo.Pipeline{matchStage, projectStage})
if err != nil {
logger.Error("watchAuthCollection watch failed :", err)
time.Sleep(time.Minute)
continue
}
}
changed := stream.TryNext(ctx)
if ctx.Err() != nil {
logger.Error("watchAuthCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error())
break
}
if changed {
var data sessionPipelineDocument
if err := stream.Decode(&data); err == nil {
ot := data.OperationType
switch ot {
case "insert":
consumer.add(data.Session.Key, data.DocumentKey.Id, data.Session)
case "update":
if data.Session == nil {
consumer.deleteById(data.DocumentKey.Id)
} else {
consumer.add(data.Session.Key, data.DocumentKey.Id, data.Session)
}
case "delete":
consumer.deleteById(data.DocumentKey.Id)
}
} else {
logger.Error("watchAuthCollection stream.Decode failed :", err)
}
} else if stream.Err() != nil || stream.ID() == 0 {
select {
case <-ctx.Done():
logger.Println("watchAuthCollection is done")
stream.Close(ctx)
return
case <-time.After(time.Second):
logger.Error("watchAuthCollection stream error :", stream.Err())
stream.Close(ctx)
stream = nil
}
} else {
time.Sleep(time.Second)
}
now := time.Now()
for now.After(nextswitch) {
consumer.changeStage()
nextswitch = nextswitch.Add(ttl)
}
}
}()
return consumer, nil
}
func (c *consumer_mongo) query_internal(sk storagekey) (*sessionMongo, bool, error) {
if _, deleted := c.stages[0].deleted[sk]; deleted {
return nil, false, nil
}
if _, deleted := c.stages[1].deleted[sk]; deleted {
return nil, false, nil
}
found, ok := c.stages[0].cache[sk]
if !ok {
found, ok = c.stages[1].cache[sk]
}
if ok {
return found, false, nil
}
var si sessionMongo
err := c.mongoClient.FindOneAs(session_collection_name, bson.M{
"key": sk,
}, &si)
if err != nil {
logger.Println("consumer Query :", err)
return nil, false, err
}
if len(si.Key) > 0 {
siptr := &si
c.add_internal(sk, siptr)
return siptr, true, nil
}
return nil, false, nil
}
func (c *consumer_mongo) Query(pk string) (Authorization, error) {
c.lock.Lock()
defer c.lock.Unlock()
sk := publickey_to_storagekey(publickey(pk))
si, _, err := c.query_internal(sk)
if err != nil {
return Authorization{}, err
}
if si == nil {
return Authorization{}, nil
}
if time.Now().After(si.Ts.Time().Add(c.ttl)) {
return Authorization{}, nil
}
return *si.Auth, nil
}
func (c *consumer_mongo) Touch(pk string) (Authorization, error) {
c.lock.Lock()
defer c.lock.Unlock()
sk := publickey_to_storagekey(publickey(pk))
worked, _, err := c.mongoClient.Update(session_collection_name, bson.M{
"key": sk,
}, bson.M{
"$currentDate": bson.M{
"_ts": bson.M{"$type": "date"},
},
}, options.Update().SetUpsert(false))
if err != nil {
logger.Println("consumer Touch :", err)
return Authorization{}, err
}
if !worked {
// 이미 만료되서 사라짐
return Authorization{}, nil
}
si, added, err := c.query_internal(sk)
if err != nil {
return Authorization{}, err
}
if si == nil {
return Authorization{}, nil
}
if !added {
var doc sessionMongo
err := c.mongoClient.FindOneAs(session_collection_name, bson.M{
"key": sk,
}, &doc)
if err != nil {
logger.Println("consumer Query :", err)
return Authorization{}, err
}
if len(si.Key) > 0 {
c.add_internal(sk, &doc)
c.ids[doc.Id] = sk
return *doc.Auth, nil
}
}
return *si.Auth, nil
}
func (c *consumer_mongo) add(sk storagekey, id primitive.ObjectID, si *sessionMongo) {
c.lock.Lock()
defer c.lock.Unlock()
c.consumer_common.add_internal(sk, si)
c.ids[id] = sk
}
func (c *consumer_mongo) deleteById(id primitive.ObjectID) {
c.lock.Lock()
defer c.lock.Unlock()
if sk, ok := c.ids[id]; ok {
c.consumer_common.delete_internal(sk)
delete(c.ids, id)
}
}

View File

@ -1,297 +0,0 @@
package session
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/go-redis/redis/v8"
"go.mongodb.org/mongo-driver/bson/primitive"
"repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/logger"
)
const (
communication_channel_name_prefix = "_sess_comm_chan_name"
)
type sessionRedis struct {
*Authorization
expireAt time.Time
}
type provider_redis struct {
redisClient *redis.Client
deleteChannel string
ttl time.Duration
ctx context.Context
}
func newProviderWithRedis(ctx context.Context, redisUrl string, ttl time.Duration) (Provider, error) {
redisClient, err := gocommon.NewRedisClient(redisUrl)
if err != nil {
return nil, err
}
return &provider_redis{
redisClient: redisClient,
deleteChannel: fmt.Sprintf("%s_%d", communication_channel_name_prefix, redisClient.Options().DB),
ttl: ttl,
ctx: ctx,
}, nil
}
func (p *provider_redis) New(input *Authorization) (string, error) {
bt, err := json.Marshal(input)
if err != nil {
return "", err
}
sk := make_storagekey(input.Account)
_, err = p.redisClient.SetEX(p.ctx, string(sk), bt, p.ttl).Result()
if err != nil {
return "", err
}
pk := storagekey_to_publickey(sk)
logger.Println("session provider new :", sk, pk)
return string(pk), err
}
func (p *provider_redis) Delete(account primitive.ObjectID) error {
prefix := account.Hex()
sks, err := p.redisClient.Keys(p.ctx, prefix+"*").Result()
if err != nil {
logger.Println("session provider delete :", sks, err)
return err
}
for _, sk := range sks {
logger.Println("session provider delete :", sk)
p.redisClient.Del(p.ctx, sk).Result()
p.redisClient.Publish(p.ctx, p.deleteChannel, sk).Result()
}
return nil
}
func (p *provider_redis) Query(pk string) (Authorization, error) {
sk := publickey_to_storagekey(publickey(pk))
payload, err := p.redisClient.Get(p.ctx, string(sk)).Result()
if err == redis.Nil {
logger.Println("session provider query :", pk, err)
return Authorization{}, nil
} else if err != nil {
logger.Println("session provider query :", pk, err)
return Authorization{}, err
}
var auth Authorization
if err := json.Unmarshal([]byte(payload), &auth); err != nil {
logger.Println("session provider query :", pk, err)
return Authorization{}, err
}
logger.Println("session provider query :", pk, auth)
return auth, nil
}
func (p *provider_redis) Touch(pk string) (bool, error) {
sk := publickey_to_storagekey(publickey(pk))
ok, err := p.redisClient.Expire(p.ctx, string(sk), p.ttl).Result()
logger.Println("session provider touch :", pk)
if err == redis.Nil {
// 이미 만료됨
logger.Println("session consumer touch :", pk, err)
return false, nil
} else if err != nil {
logger.Println("session consumer touch :", pk, err)
return false, err
}
logger.Println("session consumer touch :", pk)
return ok, nil
}
type consumer_redis struct {
consumer_common[*sessionRedis]
redisClient *redis.Client
}
func newConsumerWithRedis(ctx context.Context, redisUrl string, ttl time.Duration) (Consumer, error) {
redisClient, err := gocommon.NewRedisClient(redisUrl)
if err != nil {
return nil, err
}
consumer := &consumer_redis{
consumer_common: consumer_common[*sessionRedis]{
ttl: ttl,
ctx: ctx,
stages: [2]*cache_stage[*sessionRedis]{make_cache_stage[*sessionRedis](), make_cache_stage[*sessionRedis]()},
startTime: time.Now(),
},
redisClient: redisClient,
}
deleteChannel := communication_channel_name_prefix + "_d"
sub := redisClient.Subscribe(ctx, deleteChannel)
go func() {
stageswitch := time.Now().Add(ttl)
tickTimer := time.After(ttl)
for {
select {
case <-ctx.Done():
return
case <-tickTimer:
consumer.changeStage()
stageswitch = stageswitch.Add(ttl)
tempttl := time.Until(stageswitch)
tickTimer = time.After(tempttl)
case msg := <-sub.Channel():
if msg == nil {
return
}
if len(msg.Payload) == 0 {
continue
}
switch msg.Channel {
case deleteChannel:
sk := storagekey(msg.Payload)
consumer.delete(sk)
}
}
}
}()
return consumer, nil
}
func (c *consumer_redis) query_internal(sk storagekey) (*sessionRedis, bool, error) {
if _, deleted := c.stages[0].deleted[sk]; deleted {
return nil, false, nil
}
if _, deleted := c.stages[1].deleted[sk]; deleted {
return nil, false, nil
}
found, ok := c.stages[0].cache[sk]
if !ok {
found, ok = c.stages[1].cache[sk]
}
if ok {
if time.Now().Before(found.expireAt) {
// 만료전 세션
return found, false, nil
}
// 다른 Consumer가 Touch했을 수도 있으므로 redis에서 읽어본다.
}
payload, err := c.redisClient.Get(c.ctx, string(sk)).Result()
if err != nil && err != redis.Nil {
logger.Println("consumer Query :", err)
return nil, false, err
}
if len(payload) == 0 {
return nil, false, nil
}
var auth Authorization
if err := json.Unmarshal([]byte(payload), &auth); err != nil {
return nil, false, err
}
ttl, err := c.redisClient.TTL(c.ctx, string(sk)).Result()
if err != nil {
logger.Println("consumer Query :", err)
return nil, false, err
}
si := &sessionRedis{
Authorization: &auth,
expireAt: time.Now().Add(ttl),
}
c.add_internal(sk, si)
return si, true, nil
}
func (c *consumer_redis) Query(pk string) (Authorization, error) {
c.lock.Lock()
defer c.lock.Unlock()
sk := publickey_to_storagekey(publickey(pk))
si, _, err := c.query_internal(sk)
if err != nil {
logger.Println("session consumer query :", pk, err)
return Authorization{}, err
}
if si == nil {
logger.Println("session consumer query(si nil) :", pk, nil)
return Authorization{}, nil
}
if time.Now().After(si.expireAt) {
logger.Println("session consumer query(expired):", pk, nil)
return Authorization{}, nil
}
return *si.Authorization, nil
}
func (c *consumer_redis) Touch(pk string) (Authorization, error) {
c.lock.Lock()
defer c.lock.Unlock()
sk := publickey_to_storagekey(publickey(pk))
ok, err := c.redisClient.Expire(c.ctx, string(sk), c.ttl).Result()
if err == redis.Nil {
logger.Println("session consumer touch :", pk, err)
return Authorization{}, nil
} else if err != nil {
logger.Println("session consumer touch :", pk, err)
return Authorization{}, err
}
if ok {
// redis에 살아있다.
si, added, err := c.query_internal(sk)
if err != nil {
logger.Println("session consumer touch(ok) :", pk, err)
return Authorization{}, err
}
if si == nil {
logger.Println("session consumer touch(ok, si nil) :", pk)
return Authorization{}, nil
}
if !added {
si.expireAt = time.Now().Add(c.ttl)
// stage 0으로 옮기기 위해 add_internal을 다시 부름
c.add_internal(sk, si)
}
logger.Println("session consumer touch(ok) :", pk)
return *si.Authorization, nil
}
logger.Println("session consumer touch(!ok) :", pk)
return Authorization{}, nil
}

View File

@ -1,93 +0,0 @@
// package main ...
package session
import (
"context"
"testing"
"time"
"go.mongodb.org/mongo-driver/bson/primitive"
"repositories.action2quare.com/ayo/gocommon/logger"
)
func TestExpTable(t *testing.T) {
// pv, err := NewProvider(context.Background(), "redis://192.168.8.94:6380/1", 10*time.Second)
// if err != nil {
// t.Error(err)
// }
// cs, err := NewConsumer(context.Background(), "redis://192.168.8.94:6380/1", 10*time.Second)
// if err != nil {
// t.Error(err)
// }
pv, err := NewProvider(context.Background(), "mongodb://192.168.8.94:27017/maingate?replicaSet=repl01&retrywrites=false", 10*time.Second)
if err != nil {
t.Error(err)
}
cs, err := NewConsumer(context.Background(), "mongodb://192.168.8.94:27017/maingate?replicaSet=repl01&retrywrites=false", 10*time.Second)
if err != nil {
t.Error(err)
}
test := primitive.NewObjectID()
sk := make_storagekey(test)
pk := storagekey_to_publickey(sk)
if publickey_to_storagekey(pk) != sk {
t.Errorf("pk != sk : %s, %s", pk, sk)
}
au1 := &Authorization{
Account: primitive.NewObjectID(),
Platform: "editor",
Uid: "uid-1",
}
sk1, err := pv.New(au1)
if err != nil {
t.Error(err)
}
au2 := &Authorization{
Account: primitive.NewObjectID(),
Platform: "editor",
Uid: "uid-2",
}
sk2, err := pv.New(au2)
if err != nil {
t.Error(err)
}
go func() {
for {
q1, err := cs.Query(sk1)
logger.Println("query :", q1, err)
q2, err := cs.Query(sk2)
logger.Println("query :", q2, err)
time.Sleep(time.Second)
}
}()
cs.Touch(sk1)
time.Sleep(2 * time.Second)
cs.Touch(sk2)
time.Sleep(2 * time.Second)
time.Sleep(2 * time.Second)
pv.Delete(au1.Account)
cs.Touch(sk1)
time.Sleep(2 * time.Second)
cs.Touch(sk2)
time.Sleep(2 * time.Second)
cs2, err := NewConsumer(context.Background(), "redis://192.168.8.94:6380/1", 10*time.Second)
if err != nil {
t.Error(err)
}
q2, err := cs2.Query(sk2)
logger.Println("queryf :", q2, err)
time.Sleep(20 * time.Second)
}

View File

@ -1,185 +0,0 @@
package wshandler
import (
"encoding/json"
"io"
"reflect"
"strings"
"unsafe"
"github.com/gorilla/websocket"
"repositories.action2quare.com/ayo/gocommon/logger"
)
const (
ClientConnected = "ClientConnected"
ClientDisconnected = "ClientDisconnected"
)
type apiFuncType func(ApiCallContext)
type connFuncType func(*websocket.Conn, *Sender)
type disconnFuncType func(string, *Sender)
type WebsocketApiHandler struct {
methods map[string]apiFuncType
connfunc connFuncType
disconnfunc disconnFuncType
originalReceiverName string
}
type ApiCallContext struct {
CallBy *Sender
Arguments []any
}
func MakeWebsocketApiHandler[T any](receiver *T, receiverName string) WebsocketApiHandler {
methods := make(map[string]apiFuncType)
tp := reflect.TypeOf(receiver)
if len(receiverName) == 0 {
receiverName = tp.Elem().Name()
}
var connfunc connFuncType
var disconnfunc disconnFuncType
for i := 0; i < tp.NumMethod(); i++ {
method := tp.Method(i)
if method.Type.In(0) != tp {
continue
}
if method.Name == ClientConnected {
if method.Type.NumIn() != 3 {
continue
}
if method.Type.In(1) != reflect.TypeOf((*websocket.Conn)(nil)) {
continue
}
if method.Type.In(2) != reflect.TypeOf((*Sender)(nil)) {
continue
}
funcptr := method.Func.Pointer()
p1 := unsafe.Pointer(&funcptr)
p2 := unsafe.Pointer(&p1)
connfuncptr := (*func(*T, *websocket.Conn, *Sender))(p2)
connfunc = func(c *websocket.Conn, s *Sender) {
(*connfuncptr)(receiver, c, s)
}
} else if method.Name == ClientDisconnected {
if method.Type.NumIn() != 3 {
continue
}
if method.Type.In(1) != reflect.TypeOf("") {
continue
}
if method.Type.In(2) != reflect.TypeOf((*Sender)(nil)) {
continue
}
funcptr := method.Func.Pointer()
p1 := unsafe.Pointer(&funcptr)
p2 := unsafe.Pointer(&p1)
disconnfuncptr := (*func(*T, string, *Sender))(p2)
disconnfunc = func(msg string, s *Sender) {
(*disconnfuncptr)(receiver, msg, s)
}
} else {
if method.Type.NumIn() != 2 {
continue
}
if method.Type.In(1) != reflect.TypeOf((*ApiCallContext)(nil)).Elem() {
continue
}
funcptr := method.Func.Pointer()
p1 := unsafe.Pointer(&funcptr)
p2 := unsafe.Pointer(&p1)
apifuncptr := (*func(*T, ApiCallContext))(p2)
methods[receiverName+"."+method.Name] = func(ctx ApiCallContext) {
(*apifuncptr)(receiver, ctx)
}
}
}
return WebsocketApiHandler{
methods: methods,
connfunc: connfunc,
disconnfunc: disconnfunc,
originalReceiverName: tp.Elem().Name(),
}
}
type WebsocketApiBroker struct {
methods map[string]apiFuncType
methods_dup map[string][]apiFuncType
connFuncs []connFuncType
disconnFuncs []disconnFuncType
}
func (hc *WebsocketApiBroker) AddHandler(receiver WebsocketApiHandler) {
if hc.methods == nil {
hc.methods = make(map[string]apiFuncType)
hc.methods_dup = make(map[string][]apiFuncType)
}
for k, v := range receiver.methods {
ab := strings.Split(k, ".")
logger.Printf("ws api registered : %s.%s -> %s\n", receiver.originalReceiverName, ab[1], k)
hc.methods_dup[k] = append(hc.methods_dup[k], v)
if len(hc.methods_dup[k]) > 1 {
chain := hc.methods_dup[k]
hc.methods[k] = func(ctx ApiCallContext) {
for _, f := range chain {
f(ctx)
}
}
} else {
hc.methods[k] = v
}
}
if receiver.connfunc != nil {
logger.Printf("ws api registered : %s.ClientConnected\n", receiver.originalReceiverName)
hc.connFuncs = append(hc.connFuncs, receiver.connfunc)
}
if receiver.disconnfunc != nil {
// disconnfunc은 역순
logger.Printf("ws api registered : %s.ClientDisconnected\n", receiver.originalReceiverName)
hc.disconnFuncs = append([]disconnFuncType{receiver.disconnfunc}, hc.disconnFuncs...)
}
}
func (hc *WebsocketApiBroker) ClientConnected(c *wsconn) {
for _, v := range hc.connFuncs {
v(c.Conn, c.sender)
}
}
func (hc *WebsocketApiBroker) ClientDisconnected(c *wsconn) {
for _, v := range hc.disconnFuncs {
v(c.closeMessage, c.sender)
}
}
func (hc *WebsocketApiBroker) Call(callby *Sender, funcname string, r io.Reader) {
if found := hc.methods[funcname]; found != nil {
var args []any
if r != nil {
dec := json.NewDecoder(r)
if err := dec.Decode(&args); err != nil {
logger.Println("WebsocketApiBroker.Call failed. decode returns err :", err)
}
}
found(ApiCallContext{
CallBy: callby,
Arguments: args,
})
} else {
logger.Println("api is not found :", funcname)
}
}

View File

@ -1,30 +0,0 @@
// package main ...
package wshandler
import (
"fmt"
"testing"
)
type TestReceiver struct {
}
func (tr *TestReceiver) Func1([]any) {
}
func (tr *TestReceiver) Func2(args []any) {
fmt.Println(args...)
}
func TestExpTable(t *testing.T) {
// src := []any{"a", 1, false}
// payload, _ := json.Marshal(src)
// tr := new(TestReceiver)
// receiver := MakeWebsocketApiHandler(tr, "test")
// var con WebsocketApiBroker
// con.AddHandler(receiver)
}

View File

@ -1,104 +0,0 @@
package wshandler
import (
"bytes"
"context"
"encoding/json"
"github.com/gorilla/websocket"
"go.mongodb.org/mongo-driver/bson/primitive"
"repositories.action2quare.com/ayo/gocommon/logger"
)
type room struct {
inChan chan *wsconn
outChan chan primitive.ObjectID
messageChan chan *UpstreamMessage
name string
destroyChan chan<- string
sendMsgChan chan<- send_msg_queue_elem
}
// 만약 destroyChan가 nil이면 room이 비어도 파괴되지 않는다. 영구 유지되는 room
func makeRoom(name string, destroyChan chan<- string, sendMsgChan chan<- send_msg_queue_elem) *room {
return &room{
inChan: make(chan *wsconn, 10),
outChan: make(chan primitive.ObjectID, 10),
messageChan: make(chan *UpstreamMessage, 100),
name: name,
destroyChan: destroyChan,
sendMsgChan: sendMsgChan,
}
}
func (r *room) broadcast(msg *UpstreamMessage) {
r.messageChan <- msg
}
func (r *room) in(conn *wsconn) *room {
r.inChan <- conn
return r
}
func (r *room) out(accid primitive.ObjectID) *room {
r.outChan <- accid
return r
}
func (r *room) start(ctx context.Context) {
go func(ctx context.Context) {
conns := make(map[string]*wsconn)
normal := false
for !normal {
normal = r.loop(ctx, &conns)
}
}(ctx)
}
func (r *room) loop(ctx context.Context, conns *map[string]*wsconn) (normalEnd bool) {
defer func() {
s := recover()
if s != nil {
logger.Error(s)
normalEnd = false
}
}()
tag := "#" + r.name
for {
select {
case <-ctx.Done():
return true
case conn := <-r.inChan:
(*conns)[conn.sender.Accid.Hex()] = conn
case accid := <-r.outChan:
delete((*conns), accid.Hex())
if len(*conns) == 0 && r.destroyChan != nil {
r.destroyChan <- r.name
return true
}
case msg := <-r.messageChan:
ds := DownstreamMessage{
Alias: msg.Alias,
Body: msg.Body,
Tag: append(msg.Tag, tag),
}
buff := new(bytes.Buffer)
enc := json.NewEncoder(buff)
enc.SetEscapeHTML(false)
enc.Encode(ds)
for _, conn := range *conns {
r.sendMsgChan <- send_msg_queue_elem{
to: conn,
mt: websocket.TextMessage,
msg: buff.Bytes(),
}
}
}
}
}

View File

@ -1,490 +1,515 @@
package wshandler package wshandler
import ( import (
"bytes"
"context" "context"
"encoding/base64"
"encoding/gob"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"os"
"strings" "strings"
"sync" "sync"
"time"
"go.mongodb.org/mongo-driver/bson/primitive" common "repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/flagx" "repositories.action2quare.com/ayo/gocommon/flagx"
"repositories.action2quare.com/ayo/gocommon/logger" "repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/session"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
) )
var noAuthFlag = flagx.Bool("noauth", false, "") var noSessionFlag = flagx.Bool("nosession", false, "nosession=[true|false]")
type wsconn struct { const (
connStateCachePrefix = "conn_state_"
connStateScript = `
local hosts = redis.call('keys',KEYS[1])
for index, key in ipairs(hosts) do
local ok = redis.call('hexists', key, KEYS[2])
if ok == 1 then
return redis.call('hget', key, KEYS[2])
end
end
return ""
`
)
var ConnStateCacheKey = func() string {
hn, _ := os.Hostname()
return connStateCachePrefix + hn
}()
type Richconn struct {
*websocket.Conn *websocket.Conn
sender *Sender lock sync.Mutex
closeMessage string alias primitive.ObjectID
tags []string
onClose map[string]func()
} }
type UpstreamMessage struct { func (rc *Richconn) AddTag(name, val string) {
Alias string rc.lock.Lock()
Accid primitive.ObjectID defer rc.lock.Unlock()
Target string
Body any prefix := name + "="
Tag []string for i, tag := range rc.tags {
if strings.HasPrefix(tag, prefix) {
rc.tags[i] = prefix + val
return
}
}
rc.tags = append(rc.tags, prefix+val)
} }
type DownstreamMessage struct { func (rc *Richconn) GetTag(name string) string {
Alias string `json:",omitempty"` rc.lock.Lock()
Body any `json:",omitempty"` defer rc.lock.Unlock()
Tag []string `json:",omitempty"`
prefix := name + "="
for _, tag := range rc.tags {
if strings.HasPrefix(tag, prefix) {
return tag[len(prefix):]
}
}
return ""
} }
type commandType string func (rc *Richconn) RemoveTag(name string, val string) {
rc.lock.Lock()
defer rc.lock.Unlock()
const ( whole := fmt.Sprintf("%s=%s", name, val)
commandType_EnterRoom = commandType("enter_room") for i, tag := range rc.tags {
commandType_LeaveRoom = commandType("leave_room") if tag == whole {
) if i == 0 && len(rc.tags) == 1 {
rc.tags = nil
type commandMessage struct { } else {
Cmd commandType lastidx := len(rc.tags) - 1
Args []any if i < lastidx {
rc.tags[i] = rc.tags[lastidx]
}
rc.tags = rc.tags[:lastidx]
}
return
}
}
} }
type WebSocketMessageType int func (rc *Richconn) RegistOnCloseFunc(name string, f func()) {
rc.lock.Lock()
defer rc.lock.Unlock()
const ( if rc.onClose == nil {
TextMessage = WebSocketMessageType(websocket.TextMessage) f()
BinaryMessage = WebSocketMessageType(websocket.BinaryMessage) return
CloseMessage = WebSocketMessageType(websocket.CloseMessage) }
PingMessage = WebSocketMessageType(websocket.PingMessage) rc.onClose[name] = f
PongMessage = WebSocketMessageType(websocket.PongMessage)
)
type Sender struct {
Accid primitive.ObjectID
Alias string
} }
type EventReceiver interface { func (rc *Richconn) HasOnCloseFunc(name string) bool {
OnClientMessageReceived(sender *Sender, messageType WebSocketMessageType, body io.Reader) rc.lock.Lock()
OnRoomCreated(name string) defer rc.lock.Unlock()
OnRoomDestroyed(name string)
if rc.onClose == nil {
return false
} }
type send_msg_queue_elem struct { _, ok := rc.onClose[name]
to *wsconn return ok
mt int
msg []byte
} }
type WebsocketHandler struct { func (rc *Richconn) UnregistOnCloseFunc(name string) (out func()) {
WebsocketApiBroker rc.lock.Lock()
defer rc.lock.Unlock()
redisMsgChanName string if rc.onClose == nil {
redisCmdChanName string return
}
out = rc.onClose[name]
delete(rc.onClose, name)
return
}
func (rc *Richconn) WriteBytes(data []byte) error {
rc.lock.Lock()
defer rc.lock.Unlock()
return rc.WriteMessage(websocket.TextMessage, data)
}
type DeliveryMessage struct {
Alias primitive.ObjectID
Body []byte
Command string
Conn *Richconn
}
func (dm *DeliveryMessage) Parse(out any) error {
return json.Unmarshal(dm.Body, out)
}
func (dm *DeliveryMessage) MarshalBinary() (data []byte, err error) {
return append(dm.Alias[:], dm.Body...), nil
}
func (dm *DeliveryMessage) UnmarshalBinary(data []byte) error {
copy(dm.Alias[:], data[:12])
dm.Body = data[12:]
return nil
}
type tagconn struct {
rc *Richconn
state string
}
type tagconnsmap = map[primitive.ObjectID]*tagconn
type tagconns struct {
sync.Mutex
tagconnsmap
}
type subhandler struct {
sync.Mutex
authCache *common.AuthCollection
conns map[primitive.ObjectID]*Richconn
aliases map[primitive.ObjectID]primitive.ObjectID
tags map[primitive.ObjectID]*tagconns
deliveryChan chan DeliveryMessage
url string
redisSync *redis.Client redisSync *redis.Client
connInOutChan chan *wsconn }
deliveryChan chan any
localDeliveryChan chan any
sendMsgChan chan send_msg_queue_elem
connWaitGroup sync.WaitGroup // WebsocketHandler :
sessionConsumer session.Consumer type WebsocketHandler struct {
authCaches map[string]*subhandler
RedisSync *redis.Client
} }
type wsConfig struct { type wsConfig struct {
gocommon.StorageAddr `json:"storage"` SyncPipeline string `json:"ws_sync_pipeline"`
} }
func init() { func NewWebsocketHandler(authglobal common.AuthCollectionGlobal) (wsh *WebsocketHandler) {
gob.Register(UpstreamMessage{}) authCaches := make(map[string]*subhandler)
gob.Register(commandMessage{}) for _, region := range authglobal.Regions() {
gob.Register(map[string]any{}) sh := &subhandler{
gob.Register(primitive.A{}) authCache: authglobal.Get(region),
gob.Register(primitive.M{}) conns: make(map[primitive.ObjectID]*Richconn),
gob.Register(primitive.D{}) aliases: make(map[primitive.ObjectID]primitive.ObjectID),
gob.Register(primitive.ObjectID{}) tags: make(map[primitive.ObjectID]*tagconns),
gob.Register([]any{}) deliveryChan: make(chan DeliveryMessage, 1000),
}
authCaches[region] = sh
} }
func NewWebsocketHandler(consumer session.Consumer, redisUrl string) (*WebsocketHandler, error) {
var config wsConfig var config wsConfig
if err := gocommon.LoadConfig(&config); err != nil { common.LoadConfig(&config)
return nil, err
}
redisSync, err := gocommon.NewRedisClient(redisUrl) redisSync, err := common.NewRedisClient(config.SyncPipeline, 0)
if err != nil { if err != nil {
return nil, logger.ErrorWithCallStack(err) panic(err)
} }
sendchan := make(chan send_msg_queue_elem, 1000)
go func() {
sender := func(elem *send_msg_queue_elem) {
defer func() {
r := recover()
if r != nil {
logger.Println("send_msg_queue_elem sender recover :", r, string(elem.msg))
}
}()
if elem == nil {
return
}
if elem.to == nil {
return
}
elem.to.WriteMessage(elem.mt, elem.msg)
}
for elem := range sendchan {
sender(&elem)
}
}()
return &WebsocketHandler{ return &WebsocketHandler{
redisMsgChanName: fmt.Sprintf("_wsh_msg_%d", redisSync.Options().DB), authCaches: authCaches,
redisCmdChanName: fmt.Sprintf("_wsh_cmd_%d", redisSync.Options().DB), RedisSync: redisSync,
redisSync: redisSync, }
connInOutChan: make(chan *wsconn),
deliveryChan: make(chan any, 1000),
localDeliveryChan: make(chan any, 100),
sendMsgChan: sendchan,
sessionConsumer: consumer,
}, nil
} }
func (ws *WebsocketHandler) Start(ctx context.Context) { func (ws *WebsocketHandler) Destructor() {
ws.connWaitGroup.Add(1) if ws.RedisSync != nil {
go ws.mainLoop(ctx) ws.RedisSync.Del(context.Background(), ConnStateCacheKey)
}
} }
func (ws *WebsocketHandler) Cleanup() { func (ws *WebsocketHandler) DeliveryChannel(region string) <-chan DeliveryMessage {
ws.connWaitGroup.Wait() return ws.authCaches[region].deliveryChan
} }
func (ws *WebsocketHandler) RegisterHandlers(serveMux *http.ServeMux, prefix string) error { func (ws *WebsocketHandler) Conn(region string, alias primitive.ObjectID) *Richconn {
url := gocommon.MakeHttpHandlerPattern(prefix, "ws") if sh := ws.authCaches[region]; sh != nil {
if *noAuthFlag { return sh.conns[alias]
serveMux.HandleFunc(url, ws.upgrade_nosession) }
return nil
}
func (ws *WebsocketHandler) JoinTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, rc *Richconn, hint string) error {
if sh := ws.authCaches[region]; sh != nil {
sh.joinTag(tag, tid, rc, hint)
}
return nil
}
func (ws *WebsocketHandler) LeaveTag(region string, tag primitive.ObjectID, tid primitive.ObjectID) error {
if sh := ws.authCaches[region]; sh != nil {
sh.leaveTag(tag, tid)
}
return nil
}
func (ws *WebsocketHandler) SetStateInTag(region string, tag primitive.ObjectID, tid primitive.ObjectID, state string, hint string) error {
if sh := ws.authCaches[region]; sh != nil {
sh.setStateInTag(tag, tid, state, hint)
}
return nil
}
func (ws *WebsocketHandler) BroadcastRaw(region string, tag primitive.ObjectID, raw []byte) {
if sh := ws.authCaches[region]; sh != nil {
if cs := sh.cloneTag(tag); len(cs) > 0 {
go func(raw []byte) {
for _, c := range cs {
if c != nil {
c.WriteBytes(raw)
}
}
}(raw)
}
}
}
func (ws *WebsocketHandler) Broadcast(region string, tag primitive.ObjectID, doc bson.M) {
raw, _ := json.Marshal(doc)
ws.BroadcastRaw(region, tag, raw)
}
var onlineQueryScriptHash string
func (ws *WebsocketHandler) RegisterHandlers(ctx context.Context, serveMux *http.ServeMux, prefix string) error {
ws.RedisSync.Del(context.Background(), ConnStateCacheKey)
scriptHash, err := ws.RedisSync.ScriptLoad(context.Background(), connStateScript).Result()
if err != nil {
return err
}
onlineQueryScriptHash = scriptHash
for region, sh := range ws.authCaches {
if region == "default" {
region = ""
}
sh.url = common.MakeHttpHandlerPattern(prefix, region, "ws")
sh.redisSync = ws.RedisSync
if *noSessionFlag {
serveMux.HandleFunc(sh.url, sh.upgrade_nosession)
} else { } else {
serveMux.HandleFunc(url, ws.upgrade) serveMux.HandleFunc(sh.url, sh.upgrade)
}
} }
return nil return nil
} }
func (ws *WebsocketHandler) SendUpstreamMessage(msg *UpstreamMessage) { func (sh *subhandler) cloneTag(tag primitive.ObjectID) (out []*Richconn) {
ws.localDeliveryChan <- msg sh.Lock()
cs := sh.tags[tag]
sh.Unlock()
if cs == nil {
return nil
} }
func (ws *WebsocketHandler) EnterRoom(room string, accid primitive.ObjectID) { cs.Lock()
ws.localDeliveryChan <- &commandMessage{ defer cs.Unlock()
Cmd: commandType_EnterRoom,
Args: []any{room, accid},
}
}
func (ws *WebsocketHandler) LeaveRoom(room string, accid primitive.ObjectID) { out = make([]*Richconn, 0, len(cs.tagconnsmap))
ws.localDeliveryChan <- &commandMessage{ for _, c := range cs.tagconnsmap {
Cmd: commandType_LeaveRoom, out = append(out, c.rc)
Args: []any{room, accid},
} }
}
func (ws *WebsocketHandler) mainLoop(ctx context.Context) {
defer func() {
ws.connWaitGroup.Done()
s := recover()
if s != nil {
logger.Error(s)
}
}()
// redis channel에서 유저가 보낸 메시지를 읽는 go rountine
go func() {
var pubsub *redis.PubSub
waittimer := time.Second
for {
if pubsub == nil {
pubsub = ws.redisSync.Subscribe(ctx, ws.redisMsgChanName, ws.redisCmdChanName)
}
raw, err := pubsub.ReceiveMessage(ctx)
if err == nil {
waittimer = time.Second
buffer := bytes.NewBuffer([]byte(raw.Payload))
dec := gob.NewDecoder(buffer)
if raw.Channel == ws.redisMsgChanName {
var msg UpstreamMessage
if err := dec.Decode(&msg); err == nil {
ws.deliveryChan <- &msg
} else {
logger.Println("decode UpstreamMessage failed :", err)
}
} else if raw.Channel == ws.redisCmdChanName {
var cmd commandMessage
if err := dec.Decode(&cmd); err == nil {
ws.deliveryChan <- &cmd
} else {
logger.Println("decode UpstreamMessage failed :", err)
}
}
} else {
logger.Println("pubsub.ReceiveMessage failed :", err)
pubsub.Close()
pubsub = nil
time.Sleep(waittimer)
waittimer = waittimer * 2
if waittimer > time.Minute {
waittimer = time.Minute
}
if ctx.Err() != nil {
break
}
}
}
}()
entireConns := make(map[string]*wsconn)
rooms := make(map[string]*room)
roomDestroyChan := make(chan string, 1000)
findRoom := func(name string, create bool) *room {
room := rooms[name]
if room == nil && create {
room = makeRoom(name, roomDestroyChan, ws.sendMsgChan)
rooms[name] = room
room.start(ctx)
//go ws.callReceiver.OnRoomCreated(name)
}
return room
}
defer func() {
for _, conn := range entireConns {
ws.Call(conn.sender, ClientDisconnected, nil)
conn.Close()
}
}()
errProcessFailed_NotInRoom := errors.New("not in room")
processPrivateUpstreamMessage := func(usermsg *UpstreamMessage) bool {
target := usermsg.Target
if target[0] == '#' {
return false
}
roomindex := strings.IndexByte(target, '@')
var accid string
var roomid string
if roomindex > 0 {
accid = target[:roomindex]
roomid = target[roomindex+1:]
} else {
accid = target
}
conn := entireConns[accid]
if conn == nil {
return false
}
if len(roomid) > 0 {
if room := findRoom(roomid, false); room == nil {
return false
}
}
// conn에게 메시지 보내면 처리 완료
ds, _ := json.Marshal(DownstreamMessage{
Alias: usermsg.Alias,
Body: usermsg.Body,
Tag: usermsg.Tag,
})
ws.sendMsgChan <- send_msg_queue_elem{
to: conn,
mt: websocket.TextMessage,
msg: ds,
}
return true
}
errCommandArgumentNotCorrect := errors.New("command arguments are not correct")
processCommandMessage := func(usermsg *commandMessage) (bool, error) {
switch usermsg.Cmd {
case commandType_EnterRoom:
if len(usermsg.Args) != 2 {
return false, errCommandArgumentNotCorrect
}
roomName := usermsg.Args[0].(string)
accid := usermsg.Args[1].(primitive.ObjectID)
conn := entireConns[accid.Hex()]
if conn == nil {
return false, nil
}
findRoom(roomName, true).in(conn)
case commandType_LeaveRoom:
if len(usermsg.Args) != 2 {
return false, errCommandArgumentNotCorrect
}
roomName := usermsg.Args[0].(string)
accid := usermsg.Args[1].(primitive.ObjectID)
room := findRoom(roomName, false)
if room == nil {
return false, errProcessFailed_NotInRoom
}
room.out(accid)
}
return true, nil
}
// 유저에게서 온 메세지, 소켓 연결/해체 처리
for {
buffer := bytes.NewBuffer(make([]byte, 0, 1024))
buffer.Reset()
select {
case <-ctx.Done():
return return
case destroyedRoom := <-roomDestroyChan:
delete(rooms, destroyedRoom)
//go ws.callReceiver.OnRoomDestroyed(destroyedRoom)
case usermsg := <-ws.localDeliveryChan:
// 로컬에 connection이 있는지 먼저 확인해 보기 위한 채널
// 없으면 publish한다.
switch usermsg := usermsg.(type) {
case *UpstreamMessage:
if !processPrivateUpstreamMessage(usermsg) {
// private메시지가 처리가 안됐으므로 publish
enc := gob.NewEncoder(buffer)
var err error
if err = enc.Encode(usermsg); err == nil {
_, err = ws.redisSync.Publish(context.Background(), ws.redisMsgChanName, buffer.Bytes()).Result()
} }
if err != nil { func (sh *subhandler) joinTag(tag primitive.ObjectID, tid primitive.ObjectID, rc *Richconn, hint string) {
logger.Println("gob.Encode or publish failed :", err) sh.Lock()
cs := sh.tags[tag]
if cs == nil {
cs = &tagconns{
tagconnsmap: make(map[primitive.ObjectID]*tagconn),
}
}
sh.Unlock()
cs.Lock()
states := make([]bson.M, 0, len(cs.tagconnsmap))
for tid, conn := range cs.tagconnsmap {
states = append(states, bson.M{
"_id": tid,
"_hint": hint,
"state": conn.state,
})
}
cs.tagconnsmap[tid] = &tagconn{rc: rc}
cs.Unlock()
sh.Lock()
sh.tags[tag] = cs
sh.Unlock()
if len(states) > 0 {
s, _ := json.Marshal(states)
rc.WriteBytes(s)
} }
} }
case *commandMessage: func (sh *subhandler) leaveTag(tag primitive.ObjectID, tid primitive.ObjectID) {
processed, err := processCommandMessage(usermsg) sh.Lock()
if err != nil { defer sh.Unlock()
logger.Println("processCommandMessage returns err :", err)
break cs := sh.tags[tag]
if cs == nil {
return
} }
if !processed { delete(cs.tagconnsmap, tid)
var err error if len(cs.tagconnsmap) == 0 {
enc := gob.NewEncoder(buffer) delete(sh.tags, tag)
if err = enc.Encode(usermsg); err == nil {
_, err = ws.redisSync.Publish(context.Background(), ws.redisCmdChanName, buffer.Bytes()).Result()
}
if err != nil {
logger.Println("gob.Encode or Publish failed :", err)
}
}
}
case usermsg := <-ws.deliveryChan:
switch usermsg := usermsg.(type) {
case *UpstreamMessage:
target := usermsg.Target
if target[0] == '#' {
// 룸에 브로드캐스팅
roomName := target[1:]
if room := findRoom(roomName, false); room != nil {
room.broadcast(usermsg)
}
} else { } else {
processPrivateUpstreamMessage(usermsg) sh.tags[tag] = cs
}
} }
case *commandMessage: func (sh *subhandler) setStateInTag(tag primitive.ObjectID, tid primitive.ObjectID, state string, hint string) {
_, err := processCommandMessage(usermsg) sh.Lock()
cs := sh.tags[tag]
sh.Unlock()
if cs == nil {
return
}
cs.Lock()
defer cs.Unlock()
if tagconn := cs.tagconnsmap[tid]; tagconn != nil {
tagconn.state = state
var clone []*Richconn
for _, c := range cs.tagconnsmap {
clone = append(clone, c.rc)
}
raw, _ := json.Marshal(map[string]any{
"_id": tid,
"_hint": hint,
"state": state,
})
go func(raw []byte) {
for _, c := range clone {
c.WriteBytes(raw)
}
}(raw)
}
}
func (wsh *WebsocketHandler) GetState(alias primitive.ObjectID) (string, error) {
state, err := wsh.RedisSync.EvalSha(context.Background(), onlineQueryScriptHash, []string{
connStateCachePrefix + "*", alias.Hex(),
}).Result()
if err != nil { if err != nil {
logger.Println("processCommandMessage returns err :", err) return "", err
break
} }
default: return state.(string), nil
logger.Println("usermsg is unknown type")
} }
case c := <-ws.connInOutChan: func (wsh *WebsocketHandler) IsOnline(alias primitive.ObjectID) (bool, error) {
if c.Conn == nil { state, err := wsh.GetState(alias)
delete(entireConns, c.sender.Accid.Hex()) if err != nil {
logger.Println("ClientDisconnected :", c.sender.Alias) logger.Error("IsOnline failed. err :", err)
go ws.ClientDisconnected(c) return false, err
} else {
entireConns[c.sender.Accid.Hex()] = c
logger.Println("ClientConnected :", c.sender.Alias)
go ws.ClientConnected(c)
} }
return len(state) > 0, nil
}
func (sh *subhandler) closeConn(accid primitive.ObjectID) {
sh.Lock()
defer sh.Unlock()
if alias, ok := sh.aliases[accid]; ok {
if old := sh.conns[alias]; old != nil {
old.Close()
} }
} }
} }
func upgrade_core(ws *WebsocketHandler, conn *websocket.Conn, accid primitive.ObjectID, alias string) { func (sh *subhandler) addConn(conn *Richconn, accid primitive.ObjectID) {
newconn := &wsconn{ sh.Lock()
Conn: conn, defer sh.Unlock()
sender: &Sender{
Alias: alias,
Accid: accid,
},
}
ws.connInOutChan <- newconn
ws.connWaitGroup.Add(1) sh.conns[conn.alias] = conn
go func(c *wsconn, accid primitive.ObjectID, deliveryChan chan<- any) { sh.aliases[accid] = conn.alias
}
func upgrade_core(sh *subhandler, conn *websocket.Conn, initState string, accid primitive.ObjectID, alias primitive.ObjectID) {
sh.closeConn(accid)
newconn := sh.makeRichConn(alias, conn)
sh.addConn(newconn, accid)
sh.redisSync.HSet(context.Background(), ConnStateCacheKey, alias.Hex(), initState).Result()
go func(c *Richconn, accid primitive.ObjectID, deliveryChan chan<- DeliveryMessage) {
for { for {
messageType, r, err := c.NextReader() mt, p, err := c.ReadMessage()
if err != nil { if err != nil {
if ce, ok := err.(*websocket.CloseError); ok {
c.closeMessage = ce.Text
}
c.Close() c.Close()
break break
} }
if messageType == websocket.CloseMessage { switch mt {
closeMsg, _ := io.ReadAll(r) case websocket.BinaryMessage:
logger.Println("close message :", string(closeMsg)) msg := DeliveryMessage{
break Alias: c.alias,
Body: p,
Conn: c,
}
deliveryChan <- msg
case websocket.TextMessage:
msg := string(p)
opcodes := strings.Split(msg, ";")
for _, opcode := range opcodes {
if strings.HasPrefix(opcode, "ps:") {
sh.redisSync.HSet(context.Background(), ConnStateCacheKey, alias.Hex(), opcode[3:]).Result()
} else if strings.HasPrefix(opcode, "cmd:") {
cmd := opcode[4:]
msg := DeliveryMessage{
Alias: c.alias,
Command: cmd,
Conn: c,
}
deliveryChan <- msg
}
} }
if messageType == websocket.BinaryMessage {
var size [1]byte
r.Read(size[:])
cmd := make([]byte, size[0])
r.Read(cmd)
ws.Call(newconn.sender, string(cmd), r)
} }
} }
ws.connWaitGroup.Done()
c.Conn = nil
ws.connInOutChan <- c
}(newconn, accid, ws.deliveryChan)
}
func (ws *WebsocketHandler) upgrade_nosession(w http.ResponseWriter, r *http.Request) { sh.redisSync.HDel(context.Background(), ConnStateCacheKey, c.alias.Hex()).Result()
sh.Lock()
delete(sh.conns, c.alias)
delete(sh.aliases, accid)
sh.Unlock()
var funcs []func()
c.lock.Lock()
for _, f := range c.onClose {
funcs = append(funcs, f)
}
c.onClose = nil
c.lock.Unlock()
for _, f := range funcs {
f()
}
}(newconn, accid, sh.deliveryChan)
}
func (sh *subhandler) upgrade_nosession(w http.ResponseWriter, r *http.Request) {
// 클라이언트 접속 // 클라이언트 접속
defer func() { defer func() {
s := recover() s := recover()
@ -501,6 +526,10 @@ func (ws *WebsocketHandler) upgrade_nosession(w http.ResponseWriter, r *http.Req
return return
} }
if auth[0] != "Editor" {
w.WriteHeader(http.StatusBadRequest)
return
}
temp, err := hex.DecodeString(auth[1]) temp, err := hex.DecodeString(auth[1])
if err != nil { if err != nil {
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
@ -522,18 +551,20 @@ func (ws *WebsocketHandler) upgrade_nosession(w http.ResponseWriter, r *http.Req
return return
} }
var alias string alias := accid
if v := r.Header.Get("AS-X-ALIAS"); len(v) > 0 { if v := r.Header.Get("AS-X-ALIAS"); len(v) > 0 {
vt, _ := base64.StdEncoding.DecodeString(v) alias = common.ParseObjectID(v)
alias = string(vt)
} else {
alias = accid.Hex()
} }
upgrade_core(ws, conn, accid, alias) initState := r.Header.Get("As-X-Tavern-InitialState")
if len(initState) == 0 {
initState = "online"
} }
func (ws *WebsocketHandler) upgrade(w http.ResponseWriter, r *http.Request) { upgrade_core(sh, conn, initState, accid, alias)
}
func (sh *subhandler) upgrade(w http.ResponseWriter, r *http.Request) {
// 클라이언트 접속 // 클라이언트 접속
defer func() { defer func() {
s := recover() s := recover()
@ -545,11 +576,17 @@ func (ws *WebsocketHandler) upgrade(w http.ResponseWriter, r *http.Request) {
}() }()
sk := r.Header.Get("AS-X-SESSION") sk := r.Header.Get("AS-X-SESSION")
logger.Println("WebsocketHandler.upgrade sk :", sk) auth := strings.Split(r.Header.Get("Authorization"), " ")
authinfo, err := ws.sessionConsumer.Query(sk) if len(auth) != 2 {
if err != nil { //TODO : 클라이언트는 BadRequest를 받으면 로그인 화면으로 돌아가야 한다.
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusBadRequest)
logger.Error("authorize query failed :", err) return
}
authtoken := auth[1]
accid, success := sh.authCache.IsValid(sk, authtoken)
if !success {
w.WriteHeader(http.StatusUnauthorized)
return return
} }
@ -560,13 +597,24 @@ func (ws *WebsocketHandler) upgrade(w http.ResponseWriter, r *http.Request) {
return return
} }
var alias string alias := accid
if v := r.Header.Get("AS-X-ALIAS"); len(v) > 0 { if v := r.Header.Get("AS-X-ALIAS"); len(v) > 0 {
vt, _ := base64.StdEncoding.DecodeString(v) alias = common.ParseObjectID(v)
alias = string(vt)
} else {
alias = authinfo.Account.Hex()
} }
upgrade_core(ws, conn, authinfo.Account, alias) initState := r.Header.Get("As-X-Tavern-InitialState")
if len(initState) == 0 {
initState = "online"
}
upgrade_core(sh, conn, initState, accid, alias)
}
func (sh *subhandler) makeRichConn(alias primitive.ObjectID, conn *websocket.Conn) *Richconn {
rc := Richconn{
Conn: conn,
alias: alias,
onClose: make(map[string]func()),
}
return &rc
} }