Bad rewind issues fix (#1024)

* Unwind over unwound, no rollbacks

* Exclude bits only for StagedSync

* Nitpick
This commit is contained in:
ledgerwatch 2020-09-01 12:50:36 +01:00 committed by GitHub
parent f19bb0a345
commit d8c0c16e40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 40 additions and 38 deletions

View File

@ -1545,28 +1545,30 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, blockNumber uin
rollbackErr error
mode = d.getMode()
)
defer func() {
if rollback > 0 {
lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0
if mode != LightSync && mode != StagedSync {
lastFastBlock = d.blockchain.CurrentFastBlock().Number()
lastBlock = d.blockchain.CurrentBlock().Number()
if mode != StagedSync {
defer func() {
if rollback > 0 {
lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0
if mode != LightSync && mode != StagedSync {
lastFastBlock = d.blockchain.CurrentFastBlock().Number()
lastBlock = d.blockchain.CurrentBlock().Number()
}
if err := d.lightchain.SetHead(rollback - 1); err != nil { // -1 to target the parent of the first uncertain block
// We're already unwinding the stack, only print the error to make it more visible
log.Error("Failed to roll back chain segment", "head", rollback-1, "err", err)
}
curFastBlock, curBlock := common.Big0, common.Big0
if mode != LightSync && mode != StagedSync {
curFastBlock = d.blockchain.CurrentFastBlock().Number()
curBlock = d.blockchain.CurrentBlock().Number()
}
log.Warn("Rolled back chain segment",
"header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
"block", fmt.Sprintf("%d->%d", lastBlock, curBlock), "reason", rollbackErr)
}
if err := d.lightchain.SetHead(rollback - 1); err != nil { // -1 to target the parent of the first uncertain block
// We're already unwinding the stack, only print the error to make it more visible
log.Error("Failed to roll back chain segment", "head", rollback-1, "err", err)
}
curFastBlock, curBlock := common.Big0, common.Big0
if mode != LightSync && mode != StagedSync {
curFastBlock = d.blockchain.CurrentFastBlock().Number()
curBlock = d.blockchain.CurrentBlock().Number()
}
log.Warn("Rolled back chain segment",
"header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
"block", fmt.Sprintf("%d->%d", lastBlock, curBlock), "reason", rollbackErr)
}
}()
}()
}
// Wait for batches of headers to process
gotHeaders := false
@ -1606,6 +1608,10 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, blockNumber uin
return errStallingPeer
}
}
if mode != StagedSync {
// Disable any rollback and return
rollback = 0
}
// If fast or light syncing, ensure promised headers are indeed delivered. This is
// needed to detect scenarios where an attacker feeds a bad pivot and then bails out
// of delivering the post-pivot blocks that would flag the invalid content.
@ -1619,8 +1625,6 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, blockNumber uin
return errStallingPeer
}
}
// Disable any rollback and return
rollback = 0
return nil
}
// Otherwise split the chunk of headers into batches and process them
@ -1652,7 +1656,6 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, blockNumber uin
reorg, forkBlockNumber, err = stagedsync.InsertHeaderChain(d.stateDB, chunk, d.chainConfig, d.blockchain.Engine(), frequency)
if reorg && d.headersUnwinder != nil {
// Need to unwind further stages
if err1 := d.headersUnwinder.UnwindTo(forkBlockNumber, d.stateDB); err1 != nil {
return fmt.Errorf("unwinding all stages to %d: %v", forkBlockNumber, err1)
}
@ -1665,7 +1668,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, blockNumber uin
return fmt.Errorf("saving SyncStage Headers progress: %v", err1)
}
}
if err != nil {
if mode != StagedSync && err != nil {
// If some headers were inserted, add them too to the rollback list
if n > 0 && rollback == 0 {
rollback = chunk[0].Number.Uint64()
@ -1675,11 +1678,13 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, blockNumber uin
}
// All verifications passed, track all headers within the alloted limits
head := chunk[len(chunk)-1].Number.Uint64()
if head-rollback > uint64(fsHeaderSafetyNet) {
rollback = head - uint64(fsHeaderSafetyNet)
} else {
rollback = 1
if mode != StagedSync {
head := chunk[len(chunk)-1].Number.Uint64()
if head-rollback > uint64(fsHeaderSafetyNet) {
rollback = head - uint64(fsHeaderSafetyNet)
} else {
rollback = 1
}
}
}
// Unless we're doing light chains, schedule the headers for associated content retrieval

View File

@ -183,7 +183,7 @@ func logProgress(prev, now uint64, batch ethdb.DbWithPendingMutations) uint64 {
runtime.ReadMemStats(&m)
log.Info("Executed blocks:",
"currentBlock", now,
"speed (blk/second)", speed,
"blk/second", speed,
"batch", common.StorageSize(batch.BatchSize()),
"alloc", common.StorageSize(m.Alloc),
"sys", common.StorageSize(m.Sys),

View File

@ -127,7 +127,6 @@ func unwindTxPoolUpdate(from, to uint64, pool *core.TxPool, db ethdb.Getter, qui
headHeader := rawdb.ReadHeader(db, headHash, from)
pool.ResetHead(headHeader.GasLimit, from)
canonical := make([]common.Hash, to-from)
currentHeaderIdx := uint64(0)
if err := db.Walk(dbutils.HeaderPrefix, dbutils.EncodeBlockNumber(from+1), 0, func(k, v []byte) (bool, error) {
if err := common.Stopped(quitCh); err != nil {
@ -138,20 +137,19 @@ func unwindTxPoolUpdate(from, to uint64, pool *core.TxPool, db ethdb.Getter, qui
if !dbutils.CheckCanonicalKey(k) {
return true, nil
}
blockNumber := binary.BigEndian.Uint64(k[:8])
if currentHeaderIdx >= to-from { // if header stage is ahead of body stage
if blockNumber > to {
return false, nil
}
copy(canonical[currentHeaderIdx][:], v)
currentHeaderIdx++
copy(canonical[blockNumber-from-1][:], v)
return true, nil
}); err != nil {
return err
}
log.Info("unwind TxPoolUpdate: Reading canonical hashes complete", "hashes", len(canonical))
senders := make([][]common.Address, to-from+1)
sendersIdx := uint64(0)
if err := db.Walk(dbutils.Senders, dbutils.EncodeBlockNumber(from+1), 0, func(k, v []byte) (bool, error) {
if err := common.Stopped(quitCh); err != nil {
return false, err
@ -171,8 +169,7 @@ func unwindTxPoolUpdate(from, to uint64, pool *core.TxPool, db ethdb.Getter, qui
for i := 0; i < len(sendersArray); i++ {
copy(sendersArray[i][:], v[i*common.AddressLength:])
}
senders[sendersIdx] = sendersArray
sendersIdx++
senders[blockNumber-from-1] = sendersArray
return true, nil
}); err != nil {
log.Error("TxPoolUpdate: walking over sender", "error", err)