mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-22 03:30:37 +00:00
Delete bad blocks on unwind (#4529)
* Delete bad headers * Delete bad bodies
This commit is contained in:
parent
389af4fc06
commit
9b8888d797
@ -486,7 +486,7 @@ func stageHeaders(db kv.RwDB, ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
// remove all canonical markers from this point
|
||||
if err = rawdb.TruncateCanonicalHash(tx, progress+1); err != nil {
|
||||
if err = rawdb.TruncateCanonicalHash(tx, progress+1, false); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = rawdb.TruncateTd(tx, progress+1); err != nil {
|
||||
|
@ -59,8 +59,11 @@ func WriteCanonicalHash(db kv.Putter, hash common.Hash, number uint64) error {
|
||||
}
|
||||
|
||||
// TruncateCanonicalHash removes all the number to hash canonical mapping from block number N
|
||||
func TruncateCanonicalHash(tx kv.RwTx, blockFrom uint64) error {
|
||||
if err := tx.ForEach(kv.HeaderCanonical, dbutils.EncodeBlockNumber(blockFrom), func(k, _ []byte) error {
|
||||
func TruncateCanonicalHash(tx kv.RwTx, blockFrom uint64, deleteHeaders bool) error {
|
||||
if err := tx.ForEach(kv.HeaderCanonical, dbutils.EncodeBlockNumber(blockFrom), func(k, v []byte) error {
|
||||
if deleteHeaders {
|
||||
deleteHeader(tx, common.BytesToHash(v), blockFrom)
|
||||
}
|
||||
return tx.Delete(kv.HeaderCanonical, k, nil)
|
||||
}); err != nil {
|
||||
return fmt.Errorf("TruncateCanonicalHash: %w", err)
|
||||
@ -768,7 +771,7 @@ func MakeBodiesCanonical(tx kv.RwTx, from uint64, ctx context.Context, logPrefix
|
||||
}
|
||||
|
||||
// MakeBodiesNonCanonical - move all txs of canonical blocks to NonCanonicalTxs bucket
|
||||
func MakeBodiesNonCanonical(tx kv.RwTx, from uint64, ctx context.Context, logPrefix string, logEvery *time.Ticker) error {
|
||||
func MakeBodiesNonCanonical(tx kv.RwTx, from uint64, deleteBodies bool, ctx context.Context, logPrefix string, logEvery *time.Ticker) error {
|
||||
var firstMovedTxnID uint64
|
||||
var firstMovedTxnIDIsSet bool
|
||||
for blockNum := from; ; blockNum++ {
|
||||
@ -793,17 +796,22 @@ func MakeBodiesNonCanonical(tx kv.RwTx, from uint64, ctx context.Context, logPre
|
||||
firstMovedTxnID = bodyForStorage.BaseTxId
|
||||
}
|
||||
|
||||
// move txs to NonCanonical bucket, it has own sequence
|
||||
newBaseId, err := tx.IncrementSequence(kv.NonCanonicalTxs, uint64(bodyForStorage.TxAmount))
|
||||
if err != nil {
|
||||
return err
|
||||
newBaseId := uint64(0)
|
||||
if !deleteBodies {
|
||||
// move txs to NonCanonical bucket, it has own sequence
|
||||
newBaseId, err = tx.IncrementSequence(kv.NonCanonicalTxs, uint64(bodyForStorage.TxAmount))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// next loop does move only non-system txs. need move system-txs manually (because they may not exist)
|
||||
i := uint64(0)
|
||||
if err := tx.ForAmount(kv.EthTx, dbutils.EncodeBlockNumber(bodyForStorage.BaseTxId+1), bodyForStorage.TxAmount-2, func(k, v []byte) error {
|
||||
id := newBaseId + 1 + i
|
||||
if err := tx.Put(kv.NonCanonicalTxs, dbutils.EncodeBlockNumber(id), v); err != nil {
|
||||
return err
|
||||
if !deleteBodies {
|
||||
id := newBaseId + 1 + i
|
||||
if err := tx.Put(kv.NonCanonicalTxs, dbutils.EncodeBlockNumber(id), v); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err := tx.Delete(kv.EthTx, k, nil); err != nil {
|
||||
return err
|
||||
@ -813,9 +821,14 @@ func MakeBodiesNonCanonical(tx kv.RwTx, from uint64, ctx context.Context, logPre
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
bodyForStorage.BaseTxId = newBaseId
|
||||
if err := WriteBodyForStorage(tx, h, blockNum, bodyForStorage); err != nil {
|
||||
return err
|
||||
|
||||
if deleteBodies {
|
||||
deleteBody(tx, h, blockNum)
|
||||
} else {
|
||||
bodyForStorage.BaseTxId = newBaseId
|
||||
if err := WriteBodyForStorage(tx, h, blockNum, bodyForStorage); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
@ -1158,8 +1171,8 @@ func WriteBlock(db kv.RwTx, block *types.Block) error {
|
||||
|
||||
// DeleteAncientBlocks - delete [1, to) old blocks after moving it to snapshots.
|
||||
// keeps genesis in db: [1, to)
|
||||
// doesn't change sequnces of kv.EthTx and kv.NonCanonicalTxs
|
||||
// doesn't delete Reciepts, Senders, Canonical markers, TotalDifficulty
|
||||
// doesn't change sequences of kv.EthTx and kv.NonCanonicalTxs
|
||||
// doesn't delete Receipts, Senders, Canonical markers, TotalDifficulty
|
||||
// returns [deletedFrom, deletedTo)
|
||||
func DeleteAncientBlocks(tx kv.RwTx, blockTo uint64, blocksDeleteLimit int) (deletedFrom, deletedTo uint64, err error) {
|
||||
c, err := tx.Cursor(kv.Headers)
|
||||
@ -1283,8 +1296,8 @@ func SecondKey(tx kv.Tx, table string) ([]byte, error) {
|
||||
}
|
||||
|
||||
// TruncateBlocks - delete block >= blockFrom
|
||||
// does decrement sequnces of kv.EthTx and kv.NonCanonicalTxs
|
||||
// doesn't delete Reciepts, Senders, Canonical markers, TotalDifficulty
|
||||
// does decrement sequences of kv.EthTx and kv.NonCanonicalTxs
|
||||
// doesn't delete Receipts, Senders, Canonical markers, TotalDifficulty
|
||||
func TruncateBlocks(ctx context.Context, tx kv.RwTx, blockFrom uint64) error {
|
||||
logEvery := time.NewTicker(20 * time.Second)
|
||||
defer logEvery.Stop()
|
||||
|
@ -302,7 +302,7 @@ func TestCanonicalMappingStorage(t *testing.T) {
|
||||
t.Fatalf("Retrieved canonical mapping mismatch: have %v, want %v", entry, hash)
|
||||
}
|
||||
// Delete the TD and verify the execution
|
||||
err = TruncateCanonicalHash(tx, number)
|
||||
err = TruncateCanonicalHash(tx, number, false)
|
||||
if err != nil {
|
||||
t.Fatalf("DeleteCanonicalHash failed: %v", err)
|
||||
}
|
||||
|
@ -55,7 +55,7 @@ func ResetBlocks(tx kv.RwTx) error {
|
||||
}
|
||||
|
||||
// remove all canonical markers from this point
|
||||
if err := rawdb.TruncateCanonicalHash(tx, 1); err != nil {
|
||||
if err := rawdb.TruncateCanonicalHash(tx, 1, false); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := rawdb.TruncateTd(tx, 1); err != nil {
|
||||
@ -69,7 +69,7 @@ func ResetBlocks(tx kv.RwTx) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// ensure no grabage records left (it may happen if db is inconsistent)
|
||||
// ensure no garbage records left (it may happen if db is inconsistent)
|
||||
if err := tx.ForEach(kv.BlockBody, dbutils.EncodeBlockNumber(2), func(k, _ []byte) error { return tx.Delete(kv.BlockBody, k, nil) }); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
"github.com/c2h5oh/datasize"
|
||||
libcommon "github.com/ledgerwatch/erigon-lib/common"
|
||||
"github.com/ledgerwatch/erigon-lib/kv"
|
||||
"github.com/ledgerwatch/erigon/common"
|
||||
"github.com/ledgerwatch/erigon/core/rawdb"
|
||||
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
|
||||
"github.com/ledgerwatch/erigon/params"
|
||||
@ -281,7 +282,8 @@ func UnwindBodiesStage(u *UnwindState, tx kv.RwTx, cfg BodiesCfg, ctx context.Co
|
||||
logEvery := time.NewTicker(logInterval)
|
||||
defer logEvery.Stop()
|
||||
|
||||
if err := rawdb.MakeBodiesNonCanonical(tx, u.UnwindPoint+1, ctx, u.LogPrefix(), logEvery); err != nil {
|
||||
badBlock := u.BadBlock != (common.Hash{})
|
||||
if err := rawdb.MakeBodiesNonCanonical(tx, u.UnwindPoint+1, badBlock /* deleteBodies */, ctx, u.LogPrefix(), logEvery); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -35,7 +35,7 @@ func TestBodiesUnwind(t *testing.T) {
|
||||
require.NoError(err)
|
||||
}
|
||||
{
|
||||
err = rawdb.MakeBodiesNonCanonical(tx, 5+1, ctx, "test", logEvery) // block 5 already canonical, start from next one
|
||||
err = rawdb.MakeBodiesNonCanonical(tx, 5+1, false, ctx, "test", logEvery) // block 5 already canonical, start from next one
|
||||
require.NoError(err)
|
||||
|
||||
n, err := tx.ReadSequence(kv.EthTx)
|
||||
@ -61,7 +61,7 @@ func TestBodiesUnwind(t *testing.T) {
|
||||
|
||||
{
|
||||
// unwind to block 5, means mark blocks >= 6 as non-canonical
|
||||
err = rawdb.MakeBodiesNonCanonical(tx, 5+1, ctx, "test", logEvery)
|
||||
err = rawdb.MakeBodiesNonCanonical(tx, 5+1, false, ctx, "test", logEvery)
|
||||
require.NoError(err)
|
||||
|
||||
n, err := tx.ReadSequence(kv.EthTx)
|
||||
|
@ -953,7 +953,7 @@ func HeadersUnwind(u *UnwindState, s *StageState, tx kv.RwTx, cfg HeadersCfg, te
|
||||
return fmt.Errorf("iterate over headers to mark bad headers: %w", err)
|
||||
}
|
||||
}
|
||||
if err := rawdb.TruncateCanonicalHash(tx, u.UnwindPoint+1); err != nil {
|
||||
if err := rawdb.TruncateCanonicalHash(tx, u.UnwindPoint+1, badBlock /* deleteHeaders */); err != nil {
|
||||
return err
|
||||
}
|
||||
if badBlock {
|
||||
@ -996,6 +996,11 @@ func HeadersUnwind(u *UnwindState, s *StageState, tx kv.RwTx, cfg HeadersCfg, te
|
||||
return err
|
||||
}
|
||||
}
|
||||
/* TODO(yperbasis): Is it safe?
|
||||
if err := rawdb.TruncateTd(tx, u.UnwindPoint+1); err != nil {
|
||||
return err
|
||||
}
|
||||
*/
|
||||
if maxNum == 0 {
|
||||
maxNum = u.UnwindPoint
|
||||
if maxHash, err = rawdb.ReadCanonicalHash(tx, maxNum); err != nil {
|
||||
|
@ -41,7 +41,7 @@ func TestTxsBeginEnd(t *testing.T) {
|
||||
return err
|
||||
}
|
||||
|
||||
err = rawdb.TruncateCanonicalHash(tx, 7)
|
||||
err = rawdb.TruncateCanonicalHash(tx, 7, false)
|
||||
for i := uint64(7); i < 10; i++ {
|
||||
require.NoError(err)
|
||||
hash := common.Hash{0xa, byte(i)}
|
||||
|
Loading…
Reference in New Issue
Block a user