diff --git a/rpc/client/event_test.go b/rpc/client/event_test.go index 5734d6c1b..45fbe0dbc 100644 --- a/rpc/client/event_test.go +++ b/rpc/client/event_test.go @@ -41,12 +41,12 @@ func TestHeaderEvents(t *testing.T) { } }) } - - evtTyp := types.EventNewBlockHeader - evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout) + ectx, cancel := context.WithTimeout(ctx, waitForEventTimeout) + defer cancel() + query := types.QueryForEvent(types.EventNewBlockHeader).String() + var evt types.EventDataNewBlockHeader + err := client.WaitForOneEvent(ectx, c, query, &evt) require.Nil(t, err, "%d: %+v", i, err) - _, ok := evt.(types.EventDataNewBlockHeader) - require.True(t, ok, "%d: %#v", i, evt) // TODO: more checks... }) } @@ -141,17 +141,15 @@ func testTxEventsSent(t *testing.T, broadcastMethod string) { } }() - // and wait for confirmation - evt, err := client.WaitForOneEvent(c, types.EventTx, waitForEventTimeout) + // Wait for the transaction we sent to be confirmed. + query := fmt.Sprintf(`tm.event = '%s' AND tx.hash = '%X'`, types.EventTx, types.Tx(tx).Hash()) + var evt types.EventDataTx + err := client.WaitForOneEvent(ctx, c, query, &evt) require.Nil(t, err) - // and make sure it has the proper info - txe, ok := evt.(types.EventDataTx) - require.True(t, ok) - // make sure this is the proper tx - require.EqualValues(t, tx, txe.Tx) - require.True(t, txe.Result.IsOK()) + require.EqualValues(t, tx, evt.Tx) + require.True(t, evt.Result.IsOK()) }) } } diff --git a/rpc/client/helpers.go b/rpc/client/helpers.go index 9c4d3ad15..1701f6f0f 100644 --- a/rpc/client/helpers.go +++ b/rpc/client/helpers.go @@ -2,10 +2,11 @@ package client import ( "context" - "errors" + "encoding/json" "fmt" "time" + coretypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" ) @@ -52,32 +53,24 @@ func WaitForHeight(c StatusClient, h int64, waiter Waiter) error { return nil } -// WaitForOneEvent subscribes to a websocket event for the given -// event time and returns upon receiving it one time, or -// when the timeout duration has expired. -// -// This handles subscribing and unsubscribing under the hood -func WaitForOneEvent(c SubscriptionClient, evtTyp string, timeout time.Duration) (types.TMEventData, error) { - const subscriber = "helpers" - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - // register for the next event of this type - eventCh, err := c.Subscribe(ctx, subscriber, types.QueryForEvent(evtTyp).String()) - if err != nil { - return nil, fmt.Errorf("failed to subscribe: %w", err) - } - // make sure to unregister after the test is over - defer func() { - if deferErr := c.UnsubscribeAll(ctx, subscriber); deferErr != nil { - panic(deferErr) +// WaitForOneEvent waits for the first event matching the given query on c, or +// until ctx ends. It reports an error if ctx ends before a matching event is +// received. +func WaitForOneEvent(ctx context.Context, c EventsClient, query string, evt types.TMEventData) error { + for { + rsp, err := c.Events(ctx, &coretypes.RequestEvents{ + Filter: &coretypes.EventFilter{Query: query}, + MaxItems: 1, + WaitTime: 10 * time.Second, // duration doesn't matter, limited by ctx timeout + }) + if err != nil { + return err + } else if len(rsp.Items) == 0 { + continue // continue polling until ctx expires } - }() - - select { - case event := <-eventCh: - return event.Data, nil - case <-ctx.Done(): - return nil, errors.New("timed out waiting for event") + if err := json.Unmarshal(rsp.Items[0].Data, evt); err != nil { + return err + } + return nil } }