From 544a38da7bc63b244c3e96b64d1f5705b3bcb340 Mon Sep 17 00:00:00 2001 From: mmsqe Date: Sat, 26 Nov 2022 06:17:56 +0800 Subject: [PATCH] Add command-line tool to manually subscribe to an event stream. (#8015) (#9718) This tool is an aid to debugging, and demonstrates the API of the eventstream helper package. It subscribes to the event stream of a running node with the ADR 075 event log enabled, and writes matching events to stdout as JSON. Co-authored-by: M. J. Fromberger --- scripts/estream/estream.go | 81 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 scripts/estream/estream.go diff --git a/scripts/estream/estream.go b/scripts/estream/estream.go new file mode 100644 index 000000000..cacb73761 --- /dev/null +++ b/scripts/estream/estream.go @@ -0,0 +1,81 @@ +// Program estream is a manual testing tool for polling the event stream +// of a running Tendermint consensus node. +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "log" + "os" + "os/signal" + "path/filepath" + + "github.com/tendermint/tendermint/rpc/client/eventstream" + rpcclient "github.com/tendermint/tendermint/rpc/client/http" + coretypes "github.com/tendermint/tendermint/rpc/core/types" +) + +var ( + query = flag.String("query", "", "Filter query") + batchSize = flag.Int("batch", 0, "Batch size") + resumeFrom = flag.String("resume", "", "Resume cursor") + numItems = flag.Int("count", 0, "Number of items to read (0 to stream)") + waitTime = flag.Duration("poll", 0, "Long poll interval") + rpcAddr = flag.String("addr", "http://localhost:26657", "RPC service address") +) + +func init() { + flag.Usage = func() { + fmt.Fprintf(os.Stderr, `Usage: %[1]s [options] + +Connect to the Tendermint node whose RPC service is at -addr, and poll for events +matching the specified -query. If no query is given, all events are fetched. +The resulting event data are written to stdout as JSON. + +Use -resume to pick up polling from a previously-reported event cursor. +Use -count to stop polling after a certain number of events has been reported. +Use -batch to override the default request batch size. +Use -poll to override the default long-polling interval. + +Options: +`, filepath.Base(os.Args[0])) + flag.PrintDefaults() + } +} + +func main() { + flag.Parse() + + cli, err := rpcclient.New(*rpcAddr, "/websocket") + if err != nil { + log.Fatalf("RPC client: %v", err) + } + stream := eventstream.New(cli, *query, &eventstream.StreamOptions{ + BatchSize: *batchSize, + ResumeFrom: *resumeFrom, + WaitTime: *waitTime, + }) + + // Shut down cleanly on SIGINT. Don't attempt clean shutdown for other + // fatal signals. + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) + defer cancel() + + var nr int + if err := stream.Run(ctx, func(itm *coretypes.EventItem) error { + nr++ + bits, err := json.Marshal(itm) + if err != nil { + return err + } + fmt.Println(string(bits)) + if *numItems > 0 && nr >= *numItems { + return eventstream.ErrStopRunning + } + return nil + }); err != nil { + log.Fatalf("Stream failed: %v", err) + } +}