Files
gocommon/rpc/rpc.go

219 lines
4.3 KiB
Go
Raw Normal View History

2023-07-10 09:42:34 +09:00
package rpc
import (
	"bytes"
	"context"
	"crypto/md5"
	"encoding/gob"
	"encoding/hex"
	"errors"
	"fmt"
	"path"
	"reflect"
	"runtime"
	"sort"
	"strings"
	"time"

	"github.com/go-redis/redis/v8"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"repositories.action2quare.com/ayo/gocommon/logger"
)
// Receiver is implemented by any object that can be the destination of a
// remote call made through this package.
type Receiver interface {
	// TargetExists reports whether the target identified by the given
	// ObjectID is held by this receiver. The engine dispatches an incoming
	// message only when it returns true, and CallContext.Call skips
	// publishing when it returns true.
	TargetExists(primitive.ObjectID) bool
}
// receiverManifest pairs a registered Receiver with its method set as seen
// through reflection.
type receiverManifest struct {
	// r is the registered receiver instance.
	r Receiver
	// methods maps a method name to its reflect.Method descriptor.
	methods map[string]reflect.Method
}
// rpcEngine holds the registered receivers and the function used to publish
// serialized calls.
type rpcEngine struct {
	// receivers is keyed by receiver name in the form "(*TypeName)".
	receivers map[string]receiverManifest
	// publish sends one serialized call; it is nil until Start assigns it.
	publish func([]byte) error
}
// engine is the package-level rpcEngine used by RegistReceiver, Start and
// CallContext.Call. Its receivers map starts empty and publish starts nil.
var engine = rpcEngine{
	receivers: make(map[string]receiverManifest),
}
// RegistReceiver records ptr in the package-level engine under the key
// "(*TypeName)", together with a name-indexed table of every method in the
// method set of ptr's dynamic type.
//
// ptr must hold a pointer: the element type's name is used for the key, and
// reflect panics on Elem for non-pointer kinds.
func RegistReceiver(ptr Receiver) {
	typ := reflect.TypeOf(ptr)
	rname := "(*" + typ.Elem().Name() + ")"

	count := typ.NumMethod()
	table := make(map[string]reflect.Method, count)
	for idx := 0; idx < count; idx++ {
		m := typ.Method(idx)
		table[m.Name] = m
	}

	engine.receivers[rname] = receiverManifest{r: ptr, methods: table}
}
func Start(ctx context.Context, redisClient *redis.Client) {
if engine.publish != nil {
return
}
2023-07-10 11:43:52 +09:00
hash := md5.New()
for k, manifest := range engine.receivers {
2023-07-10 11:43:52 +09:00
hash.Write([]byte(k))
for m, r := range manifest.methods {
hash.Write([]byte(m))
hash.Write([]byte(r.Name))
for i := 0; i < r.Type.NumIn(); i++ {
inName := r.Type.In(i).Name()
hash.Write([]byte(inName))
}
}
Squashed commit of the following: commit 29b2f258507d9e11e20a9693b86b3ae09e10d88c Author: mountain <mountain@action2quare.com> Date: Wed Jul 19 09:33:37 2023 +0900 타입 이름 변경 commit 256bfd030c294d2d7bea4ca2c3082f2d49ae9aef Author: mountain <mountain@action2quare.com> Date: Wed Jul 19 09:31:01 2023 +0900 redison 추가 commit 72a683fed2c024616b2171be1f6841a11150a3a8 Author: mountain <mountain@action2quare.com> Date: Tue Jul 18 19:51:24 2023 +0900 gob에 []any 추가 commit 89fa9e4ac585026c331d697929697af5defc6b9d Author: mountain <mountain@action2quare.com> Date: Tue Jul 18 17:45:12 2023 +0900 write control 수정 commit d724cc84fa94ab6cdd3d8bddd3ab08c99d0aef3a Author: mountain <mountain@action2quare.com> Date: Tue Jul 18 17:38:04 2023 +0900 redis pubsub 채널 이름에 디비 인덱스 추가 commit 8df248fa54908a8cb547813179e4269e25c7df87 Author: mountain <mountain@action2quare.com> Date: Tue Jul 18 17:20:47 2023 +0900 close를 writecontrol로 변경 commit 40a603522d4082f426a0d3818d852db53e458cd2 Author: mountain <mountain@action2quare.com> Date: Tue Jul 18 12:21:06 2023 +0900 conn에 msg를 쓰는 쓰레드 단일화 commit c21017d2cd8b2bd26bdbf6d4eca49d06b8462ce0 Author: mountain <mountain@action2quare.com> Date: Tue Jul 18 11:08:38 2023 +0900 redis call이 문제가 아니었음 commit 82abcddb497bdb95b0eff2d7a98b72cacf37bc9a Author: mountain <mountain@action2quare.com> Date: Tue Jul 18 11:04:15 2023 +0900 잦은 redis call 회피 commit 289af24a8ffaa55336bfabca151a71f6e1f82290 Author: mountain <mountain@action2quare.com> Date: Tue Jul 18 09:55:18 2023 +0900 room create 메시지 전송 commit 4b35e0e6386b1a27e4dec854d1fc2c755f20aeef Author: mountain <mountain@action2quare.com> Date: Tue Jul 18 09:45:27 2023 +0900 EventReceiver 인터페이스 추가 commit 29843802ff0eb6378af63b2a14de826a594f5a9e Author: mountain <mountain@action2quare.com> Date: Mon Jul 17 17:45:40 2023 +0900 gob 등록 commit 66aea48fb7322728d0f665e37b2dd818f441c26f Author: mountain <mountain@action2quare.com> Date: Sun Jul 16 18:39:11 2023 +0900 채널간 publish marshalling을 gob으로 변경 commit 
8f6c87a8aeb86f8fb41ba7a5ff75e3e8ee9ede2c Author: mountain <mountain@action2quare.com> Date: Sun Jul 16 16:37:02 2023 +0900 redis option을 copy로 변경 commit f0f459332d1a62a938a9b9b7ca34e3d3a2f26e8c Author: mountain <mountain@action2quare.com> Date: Sat Jul 15 17:08:33 2023 +0900 wshandler에서 authcache제거하고 config 포맷 변경
2023-07-19 09:35:25 +09:00
hash.Write([]byte(fmt.Sprintf("%d", redisClient.Options().DB)))
2023-07-10 11:43:52 +09:00
}
2023-07-10 11:43:52 +09:00
pubsubName := hex.EncodeToString(hash.Sum(nil))[:16]
2023-07-10 09:42:34 +09:00
engine.publish = func(s []byte) error {
_, err := redisClient.Publish(ctx, pubsubName, s).Result()
return err
}
go engine.loop(ctx, redisClient, pubsubName)
}
// callFromMessage decodes one pub/sub message and, if this process holds the
// addressed target, invokes the named method on the matching receiver.
//
// Payload layout, as produced by encode:
//
//	[ObjectID bytes][receiver name ending in ')'][gob-encoded []any]
//
// where the first element of the gob slice is the method name and the rest
// are the call arguments.
//
// Malformed messages are logged and dropped. A panic raised by the reflected
// call (for example an argument count or type mismatch) is recovered and
// logged.
func (re *rpcEngine) callFromMessage(msg *redis.Message) {
	defer func() {
		if r := recover(); r != nil {
			logger.Error(r)
		}
	}()

	encoded := []byte(msg.Payload)

	var target primitive.ObjectID
	if len(encoded) < len(target) {
		logger.Error("rpc: payload too short :", len(encoded))
		return
	}
	copy(target[:], encoded)
	encoded = encoded[len(target):]

	// The receiver name has the form "(*Type)", so the first ')' ends it.
	// Scanning further would read into the gob payload.
	end := bytes.IndexByte(encoded, ')')
	if end < 0 {
		logger.Error("rpc: receiver name not found in payload")
		return
	}

	manifest, ok := re.receivers[string(encoded[:end+1])]
	if !ok {
		// This receiver type is not registered in this process.
		return
	}
	if !manifest.r.TargetExists(target) {
		// The target is not held by this receiver.
		return
	}

	var params []any
	if err := gob.NewDecoder(bytes.NewReader(encoded[end+1:])).Decode(&params); err != nil {
		logger.Error("rpc: decode params failed :", err)
		return
	}
	if len(params) == 0 {
		logger.Error("rpc: empty call params")
		return
	}
	name, ok := params[0].(string)
	if !ok {
		logger.Error("rpc: method name is not a string")
		return
	}
	method, ok := manifest.methods[name]
	if !ok {
		logger.Error("rpc: unknown method :", name)
		return
	}

	// reflect.Method.Func takes the receiver as its first argument.
	args := make([]reflect.Value, 0, len(params))
	args = append(args, reflect.ValueOf(manifest.r))
	for _, arg := range params[1:] {
		args = append(args, reflect.ValueOf(arg))
	}
	method.Func.Call(args)
}
// loop subscribes to chanName and passes every received message to
// callFromMessage until ctx is done.
//
// On a receive error the current subscription is closed, the loop waits one
// second (or until ctx is done), and then subscribes again. The active
// subscription is also closed when the loop exits. A panic escaping the loop
// is recovered and logged, which ends the loop.
func (re *rpcEngine) loop(ctx context.Context, redisClient *redis.Client, chanName string) {
	defer func() {
		if r := recover(); r != nil {
			logger.Error(r)
		}
	}()

	var pubsub *redis.PubSub
	defer func() {
		if pubsub != nil {
			// Best effort: the loop is exiting, nothing can act on the error.
			_ = pubsub.Close()
		}
	}()

	for ctx.Err() == nil {
		if pubsub == nil {
			pubsub = redisClient.Subscribe(ctx, chanName)
		}

		msg, err := pubsub.ReceiveMessage(ctx)
		if err == nil {
			re.callFromMessage(msg)
			continue
		}

		// Release the failed subscription before replacing it so its
		// connection is not leaked. The close error adds nothing to err.
		_ = pubsub.Close()
		pubsub = nil

		// Back off on every error, not only ErrClosed, so an unreachable
		// server does not cause a tight resubscribe loop.
		select {
		case <-ctx.Done():
			return
		case <-time.After(time.Second):
		}
	}
}
// errNoReceiver is returned by CallContext.Call when the calling frame that
// should name the receiver and method cannot be determined.
var errNoReceiver = errors.New("no receiver")
2023-07-10 14:30:24 +09:00
type CallContext struct {
2023-07-10 09:42:34 +09:00
r Receiver
t primitive.ObjectID
}
// ErrCanExecuteHere is returned by CallContext.Call when the local receiver
// reports that the target exists; nothing is published in that case.
// NOTE(review): presumably the caller should then run the method body
// locally — confirm against call sites.
var ErrCanExecuteHere = errors.New("go ahead")
2023-07-10 14:30:24 +09:00
func (c *CallContext) Call(args ...any) error {
2023-07-10 09:42:34 +09:00
if c.r.TargetExists(c.t) {
// 여기 있네?
return ErrCanExecuteHere
}
pc := make([]uintptr, 1)
2023-07-10 10:53:49 +09:00
n := runtime.Callers(3, pc[:])
2023-07-10 09:42:34 +09:00
if n < 1 {
return errNoReceiver
}
frame, _ := runtime.CallersFrames(pc).Next()
2023-07-10 11:12:45 +09:00
fullname := path.Base(frame.Function)
prf := strings.Split(fullname, ".")
rname := prf[1]
funcname := prf[2]
2023-07-10 09:42:34 +09:00
serialized, err := encode(c.t, rname, funcname, args...)
if err != nil {
return err
}
return engine.publish(serialized)
}
2023-07-10 14:30:24 +09:00
func Make(r Receiver) *CallContext {
return &CallContext{
2023-07-10 10:29:16 +09:00
r: r,
}
2023-07-10 14:30:24 +09:00
}
func (cc *CallContext) To(target primitive.ObjectID) *CallContext {
cc.t = target
return cc
2023-07-10 10:29:16 +09:00
}
2023-07-10 09:42:34 +09:00
// encode serializes one call into the wire format read by callFromMessage:
// the raw target ObjectID bytes, then the receiver name, then a gob-encoded
// []any whose first element is funcname followed by args.
//
// A gob encoding failure is logged and returned with a nil slice.
func encode(target primitive.ObjectID, receiver string, funcname string, args ...any) ([]byte, error) {
	var out bytes.Buffer

	// The target comes first so it can be read at a fixed offset.
	out.Write(target[:])
	// The receiver name follows.
	out.WriteString(receiver)

	// Then the call itself: method name followed by its arguments.
	payload := make([]any, 0, len(args)+1)
	payload = append(payload, funcname)
	payload = append(payload, args...)

	if err := gob.NewEncoder(&out).Encode(payload); err != nil {
		logger.Error("rpcCallContext.send err :", err)
		return nil, err
	}
	return out.Bytes(), nil
}