Fix hive tests and reorganise the fix for body download problem (#6515)

Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
This commit is contained in:
ledgerwatch 2023-01-06 12:43:46 +00:00 committed by GitHub
parent 4c3bb1cca5
commit 2941e754e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 49 additions and 79 deletions

View File

@ -373,7 +373,7 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) (
return err
}
// We start the mining step
if err := stages2.StateStep(ctx, batch, stateSync, header, body, unwindPoint, headersChain, bodiesChain, true /* quiet */); err != nil {
if err := stages2.StateStep(ctx, batch, stateSync, backend.sentriesClient.Bd, header, body, unwindPoint, headersChain, bodiesChain, true /* quiet */); err != nil {
log.Warn("Could not validate block", "err", err)
return err
}
@ -566,7 +566,7 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) (
if err := backend.sentriesClient.Hd.AddMinedHeader(b.Header()); err != nil {
log.Error("add mined block to header downloader", "err", err)
}
backend.sentriesClient.Bd.AddToPrefetch(b)
backend.sentriesClient.Bd.AddToPrefetch(b.Header(), b.RawBody())
//p2p
//backend.sentriesClient.BroadcastNewBlock(context.Background(), b, b.Difficulty())

View File

@ -1,6 +1,7 @@
package sentry
import (
"bytes"
"context"
"errors"
"math"
@ -73,11 +74,19 @@ func (cs *MultiClient) PropagateNewBlockHashes(ctx context.Context, announces []
}
}
func (cs *MultiClient) BroadcastNewBlock(ctx context.Context, block *types.Block, td *big.Int) {
func (cs *MultiClient) BroadcastNewBlock(ctx context.Context, header *types.Header, body *types.RawBody, td *big.Int) {
cs.lock.RLock()
defer cs.lock.RUnlock()
txs := make([]types.Transaction, len(body.Transactions))
for i, tx := range body.Transactions {
var err error
if txs[i], err = types.DecodeTransaction(rlp.NewStream(bytes.NewReader(tx), 0)); err != nil {
log.Error("broadcastNewBlock", "err", err)
return
}
}
data, err := rlp.EncodeToBytes(&eth.NewBlockPacket{
Block: block,
Block: types.NewBlock(header, txs, body.Uncles, nil, body.Withdrawals),
TD: td,
})
if err != nil {

View File

@ -535,7 +535,7 @@ func (cs *MultiClient) newBlock66(ctx context.Context, inreq *proto_sentry.Inbou
} else {
return fmt.Errorf("singleHeaderAsSegment failed: %w", err)
}
cs.Bd.AddToPrefetch(request.Block)
cs.Bd.AddToPrefetch(request.Block.Header(), request.Block.RawBody())
outreq := proto_sentry.PeerMinBlockRequest{
PeerId: inreq.PeerId,
MinBlock: request.Block.NumberU64(),

View File

@ -387,7 +387,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
return err
}
// We start the mining step
if err := stages2.StateStep(ctx, batch, stateSync, header, body, unwindPoint, headersChain, bodiesChain, true /* quiet */); err != nil {
if err := stages2.StateStep(ctx, batch, stateSync, backend.sentriesClient.Bd, header, body, unwindPoint, headersChain, bodiesChain, true /* quiet */); err != nil {
log.Warn("Could not validate block", "err", err)
return err
}
@ -612,7 +612,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
if err := backend.sentriesClient.Hd.AddMinedHeader(b.Header()); err != nil {
log.Error("add mined block to header downloader", "err", err)
}
backend.sentriesClient.Bd.AddToPrefetch(b)
backend.sentriesClient.Bd.AddToPrefetch(b.Header(), b.RawBody())
//p2p
//backend.sentriesClient.BroadcastNewBlock(context.Background(), b, b.Difficulty())

View File

@ -524,7 +524,7 @@ func handleNewPayload(
}
if cfg.bodyDownload != nil {
cfg.bodyDownload.AddToPrefetch(block)
cfg.bodyDownload.AddToPrefetch(header, block.RawBody())
}
return response, nil

View File

@ -7,4 +7,4 @@ import (
"github.com/ledgerwatch/erigon/core/types"
)
type BlockPropagator func(ctx context.Context, block *types.Block, td *big.Int)
type BlockPropagator func(ctx context.Context, header *types.Header, body *types.RawBody, td *big.Int)

View File

@ -206,7 +206,7 @@ func InsertChain(ethereum *eth.Ethereum, chain *core.ChainPack) error {
for _, b := range chain.Blocks {
sentryControlServer.Hd.AddMinedHeader(b.Header())
sentryControlServer.Bd.AddToPrefetch(b)
sentryControlServer.Bd.AddToPrefetch(b.Header(), b.RawBody())
}
sentryControlServer.Hd.MarkAllVerified()

View File

@ -163,11 +163,6 @@ func (fv *ForkValidator) ValidatePayload(tx kv.RwTx, header *types.Header, body
}
if extendCanonical {
if header.Number.Uint64() > fv.currentHeight+1 {
// Cannot extend because some stages are behind the headers. This usually happens when body download timeouts
status = remote.EngineStatus_ACCEPTED
return
}
// If the new block extends the canonical chain we update extendingFork.
if fv.extendingFork == nil {
fv.extendingFork = memdb.NewMemoryBatch(tx, fv.tmpDir)
@ -229,10 +224,6 @@ func (fv *ForkValidator) ValidatePayload(tx kv.RwTx, header *types.Header, body
// Do not set an unwind point if we are already there.
if unwindPoint == fv.currentHeight {
unwindPoint = 0
} else if unwindPoint > fv.currentHeight {
// Some stages are behind headers so we cannot do in-memory validations. This usually happens when body download timeouts
status = remote.EngineStatus_ACCEPTED
return
}
batch := memdb.NewMemoryBatch(tx, fv.tmpDir)
defer batch.Rollback()

View File

@ -210,28 +210,28 @@ func (bd *BodyDownload) RequestMoreBodies(tx kv.RwTx, blockReader services.FullB
// checks if we have the block prefetched, returns true if found and stored or false if not present
func (bd *BodyDownload) checkPrefetchedBlock(hash common.Hash, tx kv.RwTx, blockNum uint64, blockPropagator adapter.BlockPropagator) bool {
block := bd.prefetchedBlocks.Pop(hash)
header, body := bd.prefetchedBlocks.Pop(hash)
if block == nil {
if body == nil {
return false
}
// Block is prefetched, no need to request
bd.deliveriesH[blockNum] = block.Header()
bd.deliveriesH[blockNum] = header
// make sure we have the body in the bucket for later use
bd.addBodyToBucket(tx, blockNum, block.RawBody())
bd.addBodyToBucket(tx, blockNum, body)
// Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
if parent, err := rawdb.ReadTd(tx, block.ParentHash(), block.NumberU64()-1); err != nil {
log.Error("Failed to ReadTd", "err", err, "number", block.NumberU64()-1, "hash", block.ParentHash())
if parent, err := rawdb.ReadTd(tx, header.ParentHash, header.Number.Uint64()-1); err != nil {
log.Error("Failed to ReadTd", "err", err, "number", header.Number.Uint64()-1, "hash", header.ParentHash)
} else if parent != nil {
if block.Difficulty().Sign() != 0 { // don't propagate proof-of-stake blocks
td := new(big.Int).Add(block.Difficulty(), parent)
go blockPropagator(context.Background(), block, td)
if header.Difficulty.Sign() != 0 { // don't propagate proof-of-stake blocks
td := new(big.Int).Add(header.Difficulty, parent)
go blockPropagator(context.Background(), header, body, td)
}
} else {
log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
log.Error("Propagating dangling block", "number", header.Number.Uint64(), "hash", hash)
}
return true
@ -401,8 +401,8 @@ func (bd *BodyDownload) PrintPeerMap() {
bd.peerMap = make(map[[64]byte]int)
}
func (bd *BodyDownload) AddToPrefetch(block *types.Block) {
bd.prefetchedBlocks.Add(block)
func (bd *BodyDownload) AddToPrefetch(header *types.Header, body *types.RawBody) {
bd.prefetchedBlocks.Add(header, body)
}
// GetHeader returns a header by either loading from the deliveriesH slice populated when running RequestMoreBodies

View File

@ -11,6 +11,11 @@ type PrefetchedBlocks struct {
blocks *lru.Cache
}
type HeaderAndBody struct {
header *types.Header
body *types.RawBody
}
func NewPrefetchedBlocks() *PrefetchedBlocks {
// Setting this to 2500 as `erigon import` imports blocks in batches of 2500
// and the import command makes use of PrefetchedBlocks.
@ -21,20 +26,20 @@ func NewPrefetchedBlocks() *PrefetchedBlocks {
return &PrefetchedBlocks{blocks: cache}
}
func (pb *PrefetchedBlocks) Pop(hash common.Hash) *types.Block {
func (pb *PrefetchedBlocks) Pop(hash common.Hash) (*types.Header, *types.RawBody) {
if val, ok := pb.blocks.Get(hash); ok && val != nil {
pb.blocks.Remove(hash)
if block, ok := val.(*types.Block); ok {
return block
//pb.blocks.Remove(hash)
if headerAndBody, ok := val.(HeaderAndBody); ok {
return headerAndBody.header, headerAndBody.body
}
}
return nil
return nil, nil
}
func (pb *PrefetchedBlocks) Add(b *types.Block) {
func (pb *PrefetchedBlocks) Add(h *types.Header, b *types.RawBody) {
if b == nil {
return
}
hash := b.Hash()
pb.blocks.ContainsOrAdd(hash, b)
hash := h.Hash()
pb.blocks.ContainsOrAdd(hash, HeaderAndBody{header: h, body: b})
}

View File

@ -277,7 +277,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
sentries := []direct.SentryClient{mock.SentryClient}
sendBodyRequest := func(context.Context, *bodydownload.BodyRequest) ([64]byte, bool) { return [64]byte{}, false }
blockPropagator := func(Ctx context.Context, block *types.Block, td *big.Int) {}
blockPropagator := func(Ctx context.Context, header *types.Header, body *types.RawBody, td *big.Int) {}
if !cfg.DeprecatedTxPool.Disable {
poolCfg := txpool.DefaultConfig

View File

@ -34,6 +34,7 @@ import (
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/turbo/shards"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"github.com/ledgerwatch/erigon/turbo/stages/bodydownload"
"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
)
@ -278,7 +279,7 @@ func MiningStep(ctx context.Context, kv kv.RwDB, mining *stagedsync.Sync, tmpDir
return nil
}
func StateStep(ctx context.Context, batch kv.RwTx, stateSync *stagedsync.Sync, header *types.Header, body *types.RawBody, unwindPoint uint64, headersChain []*types.Header, bodiesChain []*types.RawBody, quiet bool) (err error) {
func StateStep(ctx context.Context, batch kv.RwTx, stateSync *stagedsync.Sync, Bd *bodydownload.BodyDownload, header *types.Header, body *types.RawBody, unwindPoint uint64, headersChain []*types.Header, bodiesChain []*types.RawBody, quiet bool) (err error) {
defer func() {
if rec := recover(); rec != nil {
err = fmt.Errorf("%+v, trace: %s", rec, dbg.Stack())
@ -300,22 +301,7 @@ func StateStep(ctx context.Context, batch kv.RwTx, stateSync *stagedsync.Sync, h
currentHeight := headersChain[i].Number.Uint64()
currentHash := headersChain[i].Hash()
// Prepare memory state for block execution
_, _, err := rawdb.WriteRawBodyIfNotExists(batch, currentHash, currentHeight, currentBody)
if err != nil {
return err
}
/*
ok, lastTxnNum, err := rawdb.WriteRawBodyIfNotExists(batch, currentHash, currentHeight, currentBody)
if err != nil {
return err
}
if ok {
if txNums != nil {
txNums.Append(currentHeight, lastTxnNum)
}
}
*/
Bd.AddToPrefetch(currentHeader, currentBody)
rawdb.WriteHeader(batch, currentHeader)
if err = rawdb.WriteHeaderNumber(batch, currentHash, currentHeight); err != nil {
return err
@ -350,28 +336,7 @@ func StateStep(ctx context.Context, batch kv.RwTx, stateSync *stagedsync.Sync, h
return err
}
if body != nil {
if err = stages.SaveStageProgress(batch, stages.Bodies, height); err != nil {
return err
}
_, _, err := rawdb.WriteRawBodyIfNotExists(batch, hash, height, body)
if err != nil {
return err
}
/*
ok, lastTxnNum, err := rawdb.WriteRawBodyIfNotExists(batch, hash, height, body)
if err != nil {
return err
}
if ok {
if txNums != nil {
txNums.Append(height, lastTxnNum)
}
}
*/
} else {
if err = stages.SaveStageProgress(batch, stages.Bodies, height-1); err != nil {
return err
}
Bd.AddToPrefetch(header, body)
}
// Run state sync
if err = stateSync.Run(nil, batch, false /* firstCycle */, quiet); err != nil {