Compare commits

12 Commits

27 changed files with 549 additions and 3651 deletions

View File

@ -30,7 +30,7 @@ type Authinfo struct {
}
const (
sessionSyncChannelNamePrefix = "session-sync-channel2"
sessionSyncChannelName = "session-sync-channel2"
)
type AuthinfoCell interface {
@ -99,8 +99,6 @@ func (ac *redisAuthCell) ToBytes() []byte {
func newAuthCollectionWithRedis(redisClient *redis.Client, subctx context.Context, maingateURL string, apiToken string) *AuthCollection {
sessionTTL := int64(3600)
ac := MakeAuthCollection(time.Duration(sessionTTL * int64(time.Second)))
sessionSyncChannelName := fmt.Sprintf("%s-%d", sessionSyncChannelNamePrefix, redisClient.Options().DB)
pubsub := redisClient.Subscribe(subctx, sessionSyncChannelName)
ctx, cancel := context.WithCancel(context.TODO())
go func(ctx context.Context, sub *redis.PubSub, authCache *AuthCollection) {
@ -207,7 +205,7 @@ func (acg *AuthCollectionGlobal) Reload(context context.Context) error {
for r, url := range config.RegionStorage {
if _, ok := oldval[r]; !ok {
// 새로 생겼네
redisClient, err := NewRedisClient(url.Redis["session"])
redisClient, err := NewRedisClient(url.Redis.URL, url.Redis.Offset["session"])
if err != nil {
return err
}
@ -230,7 +228,7 @@ func NewAuthCollectionGlobal(context context.Context, apiToken string) (AuthColl
output := make(map[string]*AuthCollection)
for region, url := range config.RegionStorage {
redisClient, err := NewRedisClient(url.Redis["session"])
redisClient, err := NewRedisClient(url.Redis.URL, url.Redis.Offset["session"])
if err != nil {
return AuthCollectionGlobal{}, err
}

View File

@ -18,6 +18,8 @@ import (
"repositories.action2quare.com/ayo/gocommon/flagx"
"repositories.action2quare.com/ayo/gocommon/logger"
"go.mongodb.org/mongo-driver/bson"
)
var linkupdate = flagx.String("updatelink", "", "")
@ -237,3 +239,17 @@ func ReplyUpdateComplete() {
// return (uint32(b[0]) << 0) | (uint32(b[1]) << 8) | (uint32(b[2]) << 16) | (uint32(b[3]) << 24)
// }
type BsonMarshaler[T any] struct {
val T
}
func NewBsonMarshaler[T any](val T) *BsonMarshaler[T] {
return &BsonMarshaler[T]{
val: val,
}
}
func (m *BsonMarshaler[T]) MarshalBinary() (data []byte, err error) {
return bson.Marshal(m.val)
}

View File

@ -1,7 +1,6 @@
package flagx
import (
"encoding"
"flag"
"fmt"
"os"
@ -125,9 +124,6 @@ func Duration(name string, value time.Duration, usage string) *time.Duration {
return findProperFlagSet(name).Duration(name, value, usage)
}
func TextVar(p encoding.TextUnmarshaler, name string, value encoding.TextMarshaler, usage string) {
findProperFlagSet(name).TextVar(p, name, value, usage)
}
func Func(name, usage string, fn func(string) error) {
findProperFlagSet(name).Func(name, usage, fn)
}

19
go.mod
View File

@ -1,36 +1,29 @@
module repositories.action2quare.com/ayo/gocommon
go 1.20
go 1.18
replace repositories.action2quare.com/ayo/gocommon => ./
require (
github.com/go-redis/redis/v8 v8.11.5
github.com/golang-jwt/jwt v3.2.2+incompatible
github.com/gorilla/websocket v1.5.0
github.com/pires/go-proxyproto v0.7.0
github.com/prometheus/client_golang v1.17.0
go.mongodb.org/mongo-driver v1.11.6
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d
golang.org/x/text v0.9.0
golang.org/x/text v0.3.7
)
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.1 // indirect
github.com/xdg-go/stringprep v1.0.3 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.11.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
)

43
go.sum
View File

@ -1,7 +1,5 @@
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -12,26 +10,19 @@ github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY=
github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
@ -43,15 +34,6 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q=
github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY=
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 h1:v7DLqVdK4VrYkVD5diGdl4sxJurKJEMnODWRJlxV9oM=
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU=
github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY=
github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY=
github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI=
github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
@ -69,28 +51,21 @@ go.mongodb.org/mongo-driver v1.11.6 h1:XM7G6PjiGAO5betLF13BIa5TlLUUE3uJ/2Ox3Lz1K
go.mongodb.org/mongo-driver v1.11.6/go.mod h1:G9TgswdsWjX4tmDA5zfs2+6AEPpYJwqblyjsfuh8oXY=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=

View File

@ -16,7 +16,7 @@ var txSetArgs redis.SetArgs = redis.SetArgs{
}
type LockerWithRedis struct {
locked_key string
key string
}
var ErrTransactionLocked = errors.New("transaction is already locked")
@ -34,10 +34,10 @@ func (locker *LockerWithRedis) Lock(rc *redis.Client, key string) error {
return ErrTransactionLocked
}
locker.locked_key = key
locker.key = key
return nil
}
func (locker *LockerWithRedis) Unlock(rc *redis.Client) {
rc.Del(context.Background(), locker.locked_key).Result()
rc.Del(context.Background(), locker.key).Result()
}

View File

@ -7,12 +7,12 @@ import (
"log"
"os"
"path"
"runtime"
"runtime/debug"
"strings"
)
var stdlogger *log.Logger
var errlogger *log.Logger
var _ = flag.Bool("logfile", false, "")
func init() {
@ -20,7 +20,9 @@ func init() {
binname := path.Base(strings.ReplaceAll(binpath, "\\", "/"))
var outWriter io.Writer
var errWriter io.Writer
outWriter = os.Stdout
errWriter = os.Stderr
args := os.Args
useLogFile := false
@ -44,9 +46,11 @@ func init() {
}
outWriter = io.MultiWriter(outWriter, logFile)
errWriter = io.MultiWriter(errWriter, logFile)
}
stdlogger = log.New(outWriter, "", log.LstdFlags)
errlogger = log.New(errWriter, "", log.LstdFlags)
}
func Println(v ...interface{}) {
@ -58,83 +62,42 @@ func Printf(format string, v ...interface{}) {
}
func Error(v ...interface{}) {
stdlogger.Output(2, fmt.Sprintln(v...))
stdlogger.Output(2, string(debug.Stack()))
errlogger.Output(2, fmt.Sprintln(v...))
errlogger.Output(2, string(debug.Stack()))
}
func Errorf(format string, v ...interface{}) {
stdlogger.Output(2, fmt.Sprintf(format, v...))
stdlogger.Output(2, string(debug.Stack()))
errlogger.Output(2, fmt.Sprintf(format, v...))
errlogger.Output(2, string(debug.Stack()))
}
func Fatal(v ...interface{}) {
stdlogger.Output(2, fmt.Sprint(v...))
stdlogger.Output(2, string(debug.Stack()))
errlogger.Output(2, fmt.Sprint(v...))
errlogger.Output(2, string(debug.Stack()))
os.Exit(1)
}
func Fatalln(v ...interface{}) {
stdlogger.Output(2, fmt.Sprintln(v...))
stdlogger.Output(2, string(debug.Stack()))
errlogger.Output(2, fmt.Sprintln(v...))
errlogger.Output(2, string(debug.Stack()))
os.Exit(1)
}
func Panic(v ...interface{}) {
s := fmt.Sprint(v...)
stdlogger.Output(2, s)
stdlogger.Output(2, string(debug.Stack()))
errlogger.Output(2, s)
errlogger.Output(2, string(debug.Stack()))
panic(s)
}
func Panicf(format string, v ...interface{}) {
s := fmt.Sprintf(format, v...)
stdlogger.Output(2, s)
stdlogger.Output(2, string(debug.Stack()))
errlogger.Output(2, s)
errlogger.Output(2, string(debug.Stack()))
panic(s)
}
func Panicln(v ...interface{}) {
s := fmt.Sprintln(v...)
stdlogger.Output(2, s)
stdlogger.Output(2, string(debug.Stack()))
errlogger.Output(2, s)
errlogger.Output(2, string(debug.Stack()))
panic(s)
}
type errWithCallstack struct {
inner error
frames []*runtime.Frame
}
func (ecs *errWithCallstack) Error() string {
if ecs.frames == nil {
return ecs.inner.Error()
}
out := make([]string, 0, len(ecs.frames)+1)
out = append(out, ecs.inner.Error())
for i := len(ecs.frames) - 1; i >= 0; i-- {
frame := ecs.frames[i]
out = append(out, fmt.Sprintf("%s\n\t%s:%d", frame.Function, frame.File, frame.Line))
}
return strings.Join(out, "\n")
}
func ErrorWithCallStack(err error) error {
var frames []*runtime.Frame
if recur, ok := err.(*errWithCallstack); ok {
err = recur.inner
frames = recur.frames
}
pc, _, _, ok := runtime.Caller(1)
if ok {
curframes := runtime.CallersFrames([]uintptr{pc})
f, _ := curframes.Next()
frames = append(frames, &f)
}
return &errWithCallstack{
inner: err,
frames: frames,
}
}

View File

@ -1,161 +0,0 @@
package metric
import (
"crypto/md5"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"math"
"os"
"path"
"sort"
"strings"
"sync/atomic"
"repositories.action2quare.com/ayo/gocommon/logger"
)
const metric_value_line_size = 19
type MetricDescription struct {
Key string
Type MetricType
Name string `json:",omitempty"`
Help string `json:",omitempty"`
ConstLabels map[string]string `json:",omitempty"`
}
type Exporter interface {
RegisterMetric(*MetricDescription)
UpdateMetric(string, float64)
Shutdown()
}
type MetricWriter interface {
Add(int64)
Set(int64)
}
type metric_empty struct{}
func (mw *metric_empty) Set(int64) {}
func (mw *metric_empty) Add(int64) {}
var MetricWriterNil = MetricWriter(&metric_empty{})
type metric_int64 struct {
valptr *int64
buff [metric_value_line_size]byte
}
func (mw *metric_int64) printOut() {
binary.LittleEndian.PutUint64(mw.buff[9:], math.Float64bits(float64(atomic.LoadInt64(mw.valptr))))
os.Stdout.Write(mw.buff[:])
}
func (mw *metric_int64) Set(newval int64) {
atomic.StoreInt64(mw.valptr, newval)
mw.printOut()
}
func (mw *metric_int64) Add(inc int64) {
atomic.AddInt64(mw.valptr, inc)
mw.printOut()
}
func NewMetric(mt MetricType, name string, help string, constLabels map[string]string) (writer MetricWriter) {
if !metricEnabled {
return MetricWriterNil
}
var disorder []struct {
k string
v string
}
for k, v := range constLabels {
disorder = append(disorder, struct {
k string
v string
}{k: strings.ToLower(k), v: strings.ToLower(v)})
}
sort.Slice(disorder, func(i, j int) bool {
return disorder[i].k < disorder[j].k
})
hash := md5.New()
hash.Write([]byte(strings.ToLower(name)))
for _, d := range disorder {
hash.Write([]byte(d.k))
hash.Write([]byte(d.v))
}
key := hex.EncodeToString(hash.Sum(nil))[:metric_key_size]
temp, _ := json.Marshal(MetricDescription{
Key: key,
Type: mt,
Name: name,
Help: help,
ConstLabels: constLabels,
})
impl := &metric_int64{
valptr: new(int64),
}
impl.buff[0] = METRIC_HEAD_INLINE
impl.buff[17] = METRIC_TAIL_INLINE
impl.buff[18] = '\n'
copy(impl.buff[1:], []byte(key))
output := append([]byte{METRIC_HEAD_INLINE}, temp...)
output = append(output, METRIC_TAIL_INLINE, '\n')
os.Stdout.Write(output)
// writer
return impl
}
func ReadMetricValue(line []byte) (string, float64) {
if len(line) < 16 {
return "", 0
}
key := string(line[0:8])
valbits := binary.LittleEndian.Uint64(line[8:])
val := math.Float64frombits(valbits)
return key, val
}
var metricEnabled = false
func init() {
if path.Base(os.Args[0]) == "houston" {
logger.Println("metrics are going to be generated for myself(houston)")
metricEnabled = true
return
}
ppid := os.Getppid()
if parent, _ := os.FindProcess(ppid); parent != nil {
filename := fmt.Sprintf(`/proc/%d/stat`, os.Getppid())
if fn, err := os.ReadFile(filename); err == nil {
stats := strings.SplitN(string(fn), " ", 3)
parentname := strings.Trim(stats[1], "()")
if path.Base(parentname) == "houston" {
logger.Println("metrics are going to be generated for houston")
metricEnabled = true
} else {
logger.Println("metrics are NOT going to be generated. parent is not houston :", filename, string(fn))
}
} else {
logger.Println("metrics are NOT going to be generated. ppid proc is missing :", filename)
}
} else {
logger.Println("metrics are NOT going to be generated. parent process is missing. ppid :", ppid)
}
}

View File

@ -1,155 +0,0 @@
package metric
import (
"context"
"math"
"sync/atomic"
"github.com/prometheus/client_golang/prometheus"
"repositories.action2quare.com/ayo/gocommon/logger"
)
const (
METRIC_HEAD_INLINE = byte(14)
METRIC_TAIL_INLINE = byte(15)
)
type MetricType int
const (
MetricCounter = MetricType(1)
MetricGuage = MetricType(2)
metric_key_size = 8
)
func convertValueType(in MetricType) prometheus.ValueType {
switch in {
case MetricCounter:
return prometheus.CounterValue
case MetricGuage:
return prometheus.GaugeValue
}
return prometheus.UntypedValue
}
type writeRequest struct {
key string
val float64
}
type prometheusMetricDesc struct {
*prometheus.Desc
valueType prometheus.ValueType
valptr *uint64
key string
}
type prometheusExporter struct {
writerChan chan *writeRequest
registerChan chan *prometheusMetricDesc
namespace string
cancel context.CancelFunc
}
func (pe *prometheusExporter) RegisterMetric(nm *MetricDescription) {
pe.registerChan <- &prometheusMetricDesc{
Desc: prometheus.NewDesc(prometheus.BuildFQName(pe.namespace, "", nm.Name), nm.Help, nil, nm.ConstLabels),
valueType: convertValueType(nm.Type),
valptr: new(uint64),
key: nm.Key,
}
}
func (pe *prometheusExporter) UpdateMetric(key string, val float64) {
pe.writerChan <- &writeRequest{key: key, val: val}
}
func (pe *prometheusExporter) Shutdown() {
if pe.cancel != nil {
pe.cancel()
}
}
type prometheusCollector struct {
metrics map[string]*prometheusMetricDesc
}
func (pc *prometheusCollector) Describe(ch chan<- *prometheus.Desc) {
for _, v := range pc.metrics {
ch <- v.Desc
}
}
func (pc *prometheusCollector) Collect(ch chan<- prometheus.Metric) {
for _, v := range pc.metrics {
cm, err := prometheus.NewConstMetric(v.Desc, v.valueType, math.Float64frombits(atomic.LoadUint64(v.valptr)))
if err == nil {
ch <- cm
}
}
}
func (pe *prometheusExporter) loop(ctx context.Context) {
defer func() {
r := recover()
if r != nil {
logger.Error(r)
}
}()
var collector *prometheusCollector
defer func() {
if collector != nil {
prometheus.Unregister(collector)
}
}()
for {
select {
case <-ctx.Done():
return
case req := <-pe.writerChan:
if m := collector.metrics[req.key]; m != nil {
atomic.StoreUint64(m.valptr, math.Float64bits(req.val))
}
case nm := <-pe.registerChan:
var nextmetrics map[string]*prometheusMetricDesc
if collector != nil {
nextmetrics = collector.metrics
prometheus.Unregister(collector)
nextmetrics[nm.key] = nm
} else {
nextmetrics = map[string]*prometheusMetricDesc{
nm.key: nm,
}
}
nextcollector := &prometheusCollector{
metrics: nextmetrics,
}
if err := prometheus.Register(nextcollector); err != nil {
logger.Error("prometheus register err :", *nm, err)
} else {
collector = nextcollector
}
}
}
}
func NewPrometheusExport(namespace string) Exporter {
ctx, cancel := context.WithCancel(context.Background())
exp := &prometheusExporter{
registerChan: make(chan *prometheusMetricDesc, 10),
writerChan: make(chan *writeRequest, 100),
namespace: namespace,
cancel: cancel,
}
go exp.loop(ctx)
return exp
}

38
misc.go
View File

@ -103,41 +103,3 @@ func SerializeInterface(w io.Writer, val interface{}) (err error) {
return
}
func ShrinkSliceAt[T any](in []T, from int) []T {
if len(in) == 0 {
return in
}
cursor := from
for i := from + 1; i < len(in); i++ {
in[cursor] = in[i]
cursor++
}
return in[:len(in)-1]
}
func ShrinkSlice[T any](in []T, compare func(elem T) bool) []T {
if len(in) == 0 {
return in
}
cursor := 0
for i := 0; i < len(in); i++ {
if compare(in[i]) {
continue
}
in[cursor] = in[i]
cursor++
}
return in[:cursor]
}
func FindOneInSlice[T any](in []T, compare func(elem *T) bool) (int, bool) {
for i, e := range in {
if compare(&e) {
return i, true
}
}
return -1, false
}

View File

@ -15,7 +15,6 @@ import (
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/x/mongo/driver/connstring"
)
type MongoClient struct {
@ -61,19 +60,8 @@ func (ci *ConnectionInfo) SetDatabase(dbname string) *ConnectionInfo {
return ci
}
var errNoDatabaseNameInMongoUri = errors.New("mongo uri has no database name")
func NewMongoClient(ctx context.Context, url string) (MongoClient, error) {
connstr, err := connstring.ParseAndValidate(url)
if err != nil {
return MongoClient{}, err
}
if len(connstr.Database) == 0 {
return MongoClient{}, errNoDatabaseNameInMongoUri
}
return newMongoClient(ctx, NewMongoConnectionInfo(url, connstr.Database))
func NewMongoClient(ctx context.Context, url string, dbname string) (MongoClient, error) {
return newMongoClient(ctx, NewMongoConnectionInfo(url, dbname))
}
func newMongoClient(ctx context.Context, ci *ConnectionInfo) (MongoClient, error) {
@ -106,21 +94,21 @@ func newMongoClient(ctx context.Context, ci *ConnectionInfo) (MongoClient, error
return MongoClient{}, err
}
// go func() {
// for {
// if err := client.Ping(ctx, nil); err != nil {
// logger.Error("mongo client ping err :", err)
// }
go func() {
for {
if err := client.Ping(ctx, nil); err != nil {
logger.Error("mongo client ping err :", err)
}
// select {
// case <-time.After(10 * time.Second):
// continue
select {
case <-time.After(10 * time.Second):
continue
// case <-ctx.Done():
// return
// }
// }
// }()
case <-ctx.Done():
return
}
}
}()
mdb := client.Database(ci.Database, nil)
return MongoClient{c: client, db: mdb}, nil
@ -136,18 +124,6 @@ func (mc MongoClient) Close() {
}
}
func (mc MongoClient) DropIndex(coll CollectionName, name string) error {
matchcoll := mc.Collection(coll)
_, err := matchcoll.Indexes().DropOne(context.Background(), name)
if commanderr, ok := err.(mongo.CommandError); ok {
if commanderr.Code == 27 {
// 인덱스가 없는 것이므로 그냥 성공
return nil
}
}
return err
}
func (mc MongoClient) Watch(coll CollectionName, pipeline mongo.Pipeline, opts ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error) {
if len(opts) == 0 {
opts = []*options.ChangeStreamOptions{options.ChangeStream().SetFullDocument(options.UpdateLookup).SetMaxAwaitTime(0)}
@ -456,7 +432,7 @@ IndexSearchLabel:
return err
}
func (mc MongoClient) makeIndicesWithOption(coll CollectionName, indices map[string]bson.D, opts ...*options.IndexOptions) error {
func (mc MongoClient) makeIndicesWithOption(coll CollectionName, indices map[string]bson.D, option *options.IndexOptions) error {
collection := mc.Collection(coll)
cursor, err := collection.Indexes().List(context.Background(), options.ListIndexes().SetMaxTime(time.Second))
if err != nil {
@ -484,12 +460,12 @@ func (mc MongoClient) makeIndicesWithOption(coll CollectionName, indices map[str
if len(v) == 1 {
mod = mongo.IndexModel{
Keys: primitive.M{v[0].Key: v[0].Value},
Options: options.MergeIndexOptions(append(opts, options.Index().SetName(name))...),
Options: options.MergeIndexOptions(options.Index().SetName(name), option),
}
} else {
mod = mongo.IndexModel{
Keys: indices[name],
Options: options.MergeIndexOptions(append(opts, options.Index().SetName(name))...),
Options: options.MergeIndexOptions(options.Index().SetName(name), option),
}
}
@ -502,10 +478,10 @@ func (mc MongoClient) makeIndicesWithOption(coll CollectionName, indices map[str
return nil
}
func (mc MongoClient) MakeUniqueIndices(coll CollectionName, indices map[string]bson.D, opts ...*options.IndexOptions) error {
return mc.makeIndicesWithOption(coll, indices, append(opts, options.Index().SetUnique(true))...)
func (mc MongoClient) MakeUniqueIndices(coll CollectionName, indices map[string]bson.D) error {
return mc.makeIndicesWithOption(coll, indices, options.Index().SetUnique(true))
}
func (mc MongoClient) MakeIndices(coll CollectionName, indices map[string]bson.D, opts ...*options.IndexOptions) error {
return mc.makeIndicesWithOption(coll, indices, opts...)
func (mc MongoClient) MakeIndices(coll CollectionName, indices map[string]bson.D) error {
return mc.makeIndicesWithOption(coll, indices, options.Index())
}

439
redis.go
View File

@ -2,11 +2,9 @@ package gocommon
import (
"context"
"encoding/json"
"fmt"
"net/url"
"os"
"strconv"
"strings"
"github.com/go-redis/redis/v8"
)
@ -21,30 +19,23 @@ func newRedisClient(uri string, dbidxoffset int) *redis.Client {
return redis.NewClient(option)
}
func NewRedisClient(uri string) (*redis.Client, error) {
func NewRedisClient(uri string, dbidx int) (*redis.Client, error) {
if !*devflag {
return newRedisClient(uri, 0), nil
return newRedisClient(uri, dbidx), nil
}
option, err := redis.ParseURL(uri)
if err != nil {
return nil, err
}
zero := *option
zero.DB = 0
rdb := redis.NewClient(&zero)
defer rdb.Close()
rdb := newRedisClient(uri, 0)
devUrl, _ := url.Parse(uri)
hostname, _ := os.Hostname()
myidx, _ := rdb.HGet(context.Background(), "private_db", hostname).Result()
if len(myidx) > 0 {
offset, _ := strconv.Atoi(myidx)
option.DB += offset
return redis.NewClient(option), nil
devUrl.Path = "/" + myidx
return newRedisClient(devUrl.String(), dbidx), nil
}
alldbs, err := rdb.HGetAll(context.Background(), "private_db").Result()
if err != nil {
rdb.Close()
return nil, err
}
@ -62,416 +53,6 @@ func NewRedisClient(uri string) (*redis.Client, error) {
return nil, err
}
option.DB += newidx
return redis.NewClient(option), nil
}
type RedisonSetOption = string
type RedisonGetOption = [2]any
const (
// JSONSET command Options
RedisonSetOptionNX RedisonSetOption = "NX"
RedisonSetOptionXX RedisonSetOption = "XX"
)
var (
RedisonGetOptionSPACE = RedisonGetOption{"SPACE", " "}
RedisonGetOptionINDENT = RedisonGetOption{"INDENT", "\t"}
RedisonGetOptionNEWLINE = RedisonGetOption{"NEWLINE", "\n"}
RedisonGetOptionNOESCAPE = RedisonGetOption{"NOESCAPE", ""}
)
// gocommon으로 옮길 거
type RedisonHandler struct {
*redis.Client
ctx context.Context
}
func NewRedisonHandler(ctx context.Context, redisClient *redis.Client) *RedisonHandler {
return &RedisonHandler{
Client: redisClient,
ctx: ctx,
}
}
func respToArray[T any](resp any, err error) ([]T, error) {
if err != nil {
if err == redis.Nil {
return nil, nil
}
return nil, err
}
resArr := resp.([]any)
v := make([]T, len(resArr))
for i, e := range resArr {
v[i] = e.(T)
}
return v, nil
}
func appendArgs[T any](args []any, ext ...T) []any {
for _, e := range ext {
args = append(args, e)
}
return args
}
func (rh *RedisonHandler) JSONMSetRel(key string, prefixPath string, kv map[string]any) error {
if len(prefixPath) > 0 && !strings.HasSuffix(prefixPath, ".") {
prefixPath += "."
}
pl := rh.Pipeline()
for path, obj := range kv {
b, err := json.Marshal(obj)
if err != nil {
return err
}
pl.Do(rh.ctx, "JSON.SET", key, prefixPath+path, b)
}
cmders, err := pl.Exec(rh.ctx)
if err != nil {
return err
}
for _, cmder := range cmders {
if cmder.Err() != nil {
return cmder.Err()
}
}
return nil
}
func (rh *RedisonHandler) JSONMSet(key string, kv map[string]any) error {
return rh.JSONMSetRel(key, "", kv)
}
func (rh *RedisonHandler) jsonSetMergeJSONSet(cmd, key, path string, obj any, opts ...RedisonSetOption) (bool, error) {
b, err := json.Marshal(obj)
if err != nil {
return false, err
}
args := []any{
"JSON.SET",
key,
path,
b,
}
if len(opts) > 0 {
args = append(args, opts[0])
}
res, err := rh.Do(rh.ctx, args...).Result()
if err != nil {
return false, err
}
return res.(string) == "OK", nil
}
func (rh *RedisonHandler) JSONSet(key, path string, obj any, opts ...RedisonSetOption) (bool, error) {
return rh.jsonSetMergeJSONSet("JSON.SET", key, path, obj, opts...)
}
func (rh *RedisonHandler) JSONMerge(key, path string, obj any, opts ...RedisonSetOption) (bool, error) {
return rh.jsonSetMergeJSONSet("JSON.MERGE", key, path, obj, opts...)
}
func (rh *RedisonHandler) JSONGet(key, path string, opts ...RedisonGetOption) (res any, err error) {
args := appendArgs[string]([]any{
"JSON.GET",
key,
}, strings.Split(path, " ")...)
for _, opt := range opts {
args = append(args, opt[:]...)
}
return rh.Do(rh.ctx, args...).Result()
}
func (rh *RedisonHandler) JSONGetString(key, path string) ([]string, error) {
return respToArray[string](rh.JSONResp(key, path))
}
func (rh *RedisonHandler) JSONGetDocuments(key, path string) ([]map[string]any, error) {
resp, err := rh.JSONGet(key, path)
if err != nil {
if err == redis.Nil {
return nil, nil
}
return nil, err
}
var objs []map[string]any
err = json.Unmarshal([]byte(resp.(string)), &objs)
return objs, err
}
func (rh *RedisonHandler) JSONGetInt64(key, path string) ([]int64, error) {
return respToArray[int64](rh.JSONResp(key, path))
}
func (rh *RedisonHandler) JSONMGet(path string, keys ...string) (res any, err error) {
args := appendArgs[string]([]any{
"JSON.MGET",
path,
}, keys...)
return rh.Do(rh.ctx, args...).Result()
}
func (rh *RedisonHandler) JSONMDel(key string, paths []string) error {
pl := rh.Pipeline()
for _, path := range paths {
args := []any{
"JSON.DEL",
key,
path,
}
pl.Do(rh.ctx, args...)
}
_, err := pl.Exec(rh.ctx)
return err
}
func (rh *RedisonHandler) JSONDel(key, path string) (int64, error) {
args := []any{
"JSON.DEL",
key,
path,
}
resp, err := rh.Do(rh.ctx, args...).Result()
if err != nil {
return 0, err
}
return resp.(int64), nil
}
func (rh *RedisonHandler) JSONType(key, path string) ([]string, error) {
args := []any{
"JSON.TYPE",
key,
path,
}
return respToArray[string](rh.Do(rh.ctx, args...).Result())
}
func (rh *RedisonHandler) JSONNumIncrBy(key, path string, number int) ([]int64, error) {
args := []any{
"JSON.NUMINCRBY",
key,
path,
number,
}
resp, err := rh.Do(rh.ctx, args...).Result()
if err != nil {
return nil, err
}
var cnts []int64
err = json.Unmarshal([]byte(resp.(string)), &cnts)
return cnts, err
}
func (rh *RedisonHandler) JSONNumMultBy(key, path string, number int) (res any, err error) {
args := []any{
"JSON.NUMMULTBY",
key,
path,
number,
}
resp, err := rh.Do(rh.ctx, args...).Result()
if err != nil {
return nil, err
}
return resp.([]any), nil
}
func (rh *RedisonHandler) JSONStrAppend(key, path string, jsonstring string) ([]int64, error) {
args := []any{
"JSON.STRAPPEND",
key,
path,
fmt.Sprintf(`'"%s"'`, jsonstring),
}
return respToArray[int64](rh.Do(rh.ctx, args...).Result())
}
func (rh *RedisonHandler) JSONStrLen(key, path string) (res []int64, err error) {
args := []any{
"JSON.STRLEN",
key,
path,
}
return respToArray[int64](rh.Do(rh.ctx, args...).Result())
}
func (rh *RedisonHandler) JSONArrAppend(key, path string, values ...any) (int64, error) {
args := appendValues([]any{
"JSON.ARRAPPEND",
key,
path,
}, values...)
resp, err := rh.Do(rh.ctx, args...).Result()
if err != nil {
return 0, err
}
return resp.(int64), nil
}
func (rh *RedisonHandler) JSONArrLen(key, path string) ([]int64, error) {
args := []any{
"JSON.ARRLEN",
key,
path,
}
return respToArray[int64](rh.Do(rh.ctx, args...).Result())
}
func (rh *RedisonHandler) JSONArrPop(key, path string, index int) (res any, err error) {
args := []any{
"JSON.ARRPOP",
key,
path,
index,
}
resp, err := rh.Do(rh.ctx, args...).Result()
if err != nil {
return nil, err
}
return resp.([]any)[0], nil
}
func appendValues(args []any, values ...any) []any {
for _, jsonValue := range values {
switch jsonValue := jsonValue.(type) {
case string:
args = append(args, fmt.Sprintf(`'"%s"'`, jsonValue))
case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64:
args = append(args, jsonValue)
default:
bt, _ := json.Marshal(jsonValue)
args = append(args, bt)
}
}
return args
}
func (rh *RedisonHandler) JSONArrIndex(key, path string, jsonValue any, optionalRange ...int) ([]int64, error) {
args := appendValues([]any{
"JSON.ARRINDEX",
key,
path,
}, jsonValue)
return respToArray[int64](rh.Do(rh.ctx, args...).Result())
}
func (rh *RedisonHandler) JSONArrTrim(key, path string, start, end int) (res any, err error) {
args := []any{
"JSON.ARRTRIM",
key,
path,
start,
end,
}
return respToArray[int64](rh.Do(rh.ctx, args...).Result())
}
func (rh *RedisonHandler) JSONArrInsert(key, path string, index int, values ...any) (res any, err error) {
args := appendValues([]any{
"JSON.ARRINSERT",
key,
path,
index,
}, values...)
return respToArray[int64](rh.Do(rh.ctx, args...).Result())
}
func (rh *RedisonHandler) JSONObjKeys(key, path string) ([]string, error) {
args := []any{
"JSON.OBJKEYS",
key,
path,
}
res, err := rh.Do(rh.ctx, args...).Result()
if err != nil {
return nil, err
}
resArr := res.([]any)
if len(resArr) == 0 {
return nil, nil
}
resArr = resArr[0].([]any)
slc := make([]string, len(resArr))
for i, r := range resArr {
slc[i] = r.(string)
}
return slc, nil
}
func (rh *RedisonHandler) JSONObjLen(key, path string) ([]int64, error) {
args := []any{
"JSON.OBJLEN",
key,
}
if path != "$" {
args = append(args, path)
}
resp, err := rh.Do(rh.ctx, args...).Result()
if err != nil {
return nil, err
}
switch resp := resp.(type) {
case []any:
return respToArray[int64](resp, nil)
case int64:
return []int64{resp}, nil
}
return []int64{0}, nil
}
func (rh *RedisonHandler) JSONDebug(key, path string) (res any, err error) {
args := []any{
"JSON.DEBUG",
"MEMORY",
key,
path,
}
return rh.Do(rh.ctx, args...).Result()
}
func (rh *RedisonHandler) JSONForget(key, path string) (int64, error) {
return rh.JSONDel(key, path)
}
func (rh *RedisonHandler) JSONResp(key, path string) (res any, err error) {
args := []any{
"JSON.RESP",
key,
path,
}
return rh.Do(rh.ctx, args...).Result()
devUrl.Path = "/" + strconv.Itoa(newidx)
return newRedisClient(devUrl.String(), dbidx), nil
}

View File

@ -16,11 +16,6 @@ func configFilePath() string {
configfilepath = *configfileflag
}
// if !strings.HasPrefix(configfilepath, "/") {
// exe, _ := os.Executable()
// configfilepath = path.Join(path.Dir(exe), configfilepath)
// }
return configfilepath
}
@ -70,7 +65,10 @@ func LoadConfig[T any](outptr *T) error {
type StorageAddr struct {
Mongo string
Redis map[string]string
Redis struct {
URL string
Offset map[string]int
}
}
type RegionStorageConfig struct {

329
server.go
View File

@ -1,9 +1,7 @@
package gocommon
import (
"bytes"
"context"
"encoding/gob"
"encoding/json"
"errors"
"fmt"
@ -22,7 +20,6 @@ import (
"sync/atomic"
"syscall"
"time"
"unsafe"
"repositories.action2quare.com/ayo/gocommon/flagx"
"repositories.action2quare.com/ayo/gocommon/logger"
@ -31,15 +28,6 @@ import (
"go.mongodb.org/mongo-driver/bson/primitive"
)
func init() {
gob.Register(map[string]any{})
gob.Register(primitive.A{})
gob.Register(primitive.M{})
gob.Register(primitive.D{})
gob.Register(primitive.ObjectID{})
gob.Register([]any{})
}
const (
// HTTPStatusReloginRequired : http status를 이걸 받으면 클라이언트는 로그아웃하고 로그인 화면으로 돌아가야 한다.
HTTPStatusReloginRequired = 599
@ -58,14 +46,6 @@ const (
ShutdownFlagIdle = ShutdownFlag(3)
)
type ErrorWithStatus struct {
StatusCode int
}
func (e ErrorWithStatus) Error() string {
return fmt.Sprintf("%d", e.StatusCode)
}
type RpcReturnTypeInterface interface {
Value() any
Error() error
@ -117,18 +97,24 @@ func NewHTTPServerWithPort(serveMux *http.ServeMux, port int) *Server {
addr := fmt.Sprintf(":%d", port)
serveMux.HandleFunc(MakeHttpHandlerPattern("welcome"), welcomeHandler)
serveMux.HandleFunc(MakeHttpHandlerPattern("lb_health_chceck"), healthCheckHandler)
serveMux.HandleFunc(MakeHttpHandlerPattern("lb_health_check"), healthCheckHandler)
server := &Server{
httpserver: &http.Server{Addr: addr, Handler: serveMux},
interrupt: make(chan os.Signal, 1),
}
server.httpserver.SetKeepAlivesEnabled(true)
signal.Notify(server.interrupt, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
go func() {
c := <-server.interrupt
logger.Println("interrupt!!!!!!!! :", c.String())
server.shutdown()
}()
return server
}
func NewHTTPServer(serveMux *http.ServeMux) *Server {
// 시작시 자동으로 enable됨
if len(*tls) > 0 && *portptr == 80 {
*portptr = 443
@ -146,7 +132,8 @@ func (server *Server) shutdown() {
logger.Println("http server shutdown. healthcheckcounter :", t)
atomic.StoreInt64(&healthcheckcounter, math.MinInt64)
for cnt := 0; cnt < 100; {
timer := 600 // 0.1 * 600 = 1분
for cnt := 0; cnt < 100 && timer > 0; {
next := atomic.LoadInt64(&healthcheckcounter)
if next == t {
cnt++
@ -155,6 +142,7 @@ func (server *Server) shutdown() {
cnt = 0
}
time.Sleep(100 * time.Millisecond)
timer--
}
logger.Println("http server shutdown. healthcheck completed")
} else {
@ -167,14 +155,6 @@ func (server *Server) shutdown() {
}
}
func (server *Server) Stop() {
if server.interrupt != nil {
server.interrupt <- os.Interrupt
} else {
server.shutdown()
}
}
// Start :
func (server *Server) Start() error {
if server.httpserver != nil {
@ -182,15 +162,6 @@ func (server *Server) Start() error {
if r != nil {
return r
}
server.interrupt = make(chan os.Signal, 1)
signal.Notify(server.interrupt, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
go func() {
c := <-server.interrupt
logger.Println("interrupt!!!!!!!! :", c.String())
server.shutdown()
}()
proxyListener := &proxyproto.Listener{
Listener: ln,
ReadHeaderTimeout: 10 * time.Second,
@ -450,46 +421,6 @@ func ReadStringFormValue(r url.Values, key string) (string, bool) {
return strval, len(strval) > 0
}
type encoder interface {
Encode(any) error
}
type nilEncoder struct{}
func (ne *nilEncoder) Encode(any) error { return nil }
type decoder interface {
Decode(any) error
}
type nilDecoder struct{}
func (nd *nilDecoder) Decode(any) error { return nil }
func MakeDecoder(r *http.Request) decoder {
ct := r.Header.Get("Content-Type")
if ct == "application/gob" {
return gob.NewDecoder(r.Body)
} else if ct == "application/json" {
return json.NewDecoder(r.Body)
}
logger.Error("Content-Type is not supported :", ct)
return &nilDecoder{}
}
func MakeEncoder(w http.ResponseWriter, r *http.Request) encoder {
ct := r.Header.Get("Content-Type")
if ct == "application/gob" {
return gob.NewEncoder(w)
} else if ct == "application/json" {
return json.NewEncoder(w)
}
logger.Error("Content-Type is not supported :", ct)
return &nilEncoder{}
}
func DotStringToTimestamp(tv string) primitive.Timestamp {
if len(tv) == 0 {
return primitive.Timestamp{T: 0, I: 0}
@ -576,11 +507,7 @@ func (rt *RpcReturnError) WithCode(code int) *RpcReturnError {
}
func (rt *RpcReturnError) WithError(err error) *RpcReturnError {
if err2, ok := err.(*ErrorWithStatus); ok {
rt.WithCode(err2.StatusCode)
} else {
rt.err = err
}
rt.err = err
return rt
}
@ -610,14 +537,6 @@ func MakeRPCReturn(value interface{}) *RpcReturnSimple {
}
}
func MakeRPCFail() *RpcReturnError {
return &RpcReturnError{
err: nil,
code: http.StatusInternalServerError,
h: nil,
}
}
func MakeRPCError() *RpcReturnError {
pc, _, _, ok := runtime.Caller(1)
if ok {
@ -680,227 +599,3 @@ func MakeHttpRequestForLogging(r *http.Request) *http.Request {
r.Body = ib
return r
}
type apiFuncType func(http.ResponseWriter, *http.Request)
type HttpApiHandler struct {
methods map[string]apiFuncType
originalReceiverName string
}
func MakeHttpApiHandler[T any](receiver *T, receiverName string) HttpApiHandler {
methods := make(map[string]apiFuncType)
tp := reflect.TypeOf(receiver)
if len(receiverName) == 0 {
receiverName = tp.Elem().Name()
}
writerType := reflect.TypeOf((*http.ResponseWriter)(nil)).Elem()
for i := 0; i < tp.NumMethod(); i++ {
method := tp.Method(i)
if method.Type.NumIn() != 3 {
continue
}
if method.Type.In(0) != tp {
continue
}
if !method.Type.In(1).Implements(writerType) {
continue
}
var r http.Request
if method.Type.In(2) != reflect.TypeOf(&r) {
continue
}
if method.Name == "ServeHTTP" {
continue
}
funcptr := method.Func.Pointer()
p1 := unsafe.Pointer(&funcptr)
p2 := unsafe.Pointer(&p1)
testfunc := (*func(*T, http.ResponseWriter, *http.Request))(p2)
methods[receiverName+"."+method.Name] = func(w http.ResponseWriter, r *http.Request) {
(*testfunc)(receiver, w, r)
}
}
return HttpApiHandler{
methods: methods,
originalReceiverName: tp.Elem().Name(),
}
}
type HttpApiBroker struct {
methods map[string]apiFuncType
methods_dup map[string][]apiFuncType
}
type bufferReadCloser struct {
*bytes.Reader
}
func (buff *bufferReadCloser) Close() error { return nil }
type readOnlyResponseWriter struct {
inner http.ResponseWriter
statusCode int
}
func (w *readOnlyResponseWriter) Header() http.Header {
return w.inner.Header()
}
func (w *readOnlyResponseWriter) Write(in []byte) (int, error) {
logger.Println("readOnlyResponseWriter cannot write")
return len(in), nil
}
func (w *readOnlyResponseWriter) WriteHeader(statusCode int) {
w.statusCode = statusCode
}
func (hc *HttpApiBroker) AddHandler(receiver HttpApiHandler) {
if hc.methods == nil {
hc.methods = make(map[string]apiFuncType)
hc.methods_dup = make(map[string][]apiFuncType)
}
for k, v := range receiver.methods {
ab := strings.Split(k, ".")
logger.Printf("http api registered : %s.%s -> %s\n", receiver.originalReceiverName, ab[1], k)
hc.methods_dup[k] = append(hc.methods_dup[k], v)
if len(hc.methods_dup[k]) > 1 {
chain := hc.methods_dup[k]
hc.methods[k] = func(w http.ResponseWriter, r *http.Request) {
body, _ := io.ReadAll(r.Body)
defer r.Body.Close()
wrap := &readOnlyResponseWriter{inner: w, statusCode: 200}
for _, f := range chain {
r.Body = &bufferReadCloser{bytes.NewReader(body)}
f(wrap, r)
}
if wrap.statusCode != 200 {
w.WriteHeader(wrap.statusCode)
}
}
} else {
hc.methods[k] = v
}
}
}
func (hc *HttpApiBroker) Call(w http.ResponseWriter, r *http.Request) {
funcname := r.URL.Query().Get("call")
if len(funcname) == 0 {
logger.Println("query param 'call' is missing")
w.WriteHeader(http.StatusBadRequest)
return
}
hc.call(funcname, w, r)
}
func (hc *HttpApiBroker) call(funcname string, w http.ResponseWriter, r *http.Request) {
if found := hc.methods[funcname]; found != nil {
found(w, r)
} else {
logger.Println("api is not found :", funcname)
}
}
func CallInternalServiceAPI[T any](url string, apitoken string, method string, data T, headers ...string) error {
tempHeader := make(http.Header)
tempHeader.Set("MG-X-API-TOKEN", apitoken)
tempHeader.Set("Content-Type", "application/gob")
for i := 1; i < len(headers); i += 2 {
tempHeader.Set(headers[i-1], headers[i])
}
buff := new(bytes.Buffer)
ct := tempHeader.Get("Content-Type")
if ct == "application/gob" {
enc := gob.NewEncoder(buff)
enc.Encode(data)
} else if ct == "application/json" {
enc := json.NewEncoder(buff)
enc.Encode(data)
}
reqURL := fmt.Sprintf("%s/api?call=%s", url, method)
req, err := http.NewRequest("POST", reqURL, buff)
if err != nil {
return err
}
req.Header = tempHeader
resp, err := http.DefaultClient.Do(req)
defer func() {
if resp != nil && resp.Body != nil {
resp.Body.Close()
}
}()
if err == nil {
if resp.StatusCode != http.StatusOK {
return &ErrorWithStatus{StatusCode: resp.StatusCode}
}
}
return err
}
func CallInternalServiceAPIAs[Tin any, Tout any](url string, apitoken string, method string, data Tin, out *Tout, headers ...string) error {
tempHeader := make(http.Header)
tempHeader.Set("MG-X-API-TOKEN", apitoken)
tempHeader.Set("Content-Type", "application/gob")
for i := 1; i < len(headers); i += 2 {
tempHeader.Set(headers[i-1], headers[i])
}
buff := new(bytes.Buffer)
ct := tempHeader.Get("Content-Type")
if ct == "application/gob" {
enc := gob.NewEncoder(buff)
enc.Encode(data)
} else if ct == "application/json" {
enc := json.NewEncoder(buff)
enc.Encode(data)
}
reqURL := fmt.Sprintf("%s/api?call=%s", url, method)
req, err := http.NewRequest("POST", reqURL, buff)
if err != nil {
return err
}
req.Header = tempHeader
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return &ErrorWithStatus{StatusCode: resp.StatusCode}
}
defer func() {
if resp != nil && resp.Body != nil {
resp.Body.Close()
}
}()
if out != nil && resp.Body != nil {
dec := gob.NewDecoder(resp.Body)
return dec.Decode(out)
}
return nil
}

View File

@ -1,133 +0,0 @@
package session
import (
"context"
"encoding/binary"
"encoding/hex"
"errors"
"math/rand"
"strings"
"time"
"go.mongodb.org/mongo-driver/bson/primitive"
)
type Authorization struct {
Account primitive.ObjectID `bson:"a" json:"a"`
// by authorization provider
Platform string `bson:"p" json:"p"`
Uid string `bson:"u" json:"u"`
Email string `bson:"em" json:"em"`
}
type Provider interface {
New(*Authorization) (string, error)
Delete(primitive.ObjectID) error
Query(string) (Authorization, error)
Touch(string) (bool, error)
}
type Consumer interface {
Query(string) (Authorization, error)
Touch(string) (Authorization, error)
RegisterOnSessionInvalidated(func(primitive.ObjectID))
}
type storagekey string
type publickey string
var r = rand.New(rand.NewSource(time.Now().UnixNano()))
func make_storagekey(acc primitive.ObjectID) storagekey {
bs := [4]byte{}
binary.LittleEndian.PutUint32(bs[:], r.Uint32())
return storagekey(acc.Hex() + hex.EncodeToString(bs[2:]))
}
func storagekey_to_publickey(sk storagekey) publickey {
bs, _ := hex.DecodeString(string(sk))
acc := bs[:12]
cs := bs[12:]
encoded := [14]byte{}
for i, v := range acc[:] {
encoded[i] = (v ^ cs[0]) ^ cs[1]
}
encoded[12] = cs[0]
encoded[13] = cs[1]
return publickey(hex.EncodeToString(encoded[:]))
}
func publickey_to_storagekey(pk publickey) storagekey {
bs, _ := hex.DecodeString(string(pk))
acc := bs[:12]
cs := bs[12:]
decoded := [14]byte{}
for i, v := range acc[:] {
decoded[i] = (v ^ cs[1]) ^ cs[0]
}
decoded[12] = cs[0]
decoded[13] = cs[1]
return storagekey(hex.EncodeToString(decoded[:]))
}
type SessionConfig struct {
SessionTTL int64 `json:"session_ttl"`
SessionStorage string `json:"session_storage"`
}
var errInvalidScheme = errors.New("storageAddr is not valid scheme")
var errSessionStorageMissing = errors.New("session_storageis missing")
func NewConsumer(ctx context.Context, storageAddr string, ttl time.Duration) (Consumer, error) {
if strings.HasPrefix(storageAddr, "mongodb") {
return newConsumerWithMongo(ctx, storageAddr, ttl)
}
if strings.HasPrefix(storageAddr, "redis") {
return newConsumerWithRedis(ctx, storageAddr, ttl)
}
return nil, errInvalidScheme
}
func NewConsumerWithConfig(ctx context.Context, cfg SessionConfig) (Consumer, error) {
if len(cfg.SessionStorage) == 0 {
return nil, errSessionStorageMissing
}
if cfg.SessionTTL == 0 {
cfg.SessionTTL = 3600
}
return NewConsumer(ctx, cfg.SessionStorage, time.Duration(cfg.SessionTTL)*time.Second)
}
func NewProvider(ctx context.Context, storageAddr string, ttl time.Duration) (Provider, error) {
if strings.HasPrefix(storageAddr, "mongodb") {
return newProviderWithMongo(ctx, storageAddr, ttl)
}
if strings.HasPrefix(storageAddr, "redis") {
return newProviderWithRedis(ctx, storageAddr, ttl)
}
return nil, errInvalidScheme
}
func NewProviderWithConfig(ctx context.Context, cfg SessionConfig) (Provider, error) {
if len(cfg.SessionStorage) == 0 {
return nil, errSessionStorageMissing
}
if cfg.SessionTTL == 0 {
cfg.SessionTTL = 3600
}
return NewProvider(ctx, cfg.SessionStorage, time.Duration(cfg.SessionTTL)*time.Second)
}

View File

@ -1,66 +0,0 @@
package session
import (
"context"
"sync"
"time"
"go.mongodb.org/mongo-driver/bson/primitive"
)
type cache_stage[T any] struct {
cache map[storagekey]T
deleted map[storagekey]bool
}
func make_cache_stage[T any]() *cache_stage[T] {
return &cache_stage[T]{
cache: make(map[storagekey]T),
deleted: make(map[storagekey]bool),
}
}
type consumer_common[T any] struct {
lock sync.Mutex
ttl time.Duration
ctx context.Context
stages [2]*cache_stage[T]
startTime time.Time
onSessionInvalidated []func(primitive.ObjectID)
}
func (c *consumer_common[T]) add_internal(sk storagekey, si T) {
c.stages[0].cache[sk] = si
delete(c.stages[0].deleted, sk)
c.stages[1].cache[sk] = si
delete(c.stages[1].deleted, sk)
}
func (c *consumer_common[T]) delete_internal(sk storagekey) (old T) {
if v, ok := c.stages[0].cache[sk]; ok {
old = v
delete(c.stages[0].cache, sk)
delete(c.stages[1].cache, sk)
} else if v, ok = c.stages[1].cache[sk]; ok {
old = v
delete(c.stages[1].cache, sk)
}
c.stages[0].deleted[sk] = true
c.stages[1].deleted[sk] = true
return
}
func (c *consumer_common[T]) delete(sk storagekey) T {
c.lock.Lock()
defer c.lock.Unlock()
return c.delete_internal(sk)
}
func (c *consumer_common[T]) changeStage() {
c.lock.Lock()
defer c.lock.Unlock()
c.stages[1] = c.stages[0]
c.stages[0] = make_cache_stage[T]()
}

View File

@ -1,362 +0,0 @@
package session
import (
"context"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/logger"
)
const (
session_collection_name = gocommon.CollectionName("session")
)
type provider_mongo struct {
mongoClient gocommon.MongoClient
}
type sessionMongo struct {
Id primitive.ObjectID `bson:"_id,omitempty"`
Auth *Authorization `bson:"auth"`
Key storagekey `bson:"key"`
Ts primitive.DateTime `bson:"_ts"`
}
func newProviderWithMongo(ctx context.Context, mongoUrl string, ttl time.Duration) (Provider, error) {
mc, err := gocommon.NewMongoClient(ctx, mongoUrl)
if err != nil {
return nil, err
}
if err = mc.MakeUniqueIndices(session_collection_name, map[string]bson.D{
"key": {{Key: "key", Value: 1}},
}); err != nil {
return nil, err
}
if err := mc.MakeExpireIndex(session_collection_name, int32(ttl.Seconds())); err != nil {
return nil, err
}
return &provider_mongo{
mongoClient: mc,
}, nil
}
func (p *provider_mongo) New(input *Authorization) (string, error) {
sk := make_storagekey(input.Account)
_, _, err := p.mongoClient.Update(session_collection_name, bson.M{
"_id": input.Account,
}, bson.M{
"$set": sessionMongo{
Auth: input,
Key: sk,
Ts: primitive.NewDateTimeFromTime(time.Now().UTC()),
},
}, options.Update().SetUpsert(true))
return string(storagekey_to_publickey(sk)), err
}
func (p *provider_mongo) Delete(acc primitive.ObjectID) error {
_, err := p.mongoClient.Delete(session_collection_name, bson.M{
"_id": acc,
})
return err
}
func (p *provider_mongo) Query(pk string) (Authorization, error) {
sk := publickey_to_storagekey(publickey(pk))
var auth Authorization
err := p.mongoClient.FindOneAs(session_collection_name, bson.M{
"key": sk,
}, &auth)
return auth, err
}
func (p *provider_mongo) Touch(pk string) (bool, error) {
sk := publickey_to_storagekey(publickey(pk))
worked, _, err := p.mongoClient.Update(session_collection_name, bson.M{
"key": sk,
}, bson.M{
"$currentDate": bson.M{
"_ts": bson.M{"$type": "date"},
},
}, options.Update().SetUpsert(false))
if err != nil {
logger.Println("provider Touch :", err)
return false, err
}
return worked, nil
}
type consumer_mongo struct {
consumer_common[*sessionMongo]
ids map[primitive.ObjectID]storagekey
mongoClient gocommon.MongoClient
ttl time.Duration
}
type sessionPipelineDocument struct {
OperationType string `bson:"operationType"`
DocumentKey struct {
Id primitive.ObjectID `bson:"_id"`
} `bson:"documentKey"`
Session *sessionMongo `bson:"fullDocument"`
}
func newConsumerWithMongo(ctx context.Context, mongoUrl string, ttl time.Duration) (Consumer, error) {
mc, err := gocommon.NewMongoClient(ctx, mongoUrl)
if err != nil {
return nil, err
}
consumer := &consumer_mongo{
consumer_common: consumer_common[*sessionMongo]{
ttl: ttl,
ctx: ctx,
stages: [2]*cache_stage[*sessionMongo]{make_cache_stage[*sessionMongo](), make_cache_stage[*sessionMongo]()},
startTime: time.Now(),
},
ids: make(map[primitive.ObjectID]storagekey),
ttl: ttl,
mongoClient: mc,
}
go func() {
matchStage := bson.D{
{
Key: "$match", Value: bson.D{
{Key: "operationType", Value: bson.D{
{Key: "$in", Value: bson.A{
"delete",
"insert",
"update",
}},
}},
},
}}
projectStage := bson.D{
{
Key: "$project", Value: bson.D{
{Key: "documentKey", Value: 1},
{Key: "operationType", Value: 1},
{Key: "fullDocument", Value: 1},
},
},
}
var stream *mongo.ChangeStream
nextswitch := time.Now().Add(ttl)
for {
if stream == nil {
stream, err = mc.Watch(session_collection_name, mongo.Pipeline{matchStage, projectStage})
if err != nil {
logger.Error("watchAuthCollection watch failed :", err)
time.Sleep(time.Minute)
continue
}
}
changed := stream.TryNext(ctx)
if ctx.Err() != nil {
logger.Error("watchAuthCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error())
break
}
if changed {
var data sessionPipelineDocument
if err := stream.Decode(&data); err == nil {
ot := data.OperationType
switch ot {
case "insert":
consumer.add(data.Session.Key, data.DocumentKey.Id, data.Session)
case "update":
if data.Session == nil {
if old := consumer.deleteById(data.DocumentKey.Id); old != nil {
for _, f := range consumer.onSessionInvalidated {
f(old.Auth.Account)
}
}
} else {
consumer.add(data.Session.Key, data.DocumentKey.Id, data.Session)
}
case "delete":
if old := consumer.deleteById(data.DocumentKey.Id); old != nil {
for _, f := range consumer.onSessionInvalidated {
f(old.Auth.Account)
}
}
}
} else {
logger.Error("watchAuthCollection stream.Decode failed :", err)
}
} else if stream.Err() != nil || stream.ID() == 0 {
select {
case <-ctx.Done():
logger.Println("watchAuthCollection is done")
stream.Close(ctx)
return
case <-time.After(time.Second):
logger.Error("watchAuthCollection stream error :", stream.Err())
stream.Close(ctx)
stream = nil
}
} else {
time.Sleep(time.Second)
}
now := time.Now()
for now.After(nextswitch) {
consumer.changeStage()
nextswitch = nextswitch.Add(ttl)
}
}
}()
return consumer, nil
}
func (c *consumer_mongo) query_internal(sk storagekey) (*sessionMongo, bool, error) {
if _, deleted := c.stages[0].deleted[sk]; deleted {
return nil, false, nil
}
if _, deleted := c.stages[1].deleted[sk]; deleted {
return nil, false, nil
}
found, ok := c.stages[0].cache[sk]
if !ok {
found, ok = c.stages[1].cache[sk]
}
if ok {
return found, false, nil
}
var si sessionMongo
err := c.mongoClient.FindOneAs(session_collection_name, bson.M{
"key": sk,
}, &si)
if err != nil {
logger.Println("consumer Query :", err)
return nil, false, err
}
if len(si.Key) > 0 {
siptr := &si
c.add_internal(sk, siptr)
return siptr, true, nil
}
return nil, false, nil
}
func (c *consumer_mongo) Query(pk string) (Authorization, error) {
c.lock.Lock()
defer c.lock.Unlock()
sk := publickey_to_storagekey(publickey(pk))
si, _, err := c.query_internal(sk)
if err != nil {
return Authorization{}, err
}
if si == nil {
return Authorization{}, nil
}
if time.Now().After(si.Ts.Time().Add(c.ttl)) {
return Authorization{}, nil
}
return *si.Auth, nil
}
func (c *consumer_mongo) Touch(pk string) (Authorization, error) {
c.lock.Lock()
defer c.lock.Unlock()
sk := publickey_to_storagekey(publickey(pk))
worked, _, err := c.mongoClient.Update(session_collection_name, bson.M{
"key": sk,
}, bson.M{
"$currentDate": bson.M{
"_ts": bson.M{"$type": "date"},
},
}, options.Update().SetUpsert(false))
if err != nil {
logger.Println("consumer Touch :", err)
return Authorization{}, err
}
if !worked {
// 이미 만료되서 사라짐
return Authorization{}, nil
}
si, added, err := c.query_internal(sk)
if err != nil {
return Authorization{}, err
}
if si == nil {
return Authorization{}, nil
}
if !added {
var doc sessionMongo
err := c.mongoClient.FindOneAs(session_collection_name, bson.M{
"key": sk,
}, &doc)
if err != nil {
logger.Println("consumer Query :", err)
return Authorization{}, err
}
if len(si.Key) > 0 {
c.add_internal(sk, &doc)
c.ids[doc.Id] = sk
return *doc.Auth, nil
}
}
return *si.Auth, nil
}
func (c *consumer_mongo) add(sk storagekey, id primitive.ObjectID, si *sessionMongo) {
c.lock.Lock()
defer c.lock.Unlock()
c.consumer_common.add_internal(sk, si)
c.ids[id] = sk
}
func (c *consumer_mongo) deleteById(id primitive.ObjectID) (old *sessionMongo) {
c.lock.Lock()
defer c.lock.Unlock()
if sk, ok := c.ids[id]; ok {
old = c.consumer_common.delete_internal(sk)
delete(c.ids, id)
}
return
}
func (c *consumer_mongo) RegisterOnSessionInvalidated(cb func(primitive.ObjectID)) {
c.onSessionInvalidated = append(c.onSessionInvalidated, cb)
}

View File

@ -1,306 +0,0 @@
package session
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/go-redis/redis/v8"
"go.mongodb.org/mongo-driver/bson/primitive"
"repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/logger"
)
const (
communication_channel_name_prefix = "_sess_comm_chan_name"
)
type sessionRedis struct {
*Authorization
expireAt time.Time
}
type provider_redis struct {
redisClient *redis.Client
deleteChannel string
ttl time.Duration
ctx context.Context
}
func newProviderWithRedis(ctx context.Context, redisUrl string, ttl time.Duration) (Provider, error) {
redisClient, err := gocommon.NewRedisClient(redisUrl)
if err != nil {
return nil, err
}
return &provider_redis{
redisClient: redisClient,
deleteChannel: fmt.Sprintf("%s_%d_d", communication_channel_name_prefix, redisClient.Options().DB),
ttl: ttl,
ctx: ctx,
}, nil
}
func (p *provider_redis) New(input *Authorization) (string, error) {
bt, err := json.Marshal(input)
if err != nil {
return "", err
}
sk := make_storagekey(input.Account)
_, err = p.redisClient.SetEX(p.ctx, string(sk), bt, p.ttl).Result()
if err != nil {
return "", err
}
pk := storagekey_to_publickey(sk)
return string(pk), err
}
func (p *provider_redis) Delete(account primitive.ObjectID) error {
prefix := account.Hex()
sks, err := p.redisClient.Keys(p.ctx, prefix+"*").Result()
if err != nil {
logger.Println("session provider delete :", sks, err)
return err
}
for _, sk := range sks {
p.redisClient.Del(p.ctx, sk).Result()
p.redisClient.Publish(p.ctx, p.deleteChannel, sk).Result()
}
return nil
}
func (p *provider_redis) Query(pk string) (Authorization, error) {
sk := publickey_to_storagekey(publickey(pk))
payload, err := p.redisClient.Get(p.ctx, string(sk)).Result()
if err == redis.Nil {
logger.Println("session provider query :", pk, err)
return Authorization{}, nil
} else if err != nil {
logger.Println("session provider query :", pk, err)
return Authorization{}, err
}
var auth Authorization
if err := json.Unmarshal([]byte(payload), &auth); err != nil {
logger.Println("session provider query :", pk, err)
return Authorization{}, err
}
return auth, nil
}
func (p *provider_redis) Touch(pk string) (bool, error) {
sk := publickey_to_storagekey(publickey(pk))
ok, err := p.redisClient.Expire(p.ctx, string(sk), p.ttl).Result()
if err == redis.Nil {
// 이미 만료됨
logger.Println("session provider touch :", pk, err)
return false, nil
} else if err != nil {
logger.Println("session provider touch :", pk, err)
return false, err
}
return ok, nil
}
type consumer_redis struct {
consumer_common[*sessionRedis]
redisClient *redis.Client
}
func newConsumerWithRedis(ctx context.Context, redisUrl string, ttl time.Duration) (Consumer, error) {
redisClient, err := gocommon.NewRedisClient(redisUrl)
if err != nil {
return nil, err
}
consumer := &consumer_redis{
consumer_common: consumer_common[*sessionRedis]{
ttl: ttl,
ctx: ctx,
stages: [2]*cache_stage[*sessionRedis]{make_cache_stage[*sessionRedis](), make_cache_stage[*sessionRedis]()},
startTime: time.Now(),
},
redisClient: redisClient,
}
deleteChannel := fmt.Sprintf("%s_%d_d", communication_channel_name_prefix, redisClient.Options().DB)
sub := redisClient.Subscribe(ctx, deleteChannel)
go func() {
stageswitch := time.Now().Add(ttl)
tickTimer := time.After(ttl)
for {
select {
case <-ctx.Done():
return
case <-tickTimer:
consumer.changeStage()
stageswitch = stageswitch.Add(ttl)
tempttl := time.Until(stageswitch)
tickTimer = time.After(tempttl)
case msg := <-sub.Channel():
if msg == nil {
return
}
if len(msg.Payload) == 0 {
continue
}
switch msg.Channel {
case deleteChannel:
sk := storagekey(msg.Payload)
old := consumer.delete(sk)
if old != nil {
for _, f := range consumer.onSessionInvalidated {
f(old.Account)
}
}
}
}
}
}()
return consumer, nil
}
func (c *consumer_redis) query_internal(sk storagekey) (*sessionRedis, bool, error) {
if _, deleted := c.stages[0].deleted[sk]; deleted {
return nil, false, nil
}
if _, deleted := c.stages[1].deleted[sk]; deleted {
return nil, false, nil
}
found, ok := c.stages[0].cache[sk]
if !ok {
found, ok = c.stages[1].cache[sk]
}
if ok {
if time.Now().Before(found.expireAt) {
// 만료전 세션
return found, false, nil
}
// 다른 Consumer가 Touch했을 수도 있으므로 redis에서 읽어본다.
}
payload, err := c.redisClient.Get(c.ctx, string(sk)).Result()
if err != nil && err != redis.Nil {
logger.Println("consumer Query :", err)
return nil, false, err
}
if len(payload) == 0 {
return nil, false, nil
}
var auth Authorization
if err := json.Unmarshal([]byte(payload), &auth); err != nil {
return nil, false, err
}
ttl, err := c.redisClient.TTL(c.ctx, string(sk)).Result()
if err != nil {
logger.Println("consumer Query :", err)
return nil, false, err
}
si := &sessionRedis{
Authorization: &auth,
expireAt: time.Now().Add(ttl),
}
c.add_internal(sk, si)
return si, true, nil
}
func (c *consumer_redis) Query(pk string) (Authorization, error) {
c.lock.Lock()
defer c.lock.Unlock()
sk := publickey_to_storagekey(publickey(pk))
si, _, err := c.query_internal(sk)
if err != nil {
logger.Println("session consumer query :", pk, err)
return Authorization{}, err
}
if si == nil {
logger.Println("session consumer query(si nil) :", pk, nil)
return Authorization{}, nil
}
if time.Now().After(si.expireAt) {
logger.Println("session consumer query(expired):", pk, nil)
return Authorization{}, nil
}
return *si.Authorization, nil
}
func (c *consumer_redis) Touch(pk string) (Authorization, error) {
c.lock.Lock()
defer c.lock.Unlock()
sk := publickey_to_storagekey(publickey(pk))
if _, deleted := c.stages[0].deleted[sk]; deleted {
return Authorization{}, nil
}
if _, deleted := c.stages[1].deleted[sk]; deleted {
return Authorization{}, nil
}
ok, err := c.redisClient.Expire(c.ctx, string(sk), c.ttl).Result()
if err == redis.Nil {
logger.Println("session consumer touch :", pk, err)
return Authorization{}, nil
} else if err != nil {
logger.Println("session consumer touch :", pk, err)
return Authorization{}, err
}
if ok {
// redis에 살아있다.
si, added, err := c.query_internal(sk)
if err != nil {
logger.Println("session consumer touch(ok) :", pk, err)
return Authorization{}, err
}
if si == nil {
logger.Println("session consumer touch(ok, si nil) :", pk)
return Authorization{}, nil
}
if !added {
si.expireAt = time.Now().Add(c.ttl)
// stage 0으로 옮기기 위해 add_internal을 다시 부름
c.add_internal(sk, si)
}
return *si.Authorization, nil
}
return Authorization{}, nil
}
func (c *consumer_redis) RegisterOnSessionInvalidated(cb func(primitive.ObjectID)) {
c.onSessionInvalidated = append(c.onSessionInvalidated, cb)
}

View File

@ -1,93 +0,0 @@
// package main ...
package session
import (
"context"
"testing"
"time"
"go.mongodb.org/mongo-driver/bson/primitive"
"repositories.action2quare.com/ayo/gocommon/logger"
)
func TestExpTable(t *testing.T) {
// pv, err := NewProvider(context.Background(), "redis://192.168.8.94:6380/1", 10*time.Second)
// if err != nil {
// t.Error(err)
// }
// cs, err := NewConsumer(context.Background(), "redis://192.168.8.94:6380/1", 10*time.Second)
// if err != nil {
// t.Error(err)
// }
pv, err := NewProvider(context.Background(), "mongodb://192.168.8.94:27017/maingate?replicaSet=repl01&retrywrites=false", 10*time.Second)
if err != nil {
t.Error(err)
}
cs, err := NewConsumer(context.Background(), "mongodb://192.168.8.94:27017/maingate?replicaSet=repl01&retrywrites=false", 10*time.Second)
if err != nil {
t.Error(err)
}
test := primitive.NewObjectID()
sk := make_storagekey(test)
pk := storagekey_to_publickey(sk)
if publickey_to_storagekey(pk) != sk {
t.Errorf("pk != sk : %s, %s", pk, sk)
}
au1 := &Authorization{
Account: primitive.NewObjectID(),
Platform: "editor",
Uid: "uid-1",
}
sk1, err := pv.New(au1)
if err != nil {
t.Error(err)
}
au2 := &Authorization{
Account: primitive.NewObjectID(),
Platform: "editor",
Uid: "uid-2",
}
sk2, err := pv.New(au2)
if err != nil {
t.Error(err)
}
go func() {
for {
q1, err := cs.Query(sk1)
logger.Println("query :", q1, err)
q2, err := cs.Query(sk2)
logger.Println("query :", q2, err)
time.Sleep(time.Second)
}
}()
cs.Touch(sk1)
time.Sleep(2 * time.Second)
cs.Touch(sk2)
time.Sleep(2 * time.Second)
time.Sleep(2 * time.Second)
pv.Delete(au1.Account)
cs.Touch(sk1)
time.Sleep(2 * time.Second)
cs.Touch(sk2)
time.Sleep(2 * time.Second)
cs2, err := NewConsumer(context.Background(), "redis://192.168.8.94:6380/1", 10*time.Second)
if err != nil {
t.Error(err)
}
q2, err := cs2.Query(sk2)
logger.Println("queryf :", q2, err)
time.Sleep(20 * time.Second)
}

View File

@ -1,146 +0,0 @@
package voicechat
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
"sync/atomic"
"time"
"unsafe"
"repositories.action2quare.com/ayo/gocommon/logger"
)
type eosauth struct {
AccessToken string `json:"access_token"`
ExpiresAt time.Time `json:"expires_at"`
ExpiresIn int64 `json:"expires_in"`
DeploymentId string `json:"deployment_id"`
ProductId string `json:"product_id"`
SandboxId string `json:"sandbox_id"`
TokenType string `json:"token_type"`
}
func eosTokenRefresh(target *unsafe.Pointer, ctx context.Context) {
defer func() {
r := recover()
if r != nil {
logger.Error(r)
}
}()
endpoint := "https://api.epicgames.dev/auth/v1/oauth/token"
auth := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%s", config.EosClientId, config.EosClientSecret)))
for {
req, _ := http.NewRequest("POST", endpoint, bytes.NewBufferString("grant_type=client_credentials&deployment_id="+config.EosDeploymentId))
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
req.Header.Set("Accept", "application/json")
req.Header.Set("Authorization", fmt.Sprintf("Basic %s", auth))
resp, err := http.DefaultClient.Do(req)
if err != nil {
logger.Println("eosTokenRefresh failed. eos token reqeust err :", err)
time.Sleep(time.Minute)
continue
}
var neweos eosauth
err = json.NewDecoder(resp.Body).Decode(&neweos)
resp.Body.Close()
if err != nil {
logger.Println("eosTokenRefresh failed. decode err :", err)
return
}
logger.Printf("eos access_token retreived : %s...", neweos.AccessToken[:20])
atomic.StorePointer(target, unsafe.Pointer(&neweos))
select {
case <-ctx.Done():
return
case <-time.After(time.Duration(neweos.ExpiresIn-60) * time.Second):
}
}
}
type eosRoomParticipantRequests struct {
Puid string `json:"puid"`
ClientIP string `json:"clientIP"`
HardMuted bool `json:"hardMuted"`
}
type eosRoomParticipants struct {
Participants []eosRoomParticipantRequests `json:"participants"`
}
func (gv *eosauth) joinVoiceChat(data JoinVoiceChatRequst) map[string]any {
// https://dev.epicgames.com/docs/web-api-ref/voice-web-api
accessToken := gv.AccessToken
if len(accessToken) == 0 {
logger.Println("eos voice chat is not ready. access_token is empty")
return nil
}
voiceendpoint := fmt.Sprintf("https://api.epicgames.dev/rtc/v1/%s/room/%s", config.EosDeploymentId, data.Gid)
participants := eosRoomParticipants{
Participants: []eosRoomParticipantRequests{
{Puid: data.Alias},
},
}
body, _ := json.Marshal(participants)
req, _ := http.NewRequest("POST", voiceendpoint, bytes.NewBuffer(body))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", accessToken))
resp, err := http.DefaultClient.Do(req)
if err != nil {
logger.Println("join voice room failed. api.epicgames.dev return err :", err)
return nil
}
var result map[string]any
json.NewDecoder(resp.Body).Decode(&result)
resp.Body.Close()
result["client_id"] = config.EosClientId
result["client_secret"] = config.EosClientSecret
par := result["participants"].([]any)[0]
participant := par.(map[string]any)
channelCredentials := map[string]any{
"override_userid": data.Alias,
"client_base_url": result["clientBaseUrl"],
"participant_token": participant["token"],
}
marshaled, _ := json.Marshal(channelCredentials)
result["channel_credentials"] = base64.StdEncoding.EncodeToString(marshaled)
return result
}
func (gv *eosauth) leaveVoiceChat(data LeaveVoiceChatRequst) {
voiceendpoint := fmt.Sprintf("https://api.epicgames.dev/rtc/v1/%s/room/%s/participants/%s", config.EosDeploymentId, data.Gid, data.Alias)
accessToken := gv.AccessToken
req, _ := http.NewRequest("DELETE", voiceendpoint, nil)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", accessToken))
resp, err := http.DefaultClient.Do(req)
if err != nil {
logger.Println("LeaveVoiceChat failed. err :", err)
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusNoContent {
// 204가 정상 응답
logger.Println("LeaveVoiceChat failed. status code :", resp.StatusCode)
}
}

View File

@ -1,83 +0,0 @@
package voicechat
import (
"context"
"sync/atomic"
"unsafe"
"repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/logger"
)
type JoinVoiceChatRequst struct {
Gid string
Alias string
Service string
}
type LeaveVoiceChatRequst struct {
Gid string
Alias string
Service string
}
type voiceChatConfig struct {
EosClientId string `json:"eos_client_id"`
EosClientSecret string `json:"eos_client_secret"`
EosDeploymentId string `json:"eos_deployment_id"`
}
type voiceServiceImpl interface {
joinVoiceChat(JoinVoiceChatRequst) map[string]any
leaveVoiceChat(LeaveVoiceChatRequst)
}
var config voiceChatConfig
type VoiceChatService struct {
eosptr unsafe.Pointer
}
func (gv *VoiceChatService) Initialize(ctx context.Context) error {
if err := gocommon.LoadConfig(&config); err != nil {
return logger.ErrorWithCallStack(err)
}
if len(config.EosClientId) == 0 {
logger.Println("eos voice chat is disabled. 'eos_client_id' is empty")
}
if len(config.EosClientSecret) == 0 {
logger.Println("eos voice chat is disabled. 'eos_client_secret' is empty")
}
if len(config.EosDeploymentId) == 0 {
logger.Println("eos voice chat is disabled. 'eos_deployment_id' is empty")
}
gv.eosptr = unsafe.Pointer(&eosauth{})
if len(config.EosClientId) > 0 && len(config.EosClientSecret) > 0 && len(config.EosDeploymentId) > 0 {
go eosTokenRefresh(&gv.eosptr, ctx)
}
return nil
}
func (gv *VoiceChatService) eos() voiceServiceImpl {
ptr := atomic.LoadPointer(&gv.eosptr)
return (*eosauth)(ptr)
}
func (gv *VoiceChatService) JoinVoiceChat(req JoinVoiceChatRequst) map[string]any {
switch req.Service {
case "eos":
return gv.eos().joinVoiceChat(req)
}
return nil
}
func (gv *VoiceChatService) LeaveVoiceChat(req LeaveVoiceChatRequst) {
switch req.Service {
case "eos":
gv.eos().leaveVoiceChat(req)
}
}

View File

@ -1,185 +0,0 @@
package wshandler
import (
"encoding/json"
"io"
"reflect"
"strings"
"unsafe"
"github.com/gorilla/websocket"
"repositories.action2quare.com/ayo/gocommon/logger"
)
const (
ClientConnected = "ClientConnected"
ClientDisconnected = "ClientDisconnected"
)
type apiFuncType func(ApiCallContext)
type connFuncType func(*websocket.Conn, *Sender)
type disconnFuncType func(string, *Sender)
type WebsocketApiHandler struct {
methods map[string]apiFuncType
connfunc connFuncType
disconnfunc disconnFuncType
originalReceiverName string
}
type ApiCallContext struct {
CallBy *Sender
Arguments []any
}
func MakeWebsocketApiHandler[T any](receiver *T, receiverName string) WebsocketApiHandler {
methods := make(map[string]apiFuncType)
tp := reflect.TypeOf(receiver)
if len(receiverName) == 0 {
receiverName = tp.Elem().Name()
}
var connfunc connFuncType
var disconnfunc disconnFuncType
for i := 0; i < tp.NumMethod(); i++ {
method := tp.Method(i)
if method.Type.In(0) != tp {
continue
}
if method.Name == ClientConnected {
if method.Type.NumIn() != 3 {
continue
}
if method.Type.In(1) != reflect.TypeOf((*websocket.Conn)(nil)) {
continue
}
if method.Type.In(2) != reflect.TypeOf((*Sender)(nil)) {
continue
}
funcptr := method.Func.Pointer()
p1 := unsafe.Pointer(&funcptr)
p2 := unsafe.Pointer(&p1)
connfuncptr := (*func(*T, *websocket.Conn, *Sender))(p2)
connfunc = func(c *websocket.Conn, s *Sender) {
(*connfuncptr)(receiver, c, s)
}
} else if method.Name == ClientDisconnected {
if method.Type.NumIn() != 3 {
continue
}
if method.Type.In(1) != reflect.TypeOf("") {
continue
}
if method.Type.In(2) != reflect.TypeOf((*Sender)(nil)) {
continue
}
funcptr := method.Func.Pointer()
p1 := unsafe.Pointer(&funcptr)
p2 := unsafe.Pointer(&p1)
disconnfuncptr := (*func(*T, string, *Sender))(p2)
disconnfunc = func(msg string, s *Sender) {
(*disconnfuncptr)(receiver, msg, s)
}
} else {
if method.Type.NumIn() != 2 {
continue
}
if method.Type.In(1) != reflect.TypeOf((*ApiCallContext)(nil)).Elem() {
continue
}
funcptr := method.Func.Pointer()
p1 := unsafe.Pointer(&funcptr)
p2 := unsafe.Pointer(&p1)
apifuncptr := (*func(*T, ApiCallContext))(p2)
methods[receiverName+"."+method.Name] = func(ctx ApiCallContext) {
(*apifuncptr)(receiver, ctx)
}
}
}
return WebsocketApiHandler{
methods: methods,
connfunc: connfunc,
disconnfunc: disconnfunc,
originalReceiverName: tp.Elem().Name(),
}
}
type WebsocketApiBroker struct {
methods map[string]apiFuncType
methods_dup map[string][]apiFuncType
connFuncs []connFuncType
disconnFuncs []disconnFuncType
}
func (hc *WebsocketApiBroker) AddHandler(receiver WebsocketApiHandler) {
if hc.methods == nil {
hc.methods = make(map[string]apiFuncType)
hc.methods_dup = make(map[string][]apiFuncType)
}
for k, v := range receiver.methods {
ab := strings.Split(k, ".")
logger.Printf("ws api registered : %s.%s -> %s\n", receiver.originalReceiverName, ab[1], k)
hc.methods_dup[k] = append(hc.methods_dup[k], v)
if len(hc.methods_dup[k]) > 1 {
chain := hc.methods_dup[k]
hc.methods[k] = func(ctx ApiCallContext) {
for _, f := range chain {
f(ctx)
}
}
} else {
hc.methods[k] = v
}
}
if receiver.connfunc != nil {
logger.Printf("ws api registered : %s.ClientConnected\n", receiver.originalReceiverName)
hc.connFuncs = append(hc.connFuncs, receiver.connfunc)
}
if receiver.disconnfunc != nil {
// disconnfunc은 역순
logger.Printf("ws api registered : %s.ClientDisconnected\n", receiver.originalReceiverName)
hc.disconnFuncs = append([]disconnFuncType{receiver.disconnfunc}, hc.disconnFuncs...)
}
}
func (hc *WebsocketApiBroker) ClientConnected(c *wsconn) {
for _, v := range hc.connFuncs {
v(c.Conn, c.sender)
}
}
func (hc *WebsocketApiBroker) ClientDisconnected(c *wsconn) {
for _, v := range hc.disconnFuncs {
v(c.closeMessage, c.sender)
}
}
func (hc *WebsocketApiBroker) Call(callby *Sender, funcname string, r io.Reader) {
if found := hc.methods[funcname]; found != nil {
var args []any
if r != nil {
dec := json.NewDecoder(r)
if err := dec.Decode(&args); err != nil {
logger.Println("WebsocketApiBroker.Call failed. decode returns err :", err)
}
}
found(ApiCallContext{
CallBy: callby,
Arguments: args,
})
} else {
logger.Println("api is not found :", funcname)
}
}

View File

@ -1,66 +0,0 @@
// package main ...
package wshandler
import (
"bytes"
"encoding/json"
"fmt"
"reflect"
"testing"
)
type TestReceiver struct {
}
func (tr *TestReceiver) Func1([]any) {
}
func (tr *TestReceiver) Func2(args []any) {
fmt.Println(args...)
}
func TestUnmarshalToken(t *testing.T) {
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7
// 012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890
insrc := []byte(`["string value 1",200,["inner string value 1","inner string value 2"],{"inner map key 1":"inner map string value 1","inner map key 2":"inner map string value 2"},500.1]`)
dec := json.NewDecoder(bytes.NewBuffer(insrc))
dec.Token()
for {
token_start := dec.InputOffset()
tok, _ := dec.Token()
token_end := dec.InputOffset()
var string_val_1 string
stringtype := reflect.TypeOf(string_val_1)
stringvalue := reflect.New(stringtype)
castptr := stringvalue.Interface()
err := json.Unmarshal(insrc[token_start:token_end], castptr)
fmt.Println(err)
if tok == nil {
break
}
fmt.Println(tok, dec.InputOffset())
}
// src := []any{"a", 1, false}
// payload, _ := json.Marshal(src)
// tr := new(TestReceiver)
// receiver := MakeWebsocketApiHandler(tr, "test")
// var con WebsocketApiBroker
// con.AddHandler(receiver)
}
func TestUnmarshal(t *testing.T) {
src := []byte(`{"123" :"str1", "456": "str2"}`)
var test map[int]string
err := json.Unmarshal(src, &test)
if err != nil {
t.Error(err)
}
}

View File

@ -1 +0,0 @@
{}

View File

@ -1,108 +0,0 @@
package wshandler
import (
"bytes"
"context"
"encoding/json"
"github.com/gorilla/websocket"
"go.mongodb.org/mongo-driver/bson/primitive"
"repositories.action2quare.com/ayo/gocommon/logger"
)
type room struct {
inChan chan *wsconn
outChan chan primitive.ObjectID
messageChan chan *UpstreamMessage
name string
destroyChan chan<- string
sendMsgChan chan<- send_msg_queue_elem
}
// 만약 destroyChan가 nil이면 room이 비어도 파괴되지 않는다. 영구 유지되는 room
func makeRoom(name string, destroyChan chan<- string, sendMsgChan chan<- send_msg_queue_elem) *room {
return &room{
inChan: make(chan *wsconn, 10),
outChan: make(chan primitive.ObjectID, 10),
messageChan: make(chan *UpstreamMessage, 100),
name: name,
destroyChan: destroyChan,
sendMsgChan: sendMsgChan,
}
}
func (r *room) broadcast(msg *UpstreamMessage) {
r.messageChan <- msg
}
func (r *room) in(conn *wsconn) *room {
r.inChan <- conn
return r
}
func (r *room) out(accid primitive.ObjectID) *room {
r.outChan <- accid
return r
}
func (r *room) start(ctx context.Context) {
go func(ctx context.Context) {
conns := make(map[string]*wsconn)
normal := false
for !normal {
normal = r.loop(ctx, &conns)
}
}(ctx)
}
func (r *room) loop(ctx context.Context, conns *map[string]*wsconn) (normalEnd bool) {
defer func() {
s := recover()
if s != nil {
logger.Error(s)
normalEnd = false
}
}()
tag := "#" + r.name
for {
select {
case <-ctx.Done():
return true
case conn := <-r.inChan:
(*conns)[conn.sender.Accid.Hex()] = conn
case accid := <-r.outChan:
delete((*conns), accid.Hex())
if len(*conns) == 0 && r.destroyChan != nil {
r.destroyChan <- r.name
return true
}
case msg := <-r.messageChan:
ds := DownstreamMessage{
Alias: msg.Alias,
Body: msg.Body,
Tag: append(msg.Tag, tag),
}
buff := new(bytes.Buffer)
enc := json.NewEncoder(buff)
enc.SetEscapeHTML(false)
enc.Encode(ds)
for _, conn := range *conns {
pmsg, err := websocket.NewPreparedMessage(websocket.TextMessage, buff.Bytes())
if err != nil {
logger.Println("websocket.NewPreparedMessage failed :", err)
} else {
r.sendMsgChan <- send_msg_queue_elem{
to: conn.Conn,
pmsg: pmsg,
}
}
}
}
}
}

File diff suppressed because it is too large Load Diff

View File

@ -1,389 +0,0 @@
package wshandler
import (
"encoding/hex"
"encoding/json"
"fmt"
"io"
"math/rand"
"net/http"
"reflect"
"strings"
"time"
"go.mongodb.org/mongo-driver/bson/primitive"
"repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/session"
"github.com/gorilla/websocket"
)
type WebsocketPeerHandler interface {
RegisterHandlers(serveMux *http.ServeMux, prefix string) error
}
type peerCtorChannelValue struct {
accid primitive.ObjectID
conn *websocket.Conn
}
type peerDtorChannelValue struct {
accid primitive.ObjectID
closed bool
}
type websocketPeerHandler[T PeerInterface] struct {
methods map[string]peerApiFuncType[T]
createPeer func(primitive.ObjectID) T
sessionConsumer session.Consumer
peerCtorChannel chan peerCtorChannelValue
peerDtorChannel chan peerDtorChannelValue
}
type PeerInterface interface {
ClientDisconnected(string)
ClientConnected(*websocket.Conn)
}
type peerApiFuncType[T PeerInterface] func(T, io.Reader) (any, error)
type websocketPeerApiHandler[T PeerInterface] struct {
methods map[string]peerApiFuncType[T]
originalReceiverName string
}
func (hc *websocketPeerHandler[T]) call(recv T, funcname string, r io.Reader) (v any, e error) {
defer func() {
r := recover()
if r != nil {
logger.Error(r)
e = fmt.Errorf("%v", r)
}
}()
if found := hc.methods[funcname]; found != nil {
return found(recv, r)
}
return nil, fmt.Errorf("api is not found : %s", funcname)
}
func makeWebsocketPeerApiHandler[T PeerInterface]() websocketPeerApiHandler[T] {
methods := make(map[string]peerApiFuncType[T])
var archetype T
tp := reflect.TypeOf(archetype)
for i := 0; i < tp.NumMethod(); i++ {
method := tp.Method(i)
if method.Type.In(0) != tp {
continue
}
if method.Name == ClientDisconnected {
continue
}
var intypes []reflect.Type
for i := 1; i < method.Type.NumIn(); i++ {
intypes = append(intypes, method.Type.In(i))
}
var outconv func([]reflect.Value) (any, error)
if method.Type.NumOut() == 0 {
outconv = func([]reflect.Value) (any, error) { return nil, nil }
} else if method.Type.NumOut() == 1 {
if method.Type.Out(0).Implements(reflect.TypeOf((*error)(nil)).Elem()) {
outconv = func(out []reflect.Value) (any, error) {
if out[0].Interface() == nil {
return nil, nil
}
return nil, out[0].Interface().(error)
}
} else {
outconv = func(out []reflect.Value) (any, error) {
return out[0].Interface(), nil
}
}
} else if method.Type.NumOut() == 2 && method.Type.Out(1).Implements(reflect.TypeOf((*error)(nil)).Elem()) {
outconv = func(out []reflect.Value) (any, error) {
if out[1].Interface() == nil {
return out[0].Interface(), nil
}
return out[0].Interface(), out[1].Interface().(error)
}
}
methods[method.Name] = func(recv T, r io.Reader) (any, error) {
decoder := json.NewDecoder(r)
inargs := make([]any, len(intypes))
for i, intype := range intypes {
zerovalueptr := reflect.New(intype)
inargs[i] = zerovalueptr.Interface()
}
err := decoder.Decode(&inargs)
if err != nil {
return nil, err
}
reflectargs := make([]reflect.Value, 0, len(inargs)+1)
reflectargs = append(reflectargs, reflect.ValueOf(recv))
for _, p := range inargs {
reflectargs = append(reflectargs, reflect.ValueOf(p).Elem())
}
return outconv(method.Func.Call(reflectargs))
}
}
return websocketPeerApiHandler[T]{
methods: methods,
originalReceiverName: tp.Elem().Name(),
}
}
func NewWebsocketPeerHandler[T PeerInterface](consumer session.Consumer, creator func(primitive.ObjectID) T) WebsocketPeerHandler {
methods := make(map[string]peerApiFuncType[T])
receiver := makeWebsocketPeerApiHandler[T]()
for k, v := range receiver.methods {
logger.Printf("ws api registered : %s.%s\n", receiver.originalReceiverName, k)
methods[k] = v
}
wsh := &websocketPeerHandler[T]{
sessionConsumer: consumer,
methods: methods,
createPeer: creator,
peerCtorChannel: make(chan peerCtorChannelValue),
peerDtorChannel: make(chan peerDtorChannelValue),
}
consumer.RegisterOnSessionInvalidated(wsh.onSessionInvalidated)
return wsh
}
func (ws *websocketPeerHandler[T]) RegisterHandlers(serveMux *http.ServeMux, prefix string) error {
if *noAuthFlag {
serveMux.HandleFunc(prefix, ws.upgrade_noauth)
} else {
serveMux.HandleFunc(prefix, ws.upgrade)
}
go ws.sessionMonitoring()
return nil
}
func (ws *websocketPeerHandler[T]) onSessionInvalidated(accid primitive.ObjectID) {
ws.peerDtorChannel <- peerDtorChannelValue{
accid: accid,
closed: false,
}
}
func (ws *websocketPeerHandler[T]) sessionMonitoring() {
all := make(map[primitive.ObjectID]*websocket.Conn)
unauthdata := []byte{0x03, 0xec}
unauthdata = append(unauthdata, []byte("unauthorized")...)
for {
select {
case estVal := <-ws.peerCtorChannel:
all[estVal.accid] = estVal.conn
case disVal := <-ws.peerDtorChannel:
if disVal.closed {
delete(all, disVal.accid)
} else if c := all[disVal.accid]; c != nil {
c.WriteControl(websocket.CloseMessage, unauthdata, time.Time{})
delete(all, disVal.accid)
}
}
}
}
func (ws *websocketPeerHandler[T]) upgrade_core(conn *websocket.Conn, accid primitive.ObjectID, nonce uint32) {
go func(c *websocket.Conn, accid primitive.ObjectID) {
peer := ws.createPeer(accid)
var closeReason string
peer.ClientConnected(conn)
ws.peerCtorChannel <- peerCtorChannelValue{accid: accid, conn: conn}
defer func() {
ws.peerDtorChannel <- peerDtorChannelValue{accid: accid, closed: true}
peer.ClientDisconnected(closeReason)
}()
response := make([]byte, 255)
for {
response = response[:5]
messageType, r, err := c.NextReader()
if err != nil {
if ce, ok := err.(*websocket.CloseError); ok {
closeReason = ce.Text
}
c.Close()
break
}
if messageType == websocket.CloseMessage {
closeMsg, _ := io.ReadAll(r)
closeReason = string(closeMsg)
break
}
if messageType == websocket.BinaryMessage {
var flag [1]byte
r.Read(flag[:])
if flag[0] == 0xff {
// nonce
r.Read(response[1:5])
var size [1]byte
r.Read(size[:])
cmd := make([]byte, size[0])
r.Read(cmd)
result, err := ws.call(peer, string(cmd), r)
if err != nil {
response[0] = 21 // 21 : Negative Ack
response = append(response, []byte(err.Error())...)
} else {
response[0] = 6 // 6 : Acknowledgement
switch result := result.(type) {
case string:
response = append(response, []byte(result)...)
case int8, int16, int32, int64, uint8, uint16, uint32, uint64:
response = append(response, []byte(fmt.Sprintf("%d", result))...)
case float32, float64:
response = append(response, []byte(fmt.Sprintf("%f", result))...)
case []byte:
response = append(response, result...)
default:
j, _ := json.Marshal(result)
response = append(response, j...)
}
}
pmsg, err := websocket.NewPreparedMessage(websocket.BinaryMessage, response)
if err != nil {
logger.Println("websocket.NewPreparedMessage failed :", err)
} else {
c.WritePreparedMessage(pmsg)
}
} else {
cmd := make([]byte, flag[0])
r.Read(cmd)
ws.call(peer, string(cmd), r)
}
}
}
}(conn, accid)
}
func (ws *websocketPeerHandler[T]) upgrade_noauth(w http.ResponseWriter, r *http.Request) {
// 클라이언트 접속
defer func() {
s := recover()
if s != nil {
logger.Error(s)
}
io.Copy(io.Discard, r.Body)
r.Body.Close()
}()
sk := r.Header.Get("AS-X-SESSION")
var accid primitive.ObjectID
if len(sk) > 0 {
logger.Println("WebsocketHandler.upgrade sk :", sk)
authinfo, err := ws.sessionConsumer.Query(sk)
if err == nil {
accid = authinfo.Account
}
}
if accid.IsZero() {
auth := strings.Split(r.Header.Get("Authorization"), " ")
if len(auth) != 2 {
w.WriteHeader(http.StatusBadRequest)
return
}
temp, err := hex.DecodeString(auth[1])
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
if len(temp) != len(primitive.NilObjectID) {
w.WriteHeader(http.StatusBadRequest)
return
}
raw := (*[12]byte)(temp)
accid = primitive.ObjectID(*raw)
}
var upgrader = websocket.Upgrader{} // use default options
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
// var alias string
// if v := r.Header.Get("AS-X-ALIAS"); len(v) > 0 {
// vt, _ := base64.StdEncoding.DecodeString(v)
// alias = string(vt)
// } else {
// alias = accid.Hex()
// }
nonce := rand.New(rand.NewSource(time.Now().UnixNano())).Uint32()
ws.upgrade_core(conn, accid, nonce)
}
func (ws *websocketPeerHandler[T]) upgrade(w http.ResponseWriter, r *http.Request) {
// 클라이언트 접속
defer func() {
s := recover()
if s != nil {
logger.Error(s)
}
io.Copy(io.Discard, r.Body)
r.Body.Close()
}()
sk := r.Header.Get("AS-X-SESSION")
logger.Println("WebsocketHandler.upgrade sk :", sk)
authinfo, err := ws.sessionConsumer.Query(sk)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
logger.Error("authorize query failed :", err)
return
}
var upgrader = websocket.Upgrader{} // use default options
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
// var alias string
// if v := r.Header.Get("AS-X-ALIAS"); len(v) > 0 {
// vt, _ := base64.StdEncoding.DecodeString(v)
// alias = string(vt)
// } else {
// alias = authinfo.Account.Hex()
// }
nonce := rand.New(rand.NewSource(time.Now().UnixNano())).Uint32()
ws.upgrade_core(conn, authinfo.Account, nonce)
}