From 0a65c6009e709d869400b944980084392b771437 Mon Sep 17 00:00:00 2001 From: mountain Date: Tue, 23 Apr 2024 15:57:25 +0900 Subject: [PATCH] =?UTF-8?q?=EC=84=B8=EC=85=98=20consumer=EC=97=90=20Revoke?= =?UTF-8?q?=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- session/impl_redis.go | 14 ++++++++++++-- wshandler/wshandler_peer.go | 31 +++++++++++++++---------------- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/session/impl_redis.go b/session/impl_redis.go index 45f46c1..ad6aa49 100644 --- a/session/impl_redis.go +++ b/session/impl_redis.go @@ -50,8 +50,8 @@ func (p *provider_redis) New(input *Authorization) (string, error) { return "", err } + p.redisClient.Del(p.ctx, sks...) for _, sk := range sks { - p.redisClient.HSet(p.ctx, sk, "inv", "true") p.redisClient.Publish(p.ctx, p.deleteChannel, sk).Result() } @@ -323,7 +323,17 @@ func (c *consumer_redis) Revoke(pk string) { sk := publickey_to_storagekey(publickey(pk)) c.redisClient.Del(c.ctx, string(sk)) - c.redisClient.Publish(c.ctx, c.deleteChannel, string(sk)).Result() + + c.lock.Lock() + defer c.lock.Unlock() + + if sr, ok := c.stages[0].cache[sk]; ok { + c.stages[0].deleted[sk] = sr + } + + if sr, ok := c.stages[1].cache[sk]; ok { + c.stages[1].deleted[sk] = sr + } } func (c *consumer_redis) IsRevoked(accid primitive.ObjectID) bool { diff --git a/wshandler/wshandler_peer.go b/wshandler/wshandler_peer.go index 942f5ae..bbb5f98 100644 --- a/wshandler/wshandler_peer.go +++ b/wshandler/wshandler_peer.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "io" - "math/rand" "net/http" "reflect" "strings" @@ -28,8 +27,8 @@ type peerCtorChannelValue struct { } type peerDtorChannelValue struct { - accid primitive.ObjectID - closed bool + accid primitive.ObjectID + sk string } type websocketPeerHandler[T PeerInterface] struct { @@ -178,8 +177,7 @@ func (ws *websocketPeerHandler[T]) RegisterHandlers(serveMux *http.ServeMux, pre func (ws *websocketPeerHandler[T]) onSessionInvalidated(accid primitive.ObjectID) { ws.peerDtorChannel <- peerDtorChannelValue{ - accid: accid, - closed: false, + accid: accid, } } @@ -192,18 +190,21 @@ func (ws *websocketPeerHandler[T]) sessionMonitoring() { case estVal := <-ws.peerCtorChannel: all[estVal.accid] = estVal.conn case disVal := <-ws.peerDtorChannel: - if disVal.closed { - delete(all, disVal.accid) - } else if c := all[disVal.accid]; c != nil { + if c := all[disVal.accid]; c != nil { c.WriteControl(websocket.CloseMessage, unauthdata, time.Time{}) delete(all, disVal.accid) } + + if len(disVal.sk) > 0 { + ws.sessionConsumer.Revoke(disVal.sk) + delete(all, disVal.accid) + } } } } -func (ws *websocketPeerHandler[T]) upgrade_core(conn *websocket.Conn, accid primitive.ObjectID, nonce uint32) { - go func(c *websocket.Conn, accid primitive.ObjectID) { +func (ws *websocketPeerHandler[T]) upgrade_core(conn *websocket.Conn, accid primitive.ObjectID, sk string) { + go func(c *websocket.Conn, accid primitive.ObjectID, sk string) { peer := ws.createPeer(accid) var closeReason string @@ -211,7 +212,7 @@ func (ws *websocketPeerHandler[T]) upgrade_core(conn *websocket.Conn, accid prim ws.peerCtorChannel <- peerCtorChannelValue{accid: accid, conn: conn} defer func() { - ws.peerDtorChannel <- peerDtorChannelValue{accid: accid, closed: true} + ws.peerDtorChannel <- peerDtorChannelValue{accid: accid, sk: sk} peer.ClientDisconnected(closeReason) }() @@ -285,7 +286,7 @@ func (ws *websocketPeerHandler[T]) upgrade_core(conn *websocket.Conn, accid prim } } } - }(conn, accid) + }(conn, accid, sk) } func (ws *websocketPeerHandler[T]) upgrade_noauth(w http.ResponseWriter, r *http.Request) { @@ -345,8 +346,7 @@ func (ws *websocketPeerHandler[T]) upgrade_noauth(w http.ResponseWriter, r *http // alias = accid.Hex() // } - nonce := rand.New(rand.NewSource(time.Now().UnixNano())).Uint32() - ws.upgrade_core(conn, accid, nonce) + ws.upgrade_core(conn, accid, sk) } func (ws *websocketPeerHandler[T]) upgrade(w http.ResponseWriter, r *http.Request) { @@ -387,6 +387,5 @@ func (ws *websocketPeerHandler[T]) upgrade(w http.ResponseWriter, r *http.Reques // } else { // alias = authinfo.Account.Hex() // } - nonce := rand.New(rand.NewSource(time.Now().UnixNano())).Uint32() - ws.upgrade_core(conn, authinfo.Account, nonce) + ws.upgrade_core(conn, authinfo.Account, sk) }