Merge branch 'master' into abci++

This commit is contained in:
marbar3778
2021-08-12 16:21:36 +02:00
173 changed files with 3461 additions and 1356 deletions

View File

@@ -27,15 +27,22 @@ func (bz *HexBytes) Unmarshal(data []byte) error {
return nil
}
// MarshalJSON implements the json.Marshaler interface. The hex bytes is a
// quoted hexadecimal encoded string.
// MarshalJSON implements the json.Marshaler interface. The encoding is a JSON
// quoted string of hexadecimal digits.
func (bz HexBytes) MarshalJSON() ([]byte, error) {
s := strings.ToUpper(hex.EncodeToString(bz))
jbz := make([]byte, len(s)+2)
jbz[0] = '"'
copy(jbz[1:], s)
jbz[len(jbz)-1] = '"'
return jbz, nil
size := hex.EncodedLen(len(bz)) + 2 // +2 for quotation marks
buf := make([]byte, size)
hex.Encode(buf[1:], []byte(bz))
buf[0] = '"'
buf[size-1] = '"'
// Ensure letter digits are capitalized.
for i := 1; i < size-1; i++ {
if buf[i] >= 'a' && buf[i] <= 'f' {
buf[i] = 'A' + (buf[i] - 'a')
}
}
return buf, nil
}
// UnmarshalJSON implements the json.Umarshaler interface.

View File

@@ -37,6 +37,7 @@ func TestJSONMarshal(t *testing.T) {
{[]byte(``), `{"B1":"","B2":""}`},
{[]byte(`a`), `{"B1":"YQ==","B2":"61"}`},
{[]byte(`abc`), `{"B1":"YWJj","B2":"616263"}`},
{[]byte("\x1a\x2b\x3c"), `{"B1":"Gis8","B2":"1A2B3C"}`},
}
for i, tc := range cases {

View File

@@ -231,34 +231,45 @@ func (s *Server) Unsubscribe(ctx context.Context, args UnsubscribeArgs) error {
return err
}
var qs string
if args.Query != nil {
qs = args.Query.String()
}
s.mtx.RLock()
clientSubscriptions, ok := s.subscriptions[args.Subscriber]
if args.ID != "" {
qs, ok = clientSubscriptions[args.ID]
clientSubscriptions, err := func() (map[string]string, error) {
s.mtx.RLock()
defer s.mtx.RUnlock()
if ok && args.Query == nil {
var err error
args.Query, err = query.New(qs)
if err != nil {
return err
clientSubscriptions, ok := s.subscriptions[args.Subscriber]
if args.ID != "" {
qs, ok = clientSubscriptions[args.ID]
if ok && args.Query == nil {
var err error
args.Query, err = query.New(qs)
if err != nil {
return nil, err
}
}
} else if qs != "" {
args.ID, ok = clientSubscriptions[qs]
}
} else if qs != "" {
args.ID, ok = clientSubscriptions[qs]
}
s.mtx.RUnlock()
if !ok {
return ErrSubscriptionNotFound
if !ok {
return nil, ErrSubscriptionNotFound
}
return clientSubscriptions, nil
}()
if err != nil {
return err
}
select {
case s.cmds <- cmd{op: unsub, clientID: args.Subscriber, query: args.Query, subscription: &Subscription{id: args.ID}}:
s.mtx.Lock()
defer s.mtx.Unlock()
delete(clientSubscriptions, args.ID)
delete(clientSubscriptions, qs)
@@ -266,7 +277,6 @@ func (s *Server) Unsubscribe(ctx context.Context, args UnsubscribeArgs) error {
if len(clientSubscriptions) == 0 {
delete(s.subscriptions, args.Subscriber)
}
s.mtx.Unlock()
return nil
case <-ctx.Done():
return ctx.Err()
@@ -288,8 +298,10 @@ func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
select {
case s.cmds <- cmd{op: unsub, clientID: clientID}:
s.mtx.Lock()
defer s.mtx.Unlock()
delete(s.subscriptions, clientID)
s.mtx.Unlock()
return nil
case <-ctx.Done():
return ctx.Err()