mirror of
https://github.com/tendermint/tendermint.git
synced 2026-04-17 06:01:04 +00:00
Merge remote-tracking branch 'origin/wb/abci-prepare-proposal-synchronize' into wb/abci-prepare-proposal-synchronize
This commit is contained in:
@@ -85,6 +85,7 @@ Note the context/background should be written in the present tense.
|
||||
- [ADR-067: Mempool Refactor](./adr-067-mempool-refactor.md)
|
||||
- [ADR-075: RPC Event Subscription Interface](./adr-075-rpc-subscription.md)
|
||||
- [ADR-076: Combine Spec and Tendermint Repositories](./adr-076-combine-spec-repo.md)
|
||||
- [ADR-081: Protocol Buffers Management](./adr-081-protobuf-mgmt.md)
|
||||
|
||||
### Deprecated
|
||||
|
||||
|
||||
201
docs/architecture/adr-081-protobuf-mgmt.md
Normal file
201
docs/architecture/adr-081-protobuf-mgmt.md
Normal file
@@ -0,0 +1,201 @@
|
||||
# ADR 081: Protocol Buffers Management
|
||||
|
||||
## Changelog
|
||||
|
||||
- 2022-02-28: First draft
|
||||
|
||||
## Status
|
||||
|
||||
Accepted
|
||||
|
||||
[Tracking issue](https://github.com/tendermint/tendermint/issues/8121)
|
||||
|
||||
## Context
|
||||
|
||||
At present, we manage the [Protocol Buffers] schema files ("protos") that define
|
||||
our wire-level data formats within the Tendermint repository itself (see the
|
||||
[`proto`](../../proto/) directory). Recently, we have been making use of [Buf],
|
||||
both locally and in CI, in order to generate Go stubs, and lint and check
|
||||
`.proto` files for breaking changes.
|
||||
|
||||
The version of Buf used at the time of this decision was `v1beta1`, and it was
|
||||
discussed in [\#7975] and in weekly calls as to whether we should upgrade to
|
||||
`v1` and harmonize our approach with that used by the Cosmos SDK. The team
|
||||
managing the Cosmos SDK was primarily interested in having our protos versioned
|
||||
and easily accessible from the [Buf] registry.
|
||||
|
||||
The three main sets of stakeholders for the `.proto` files and their needs, as
|
||||
currently understood, are as follows.
|
||||
|
||||
1. Tendermint needs Go code generated from `.proto` files.
|
||||
2. Consumers of Tendermint's `.proto` files, specifically projects that want to
|
||||
interoperate with Tendermint and need to generate code for their own
|
||||
programming language, want to be able to access these files in a reliable and
|
||||
efficient way.
|
||||
3. The Tendermint Core team wants to provide stable interfaces that are as easy
|
||||
as possible to maintain, on which consumers can depend, and to be able to
|
||||
notify those consumers promptly when those interfaces change. To this end, we
|
||||
want to:
|
||||
1. Prevent any breaking changes from being introduced in minor/patch releases
|
||||
of Tendermint. Only major version updates should be able to contain
|
||||
breaking interface changes.
|
||||
2. Prevent generated code from diverging from the Protobuf schema files.
|
||||
|
||||
There was also discussion surrounding the notion of automated documentation
|
||||
generation and hosting, but it is not clear at this time whether this would be
|
||||
that valuable to any of our stakeholders. What will, of course, be valuable at
|
||||
minimum would be better documentation (in comments) of the `.proto` files
|
||||
themselves.
|
||||
|
||||
## Alternative Approaches
|
||||
|
||||
### Meeting stakeholders' needs
|
||||
|
||||
1. Go stub generation from protos. We could use:
|
||||
1. [Buf]. This approach has been rather cumbersome up to this point, and it
|
||||
is not clear what Buf really provides beyond that which `protoc` provides
|
||||
to justify the additional complexity in configuring Buf for stub
|
||||
generation.
|
||||
2. [protoc] - the Protocol Buffers compiler.
|
||||
2. Notification of breaking changes:
|
||||
1. Buf in CI for all pull requests to *release* branches only (and not on
|
||||
`master`).
|
||||
2. Buf in CI on every pull request to every branch (this was the case at the
|
||||
time of this decision, and the team decided that the signal-to-noise ratio
|
||||
for this approach was too low to be of value).
|
||||
3. `.proto` linting:
|
||||
1. Buf in CI on every pull request
|
||||
4. `.proto` formatting:
|
||||
1. [clang-format] locally and a [clang-format GitHub Action] in CI to check
|
||||
that files are formatted properly on every pull request.
|
||||
5. Sharing of `.proto` files in a versioned, reliable manner:
|
||||
1. Consumers could simply clone the Tendermint repository, check out a
|
||||
specific commit, tag or branch and manually copy out all of the `.proto`
|
||||
files they need. This requires no effort from the Tendermint Core team and
|
||||
will continue to be an option for consumers. The drawback of this approach
|
||||
is that it requires manual coding/scripting to implement and is brittle in
|
||||
the face of bigger changes.
|
||||
2. Uploading our `.proto` files to Buf's registry on every release. This is
|
||||
by far the most seamless for consumers of our `.proto` files, but requires
|
||||
the dependency on Buf. This has the additional benefit that the Buf
|
||||
registry will automatically [generate and host
|
||||
documentation][buf-docs-gen] for these protos.
|
||||
3. We could create a process that, upon release, creates a `.zip` file
|
||||
containing our `.proto` files.
|
||||
|
||||
### Popular alternatives to Buf
|
||||
|
||||
[Prototool] was not considered as it appears deprecated, and the ecosystem seems
|
||||
to be converging on Buf at this time.
|
||||
|
||||
### Tooling complexity
|
||||
|
||||
The more tools we have in our build/CI processes, the more complex and fragile
|
||||
repository/CI management becomes, and the longer it takes to onboard new team
|
||||
members. Maintainability is a core concern here.
|
||||
|
||||
### Buf sustainability and costs
|
||||
|
||||
One of the primary considerations regarding the usage of Buf is whether, for
|
||||
example, access to its registry will eventually become a
|
||||
paid-for/subscription-based service and whether this is valuable enough for us
|
||||
and the ecosystem to pay for such a service. At this time, it appears as though
|
||||
Buf will never charge for hosting open source projects' protos.
|
||||
|
||||
Another consideration was Buf's sustainability as a project - what happens when
|
||||
their resources run out? Will there be a strong and broad enough open source
|
||||
community to continue maintaining it?
|
||||
|
||||
### Local Buf usage options
|
||||
|
||||
Local usage of Buf (i.e. not in CI) can be accomplished in two ways:
|
||||
|
||||
1. Installing the relevant tools individually.
|
||||
2. By way of its [Docker image][buf-docker].
|
||||
|
||||
Local installation of Buf requires developers to manually keep their toolchains
|
||||
up-to-date. The Docker option comes with a number of complexities, including
|
||||
how the file system permissions of code generated by a Docker container differ
|
||||
between platforms (e.g. on Linux, Buf-generated code ends up being owned by
|
||||
`root`).
|
||||
|
||||
The trouble with the Docker-based approach is that we make use of the
|
||||
[gogoprotobuf] plugin for `protoc`. Continuing to use the Docker-based approach
|
||||
to using Buf will mean that we will have to continue building our own custom
|
||||
Docker image with embedded gogoprotobuf.
|
||||
|
||||
Along these lines, we could eventually consider coming up with a [Nix]- or
|
||||
[redo]-based approach to developer tooling to ensure tooling consistency across
|
||||
the team and for anyone who wants to be able to contribute to Tendermint.
|
||||
|
||||
## Decision
|
||||
|
||||
1. We will adopt Buf for now for proto generation, linting, breakage checking
|
||||
and its registry (mainly in CI, with optional usage locally).
|
||||
2. Failing CI when checking for breaking changes in `.proto` files will only
|
||||
happen when performing minor/patch releases.
|
||||
3. Local tooling will be favored over Docker-based tooling.
|
||||
|
||||
## Detailed Design
|
||||
|
||||
We currently aim to:
|
||||
|
||||
1. Update to Buf `v1` to facilitate linting, breakage checking and uploading to
|
||||
the Buf registry.
|
||||
2. Configure CI appropriately for proto management:
|
||||
1. Uploading protos to the Buf registry on every release (e.g. the
|
||||
[approach][cosmos-sdk-buf-registry-ci] used by the Cosmos SDK).
|
||||
2. Linting on every pull request (e.g. the
|
||||
[approach][cosmos-sdk-buf-linting-ci] used by the Cosmos SDK). The linter
|
||||
passing should be considered a requirement for accepting PRs.
|
||||
3. Checking for breaking changes in minor/patch version releases and failing
|
||||
CI accordingly - see [\#8003].
|
||||
4. Add [clang-format GitHub Action] to check `.proto` file formatting. Format
|
||||
checking should be considered a requirement for accepting PRs.
|
||||
3. Update the Tendermint [`Makefile`](../../Makefile) to primarily facilitate
|
||||
local Protobuf stub generation, linting, formatting and breaking change
|
||||
checking. More specifically:
|
||||
1. This includes removing the dependency on Docker and introducing the
|
||||
dependency on local toolchain installation. CI-based equivalents, where
|
||||
relevant, will rely on specific GitHub Actions instead of the Makefile.
|
||||
2. Go code generation will rely on `protoc` directly.
|
||||
|
||||
## Consequences
|
||||
|
||||
### Positive
|
||||
|
||||
- We will still offer Go stub generation, proto linting and breakage checking.
|
||||
- Breakage checking will only happen on minor/patch releases to increase the
|
||||
signal-to-noise ratio in CI.
|
||||
- Versioned protos will be made available via Buf's registry upon every release.
|
||||
|
||||
### Negative
|
||||
|
||||
- Developers/contributors will need to install the relevant Protocol
|
||||
Buffers-related tooling (Buf, gogoprotobuf, clang-format) locally in order to
|
||||
build, lint, format and check `.proto` files for breaking changes.
|
||||
|
||||
### Neutral
|
||||
|
||||
## References
|
||||
|
||||
- [Protocol Buffers]
|
||||
- [Buf]
|
||||
- [\#7975]
|
||||
- [protoc] - The Protocol Buffers compiler
|
||||
|
||||
[Protocol Buffers]: https://developers.google.com/protocol-buffers
|
||||
[Buf]: https://buf.build/
|
||||
[\#7975]: https://github.com/tendermint/tendermint/pull/7975
|
||||
[protoc]: https://github.com/protocolbuffers/protobuf
|
||||
[clang-format]: https://clang.llvm.org/docs/ClangFormat.html
|
||||
[clang-format GitHub Action]: https://github.com/marketplace/actions/clang-format-github-action
|
||||
[buf-docker]: https://hub.docker.com/r/bufbuild/buf
|
||||
[cosmos-sdk-buf-registry-ci]: https://github.com/cosmos/cosmos-sdk/blob/e6571906043b6751951a42b6546431b1c38b05bd/.github/workflows/proto-registry.yml
|
||||
[cosmos-sdk-buf-linting-ci]: https://github.com/cosmos/cosmos-sdk/blob/e6571906043b6751951a42b6546431b1c38b05bd/.github/workflows/proto.yml#L15
|
||||
[\#8003]: https://github.com/tendermint/tendermint/issues/8003
|
||||
[Nix]: https://nixos.org/
|
||||
[gogoprotobuf]: https://github.com/gogo/protobuf
|
||||
[Prototool]: https://github.com/uber/prototool
|
||||
[buf-docs-gen]: https://docs.buf.build/bsr/documentation
|
||||
[redo]: https://redo.readthedocs.io/en/latest/
|
||||
@@ -296,267 +296,3 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
|
||||
assert.Equal(t, prevoteHeight, ev.Height())
|
||||
}
|
||||
}
|
||||
|
||||
// 4 validators. 1 is byzantine. The other three are partitioned into A (1 val) and B (2 vals).
|
||||
// byzantine validator sends conflicting proposals into A and B,
|
||||
// and prevotes/precommits on both of them.
|
||||
// B sees a commit, A doesn't.
|
||||
// Heal partition and ensure A sees the commit
|
||||
func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
|
||||
// TODO: https://github.com/tendermint/tendermint/issues/6092
|
||||
t.SkipNow()
|
||||
|
||||
// n := 4
|
||||
// logger := consensusLogger().With("test", "byzantine")
|
||||
// app := newCounter
|
||||
|
||||
// states, cleanup := randConsensusState(n, "consensus_byzantine_test", newMockTickerFunc(false), app)
|
||||
// t.Cleanup(cleanup)
|
||||
|
||||
// // give the byzantine validator a normal ticker
|
||||
// ticker := NewTimeoutTicker()
|
||||
// ticker.SetLogger(states[0].logger)
|
||||
// states[0].SetTimeoutTicker(ticker)
|
||||
|
||||
// p2pLogger := logger.With("module", "p2p")
|
||||
|
||||
// blocksSubs := make([]types.Subscription, n)
|
||||
// reactors := make([]p2p.Reactor, n)
|
||||
// for i := 0; i < n; i++ {
|
||||
// // enable txs so we can create different proposals
|
||||
// assertMempool(states[i].txNotifier).EnableTxsAvailable()
|
||||
|
||||
// eventBus := states[i].eventBus
|
||||
// eventBus.SetLogger(logger.With("module", "events", "validator", i))
|
||||
|
||||
// var err error
|
||||
// blocksSubs[i], err = eventBus.Subscribe(ctx, testSubscriber, types.EventQueryNewBlock)
|
||||
// require.NoError(t, err)
|
||||
|
||||
// conR := NewReactor(states[i], true) // so we don't start the consensus states
|
||||
// conR.SetLogger(logger.With("validator", i))
|
||||
// conR.SetEventBus(eventBus)
|
||||
|
||||
// var conRI p2p.Reactor = conR
|
||||
|
||||
// // make first val byzantine
|
||||
// if i == 0 {
|
||||
// conRI = NewByzantineReactor(conR)
|
||||
// }
|
||||
|
||||
// reactors[i] = conRI
|
||||
// err = states[i].blockExec.Store().Save(states[i].state) // for save height 1's validators info
|
||||
// require.NoError(t, err)
|
||||
// }
|
||||
|
||||
// switches := p2p.MakeConnectedSwitches(config.P2P, N, func(i int, sw *p2p.Switch) *p2p.Switch {
|
||||
// sw.SetLogger(p2pLogger.With("validator", i))
|
||||
// sw.AddReactor("CONSENSUS", reactors[i])
|
||||
// return sw
|
||||
// }, func(sws []*p2p.Switch, i, j int) {
|
||||
// // the network starts partitioned with globally active adversary
|
||||
// if i != 0 {
|
||||
// return
|
||||
// }
|
||||
// p2p.Connect2Switches(sws, i, j)
|
||||
// })
|
||||
|
||||
// // make first val byzantine
|
||||
// // NOTE: Now, test validators are MockPV, which by default doesn't
|
||||
// // do any safety checks.
|
||||
// states[0].privValidator.(types.MockPV).DisableChecks()
|
||||
// states[0].decideProposal = func(j int32) func(int64, int32) {
|
||||
// return func(height int64, round int32) {
|
||||
// byzantineDecideProposalFunc(t, height, round, states[j], switches[j])
|
||||
// }
|
||||
// }(int32(0))
|
||||
// // We are setting the prevote function to do nothing because the prevoting
|
||||
// // and precommitting are done alongside the proposal.
|
||||
// states[0].doPrevote = func(height int64, round int32) {}
|
||||
|
||||
// defer func() {
|
||||
// for _, sw := range switches {
|
||||
// err := sw.Stop()
|
||||
// require.NoError(t, err)
|
||||
// }
|
||||
// }()
|
||||
|
||||
// // start the non-byz state machines.
|
||||
// // note these must be started before the byz
|
||||
// for i := 1; i < n; i++ {
|
||||
// cr := reactors[i].(*Reactor)
|
||||
// cr.SwitchToConsensus(cr.conS.GetState(), false)
|
||||
// }
|
||||
|
||||
// // start the byzantine state machine
|
||||
// byzR := reactors[0].(*ByzantineReactor)
|
||||
// s := byzR.reactor.conS.GetState()
|
||||
// byzR.reactor.SwitchToConsensus(s, false)
|
||||
|
||||
// // byz proposer sends one block to peers[0]
|
||||
// // and the other block to peers[1] and peers[2].
|
||||
// // note peers and switches order don't match.
|
||||
// peers := switches[0].Peers().List()
|
||||
|
||||
// // partition A
|
||||
// ind0 := getSwitchIndex(switches, peers[0])
|
||||
|
||||
// // partition B
|
||||
// ind1 := getSwitchIndex(switches, peers[1])
|
||||
// ind2 := getSwitchIndex(switches, peers[2])
|
||||
// p2p.Connect2Switches(switches, ind1, ind2)
|
||||
|
||||
// // wait for someone in the big partition (B) to make a block
|
||||
// <-blocksSubs[ind2].Out()
|
||||
|
||||
// t.Log("A block has been committed. Healing partition")
|
||||
// p2p.Connect2Switches(switches, ind0, ind1)
|
||||
// p2p.Connect2Switches(switches, ind0, ind2)
|
||||
|
||||
// // wait till everyone makes the first new block
|
||||
// // (one of them already has)
|
||||
// wg := new(sync.WaitGroup)
|
||||
// for i := 1; i < N-1; i++ {
|
||||
// wg.Add(1)
|
||||
// go func(j int) {
|
||||
// <-blocksSubs[j].Out()
|
||||
// wg.Done()
|
||||
// }(i)
|
||||
// }
|
||||
|
||||
// done := make(chan struct{})
|
||||
// go func() {
|
||||
// wg.Wait()
|
||||
// close(done)
|
||||
// }()
|
||||
|
||||
// tick := time.NewTicker(time.Second * 10)
|
||||
// select {
|
||||
// case <-done:
|
||||
// case <-tick.C:
|
||||
// for i, reactor := range reactors {
|
||||
// t.Log(fmt.Sprintf("Consensus Reactor %v", i))
|
||||
// t.Log(fmt.Sprintf("%v", reactor))
|
||||
// }
|
||||
// t.Fatalf("Timed out waiting for all validators to commit first block")
|
||||
// }
|
||||
}
|
||||
|
||||
// func byzantineDecideProposalFunc(t *testing.T, height int64, round int32, cs *State, sw *p2p.Switch) {
|
||||
// // byzantine user should create two proposals and try to split the vote.
|
||||
// // Avoid sending on internalMsgQueue and running consensus state.
|
||||
|
||||
// // Create a new proposal block from state/txs from the mempool.
|
||||
// block1, blockParts1 := cs.createProposalBlock()
|
||||
// polRound, propBlockID := cs.ValidRound, types.BlockID{Hash: block1.Hash(), PartSetHeader: blockParts1.Header()}
|
||||
// proposal1 := types.NewProposal(height, round, polRound, propBlockID)
|
||||
// p1 := proposal1.ToProto()
|
||||
// if err := cs.privValidator.SignProposal(cs.state.ChainID, p1); err != nil {
|
||||
// t.Error(err)
|
||||
// }
|
||||
|
||||
// proposal1.Signature = p1.Signature
|
||||
|
||||
// // some new transactions come in (this ensures that the proposals are different)
|
||||
// deliverTxsRange(cs, 0, 1)
|
||||
|
||||
// // Create a new proposal block from state/txs from the mempool.
|
||||
// block2, blockParts2 := cs.createProposalBlock()
|
||||
// polRound, propBlockID = cs.ValidRound, types.BlockID{Hash: block2.Hash(), PartSetHeader: blockParts2.Header()}
|
||||
// proposal2 := types.NewProposal(height, round, polRound, propBlockID)
|
||||
// p2 := proposal2.ToProto()
|
||||
// if err := cs.privValidator.SignProposal(cs.state.ChainID, p2); err != nil {
|
||||
// t.Error(err)
|
||||
// }
|
||||
|
||||
// proposal2.Signature = p2.Signature
|
||||
|
||||
// block1Hash := block1.Hash()
|
||||
// block2Hash := block2.Hash()
|
||||
|
||||
// // broadcast conflicting proposals/block parts to peers
|
||||
// peers := sw.Peers().List()
|
||||
// t.Logf("Byzantine: broadcasting conflicting proposals to %d peers", len(peers))
|
||||
// for i, peer := range peers {
|
||||
// if i < len(peers)/2 {
|
||||
// go sendProposalAndParts(height, round, cs, peer, proposal1, block1Hash, blockParts1)
|
||||
// } else {
|
||||
// go sendProposalAndParts(height, round, cs, peer, proposal2, block2Hash, blockParts2)
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
// func sendProposalAndParts(
|
||||
// height int64,
|
||||
// round int32,
|
||||
// cs *State,
|
||||
// peer p2p.Peer,
|
||||
// proposal *types.Proposal,
|
||||
// blockHash []byte,
|
||||
// parts *types.PartSet,
|
||||
// ) {
|
||||
// // proposal
|
||||
// msg := &ProposalMessage{Proposal: proposal}
|
||||
// peer.Send(DataChannel, MustEncode(msg))
|
||||
|
||||
// // parts
|
||||
// for i := 0; i < int(parts.Total()); i++ {
|
||||
// part := parts.GetPart(i)
|
||||
// msg := &BlockPartMessage{
|
||||
// Height: height, // This tells peer that this part applies to us.
|
||||
// Round: round, // This tells peer that this part applies to us.
|
||||
// Part: part,
|
||||
// }
|
||||
// peer.Send(DataChannel, MustEncode(msg))
|
||||
// }
|
||||
|
||||
// // votes
|
||||
// cs.mtx.Lock()
|
||||
// prevote, _ := cs.signVote(tmproto.PrevoteType, blockHash, parts.Header())
|
||||
// precommit, _ := cs.signVote(tmproto.PrecommitType, blockHash, parts.Header())
|
||||
// cs.mtx.Unlock()
|
||||
|
||||
// peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote}))
|
||||
// peer.Send(VoteChannel, MustEncode(&VoteMessage{precommit}))
|
||||
// }
|
||||
|
||||
// type ByzantineReactor struct {
|
||||
// service.Service
|
||||
// reactor *Reactor
|
||||
// }
|
||||
|
||||
// func NewByzantineReactor(conR *Reactor) *ByzantineReactor {
|
||||
// return &ByzantineReactor{
|
||||
// Service: conR,
|
||||
// reactor: conR,
|
||||
// }
|
||||
// }
|
||||
|
||||
// func (br *ByzantineReactor) SetSwitch(s *p2p.Switch) { br.reactor.SetSwitch(s) }
|
||||
// func (br *ByzantineReactor) GetChannels() []*p2p.ChannelDescriptor { return br.reactor.GetChannels() }
|
||||
|
||||
// func (br *ByzantineReactor) AddPeer(peer p2p.Peer) {
|
||||
// if !br.reactor.IsRunning() {
|
||||
// return
|
||||
// }
|
||||
|
||||
// // Create peerState for peer
|
||||
// peerState := NewPeerState(peer).SetLogger(br.reactor.logger)
|
||||
// peer.Set(types.PeerStateKey, peerState)
|
||||
|
||||
// // Send our state to peer.
|
||||
// // If we're syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
|
||||
// if !br.reactor.waitSync {
|
||||
// br.reactor.sendNewRoundStepMessage(peer)
|
||||
// }
|
||||
// }
|
||||
|
||||
// func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
|
||||
// br.reactor.RemovePeer(peer, reason)
|
||||
// }
|
||||
|
||||
// func (br *ByzantineReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
|
||||
// br.reactor.Receive(chID, peer, msgBytes)
|
||||
// }
|
||||
|
||||
// func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer }
|
||||
|
||||
@@ -997,13 +997,11 @@ func (cs *State) handleMsg(ctx context.Context, mi msgInfo) {
|
||||
}
|
||||
}
|
||||
|
||||
// if err == ErrAddingVote {
|
||||
// TODO: punish peer
|
||||
// We probably don't want to stop the peer here. The vote does not
|
||||
// necessarily comes from a malicious peer but can be just broadcasted by
|
||||
// a typical peer.
|
||||
// https://github.com/tendermint/tendermint/issues/1281
|
||||
// }
|
||||
|
||||
// NOTE: the vote is broadcast to peers by the reactor listening
|
||||
// for vote events
|
||||
|
||||
@@ -19,7 +19,7 @@ import (
|
||||
// Create/Append to ./autofile_test
|
||||
af, err := OpenAutoFile("autofile_test")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
// Stream of writes.
|
||||
@@ -32,7 +32,7 @@ for i := 0; i < 60; i++ {
|
||||
// Close the AutoFile
|
||||
err = af.Close()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
log.Fatal(err)
|
||||
}
|
||||
*/
|
||||
|
||||
@@ -189,13 +189,7 @@ func (af *AutoFile) openFile() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// fileInfo, err := file.Stat()
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
// if fileInfo.Mode() != autoFilePerms {
|
||||
// return errors.NewErrPermissionsChanged(file.Name(), fileInfo.Mode(), autoFilePerms)
|
||||
// }
|
||||
|
||||
af.file = file
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
stdlog "log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strconv"
|
||||
@@ -19,19 +20,26 @@ const Version = "0.0.1"
|
||||
const readBufferSize = 1024 // 1KB at a time
|
||||
|
||||
// Parse command-line options
|
||||
func parseFlags() (headPath string, chopSize int64, limitSize int64, version bool) {
|
||||
func parseFlags() (headPath string, chopSize int64, limitSize int64, version bool, err error) {
|
||||
var flagSet = flag.NewFlagSet(os.Args[0], flag.ExitOnError)
|
||||
var chopSizeStr, limitSizeStr string
|
||||
flagSet.StringVar(&headPath, "head", "logjack.out", "Destination (head) file.")
|
||||
flagSet.StringVar(&chopSizeStr, "chop", "100M", "Move file if greater than this")
|
||||
flagSet.StringVar(&limitSizeStr, "limit", "10G", "Only keep this much (for each specified file). Remove old files.")
|
||||
flagSet.BoolVar(&version, "version", false, "Version")
|
||||
if err := flagSet.Parse(os.Args[1:]); err != nil {
|
||||
fmt.Printf("err parsing flag: %v\n", err)
|
||||
os.Exit(1)
|
||||
|
||||
if err = flagSet.Parse(os.Args[1:]); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
chopSize, err = parseByteSize(chopSizeStr)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
limitSize, err = parseByteSize(limitSizeStr)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
chopSize = parseBytesize(chopSizeStr)
|
||||
limitSize = parseBytesize(limitSizeStr)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -41,22 +49,23 @@ func main() {
|
||||
defer func() { fmt.Println("logjack shutting down") }()
|
||||
|
||||
// Read options
|
||||
headPath, chopSize, limitSize, version := parseFlags()
|
||||
headPath, chopSize, limitSize, version, err := parseFlags()
|
||||
if err != nil {
|
||||
stdlog.Fatalf("problem parsing arguments: %q", err.Error())
|
||||
}
|
||||
|
||||
if version {
|
||||
fmt.Printf("logjack version %v\n", Version)
|
||||
return
|
||||
stdlog.Printf("logjack version %s", Version)
|
||||
}
|
||||
|
||||
// Open Group
|
||||
group, err := auto.OpenGroup(ctx, log.NewNopLogger(), headPath, auto.GroupHeadSizeLimit(chopSize), auto.GroupTotalSizeLimit(limitSize))
|
||||
if err != nil {
|
||||
fmt.Printf("logjack couldn't create output file %v\n", headPath)
|
||||
os.Exit(1)
|
||||
stdlog.Fatalf("logjack couldn't create output file %q", headPath)
|
||||
}
|
||||
|
||||
if err = group.Start(ctx); err != nil {
|
||||
fmt.Printf("logjack couldn't start with file %v\n", headPath)
|
||||
os.Exit(1)
|
||||
stdlog.Fatalf("logjack couldn't start with file %q", headPath)
|
||||
}
|
||||
|
||||
// Forever read from stdin and write to AutoFile.
|
||||
@@ -65,25 +74,21 @@ func main() {
|
||||
n, err := os.Stdin.Read(buf)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
os.Exit(0)
|
||||
} else {
|
||||
fmt.Println("logjack errored:", err.Error())
|
||||
os.Exit(1)
|
||||
return
|
||||
}
|
||||
stdlog.Fatalln("logjack errored:", err.Error())
|
||||
}
|
||||
_, err = group.Write(buf[:n])
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "logjack failed write with error %v\n", headPath)
|
||||
os.Exit(1)
|
||||
stdlog.Fatalf("logjack failed write %q with error: %q", headPath, err.Error())
|
||||
}
|
||||
if err := group.FlushAndSync(); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "logjack flushsync fail with error %v\n", headPath)
|
||||
os.Exit(1)
|
||||
stdlog.Fatalf("logjack flushsync %q fail with error: %q", headPath, err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func parseBytesize(chopSize string) int64 {
|
||||
func parseByteSize(chopSize string) (int64, error) {
|
||||
// Handle suffix multiplier
|
||||
var multiplier int64 = 1
|
||||
if strings.HasSuffix(chopSize, "T") {
|
||||
@@ -106,8 +111,8 @@ func parseBytesize(chopSize string) int64 {
|
||||
// Parse the numeric part
|
||||
chopSizeInt, err := strconv.Atoi(chopSize)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return int64(chopSizeInt) * multiplier
|
||||
return int64(chopSizeInt) * multiplier, nil
|
||||
}
|
||||
|
||||
@@ -318,7 +318,8 @@ func (g *Group) checkTotalSizeLimit(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
// rotateFile causes group to close the current head and assign it some index.
|
||||
// rotateFile causes group to close the current head and assign it
|
||||
// some index. Panics if it encounters an error.
|
||||
func (g *Group) rotateFile(ctx context.Context) {
|
||||
g.mtx.Lock()
|
||||
defer g.mtx.Unlock()
|
||||
|
||||
@@ -35,35 +35,12 @@ func genPrivKeys(n int) privKeys {
|
||||
return res
|
||||
}
|
||||
|
||||
// // Change replaces the key at index i.
|
||||
// func (pkz privKeys) Change(i int) privKeys {
|
||||
// res := make(privKeys, len(pkz))
|
||||
// copy(res, pkz)
|
||||
// res[i] = ed25519.GenPrivKey()
|
||||
// return res
|
||||
// }
|
||||
|
||||
// Extend adds n more keys (to remove, just take a slice).
|
||||
func (pkz privKeys) Extend(n int) privKeys {
|
||||
extra := genPrivKeys(n)
|
||||
return append(pkz, extra...)
|
||||
}
|
||||
|
||||
// // GenSecpPrivKeys produces an array of secp256k1 private keys to generate commits.
|
||||
// func GenSecpPrivKeys(n int) privKeys {
|
||||
// res := make(privKeys, n)
|
||||
// for i := range res {
|
||||
// res[i] = secp256k1.GenPrivKey()
|
||||
// }
|
||||
// return res
|
||||
// }
|
||||
|
||||
// // ExtendSecp adds n more secp256k1 keys (to remove, just take a slice).
|
||||
// func (pkz privKeys) ExtendSecp(n int) privKeys {
|
||||
// extra := GenSecpPrivKeys(n)
|
||||
// return append(pkz, extra...)
|
||||
// }
|
||||
|
||||
// ToValidators produces a valset from the set of keys.
|
||||
// The first key has weight `init` and it increases by `inc` every step
|
||||
// so we can have all the same weight, or a simple linear distribution
|
||||
|
||||
@@ -344,10 +344,6 @@ func (c *WSClient) writeRoutine(ctx context.Context) {
|
||||
defer func() {
|
||||
ticker.Stop()
|
||||
c.conn.Close()
|
||||
// err != nil {
|
||||
// ignore error; it will trigger in tests
|
||||
// likely because it's closing an already closed connection
|
||||
// }
|
||||
c.wg.Done()
|
||||
}()
|
||||
|
||||
@@ -430,14 +426,7 @@ func (c *WSClient) readRoutine(ctx context.Context) {
|
||||
// ID. According to the spec, they should be notifications (requests
|
||||
// without IDs).
|
||||
// https://github.com/tendermint/tendermint/issues/2949
|
||||
// c.mtx.Lock()
|
||||
// if _, ok := c.sentIDs[response.ID.(types.JSONRPCIntID)]; !ok {
|
||||
// c.Logger.Error("unsolicited response ID", "id", response.ID, "expected", c.sentIDs)
|
||||
// c.mtx.Unlock()
|
||||
// continue
|
||||
// }
|
||||
// delete(c.sentIDs, response.ID.(types.JSONRPCIntID))
|
||||
// c.mtx.Unlock()
|
||||
//
|
||||
// Combine a non-blocking read on BaseService.Quit with a non-blocking write on ResponsesCh to avoid blocking
|
||||
// c.wg.Wait() in c.Stop(). Note we rely on Quit being closed so that it sends unlimited Quit signals to stop
|
||||
// both readRoutine and writeRoutine
|
||||
|
||||
Reference in New Issue
Block a user