Add system-txs to begin/end of block (#3654)

Alex Sharov 2022-03-10 14:48:58 +07:00 committed by GitHub
parent 63adf0218a
commit b003d07839
16 changed files with 688 additions and 125 deletions
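
In short: every block body now reserves two extra transaction ids, one "system txn" slot before the block's transactions and one after. The stored BaseTxId/TxAmount cover the user txns plus the two system slots, so readers offset by one and subtract two. A minimal sketch of the resulting id layout (the helper name is illustrative, not part of this diff):

    // Per-block txn-id layout after this change:
    //   BaseTxId                            - leading system txn (slot may be empty)
    //   BaseTxId+1 .. BaseTxId+TxAmount-2   - the block's user transactions
    //   BaseTxId+TxAmount-1                 - trailing system txn
    //
    // userTxnRange mirrors the new ReadBody math (hypothetical helper).
    func userTxnRange(baseTxId uint64, txAmount uint32) (firstUserTxnID uint64, userTxnCount uint32) {
    	return baseTxId + 1, txAmount - 2
    }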

View File

@ -67,6 +67,6 @@ func (b adapterHandler) Handle(r lg.Record) {
log.Error(str)
default:
log.Warn("unknown logtype", "msg", r.String())
log.Debug(r.String(), "log_type", "unknown")
}
}

View File

@ -36,6 +36,7 @@ import (
"github.com/ledgerwatch/erigon-lib/kv/kvcache"
"github.com/ledgerwatch/erigon-lib/txpool"
"github.com/ledgerwatch/erigon/cmd/downloader/downloader/torrentcfg"
"github.com/ledgerwatch/log/v3"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/urfave/cli"
@ -43,8 +44,6 @@ import (
"github.com/ledgerwatch/erigon/eth/protocols/eth"
"github.com/ledgerwatch/erigon/params/networkname"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/common/paths"
"github.com/ledgerwatch/erigon/consensus/ethash"

View File

@ -362,11 +362,11 @@ func WriteRawTransactions(db kv.StatelessWriteTx, txs [][]byte, baseTxId uint64)
for _, tx := range txs {
txIdKey := make([]byte, 8)
binary.BigEndian.PutUint64(txIdKey, txId)
txId++
// If the next Append returns a KeyExists error, it means the transaction must be opened in app code before calling this func. A Batch also works.
if err := db.Append(kv.EthTx, txIdKey, tx); err != nil {
return err
}
txId++
}
return nil
}
@ -401,7 +401,7 @@ func ReadBodyWithTransactions(db kv.Getter, hash common.Hash, number uint64) (*t
if canonicalHash == hash {
return ReadCanonicalBodyWithTransactions(db, hash, number), nil
}
return ReadNonCanonicalBodyWithTransactions(db, hash, number), nil
return NonCanonicalBodyWithTransactions(db, hash, number), nil
}
func ReadCanonicalBodyWithTransactions(db kv.Getter, hash common.Hash, number uint64) *types.Body {
@ -418,20 +418,6 @@ func ReadCanonicalBodyWithTransactions(db kv.Getter, hash common.Hash, number ui
return body
}
func ReadNonCanonicalBodyWithTransactions(db kv.Getter, hash common.Hash, number uint64) *types.Body {
body, baseTxId, txAmount := ReadBody(db, hash, number)
if body == nil {
return nil
}
var err error
body.Transactions, err = NonCanonicalTransactions(db, baseTxId, txAmount)
if err != nil {
log.Error("failed ReadTransactionByHash", "hash", hash, "block", number, "err", err)
return nil
}
return body
}
func NonCanonicalBodyWithTransactions(db kv.Getter, hash common.Hash, number uint64) *types.Body {
body, baseTxId, txAmount := ReadBody(db, hash, number)
if body == nil {
@ -523,7 +509,10 @@ func ReadBody(db kv.Getter, hash common.Hash, number uint64) (*types.Body, uint6
}
body := new(types.Body)
body.Uncles = bodyForStorage.Uncles
return body, bodyForStorage.BaseTxId, bodyForStorage.TxAmount
if bodyForStorage.TxAmount < 2 {
panic(fmt.Sprintf("block body hash too few txs amount: %d, %d", number, bodyForStorage.TxAmount))
}
return body, bodyForStorage.BaseTxId + 1, bodyForStorage.TxAmount - 2 // 1 system txn at the beginning of the block, and 1 at the end
}
func ReadSenders(db kv.Getter, hash common.Hash, number uint64) ([]common.Address, error) {
@ -550,19 +539,19 @@ func WriteRawBodyIfNotExists(db kv.StatelessRwTx, hash common.Hash, number uint6
}
func WriteRawBody(db kv.StatelessRwTx, hash common.Hash, number uint64, body *types.RawBody) error {
baseTxId, err := db.IncrementSequence(kv.EthTx, uint64(len(body.Transactions)))
baseTxId, err := db.IncrementSequence(kv.EthTx, uint64(len(body.Transactions))+2)
if err != nil {
return err
}
data := types.BodyForStorage{
BaseTxId: baseTxId,
TxAmount: uint32(len(body.Transactions)),
TxAmount: uint32(len(body.Transactions)) + 2,
Uncles: body.Uncles,
}
if err = WriteBodyForStorage(db, hash, number, &data); err != nil {
return fmt.Errorf("failed to write body: %w", err)
}
if err = WriteRawTransactions(db, body.Transactions, baseTxId); err != nil {
if err = WriteRawTransactions(db, body.Transactions, baseTxId+1); err != nil {
return fmt.Errorf("failed to WriteRawTransactions: %w", err)
}
return nil
@ -571,19 +560,19 @@ func WriteRawBody(db kv.StatelessRwTx, hash common.Hash, number uint64, body *ty
func WriteBody(db kv.RwTx, hash common.Hash, number uint64, body *types.Body) error {
// Pre-processing
body.SendersFromTxs()
baseTxId, err := db.IncrementSequence(kv.EthTx, uint64(len(body.Transactions)))
baseTxId, err := db.IncrementSequence(kv.EthTx, uint64(len(body.Transactions))+2)
if err != nil {
return err
}
data := types.BodyForStorage{
BaseTxId: baseTxId,
TxAmount: uint32(len(body.Transactions)),
TxAmount: uint32(len(body.Transactions)) + 2,
Uncles: body.Uncles,
}
if err := WriteBodyForStorage(db, hash, number, &data); err != nil {
return fmt.Errorf("failed to write body: %w", err)
}
err = WriteTransactions(db, body.Transactions, baseTxId)
err = WriteTransactions(db, body.Transactions, baseTxId+1)
if err != nil {
return fmt.Errorf("failed to WriteTransactions: %w", err)
}
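
On the write side, WriteRawBody and WriteBody now reserve len(txs)+2 sequence values and store the user txns starting at baseTxId+1. A worked example under these rules (values are illustrative):

    // Writing a body with 3 user txns while the EthTx sequence stands at 100:
    //   baseTxId, _ := db.IncrementSequence(kv.EthTx, 3+2) // baseTxId=100, sequence -> 105
    //   BodyForStorage{BaseTxId: 100, TxAmount: 5}
    //   ids 101,102,103 hold the user txns; 100 and 104 stay reserved for system txns.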
@ -632,13 +621,18 @@ func MakeBodiesCanonical(tx kv.StatelessRwTx, from uint64, ctx context.Context,
return err
}
id := newBaseId
if err := tx.ForAmount(kv.NonCanonicalTxs, dbutils.EncodeBlockNumber(bodyForStorage.BaseTxId), bodyForStorage.TxAmount, func(k, v []byte) error {
// the next loop moves only non-system txs; system txs must be moved manually (they may not exist)
i := uint64(0)
if err := tx.ForAmount(kv.NonCanonicalTxs, dbutils.EncodeBlockNumber(bodyForStorage.BaseTxId+1), bodyForStorage.TxAmount-2, func(k, v []byte) error {
id := newBaseId + 1 + i
if err := tx.Put(kv.EthTx, dbutils.EncodeBlockNumber(id), v); err != nil {
return err
}
id++
return tx.Delete(kv.NonCanonicalTxs, k, nil)
if err := tx.Delete(kv.NonCanonicalTxs, k, nil); err != nil {
return err
}
i++
return nil
}); err != nil {
return err
}
@ -661,6 +655,8 @@ func MakeBodiesCanonical(tx kv.StatelessRwTx, from uint64, ctx context.Context,
// MakeBodiesNonCanonical - move all txs of canonical blocks to NonCanonicalTxs bucket
func MakeBodiesNonCanonical(tx kv.RwTx, from uint64, ctx context.Context, logPrefix string, logEvery *time.Ticker) error {
var firstMovedTxnID uint64
var firstMovedTxnIDIsSet bool
for blockNum := from; ; blockNum++ {
h, err := ReadCanonicalHash(tx, blockNum)
if err != nil {
@ -678,23 +674,31 @@ func MakeBodiesNonCanonical(tx kv.RwTx, from uint64, ctx context.Context, logPre
if err := rlp.DecodeBytes(data, bodyForStorage); err != nil {
return err
}
if !firstMovedTxnIDIsSet {
firstMovedTxnIDIsSet = true
firstMovedTxnID = bodyForStorage.BaseTxId
}
// move txs to the NonCanonicalTxs bucket, which has its own sequence
newBaseId, err := tx.IncrementSequence(kv.NonCanonicalTxs, uint64(bodyForStorage.TxAmount))
if err != nil {
return err
}
id := newBaseId
if err := tx.ForAmount(kv.EthTx, dbutils.EncodeBlockNumber(bodyForStorage.BaseTxId), bodyForStorage.TxAmount, func(k, v []byte) error {
// the next loop moves only non-system txs; system txs must be moved manually (they may not exist)
i := uint64(0)
if err := tx.ForAmount(kv.EthTx, dbutils.EncodeBlockNumber(bodyForStorage.BaseTxId+1), bodyForStorage.TxAmount-2, func(k, v []byte) error {
id := newBaseId + 1 + i
if err := tx.Put(kv.NonCanonicalTxs, dbutils.EncodeBlockNumber(id), v); err != nil {
return err
}
id++
return tx.Delete(kv.EthTx, k, nil)
if err := tx.Delete(kv.EthTx, k, nil); err != nil {
return err
}
i++
return nil
}); err != nil {
return err
}
bodyForStorage.BaseTxId = newBaseId
if err := WriteBodyForStorage(tx, h, blockNum, bodyForStorage); err != nil {
return err
@ -710,23 +714,11 @@ func MakeBodiesNonCanonical(tx kv.RwTx, from uint64, ctx context.Context, logPre
}
// EthTx must contain canonical ids - so its sequence needs to be decremented on unwind
c, err := tx.Cursor(kv.EthTx)
if err != nil {
return err
if firstMovedTxnIDIsSet {
if err := ResetSequence(tx, kv.EthTx, firstMovedTxnID); err != nil {
return err
}
}
defer c.Close()
k, _, err := c.Last()
if err != nil {
return err
}
var nextTxID uint64
if k != nil {
nextTxID = binary.BigEndian.Uint64(k) + 1
}
if err := ResetSequence(tx, kv.EthTx, nextTxID); err != nil {
return err
}
return nil
}
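
The unwind path no longer needs a cursor scan to find the next free EthTx id: once the loop has moved every canonical txn from block `from` upward into NonCanonicalTxs, the BaseTxId of the first moved block is by construction the lowest id that is now free. A sketch of the equivalence, assuming ResetSequence sets the bucket's next sequence value as in the deleted code:

    // before: k, _, _ := cursor.Last(); nextTxID = binary.BigEndian.Uint64(k) + 1
    // after:  nextTxID == firstMovedTxnID, since all ids >= firstMovedTxnID were moved out
    // => ResetSequence(tx, kv.EthTx, firstMovedTxnID) restores the same sequence state.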

View File

@ -40,14 +40,14 @@ func TestBodiesUnwind(t *testing.T) {
n, err := tx.ReadSequence(kv.EthTx)
require.NoError(err)
require.Equal(5*3, int(n)) // from 0, 5 block with 3 txn in each
require.Equal(5*(3+2), int(n)) // from 0: 5 blocks with 3 user txns each, plus 2 system txns per block
}
{
err = rawdb.MakeBodiesCanonical(tx, 5+1, ctx, "test", logEvery) // block 5 already canonical, start from next one
require.NoError(err)
n, err := tx.ReadSequence(kv.EthTx)
require.NoError(err)
require.Equal(10*3, int(n))
require.Equal(10*(3+2), int(n))
err = rawdb.WriteRawBody(tx, common.Hash{11}, 11, b)
require.NoError(err)
@ -56,7 +56,7 @@ func TestBodiesUnwind(t *testing.T) {
n, err = tx.ReadSequence(kv.EthTx)
require.NoError(err)
require.Equal(11*3, int(n))
require.Equal(11*(3+2), int(n))
}
{
@ -66,12 +66,12 @@ func TestBodiesUnwind(t *testing.T) {
n, err := tx.ReadSequence(kv.EthTx)
require.NoError(err)
require.Equal(5*3, int(n)) // from 0, 5 block with 3 txn in each
require.Equal(5*(3+2), int(n)) // from 0: 5 blocks with 3 user txns each, plus 2 system txns per block
err = rawdb.MakeBodiesCanonical(tx, 5+1, ctx, "test", logEvery) // block 5 already canonical, start from next one
require.NoError(err)
n, err = tx.ReadSequence(kv.EthTx)
require.NoError(err)
require.Equal(11*3, int(n))
require.Equal(11*(3+2), int(n))
}
}

View File

@ -137,15 +137,14 @@ func TestSenders(t *testing.T) {
assert.Equal(t, 0, len(senders))
}
{
txs, err := rawdb.CanonicalTransactions(tx, 0, 2)
txs, err := rawdb.CanonicalTransactions(tx, 1, 2)
assert.NoError(t, err)
assert.Equal(t, 2, len(txs))
txs, err = rawdb.CanonicalTransactions(tx, 2, 3)
txs, err = rawdb.CanonicalTransactions(tx, 5, 3)
assert.NoError(t, err)
assert.Equal(t, 3, len(txs))
txs, err = rawdb.CanonicalTransactions(tx, 0, 1024)
txs, err = rawdb.CanonicalTransactions(tx, 5, 1024)
assert.NoError(t, err)
assert.Equal(t, 5, len(txs))
assert.Equal(t, 3, len(txs))
}
}
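
The new magic numbers in TestSenders follow directly from the id layout, assuming the test writes two blocks with 2 and 3 user txns respectively:

    // block 1: ids 0..3 -> 0 system, 1..2 user (2 txns), 3 system
    // block 2: ids 4..8 -> 4 system, 5..7 user (3 txns), 8 system
    // hence CanonicalTransactions(tx, 1, 2), CanonicalTransactions(tx, 5, 3),
    // and a capped read from id 5 still sees only 3 user txns.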

View File

@ -160,7 +160,7 @@ func unwindTxLookup(u *UnwindState, s *StageState, tx kv.RwTx, cfg TxLookupCfg,
return fmt.Errorf("rlp decode err: %w", err)
}
txs, err := rawdb.CanonicalTransactions(tx, body.BaseTxId, body.TxAmount)
txs, err := rawdb.CanonicalTransactions(tx, body.BaseTxId+1, body.TxAmount-2)
if err != nil {
return err
}
@ -232,7 +232,7 @@ func pruneTxLookup(tx kv.RwTx, logPrefix, tmpDir string, s *PruneState, pruneTo
return fmt.Errorf("rlp decode: %w", err)
}
txs, err := rawdb.CanonicalTransactions(tx, body.BaseTxId, body.TxAmount)
txs, err := rawdb.CanonicalTransactions(tx, body.BaseTxId+1, body.TxAmount-2)
if err != nil {
return err
}

go.mod
View File

@ -40,7 +40,7 @@ require (
github.com/json-iterator/go v1.1.12
github.com/julienschmidt/httprouter v1.3.0
github.com/kevinburke/go-bindata v3.21.0+incompatible
github.com/ledgerwatch/erigon-lib v0.0.0-20220309172522-75b52ac25e10
github.com/ledgerwatch/erigon-lib v0.0.0-20220310020438-6f5ec338cbc1
github.com/ledgerwatch/log/v3 v3.4.1
github.com/ledgerwatch/secp256k1 v1.0.0
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect

go.sum
View File

@ -641,8 +641,8 @@ github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3P
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/ledgerwatch/erigon-lib v0.0.0-20220309172522-75b52ac25e10 h1:WpvYRV25W89qbzfSKRrxPK5l1XVRE3Y297fiSfPE8wU=
github.com/ledgerwatch/erigon-lib v0.0.0-20220309172522-75b52ac25e10/go.mod h1:SlDiODxcwf99wgSRhp/JO5piRwUJI6AN7WB5aQ5N7u8=
github.com/ledgerwatch/erigon-lib v0.0.0-20220310020438-6f5ec338cbc1 h1:g3nSeta0vg4XPibGikejEz9PCMVGnk01PTLgUecbEyU=
github.com/ledgerwatch/erigon-lib v0.0.0-20220310020438-6f5ec338cbc1/go.mod h1:SlDiODxcwf99wgSRhp/JO5piRwUJI6AN7WB5aQ5N7u8=
github.com/ledgerwatch/log/v3 v3.4.1 h1:/xGwlVulXnsO9Uq+tzaExc8OWmXXHU0dnLalpbnY5Bc=
github.com/ledgerwatch/log/v3 v3.4.1/go.mod h1:VXcz6Ssn6XEeU92dCMc39/g1F0OYAjw1Mt+dGP5DjXY=
github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ=

View File

@ -32,6 +32,7 @@ import (
var migrations = map[kv.Label][]Migration{
kv.ChainDB: {
dbSchemaVersion5,
txsBeginEnd,
},
kv.TxPoolDB: {},
kv.SentryDB: {},
@ -45,7 +46,7 @@ type Migration struct {
var (
ErrMigrationNonUniqueName = fmt.Errorf("please provide unique migration name")
ErrMigrationCommitNotCalled = fmt.Errorf("migration commit function was not called")
ErrMigrationCommitNotCalled = fmt.Errorf("migration before-commit function was not called")
ErrMigrationETLFilesDeleted = fmt.Errorf("db migration progress was interrupted after extraction step and ETL files was deleted, please contact development team for help or re-sync from scratch")
)
@ -167,7 +168,7 @@ func (m *Migrator) Apply(db kv.RwDB, datadir string) error {
return err
}
if err := m.VerifyVersion(db); err != nil {
return err
return fmt.Errorf("migrator.Apply: %w", err)
}
// migration names must be unique, protection against people's mistake
@ -194,7 +195,7 @@ func (m *Migrator) Apply(db kv.RwDB, datadir string) error {
progress, err = tx.GetOne(kv.Migrations, []byte("_progress_"+v.Name))
return err
}); err != nil {
return err
return fmt.Errorf("migrator.Apply: %w", err)
}
if err := v.Up(db, filepath.Join(datadir, "migrations", v.Name), progress, func(tx kv.RwTx, key []byte, isDone bool) error {
@ -224,7 +225,7 @@ func (m *Migrator) Apply(db kv.RwDB, datadir string) error {
return nil
}); err != nil {
return err
return fmt.Errorf("migrator.Apply.Up: %s, %w", v.Name, err)
}
if !callbackCalled {
@ -243,7 +244,7 @@ func (m *Migrator) Apply(db kv.RwDB, datadir string) error {
}
return nil
}); err != nil {
return err
return fmt.Errorf("migrator.Apply: %w", err)
}
log.Info("Updated DB schema to", "version", fmt.Sprintf("%d.%d.%d", kv.DBSchemaVersion.Major, kv.DBSchemaVersion.Minor, kv.DBSchemaVersion.Patch))
return nil

migrations/txs_begin_end.go (new file)
View File

@ -0,0 +1,354 @@
package migrations
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"runtime"
"time"
common2 "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/log/v3"
)
const ASSERT = false
var ErrTxsBeginEndNoMigration = fmt.Errorf("in this Erigon version the DB format was changed: additional first/last system-txs were added to blocks. There is no DB migration for this change. Please re-sync or switch to an earlier version")
var txsBeginEnd = Migration{
Name: "txs_begin_end",
Up: func(db kv.RwDB, tmpdir string, progress []byte, BeforeCommit Callback) (err error) {
logEvery := time.NewTicker(10 * time.Second)
defer logEvery.Stop()
var latestBlock uint64
if err := db.View(context.Background(), func(tx kv.Tx) error {
bodiesProgress, err := stages.GetStageProgress(tx, stages.Bodies)
if err != nil {
return err
}
if progress != nil {
latestBlock = binary.BigEndian.Uint64(progress)
log.Info("[migration] Continue migration", "from_block", latestBlock)
} else {
latestBlock = bodiesProgress + 1 // include block 0
}
return nil
}); err != nil {
return err
}
tx, err := db.BeginRw(context.Background())
if err != nil {
return err
}
defer tx.Rollback()
numBuf := make([]byte, 8)
numHashBuf := make([]byte, 8+32)
for i := int(latestBlock); i >= 0; i-- {
blockNum := uint64(i)
select {
case <-logEvery.C:
var m runtime.MemStats
runtime.ReadMemStats(&m)
log.Info("[migration] Adding system-txs",
"progress", fmt.Sprintf("%.2f%%", 100-100*float64(blockNum)/float64(latestBlock)), "block_num", blockNum,
"alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys))
default:
}
canonicalHash, err := rawdb.ReadCanonicalHash(tx, blockNum)
if err != nil {
return err
}
var oldBlock *types.Body
if ASSERT {
oldBlock = readCanonicalBodyWithTransactionsDeprecated(tx, canonicalHash, blockNum)
}
binary.BigEndian.PutUint64(numHashBuf[:8], blockNum)
copy(numHashBuf[8:], canonicalHash[:])
b, err := rawdb.ReadBodyForStorageByKey(tx, numHashBuf)
if err != nil {
return err
}
if b == nil {
continue
}
txs, err := rawdb.CanonicalTransactions(tx, b.BaseTxId, b.TxAmount)
if err != nil {
return err
}
b.BaseTxId += blockNum * 2
b.TxAmount += 2
if err := rawdb.WriteBodyForStorage(tx, canonicalHash, blockNum, b); err != nil {
return fmt.Errorf("failed to write body: %w", err)
}
// del first tx in block
if err = tx.Delete(kv.EthTx, dbutils.EncodeBlockNumber(b.BaseTxId), nil); err != nil {
return err
}
if err := writeTransactionsNewDeprecated(tx, txs, b.BaseTxId+1); err != nil {
return fmt.Errorf("failed to write body txs: %w", err)
}
// del last tx in block
if err = tx.Delete(kv.EthTx, dbutils.EncodeBlockNumber(b.BaseTxId+uint64(b.TxAmount)-1), nil); err != nil {
return err
}
if ASSERT {
newBlock, baseTxId, txAmount := rawdb.ReadBody(tx, canonicalHash, blockNum)
newBlock.Transactions, err = rawdb.CanonicalTransactions(tx, baseTxId, txAmount)
for i, oldTx := range oldBlock.Transactions {
newTx := newBlock.Transactions[i]
if oldTx.GetNonce() != newTx.GetNonce() {
panic(blockNum)
}
}
}
if err = tx.ForPrefix(kv.BlockBody, numHashBuf[:8], func(k, v []byte) error {
if bytes.Equal(k, numHashBuf) { // don't delete canonical blocks
return nil
}
bodyForStorage := new(types.BodyForStorage)
if err := rlp.DecodeBytes(v, bodyForStorage); err != nil {
return err
}
for i := bodyForStorage.BaseTxId; i < bodyForStorage.BaseTxId+uint64(bodyForStorage.TxAmount); i++ {
binary.BigEndian.PutUint64(numBuf, i)
if err = tx.Delete(kv.NonCanonicalTxs, numBuf, nil); err != nil {
return err
}
}
if err = tx.Delete(kv.BlockBody, k, nil); err != nil {
return err
}
if err = tx.Delete(kv.Headers, k, nil); err != nil {
return err
}
if err = tx.Delete(kv.HeaderTD, k, nil); err != nil {
return err
}
if err = tx.Delete(kv.HeaderNumber, k[8:], nil); err != nil {
return err
}
return nil
}); err != nil {
return err
}
binary.BigEndian.PutUint64(numBuf, blockNum)
if err := BeforeCommit(tx, numBuf, false); err != nil {
return err
}
if blockNum%10_000 == 0 {
if err := tx.Commit(); err != nil {
return err
}
tx, err = db.BeginRw(context.Background())
if err != nil {
return err
}
}
}
if err := tx.Commit(); err != nil {
return err
}
return db.Update(context.Background(), func(tx kv.RwTx) error {
// reset non-canonical sequence to 0
v, err := tx.ReadSequence(kv.NonCanonicalTxs)
if err != nil {
return err
}
if _, err := tx.IncrementSequence(kv.NonCanonicalTxs, -v); err != nil {
return err
}
{
c, err := tx.Cursor(kv.HeaderCanonical)
if err != nil {
return err
}
k, v, err := c.Last()
if err != nil {
return err
}
data, err := tx.GetOne(kv.BlockBody, append(k, v...))
if err != nil {
return err
}
var newSeqValue uint64
if len(data) > 0 {
bodyForStorage := new(types.BodyForStorage)
if err := rlp.DecodeBytes(data, bodyForStorage); err != nil {
return fmt.Errorf("rlp.DecodeBytes(bodyForStorage): %w", err)
}
currentSeq, err := tx.ReadSequence(kv.EthTx)
if err != nil {
return err
}
newSeqValue = bodyForStorage.BaseTxId + uint64(bodyForStorage.TxAmount) - currentSeq
}
if _, err := tx.IncrementSequence(kv.EthTx, newSeqValue); err != nil {
return err
}
}
// this final BeforeCommit call marks the migration as done and makes the migration mechanism write the DB schema version info
return BeforeCommit(tx, nil, true)
})
},
}
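
The migration walks blocks from the tip down to genesis and shifts each body with b.BaseTxId += blockNum*2: every block below block N gains exactly two ids, so block N's cumulative shift is 2*N, and iterating downward keeps each block's new range clear of the not-yet-migrated ranges below it. A worked example with three blocks of 3 txns each (illustrative values):

    // old: block0 ids 0..2, block1 ids 3..5, block2 ids 6..8
    // new: block0 base 0+0*2=0  -> ids 0..4
    //      block1 base 3+1*2=5  -> ids 5..9
    //      block2 base 6+2*2=10 -> ids 10..14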
func writeRawBodyDeprecated(db kv.StatelessRwTx, hash common.Hash, number uint64, body *types.RawBody) error {
baseTxId, err := db.IncrementSequence(kv.EthTx, uint64(len(body.Transactions)))
if err != nil {
return err
}
data := types.BodyForStorage{
BaseTxId: baseTxId,
TxAmount: uint32(len(body.Transactions)),
Uncles: body.Uncles,
}
if err = rawdb.WriteBodyForStorage(db, hash, number, &data); err != nil {
return fmt.Errorf("failed to write body: %w", err)
}
if err = rawdb.WriteRawTransactions(db, body.Transactions, baseTxId); err != nil {
return fmt.Errorf("failed to WriteRawTransactions: %w", err)
}
return nil
}
func writeTransactionsNewDeprecated(db kv.RwTx, txs []types.Transaction, baseTxId uint64) error {
txId := baseTxId
buf := bytes.NewBuffer(nil)
for _, tx := range txs {
txIdKey := make([]byte, 8)
binary.BigEndian.PutUint64(txIdKey, txId)
buf.Reset()
if err := rlp.Encode(buf, tx); err != nil {
return fmt.Errorf("broken tx rlp: %w", err)
}
// If the next Append returns a KeyExists error, it means the transaction must be opened in app code before calling this func. A Batch also works.
if err := db.Put(kv.EthTx, txIdKey, common.CopyBytes(buf.Bytes())); err != nil {
return err
}
txId++
}
return nil
}
func readCanonicalBodyWithTransactionsDeprecated(db kv.Getter, hash common.Hash, number uint64) *types.Body {
data := rawdb.ReadStorageBodyRLP(db, hash, number)
if len(data) == 0 {
return nil
}
bodyForStorage := new(types.BodyForStorage)
err := rlp.DecodeBytes(data, bodyForStorage)
if err != nil {
log.Error("Invalid block body RLP", "hash", hash, "err", err)
return nil
}
body := new(types.Body)
body.Uncles = bodyForStorage.Uncles
body.Transactions, err = rawdb.CanonicalTransactions(db, bodyForStorage.BaseTxId, bodyForStorage.TxAmount)
if err != nil {
log.Error("failed ReadTransactionByHash", "hash", hash, "block", number, "err", err)
return nil
}
return body
}
func makeBodiesNonCanonicalDeprecated(tx kv.RwTx, from uint64, ctx context.Context, logPrefix string, logEvery *time.Ticker) error {
for blockNum := from; ; blockNum++ {
h, err := rawdb.ReadCanonicalHash(tx, blockNum)
if err != nil {
return err
}
if h == (common.Hash{}) {
break
}
data := rawdb.ReadStorageBodyRLP(tx, h, blockNum)
if len(data) == 0 {
break
}
bodyForStorage := new(types.BodyForStorage)
if err := rlp.DecodeBytes(data, bodyForStorage); err != nil {
return err
}
// move txs to the NonCanonicalTxs bucket, which has its own sequence
newBaseId, err := tx.IncrementSequence(kv.NonCanonicalTxs, uint64(bodyForStorage.TxAmount))
if err != nil {
return err
}
id := newBaseId
if err := tx.ForAmount(kv.EthTx, dbutils.EncodeBlockNumber(bodyForStorage.BaseTxId), bodyForStorage.TxAmount, func(k, v []byte) error {
if err := tx.Put(kv.NonCanonicalTxs, dbutils.EncodeBlockNumber(id), v); err != nil {
return err
}
id++
return tx.Delete(kv.EthTx, k, nil)
}); err != nil {
return err
}
bodyForStorage.BaseTxId = newBaseId
if err := rawdb.WriteBodyForStorage(tx, h, blockNum, bodyForStorage); err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case <-logEvery.C:
log.Info(fmt.Sprintf("[%s] Unwinding transactions...", logPrefix), "current block", blockNum)
default:
}
}
// EthTx must contain canonical ids - so its sequence needs to be decremented on unwind
c, err := tx.Cursor(kv.EthTx)
if err != nil {
return err
}
defer c.Close()
k, _, err := c.Last()
if err != nil {
return err
}
var nextTxID uint64
if k != nil {
nextTxID = binary.BigEndian.Uint64(k) + 1
}
if err := rawdb.ResetSequence(tx, kv.EthTx, nextTxID); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,103 @@
package migrations
import (
"bytes"
"context"
"encoding/binary"
"testing"
"time"
"github.com/ledgerwatch/erigon-lib/common/u256"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/stretchr/testify/require"
)
func TestTxsBeginEnd(t *testing.T) {
require, tmpDir, db := require.New(t), t.TempDir(), memdb.NewTestDB(t)
txn := &types.DynamicFeeTransaction{Tip: u256.N1, FeeCap: u256.N1, CommonTx: types.CommonTx{ChainID: u256.N1, Value: u256.N1, Gas: 1, Nonce: 1}}
buf := bytes.NewBuffer(nil)
err := txn.MarshalBinary(buf)
require.NoError(err)
rlpTxn := buf.Bytes()
logEvery := time.NewTicker(10 * time.Second)
defer logEvery.Stop()
b := &types.RawBody{Transactions: [][]byte{rlpTxn, rlpTxn, rlpTxn}}
err = db.Update(context.Background(), func(tx kv.RwTx) error {
for i := uint64(0); i < 10; i++ {
hash := common.Hash{byte(i)}
err = writeRawBodyDeprecated(tx, hash, i, b)
require.NoError(err)
err = rawdb.WriteCanonicalHash(tx, hash, i)
require.NoError(err)
}
if err := makeBodiesNonCanonicalDeprecated(tx, 7, context.Background(), "", logEvery); err != nil {
return err
}
for i := uint64(7); i < 10; i++ {
err = rawdb.DeleteCanonicalHash(tx, i)
require.NoError(err)
hash := common.Hash{0xa, byte(i)}
err = writeRawBodyDeprecated(tx, hash, i, b)
require.NoError(err)
err = rawdb.WriteCanonicalHash(tx, hash, i)
require.NoError(err)
}
if err := stages.SaveStageProgress(tx, stages.Bodies, 9); err != nil {
return err
}
return nil
})
require.NoError(err)
migrator := NewMigrator(kv.ChainDB)
migrator.Migrations = []Migration{txsBeginEnd}
err = migrator.Apply(db, tmpDir)
require.NoError(err)
err = db.View(context.Background(), func(tx kv.Tx) error {
v, err := tx.ReadSequence(kv.EthTx)
require.NoError(err)
require.Equal(uint64(3*10+2*10), v)
return nil
})
require.NoError(err)
err = db.View(context.Background(), func(tx kv.Tx) error {
for i := uint64(7); i < 10; i++ {
hash := common.Hash{byte(i)}
k := make([]byte, 8+32)
binary.BigEndian.PutUint64(k, i)
copy(k[8:], hash[:])
has, err := tx.Has(kv.BlockBody, k)
require.NoError(err)
require.False(has)
}
c, err := tx.Cursor(kv.NonCanonicalTxs)
require.NoError(err)
cnt, err := c.Count()
require.NoError(err)
require.Zero(cnt)
v, err := tx.ReadSequence(kv.NonCanonicalTxs)
require.NoError(err)
require.Zero(v)
has, err := tx.Has(kv.EthTx, dbutils.EncodeBlockNumber(0))
require.NoError(err)
require.False(has)
return nil
})
require.NoError(err)
}

View File

@ -33,6 +33,7 @@ import (
"github.com/ledgerwatch/erigon/p2p"
"github.com/ledgerwatch/erigon/rpc"
"github.com/ledgerwatch/log/v3"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/assert"
)
@ -172,7 +173,7 @@ func TestNodeCloseClosesDB(t *testing.T) {
stack, _ := New(testNodeConfig(t))
defer stack.Close()
db, err := OpenDatabase(stack.Config(), log.New(), kv.ChainDB)
db, err := OpenDatabase(stack.Config(), log.New(), kv.SentryDB)
if err != nil {
t.Fatal("can't open DB:", err)
}
@ -196,14 +197,14 @@ func TestNodeOpenDatabaseFromLifecycleStart(t *testing.T) {
t.Skip("fix me on win please")
}
stack, _ := New(testNodeConfig(t))
stack, err := New(testNodeConfig(t))
require.NoError(t, err)
defer stack.Close()
var db kv.RwDB
var err error
stack.RegisterLifecycle(&InstrumentedService{
startHook: func() {
db, err = OpenDatabase(stack.Config(), log.New(), kv.ChainDB)
db, err = OpenDatabase(stack.Config(), log.New(), kv.SentryDB)
if err != nil {
t.Fatal("can't open DB:", err)
}

View File

@ -251,7 +251,7 @@ func snapshotBlocks(ctx context.Context, chainDB kv.RoDB, fromBlock, toBlock, bl
workers = 1
}
if err := snapshotsync.DumpBlocks(ctx, fromBlock, last, blocksPerFile, tmpDir, snapshotDir, chainDB, workers, log.LvlInfo); err != nil {
return err
return fmt.Errorf("DumpBlocks: %w", err)
}
return nil
}

View File

@ -3,6 +3,7 @@ package snapshotsync
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"sync"
@ -346,14 +347,17 @@ func (back *BlockReaderWithSnapshots) BlockWithSenders(ctx context.Context, tx k
return nil, nil, fmt.Errorf(".idx file has wrong baseDataID? %d<%d, %s", b.BaseTxId, sn.TxnHashIdx.BaseDataID(), sn.Transactions.FilePath())
}
txs := make([]types.Transaction, b.TxAmount)
senders = make([]common.Address, b.TxAmount)
if b.TxAmount > 0 {
txnOffset := sn.TxnHashIdx.Lookup2(b.BaseTxId - sn.TxnHashIdx.BaseDataID()) // need subtract baseID of indexFile
txs := make([]types.Transaction, b.TxAmount-2)
senders = make([]common.Address, b.TxAmount-2)
if b.TxAmount > 2 {
r := recsplit.NewIndexReader(sn.TxnIdsIdx)
binary.BigEndian.PutUint64(buf[:8], b.BaseTxId-sn.TxnIdsIdx.BaseDataID())
txnOffset := r.Lookup(buf[:8])
gg = sn.Transactions.MakeGetter()
gg.Reset(txnOffset)
stream := rlp.NewStream(reader, 0)
for i := uint32(0); i < b.TxAmount; i++ {
buf, _ = gg.Next(buf[:0]) //first system-tx
for i := uint32(0); i < b.TxAmount-2; i++ {
buf, _ = gg.Next(buf[:0])
senders[i].SetBytes(buf[1 : 1+20])
txRlp := buf[1+20:]
@ -429,7 +433,7 @@ func (back *BlockReaderWithSnapshots) bodyFromSnapshot(blockHeight uint64, sn *B
body := new(types.Body)
body.Uncles = b.Uncles
return body, b.BaseTxId, b.TxAmount, nil
return body, b.BaseTxId + 1, b.TxAmount - 2, nil // skip the system txns at the beginning and end of the block
}
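
Snapshot segments mirror the DB layout: DumpTxs below emits one (possibly empty) word per system txn, so a segment carries TxAmount words per block and readers skip the leading word and decode TxAmount-2 user txns, as BlockWithSenders does above. The shape of the read loop, simplified from this diff:

    buf, _ = gg.Next(buf[:0]) // skip the leading system-txn word
    for i := uint32(0); i < txAmount-2; i++ {
    	buf, _ = gg.Next(buf[:0]) // user txn: 1-byte hash prefix + 20-byte sender + tx rlp
    }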
func (back *BlockReaderWithSnapshots) bodyWithTransactionsFromSnapshot(blockHeight uint64, sn *BlocksSnapshot, buf []byte) (*types.Body, []common.Address, uint64, uint32, error) {
@ -441,7 +445,9 @@ func (back *BlockReaderWithSnapshots) bodyWithTransactionsFromSnapshot(blockHeig
senders := make([]common.Address, txsAmount)
reader := bytes.NewReader(buf)
if txsAmount > 0 {
txnOffset := sn.TxnHashIdx.Lookup2(baseTxnID - sn.TxnHashIdx.BaseDataID()) // need subtract baseID of indexFile
r := recsplit.NewIndexReader(sn.TxnIdsIdx)
binary.BigEndian.PutUint64(buf[:8], baseTxnID-sn.TxnIdsIdx.BaseDataID())
txnOffset := r.Lookup(buf[:8])
gg := sn.Transactions.MakeGetter()
gg.Reset(txnOffset)
stream := rlp.NewStream(reader, 0)
@ -467,9 +473,7 @@ func (back *BlockReaderWithSnapshots) txnByHash(txnHash common.Hash, buf []byte)
sn := back.sn.blocks[i]
reader := recsplit.NewIndexReader(sn.TxnHashIdx)
localID := reader.Lookup(txnHash[:])
txnID = localID + sn.TxnHashIdx.BaseDataID()
offset := sn.TxnHashIdx.Lookup2(localID)
offset := reader.Lookup(txnHash[:])
gg := sn.Transactions.MakeGetter()
gg.Reset(offset)
//fmt.Printf("try: %d, %d, %d, %d\n", i, sn.From, localID, blockNum)
@ -480,10 +484,7 @@ func (back *BlockReaderWithSnapshots) txnByHash(txnHash common.Hash, buf []byte)
}
reader2 := recsplit.NewIndexReader(sn.TxnHash2BlockNumIdx)
localID = reader2.Lookup(txnHash[:])
blockNum = sn.TxnHash2BlockNumIdx.Lookup2(localID)
//fmt.Printf("try2: %d,%d,%d,%d,%d,%d\n", sn.TxnHash2BlockNumIdx.Lookup2(0), sn.TxnHash2BlockNumIdx.Lookup2(1), sn.TxnHash2BlockNumIdx.Lookup2(2), sn.TxnHash2BlockNumIdx.Lookup2(3), sn.TxnHash2BlockNumIdx.Lookup2(4), sn.TxnHash2BlockNumIdx.Lookup2(5))
//fmt.Printf("try3: %d,%d,%d,%d\n", sn.TxnHash2BlockNumIdx.Lookup(common.FromHex("0xc2c3ba07f05ddd8552508e7facf25dc5bd6d16e95c12cff42cb8b9ea6bbfc225")), sn.TxnHash2BlockNumIdx.Lookup(common.FromHex("0xca8a182f21b98318e94ec7884f572c0a1385dbc10a2bea62a38079eab7d8cfef")), sn.TxnHash2BlockNumIdx.Lookup(common.FromHex("0xf1b3306dd4bfa2a86f7f1b3c22bf0b6b4da50f5d37f2fed89d1221cb3690c700")), sn.TxnHash2BlockNumIdx.Lookup(common.FromHex("0xf59129e464525261217833d4bafae0ed8be5a94044eafacb47a45a4b23802a70")))
blockNum = reader2.Lookup(txnHash[:])
sender := buf[1 : 1+20]
txn, err = types.DecodeTransaction(rlp.NewStream(bytes.NewReader(buf[1+20:]), uint64(len(buf))))
if err != nil {

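txnByHash no longer uses Lookup2 with enumerated indices: the indices are now built with Enums: false, so Lookup returns the mapped value directly, and the new TransactionsId index maps a txn id to its segment offset. A minimal id-based lookup, using only calls that appear in this diff:

    r := recsplit.NewIndexReader(sn.TxnIdsIdx)
    binary.BigEndian.PutUint64(buf[:8], txnID-sn.TxnIdsIdx.BaseDataID())
    offset := r.Lookup(buf[:8])
    gg := sn.Transactions.MakeGetter()
    gg.Reset(offset)
    word, _ := gg.Next(nil) // an empty word marks a system txn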
View File

@ -44,6 +44,7 @@ type BlocksSnapshot struct {
BodyNumberIdx *recsplit.Index // block_num_u64 -> bodies_segment_offset
HeaderHashIdx *recsplit.Index // header_hash -> headers_segment_offset
TxnHashIdx *recsplit.Index // transaction_hash -> transactions_segment_offset
TxnIdsIdx *recsplit.Index // transaction_id -> transactions_segment_offset
TxnHash2BlockNumIdx *recsplit.Index // transaction_hash -> block_number
From, To uint64 // [from,to)
@ -59,6 +60,7 @@ const (
const (
Transactions2Block Type = "transactions-to-block"
TransactionsId Type = "transactions-id"
)
var AllSnapshotTypes = []Type{Headers, Bodies, Transactions}
@ -172,6 +174,15 @@ func (s *RoSnapshots) ReopenSomeIndices(types ...Type) (err error) {
return err
}
if bs.TxnIdsIdx != nil {
bs.TxnIdsIdx.Close()
bs.TxnIdsIdx = nil
}
bs.TxnIdsIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, TransactionsId)))
if err != nil {
return err
}
if bs.TxnHash2BlockNumIdx != nil {
bs.TxnHash2BlockNumIdx.Close()
bs.TxnHash2BlockNumIdx = nil
@ -296,6 +307,9 @@ func (s *RoSnapshots) closeIndices() {
if s.TxnHashIdx != nil {
s.TxnHashIdx.Close()
}
if s.TxnIdsIdx != nil {
s.TxnIdsIdx.Close()
}
if s.TxnHash2BlockNumIdx != nil {
s.TxnHash2BlockNumIdx.Close()
}
@ -658,20 +672,21 @@ func DumpBlocks(ctx context.Context, blockFrom, blockTo, blocksPerFile uint64, t
return nil
}
func dumpBlocksRange(ctx context.Context, blockFrom, blockTo uint64, tmpDir, snapshotDir string, chainDB kv.RoDB, workers int, lvl log.Lvl) error {
segmentFile := filepath.Join(snapshotDir, SegmentFileName(blockFrom, blockTo, Bodies))
segmentFile := filepath.Join(snapshotDir, SegmentFileName(blockFrom, blockTo, Transactions))
if _, err := DumpTxs(ctx, chainDB, segmentFile, tmpDir, blockFrom, blockTo, workers, lvl); err != nil {
return fmt.Errorf("DumpTxs: %w", err)
}
segmentFile = filepath.Join(snapshotDir, SegmentFileName(blockFrom, blockTo, Bodies))
if err := DumpBodies(ctx, chainDB, segmentFile, tmpDir, blockFrom, blockTo, workers, lvl); err != nil {
return err
return fmt.Errorf("DumpBodies: %w", err)
}
segmentFile = filepath.Join(snapshotDir, SegmentFileName(blockFrom, blockTo, Headers))
if err := DumpHeaders(ctx, chainDB, segmentFile, tmpDir, blockFrom, blockTo, workers, lvl); err != nil {
return err
return fmt.Errorf("DumpHeaders: %w", err)
}
segmentFile = filepath.Join(snapshotDir, SegmentFileName(blockFrom, blockTo, Transactions))
if _, err := DumpTxs(ctx, chainDB, segmentFile, tmpDir, blockFrom, blockTo, workers, lvl); err != nil {
return err
}
return nil
}
@ -686,7 +701,7 @@ func DumpTxs(ctx context.Context, db kv.RoDB, segmentFile, tmpDir string, blockF
f, err := compress.NewCompressor(ctx, "Transactions", segmentFile, tmpDir, compress.MinPatternScore, workers)
if err != nil {
return 0, err
return 0, fmt.Errorf("NewCompressor: %w, %s", err, segmentFile)
}
defer f.Close()
@ -695,13 +710,50 @@ func DumpTxs(ctx context.Context, db kv.RoDB, segmentFile, tmpDir string, blockF
parseCtx := txpool.NewTxParseContext(*chainID)
parseCtx.WithSender(false)
slot := txpool.TxSlot{}
var sender [20]byte
parse := func(v, valueBuf []byte, senders []common.Address, j int) ([]byte, error) {
if _, err := parseCtx.ParseTransaction(v, 0, &slot, sender[:], false /* hasEnvelope */); err != nil {
return valueBuf, err
}
if len(senders) > 0 {
sender = senders[j]
}
valueBuf = valueBuf[:0]
valueBuf = append(valueBuf, slot.IdHash[:1]...)
valueBuf = append(valueBuf, sender[:]...)
valueBuf = append(valueBuf, v...)
return valueBuf, nil
}
valueBuf := make([]byte, 16*4096)
addSystemTx := func(tx kv.Tx, txId uint64) error {
binary.BigEndian.PutUint64(numBuf, txId)
tv, err := tx.GetOne(kv.EthTx, numBuf[:8])
if err != nil {
return err
}
if tv == nil {
if err := f.AddWord(nil); err != nil {
return fmt.Errorf("AddWord1: %d", err)
}
return nil
}
parseCtx.WithSender(false)
valueBuf, err = parse(tv, valueBuf, nil, 0)
if err != nil {
return err
}
if err := f.AddWord(valueBuf); err != nil {
return fmt.Errorf("AddWord2: %d", err)
}
return nil
}
firstIDSaved := false
from := dbutils.EncodeBlockNumber(blockFrom)
var lastBody types.BodyForStorage
var sender [20]byte
if err := kv.BigChunks(db, kv.HeaderCanonical, from, func(tx kv.Tx, k, v []byte) (bool, error) {
blockNum := binary.BigEndian.Uint64(k)
if blockNum >= blockTo {
@ -710,6 +762,9 @@ func DumpTxs(ctx context.Context, db kv.RoDB, segmentFile, tmpDir string, blockF
h := common.BytesToHash(v)
dataRLP := rawdb.ReadStorageBodyRLP(tx, h, blockNum)
if dataRLP == nil {
return false, fmt.Errorf("body not found: %d, %x", blockNum, h)
}
var body types.BodyForStorage
if e := rlp.DecodeBytes(dataRLP, &body); e != nil {
return false, e
@ -723,31 +778,33 @@ func DumpTxs(ctx context.Context, db kv.RoDB, segmentFile, tmpDir string, blockF
return false, err
}
binary.BigEndian.PutUint64(numBuf, body.BaseTxId)
if !firstIDSaved {
firstIDSaved = true
firstTxID = body.BaseTxId
}
j := 0
if err := tx.ForAmount(kv.EthTx, numBuf[:8], body.TxAmount, func(tk, tv []byte) error {
if err := addSystemTx(tx, body.BaseTxId); err != nil {
return false, err
}
count++
if prevTxID > 0 {
prevTxID++
} else {
prevTxID = body.BaseTxId
}
binary.BigEndian.PutUint64(numBuf, body.BaseTxId+1)
if err := tx.ForAmount(kv.EthTx, numBuf[:8], body.TxAmount-2, func(tk, tv []byte) error {
id := binary.BigEndian.Uint64(tk)
if prevTxID != 0 && id != prevTxID+1 {
panic(fmt.Sprintf("no gaps in tx ids are allowed: block %d does jump from %d to %d", blockNum, prevTxID, id))
}
prevTxID = id
parseCtx.WithSender(len(senders) == 0)
if _, err := parseCtx.ParseTransaction(tv, 0, &slot, sender[:], false /* hasEnvelope */); err != nil {
valueBuf, err = parse(tv, valueBuf, senders, j)
if err != nil {
return err
}
if len(senders) > 0 {
sender = senders[j]
}
valueBuf = valueBuf[:0]
valueBuf = append(valueBuf, slot.IdHash[:1]...)
valueBuf = append(valueBuf, sender[:]...)
valueBuf = append(valueBuf, tv...)
if err := f.AddWord(valueBuf); err != nil {
return err
}
@ -767,17 +824,23 @@ func DumpTxs(ctx context.Context, db kv.RoDB, segmentFile, tmpDir string, blockF
}
return nil
}); err != nil {
return false, fmt.Errorf("ForAmount: %w", err)
}
if err := addSystemTx(tx, body.BaseTxId+uint64(body.TxAmount)-1); err != nil {
return false, err
}
prevTxID++
count++
return true, nil
}); err != nil {
return 0, err
return 0, fmt.Errorf("BigChunks: %w", err)
}
if lastBody.BaseTxId+uint64(lastBody.TxAmount)-firstTxID != count {
return 0, fmt.Errorf("incorrect tx count: %d, expected: %d", count, lastBody.BaseTxId+uint64(lastBody.TxAmount)-firstTxID)
}
if err := f.Compress(); err != nil {
return 0, err
return 0, fmt.Errorf("compress: %w", err)
}
_, fileName := filepath.Split(segmentFile)
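
The new sanity check counts every emitted word, system slots included, so for a contiguous dump the expected total is lastBody.BaseTxId + TxAmount - firstTxID. For example, two blocks of 3 user txns each give firstTxID=0, lastBody.BaseTxId=5 and TxAmount=5, hence 5+5-0 = 10 words = 2*(3+2).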
@ -841,7 +904,7 @@ func DumpHeaders(ctx context.Context, db kv.RoDB, segmentFilePath, tmpDir string
return err
}
if err := f.Compress(); err != nil {
return err
return fmt.Errorf("compress: %w", err)
}
return nil
@ -896,12 +959,14 @@ func DumpBodies(ctx context.Context, db kv.RoDB, segmentFilePath, tmpDir string,
return err
}
if err := f.Compress(); err != nil {
return err
return fmt.Errorf("compress: %w", err)
}
return nil
}
var EmptyTxHash = common.Hash{}
func TransactionsHashIdx(ctx context.Context, chainID uint256.Int, sn *BlocksSnapshot, firstTxID, firstBlockNum, expectedCount uint64, segmentFilePath, tmpDir string, logEvery *time.Ticker, lvl log.Lvl) error {
dir, _ := filepath.Split(segmentFilePath)
@ -914,8 +979,8 @@ func TransactionsHashIdx(ctx context.Context, chainID uint256.Int, sn *BlocksSna
buf := make([]byte, 1024)
txnHashIdx, err := recsplit.NewRecSplit(recsplit.RecSplitArgs{
KeyCount: d.Count(),
Enums: true,
KeyCount: d.Count() - d.EmptyWordsCount(),
Enums: false,
BucketSize: 2000,
LeafSize: 8,
TmpDir: tmpDir,
@ -925,9 +990,21 @@ func TransactionsHashIdx(ctx context.Context, chainID uint256.Int, sn *BlocksSna
if err != nil {
return err
}
txnHash2BlockNumIdx, err := recsplit.NewRecSplit(recsplit.RecSplitArgs{
txnIdIdx, err := recsplit.NewRecSplit(recsplit.RecSplitArgs{
KeyCount: d.Count(),
Enums: true,
Enums: false,
BucketSize: 2000,
LeafSize: 8,
TmpDir: tmpDir,
IndexFile: filepath.Join(dir, IdxFileName(sn.From, sn.To, TransactionsId)),
BaseDataID: firstTxID,
})
if err != nil {
return err
}
txnHash2BlockNumIdx, err := recsplit.NewRecSplit(recsplit.RecSplitArgs{
KeyCount: d.Count() - d.EmptyWordsCount(),
Enums: false,
BucketSize: 2000,
LeafSize: 8,
TmpDir: tmpDir,
@ -940,12 +1017,14 @@ func TransactionsHashIdx(ctx context.Context, chainID uint256.Int, sn *BlocksSna
RETRY:
txnHashIdx.NoLogs(true)
txnIdIdx.NoLogs(true)
txnHash2BlockNumIdx.NoLogs(true)
ch := forEachAsync(ctx, d)
type txHashWithOffet struct {
txnHash [32]byte
i, offset uint64
empty bool // a block may have an empty system txn at its beginning or end; such txns have no hash, but do have an id
err error
}
txsCh := make(chan txHashWithOffet, 1024)
@ -963,6 +1042,12 @@ RETRY:
txsCh2 <- txHashWithOffet{err: it.err}
return
}
if len(it.word) == 0 {
txsCh <- txHashWithOffet{empty: true, i: it.i, offset: it.offset}
txsCh2 <- txHashWithOffet{empty: true, i: it.i, offset: it.offset}
continue
}
if _, err := parseCtx.ParseTransaction(it.word[1+20:], 0, &slot, sender[:], true /* hasEnvelope */); err != nil {
txsCh <- txHashWithOffet{err: it.err}
txsCh2 <- txHashWithOffet{err: it.err}
@ -974,8 +1059,9 @@ RETRY:
}()
wg := sync.WaitGroup{}
errCh := make(chan error, 2)
errCh := make(chan error, 3)
defer close(errCh)
num := make([]byte, 8)
wg.Add(1)
go func() {
@ -987,8 +1073,18 @@ RETRY:
errCh <- it.err
return
}
j++
binary.BigEndian.PutUint64(num, it.i)
if err := txnIdIdx.AddKey(num, it.offset); err != nil {
errCh <- it.err
return
}
if it.empty {
continue
}
if err := txnHashIdx.AddKey(it.txnHash[:], it.offset); err != nil {
errCh <- it.err
return
}
select {
@ -997,14 +1093,22 @@ RETRY:
return
default:
}
j++
}
if j != expectedCount {
panic(fmt.Errorf("expect: %d, got %d\n", expectedCount, j))
}
errCh <- txnHashIdx.Build()
if err := txnHashIdx.Build(); err != nil {
errCh <- fmt.Errorf("txnHashIdx: %w", err)
} else {
errCh <- nil
}
if err := txnIdIdx.Build(); err != nil {
errCh <- fmt.Errorf("txnIdIdx: %w", err)
} else {
errCh <- nil
}
}()
wg.Add(1)
go func() {
@ -1034,6 +1138,10 @@ RETRY:
blockNum++
}
if it.empty {
continue
}
if err := txnHash2BlockNumIdx.AddKey(it.txnHash[:], blockNum); err != nil {
return err
}
@ -1053,17 +1161,22 @@ RETRY:
errCh <- err
return
}
errCh <- txnHash2BlockNumIdx.Build()
if err := txnHash2BlockNumIdx.Build(); err != nil {
errCh <- fmt.Errorf("txnHash2BlockNumIdx: %w", err)
} else {
errCh <- nil
}
}()
wg.Wait()
for i := 0; i < 2; i++ {
for i := 0; i < 3; i++ {
err = <-errCh
if err != nil {
if errors.Is(err, recsplit.ErrCollision) {
log.Warn("Building recsplit. Collision happened. It's ok. Restarting with another salt...", "err", err)
txnHashIdx.ResetNextSalt()
txnIdIdx.ResetNextSalt()
txnHash2BlockNumIdx.ResetNextSalt()
goto RETRY
}

@ -1 +1 @@
Subproject commit 6c87e8d78b1daf5cbc4f23e727084f1255a2b794
Subproject commit 61d1023d686b8d3c623d601495c8d2f0737d3538