Handshake in tests, correct ENR updates, etc. (#1578)

* Remove Blockchain dependency from forkID, fix ENR update

* Fix handshake_test

* Remove db access from handshake

* Undo

* Use StagedSync in test handlers

* Compile fix

* Debugging

* Dependency fixes

* More info

* Print test name

* Increase timeout

* Disable checkpoint test

* Optimise RW message pipe

* Fix test

* Print handshake errors

* See where the pipe is closing

* Remove checkpoints

* Remove printouts

* Revert "Fix test"

This reverts commit d154e07b5f97f0b54485b50d572fb27c6c61af4b.

* Revert "Optimise RW message pipe"

This reverts commit 6936111a7ffdf166a19c6cf6fa9c8de725b449e0.

* Revert "Increase timeout"

This reverts commit 9dc0e234bbb80a1ff5146788a308fb7d76ce69de.

* Revert "See where the pipe is closing"

This reverts commit 3cf22afc43f2b2a254b63b86a0e24b58f84bdc2b.

* Remove printing

* Relax peerEventCh

Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
ledgerwatch 2021-03-21 21:23:47 +00:00 committed by GitHub
parent f7c122417c
commit a6be390fd5
13 changed files with 145 additions and 140 deletions

View File

@ -28,7 +28,6 @@ import (
"strings"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ledgerwatch/turbo-geth/params"
)
@ -45,18 +44,6 @@ var (
ErrLocalIncompatibleOrStale = errors.New("local incompatible or needs update")
)
// Blockchain defines all necessary method to build a forkID.
type Blockchain interface {
// Config retrieves the chain's fork configuration.
Config() *params.ChainConfig
// Genesis retrieves the chain's genesis block.
Genesis() *types.Block
// CurrentHeader retrieves the current head header of the canonical chain.
CurrentHeader() *types.Header
}
// ID is a fork identifier as defined by EIP-2124.
type ID struct {
Hash [4]byte // CRC32 checksum of the genesis block and passed fork block numbers
@ -85,15 +72,6 @@ func NewID(config *params.ChainConfig, genesis common.Hash, head uint64) ID {
return ID{Hash: checksumToBytes(hash), Next: next}
}
// NewIDWithChain calculates the Ethereum fork ID from an existing chain instance.
func NewIDWithChain(chain Blockchain) ID {
return NewID(
chain.Config(),
chain.Genesis().Hash(),
chain.CurrentHeader().Number.Uint64(),
)
}
func NewIDFromForks(forks []uint64, genesis common.Hash) ID {
// Calculate the starting checksum from the genesis hash
hash := crc32.ChecksumIEEE(genesis[:])
@ -105,26 +83,14 @@ func NewIDFromForks(forks []uint64, genesis common.Hash) ID {
return ID{Hash: checksumToBytes(hash), Next: 0}
}
// NewFilter creates a filter that returns if a fork ID should be rejected or not
// NewFilter creates a filter that returns if a fork ID should be rejected or notI
// based on the local chain's status.
func NewFilter(chain Blockchain) Filter {
return NewFilterAutofork(
chain.Config(),
chain.Genesis().Hash(),
chain.CurrentHeader().Number.Uint64(),
)
}
// NewFilterAutofork creates a filter that returns if a fork ID should be rejected or notI
// based on the local chain's status.
func NewFilterAutofork(config *params.ChainConfig, genesis common.Hash, head uint64) Filter {
func NewFilter(config *params.ChainConfig, genesis common.Hash, head func() uint64) Filter {
forks := GatherForks(config)
return newFilter(
forks,
genesis,
func() uint64 {
return head
},
head,
)
}
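
The practical effect of this hunk is that forkid no longer needs a Blockchain: callers pass the chain config, the genesis hash, and a head callback, so any height source (staged-sync progress, an atomic counter in a test) can drive the filter. A minimal, self-contained sketch, assuming the mainnet constants from the params package; the head variable and its update are illustrative only:

package main

import (
	"fmt"
	"sync/atomic"

	"github.com/ledgerwatch/turbo-geth/core/forkid"
	"github.com/ledgerwatch/turbo-geth/params"
)

func main() {
	var head uint64 // in the real handler this is h.currentHeight, updated by staged sync

	// NewFilter captures the closure, so every evaluation sees the latest height.
	filter := forkid.NewFilter(params.MainnetChainConfig, params.MainnetGenesisHash, func() uint64 {
		return atomic.LoadUint64(&head)
	})

	atomic.StoreUint64(&head, 12000000)
	remote := forkid.NewID(params.MainnetChainConfig, params.MainnetGenesisHash, atomic.LoadUint64(&head))
	fmt.Println(filter(remote)) // <nil>: an identical fork ID is always compatible
}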

View File

@ -531,8 +531,7 @@ func (hc *HeaderChain) GetCanonicalHash(number uint64) common.Hash {
return h
}
// CurrentHeader retrieves the current head header of the canonical chain. The
// header is retrieved from the HeaderChain's internal cache.
// CurrentHeader retrieves the current head header of the canonical chain.
func (hc *HeaderChain) CurrentHeader() *types.Header {
headHash := rawdb.ReadHeadHeaderHash(hc.chainDb)
headNumber := rawdb.ReadHeaderNumber(hc.chainDb, headHash)

View File

@ -52,6 +52,7 @@ import (
"github.com/ledgerwatch/turbo-geth/eth/gasprice"
"github.com/ledgerwatch/turbo-geth/eth/protocols/eth"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/ethdb/remote/remotedbserver"
"github.com/ledgerwatch/turbo-geth/event"
@ -110,7 +111,10 @@ type Ethereum struct {
torrentClient *bittorrent.Client
lock sync.RWMutex // Protects the variadic fields (e.g. gas price and etherbase)
lock sync.RWMutex // Protects the variadic fields (e.g. gas price and etherbase)
events *remotedbserver.Events
chainConfig *params.ChainConfig
genesisHash common.Hash
}
// New creates a new Ethereum object (including the
@ -291,6 +295,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
bloomRequests: make(chan chan *bloombits.Retrieval),
p2pServer: stack.Server(),
torrentClient: torrentClient,
chainConfig: chainConfig,
genesisHash: genesisHash,
}
eth.gasPrice, _ = uint256.FromBig(config.Miner.GasPrice)
@ -364,14 +370,14 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
stagedSync := config.StagedSync
// setting notifier to support streaming events to rpc daemon
remoteEvents := remotedbserver.NewEvents()
eth.events = remotedbserver.NewEvents()
if stagedSync == nil {
// if there is not stagedsync, we create one with the custom notifier
stagedSync = stagedsync.New(stagedsync.DefaultStages(), stagedsync.DefaultUnwindOrder(), stagedsync.OptionalParameters{Notifier: remoteEvents})
stagedSync = stagedsync.New(stagedsync.DefaultStages(), stagedsync.DefaultUnwindOrder(), stagedsync.OptionalParameters{Notifier: eth.events})
} else {
// otherwise we add one if needed
if stagedSync.Notifier == nil {
stagedSync.Notifier = remoteEvents
stagedSync.Notifier = eth.events
}
}
@ -407,12 +413,12 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
if err != nil {
return nil, err
}
eth.privateAPI, err = remotedbserver.StartGrpc(chainDb.(ethdb.HasKV).KV(), eth, stack.Config().PrivateApiAddr, &creds, remoteEvents)
eth.privateAPI, err = remotedbserver.StartGrpc(chainDb.(ethdb.HasKV).KV(), eth, stack.Config().PrivateApiAddr, &creds, eth.events)
if err != nil {
return nil, err
}
} else {
eth.privateAPI, err = remotedbserver.StartGrpc(chainDb.(ethdb.HasKV).KV(), eth, stack.Config().PrivateApiAddr, nil, remoteEvents)
eth.privateAPI, err = remotedbserver.StartGrpc(chainDb.(ethdb.HasKV).KV(), eth, stack.Config().PrivateApiAddr, nil, eth.events)
if err != nil {
return nil, err
}
@ -768,14 +774,15 @@ func (s *Ethereum) ArchiveMode() bool { return !s.config.Pruning }
// Protocols returns all the currently configured
// network protocols to start.
func (s *Ethereum) Protocols() []p2p.Protocol {
protos := eth.MakeProtocols((*ethHandler)(s.handler), s.networkID, s.ethDialCandidates)
headHeight, _ := stages.GetStageProgress(s.chainDb, stages.Finish)
protos := eth.MakeProtocols((*ethHandler)(s.handler), s.networkID, s.ethDialCandidates, s.chainConfig, s.genesisHash, headHeight)
return protos
}
// Start implements node.Lifecycle, starting all internal goroutines needed by the
// Ethereum protocol implementation.
func (s *Ethereum) Start() error {
eth.StartENRUpdater(s.blockchain, s.p2pServer.LocalNode())
eth.StartENRUpdater(s.chainConfig, s.genesisHash, s.events, s.p2pServer.LocalNode())
// Figure out a max peers count based on the server limits
maxPeers := s.p2pServer.MaxPeers
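
For callers outside the handler, such as Protocols() above, the advertised head is simply whatever the Finish stage has completed. A hedged illustration of that lookup as a standalone helper; the helper itself is not part of the diff, and the (uint64, error) return shape is the one visible in this commit:

package example

import (
	"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
	"github.com/ledgerwatch/turbo-geth/ethdb"
)

// headHeight returns the block number recorded by the Finish stage, i.e. the
// highest block fully processed by staged sync. Protocols() above discards the
// error; this sketch propagates it instead.
func headHeight(db ethdb.Database) (uint64, error) {
	return stages.GetStageProgress(db, stages.Finish)
}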

View File

@ -18,6 +18,7 @@ package eth
import (
"errors"
"fmt"
"math"
"math/big"
"sync"
@ -28,11 +29,13 @@ import (
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/core"
"github.com/ledgerwatch/turbo-geth/core/forkid"
"github.com/ledgerwatch/turbo-geth/core/rawdb"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/eth/downloader"
"github.com/ledgerwatch/turbo-geth/eth/fetcher"
"github.com/ledgerwatch/turbo-geth/eth/protocols/eth"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/event"
"github.com/ledgerwatch/turbo-geth/log"
@ -137,22 +140,25 @@ func newHandler(config *handlerConfig) (*handler, error) { //nolint:unparam
config.EventMux = new(event.TypeMux) // Nicety initialization for tests
}
h := &handler{
networkID: config.Network,
forkFilter: forkid.NewFilterAutofork(config.Chain.Config(), config.Chain.Genesis().Hash(), config.Chain.CurrentHeader().Number.Uint64()),
eventMux: config.EventMux,
database: config.Database,
txpool: config.TxPool,
chain: config.Chain,
peers: newPeerSet(),
whitelist: config.Whitelist,
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
networkID: config.Network,
eventMux: config.EventMux,
database: config.Database,
txpool: config.TxPool,
chain: config.Chain,
peers: newPeerSet(),
whitelist: config.Whitelist,
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
}
// If we have trusted checkpoints, enforce them on the chain
if config.Checkpoint != nil {
h.checkpointNumber = (config.Checkpoint.SectionIndex+1)*params.CHTFrequency - 1
h.checkpointHash = config.Checkpoint.SectionHead
if headHeight, err := stages.GetStageProgress(config.Database, stages.Finish); err == nil {
h.currentHeight = headHeight
} else {
return nil, fmt.Errorf("could not get Finish stage progress: %v", err)
}
heighter := func() uint64 {
return atomic.LoadUint64(&h.currentHeight)
}
h.forkFilter = forkid.NewFilter(config.Chain.Config(), config.Chain.Genesis().Hash(), heighter)
// Construct the downloader (long sync) and its backing state bloom if fast
// sync is requested. The downloader is responsible for deallocating the state
// bloom when it's done.
@ -168,9 +174,6 @@ func newHandler(config *handlerConfig) (*handler, error) { //nolint:unparam
validator := func(header *types.Header) error {
return h.chain.Engine().VerifyHeader(h.chain, header, true)
}
heighter := func() uint64 {
return atomic.LoadUint64(&h.currentHeight)
}
inserter := func(blocks types.Blocks) (int, error) {
if err == nil {
atomic.StoreUint32(&h.acceptTxs, 1) // Mark initial sync done on any fetcher import
@ -217,7 +220,7 @@ func (h *handler) SetStagedSync(stagedSync *stagedsync.StagedSync) {
// various subsistems and starts handling messages.
func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
// TODO(karalabe): Not sure why this is needed
if !h.chainSync.handlePeerEvent(peer) {
if !h.chainSync.handlePeerEvent() {
return p2p.DiscQuitting
}
h.peerWG.Add(1)
@ -226,12 +229,17 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
// Execute the Ethereum handshake
var (
genesis = h.chain.Genesis()
head = h.chain.CurrentHeader()
hash = head.Hash()
number = head.Number.Uint64()
td = h.chain.GetTd(hash, number)
number = atomic.LoadUint64(&h.currentHeight)
)
forkID := forkid.NewID(h.chain.Config(), h.chain.Genesis().Hash(), h.chain.CurrentHeader().Number.Uint64())
hash, err := rawdb.ReadCanonicalHash(h.database, number)
if err != nil {
return fmt.Errorf("reading canonical hash for %d: %v", number, err)
}
td, err1 := rawdb.ReadTd(h.database, hash, number)
if err1 != nil {
return fmt.Errorf("reading td for %d %x: %v", number, hash, err1)
}
forkID := forkid.NewID(h.chain.Config(), genesis.Hash(), number)
if err := peer.Handshake(h.networkID, td, hash, genesis.Hash(), forkID, h.forkFilter); err != nil {
peer.Log().Debug("Ethereum handshake failed", "err", err)
return err
@ -261,7 +269,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
peer.Log().Error("Failed to register peer in eth syncer", "err", err)
return err
}
h.chainSync.handlePeerEvent(peer)
h.chainSync.handlePeerEvent()
// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
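
Taken together, the handshake now assembles its status entirely from the database plus the atomic height, with no Blockchain object involved. A condensed, hedged sketch of those reads as a standalone helper; the helper name and the error wrapping are illustrative, while the rawdb calls, their return shapes, and the *big.Int total difficulty follow the hunk above:

package example

import (
	"fmt"
	"math/big"

	"github.com/ledgerwatch/turbo-geth/common"
	"github.com/ledgerwatch/turbo-geth/core/rawdb"
	"github.com/ledgerwatch/turbo-geth/ethdb"
)

// handshakeStatus mirrors the reads in runEthPeer: the canonical hash and total
// difficulty for the given height, both taken straight from the database.
func handshakeStatus(db ethdb.Database, number uint64) (common.Hash, *big.Int, error) {
	hash, err := rawdb.ReadCanonicalHash(db, number)
	if err != nil {
		return common.Hash{}, nil, fmt.Errorf("reading canonical hash for %d: %w", number, err)
	}
	td, err := rawdb.ReadTd(db, hash, number)
	if err != nil {
		return common.Hash{}, nil, fmt.Errorf("reading td for %d %x: %w", number, hash, err)
	}
	return hash, td, nil
}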

View File

@ -210,7 +210,7 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td
// Update the peer's total difficulty if better than the previous
if _, headNumber := peer.Head(); block.NumberU64() > headNumber {
peer.SetHead(trueHead, block.NumberU64())
h.chainSync.handlePeerEvent(peer)
h.chainSync.handlePeerEvent()
}
return nil
}

View File

@ -28,6 +28,7 @@ import (
"github.com/ledgerwatch/turbo-geth/consensus/ethash"
"github.com/ledgerwatch/turbo-geth/core"
"github.com/ledgerwatch/turbo-geth/core/forkid"
"github.com/ledgerwatch/turbo-geth/core/rawdb"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/core/vm"
"github.com/ledgerwatch/turbo-geth/eth/downloader"
@ -116,7 +117,7 @@ func testForkIDSplit(t *testing.T, protocol uint) {
Chain: chainNoFork,
TxPool: newTestTxPool(),
Network: 1,
Sync: downloader.FullSync,
Sync: downloader.StagedSync,
BloomCache: 1,
})
ethProFork, _ = newHandler(&handlerConfig{
@ -124,7 +125,7 @@ func testForkIDSplit(t *testing.T, protocol uint) {
Chain: chainProFork,
TxPool: newTestTxPool(),
Network: 1,
Sync: downloader.FullSync,
Sync: downloader.StagedSync,
BloomCache: 1,
})
)
@ -170,9 +171,11 @@ func testForkIDSplit(t *testing.T, protocol uint) {
if _, err := stagedsync.InsertBlocksInStages(dbNoFork, ethdb.DefaultStorageMode, configNoFork, &vm.Config{}, ethash.NewFaker(), blocksNoFork[:1], true /* checkRoot */); err != nil {
t.Fatal(err)
}
atomic.StoreUint64(&ethNoFork.currentHeight, 1)
if _, err := stagedsync.InsertBlocksInStages(dbProFork, ethdb.DefaultStorageMode, configProFork, &vm.Config{}, ethash.NewFaker(), blocksProFork[:1], true /* checkRoot */); err != nil {
t.Fatal(err)
}
atomic.StoreUint64(&ethProFork.currentHeight, 1)
p2pNoFork, p2pProFork = p2p.MsgPipe()
defer p2pNoFork.Close()
@ -205,9 +208,11 @@ func testForkIDSplit(t *testing.T, protocol uint) {
if _, err := stagedsync.InsertBlocksInStages(dbNoFork, ethdb.DefaultStorageMode, configNoFork, &vm.Config{}, ethash.NewFaker(), blocksNoFork[1:2], true /* checkRoot */); err != nil {
t.Fatal(err)
}
atomic.StoreUint64(&ethNoFork.currentHeight, 2)
if _, err := stagedsync.InsertBlocksInStages(dbProFork, ethdb.DefaultStorageMode, configProFork, &vm.Config{}, ethash.NewFaker(), blocksProFork[1:2], true /* checkRoot */); err != nil {
t.Fatal(err)
}
atomic.StoreUint64(&ethProFork.currentHeight, 2)
p2pNoFork, p2pProFork = p2p.MsgPipe()
defer p2pNoFork.Close()
@ -274,11 +279,14 @@ func testRecvTransactions(t *testing.T, protocol uint) {
// Run the handshake locally to avoid spinning up a source handler
var (
genesis = handler.chain.Genesis()
head = handler.chain.CurrentBlock()
td = handler.chain.GetTd(head.Hash(), head.NumberU64())
head = handler.headBlock
)
if err := src.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain)); err != nil {
t.Fatalf("failed to run protocol handshake: %s", err)
td, err := rawdb.ReadTd(handler.db, head.Hash(), head.NumberU64())
if err != nil {
t.Fatal(err)
}
if err = src.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewID(handler.chain.Config(), genesis.Hash(), head.NumberU64()), forkid.NewFilter(handler.chain.Config(), genesis.Hash(), func() uint64 { return head.NumberU64() })); err != nil {
t.Fatalf("failed to run protocol handshake: %v", err)
}
// Send the transaction to the sink and verify that it's added to the tx pool
tx := types.NewTransaction(0, common.Address{}, uint256.NewInt(), 100000, uint256.NewInt(), nil)
@ -335,11 +343,14 @@ func testSendTransactions(t *testing.T, protocol uint) {
// Run the handshake locally to avoid spinning up a source handler
var (
genesis = handler.chain.Genesis()
head = handler.chain.CurrentBlock()
td = handler.chain.GetTd(head.Hash(), head.NumberU64())
head = handler.headBlock
)
if err := sink.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain)); err != nil {
t.Fatalf("failed to run protocol handshake")
td, err := rawdb.ReadTd(handler.db, head.Hash(), head.NumberU64())
if err != nil {
t.Fatal(err)
}
if err := sink.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewID(handler.chain.Config(), genesis.Hash(), head.NumberU64()), forkid.NewFilter(handler.chain.Config(), genesis.Hash(), func() uint64 { return head.NumberU64() })); err != nil {
t.Fatalf("failed to run protocol handshake: %v", err)
}
// After the handshake completes, the source handler should stream the sink
// the transactions, subscribe to all inbound network events
@ -473,6 +484,7 @@ func testTransactionPropagation(t *testing.T, protocol uint) {
// challenge to validate each other's chains. Hash mismatches, or missing ones
// during a fast sync should lead to the peer getting dropped.
func TestCheckpointChallenge(t *testing.T) {
t.Skip("Not relevant for Turbo-Geth")
tests := []struct {
syncmode downloader.SyncMode
checkpoint bool
@ -549,11 +561,14 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo
// Run the handshake locally to avoid spinning up a remote handler
var (
genesis = handler.chain.Genesis()
head = handler.chain.CurrentBlock()
td = handler.chain.GetTd(head.Hash(), head.NumberU64())
head = handler.headBlock
)
if err := remote.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain)); err != nil {
t.Fatalf("failed to run protocol handshake")
td, err := rawdb.ReadTd(handler.db, head.Hash(), head.NumberU64())
if err != nil {
t.Fatal(err)
}
if err := remote.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewID(handler.chain.Config(), genesis.Hash(), head.NumberU64()), forkid.NewFilter(handler.chain.Config(), genesis.Hash(), func() uint64 { return head.NumberU64() })); err != nil {
t.Fatalf("failed to run protocol handshake: %v", err)
}
// Connect a new peer and check that we receive the checkpoint challenge
if checkpoint {
@ -618,8 +633,12 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) {
// Interconnect all the sink handlers with the source handler
var (
genesis = source.chain.Genesis()
td = source.chain.GetTd(genesis.Hash(), genesis.NumberU64())
head = source.headBlock
)
td, err := rawdb.ReadTd(source.db, head.Hash(), head.NumberU64())
if err != nil {
t.Fatal(err)
}
for i, sink := range sinks {
sink := sink // Closure for gorotuine below
@ -636,7 +655,7 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) {
go source.handler.runEthPeer(sourcePeer, func(peer *eth.Peer) error {
return eth.Handle((*ethHandler)(source.handler), peer)
})
if err := sinkPeer.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain)); err != nil {
if err := sinkPeer.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewID(source.chain.Config(), genesis.Hash(), head.NumberU64()), forkid.NewFilter(source.chain.Config(), genesis.Hash(), func() uint64 { return head.NumberU64() })); err != nil {
t.Fatalf("failed to run protocol handshake")
}
//nolint:errcheck
@ -711,9 +730,13 @@ func testBroadcastMalformedBlock(t *testing.T, protocol uint) {
// Run the handshake locally to avoid spinning up a sink handler
var (
genesis = source.chain.Genesis()
td = source.chain.GetTd(genesis.Hash(), genesis.NumberU64())
head = source.headBlock
)
if err := sink.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain)); err != nil {
td, err := rawdb.ReadTd(source.db, head.Hash(), head.NumberU64())
if err != nil {
t.Fatal(err)
}
if err := sink.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewID(source.chain.Config(), genesis.Hash(), head.NumberU64()), forkid.NewFilter(source.chain.Config(), genesis.Hash(), func() uint64 { return head.NumberU64() })); err != nil {
t.Fatalf("failed to run protocol handshake")
}
// After the handshake completes, the source handler should stream the sink
@ -727,9 +750,6 @@ func testBroadcastMalformedBlock(t *testing.T, protocol uint) {
//nolint:errcheck
go eth.Handle(backend, sink)
// Create various combinations of malformed blocks
head := source.chain.CurrentBlock()
malformedUncles := head.Header()
malformedUncles.UncleHash[0]++
malformedTransactions := head.Header()

View File

@ -116,10 +116,11 @@ func (p *testTxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subs
// preinitialized with some sane testing defaults and the transaction pool mocked
// out.
type testHandler struct {
db ethdb.Database
chain *core.BlockChain
txpool *testTxPool
handler *handler
db ethdb.Database
chain *core.BlockChain
txpool *testTxPool
handler *handler
headBlock *types.Block
}
// newTestHandler creates a new handler for testing purposes with no blocks.
@ -139,9 +140,13 @@ func newTestHandlerWithBlocks(blocks int) *testHandler {
chain, _ := core.NewBlockChain(db, nil, params.TestChainConfig, ethash.NewFaker(), vm.Config{}, nil, nil)
bs, _, _ := core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, blocks, nil, false)
if _, err := stagedsync.InsertBlocksInStages(db, ethdb.DefaultStorageMode, params.TestChainConfig, &vm.Config{}, ethash.NewFaker(), bs, true /* checkRoot */); err != nil {
panic(err)
headBlock := genesis
if blocks > 0 {
bs, _, _ := core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, blocks, nil, false)
if _, err := stagedsync.InsertBlocksInStages(db, ethdb.DefaultStorageMode, params.TestChainConfig, &vm.Config{}, ethash.NewFaker(), bs, true /* checkRoot */); err != nil {
panic(err)
}
headBlock = bs[len(bs)-1]
}
txpool := newTestTxPool()
@ -150,16 +155,17 @@ func newTestHandlerWithBlocks(blocks int) *testHandler {
Chain: chain,
TxPool: txpool,
Network: 1,
Sync: downloader.FastSync,
Sync: downloader.StagedSync,
BloomCache: 1,
})
handler.Start(1000)
return &testHandler{
db: db,
chain: chain,
txpool: txpool,
handler: handler,
db: db,
chain: chain,
txpool: txpool,
handler: handler,
headBlock: headBlock,
}
}

View File

@ -17,9 +17,12 @@
package eth
import (
"github.com/ledgerwatch/turbo-geth/core"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/core/forkid"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/ethdb/remote/remotedbserver"
"github.com/ledgerwatch/turbo-geth/p2p/enode"
"github.com/ledgerwatch/turbo-geth/params"
"github.com/ledgerwatch/turbo-geth/rlp"
)
@ -38,28 +41,16 @@ func (e enrEntry) ENRKey() string {
// StartENRUpdater starts the `eth` ENR updater loop, which listens for chain
// head events and updates the requested node record whenever a fork is passed.
func StartENRUpdater(chain *core.BlockChain, ln *enode.LocalNode) {
var newHead = make(chan core.ChainHeadEvent, 10)
sub := chain.SubscribeChainHeadEvent(newHead)
go func() {
defer sub.Unsubscribe()
for {
select {
case <-newHead:
ln.Set(currentENREntry(chain))
case <-sub.Err():
// Would be nice to sync with Stop, but there is no
// good way to do that.
return
}
}
}()
func StartENRUpdater(chainConfig *params.ChainConfig, genesisHash common.Hash, events *remotedbserver.Events, ln *enode.LocalNode) {
events.AddHeaderSubscription(func(h *types.Header) error {
ln.Set(currentENREntry(chainConfig, genesisHash, h.Number.Uint64()))
return nil
})
}
// currentENREntry constructs an `eth` ENR entry based on the current state of the chain.
func currentENREntry(chain forkid.Blockchain) *enrEntry {
func currentENREntry(chainConfig *params.ChainConfig, genesisHash common.Hash, headHeight uint64) *enrEntry {
return &enrEntry{
ForkID: forkid.NewID(chain.Config(), chain.Genesis().Hash(), chain.CurrentHeader().Number.Uint64()),
ForkID: forkid.NewID(chainConfig, genesisHash, headHeight),
}
}
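
The ENR updater no longer subscribes to blockchain head events; it registers a header callback on the remotedbserver.Events hub that staged sync already notifies (see the Notifier wiring in eth/backend.go above). A hedged caller-side sketch of that data flow; the function and variable names here are invented for illustration:

package example

import (
	"github.com/ledgerwatch/turbo-geth/common"
	"github.com/ledgerwatch/turbo-geth/eth"
	"github.com/ledgerwatch/turbo-geth/eth/stagedsync"
	"github.com/ledgerwatch/turbo-geth/ethdb/remote/remotedbserver"
	"github.com/ledgerwatch/turbo-geth/p2p/enode"
	"github.com/ledgerwatch/turbo-geth/params"
)

// wireENRUpdates sketches the setup from eth/backend.go: staged sync notifies
// the Events hub about new headers, and the ENR updater turns each notification
// into a refreshed fork-id entry on the local node record.
func wireENRUpdates(chainConfig *params.ChainConfig, genesisHash common.Hash, ln *enode.LocalNode) *stagedsync.StagedSync {
	events := remotedbserver.NewEvents()
	eth.StartENRUpdater(chainConfig, genesisHash, events, ln)
	return stagedsync.New(stagedsync.DefaultStages(), stagedsync.DefaultUnwindOrder(),
		stagedsync.OptionalParameters{Notifier: events})
}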

View File

@ -92,7 +92,7 @@ type TxPool interface {
}
// MakeProtocols constructs the P2P protocol definitions for `eth`.
func MakeProtocols(backend Backend, network uint64, dnsdisc enode.Iterator) []p2p.Protocol {
func MakeProtocols(backend Backend, network uint64, dnsdisc enode.Iterator, chainConfig *params.ChainConfig, genesisHash common.Hash, headHeight uint64) []p2p.Protocol {
protocols := make([]p2p.Protocol, len(ProtocolVersions))
for i, version := range ProtocolVersions {
version := version // Closure
@ -115,7 +115,7 @@ func MakeProtocols(backend Backend, network uint64, dnsdisc enode.Iterator) []p2
PeerInfo: func(id enode.ID) interface{} {
return backend.PeerInfo(id)
},
Attributes: []enr.Entry{currentENREntry(backend.Chain())},
Attributes: []enr.Entry{currentENREntry(chainConfig, genesisHash, headHeight)},
DialCandidates: dnsdisc,
}
}

View File

@ -48,9 +48,10 @@ var (
// purpose is to allow testing the request/reply workflows and wire serialization
// in the `eth` protocol without actually doing any data processing.
type testBackend struct {
db ethdb.Database
txpool *core.TxPool
chain *core.BlockChain
db ethdb.Database
txpool *core.TxPool
chain *core.BlockChain
headBlock *types.Block
}
// newTestBackend creates an empty chain and wraps it into a mock backend.
@ -68,9 +69,13 @@ func newTestBackendWithGenerator(blocks int, generator func(int, *core.BlockGen)
Alloc: core.GenesisAlloc{testAddr: {Balance: big.NewInt(1000000)}},
}).MustCommit(db)
bs, _, _ := core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, blocks, generator, true)
if _, err := stagedsync.InsertBlocksInStages(db, ethdb.DefaultStorageMode, params.TestChainConfig, &vm.Config{}, ethash.NewFaker(), bs, true /* checkRoot */); err != nil {
panic(err)
headBlock := genesis
if blocks > 0 {
bs, _, _ := core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, blocks, generator, true)
if _, err := stagedsync.InsertBlocksInStages(db, ethdb.DefaultStorageMode, params.TestChainConfig, &vm.Config{}, ethash.NewFaker(), bs, true /* checkRoot */); err != nil {
panic(err)
}
headBlock = bs[len(bs)-1]
}
txconfig := core.DefaultTxPoolConfig
txconfig.Journal = "" // Don't litter the disk with test journals
@ -78,9 +83,10 @@ func newTestBackendWithGenerator(blocks int, generator func(int, *core.BlockGen)
chain, _ := core.NewBlockChain(db, nil, params.TestChainConfig, ethash.NewFaker(), vm.Config{}, nil, nil)
txCacher := core.NewTxSenderCacher(1)
return &testBackend{
db: db,
txpool: core.NewTxPool(txconfig, params.TestChainConfig, db, txCacher),
chain: chain,
db: db,
txpool: core.NewTxPool(txconfig, params.TestChainConfig, db, txCacher),
chain: chain,
headBlock: headBlock,
}
}

View File

@ -42,7 +42,7 @@ func testHandshake(t *testing.T, protocol uint) {
genesis = backend.chain.Genesis()
head = backend.chain.CurrentBlock()
td = backend.chain.GetTd(head.Hash(), head.NumberU64())
forkID = forkid.NewID(backend.chain.Config(), backend.chain.Genesis().Hash(), backend.chain.CurrentHeader().Number.Uint64())
forkID = forkid.NewID(backend.chain.Config(), backend.chain.Genesis().Hash(), backend.headBlock.NumberU64())
)
tests := []struct {
code uint64
@ -86,7 +86,7 @@ func testHandshake(t *testing.T, protocol uint) {
}
}()
err := peer.Handshake(1, td, head.Hash(), genesis.Hash(), forkID, forkid.NewFilter(backend.chain))
err := peer.Handshake(1, td, head.Hash(), genesis.Hash(), forkID, forkid.NewFilter(backend.chain.Config(), backend.chain.Genesis().Hash(), func() uint64 { return backend.headBlock.NumberU64() }))
if err == nil {
t.Errorf("test %d: protocol returned nil error, want %q", i, test.want)
} else if !errors.Is(err, test.want) {

View File

@ -153,6 +153,8 @@ func createStageBuilders(blocks []*types.Block, blockNum uint64, checkRoot bool)
}
}
c.Close()
*/
/*
c = world.TX.(ethdb.HasTx).Tx().Cursor(dbutils.CurrentStateBucket)
for k, v, err := c.First(); k != nil; k, v, err = c.Next() {
if err != nil {

View File

@ -175,14 +175,14 @@ type chainSyncOp struct {
func newChainSyncer(handler *handler) *chainSyncer {
return &chainSyncer{
handler: handler,
peerEventCh: make(chan struct{}),
peerEventCh: make(chan struct{}, 1),
}
}
// handlePeerEvent notifies the syncer about a change in the peer set.
// This is called for new peers and every time a peer announces a new
// chain head.
func (cs *chainSyncer) handlePeerEvent(peer *eth.Peer) bool { //nolint:unparam
func (cs *chainSyncer) handlePeerEvent() bool {
select {
case cs.peerEventCh <- struct{}{}:
return true
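
"Relax peerEventCh" amounts to the capacity change visible above: with a one-slot buffer, a peer event can be queued even while the sync loop is busy, so bursts of announcements coalesce into a single wake-up instead of serialising every caller. A standalone, hedged illustration of that difference (not the project's code):

package main

import "fmt"

func main() {
	unbuffered := make(chan struct{})  // old behaviour: a send blocks until the sync loop receives
	buffered := make(chan struct{}, 1) // new behaviour: one pending notification fits with no receiver

	select {
	case unbuffered <- struct{}{}:
		fmt.Println("unreachable: nobody is receiving yet")
	default:
		fmt.Println("unbuffered send would block")
	}

	select {
	case buffered <- struct{}{}:
		fmt.Println("buffered send succeeds immediately")
	default:
		fmt.Println("unreachable: the buffer had a free slot")
	}
}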