pubsub: comments

This commit is contained in:
Ethan Buchman
2019-01-25 18:11:31 -05:00
committed by Anton Kaliaev
parent 57af99d901
commit 9b6b792ce7

View File

@@ -223,9 +223,11 @@ func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query)
}
// original query is used here because we're using pointers as map keys
// ?
select {
case s.cmds <- cmd{op: unsub, clientID: clientID, query: origQuery}:
s.mtx.Lock()
// if its the only query left, should we also delete the client?
delete(clientSubscriptions, query.String())
s.mtx.Unlock()
return nil
@@ -353,20 +355,24 @@ func (state *state) remove(clientID string, q Query) {
}
ch, ok := clientToChannelMap[clientID]
if ok {
close(ch)
if !ok {
return
}
delete(state.clients[clientID], q)
close(ch)
// if it not subscribed to anything else, remove the client
if len(state.clients[clientID]) == 0 {
delete(state.clients, clientID)
}
// remove the query from client map.
// if client is not subscribed to anything else, remove it.
delete(state.clients[clientID], q)
if len(state.clients[clientID]) == 0 {
delete(state.clients, clientID)
}
delete(state.queries[q], clientID)
if len(state.queries[q]) == 0 {
delete(state.queries, q)
}
// remove the client from query map.
// if query has no other clients subscribed, remove it.
delete(state.queries[q], clientID)
if len(state.queries[q]) == 0 {
delete(state.queries, q)
}
}
@@ -380,11 +386,15 @@ func (state *state) removeAll(clientID string) {
ch := state.queries[q][clientID]
close(ch)
// remove the client from query map.
// if query has no other clients subscribed, remove it.
delete(state.queries[q], clientID)
if len(state.queries[q]) == 0 {
delete(state.queries, q)
}
}
// remove the client.
delete(state.clients, clientID)
}