From d8c0c16e40c953c3596e9cfdd13e2973630a7088 Mon Sep 17 00:00:00 2001
From: ledgerwatch
Date: Tue, 1 Sep 2020 12:50:36 +0100
Subject: [PATCH] Bad rewind issues fix (#1024)

* Unwind over unwound, no rollbacks

* Exclude bits only for StagedSync

* Nitpick
---
 eth/downloader/downloader.go    | 65 ++++++++++++++++++---------------
 eth/stagedsync/stage_execute.go |  2 +-
 eth/stagedsync/stage_txpool.go  | 11 ++----
 3 files changed, 40 insertions(+), 38 deletions(-)

diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 613963a7c..d12c0506f 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -1545,28 +1545,30 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, blockNumber uin
 		rollbackErr error
 		mode        = d.getMode()
 	)
-	defer func() {
-		if rollback > 0 {
-			lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0
-			if mode != LightSync && mode != StagedSync {
-				lastFastBlock = d.blockchain.CurrentFastBlock().Number()
-				lastBlock = d.blockchain.CurrentBlock().Number()
+	if mode != StagedSync {
+		defer func() {
+			if rollback > 0 {
+				lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0
+				if mode != LightSync && mode != StagedSync {
+					lastFastBlock = d.blockchain.CurrentFastBlock().Number()
+					lastBlock = d.blockchain.CurrentBlock().Number()
+				}
+				if err := d.lightchain.SetHead(rollback - 1); err != nil { // -1 to target the parent of the first uncertain block
+					// We're already unwinding the stack, only print the error to make it more visible
+					log.Error("Failed to roll back chain segment", "head", rollback-1, "err", err)
+				}
+				curFastBlock, curBlock := common.Big0, common.Big0
+				if mode != LightSync && mode != StagedSync {
+					curFastBlock = d.blockchain.CurrentFastBlock().Number()
+					curBlock = d.blockchain.CurrentBlock().Number()
+				}
+				log.Warn("Rolled back chain segment",
+					"header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
+					"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
+					"block", fmt.Sprintf("%d->%d", lastBlock, curBlock), "reason", rollbackErr)
 			}
-			if err := d.lightchain.SetHead(rollback - 1); err != nil { // -1 to target the parent of the first uncertain block
-				// We're already unwinding the stack, only print the error to make it more visible
-				log.Error("Failed to roll back chain segment", "head", rollback-1, "err", err)
-			}
-			curFastBlock, curBlock := common.Big0, common.Big0
-			if mode != LightSync && mode != StagedSync {
-				curFastBlock = d.blockchain.CurrentFastBlock().Number()
-				curBlock = d.blockchain.CurrentBlock().Number()
-			}
-			log.Warn("Rolled back chain segment",
-				"header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
-				"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
-				"block", fmt.Sprintf("%d->%d", lastBlock, curBlock), "reason", rollbackErr)
-		}
-	}()
+		}()
+	}

 	// Wait for batches of headers to process
 	gotHeaders := false
@@ -1606,6 +1608,10 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, blockNumber uin
 					return errStallingPeer
 				}
 			}
+			if mode != StagedSync {
+				// Disable any rollback and return
+				rollback = 0
+			}
 			// If fast or light syncing, ensure promised headers are indeed delivered. This is
 			// needed to detect scenarios where an attacker feeds a bad pivot and then bails out
 			// of delivering the post-pivot blocks that would flag the invalid content.
@@ -1619,8 +1625,6 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, blockNumber uin
 					return errStallingPeer
 				}
 			}
-			// Disable any rollback and return
-			rollback = 0
 			return nil
 		}
 		// Otherwise split the chunk of headers into batches and process them
@@ -1652,7 +1656,6 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, blockNumber uin
 				reorg, forkBlockNumber, err = stagedsync.InsertHeaderChain(d.stateDB, chunk, d.chainConfig, d.blockchain.Engine(), frequency)
 				if reorg && d.headersUnwinder != nil {
 					// Need to unwind further stages
-
 					if err1 := d.headersUnwinder.UnwindTo(forkBlockNumber, d.stateDB); err1 != nil {
 						return fmt.Errorf("unwinding all stages to %d: %v", forkBlockNumber, err1)
 					}
@@ -1665,7 +1668,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, blockNumber uin
 						return fmt.Errorf("saving SyncStage Headers progress: %v", err1)
 					}
 				}
-				if err != nil {
+				if mode != StagedSync && err != nil {
 					// If some headers were inserted, add them too to the rollback list
 					if n > 0 && rollback == 0 {
 						rollback = chunk[0].Number.Uint64()
@@ -1675,11 +1678,13 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, blockNumber uin

 				}
 				// All verifications passed, track all headers within the alloted limits
-				head := chunk[len(chunk)-1].Number.Uint64()
-				if head-rollback > uint64(fsHeaderSafetyNet) {
-					rollback = head - uint64(fsHeaderSafetyNet)
-				} else {
-					rollback = 1
+				if mode != StagedSync {
+					head := chunk[len(chunk)-1].Number.Uint64()
+					if head-rollback > uint64(fsHeaderSafetyNet) {
+						rollback = head - uint64(fsHeaderSafetyNet)
+					} else {
+						rollback = 1
+					}
 				}
 			}
 			// Unless we're doing light chains, schedule the headers for associated content retrieval
diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go
index 2c6506b25..f592eb76b 100644
--- a/eth/stagedsync/stage_execute.go
+++ b/eth/stagedsync/stage_execute.go
@@ -183,7 +183,7 @@ func logProgress(prev, now uint64, batch ethdb.DbWithPendingMutations) uint64 {
 	runtime.ReadMemStats(&m)
 	log.Info("Executed blocks:",
 		"currentBlock", now,
-		"speed (blk/second)", speed,
+		"blk/second", speed,
 		"batch", common.StorageSize(batch.BatchSize()),
 		"alloc", common.StorageSize(m.Alloc),
 		"sys", common.StorageSize(m.Sys),
diff --git a/eth/stagedsync/stage_txpool.go b/eth/stagedsync/stage_txpool.go
index 0e9fd3221..02ccad0ca 100644
--- a/eth/stagedsync/stage_txpool.go
+++ b/eth/stagedsync/stage_txpool.go
@@ -127,7 +127,6 @@ func unwindTxPoolUpdate(from, to uint64, pool *core.TxPool, db ethdb.Getter, qui
 	headHeader := rawdb.ReadHeader(db, headHash, from)
 	pool.ResetHead(headHeader.GasLimit, from)
 	canonical := make([]common.Hash, to-from)
-	currentHeaderIdx := uint64(0)

 	if err := db.Walk(dbutils.HeaderPrefix, dbutils.EncodeBlockNumber(from+1), 0, func(k, v []byte) (bool, error) {
 		if err := common.Stopped(quitCh); err != nil {
@@ -138,20 +137,19 @@ func unwindTxPoolUpdate(from, to uint64, pool *core.TxPool, db ethdb.Getter, qui
 		if !dbutils.CheckCanonicalKey(k) {
 			return true, nil
 		}
+		blockNumber := binary.BigEndian.Uint64(k[:8])

-		if currentHeaderIdx >= to-from { // if header stage is ahead of body stage
+		if blockNumber > to {
 			return false, nil
 		}
-		copy(canonical[currentHeaderIdx][:], v)
-		currentHeaderIdx++
+		copy(canonical[blockNumber-from-1][:], v)
 		return true, nil
 	}); err != nil {
 		return err
 	}

 	log.Info("unwind TxPoolUpdate: Reading canonical hashes complete", "hashes", len(canonical))
 	senders := make([][]common.Address, to-from+1)
-	sendersIdx := uint64(0)
 	if err := db.Walk(dbutils.Senders, dbutils.EncodeBlockNumber(from+1), 0, func(k, v []byte) (bool, error) {
 		if err := common.Stopped(quitCh); err != nil {
 			return false, err
@@ -171,8 +169,7 @@ func unwindTxPoolUpdate(from, to uint64, pool *core.TxPool, db ethdb.Getter, qui
 		for i := 0; i < len(sendersArray); i++ {
 			copy(sendersArray[i][:], v[i*common.AddressLength:])
 		}
-		senders[sendersIdx] = sendersArray
-		sendersIdx++
+		senders[blockNumber-from-1] = sendersArray
 		return true, nil
 	}); err != nil {
 		log.Error("TxPoolUpdate: walking over sender", "error", err)
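
Note on the downloader hunks: the rollback safety net exists for fast/light
sync, where bad headers are undone via SetHead; StagedSync recovers by
unwinding its stages (headersUnwinder.UnwindTo) instead, so the deferred
rollback is now armed only when mode != StagedSync. A minimal, self-contained
Go sketch of that gating pattern follows. It is not the downloader code
itself: SyncMode, processHeaders and verify here are invented stand-ins.

package main

import "fmt"

type SyncMode int

const (
	FullSync SyncMode = iota
	StagedSync
)

// verify is a stand-in for header verification; header 42 is "bad".
func verify(h uint64) error {
	if h == 42 {
		return fmt.Errorf("bad header %d", h)
	}
	return nil
}

func processHeaders(mode SyncMode, headers []uint64) error {
	var rollback uint64

	// Arm the rollback only outside StagedSync, mirroring the
	// `if mode != StagedSync { defer func() { ... }() }` shape in the patch.
	if mode != StagedSync {
		defer func() {
			if rollback > 0 {
				fmt.Println("rolling back to parent of header", rollback)
			}
		}()
	}

	for _, h := range headers {
		if err := verify(h); err != nil {
			// Outside StagedSync, remember the first uncertain header so the
			// deferred rollback can undo it; StagedSync unwinds stages instead.
			if mode != StagedSync && rollback == 0 {
				rollback = h
			}
			return err
		}
	}
	return nil
}

func main() {
	fmt.Println(processHeaders(FullSync, []uint64{40, 41, 42}))   // rolls back
	fmt.Println(processHeaders(StagedSync, []uint64{40, 41, 42})) // no rollback
}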
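Note on the stage_txpool hunks: the walk previously relied on running counters
(currentHeaderIdx, sendersIdx), which drift whenever the walk visits entries
outside the expected range (the removed comment notes the header stage can be
ahead of the body stage). The fix derives the slot directly from the block
number encoded big-endian in the first 8 bytes of the key, so each value lands
in the slot for its own block. A standalone sketch of that indexing, using toy
data and a hand-rolled loop rather than the real db.Walk API:

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	from, to := uint64(10), uint64(13)
	canonical := make([]string, to-from) // one slot per block in (from, to]

	walk := func(k []byte, v string) bool {
		// Slot comes from the key, not from a counter that can drift.
		blockNumber := binary.BigEndian.Uint64(k[:8])
		if blockNumber > to { // past the unwind range, stop walking
			return false
		}
		// Keys start at from+1, so block from+1 lands in slot 0.
		canonical[blockNumber-from-1] = v
		return true
	}

	// Simulate walking keys from+1 .. to in key order.
	for n := from + 1; n <= to; n++ {
		k := make([]byte, 8)
		binary.BigEndian.PutUint64(k, n)
		if !walk(k, fmt.Sprintf("hash-%d", n)) {
			break
		}
	}
	fmt.Println(canonical) // [hash-11 hash-12 hash-13]
}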