diff --git a/cmd/downloader/downloader/torrentcfg/logger.go b/cmd/downloader/downloader/torrentcfg/logger.go
index ad849f9d6..c4abac879 100644
--- a/cmd/downloader/downloader/torrentcfg/logger.go
+++ b/cmd/downloader/downloader/torrentcfg/logger.go
@@ -67,6 +67,6 @@ func (b adapterHandler) Handle(r lg.Record) {
 		log.Error(str)
 	default:
-		log.Warn("unknown logtype", "msg", r.String())
+		log.Debug(r.String(), "log_type", "unknown")
 	}
 }
diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index 91cc19932..a25b0aa4a 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -36,6 +36,7 @@ import (
 	"github.com/ledgerwatch/erigon-lib/kv/kvcache"
 	"github.com/ledgerwatch/erigon-lib/txpool"
 	"github.com/ledgerwatch/erigon/cmd/downloader/downloader/torrentcfg"
+	"github.com/ledgerwatch/log/v3"
 	"github.com/spf13/cobra"
 	"github.com/spf13/pflag"
 	"github.com/urfave/cli"
@@ -43,8 +44,6 @@ import (
 	"github.com/ledgerwatch/erigon/eth/protocols/eth"
 	"github.com/ledgerwatch/erigon/params/networkname"
 
-	"github.com/ledgerwatch/log/v3"
-
 	"github.com/ledgerwatch/erigon/common"
 	"github.com/ledgerwatch/erigon/common/paths"
 	"github.com/ledgerwatch/erigon/consensus/ethash"
diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go
index 3249a0dac..6d72abcfe 100644
--- a/core/rawdb/accessors_chain.go
+++ b/core/rawdb/accessors_chain.go
@@ -362,11 +362,11 @@ func WriteRawTransactions(db kv.StatelessWriteTx, txs [][]byte, baseTxId uint64)
 	for _, tx := range txs {
 		txIdKey := make([]byte, 8)
 		binary.BigEndian.PutUint64(txIdKey, txId)
-		txId++
 		// If next Append returns KeyExists error - it means you need to open transaction in App code before calling this func. Batch is also fine.
 		if err := db.Append(kv.EthTx, txIdKey, tx); err != nil {
 			return err
 		}
+		txId++
 	}
 	return nil
 }
@@ -401,7 +401,7 @@ func ReadBodyWithTransactions(db kv.Getter, hash common.Hash, number uint64) (*t
 	if canonicalHash == hash {
 		return ReadCanonicalBodyWithTransactions(db, hash, number), nil
 	}
-	return ReadNonCanonicalBodyWithTransactions(db, hash, number), nil
+	return NonCanonicalBodyWithTransactions(db, hash, number), nil
 }
 
 func ReadCanonicalBodyWithTransactions(db kv.Getter, hash common.Hash, number uint64) *types.Body {
@@ -418,20 +418,6 @@ func ReadCanonicalBodyWithTransactions(db kv.Getter, hash common.Hash, number ui
 	return body
 }
 
-func ReadNonCanonicalBodyWithTransactions(db kv.Getter, hash common.Hash, number uint64) *types.Body {
-	body, baseTxId, txAmount := ReadBody(db, hash, number)
-	if body == nil {
-		return nil
-	}
-	var err error
-	body.Transactions, err = NonCanonicalTransactions(db, baseTxId, txAmount)
-	if err != nil {
-		log.Error("failed ReadTransactionByHash", "hash", hash, "block", number, "err", err)
-		return nil
-	}
-	return body
-}
-
 func NonCanonicalBodyWithTransactions(db kv.Getter, hash common.Hash, number uint64) *types.Body {
 	body, baseTxId, txAmount := ReadBody(db, hash, number)
 	if body == nil {
@@ -523,7 +509,10 @@ func ReadBody(db kv.Getter, hash common.Hash, number uint64) (*types.Body, uint6
 	}
 	body := new(types.Body)
 	body.Uncles = bodyForStorage.Uncles
-	return body, bodyForStorage.BaseTxId, bodyForStorage.TxAmount
+	if bodyForStorage.TxAmount < 2 {
+		panic(fmt.Sprintf("block body has too few txs: block %d, txAmount %d", number, bodyForStorage.TxAmount))
+	}
+	return body, bodyForStorage.BaseTxId + 1, bodyForStorage.TxAmount - 2 // 1 system txn at the beginning of the block, and 1 at the end
 }
 
 func ReadSenders(db kv.Getter, hash common.Hash, number uint64) ([]common.Address, error) {
@@ -550,19 +539,19 @@ func WriteRawBodyIfNotExists(db kv.StatelessRwTx, hash common.Hash, number uint6
 }
 
 func WriteRawBody(db kv.StatelessRwTx, hash common.Hash, number uint64, body *types.RawBody) error {
-	baseTxId, err := db.IncrementSequence(kv.EthTx, uint64(len(body.Transactions)))
+	baseTxId, err := db.IncrementSequence(kv.EthTx, uint64(len(body.Transactions))+2)
 	if err != nil {
 		return err
 	}
 	data := types.BodyForStorage{
 		BaseTxId: baseTxId,
-		TxAmount: uint32(len(body.Transactions)),
+		TxAmount: uint32(len(body.Transactions)) + 2,
 		Uncles:   body.Uncles,
 	}
 	if err = WriteBodyForStorage(db, hash, number, &data); err != nil {
 		return fmt.Errorf("failed to write body: %w", err)
 	}
-	if err = WriteRawTransactions(db, body.Transactions, baseTxId); err != nil {
+	if err = WriteRawTransactions(db, body.Transactions, baseTxId+1); err != nil {
 		return fmt.Errorf("failed to WriteRawTransactions: %w", err)
 	}
 	return nil
@@ -571,19 +560,19 @@ func WriteRawBody(db kv.StatelessRwTx, hash common.Hash, number uint64, body *ty
 func WriteBody(db kv.RwTx, hash common.Hash, number uint64, body *types.Body) error {
 	// Pre-processing
 	body.SendersFromTxs()
-	baseTxId, err := db.IncrementSequence(kv.EthTx, uint64(len(body.Transactions)))
+	baseTxId, err := db.IncrementSequence(kv.EthTx, uint64(len(body.Transactions))+2)
 	if err != nil {
 		return err
 	}
 	data := types.BodyForStorage{
 		BaseTxId: baseTxId,
-		TxAmount: uint32(len(body.Transactions)),
+		TxAmount: uint32(len(body.Transactions)) + 2,
 		Uncles:   body.Uncles,
 	}
 	if err := WriteBodyForStorage(db, hash, number, &data); err != nil {
 		return fmt.Errorf("failed to write body: %w", err)
 	}
-	err = WriteTransactions(db, body.Transactions, baseTxId)
+	err = WriteTransactions(db, body.Transactions, baseTxId+1)
 	if err != nil {
 		return fmt.Errorf("failed to WriteTransactions: %w", err)
 	}
@@ -632,13 +621,18 @@ func MakeBodiesCanonical(tx kv.StatelessRwTx, from uint64, ctx context.Context,
 		return err
 	}
 
-	id := newBaseId
-	if err := tx.ForAmount(kv.NonCanonicalTxs, dbutils.EncodeBlockNumber(bodyForStorage.BaseTxId), bodyForStorage.TxAmount, func(k, v []byte) error {
+	// the next loop moves only non-system txs; the 2 system txs are not moved because they may not exist (their id slots are just reserved)
+	i := uint64(0)
+	if err := tx.ForAmount(kv.NonCanonicalTxs, dbutils.EncodeBlockNumber(bodyForStorage.BaseTxId+1), bodyForStorage.TxAmount-2, func(k, v []byte) error {
+		id := newBaseId + 1 + i
 		if err := tx.Put(kv.EthTx, dbutils.EncodeBlockNumber(id), v); err != nil {
 			return err
 		}
-		id++
-		return tx.Delete(kv.NonCanonicalTxs, k, nil)
+		if err := tx.Delete(kv.NonCanonicalTxs, k, nil); err != nil {
+			return err
+		}
+		i++
+		return nil
 	}); err != nil {
 		return err
 	}
@@ -661,6 +655,8 @@ func MakeBodiesCanonical(tx kv.StatelessRwTx, from uint64, ctx context.Context,
 
 // MakeBodiesNonCanonical - move all txs of canonical blocks to NonCanonicalTxs bucket
 func MakeBodiesNonCanonical(tx kv.RwTx, from uint64, ctx context.Context, logPrefix string, logEvery *time.Ticker) error {
+	var firstMovedTxnID uint64
+	var firstMovedTxnIDIsSet bool
 	for blockNum := from; ; blockNum++ {
 		h, err := ReadCanonicalHash(tx, blockNum)
 		if err != nil {
@@ -678,23 +674,31 @@ func MakeBodiesNonCanonical(tx kv.RwTx, from uint64, ctx context.Context, logPre
 		if err := rlp.DecodeBytes(data, bodyForStorage); err != nil {
 			return err
 		}
+		if !firstMovedTxnIDIsSet {
+			firstMovedTxnIDIsSet = true
+			firstMovedTxnID = bodyForStorage.BaseTxId
+		}
 
 		// move txs to NonCanonical bucket, it has own sequence
 		newBaseId, err := tx.IncrementSequence(kv.NonCanonicalTxs, uint64(bodyForStorage.TxAmount))
 		if err != nil {
 			return err
 		}
-		id := newBaseId
-		if err := tx.ForAmount(kv.EthTx, dbutils.EncodeBlockNumber(bodyForStorage.BaseTxId), bodyForStorage.TxAmount, func(k, v []byte) error {
+		// the next loop moves only non-system txs; the 2 system txs are not moved because they may not exist (their id slots are just reserved)
+		i := uint64(0)
+		if err := tx.ForAmount(kv.EthTx, dbutils.EncodeBlockNumber(bodyForStorage.BaseTxId+1), bodyForStorage.TxAmount-2, func(k, v []byte) error {
+			id := newBaseId + 1 + i
 			if err := tx.Put(kv.NonCanonicalTxs, dbutils.EncodeBlockNumber(id), v); err != nil {
 				return err
 			}
-			id++
-			return tx.Delete(kv.EthTx, k, nil)
+			if err := tx.Delete(kv.EthTx, k, nil); err != nil {
+				return err
+			}
+			i++
+			return nil
 		}); err != nil {
 			return err
 		}
-
 		bodyForStorage.BaseTxId = newBaseId
 		if err := WriteBodyForStorage(tx, h, blockNum, bodyForStorage); err != nil {
 			return err
@@ -710,23 +714,11 @@ func MakeBodiesNonCanonical(tx kv.RwTx, from uint64, ctx context.Context, logPre
 	}
 
 	// EthTx must have canonical id's - means need decrement it's sequence on unwind
-	c, err := tx.Cursor(kv.EthTx)
-	if err != nil {
-		return err
+	if firstMovedTxnIDIsSet {
+		if err := ResetSequence(tx, kv.EthTx, firstMovedTxnID); err != nil {
+			return err
+		}
 	}
-	defer c.Close()
-	k, _, err := c.Last()
-	if err != nil {
-		return err
-	}
-	var nextTxID uint64
-	if k != nil {
-		nextTxID = binary.BigEndian.Uint64(k) + 1
-	}
-	if err := ResetSequence(tx, kv.EthTx, nextTxID); err != nil {
-		return err
-	}
-
 	return nil
 }
diff --git a/eth/stagedsync/stage_bodies_test.go b/eth/stagedsync/stage_bodies_test.go
index 5628718d8..9580d322d 100644
--- a/eth/stagedsync/stage_bodies_test.go
+++ b/eth/stagedsync/stage_bodies_test.go
@@ -40,14 +40,14 @@ func TestBodiesUnwind(t *testing.T) {
 
 		n, err := tx.ReadSequence(kv.EthTx)
 		require.NoError(err)
-		require.Equal(5*3, int(n)) // from 0, 5 block with 3 txn in each
+		require.Equal(5*(3+2), int(n)) // from 0: 5 blocks with 3 txns each, plus 2 system txns per block
 	}
 	{
 		err = rawdb.MakeBodiesCanonical(tx, 5+1, ctx, "test", logEvery) // block 5 already canonical, start from next one
 		require.NoError(err)
 		n, err := tx.ReadSequence(kv.EthTx)
 		require.NoError(err)
-		require.Equal(10*3, int(n))
+		require.Equal(10*(3+2), int(n))
 
 		err = rawdb.WriteRawBody(tx, common.Hash{11}, 11, b)
 		require.NoError(err)
@@ -56,7 +56,7 @@ func TestBodiesUnwind(t *testing.T) {
 
 		n, err = tx.ReadSequence(kv.EthTx)
 		require.NoError(err)
-		require.Equal(11*3, int(n))
+		require.Equal(11*(3+2), int(n))
 	}
 
 	{
@@ -66,12 +66,12 @@ func TestBodiesUnwind(t *testing.T) {
 
 		n, err := tx.ReadSequence(kv.EthTx)
 		require.NoError(err)
-		require.Equal(5*3, int(n)) // from 0, 5 block with 3 txn in each
+		require.Equal(5*(3+2), int(n)) // from 0: 5 blocks with 3 txns each, plus 2 system txns per block
 
 		err = rawdb.MakeBodiesCanonical(tx, 5+1, ctx, "test", logEvery) // block 5 already canonical, start from next one
 		require.NoError(err)
 		n, err = tx.ReadSequence(kv.EthTx)
 		require.NoError(err)
-		require.Equal(11*3, int(n))
+		require.Equal(11*(3+2), int(n))
 	}
 }
diff --git a/eth/stagedsync/stage_senders_test.go b/eth/stagedsync/stage_senders_test.go
index 9701c3b91..addca9fa3 100644
--- a/eth/stagedsync/stage_senders_test.go
+++ b/eth/stagedsync/stage_senders_test.go
@@ -137,15 +137,14 @@ func TestSenders(t *testing.T) {
 		assert.Equal(t, 0, len(senders))
 	}
 	{
-		txs, err := rawdb.CanonicalTransactions(tx, 0, 2)
+		txs, err := rawdb.CanonicalTransactions(tx, 1, 2)
 		assert.NoError(t, err)
 		assert.Equal(t, 2, len(txs))
-		txs, err = rawdb.CanonicalTransactions(tx, 2, 3)
+		txs, err = rawdb.CanonicalTransactions(tx, 5, 3)
 		assert.NoError(t, err)
 		assert.Equal(t, 3, len(txs))
-		txs, err = rawdb.CanonicalTransactions(tx, 0, 1024)
+		txs, err = rawdb.CanonicalTransactions(tx, 5, 1024)
 		assert.NoError(t, err)
-		assert.Equal(t, 5, len(txs))
+		assert.Equal(t, 3, len(txs))
 	}
-
 }
diff --git a/eth/stagedsync/stage_txlookup.go b/eth/stagedsync/stage_txlookup.go
index 25c894b71..e921c8b80 100644
--- a/eth/stagedsync/stage_txlookup.go
+++ b/eth/stagedsync/stage_txlookup.go
@@ -160,7 +160,7 @@ func unwindTxLookup(u *UnwindState, s *StageState, tx kv.RwTx, cfg TxLookupCfg,
 			return fmt.Errorf("rlp decode err: %w", err)
 		}
 
-		txs, err := rawdb.CanonicalTransactions(tx, body.BaseTxId, body.TxAmount)
+		txs, err := rawdb.CanonicalTransactions(tx, body.BaseTxId+1, body.TxAmount-2)
 		if err != nil {
 			return err
 		}
@@ -232,7 +232,7 @@ func pruneTxLookup(tx kv.RwTx, logPrefix, tmpDir string, s *PruneState, pruneTo
 			return fmt.Errorf("rlp decode: %w", err)
 		}
 
-		txs, err := rawdb.CanonicalTransactions(tx, body.BaseTxId, body.TxAmount)
+		txs, err := rawdb.CanonicalTransactions(tx, body.BaseTxId+1, body.TxAmount-2)
 		if err != nil {
 			return err
 		}
diff --git a/go.mod b/go.mod
index 0b648df80..9ec1c1ff7 100644
--- a/go.mod
+++ b/go.mod
@@ -40,7 +40,7 @@ require (
 	github.com/json-iterator/go v1.1.12
 	github.com/julienschmidt/httprouter v1.3.0
 	github.com/kevinburke/go-bindata v3.21.0+incompatible
-	github.com/ledgerwatch/erigon-lib v0.0.0-20220309172522-75b52ac25e10
+	github.com/ledgerwatch/erigon-lib v0.0.0-20220310020438-6f5ec338cbc1
 	github.com/ledgerwatch/log/v3 v3.4.1
 	github.com/ledgerwatch/secp256k1 v1.0.0
 	github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
diff --git a/go.sum b/go.sum
index c6b67ae42..9fb1452d0 100644
--- a/go.sum
+++ b/go.sum
@@ -641,8 +641,8 @@ github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3P
 github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
 github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
 github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
-github.com/ledgerwatch/erigon-lib v0.0.0-20220309172522-75b52ac25e10 h1:WpvYRV25W89qbzfSKRrxPK5l1XVRE3Y297fiSfPE8wU=
-github.com/ledgerwatch/erigon-lib v0.0.0-20220309172522-75b52ac25e10/go.mod h1:SlDiODxcwf99wgSRhp/JO5piRwUJI6AN7WB5aQ5N7u8=
+github.com/ledgerwatch/erigon-lib v0.0.0-20220310020438-6f5ec338cbc1 h1:g3nSeta0vg4XPibGikejEz9PCMVGnk01PTLgUecbEyU=
+github.com/ledgerwatch/erigon-lib v0.0.0-20220310020438-6f5ec338cbc1/go.mod h1:SlDiODxcwf99wgSRhp/JO5piRwUJI6AN7WB5aQ5N7u8=
 github.com/ledgerwatch/log/v3 v3.4.1 h1:/xGwlVulXnsO9Uq+tzaExc8OWmXXHU0dnLalpbnY5Bc=
 github.com/ledgerwatch/log/v3 v3.4.1/go.mod h1:VXcz6Ssn6XEeU92dCMc39/g1F0OYAjw1Mt+dGP5DjXY=
 github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ=
diff --git a/migrations/migrations.go b/migrations/migrations.go
index 819b2565a..501805a4b 100644
--- a/migrations/migrations.go
+++ b/migrations/migrations.go
@@ -32,6 +32,7 @@ import (
 var migrations = map[kv.Label][]Migration{
 	kv.ChainDB: {
 		dbSchemaVersion5,
+		txsBeginEnd,
 	},
 	kv.TxPoolDB: {},
 	kv.SentryDB: {},
@@ -45,7 +46,7 @@ type Migration struct {
 
 var (
 	ErrMigrationNonUniqueName   = fmt.Errorf("please provide unique migration name")
-	ErrMigrationCommitNotCalled = fmt.Errorf("migration commit function was not called")
+	ErrMigrationCommitNotCalled = fmt.Errorf("migration before-commit function was not called")
 	ErrMigrationETLFilesDeleted = fmt.Errorf("db migration progress was interrupted after extraction step and ETL files was deleted, please contact development team for help or re-sync from scratch")
 )
 
@@ -167,7 +168,7 @@ func (m *Migrator) Apply(db kv.RwDB, datadir string) error {
 		return err
 	}
 	if err := m.VerifyVersion(db); err != nil {
-		return err
+		return fmt.Errorf("migrator.Apply: %w", err)
 	}
 
 	// migration names must be unique, protection against people's mistake
@@ -194,7 +195,7 @@ func (m *Migrator) Apply(db kv.RwDB, datadir string) error {
 			progress, err = tx.GetOne(kv.Migrations, []byte("_progress_"+v.Name))
 			return err
 		}); err != nil {
-			return err
+			return fmt.Errorf("migrator.Apply: %w", err)
 		}
 
 		if err := v.Up(db, filepath.Join(datadir, "migrations", v.Name), progress, func(tx kv.RwTx, key []byte, isDone bool) error {
@@ -224,7 +225,7 @@ func (m *Migrator) Apply(db kv.RwDB, datadir string) error {
 
 			return nil
 		}); err != nil {
-			return err
+			return fmt.Errorf("migrator.Apply.Up: %s, %w", v.Name, err)
 		}
 
 		if !callbackCalled {
@@ -243,7 +244,7 @@ func (m *Migrator) Apply(db kv.RwDB, datadir string) error {
 		}
 		return nil
 	}); err != nil {
-		return err
+		return fmt.Errorf("migrator.Apply: %w", err)
 	}
 	log.Info("Updated DB schema to", "version", fmt.Sprintf("%d.%d.%d", kv.DBSchemaVersion.Major, kv.DBSchemaVersion.Minor, kv.DBSchemaVersion.Patch))
 	return nil
diff --git a/migrations/txs_begin_end.go b/migrations/txs_begin_end.go
new file mode 100644
index 000000000..f675e5e0e
--- /dev/null
+++ b/migrations/txs_begin_end.go
@@ -0,0 +1,354 @@
+package migrations
+
+import (
+	"bytes"
+	"context"
+	"encoding/binary"
+	"fmt"
+	"runtime"
+	"time"
+
+	common2 "github.com/ledgerwatch/erigon-lib/common"
+	"github.com/ledgerwatch/erigon-lib/kv"
+	"github.com/ledgerwatch/erigon/common"
+	"github.com/ledgerwatch/erigon/common/dbutils"
+	"github.com/ledgerwatch/erigon/core/rawdb"
+	"github.com/ledgerwatch/erigon/core/types"
+	"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
+	"github.com/ledgerwatch/erigon/rlp"
+	"github.com/ledgerwatch/log/v3"
+)
+
+const ASSERT = false
+
+var ErrTxsBeginEndNoMigration = fmt.Errorf("in this Erigon version the DB format was changed: additional first/last system-txs were added to blocks. There is no DB migration for this change; please re-sync or switch to an earlier version")
+
+var txsBeginEnd = Migration{
+	Name: "txs_begin_end",
+	Up: func(db kv.RwDB, tmpdir string, progress []byte, BeforeCommit Callback) (err error) {
+		logEvery := time.NewTicker(10 * time.Second)
+		defer logEvery.Stop()
+
+		var latestBlock uint64
+		if err := db.View(context.Background(), func(tx kv.Tx) error {
+			bodiesProgress, err := stages.GetStageProgress(tx, stages.Bodies)
+			if err != nil {
+				return err
+			}
+			if progress != nil {
+				latestBlock = binary.BigEndian.Uint64(progress)
+				log.Info("[migration] Continue migration", "from_block", latestBlock)
+			} else {
+				latestBlock = bodiesProgress + 1 // include block 0
+			}
+			return nil
+		}); err != nil {
+			return err
+		}
+
+		tx, err := db.BeginRw(context.Background())
+		if err != nil {
+			return err
+		}
+		defer tx.Rollback()
+
+		numBuf := make([]byte, 8)
+		numHashBuf := make([]byte, 8+32)
+		for i := int(latestBlock); i >= 0; i-- {
+			blockNum := uint64(i)
+
+			select {
+			case <-logEvery.C:
+				var m runtime.MemStats
+				runtime.ReadMemStats(&m)
+				log.Info("[migration] Adding system-txs",
+					"progress", fmt.Sprintf("%.2f%%", 100-100*float64(blockNum)/float64(latestBlock)), "block_num", blockNum,
+					"alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys))
+			default:
+			}
+
+			canonicalHash, err := rawdb.ReadCanonicalHash(tx, blockNum)
+			if err != nil {
+				return err
+			}
+
+			var oldBlock *types.Body
+			if ASSERT {
+				oldBlock = readCanonicalBodyWithTransactionsDeprecated(tx, canonicalHash, blockNum)
+			}
+
+			binary.BigEndian.PutUint64(numHashBuf[:8], blockNum)
+			copy(numHashBuf[8:], canonicalHash[:])
+			b, err := rawdb.ReadBodyForStorageByKey(tx, numHashBuf)
+			if err != nil {
+				return err
+			}
+			if b == nil {
+				continue
+			}
+
+			txs, err := rawdb.CanonicalTransactions(tx, b.BaseTxId, b.TxAmount)
+			if err != nil {
+				return err
+			}
+
+			b.BaseTxId += (blockNum) * 2
+			b.TxAmount += 2
+			if err := rawdb.WriteBodyForStorage(tx, canonicalHash, blockNum, b); err != nil {
+				return fmt.Errorf("failed to write body: %w", err)
+			}
+
+			// del first tx in block
+			if err = tx.Delete(kv.EthTx, dbutils.EncodeBlockNumber(b.BaseTxId), nil); err != nil {
+				return err
+			}
+			if err := writeTransactionsNewDeprecated(tx, txs, b.BaseTxId+1); err != nil {
+				return fmt.Errorf("failed to write body txs: %w", err)
+			}
+			// del last tx in block
+			if err = tx.Delete(kv.EthTx, dbutils.EncodeBlockNumber(b.BaseTxId+uint64(b.TxAmount)-1), nil); err != nil {
+				return err
+			}
+
+			if ASSERT {
+				newBlock, baseTxId, txAmount := rawdb.ReadBody(tx, canonicalHash, blockNum)
+				newBlock.Transactions, err = rawdb.CanonicalTransactions(tx, baseTxId, txAmount)
+				if err != nil {
+					panic(err)
+				}
+				for i, oldTx := range oldBlock.Transactions {
+					newTx := newBlock.Transactions[i]
+					if oldTx.GetNonce() != newTx.GetNonce() {
+						panic(blockNum)
+					}
+				}
+			}
+
+			if err = tx.ForPrefix(kv.BlockBody, numHashBuf[:8], func(k, v []byte) error {
+				if bytes.Equal(k, numHashBuf) { // don't delete canonical blocks
+					return nil
+				}
+				bodyForStorage := new(types.BodyForStorage)
+				if err := rlp.DecodeBytes(v, bodyForStorage); err != nil {
+					return err
+				}
+
+				for i := bodyForStorage.BaseTxId; i < bodyForStorage.BaseTxId+uint64(bodyForStorage.TxAmount); i++ {
+					binary.BigEndian.PutUint64(numBuf, i)
+					if err = tx.Delete(kv.NonCanonicalTxs, numBuf, nil); err != nil {
+						return err
+					}
+				}
+
+				if err = tx.Delete(kv.BlockBody, k, nil); err != nil {
+					return err
+				}
+				if err = tx.Delete(kv.Headers, k, nil); err != nil {
+					return err
+				}
+				if err = tx.Delete(kv.HeaderTD, k, nil); err != nil {
+					return err
+				}
+				if err = tx.Delete(kv.HeaderNumber, k[8:], nil); err != nil {
+					return err
+				}
+
+				return nil
+			}); err != nil {
+				return err
+			}
+
+			binary.BigEndian.PutUint64(numBuf, blockNum)
+			if err := BeforeCommit(tx, numBuf, false); err != nil {
+				return err
+			}
+			if blockNum%10_000 == 0 {
+				if err := tx.Commit(); err != nil {
+					return err
+				}
+				tx, err = db.BeginRw(context.Background())
+				if err != nil {
+					return err
+				}
+			}
+		}
+
+		if err := tx.Commit(); err != nil {
+			return err
+		}
+
+		return db.Update(context.Background(), func(tx kv.RwTx) error {
+			// reset non-canonical sequence to 0
+			v, err := tx.ReadSequence(kv.NonCanonicalTxs)
+			if err != nil {
+				return err
+			}
+			if _, err := tx.IncrementSequence(kv.NonCanonicalTxs, -v); err != nil {
+				return err
+			}
+
+			{
+				c, err := tx.Cursor(kv.HeaderCanonical)
+				if err != nil {
+					return err
+				}
+				k, v, err := c.Last()
+				if err != nil {
+					return err
+				}
+				data, err := tx.GetOne(kv.BlockBody, append(k, v...))
+				if err != nil {
+					return err
+				}
+				var newSeqValue uint64
+				if len(data) > 0 {
+					bodyForStorage := new(types.BodyForStorage)
+					if err := rlp.DecodeBytes(data, bodyForStorage); err != nil {
+						return fmt.Errorf("rlp.DecodeBytes(bodyForStorage): %w", err)
+					}
+					currentSeq, err := tx.ReadSequence(kv.EthTx)
+					if err != nil {
+						return err
+					}
+					newSeqValue = bodyForStorage.BaseTxId + uint64(bodyForStorage.TxAmount) - currentSeq
+				}
+
+				if _, err := tx.IncrementSequence(kv.EthTx, newSeqValue); err != nil {
+					return err
+				}
+			}
+
+			// final BeforeCommit call marks the migration as applied and writes the DB schema version info
+			return BeforeCommit(tx, nil, true)
+		})
+	},
+}
+
+func writeRawBodyDeprecated(db kv.StatelessRwTx, hash common.Hash, number uint64, body *types.RawBody) error {
+	baseTxId, err := db.IncrementSequence(kv.EthTx, uint64(len(body.Transactions)))
+	if err != nil {
+		return err
+	}
+	data := types.BodyForStorage{
+		BaseTxId: baseTxId,
+		TxAmount: uint32(len(body.Transactions)),
+		Uncles:   body.Uncles,
+	}
+	if err = rawdb.WriteBodyForStorage(db, hash, number, &data); err != nil {
+		return fmt.Errorf("failed to write body: %w", err)
+	}
+	if err = rawdb.WriteRawTransactions(db, body.Transactions, baseTxId); err != nil {
+		return fmt.Errorf("failed to WriteRawTransactions: %w", err)
+	}
+	return nil
+}
+func writeTransactionsNewDeprecated(db kv.RwTx, txs []types.Transaction, baseTxId uint64) error {
+	txId := baseTxId
+	buf := bytes.NewBuffer(nil)
+	for _, tx := range txs {
+		txIdKey := make([]byte, 8)
+		binary.BigEndian.PutUint64(txIdKey, txId)
+
+		buf.Reset()
+		if err := rlp.Encode(buf, tx); err != nil {
+			return fmt.Errorf("broken tx rlp: %w", err)
+		}
+		// If next Append returns KeyExists error - it means you need to open transaction in App code before calling this func. Batch is also fine.
+		if err := db.Put(kv.EthTx, txIdKey, common.CopyBytes(buf.Bytes())); err != nil {
+			return err
+		}
+		txId++
+	}
+	return nil
+}
+
+func readCanonicalBodyWithTransactionsDeprecated(db kv.Getter, hash common.Hash, number uint64) *types.Body {
+	data := rawdb.ReadStorageBodyRLP(db, hash, number)
+	if len(data) == 0 {
+		return nil
+	}
+	bodyForStorage := new(types.BodyForStorage)
+	err := rlp.DecodeBytes(data, bodyForStorage)
+	if err != nil {
+		log.Error("Invalid block body RLP", "hash", hash, "err", err)
+		return nil
+	}
+	body := new(types.Body)
+	body.Uncles = bodyForStorage.Uncles
+	body.Transactions, err = rawdb.CanonicalTransactions(db, bodyForStorage.BaseTxId, bodyForStorage.TxAmount)
+	if err != nil {
+		log.Error("failed ReadTransactionByHash", "hash", hash, "block", number, "err", err)
+		return nil
+	}
+	return body
+}
+
+func makeBodiesNonCanonicalDeprecated(tx kv.RwTx, from uint64, ctx context.Context, logPrefix string, logEvery *time.Ticker) error {
+	for blockNum := from; ; blockNum++ {
+		h, err := rawdb.ReadCanonicalHash(tx, blockNum)
+		if err != nil {
+			return err
+		}
+		if h == (common.Hash{}) {
+			break
+		}
+		data := rawdb.ReadStorageBodyRLP(tx, h, blockNum)
+		if len(data) == 0 {
+			break
+		}
+
+		bodyForStorage := new(types.BodyForStorage)
+		if err := rlp.DecodeBytes(data, bodyForStorage); err != nil {
+			return err
+		}
+
+		// move txs to NonCanonical bucket, it has own sequence
+		newBaseId, err := tx.IncrementSequence(kv.NonCanonicalTxs, uint64(bodyForStorage.TxAmount))
+		if err != nil {
+			return err
+		}
+		id := newBaseId
+		if err := tx.ForAmount(kv.EthTx, dbutils.EncodeBlockNumber(bodyForStorage.BaseTxId), bodyForStorage.TxAmount, func(k, v []byte) error {
+			if err := tx.Put(kv.NonCanonicalTxs, dbutils.EncodeBlockNumber(id), v); err != nil {
+				return err
+			}
+			id++
+			return tx.Delete(kv.EthTx, k, nil)
+		}); err != nil {
+			return err
+		}
+
+		bodyForStorage.BaseTxId = newBaseId
+		if err := rawdb.WriteBodyForStorage(tx, h, blockNum, bodyForStorage); err != nil {
+			return err
+		}
+
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-logEvery.C:
+			log.Info(fmt.Sprintf("[%s] Unwinding transactions...", logPrefix), "current block", blockNum)
+		default:
+		}
+	}
+
+	// EthTx must have canonical id's - means need decrement it's sequence on unwind
+	c, err := tx.Cursor(kv.EthTx)
+	if err != nil {
+		return err
+	}
+	defer c.Close()
+	k, _, err := c.Last()
+	if err != nil {
+		return err
+	}
+	var nextTxID uint64
+	if k != nil {
+		nextTxID = binary.BigEndian.Uint64(k) + 1
+	}
+	if err := rawdb.ResetSequence(tx, kv.EthTx, nextTxID); err != nil {
+		return err
+	}
+
+	return nil
+}
diff --git a/migrations/txs_begin_end_test.go b/migrations/txs_begin_end_test.go
new file mode 100644
index 000000000..f15cc892a
--- /dev/null
+++ b/migrations/txs_begin_end_test.go
@@ -0,0 +1,103 @@
+package migrations
+
+import (
+	"bytes"
+	"context"
+	"encoding/binary"
+	"testing"
+	"time"
+
+	"github.com/ledgerwatch/erigon-lib/common/u256"
+	"github.com/ledgerwatch/erigon-lib/kv"
+	"github.com/ledgerwatch/erigon-lib/kv/memdb"
+	"github.com/ledgerwatch/erigon/common"
+	"github.com/ledgerwatch/erigon/common/dbutils"
+	"github.com/ledgerwatch/erigon/core/rawdb"
+	"github.com/ledgerwatch/erigon/core/types"
+	"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
+	"github.com/stretchr/testify/require"
+)
+
+func TestTxsBeginEnd(t *testing.T) {
+	require, tmpDir, db := require.New(t), t.TempDir(), memdb.NewTestDB(t)
+	txn := &types.DynamicFeeTransaction{Tip: u256.N1, FeeCap: u256.N1, CommonTx: types.CommonTx{ChainID: u256.N1, Value: u256.N1, Gas: 1, Nonce: 1}}
+	buf := bytes.NewBuffer(nil)
+	err := txn.MarshalBinary(buf)
+	require.NoError(err)
+	rlpTxn := buf.Bytes()
+	logEvery := time.NewTicker(10 * time.Second)
+	defer logEvery.Stop()
+
+	b := &types.RawBody{Transactions: [][]byte{rlpTxn, rlpTxn, rlpTxn}}
+	err = db.Update(context.Background(), func(tx kv.RwTx) error {
+		for i := uint64(0); i < 10; i++ {
+			hash := common.Hash{byte(i)}
+			err = writeRawBodyDeprecated(tx, hash, i, b)
+			require.NoError(err)
+			err = rawdb.WriteCanonicalHash(tx, hash, i)
+			require.NoError(err)
+		}
+		if err := makeBodiesNonCanonicalDeprecated(tx, 7, context.Background(), "", logEvery); err != nil {
+			return err
+		}
+
+		for i := uint64(7); i < 10; i++ {
+			err = rawdb.DeleteCanonicalHash(tx, i)
+			require.NoError(err)
+			hash := common.Hash{0xa, byte(i)}
+			err = writeRawBodyDeprecated(tx, hash, i, b)
+			require.NoError(err)
+			err = rawdb.WriteCanonicalHash(tx, hash, i)
+			require.NoError(err)
+		}
+		if err := stages.SaveStageProgress(tx, stages.Bodies, 9); err != nil {
+			return err
+		}
+		return nil
+	})
+	require.NoError(err)
+
+	migrator := NewMigrator(kv.ChainDB)
+	migrator.Migrations = []Migration{txsBeginEnd}
+	err = migrator.Apply(db, tmpDir)
+	require.NoError(err)
+
+	err = db.View(context.Background(), func(tx kv.Tx) error {
+		v, err := tx.ReadSequence(kv.EthTx)
+		require.NoError(err)
+		require.Equal(uint64(3*10+2*10), v) // 10 blocks * (3 user txns + 2 system txns)
+		return nil
+	})
+	require.NoError(err)
+
+	err = db.View(context.Background(), func(tx kv.Tx) error {
+		for i := uint64(7); i < 10; i++ {
+			hash := common.Hash{byte(i)}
+			k := make([]byte, 8+32)
+			binary.BigEndian.PutUint64(k, 7)
+			copy(k[8:], hash[:])
+
+			has, err := tx.Has(kv.BlockBody, k)
+			require.NoError(err)
+			require.False(has)
+		}
+
+		c, err := tx.Cursor(kv.NonCanonicalTxs)
+		require.NoError(err)
+		cnt, err := c.Count()
+		require.NoError(err)
+		require.Zero(cnt)
+
+		v, err := tx.ReadSequence(kv.NonCanonicalTxs)
+		require.NoError(err)
+		require.Zero(v)
+
+		has, err := tx.Has(kv.EthTx, dbutils.EncodeBlockNumber(0))
+		require.NoError(err)
+		require.False(has)
+
+		return nil
+	})
+	require.NoError(err)
+
+}
diff --git a/node/node_test.go b/node/node_test.go
index 6edbacaa7..f8046af41 100644
--- a/node/node_test.go
+++ b/node/node_test.go
@@ -33,6 +33,7 @@ import (
 	"github.com/ledgerwatch/erigon/p2p"
 	"github.com/ledgerwatch/erigon/rpc"
 	"github.com/ledgerwatch/log/v3"
+	"github.com/stretchr/testify/require"
 
 	"github.com/stretchr/testify/assert"
 )
@@ -172,7 +173,7 @@ func TestNodeCloseClosesDB(t *testing.T) {
 	stack, _ := New(testNodeConfig(t))
 	defer stack.Close()
 
-	db, err := OpenDatabase(stack.Config(), log.New(), kv.ChainDB)
+	db, err := OpenDatabase(stack.Config(), log.New(), kv.SentryDB)
 	if err != nil {
 		t.Fatal("can't open DB:", err)
 	}
@@ -196,14 +197,14 @@ func TestNodeOpenDatabaseFromLifecycleStart(t *testing.T) {
 		t.Skip("fix me on win please")
 	}
 
-	stack, _ := New(testNodeConfig(t))
+	stack, err := New(testNodeConfig(t))
+	require.NoError(t, err)
 	defer stack.Close()
 
 	var db kv.RwDB
-	var err error
 
 	stack.RegisterLifecycle(&InstrumentedService{
 		startHook: func() {
-			db, err = OpenDatabase(stack.Config(), log.New(), kv.ChainDB)
+			db, err = OpenDatabase(stack.Config(), log.New(), kv.SentryDB)
 			if err != nil {
 				t.Fatal("can't open DB:", err)
 			}
diff --git a/turbo/app/snapshots.go b/turbo/app/snapshots.go
index 2e9a6cb6a..56eae4283 100644
--- a/turbo/app/snapshots.go
+++ b/turbo/app/snapshots.go
@@ -251,7 +251,7 @@ func snapshotBlocks(ctx context.Context, chainDB kv.RoDB, fromBlock, toBlock, bl
 		workers = 1
 	}
 	if err := snapshotsync.DumpBlocks(ctx, fromBlock, last, blocksPerFile, tmpDir, snapshotDir, chainDB, workers, log.LvlInfo); err != nil {
-		return err
+		return fmt.Errorf("DumpBlocks: %w", err)
 	}
 	return nil
 }
diff --git a/turbo/snapshotsync/block_reader.go b/turbo/snapshotsync/block_reader.go
index 5a81e2d3b..7cf40190f 100644
--- a/turbo/snapshotsync/block_reader.go
+++ b/turbo/snapshotsync/block_reader.go
@@ -3,6 +3,7 @@ package snapshotsync
 import (
 	"bytes"
 	"context"
+	"encoding/binary"
 	"fmt"
 	"sync"
 
@@ -346,14 +347,17 @@ func (back *BlockReaderWithSnapshots) BlockWithSenders(ctx context.Context, tx k
 		return nil, nil, fmt.Errorf(".idx file has wrong baseDataID? %d<%d, %s", b.BaseTxId, sn.TxnHashIdx.BaseDataID(), sn.Transactions.FilePath())
 	}
 
-	txs := make([]types.Transaction, b.TxAmount)
-	senders = make([]common.Address, b.TxAmount)
-	if b.TxAmount > 0 {
-		txnOffset := sn.TxnHashIdx.Lookup2(b.BaseTxId - sn.TxnHashIdx.BaseDataID()) // need subtract baseID of indexFile
+	txs := make([]types.Transaction, b.TxAmount-2)
+	senders = make([]common.Address, b.TxAmount-2)
+	if b.TxAmount > 2 {
+		r := recsplit.NewIndexReader(sn.TxnIdsIdx)
+		binary.BigEndian.PutUint64(buf[:8], b.BaseTxId-sn.TxnIdsIdx.BaseDataID())
+		txnOffset := r.Lookup(buf[:8])
 		gg = sn.Transactions.MakeGetter()
 		gg.Reset(txnOffset)
 		stream := rlp.NewStream(reader, 0)
-		for i := uint32(0); i < b.TxAmount; i++ {
+		buf, _ = gg.Next(buf[:0]) // skip the first system-tx
+		for i := uint32(0); i < b.TxAmount-2; i++ {
 			buf, _ = gg.Next(buf[:0])
 			senders[i].SetBytes(buf[1 : 1+20])
 			txRlp := buf[1+20:]
@@ -429,7 +433,7 @@ func (back *BlockReaderWithSnapshots) bodyFromSnapshot(blockHeight uint64, sn *B
 
 	body := new(types.Body)
 	body.Uncles = b.Uncles
-	return body, b.BaseTxId, b.TxAmount, nil
+	return body, b.BaseTxId + 1, b.TxAmount - 2, nil // skip the empty system txs at the beginning and end of the block
 }
 
 func (back *BlockReaderWithSnapshots) bodyWithTransactionsFromSnapshot(blockHeight uint64, sn *BlocksSnapshot, buf []byte) (*types.Body, []common.Address, uint64, uint32, error) {
@@ -441,7 +445,9 @@ func (back *BlockReaderWithSnapshots) bodyWithTransactionsFromSnapshot(blockHeig
 	senders := make([]common.Address, txsAmount)
 	reader := bytes.NewReader(buf)
 	if txsAmount > 0 {
-		txnOffset := sn.TxnHashIdx.Lookup2(baseTxnID - sn.TxnHashIdx.BaseDataID()) // need subtract baseID of indexFile
+		r := recsplit.NewIndexReader(sn.TxnIdsIdx)
+		binary.BigEndian.PutUint64(buf[:8], baseTxnID-sn.TxnIdsIdx.BaseDataID())
+		txnOffset := r.Lookup(buf[:8])
 		gg := sn.Transactions.MakeGetter()
 		gg.Reset(txnOffset)
 		stream := rlp.NewStream(reader, 0)
@@ -467,9 +473,7 @@ func (back *BlockReaderWithSnapshots) txnByHash(txnHash common.Hash, buf []byte)
 		sn := back.sn.blocks[i]
 
 		reader := recsplit.NewIndexReader(sn.TxnHashIdx)
-		localID := reader.Lookup(txnHash[:])
-		txnID = localID + sn.TxnHashIdx.BaseDataID()
-		offset := sn.TxnHashIdx.Lookup2(localID)
+		offset := reader.Lookup(txnHash[:])
 		gg := sn.Transactions.MakeGetter()
 		gg.Reset(offset)
 		//fmt.Printf("try: %d, %d, %d, %d\n", i, sn.From, localID, blockNum)
@@ -480,10 +484,7 @@ func (back *BlockReaderWithSnapshots) txnByHash(txnHash common.Hash, buf []byte)
 		}
 
 		reader2 := recsplit.NewIndexReader(sn.TxnHash2BlockNumIdx)
-		localID = reader2.Lookup(txnHash[:])
-		blockNum = sn.TxnHash2BlockNumIdx.Lookup2(localID)
-		//fmt.Printf("try2: %d,%d,%d,%d,%d,%d\n", sn.TxnHash2BlockNumIdx.Lookup2(0), sn.TxnHash2BlockNumIdx.Lookup2(1), sn.TxnHash2BlockNumIdx.Lookup2(2), sn.TxnHash2BlockNumIdx.Lookup2(3), sn.TxnHash2BlockNumIdx.Lookup2(4), sn.TxnHash2BlockNumIdx.Lookup2(5))
-		//fmt.Printf("try3: %d,%d,%d,%d\n", sn.TxnHash2BlockNumIdx.Lookup(common.FromHex("0xc2c3ba07f05ddd8552508e7facf25dc5bd6d16e95c12cff42cb8b9ea6bbfc225")), sn.TxnHash2BlockNumIdx.Lookup(common.FromHex("0xca8a182f21b98318e94ec7884f572c0a1385dbc10a2bea62a38079eab7d8cfef")), sn.TxnHash2BlockNumIdx.Lookup(common.FromHex("0xf1b3306dd4bfa2a86f7f1b3c22bf0b6b4da50f5d37f2fed89d1221cb3690c700")), sn.TxnHash2BlockNumIdx.Lookup(common.FromHex("0xf59129e464525261217833d4bafae0ed8be5a94044eafacb47a45a4b23802a70")))
+		blockNum = reader2.Lookup(txnHash[:])
 		sender := buf[1 : 1+20]
 		txn, err = types.DecodeTransaction(rlp.NewStream(bytes.NewReader(buf[1+20:]), uint64(len(buf))))
 		if err != nil {
diff --git a/turbo/snapshotsync/block_snapshots.go b/turbo/snapshotsync/block_snapshots.go
index 6fd4b06ae..f48ffe291 100644
--- a/turbo/snapshotsync/block_snapshots.go
+++ b/turbo/snapshotsync/block_snapshots.go
@@ -44,6 +44,7 @@ type BlocksSnapshot struct {
 	BodyNumberIdx       *recsplit.Index // block_num_u64 -> bodies_segment_offset
 	HeaderHashIdx       *recsplit.Index // header_hash -> headers_segment_offset
 	TxnHashIdx          *recsplit.Index // transaction_hash -> transactions_segment_offset
+	TxnIdsIdx           *recsplit.Index // transaction_id -> transactions_segment_offset
 	TxnHash2BlockNumIdx *recsplit.Index // transaction_hash -> block_number
 
 	From, To uint64 // [from,to)
@@ -59,6 +60,7 @@ const (
 
 const (
 	Transactions2Block Type = "transactions-to-block"
+	TransactionsId     Type = "transactions-id"
 )
 
 var AllSnapshotTypes = []Type{Headers, Bodies, Transactions}
@@ -172,6 +174,15 @@ func (s *RoSnapshots) ReopenSomeIndices(types ...Type) (err error) {
 				return err
 			}
 
+			if bs.TxnIdsIdx != nil {
+				bs.TxnIdsIdx.Close()
+				bs.TxnIdsIdx = nil
+			}
+			bs.TxnIdsIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, TransactionsId)))
+			if err != nil {
+				return err
+			}
+
 			if bs.TxnHash2BlockNumIdx != nil {
 				bs.TxnHash2BlockNumIdx.Close()
 				bs.TxnHash2BlockNumIdx = nil
@@ -296,6 +307,9 @@ func (s *RoSnapshots) closeIndices() {
 		if s.TxnHashIdx != nil {
 			s.TxnHashIdx.Close()
 		}
+		if s.TxnIdsIdx != nil {
+			s.TxnIdsIdx.Close()
+		}
 		if s.TxnHash2BlockNumIdx != nil {
 			s.TxnHash2BlockNumIdx.Close()
 		}
@@ -658,20 +672,21 @@ func DumpBlocks(ctx context.Context, blockFrom, blockTo, blocksPerFile uint64, t
 	return nil
 }
 func dumpBlocksRange(ctx context.Context, blockFrom, blockTo uint64, tmpDir, snapshotDir string, chainDB kv.RoDB, workers int, lvl log.Lvl) error {
-	segmentFile := filepath.Join(snapshotDir, SegmentFileName(blockFrom, blockTo, Bodies))
+	segmentFile := filepath.Join(snapshotDir, SegmentFileName(blockFrom, blockTo, Transactions))
+	if _, err := DumpTxs(ctx, chainDB, segmentFile, tmpDir, blockFrom, blockTo, workers, lvl); err != nil {
+		return fmt.Errorf("DumpTxs: %w", err)
+	}
+
+	segmentFile = filepath.Join(snapshotDir, SegmentFileName(blockFrom, blockTo, Bodies))
 	if err := DumpBodies(ctx, chainDB, segmentFile, tmpDir, blockFrom, blockTo, workers, lvl); err != nil {
-		return err
+		return fmt.Errorf("DumpBodies: %w", err)
 	}
 
 	segmentFile = filepath.Join(snapshotDir, SegmentFileName(blockFrom, blockTo, Headers))
 	if err := DumpHeaders(ctx, chainDB, segmentFile, tmpDir, blockFrom, blockTo, workers, lvl); err != nil {
-		return err
+		return fmt.Errorf("DumpHeaders: %w", err)
 	}
 
-	segmentFile = filepath.Join(snapshotDir, SegmentFileName(blockFrom, blockTo, Transactions))
-	if _, err := DumpTxs(ctx, chainDB, segmentFile, tmpDir, blockFrom, blockTo, workers, lvl); err != nil {
-		return err
-	}
 	return nil
 }
 
@@ -686,7 +701,7 @@ func DumpTxs(ctx context.Context, db kv.RoDB, segmentFile, tmpDir string, blockF
 	f, err := compress.NewCompressor(ctx, "Transactions", segmentFile, tmpDir, compress.MinPatternScore, workers)
 	if err != nil {
-		return 0, err
+		return 0, fmt.Errorf("NewCompressor: %w, %s", err, segmentFile)
 	}
 	defer f.Close()
 
@@ -695,13 +710,50 @@ func DumpTxs(ctx context.Context, db kv.RoDB, segmentFile, tmpDir string, blockF
 	parseCtx := txpool.NewTxParseContext(*chainID)
 	parseCtx.WithSender(false)
 	slot := txpool.TxSlot{}
+	var sender [20]byte
+	parse := func(v, valueBuf []byte, senders []common.Address, j int) ([]byte, error) {
+		if _, err := parseCtx.ParseTransaction(v, 0, &slot, sender[:], false /* hasEnvelope */); err != nil {
+			return valueBuf, err
+		}
+		if len(senders) > 0 {
+			sender = senders[j]
+		}
+
+		valueBuf = valueBuf[:0]
+		valueBuf = append(valueBuf, slot.IdHash[:1]...)
+		valueBuf = append(valueBuf, sender[:]...)
+		valueBuf = append(valueBuf, v...)
+		return valueBuf, nil
+	}
 	valueBuf := make([]byte, 16*4096)
+	addSystemTx := func(tx kv.Tx, txId uint64) error {
+		binary.BigEndian.PutUint64(numBuf, txId)
+		tv, err := tx.GetOne(kv.EthTx, numBuf[:8])
+		if err != nil {
+			return err
+		}
+		if tv == nil {
+			if err := f.AddWord(nil); err != nil {
+				return fmt.Errorf("AddWord1: %w", err)
+			}
+			return nil
+		}
+
+		parseCtx.WithSender(false)
+		valueBuf, err = parse(tv, valueBuf, nil, 0)
+		if err != nil {
+			return err
+		}
+		if err := f.AddWord(valueBuf); err != nil {
+			return fmt.Errorf("AddWord2: %w", err)
+		}
+		return nil
+	}
+
 	firstIDSaved := false
 
 	from := dbutils.EncodeBlockNumber(blockFrom)
 	var lastBody types.BodyForStorage
-	var sender [20]byte
 	if err := kv.BigChunks(db, kv.HeaderCanonical, from, func(tx kv.Tx, k, v []byte) (bool, error) {
 		blockNum := binary.BigEndian.Uint64(k)
 		if blockNum >= blockTo {
@@ -710,6 +762,9 @@ func DumpTxs(ctx context.Context, db kv.RoDB, segmentFile, tmpDir string, blockF
 
 		h := common.BytesToHash(v)
 		dataRLP := rawdb.ReadStorageBodyRLP(tx, h, blockNum)
+		if dataRLP == nil {
+			return false, fmt.Errorf("body not found: %d, %x", blockNum, h)
+		}
 		var body types.BodyForStorage
 		if e := rlp.DecodeBytes(dataRLP, &body); e != nil {
 			return false, e
@@ -723,31 +778,33 @@ func DumpTxs(ctx context.Context, db kv.RoDB, segmentFile, tmpDir string, blockF
 			return false, err
 		}
 
-		binary.BigEndian.PutUint64(numBuf, body.BaseTxId)
-
 		if !firstIDSaved {
 			firstIDSaved = true
 			firstTxID = body.BaseTxId
 		}
 		j := 0
-		if err := tx.ForAmount(kv.EthTx, numBuf[:8], body.TxAmount, func(tk, tv []byte) error {
+
+		if err := addSystemTx(tx, body.BaseTxId); err != nil {
+			return false, err
+		}
+		count++
+		if prevTxID > 0 {
+			prevTxID++
+		} else {
+			prevTxID = body.BaseTxId
+		}
+		binary.BigEndian.PutUint64(numBuf, body.BaseTxId+1)
+		if err := tx.ForAmount(kv.EthTx, numBuf[:8], body.TxAmount-2, func(tk, tv []byte) error {
 			id := binary.BigEndian.Uint64(tk)
 			if prevTxID != 0 && id != prevTxID+1 {
 				panic(fmt.Sprintf("no gaps in tx ids are allowed: block %d does jump from %d to %d", blockNum, prevTxID, id))
 			}
 			prevTxID = id
 			parseCtx.WithSender(len(senders) == 0)
-			if _, err := parseCtx.ParseTransaction(tv, 0, &slot, sender[:], false /* hasEnvelope */); err != nil {
+			valueBuf, err = parse(tv, valueBuf, senders, j)
+			if err != nil {
 				return err
 			}
-			if len(senders) > 0 {
-				sender = senders[j]
-			}
-
-			valueBuf = valueBuf[:0]
-			valueBuf = append(valueBuf, slot.IdHash[:1]...)
-			valueBuf = append(valueBuf, sender[:]...)
-			valueBuf = append(valueBuf, tv...)
 			if err := f.AddWord(valueBuf); err != nil {
 				return err
 			}
@@ -767,17 +824,23 @@ func DumpTxs(ctx context.Context, db kv.RoDB, segmentFile, tmpDir string, blockF
 			}
 			return nil
 		}); err != nil {
+			return false, fmt.Errorf("ForAmount: %w", err)
+		}
+
+		if err := addSystemTx(tx, body.BaseTxId+uint64(body.TxAmount)-1); err != nil {
 			return false, err
 		}
+		prevTxID++
+		count++
 		return true, nil
 	}); err != nil {
-		return 0, err
+		return 0, fmt.Errorf("BigChunks: %w", err)
 	}
 	if lastBody.BaseTxId+uint64(lastBody.TxAmount)-firstTxID != count {
 		return 0, fmt.Errorf("incorrect tx count: %d, expected: %d", count, lastBody.BaseTxId+uint64(lastBody.TxAmount)-firstTxID)
 	}
 	if err := f.Compress(); err != nil {
-		return 0, err
+		return 0, fmt.Errorf("compress: %w", err)
 	}
 
 	_, fileName := filepath.Split(segmentFile)
@@ -841,7 +904,7 @@ func DumpHeaders(ctx context.Context, db kv.RoDB, segmentFilePath, tmpDir string
 		return err
 	}
 	if err := f.Compress(); err != nil {
-		return err
+		return fmt.Errorf("compress: %w", err)
 	}
 
 	return nil
@@ -896,12 +959,14 @@ func DumpBodies(ctx context.Context, db kv.RoDB, segmentFilePath, tmpDir string,
 		return err
 	}
 	if err := f.Compress(); err != nil {
-		return err
+		return fmt.Errorf("compress: %w", err)
 	}
 
 	return nil
 }
 
+var EmptyTxHash = common.Hash{}
+
 func TransactionsHashIdx(ctx context.Context, chainID uint256.Int, sn *BlocksSnapshot, firstTxID, firstBlockNum, expectedCount uint64, segmentFilePath, tmpDir string, logEvery *time.Ticker, lvl log.Lvl) error {
 	dir, _ := filepath.Split(segmentFilePath)
@@ -914,8 +979,8 @@ func TransactionsHashIdx(ctx context.Context, chainID uint256.Int, sn *BlocksSna
 	buf := make([]byte, 1024)
 
 	txnHashIdx, err := recsplit.NewRecSplit(recsplit.RecSplitArgs{
-		KeyCount:   d.Count(),
-		Enums:      true,
+		KeyCount:   d.Count() - d.EmptyWordsCount(),
+		Enums:      false,
 		BucketSize: 2000,
 		LeafSize:   8,
 		TmpDir:     tmpDir,
@@ -925,9 +990,21 @@ func TransactionsHashIdx(ctx context.Context, chainID uint256.Int, sn *BlocksSna
 	if err != nil {
 		return err
 	}
-	txnHash2BlockNumIdx, err := recsplit.NewRecSplit(recsplit.RecSplitArgs{
+	txnIdIdx, err := recsplit.NewRecSplit(recsplit.RecSplitArgs{
 		KeyCount:   d.Count(),
-		Enums:      true,
+		Enums:      false,
+		BucketSize: 2000,
+		LeafSize:   8,
+		TmpDir:     tmpDir,
+		IndexFile:  filepath.Join(dir, IdxFileName(sn.From, sn.To, TransactionsId)),
+		BaseDataID: firstTxID,
+	})
+	if err != nil {
+		return err
+	}
+	txnHash2BlockNumIdx, err := recsplit.NewRecSplit(recsplit.RecSplitArgs{
+		KeyCount:   d.Count() - d.EmptyWordsCount(),
+		Enums:      false,
 		BucketSize: 2000,
 		LeafSize:   8,
 		TmpDir:     tmpDir,
@@ -940,12 +1017,14 @@ func TransactionsHashIdx(ctx context.Context, chainID uint256.Int, sn *BlocksSna
 
 RETRY:
 	txnHashIdx.NoLogs(true)
+	txnIdIdx.NoLogs(true)
 	txnHash2BlockNumIdx.NoLogs(true)
 
 	ch := forEachAsync(ctx, d)
 	type txHashWithOffet struct {
 		txnHash   [32]byte
 		i, offset uint64
+		empty     bool // a block may have an empty txn at its beginning and end; such txs have no hash, but do have an ID
 		err       error
 	}
 	txsCh := make(chan txHashWithOffet, 1024)
@@ -963,6 +1042,12 @@ RETRY:
 				txsCh2 <- txHashWithOffet{err: it.err}
 				return
 			}
+			if len(it.word) == 0 {
+				txsCh <- txHashWithOffet{empty: true, i: it.i, offset: it.offset}
+				txsCh2 <- txHashWithOffet{empty: true, i: it.i, offset: it.offset}
+				continue
+			}
+
 			if _, err := parseCtx.ParseTransaction(it.word[1+20:], 0, &slot, sender[:], true /* hasEnvelope */); err != nil {
 				txsCh <- txHashWithOffet{err: it.err}
 				txsCh2 <- txHashWithOffet{err: it.err}
@@ -974,8 +1059,9 @@ RETRY:
 	}()
 
 	wg := sync.WaitGroup{}
-	errCh := make(chan error, 2)
+	errCh := make(chan error, 3)
 	defer close(errCh)
+	num := make([]byte, 8)
 
 	wg.Add(1)
 	go func() {
@@ -987,8 +1073,18 @@ RETRY:
 				errCh <- it.err
 				return
 			}
+			j++
+			binary.BigEndian.PutUint64(num, it.i)
+			if err := txnIdIdx.AddKey(num, it.offset); err != nil {
+				errCh <- err
+				return
+			}
+			if it.empty {
+				continue
+			}
 			if err := txnHashIdx.AddKey(it.txnHash[:], it.offset); err != nil {
 				errCh <- it.err
+				return
 			}
 
 			select {
@@ -997,14 +1093,22 @@ RETRY:
 				return
 			default:
 			}
-			j++
 		}
 
 		if j != expectedCount {
 			panic(fmt.Errorf("expect: %d, got %d\n", expectedCount, j))
 		}
-		errCh <- txnHashIdx.Build()
+		if err := txnHashIdx.Build(); err != nil {
+			errCh <- fmt.Errorf("txnHashIdx: %w", err)
+		} else {
+			errCh <- nil
+		}
+		if err := txnIdIdx.Build(); err != nil {
+			errCh <- fmt.Errorf("txnIdIdx: %w", err)
+		} else {
+			errCh <- nil
+		}
 	}()
 	wg.Add(1)
 	go func() {
@@ -1034,6 +1138,10 @@ RETRY:
 				blockNum++
 			}
 
+			if it.empty {
+				continue
+			}
+
 			if err := txnHash2BlockNumIdx.AddKey(it.txnHash[:], blockNum); err != nil {
 				return err
 			}
@@ -1053,17 +1161,22 @@ RETRY:
 			errCh <- err
 			return
 		}
-		errCh <- txnHash2BlockNumIdx.Build()
+		if err := txnHash2BlockNumIdx.Build(); err != nil {
+			errCh <- fmt.Errorf("txnHash2BlockNumIdx: %w", err)
+		} else {
+			errCh <- nil
+		}
 	}()
 
 	wg.Wait()
 
-	for i := 0; i < 2; i++ {
+	for i := 0; i < 3; i++ {
 		err = <-errCh
 		if err != nil {
 			if errors.Is(err, recsplit.ErrCollision) {
 				log.Warn("Building recsplit. Collision happened. It's ok. Restarting with another salt...", "err", err)
 				txnHashIdx.ResetNextSalt()
+				txnIdIdx.ResetNextSalt()
 				txnHash2BlockNumIdx.ResetNextSalt()
 				goto RETRY
 			}
diff --git a/turbo/snapshotsync/snapshothashes/erigon-snapshots b/turbo/snapshotsync/snapshothashes/erigon-snapshots
index 6c87e8d78..61d1023d6 160000
--- a/turbo/snapshotsync/snapshothashes/erigon-snapshots
+++ b/turbo/snapshotsync/snapshothashes/erigon-snapshots
@@ -1 +1 @@
-Subproject commit 6c87e8d78b1daf5cbc4f23e727084f1255a2b794
+Subproject commit 61d1023d686b8d3c623d601495c8d2f0737d3538
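Reviewer note (not part of the patch): the invariant introduced above is that every stored block body now reserves two extra tx ids — an empty "system" txn slot before the user txns and one after them. Below is a minimal sketch of the id arithmetic that ReadBody/WriteRawBody now agree on, assuming only the BaseTxId/TxAmount fields shown in the diff; the struct and helper names here are hypothetical, for illustration only:

package main

import "fmt"

// bodyForStorage mirrors the two fields of types.BodyForStorage that matter
// for the id arithmetic; everything in this file is illustrative only.
type bodyForStorage struct {
	BaseTxId uint64 // id of the begin system-txn slot (the record may not exist)
	TxAmount uint32 // user txns + 2 system txns
}

// userTxnRange returns the id of the first user txn and the user txn count,
// matching what ReadBody now returns: (BaseTxId+1, TxAmount-2).
func userTxnRange(b bodyForStorage) (uint64, uint32) {
	return b.BaseTxId + 1, b.TxAmount - 2
}

func main() {
	// A block with 3 user txns reserves 5 ids: base..base+4.
	// base and base+4 are the system slots; base+1..base+3 hold the user txns.
	b := bodyForStorage{BaseTxId: 100, TxAmount: 3 + 2}
	first, count := userTxnRange(b)
	fmt.Println(first, count) // 101 3
}

This is also why the tests change from 5*3 to 5*(3+2): WriteRawBody asks IncrementSequence for len(txs)+2 ids per block.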
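In the same hedged spirit, a toy model of the move in MakeBodiesNonCanonical/MakeBodiesCanonical: IncrementSequence reserves all TxAmount ids in the destination bucket, but only the TxAmount-2 user txns are copied, so the first/last slots stay empty (they may not exist as records at all). Map-backed buckets stand in for the real MDBX tables here:

package main

import "fmt"

// moveBlockTxs does no sequence bookkeeping itself (the sequence bump happens
// elsewhere); it copies only user txns, skipping the begin/end system slots
// on both the source and destination side.
func moveBlockTxs(src, dst map[uint64][]byte, baseTxId uint64, txAmount uint32, newBaseId uint64) {
	for i := uint64(0); i < uint64(txAmount)-2; i++ {
		id := baseTxId + 1 + i // skip the begin system slot in src
		if v, ok := src[id]; ok {
			dst[newBaseId+1+i] = v // land after the begin system slot in dst
			delete(src, id)
		}
	}
}

func main() {
	src := map[uint64][]byte{101: []byte("tx1"), 102: []byte("tx2"), 103: []byte("tx3")}
	dst := map[uint64][]byte{}
	moveBlockTxs(src, dst, 100, 3+2, 0) // base=100, 3 user txns + 2 system slots
	fmt.Println(len(src), dst[1] != nil, dst[4] == nil) // 0 true true
}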
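Finally, the migration's `b.BaseTxId += blockNum * 2` follows from the same layout: every canonical block below block N gains exactly 2 system-txn ids, so block N's base id shifts right by 2*N — which is why the migration walks from newest to oldest, and why its test expects a final EthTx sequence of 3*10+2*10 for 10 blocks of 3 txns. A one-liner capturing that shift, again illustrative only (the helper name is hypothetical):

package main

import "fmt"

// migratedBaseTxId: in the old layout a chain with 3 txns per block gave
// block N the base id 3*N; after adding 2 system slots per block it is 5*N.
func migratedBaseTxId(oldBaseTxId, blockNum uint64) uint64 {
	return oldBaseTxId + blockNum*2
}

func main() {
	fmt.Println(migratedBaseTxId(0, 0)) // 0 (block 0 owns ids 0..4 after migration)
	fmt.Println(migratedBaseTxId(3, 1)) // 5 (block 1 owns ids 5..9 after migration)
}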