From b2f424e2cd6cae9f310558d6e1e6807bd1e97e26 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Fri, 7 May 2021 09:06:00 +0700 Subject: [PATCH] Canonical fix in --download.v2 (#1886) --- cmd/headers/download/downloader.go | 5 +- eth/stagedsync/replacement_stages.go.go | 22 ++++-- eth/stagedsync/stage_bodies_new.go | 33 +++++---- eth/stagedsync/stage_headers_new.go | 58 +++++++-------- turbo/stages/bodydownload/body_algos.go | 73 ++++++++++--------- turbo/stages/bodydownload/body_test.go | 9 ++- .../stages/headerdownload/header_algo_test.go | 18 ++--- turbo/stages/headerdownload/header_algos.go | 44 ++++++----- .../headerdownload/header_data_struct.go | 5 +- 9 files changed, 148 insertions(+), 119 deletions(-) diff --git a/cmd/headers/download/downloader.go b/cmd/headers/download/downloader.go index 2d5579253..32e476d56 100644 --- a/cmd/headers/download/downloader.go +++ b/cmd/headers/download/downloader.go @@ -435,7 +435,10 @@ func NewControlServer(db ethdb.Database, nodeName string, chainConfig *params.Ch cs.protocolVersion = uint32(eth.ProtocolVersions[0]) cs.networkId = networkID var err error - cs.headHeight, cs.headHash, cs.headTd, err = bd.UpdateFromDb(db) + err = db.RwKV().Update(context.Background(), func(tx ethdb.RwTx) error { + cs.headHeight, cs.headHash, cs.headTd, err = bd.UpdateFromDb(tx) + return err + }) return cs, err } diff --git a/eth/stagedsync/replacement_stages.go.go b/eth/stagedsync/replacement_stages.go.go index aa140db7e..4a9a25e64 100644 --- a/eth/stagedsync/replacement_stages.go.go +++ b/eth/stagedsync/replacement_stages.go.go @@ -29,10 +29,18 @@ func ReplacementStages(ctx context.Context, ID: stages.Headers, Description: "Download headers", ExecFunc: func(s *StageState, u Unwinder) error { - return HeadersForward(s, u, ctx, world.TX, headers, world.InitialCycle) + var tx ethdb.RwTx + if hasTx, ok := world.TX.(ethdb.HasTx); ok { + tx = hasTx.Tx().(ethdb.RwTx) + } + return HeadersForward(s, u, ctx, tx, headers, world.InitialCycle) }, UnwindFunc: func(u *UnwindState, s *StageState) error { - return HeadersUnwind(u, s, world.TX) + var tx ethdb.RwTx + if hasTx, ok := world.TX.(ethdb.HasTx); ok { + tx = hasTx.Tx().(ethdb.RwTx) + } + return HeadersUnwind(u, s, tx, headers) }, } }, @@ -47,7 +55,7 @@ func ReplacementStages(ctx context.Context, return SpawnBlockHashStage(s, world.TX, world.TmpDir, ctx.Done()) }, UnwindFunc: func(u *UnwindState, s *StageState) error { - return u.Done(world.DB) + return u.Done(world.TX) }, } }, @@ -59,10 +67,14 @@ func ReplacementStages(ctx context.Context, ID: stages.Bodies, Description: "Download block bodies", ExecFunc: func(s *StageState, u Unwinder) error { - return BodiesForward(s, ctx, world.TX, bodies) + var tx ethdb.RwTx + if hasTx, ok := world.TX.(ethdb.HasTx); ok { + tx = hasTx.Tx().(ethdb.RwTx) + } + return BodiesForward(s, ctx, tx, bodies) }, UnwindFunc: func(u *UnwindState, s *StageState) error { - return u.Done(world.DB) + return u.Done(world.TX) }, } }, diff --git a/eth/stagedsync/stage_bodies_new.go b/eth/stagedsync/stage_bodies_new.go index 7a096f4d1..17f4ac6d3 100644 --- a/eth/stagedsync/stage_bodies_new.go +++ b/eth/stagedsync/stage_bodies_new.go @@ -50,16 +50,12 @@ func StageBodiesCfg( func BodiesForward( s *StageState, ctx context.Context, - db ethdb.Database, + tx ethdb.RwTx, cfg BodiesCfg) error { - var tx ethdb.DbWithPendingMutations var err error - var useExternalTx bool - if hasTx, ok := db.(ethdb.HasTx); ok && hasTx.Tx() != nil { - tx = db.(ethdb.DbWithPendingMutations) - useExternalTx = true - } else { - tx, err = db.Begin(context.Background(), ethdb.RW) + useExternalTx := tx != nil + if !useExternalTx { + tx, err = cfg.db.BeginRw(context.Background()) if err != nil { return err } @@ -87,7 +83,7 @@ func BodiesForward( } logPrefix := s.LogPrefix() log.Info(fmt.Sprintf("[%s] Processing bodies...", logPrefix), "from", bodyProgress, "to", headerProgress) - batch := tx.NewBatch() + batch := ethdb.NewBatch(tx) defer batch.Rollback() logEvery := time.NewTicker(logInterval) defer logEvery.Stop() @@ -111,7 +107,10 @@ func BodiesForward( */ if req == nil { currentTime := uint64(time.Now().Unix()) - req, blockNum = cfg.bd.RequestMoreBodies(db, blockNum, currentTime, cfg.blockPropagator) + req, blockNum, err = cfg.bd.RequestMoreBodies(tx, blockNum, currentTime, cfg.blockPropagator) + if err != nil { + return fmt.Errorf("[%s] request more bodies: %w", logPrefix, err) + } } peer = nil if req != nil { @@ -126,7 +125,10 @@ func BodiesForward( } for req != nil && peer != nil { currentTime := uint64(time.Now().Unix()) - req, blockNum = cfg.bd.RequestMoreBodies(db, blockNum, currentTime, cfg.blockPropagator) + req, blockNum, err = cfg.bd.RequestMoreBodies(tx, blockNum, currentTime, cfg.blockPropagator) + if err != nil { + return fmt.Errorf("[%s] request more bodies: %w", logPrefix, err) + } peer = nil if req != nil { peer = cfg.bodyReqSend(ctx, req) @@ -155,17 +157,22 @@ func BodiesForward( } if batch.BatchSize() >= int(cfg.batchSize) { - if err = batch.CommitAndBegin(context.Background()); err != nil { + if err = batch.Commit(); err != nil { return err } if !useExternalTx { if err := s.DoneAndUpdate(tx, bodyProgress); err != nil { return err } - if err = tx.CommitAndBegin(context.Background()); err != nil { + if err = tx.Commit(); err != nil { + return err + } + tx, err = cfg.db.BeginRw(ctx) + if err != nil { return err } } + batch = ethdb.NewBatch(tx) } } //log.Info("Body progress", "block number", bodyProgress, "header progress", headerProgress) diff --git a/eth/stagedsync/stage_headers_new.go b/eth/stagedsync/stage_headers_new.go index 895600a66..b10b78261 100644 --- a/eth/stagedsync/stage_headers_new.go +++ b/eth/stagedsync/stage_headers_new.go @@ -52,19 +52,15 @@ func HeadersForward( s *StageState, u Unwinder, ctx context.Context, - db ethdb.Database, + tx ethdb.RwTx, cfg HeadersCfg, initialCycle bool, ) error { var headerProgress uint64 var err error - var tx ethdb.DbWithPendingMutations - var useExternalTx bool - if hasTx, ok := db.(ethdb.HasTx); ok && hasTx.Tx() != nil { - tx = db.(ethdb.DbWithPendingMutations) - useExternalTx = true - } else { - tx, err = db.Begin(context.Background(), ethdb.RW) + useExternalTx := tx != nil + if !useExternalTx { + tx, err = cfg.db.BeginRw(context.Background()) if err != nil { return err } @@ -76,12 +72,12 @@ func HeadersForward( } logPrefix := s.LogPrefix() // Check if this is called straight after the unwinds, which means we need to create new canonical markings - hash, err1 := rawdb.ReadCanonicalHash(tx, headerProgress) - if err1 != nil { - return err1 + hash, err := rawdb.ReadCanonicalHash(tx, headerProgress) + if err != nil { + return err } - headHash := rawdb.ReadHeadHeaderHash(tx) if hash == (common.Hash{}) { + headHash := rawdb.ReadHeadHeaderHash(tx) if err = fixCanonicalChain(logPrefix, headerProgress, headHash, tx); err != nil { return err } @@ -95,16 +91,16 @@ func HeadersForward( } log.Info(fmt.Sprintf("[%s] Processing headers...", logPrefix), "from", headerProgress) - batch := tx.NewBatch() + batch := ethdb.NewBatch(tx) defer batch.Rollback() logEvery := time.NewTicker(logInterval) defer logEvery.Stop() - localTd, err1 := rawdb.ReadTd(tx, hash, headerProgress) - if err1 != nil { - return err1 + localTd, err := rawdb.ReadTd(tx, hash, headerProgress) + if err != nil { + return err } - headerInserter := headerdownload.NewHeaderInserter(logPrefix, batch, localTd, headerProgress) + headerInserter := headerdownload.NewHeaderInserter(logPrefix, localTd, headerProgress) cfg.hd.SetHeaderReader(&chainReader{config: &cfg.chainConfig, batch: batch}) var req *headerdownload.HeaderRequest @@ -144,21 +140,27 @@ func HeadersForward( } } // Load headers into the database - if err = cfg.hd.InsertHeaders(headerInserter.FeedHeader); err != nil { + if err = cfg.hd.InsertHeaders(headerInserter.FeedHeaderFunc(batch)); err != nil { return err } if batch.BatchSize() >= int(cfg.batchSize) { - if err = batch.CommitAndBegin(context.Background()); err != nil { + if err = batch.Commit(); err != nil { return err } if !useExternalTx { if err = s.Update(tx, headerInserter.GetHighest()); err != nil { return err } - if err = tx.CommitAndBegin(context.Background()); err != nil { + if err = tx.Commit(); err != nil { + return err + } + tx, err = cfg.db.BeginRw(ctx) + if err != nil { return err } } + batch = ethdb.NewBatch(tx) + cfg.hd.SetHeaderReader(&chainReader{config: &cfg.chainConfig, batch: batch}) } timer.Stop() announces := cfg.hd.GrabAnnounces() @@ -220,7 +222,7 @@ func HeadersForward( return nil } -func fixCanonicalChain(logPrefix string, height uint64, hash common.Hash, tx ethdb.DbWithPendingMutations) error { +func fixCanonicalChain(logPrefix string, height uint64, hash common.Hash, tx ethdb.StatelessRwTx) error { if height == 0 { return nil } @@ -245,15 +247,11 @@ func fixCanonicalChain(logPrefix string, height uint64, hash common.Hash, tx eth return nil } -func HeadersUnwind(u *UnwindState, s *StageState, db ethdb.Database) error { +func HeadersUnwind(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg HeadersCfg) error { var err error - var tx ethdb.DbWithPendingMutations - var useExternalTx bool - if hasTx, ok := db.(ethdb.HasTx); ok && hasTx.Tx() != nil { - tx = db.(ethdb.DbWithPendingMutations) - useExternalTx = true - } else { - tx, err = db.Begin(context.Background(), ethdb.RW) + useExternalTx := tx != nil + if !useExternalTx { + tx, err = cfg.db.BeginRw(context.Background()) if err != nil { return err } @@ -261,7 +259,7 @@ func HeadersUnwind(u *UnwindState, s *StageState, db ethdb.Database) error { } // Delete canonical hashes that are being unwound var headerProgress uint64 - headerProgress, err = stages.GetStageProgress(db, stages.Headers) + headerProgress, err = stages.GetStageProgress(tx, stages.Headers) if err != nil { return err } diff --git a/turbo/stages/bodydownload/body_algos.go b/turbo/stages/bodydownload/body_algos.go index 1d7b63e61..4c8fb74ac 100644 --- a/turbo/stages/bodydownload/body_algos.go +++ b/turbo/stages/bodydownload/body_algos.go @@ -7,6 +7,7 @@ import ( "github.com/holiman/uint256" "github.com/ledgerwatch/turbo-geth/common" + "github.com/ledgerwatch/turbo-geth/common/debug" "github.com/ledgerwatch/turbo-geth/core/rawdb" "github.com/ledgerwatch/turbo-geth/core/types" "github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages" @@ -18,7 +19,7 @@ import ( const BlockBufferSize = 1024 // UpdateFromDb reads the state of the database and refreshes the state of the body download -func (bd *BodyDownload) UpdateFromDb(db ethdb.Database) (headHeight uint64, headHash common.Hash, headTd256 *uint256.Int, err error) { +func (bd *BodyDownload) UpdateFromDb(db ethdb.RwTx) (headHeight uint64, headHash common.Hash, headTd256 *uint256.Int, err error) { var headerProgress, bodyProgress uint64 headerProgress, err = stages.GetStageProgress(db, stages.Headers) if err != nil { @@ -62,7 +63,8 @@ func (bd *BodyDownload) UpdateFromDb(db ethdb.Database) (headHeight uint64, head return headHeight, headHash, headTd256, nil } -func (bd *BodyDownload) RequestMoreBodies(db ethdb.Database, blockNum uint64, currentTime uint64, blockPropagator adapter.BlockPropagator) (*BodyRequest, uint64) { +// RequestMoreBodies - returns nil if nothing to request +func (bd *BodyDownload) RequestMoreBodies(db ethdb.Tx, blockNum uint64, currentTime uint64, blockPropagator adapter.BlockPropagator) (*BodyRequest, uint64, error) { bd.lock.Lock() defer bd.lock.Unlock() if blockNum < bd.requestedLow { @@ -96,47 +98,48 @@ func (bd *BodyDownload) RequestMoreBodies(db ethdb.Database, blockNum uint64, cu if bd.deliveries[blockNum-bd.requestedLow] != nil { // If this block was requested before, we don't need to fetch the headers from the database the second time header = bd.deliveries[blockNum-bd.requestedLow].Header() + if header == nil { + return nil, 0, fmt.Errorf("header not found: %w, blockNum=%d, trace=%s", err, blockNum, debug.Callers(7)) + } hash = header.Hash() } else { hash, err = rawdb.ReadCanonicalHash(db, blockNum) - if err == nil { - header = rawdb.ReadHeader(db, hash, blockNum) - } else { - log.Error("Could not find canonical header", "block number", blockNum) + if err != nil { + return nil, 0, fmt.Errorf("could not find canonical header: %w, blockNum=%d, trace=%s", err, blockNum, debug.Callers(7)) + } + header = rawdb.ReadHeader(db, hash, blockNum) + if header == nil { + return nil, 0, fmt.Errorf("header not found: %w, blockNum=%d, trace=%s", err, blockNum, debug.Callers(7)) } - if header != nil { - if block := bd.prefetchedBlocks.Pop(hash); block != nil { - // Block is prefetched, no need to request - bd.deliveries[blockNum-bd.requestedLow] = block - // Calculate the TD of the block (it's not imported yet, so block.Td is not valid) - var td *big.Int - if parent, err := rawdb.ReadTd(db, block.ParentHash(), block.NumberU64()-1); err != nil { - log.Error("Failed to ReadTd", "err", err, "number", block.NumberU64()-1, "hash", block.ParentHash()) - } else if parent != nil { - td = new(big.Int).Add(block.Difficulty(), parent) - go blockPropagator.BroadcastNewBlock(context.Background(), block, td) - } else { - log.Error("Propagating dangling block", "number", block.Number(), "hash", hash) - } - request = false + if block := bd.prefetchedBlocks.Pop(hash); block != nil { + // Block is prefetched, no need to request + bd.deliveries[blockNum-bd.requestedLow] = block + + // Calculate the TD of the block (it's not imported yet, so block.Td is not valid) + var td *big.Int + if parent, err := rawdb.ReadTd(db, block.ParentHash(), block.NumberU64()-1); err != nil { + log.Error("Failed to ReadTd", "err", err, "number", block.NumberU64()-1, "hash", block.ParentHash()) + } else if parent != nil { + td = new(big.Int).Add(block.Difficulty(), parent) + go blockPropagator.BroadcastNewBlock(context.Background(), block, td) } else { - bd.deliveries[blockNum-bd.requestedLow] = types.NewBlockWithHeader(header) // Block without uncles and transactions - if header.UncleHash != types.EmptyUncleHash || header.TxHash != types.EmptyRootHash { - var doubleHash DoubleHash - copy(doubleHash[:], header.UncleHash.Bytes()) - copy(doubleHash[common.HashLength:], header.TxHash.Bytes()) - bd.requestedMap[doubleHash] = blockNum - } else { - request = false - } + log.Error("Propagating dangling block", "number", block.Number(), "hash", hash) + } + request = false + } else { + bd.deliveries[blockNum-bd.requestedLow] = types.NewBlockWithHeader(header) // Block without uncles and transactions + if header.UncleHash != types.EmptyUncleHash || header.TxHash != types.EmptyRootHash { + var doubleHash DoubleHash + copy(doubleHash[:], header.UncleHash.Bytes()) + copy(doubleHash[common.HashLength:], header.TxHash.Bytes()) + bd.requestedMap[doubleHash] = blockNum + } else { + request = false } } } - if header == nil { - log.Error("Header not found", "block number", blockNum) - panic("") - } else if request { + if request { blockNums = append(blockNums, blockNum) hashes = append(hashes, hash) } else { @@ -150,7 +153,7 @@ func (bd *BodyDownload) RequestMoreBodies(db ethdb.Database, blockNum uint64, cu bd.requests[blockNum-bd.requestedLow] = bodyReq } } - return bodyReq, blockNum + return bodyReq, blockNum, nil } func (bd *BodyDownload) RequestSent(bodyReq *BodyRequest, timeWithTimeout uint64, peer []byte) { diff --git a/turbo/stages/bodydownload/body_test.go b/turbo/stages/bodydownload/body_test.go index f6c363c39..dd766c714 100644 --- a/turbo/stages/bodydownload/body_test.go +++ b/turbo/stages/bodydownload/body_test.go @@ -1,16 +1,21 @@ package bodydownload import ( + "context" "testing" "github.com/ledgerwatch/turbo-geth/ethdb" + "github.com/stretchr/testify/require" ) func TestCreateBodyDownload(t *testing.T) { - db := ethdb.NewMemDatabase() + db := ethdb.NewMemKV() defer db.Close() + tx, err := db.BeginRw(context.Background()) + require.NoError(t, err) + defer tx.Rollback() bd := NewBodyDownload(100) - if _, _, _, err := bd.UpdateFromDb(db); err != nil { + if _, _, _, err := bd.UpdateFromDb(tx); err != nil { t.Fatalf("update from db: %v", err) } } diff --git a/turbo/stages/headerdownload/header_algo_test.go b/turbo/stages/headerdownload/header_algo_test.go index e4bc0d93a..ecb1f0106 100644 --- a/turbo/stages/headerdownload/header_algo_test.go +++ b/turbo/stages/headerdownload/header_algo_test.go @@ -9,23 +9,21 @@ import ( "github.com/ledgerwatch/turbo-geth/core/rawdb" "github.com/ledgerwatch/turbo-geth/core/types" "github.com/ledgerwatch/turbo-geth/ethdb" + "github.com/stretchr/testify/require" ) func TestInserter1(t *testing.T) { - db := ethdb.NewMemDatabase() + db := ethdb.NewMemKV() defer db.Close() + tx, err := db.BeginRw(context.Background()) + require.NoError(t, err) + defer tx.Rollback() // Set up parent difficulty - if err := rawdb.WriteTd(db, common.Hash{}, 4, big.NewInt(0)); err != nil { + if err := rawdb.WriteTd(tx, common.Hash{}, 4, big.NewInt(0)); err != nil { t.Fatalf("write parent diff: %v", err) } - tx, err := db.Begin(context.Background(), ethdb.RW) - if err != nil { - t.Fatalf("begin transaction: %v", err) - } - defer tx.Rollback() - batch := tx.NewBatch() - hi := NewHeaderInserter("headers", batch, big.NewInt(0), 0) - if err := hi.FeedHeader(&types.Header{Number: big.NewInt(5), Difficulty: big.NewInt(1)}, 5); err != nil { + hi := NewHeaderInserter("headers", big.NewInt(0), 0) + if err := hi.FeedHeader(tx, &types.Header{Number: big.NewInt(5), Difficulty: big.NewInt(1)}, 5); err != nil { t.Errorf("feed empty header: %v", err) } } diff --git a/turbo/stages/headerdownload/header_algos.go b/turbo/stages/headerdownload/header_algos.go index be8fddac8..4bc30d824 100644 --- a/turbo/stages/headerdownload/header_algos.go +++ b/turbo/stages/headerdownload/header_algos.go @@ -485,17 +485,16 @@ func (hd *HeaderDownload) RequestMoreHeaders(currentTime uint64) *HeaderRequest for hd.anchorQueue.Len() > 0 { anchor := (*hd.anchorQueue)[0] if _, ok := hd.anchors[anchor.parentHash]; ok { - if anchor.timestamp <= currentTime { - if anchor.timeouts < 10 { - return &HeaderRequest{Hash: anchor.parentHash, Number: anchor.blockHeight - 1, Length: 192, Skip: 0, Reverse: true} - } else { - // Ancestors of this anchor seem to be unavailable, invalidate and move on - hd.invalidateAnchor(anchor) - } - } else { + if anchor.timestamp > currentTime { // Anchor not ready for re-request yet return nil } + if anchor.timeouts < 10 { + return &HeaderRequest{Hash: anchor.parentHash, Number: anchor.blockHeight - 1, Length: 192, Skip: 0, Reverse: true} + } else { + // Ancestors of this anchor seem to be unavailable, invalidate and move on + hd.invalidateAnchor(anchor) + } } // Anchor disappeared or unavailable, pop from the queue and move on heap.Remove(hd.anchorQueue, 0) @@ -633,7 +632,14 @@ func (hd *HeaderDownload) addHeaderAsLink(header *types.Header, persisted bool) return link } -func (hi *HeaderInserter) FeedHeader(header *types.Header, blockHeight uint64) error { +func (hi *HeaderInserter) FeedHeaderFunc(db ethdb.StatelessRwTx) func(header *types.Header, blockHeight uint64) error { + return func(header *types.Header, blockHeight uint64) error { + return hi.FeedHeader(db, header, blockHeight) + } + +} + +func (hi *HeaderInserter) FeedHeader(db ethdb.StatelessRwTx, header *types.Header, blockHeight uint64) error { hash := header.Hash() if hash == hi.prevHash { // Skip duplicates @@ -642,19 +648,19 @@ func (hi *HeaderInserter) FeedHeader(header *types.Header, blockHeight uint64) e if blockHeight < hi.prevHeight { return fmt.Errorf("[%s] headers are unexpectedly unsorted, got %d after %d", hi.logPrefix, blockHeight, hi.prevHeight) } - if oldH := rawdb.ReadHeader(hi.batch, hash, blockHeight); oldH != nil { + if oldH := rawdb.ReadHeader(db, hash, blockHeight); oldH != nil { // Already inserted, skip return nil } // Load parent header - parent := rawdb.ReadHeader(hi.batch, header.ParentHash, blockHeight-1) + parent := rawdb.ReadHeader(db, header.ParentHash, blockHeight-1) if parent == nil { log.Warn(fmt.Sprintf("Could not find parent with hash %x and height %d for header %x %d", header.ParentHash, blockHeight-1, hash, blockHeight)) // Skip headers without parents return nil } // Parent's total difficulty - parentTd, err := rawdb.ReadTd(hi.batch, header.ParentHash, blockHeight-1) + parentTd, err := rawdb.ReadTd(db, header.ParentHash, blockHeight-1) if err != nil { return fmt.Errorf("[%s] parent's total difficulty not found with hash %x and height %d for header %x %d: %v", hi.logPrefix, header.ParentHash, blockHeight-1, hash, blockHeight, err) } @@ -666,7 +672,7 @@ func (hi *HeaderInserter) FeedHeader(header *types.Header, blockHeight uint64) e // Find the forking point - i.e. the latest header on the canonical chain which is an ancestor of this one // Most common case - forking point is the height of the parent header var forkingPoint uint64 - ch, err1 := rawdb.ReadCanonicalHash(hi.batch, blockHeight-1) + ch, err1 := rawdb.ReadCanonicalHash(db, blockHeight-1) if err1 != nil { return fmt.Errorf("reading canonical hash for height %d: %w", blockHeight-1, err1) } @@ -676,8 +682,8 @@ func (hi *HeaderInserter) FeedHeader(header *types.Header, blockHeight uint64) e // Going further back ancestorHash := parent.ParentHash ancestorHeight := blockHeight - 2 - for ch, err = rawdb.ReadCanonicalHash(hi.batch, ancestorHeight); err == nil && ch != ancestorHash; ch, err = rawdb.ReadCanonicalHash(hi.batch, ancestorHeight) { - ancestor := rawdb.ReadHeader(hi.batch, ancestorHash, ancestorHeight) + for ch, err = rawdb.ReadCanonicalHash(db, ancestorHeight); err == nil && ch != ancestorHash; ch, err = rawdb.ReadCanonicalHash(db, ancestorHeight) { + ancestor := rawdb.ReadHeader(db, ancestorHash, ancestorHeight) ancestorHash = ancestor.ParentHash ancestorHeight-- } @@ -687,11 +693,11 @@ func (hi *HeaderInserter) FeedHeader(header *types.Header, blockHeight uint64) e // Loop above terminates when either err != nil (handled already) or ch == ancestorHash, therefore ancestorHeight is our forking point forkingPoint = ancestorHeight } - if err = rawdb.WriteHeadHeaderHash(hi.batch, hash); err != nil { + if err = rawdb.WriteHeadHeaderHash(db, hash); err != nil { return fmt.Errorf("[%s] marking head header hash as %x: %w", hi.logPrefix, hash, err) } hi.headerProgress = blockHeight - if err = stages.SaveStageProgress(hi.batch, stages.Headers, blockHeight); err != nil { + if err = stages.SaveStageProgress(db, stages.Headers, blockHeight); err != nil { return fmt.Errorf("[%s] saving Headers progress: %w", hi.logPrefix, err) } // See if the forking point affects the unwindPoint (the block number to which other stages will need to unwind before the new canonical chain is applied) @@ -703,10 +709,10 @@ func (hi *HeaderInserter) FeedHeader(header *types.Header, blockHeight uint64) e if err2 != nil { return fmt.Errorf("[%s] failed to RLP encode header: %w", hi.logPrefix, err2) } - if err = rawdb.WriteTd(hi.batch, hash, blockHeight, td); err != nil { + if err = rawdb.WriteTd(db, hash, blockHeight, td); err != nil { return fmt.Errorf("[%s] failed to WriteTd: %w", hi.logPrefix, err) } - if err = hi.batch.Put(dbutils.HeadersBucket, dbutils.HeaderKey(blockHeight, hash), data); err != nil { + if err = db.Put(dbutils.HeadersBucket, dbutils.HeaderKey(blockHeight, hash), data); err != nil { return fmt.Errorf("[%s] failed to store header: %w", hi.logPrefix, err) } hi.prevHash = hash diff --git a/turbo/stages/headerdownload/header_data_struct.go b/turbo/stages/headerdownload/header_data_struct.go index 1f2224015..5e770b1a5 100644 --- a/turbo/stages/headerdownload/header_data_struct.go +++ b/turbo/stages/headerdownload/header_data_struct.go @@ -10,7 +10,6 @@ import ( "github.com/ledgerwatch/turbo-geth/common" "github.com/ledgerwatch/turbo-geth/consensus" "github.com/ledgerwatch/turbo-geth/core/types" - "github.com/ledgerwatch/turbo-geth/ethdb" ) // Link is a chain link that can be connect to other chain links @@ -248,7 +247,6 @@ func (pp PeerPenalty) String() string { // The headers are "fed" by repeatedly calling the FeedHeader function. type HeaderInserter struct { logPrefix string - batch ethdb.DbWithPendingMutations prevHash common.Hash // Hash of previously seen header - to filter out potential duplicates prevHeight uint64 newCanonical bool @@ -259,10 +257,9 @@ type HeaderInserter struct { headerProgress uint64 } -func NewHeaderInserter(logPrefix string, batch ethdb.DbWithPendingMutations, localTd *big.Int, headerProgress uint64) *HeaderInserter { +func NewHeaderInserter(logPrefix string, localTd *big.Int, headerProgress uint64) *HeaderInserter { return &HeaderInserter{ logPrefix: logPrefix, - batch: batch, localTd: localTd, headerProgress: headerProgress, unwindPoint: headerProgress,