From a6be390fd54200138c4e655067c80de1a610876c Mon Sep 17 00:00:00 2001 From: ledgerwatch Date: Sun, 21 Mar 2021 21:23:47 +0000 Subject: [PATCH] Handshake in tests, correct ENR updates, etc. (#1578) * Remove Blockchain dependency from forkID, fix ENR update * Fix handshake_test * Remove db access from handshake * Undo * Use StagedSync in test handlers * Compile fix * Debugging * dependency fixes * More info * Print test name * Increase timeout * Disable checkpoint test * Optimise RW message pipe * Fix test * Print handshake errors * See where the pipe is closing * Remove checkpoints * Remove printouts * Revert "Fix test" This reverts commit d154e07b5f97f0b54485b50d572fb27c6c61af4b. * Revert "Optimise RW message pipe" This reverts commit 6936111a7ffdf166a19c6cf6fa9c8de725b449e0. * Revert "Increase timeout" This reverts commit 9dc0e234bbb80a1ff5146788a308fb7d76ce69de. * Revert "See where the pipe is closing" This reverts commit 3cf22afc43f2b2a254b63b86a0e24b58f84bdc2b. * Remove printing * Relax peerEventCh Co-authored-by: Alexey Sharp --- core/forkid/forkid.go | 40 ++----------------- core/headerchain.go | 3 +- eth/backend.go | 23 +++++++---- eth/handler.go | 56 +++++++++++++++----------- eth/handler_eth.go | 2 +- eth/handler_eth_test.go | 62 +++++++++++++++++++---------- eth/handler_test.go | 30 ++++++++------ eth/protocols/eth/discovery.go | 31 +++++---------- eth/protocols/eth/handler.go | 4 +- eth/protocols/eth/handler_test.go | 24 ++++++----- eth/protocols/eth/handshake_test.go | 4 +- eth/stagedsync/all_stages.go | 2 + eth/sync.go | 4 +- 13 files changed, 145 insertions(+), 140 deletions(-) diff --git a/core/forkid/forkid.go b/core/forkid/forkid.go index 38d13b262..6690e720f 100644 --- a/core/forkid/forkid.go +++ b/core/forkid/forkid.go @@ -28,7 +28,6 @@ import ( "strings" "github.com/ledgerwatch/turbo-geth/common" - "github.com/ledgerwatch/turbo-geth/core/types" "github.com/ledgerwatch/turbo-geth/log" "github.com/ledgerwatch/turbo-geth/params" ) @@ -45,18 +44,6 @@ var ( ErrLocalIncompatibleOrStale = errors.New("local incompatible or needs update") ) -// Blockchain defines all necessary method to build a forkID. -type Blockchain interface { - // Config retrieves the chain's fork configuration. - Config() *params.ChainConfig - - // Genesis retrieves the chain's genesis block. - Genesis() *types.Block - - // CurrentHeader retrieves the current head header of the canonical chain. - CurrentHeader() *types.Header -} - // ID is a fork identifier as defined by EIP-2124. type ID struct { Hash [4]byte // CRC32 checksum of the genesis block and passed fork block numbers @@ -85,15 +72,6 @@ func NewID(config *params.ChainConfig, genesis common.Hash, head uint64) ID { return ID{Hash: checksumToBytes(hash), Next: next} } -// NewIDWithChain calculates the Ethereum fork ID from an existing chain instance. -func NewIDWithChain(chain Blockchain) ID { - return NewID( - chain.Config(), - chain.Genesis().Hash(), - chain.CurrentHeader().Number.Uint64(), - ) -} - func NewIDFromForks(forks []uint64, genesis common.Hash) ID { // Calculate the starting checksum from the genesis hash hash := crc32.ChecksumIEEE(genesis[:]) @@ -105,26 +83,14 @@ func NewIDFromForks(forks []uint64, genesis common.Hash) ID { return ID{Hash: checksumToBytes(hash), Next: 0} } -// NewFilter creates a filter that returns if a fork ID should be rejected or not +// NewFilter creates a filter that returns if a fork ID should be rejected or notI // based on the local chain's status. -func NewFilter(chain Blockchain) Filter { - return NewFilterAutofork( - chain.Config(), - chain.Genesis().Hash(), - chain.CurrentHeader().Number.Uint64(), - ) -} - -// NewFilterAutofork creates a filter that returns if a fork ID should be rejected or notI -// based on the local chain's status. -func NewFilterAutofork(config *params.ChainConfig, genesis common.Hash, head uint64) Filter { +func NewFilter(config *params.ChainConfig, genesis common.Hash, head func() uint64) Filter { forks := GatherForks(config) return newFilter( forks, genesis, - func() uint64 { - return head - }, + head, ) } diff --git a/core/headerchain.go b/core/headerchain.go index e5e77016d..37631ba28 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -531,8 +531,7 @@ func (hc *HeaderChain) GetCanonicalHash(number uint64) common.Hash { return h } -// CurrentHeader retrieves the current head header of the canonical chain. The -// header is retrieved from the HeaderChain's internal cache. +// CurrentHeader retrieves the current head header of the canonical chain. func (hc *HeaderChain) CurrentHeader() *types.Header { headHash := rawdb.ReadHeadHeaderHash(hc.chainDb) headNumber := rawdb.ReadHeaderNumber(hc.chainDb, headHash) diff --git a/eth/backend.go b/eth/backend.go index b45ed4b15..cfa4e9da5 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -52,6 +52,7 @@ import ( "github.com/ledgerwatch/turbo-geth/eth/gasprice" "github.com/ledgerwatch/turbo-geth/eth/protocols/eth" "github.com/ledgerwatch/turbo-geth/eth/stagedsync" + "github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages" "github.com/ledgerwatch/turbo-geth/ethdb" "github.com/ledgerwatch/turbo-geth/ethdb/remote/remotedbserver" "github.com/ledgerwatch/turbo-geth/event" @@ -110,7 +111,10 @@ type Ethereum struct { torrentClient *bittorrent.Client - lock sync.RWMutex // Protects the variadic fields (e.g. gas price and etherbase) + lock sync.RWMutex // Protects the variadic fields (e.g. gas price and etherbase) + events *remotedbserver.Events + chainConfig *params.ChainConfig + genesisHash common.Hash } // New creates a new Ethereum object (including the @@ -291,6 +295,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { bloomRequests: make(chan chan *bloombits.Retrieval), p2pServer: stack.Server(), torrentClient: torrentClient, + chainConfig: chainConfig, + genesisHash: genesisHash, } eth.gasPrice, _ = uint256.FromBig(config.Miner.GasPrice) @@ -364,14 +370,14 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { stagedSync := config.StagedSync // setting notifier to support streaming events to rpc daemon - remoteEvents := remotedbserver.NewEvents() + eth.events = remotedbserver.NewEvents() if stagedSync == nil { // if there is not stagedsync, we create one with the custom notifier - stagedSync = stagedsync.New(stagedsync.DefaultStages(), stagedsync.DefaultUnwindOrder(), stagedsync.OptionalParameters{Notifier: remoteEvents}) + stagedSync = stagedsync.New(stagedsync.DefaultStages(), stagedsync.DefaultUnwindOrder(), stagedsync.OptionalParameters{Notifier: eth.events}) } else { // otherwise we add one if needed if stagedSync.Notifier == nil { - stagedSync.Notifier = remoteEvents + stagedSync.Notifier = eth.events } } @@ -407,12 +413,12 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { if err != nil { return nil, err } - eth.privateAPI, err = remotedbserver.StartGrpc(chainDb.(ethdb.HasKV).KV(), eth, stack.Config().PrivateApiAddr, &creds, remoteEvents) + eth.privateAPI, err = remotedbserver.StartGrpc(chainDb.(ethdb.HasKV).KV(), eth, stack.Config().PrivateApiAddr, &creds, eth.events) if err != nil { return nil, err } } else { - eth.privateAPI, err = remotedbserver.StartGrpc(chainDb.(ethdb.HasKV).KV(), eth, stack.Config().PrivateApiAddr, nil, remoteEvents) + eth.privateAPI, err = remotedbserver.StartGrpc(chainDb.(ethdb.HasKV).KV(), eth, stack.Config().PrivateApiAddr, nil, eth.events) if err != nil { return nil, err } @@ -768,14 +774,15 @@ func (s *Ethereum) ArchiveMode() bool { return !s.config.Pruning } // Protocols returns all the currently configured // network protocols to start. func (s *Ethereum) Protocols() []p2p.Protocol { - protos := eth.MakeProtocols((*ethHandler)(s.handler), s.networkID, s.ethDialCandidates) + headHeight, _ := stages.GetStageProgress(s.chainDb, stages.Finish) + protos := eth.MakeProtocols((*ethHandler)(s.handler), s.networkID, s.ethDialCandidates, s.chainConfig, s.genesisHash, headHeight) return protos } // Start implements node.Lifecycle, starting all internal goroutines needed by the // Ethereum protocol implementation. func (s *Ethereum) Start() error { - eth.StartENRUpdater(s.blockchain, s.p2pServer.LocalNode()) + eth.StartENRUpdater(s.chainConfig, s.genesisHash, s.events, s.p2pServer.LocalNode()) // Figure out a max peers count based on the server limits maxPeers := s.p2pServer.MaxPeers diff --git a/eth/handler.go b/eth/handler.go index cb1fdb53d..f3a3e9287 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -18,6 +18,7 @@ package eth import ( "errors" + "fmt" "math" "math/big" "sync" @@ -28,11 +29,13 @@ import ( "github.com/ledgerwatch/turbo-geth/common" "github.com/ledgerwatch/turbo-geth/core" "github.com/ledgerwatch/turbo-geth/core/forkid" + "github.com/ledgerwatch/turbo-geth/core/rawdb" "github.com/ledgerwatch/turbo-geth/core/types" "github.com/ledgerwatch/turbo-geth/eth/downloader" "github.com/ledgerwatch/turbo-geth/eth/fetcher" "github.com/ledgerwatch/turbo-geth/eth/protocols/eth" "github.com/ledgerwatch/turbo-geth/eth/stagedsync" + "github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages" "github.com/ledgerwatch/turbo-geth/ethdb" "github.com/ledgerwatch/turbo-geth/event" "github.com/ledgerwatch/turbo-geth/log" @@ -137,22 +140,25 @@ func newHandler(config *handlerConfig) (*handler, error) { //nolint:unparam config.EventMux = new(event.TypeMux) // Nicety initialization for tests } h := &handler{ - networkID: config.Network, - forkFilter: forkid.NewFilterAutofork(config.Chain.Config(), config.Chain.Genesis().Hash(), config.Chain.CurrentHeader().Number.Uint64()), - eventMux: config.EventMux, - database: config.Database, - txpool: config.TxPool, - chain: config.Chain, - peers: newPeerSet(), - whitelist: config.Whitelist, - txsyncCh: make(chan *txsync), - quitSync: make(chan struct{}), + networkID: config.Network, + eventMux: config.EventMux, + database: config.Database, + txpool: config.TxPool, + chain: config.Chain, + peers: newPeerSet(), + whitelist: config.Whitelist, + txsyncCh: make(chan *txsync), + quitSync: make(chan struct{}), } - // If we have trusted checkpoints, enforce them on the chain - if config.Checkpoint != nil { - h.checkpointNumber = (config.Checkpoint.SectionIndex+1)*params.CHTFrequency - 1 - h.checkpointHash = config.Checkpoint.SectionHead + if headHeight, err := stages.GetStageProgress(config.Database, stages.Finish); err == nil { + h.currentHeight = headHeight + } else { + return nil, fmt.Errorf("could not get Finish stage progress: %v", err) } + heighter := func() uint64 { + return atomic.LoadUint64(&h.currentHeight) + } + h.forkFilter = forkid.NewFilter(config.Chain.Config(), config.Chain.Genesis().Hash(), heighter) // Construct the downloader (long sync) and its backing state bloom if fast // sync is requested. The downloader is responsible for deallocating the state // bloom when it's done. @@ -168,9 +174,6 @@ func newHandler(config *handlerConfig) (*handler, error) { //nolint:unparam validator := func(header *types.Header) error { return h.chain.Engine().VerifyHeader(h.chain, header, true) } - heighter := func() uint64 { - return atomic.LoadUint64(&h.currentHeight) - } inserter := func(blocks types.Blocks) (int, error) { if err == nil { atomic.StoreUint32(&h.acceptTxs, 1) // Mark initial sync done on any fetcher import @@ -217,7 +220,7 @@ func (h *handler) SetStagedSync(stagedSync *stagedsync.StagedSync) { // various subsistems and starts handling messages. func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { // TODO(karalabe): Not sure why this is needed - if !h.chainSync.handlePeerEvent(peer) { + if !h.chainSync.handlePeerEvent() { return p2p.DiscQuitting } h.peerWG.Add(1) @@ -226,12 +229,17 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { // Execute the Ethereum handshake var ( genesis = h.chain.Genesis() - head = h.chain.CurrentHeader() - hash = head.Hash() - number = head.Number.Uint64() - td = h.chain.GetTd(hash, number) + number = atomic.LoadUint64(&h.currentHeight) ) - forkID := forkid.NewID(h.chain.Config(), h.chain.Genesis().Hash(), h.chain.CurrentHeader().Number.Uint64()) + hash, err := rawdb.ReadCanonicalHash(h.database, number) + if err != nil { + return fmt.Errorf("reading canonical hash for %d: %v", number, err) + } + td, err1 := rawdb.ReadTd(h.database, hash, number) + if err1 != nil { + return fmt.Errorf("reading td for %d %x: %v", number, hash, err1) + } + forkID := forkid.NewID(h.chain.Config(), genesis.Hash(), number) if err := peer.Handshake(h.networkID, td, hash, genesis.Hash(), forkID, h.forkFilter); err != nil { peer.Log().Debug("Ethereum handshake failed", "err", err) return err @@ -261,7 +269,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { peer.Log().Error("Failed to register peer in eth syncer", "err", err) return err } - h.chainSync.handlePeerEvent(peer) + h.chainSync.handlePeerEvent() // Propagate existing transactions. new transactions appearing // after this will be sent via broadcasts. diff --git a/eth/handler_eth.go b/eth/handler_eth.go index ed5c60cbc..7456f0b23 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -210,7 +210,7 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td // Update the peer's total difficulty if better than the previous if _, headNumber := peer.Head(); block.NumberU64() > headNumber { peer.SetHead(trueHead, block.NumberU64()) - h.chainSync.handlePeerEvent(peer) + h.chainSync.handlePeerEvent() } return nil } diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index 813f8134c..44565ad0e 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -28,6 +28,7 @@ import ( "github.com/ledgerwatch/turbo-geth/consensus/ethash" "github.com/ledgerwatch/turbo-geth/core" "github.com/ledgerwatch/turbo-geth/core/forkid" + "github.com/ledgerwatch/turbo-geth/core/rawdb" "github.com/ledgerwatch/turbo-geth/core/types" "github.com/ledgerwatch/turbo-geth/core/vm" "github.com/ledgerwatch/turbo-geth/eth/downloader" @@ -116,7 +117,7 @@ func testForkIDSplit(t *testing.T, protocol uint) { Chain: chainNoFork, TxPool: newTestTxPool(), Network: 1, - Sync: downloader.FullSync, + Sync: downloader.StagedSync, BloomCache: 1, }) ethProFork, _ = newHandler(&handlerConfig{ @@ -124,7 +125,7 @@ func testForkIDSplit(t *testing.T, protocol uint) { Chain: chainProFork, TxPool: newTestTxPool(), Network: 1, - Sync: downloader.FullSync, + Sync: downloader.StagedSync, BloomCache: 1, }) ) @@ -170,9 +171,11 @@ func testForkIDSplit(t *testing.T, protocol uint) { if _, err := stagedsync.InsertBlocksInStages(dbNoFork, ethdb.DefaultStorageMode, configNoFork, &vm.Config{}, ethash.NewFaker(), blocksNoFork[:1], true /* checkRoot */); err != nil { t.Fatal(err) } + atomic.StoreUint64(ðNoFork.currentHeight, 1) if _, err := stagedsync.InsertBlocksInStages(dbProFork, ethdb.DefaultStorageMode, configProFork, &vm.Config{}, ethash.NewFaker(), blocksProFork[:1], true /* checkRoot */); err != nil { t.Fatal(err) } + atomic.StoreUint64(ðProFork.currentHeight, 1) p2pNoFork, p2pProFork = p2p.MsgPipe() defer p2pNoFork.Close() @@ -205,9 +208,11 @@ func testForkIDSplit(t *testing.T, protocol uint) { if _, err := stagedsync.InsertBlocksInStages(dbNoFork, ethdb.DefaultStorageMode, configNoFork, &vm.Config{}, ethash.NewFaker(), blocksNoFork[1:2], true /* checkRoot */); err != nil { t.Fatal(err) } + atomic.StoreUint64(ðNoFork.currentHeight, 2) if _, err := stagedsync.InsertBlocksInStages(dbProFork, ethdb.DefaultStorageMode, configProFork, &vm.Config{}, ethash.NewFaker(), blocksProFork[1:2], true /* checkRoot */); err != nil { t.Fatal(err) } + atomic.StoreUint64(ðProFork.currentHeight, 2) p2pNoFork, p2pProFork = p2p.MsgPipe() defer p2pNoFork.Close() @@ -274,11 +279,14 @@ func testRecvTransactions(t *testing.T, protocol uint) { // Run the handshake locally to avoid spinning up a source handler var ( genesis = handler.chain.Genesis() - head = handler.chain.CurrentBlock() - td = handler.chain.GetTd(head.Hash(), head.NumberU64()) + head = handler.headBlock ) - if err := src.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain)); err != nil { - t.Fatalf("failed to run protocol handshake: %s", err) + td, err := rawdb.ReadTd(handler.db, head.Hash(), head.NumberU64()) + if err != nil { + t.Fatal(err) + } + if err = src.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewID(handler.chain.Config(), genesis.Hash(), head.NumberU64()), forkid.NewFilter(handler.chain.Config(), genesis.Hash(), func() uint64 { return head.NumberU64() })); err != nil { + t.Fatalf("failed to run protocol handshake: %v", err) } // Send the transaction to the sink and verify that it's added to the tx pool tx := types.NewTransaction(0, common.Address{}, uint256.NewInt(), 100000, uint256.NewInt(), nil) @@ -335,11 +343,14 @@ func testSendTransactions(t *testing.T, protocol uint) { // Run the handshake locally to avoid spinning up a source handler var ( genesis = handler.chain.Genesis() - head = handler.chain.CurrentBlock() - td = handler.chain.GetTd(head.Hash(), head.NumberU64()) + head = handler.headBlock ) - if err := sink.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain)); err != nil { - t.Fatalf("failed to run protocol handshake") + td, err := rawdb.ReadTd(handler.db, head.Hash(), head.NumberU64()) + if err != nil { + t.Fatal(err) + } + if err := sink.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewID(handler.chain.Config(), genesis.Hash(), head.NumberU64()), forkid.NewFilter(handler.chain.Config(), genesis.Hash(), func() uint64 { return head.NumberU64() })); err != nil { + t.Fatalf("failed to run protocol handshake: %v", err) } // After the handshake completes, the source handler should stream the sink // the transactions, subscribe to all inbound network events @@ -473,6 +484,7 @@ func testTransactionPropagation(t *testing.T, protocol uint) { // challenge to validate each other's chains. Hash mismatches, or missing ones // during a fast sync should lead to the peer getting dropped. func TestCheckpointChallenge(t *testing.T) { + t.Skip("Not relevant for Turbo-Geth") tests := []struct { syncmode downloader.SyncMode checkpoint bool @@ -549,11 +561,14 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo // Run the handshake locally to avoid spinning up a remote handler var ( genesis = handler.chain.Genesis() - head = handler.chain.CurrentBlock() - td = handler.chain.GetTd(head.Hash(), head.NumberU64()) + head = handler.headBlock ) - if err := remote.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain)); err != nil { - t.Fatalf("failed to run protocol handshake") + td, err := rawdb.ReadTd(handler.db, head.Hash(), head.NumberU64()) + if err != nil { + t.Fatal(err) + } + if err := remote.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewID(handler.chain.Config(), genesis.Hash(), head.NumberU64()), forkid.NewFilter(handler.chain.Config(), genesis.Hash(), func() uint64 { return head.NumberU64() })); err != nil { + t.Fatalf("failed to run protocol handshake: %v", err) } // Connect a new peer and check that we receive the checkpoint challenge if checkpoint { @@ -618,8 +633,12 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) { // Interconnect all the sink handlers with the source handler var ( genesis = source.chain.Genesis() - td = source.chain.GetTd(genesis.Hash(), genesis.NumberU64()) + head = source.headBlock ) + td, err := rawdb.ReadTd(source.db, head.Hash(), head.NumberU64()) + if err != nil { + t.Fatal(err) + } for i, sink := range sinks { sink := sink // Closure for gorotuine below @@ -636,7 +655,7 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) { go source.handler.runEthPeer(sourcePeer, func(peer *eth.Peer) error { return eth.Handle((*ethHandler)(source.handler), peer) }) - if err := sinkPeer.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain)); err != nil { + if err := sinkPeer.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewID(source.chain.Config(), genesis.Hash(), head.NumberU64()), forkid.NewFilter(source.chain.Config(), genesis.Hash(), func() uint64 { return head.NumberU64() })); err != nil { t.Fatalf("failed to run protocol handshake") } //nolint:errcheck @@ -711,9 +730,13 @@ func testBroadcastMalformedBlock(t *testing.T, protocol uint) { // Run the handshake locally to avoid spinning up a sink handler var ( genesis = source.chain.Genesis() - td = source.chain.GetTd(genesis.Hash(), genesis.NumberU64()) + head = source.headBlock ) - if err := sink.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain)); err != nil { + td, err := rawdb.ReadTd(source.db, head.Hash(), head.NumberU64()) + if err != nil { + t.Fatal(err) + } + if err := sink.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewID(source.chain.Config(), genesis.Hash(), head.NumberU64()), forkid.NewFilter(source.chain.Config(), genesis.Hash(), func() uint64 { return head.NumberU64() })); err != nil { t.Fatalf("failed to run protocol handshake") } // After the handshake completes, the source handler should stream the sink @@ -727,9 +750,6 @@ func testBroadcastMalformedBlock(t *testing.T, protocol uint) { //nolint:errcheck go eth.Handle(backend, sink) - // Create various combinations of malformed blocks - head := source.chain.CurrentBlock() - malformedUncles := head.Header() malformedUncles.UncleHash[0]++ malformedTransactions := head.Header() diff --git a/eth/handler_test.go b/eth/handler_test.go index 0cd1fdd20..6f0c81e9f 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -116,10 +116,11 @@ func (p *testTxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subs // preinitialized with some sane testing defaults and the transaction pool mocked // out. type testHandler struct { - db ethdb.Database - chain *core.BlockChain - txpool *testTxPool - handler *handler + db ethdb.Database + chain *core.BlockChain + txpool *testTxPool + handler *handler + headBlock *types.Block } // newTestHandler creates a new handler for testing purposes with no blocks. @@ -139,9 +140,13 @@ func newTestHandlerWithBlocks(blocks int) *testHandler { chain, _ := core.NewBlockChain(db, nil, params.TestChainConfig, ethash.NewFaker(), vm.Config{}, nil, nil) - bs, _, _ := core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, blocks, nil, false) - if _, err := stagedsync.InsertBlocksInStages(db, ethdb.DefaultStorageMode, params.TestChainConfig, &vm.Config{}, ethash.NewFaker(), bs, true /* checkRoot */); err != nil { - panic(err) + headBlock := genesis + if blocks > 0 { + bs, _, _ := core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, blocks, nil, false) + if _, err := stagedsync.InsertBlocksInStages(db, ethdb.DefaultStorageMode, params.TestChainConfig, &vm.Config{}, ethash.NewFaker(), bs, true /* checkRoot */); err != nil { + panic(err) + } + headBlock = bs[len(bs)-1] } txpool := newTestTxPool() @@ -150,16 +155,17 @@ func newTestHandlerWithBlocks(blocks int) *testHandler { Chain: chain, TxPool: txpool, Network: 1, - Sync: downloader.FastSync, + Sync: downloader.StagedSync, BloomCache: 1, }) handler.Start(1000) return &testHandler{ - db: db, - chain: chain, - txpool: txpool, - handler: handler, + db: db, + chain: chain, + txpool: txpool, + handler: handler, + headBlock: headBlock, } } diff --git a/eth/protocols/eth/discovery.go b/eth/protocols/eth/discovery.go index 2e7615887..84f8cd597 100644 --- a/eth/protocols/eth/discovery.go +++ b/eth/protocols/eth/discovery.go @@ -17,9 +17,12 @@ package eth import ( - "github.com/ledgerwatch/turbo-geth/core" + "github.com/ledgerwatch/turbo-geth/common" "github.com/ledgerwatch/turbo-geth/core/forkid" + "github.com/ledgerwatch/turbo-geth/core/types" + "github.com/ledgerwatch/turbo-geth/ethdb/remote/remotedbserver" "github.com/ledgerwatch/turbo-geth/p2p/enode" + "github.com/ledgerwatch/turbo-geth/params" "github.com/ledgerwatch/turbo-geth/rlp" ) @@ -38,28 +41,16 @@ func (e enrEntry) ENRKey() string { // StartENRUpdater starts the `eth` ENR updater loop, which listens for chain // head events and updates the requested node record whenever a fork is passed. -func StartENRUpdater(chain *core.BlockChain, ln *enode.LocalNode) { - var newHead = make(chan core.ChainHeadEvent, 10) - sub := chain.SubscribeChainHeadEvent(newHead) - - go func() { - defer sub.Unsubscribe() - for { - select { - case <-newHead: - ln.Set(currentENREntry(chain)) - case <-sub.Err(): - // Would be nice to sync with Stop, but there is no - // good way to do that. - return - } - } - }() +func StartENRUpdater(chainConfig *params.ChainConfig, genesisHash common.Hash, events *remotedbserver.Events, ln *enode.LocalNode) { + events.AddHeaderSubscription(func(h *types.Header) error { + ln.Set(currentENREntry(chainConfig, genesisHash, h.Number.Uint64())) + return nil + }) } // currentENREntry constructs an `eth` ENR entry based on the current state of the chain. -func currentENREntry(chain forkid.Blockchain) *enrEntry { +func currentENREntry(chainConfig *params.ChainConfig, genesisHash common.Hash, headHeight uint64) *enrEntry { return &enrEntry{ - ForkID: forkid.NewID(chain.Config(), chain.Genesis().Hash(), chain.CurrentHeader().Number.Uint64()), + ForkID: forkid.NewID(chainConfig, genesisHash, headHeight), } } diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index ae8e68492..8dfe35931 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -92,7 +92,7 @@ type TxPool interface { } // MakeProtocols constructs the P2P protocol definitions for `eth`. -func MakeProtocols(backend Backend, network uint64, dnsdisc enode.Iterator) []p2p.Protocol { +func MakeProtocols(backend Backend, network uint64, dnsdisc enode.Iterator, chainConfig *params.ChainConfig, genesisHash common.Hash, headHeight uint64) []p2p.Protocol { protocols := make([]p2p.Protocol, len(ProtocolVersions)) for i, version := range ProtocolVersions { version := version // Closure @@ -115,7 +115,7 @@ func MakeProtocols(backend Backend, network uint64, dnsdisc enode.Iterator) []p2 PeerInfo: func(id enode.ID) interface{} { return backend.PeerInfo(id) }, - Attributes: []enr.Entry{currentENREntry(backend.Chain())}, + Attributes: []enr.Entry{currentENREntry(chainConfig, genesisHash, headHeight)}, DialCandidates: dnsdisc, } } diff --git a/eth/protocols/eth/handler_test.go b/eth/protocols/eth/handler_test.go index 61548d6ee..c21d8e8bf 100644 --- a/eth/protocols/eth/handler_test.go +++ b/eth/protocols/eth/handler_test.go @@ -48,9 +48,10 @@ var ( // purpose is to allow testing the request/reply workflows and wire serialization // in the `eth` protocol without actually doing any data processing. type testBackend struct { - db ethdb.Database - txpool *core.TxPool - chain *core.BlockChain + db ethdb.Database + txpool *core.TxPool + chain *core.BlockChain + headBlock *types.Block } // newTestBackend creates an empty chain and wraps it into a mock backend. @@ -68,9 +69,13 @@ func newTestBackendWithGenerator(blocks int, generator func(int, *core.BlockGen) Alloc: core.GenesisAlloc{testAddr: {Balance: big.NewInt(1000000)}}, }).MustCommit(db) - bs, _, _ := core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, blocks, generator, true) - if _, err := stagedsync.InsertBlocksInStages(db, ethdb.DefaultStorageMode, params.TestChainConfig, &vm.Config{}, ethash.NewFaker(), bs, true /* checkRoot */); err != nil { - panic(err) + headBlock := genesis + if blocks > 0 { + bs, _, _ := core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, blocks, generator, true) + if _, err := stagedsync.InsertBlocksInStages(db, ethdb.DefaultStorageMode, params.TestChainConfig, &vm.Config{}, ethash.NewFaker(), bs, true /* checkRoot */); err != nil { + panic(err) + } + headBlock = bs[len(bs)-1] } txconfig := core.DefaultTxPoolConfig txconfig.Journal = "" // Don't litter the disk with test journals @@ -78,9 +83,10 @@ func newTestBackendWithGenerator(blocks int, generator func(int, *core.BlockGen) chain, _ := core.NewBlockChain(db, nil, params.TestChainConfig, ethash.NewFaker(), vm.Config{}, nil, nil) txCacher := core.NewTxSenderCacher(1) return &testBackend{ - db: db, - txpool: core.NewTxPool(txconfig, params.TestChainConfig, db, txCacher), - chain: chain, + db: db, + txpool: core.NewTxPool(txconfig, params.TestChainConfig, db, txCacher), + chain: chain, + headBlock: headBlock, } } diff --git a/eth/protocols/eth/handshake_test.go b/eth/protocols/eth/handshake_test.go index 0247df12d..20c6e60a2 100644 --- a/eth/protocols/eth/handshake_test.go +++ b/eth/protocols/eth/handshake_test.go @@ -42,7 +42,7 @@ func testHandshake(t *testing.T, protocol uint) { genesis = backend.chain.Genesis() head = backend.chain.CurrentBlock() td = backend.chain.GetTd(head.Hash(), head.NumberU64()) - forkID = forkid.NewID(backend.chain.Config(), backend.chain.Genesis().Hash(), backend.chain.CurrentHeader().Number.Uint64()) + forkID = forkid.NewID(backend.chain.Config(), backend.chain.Genesis().Hash(), backend.headBlock.NumberU64()) ) tests := []struct { code uint64 @@ -86,7 +86,7 @@ func testHandshake(t *testing.T, protocol uint) { } }() - err := peer.Handshake(1, td, head.Hash(), genesis.Hash(), forkID, forkid.NewFilter(backend.chain)) + err := peer.Handshake(1, td, head.Hash(), genesis.Hash(), forkID, forkid.NewFilter(backend.chain.Config(), backend.chain.Genesis().Hash(), func() uint64 { return backend.headBlock.NumberU64() })) if err == nil { t.Errorf("test %d: protocol returned nil error, want %q", i, test.want) } else if !errors.Is(err, test.want) { diff --git a/eth/stagedsync/all_stages.go b/eth/stagedsync/all_stages.go index 5d68901cd..1f743064a 100644 --- a/eth/stagedsync/all_stages.go +++ b/eth/stagedsync/all_stages.go @@ -153,6 +153,8 @@ func createStageBuilders(blocks []*types.Block, blockNum uint64, checkRoot bool) } } c.Close() + */ + /* c = world.TX.(ethdb.HasTx).Tx().Cursor(dbutils.CurrentStateBucket) for k, v, err := c.First(); k != nil; k, v, err = c.Next() { if err != nil { diff --git a/eth/sync.go b/eth/sync.go index fa3099d5e..c542974d9 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -175,14 +175,14 @@ type chainSyncOp struct { func newChainSyncer(handler *handler) *chainSyncer { return &chainSyncer{ handler: handler, - peerEventCh: make(chan struct{}), + peerEventCh: make(chan struct{}, 1), } } // handlePeerEvent notifies the syncer about a change in the peer set. // This is called for new peers and every time a peer announces a new // chain head. -func (cs *chainSyncer) handlePeerEvent(peer *eth.Peer) bool { //nolint:unparam +func (cs *chainSyncer) handlePeerEvent() bool { select { case cs.peerEventCh <- struct{}{}: return true