세션 무효화시 접속 종료 처리
This commit is contained in:
@ -257,6 +257,15 @@ func (c *consumer_redis) Touch(pk string) (Authorization, error) {
|
|||||||
defer c.lock.Unlock()
|
defer c.lock.Unlock()
|
||||||
|
|
||||||
sk := publickey_to_storagekey(publickey(pk))
|
sk := publickey_to_storagekey(publickey(pk))
|
||||||
|
|
||||||
|
if _, deleted := c.stages[0].deleted[sk]; deleted {
|
||||||
|
return Authorization{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, deleted := c.stages[1].deleted[sk]; deleted {
|
||||||
|
return Authorization{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
ok, err := c.redisClient.Expire(c.ctx, string(sk), c.ttl).Result()
|
ok, err := c.redisClient.Expire(c.ctx, string(sk), c.ttl).Result()
|
||||||
if err == redis.Nil {
|
if err == redis.Nil {
|
||||||
logger.Println("session consumer touch :", pk, err)
|
logger.Println("session consumer touch :", pk, err)
|
||||||
|
|||||||
@ -93,6 +93,7 @@ type websocketHandlerBase struct {
|
|||||||
connInOutChan chan *wsconn
|
connInOutChan chan *wsconn
|
||||||
deliveryChan chan any
|
deliveryChan chan any
|
||||||
localDeliveryChan chan any
|
localDeliveryChan chan any
|
||||||
|
forceCloseChan chan primitive.ObjectID
|
||||||
sendMsgChan chan send_msg_queue_elem
|
sendMsgChan chan send_msg_queue_elem
|
||||||
|
|
||||||
connWaitGroup sync.WaitGroup
|
connWaitGroup sync.WaitGroup
|
||||||
@ -154,7 +155,7 @@ func NewWebsocketHandler(consumer session.Consumer, redisUrl string) (*Websocket
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return &WebsocketHandler{
|
ws := &WebsocketHandler{
|
||||||
websocketHandlerBase: websocketHandlerBase{
|
websocketHandlerBase: websocketHandlerBase{
|
||||||
redisMsgChanName: fmt.Sprintf("_wsh_msg_%d", redisSync.Options().DB),
|
redisMsgChanName: fmt.Sprintf("_wsh_msg_%d", redisSync.Options().DB),
|
||||||
redisCmdChanName: fmt.Sprintf("_wsh_cmd_%d", redisSync.Options().DB),
|
redisCmdChanName: fmt.Sprintf("_wsh_cmd_%d", redisSync.Options().DB),
|
||||||
@ -162,10 +163,13 @@ func NewWebsocketHandler(consumer session.Consumer, redisUrl string) (*Websocket
|
|||||||
connInOutChan: make(chan *wsconn),
|
connInOutChan: make(chan *wsconn),
|
||||||
deliveryChan: make(chan any, 1000),
|
deliveryChan: make(chan any, 1000),
|
||||||
localDeliveryChan: make(chan any, 100),
|
localDeliveryChan: make(chan any, 100),
|
||||||
|
forceCloseChan: make(chan primitive.ObjectID),
|
||||||
sendMsgChan: sendchan,
|
sendMsgChan: sendchan,
|
||||||
sessionConsumer: consumer,
|
sessionConsumer: consumer,
|
||||||
},
|
},
|
||||||
}, nil
|
}
|
||||||
|
consumer.RegisterOnSessionInvalidated(ws.onSessionInvalidated)
|
||||||
|
return ws, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ws *WebsocketHandler) Start(ctx context.Context) {
|
func (ws *WebsocketHandler) Start(ctx context.Context) {
|
||||||
@ -206,6 +210,10 @@ func (ws *WebsocketHandler) LeaveRoom(room string, accid primitive.ObjectID) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ws *WebsocketHandler) onSessionInvalidated(accid primitive.ObjectID) {
|
||||||
|
ws.forceCloseChan <- accid
|
||||||
|
}
|
||||||
|
|
||||||
func (ws *WebsocketHandler) mainLoop(ctx context.Context) {
|
func (ws *WebsocketHandler) mainLoop(ctx context.Context) {
|
||||||
defer func() {
|
defer func() {
|
||||||
ws.connWaitGroup.Done()
|
ws.connWaitGroup.Done()
|
||||||
@ -358,6 +366,9 @@ func (ws *WebsocketHandler) mainLoop(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 유저에게서 온 메세지, 소켓 연결/해체 처리
|
// 유저에게서 온 메세지, 소켓 연결/해체 처리
|
||||||
|
unauthdata := []byte{0x03, 0xec}
|
||||||
|
unauthdata = append(unauthdata, []byte("unauthorized")...)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
buffer := bytes.NewBuffer(make([]byte, 0, 1024))
|
buffer := bytes.NewBuffer(make([]byte, 0, 1024))
|
||||||
buffer.Reset()
|
buffer.Reset()
|
||||||
@ -442,6 +453,11 @@ func (ws *WebsocketHandler) mainLoop(ctx context.Context) {
|
|||||||
logger.Println("ClientConnected :", c.sender.Alias)
|
logger.Println("ClientConnected :", c.sender.Alias)
|
||||||
go ws.ClientConnected(c)
|
go ws.ClientConnected(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case accid := <-ws.forceCloseChan:
|
||||||
|
if conn := entireConns[accid.Hex()]; conn != nil {
|
||||||
|
conn.WriteControl(websocket.CloseMessage, unauthdata, time.Time{})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -520,6 +536,18 @@ func (ws *WebsocketHandler) upgrade_nosession(w http.ResponseWriter, r *http.Req
|
|||||||
raw := (*[12]byte)(temp)
|
raw := (*[12]byte)(temp)
|
||||||
accid := primitive.ObjectID(*raw)
|
accid := primitive.ObjectID(*raw)
|
||||||
|
|
||||||
|
sk := r.Header.Get("AS-X-SESSION")
|
||||||
|
authinfo, err := ws.sessionConsumer.Query(sk)
|
||||||
|
if err != nil {
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if authinfo.Account != accid {
|
||||||
|
w.WriteHeader(http.StatusUnauthorized)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
var upgrader = websocket.Upgrader{} // use default options
|
var upgrader = websocket.Upgrader{} // use default options
|
||||||
conn, err := upgrader.Upgrade(w, r, nil)
|
conn, err := upgrader.Upgrade(w, r, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -558,6 +586,11 @@ func (ws *WebsocketHandler) upgrade(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if authinfo.Account.IsZero() {
|
||||||
|
w.WriteHeader(http.StatusUnauthorized)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
var upgrader = websocket.Upgrader{} // use default options
|
var upgrader = websocket.Upgrader{} // use default options
|
||||||
conn, err := upgrader.Upgrade(w, r, nil)
|
conn, err := upgrader.Upgrade(w, r, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@ -22,10 +22,23 @@ type WebsocketPeerHandler interface {
|
|||||||
RegisterHandlers(serveMux *http.ServeMux, prefix string) error
|
RegisterHandlers(serveMux *http.ServeMux, prefix string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type peerCtorChannelValue struct {
|
||||||
|
accid primitive.ObjectID
|
||||||
|
conn *websocket.Conn
|
||||||
|
}
|
||||||
|
|
||||||
|
type peerDtorChannelValue struct {
|
||||||
|
accid primitive.ObjectID
|
||||||
|
closed bool
|
||||||
|
}
|
||||||
|
|
||||||
type websocketPeerHandler[T PeerInterface] struct {
|
type websocketPeerHandler[T PeerInterface] struct {
|
||||||
methods map[string]peerApiFuncType[T]
|
methods map[string]peerApiFuncType[T]
|
||||||
createPeer func(primitive.ObjectID) T
|
createPeer func(primitive.ObjectID) T
|
||||||
sessionConsumer session.Consumer
|
sessionConsumer session.Consumer
|
||||||
|
|
||||||
|
peerCtorChannel chan peerCtorChannelValue
|
||||||
|
peerDtorChannel chan peerDtorChannelValue
|
||||||
}
|
}
|
||||||
|
|
||||||
type PeerInterface interface {
|
type PeerInterface interface {
|
||||||
@ -144,8 +157,11 @@ func NewWebsocketPeerHandler[T PeerInterface](consumer session.Consumer, creator
|
|||||||
sessionConsumer: consumer,
|
sessionConsumer: consumer,
|
||||||
methods: methods,
|
methods: methods,
|
||||||
createPeer: creator,
|
createPeer: creator,
|
||||||
|
peerCtorChannel: make(chan peerCtorChannelValue),
|
||||||
|
peerDtorChannel: make(chan peerDtorChannelValue),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
consumer.RegisterOnSessionInvalidated(wsh.onSessionInvalidated)
|
||||||
return wsh
|
return wsh
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -155,18 +171,47 @@ func (ws *websocketPeerHandler[T]) RegisterHandlers(serveMux *http.ServeMux, pre
|
|||||||
} else {
|
} else {
|
||||||
serveMux.HandleFunc(prefix, ws.upgrade)
|
serveMux.HandleFunc(prefix, ws.upgrade)
|
||||||
}
|
}
|
||||||
|
go ws.sessionMonitoring()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ws *websocketPeerHandler[T]) onSessionInvalidated(accid primitive.ObjectID) {
|
||||||
|
ws.peerDtorChannel <- peerDtorChannelValue{
|
||||||
|
accid: accid,
|
||||||
|
closed: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ws *websocketPeerHandler[T]) sessionMonitoring() {
|
||||||
|
all := make(map[primitive.ObjectID]*websocket.Conn)
|
||||||
|
unauthdata := []byte{0x03, 0xec}
|
||||||
|
unauthdata = append(unauthdata, []byte("unauthorized")...)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case estVal := <-ws.peerCtorChannel:
|
||||||
|
all[estVal.accid] = estVal.conn
|
||||||
|
case disVal := <-ws.peerDtorChannel:
|
||||||
|
if disVal.closed {
|
||||||
|
delete(all, disVal.accid)
|
||||||
|
} else if c := all[disVal.accid]; c != nil {
|
||||||
|
c.WriteControl(websocket.CloseMessage, unauthdata, time.Time{})
|
||||||
|
delete(all, disVal.accid)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (ws *websocketPeerHandler[T]) upgrade_core(conn *websocket.Conn, accid primitive.ObjectID, nonce uint32) {
|
func (ws *websocketPeerHandler[T]) upgrade_core(conn *websocket.Conn, accid primitive.ObjectID, nonce uint32) {
|
||||||
go func(c *websocket.Conn, accid primitive.ObjectID) {
|
go func(c *websocket.Conn, accid primitive.ObjectID) {
|
||||||
peer := ws.createPeer(accid)
|
peer := ws.createPeer(accid)
|
||||||
var closeReason string
|
var closeReason string
|
||||||
|
|
||||||
peer.ClientConnected(conn)
|
peer.ClientConnected(conn)
|
||||||
|
ws.peerCtorChannel <- peerCtorChannelValue{accid: accid, conn: conn}
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
|
ws.peerDtorChannel <- peerDtorChannelValue{accid: accid, closed: true}
|
||||||
peer.ClientDisconnected(closeReason)
|
peer.ClientDisconnected(closeReason)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user