From b163d3a8776a976e91df5ab93c27fa97a35232e1 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Fri, 2 Jun 2023 10:55:40 +0700 Subject: [PATCH] e3: release some e4 parts (#7629) --- core/state/history_reader_v3.go | 12 +++++-- core/state/temporal/kv_temporal.go | 6 ++++ eth/stagedsync/exec3.go | 2 +- go.mod | 2 +- go.sum | 4 +-- tests/state_test.go | 5 +-- tests/state_test_util.go | 36 +++++++++++++++---- turbo/stages/blockchain_test.go | 55 ++++++++++++++++++++++-------- turbo/stages/genesis_test.go | 5 +-- turbo/stages/sentry_mock_test.go | 14 ++++---- 10 files changed, 102 insertions(+), 39 deletions(-) diff --git a/core/state/history_reader_v3.go b/core/state/history_reader_v3.go index 96f42d67f..0de590599 100644 --- a/core/state/history_reader_v3.go +++ b/core/state/history_reader_v3.go @@ -8,6 +8,7 @@ import ( "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/core/state/temporal" "github.com/ledgerwatch/erigon/core/types/accounts" + "github.com/ledgerwatch/erigon/eth/ethconfig" ) // HistoryReaderV3 Implements StateReader and StateWriter @@ -50,9 +51,14 @@ func (hr *HistoryReaderV3) ReadAccountData(address libcommon.Address) (*accounts } func (hr *HistoryReaderV3) ReadAccountStorage(address libcommon.Address, incarnation uint64, key *libcommon.Hash) ([]byte, error) { - acc := make([]byte, 20+8) - copy(acc, address.Bytes()) - binary.BigEndian.PutUint64(acc[20:], incarnation) + var acc []byte + if ethconfig.EnableHistoryV4InTest { + acc = address.Bytes() + } else { + acc = make([]byte, 20+8) + copy(acc, address.Bytes()) + binary.BigEndian.PutUint64(acc[20:], incarnation) + } enc, _, err := hr.ttx.DomainGetAsOf(temporal.StorageDomain, acc, key.Bytes(), hr.txNum) if hr.trace { fmt.Printf("ReadAccountStorage [%x] [%x] => [%x]\n", address, *key, enc) diff --git a/core/state/temporal/kv_temporal.go b/core/state/temporal/kv_temporal.go index 638bcf2de..f855c0f3a 100644 --- a/core/state/temporal/kv_temporal.go +++ b/core/state/temporal/kv_temporal.go @@ -145,6 +145,8 @@ func (db *DB) BeginTemporalRw(ctx context.Context) (kv.RwTx, error) { tx := &Tx{MdbxTx: kvTx.(*mdbx.MdbxTx), db: db} tx.aggCtx = db.agg.MakeContext() + db.agg.StartUnbufferedWrites() + db.agg.SetTx(tx.MdbxTx) return tx, nil } func (db *DB) BeginRw(ctx context.Context) (kv.RwTx, error) { @@ -170,6 +172,8 @@ func (db *DB) BeginTemporalRwNosync(ctx context.Context) (kv.RwTx, error) { tx := &Tx{MdbxTx: kvTx.(*mdbx.MdbxTx), db: db} tx.aggCtx = db.agg.MakeContext() + db.agg.StartUnbufferedWrites() + db.agg.SetTx(tx.MdbxTx) return tx, nil } func (db *DB) BeginRwNosync(ctx context.Context) (kv.RwTx, error) { @@ -204,6 +208,8 @@ func (tx *Tx) autoClose() { for _, closer := range tx.resourcesToClose { closer.Close() } + tx.db.agg.FinishWrites() + tx.db.agg.SetTx(nil) if tx.aggCtx != nil { tx.aggCtx.Close() } diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go index 610b90e17..2569ebd20 100644 --- a/eth/stagedsync/exec3.go +++ b/eth/stagedsync/exec3.go @@ -167,7 +167,7 @@ func ExecV3(ctx context.Context, defer applyTx.Rollback() } else { if blockSnapshots.Cfg().Enabled { - defer blockSnapshots.EnableMadvNormal().DisableReadAhead() + //defer blockSnapshots.EnableMadvNormal().DisableReadAhead() } } diff --git a/go.mod b/go.mod index f46ae5154..15d1c3c5a 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/ledgerwatch/erigon go 1.19 require ( - github.com/ledgerwatch/erigon-lib v0.0.0-20230602025103-2ae7b51de9a5 + github.com/ledgerwatch/erigon-lib v0.0.0-20230602033528-6cde9b90462a github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20230404044759-5dec854ce336 github.com/ledgerwatch/log/v3 v3.8.0 github.com/ledgerwatch/secp256k1 v1.0.0 diff --git a/go.sum b/go.sum index e6a293cc0..097fe7acc 100644 --- a/go.sum +++ b/go.sum @@ -445,8 +445,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0= github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= -github.com/ledgerwatch/erigon-lib v0.0.0-20230602025103-2ae7b51de9a5 h1:vXNmfwDhJnkr4qnfbn4ep5pMVE2WDUk3PLtChC+WqPI= -github.com/ledgerwatch/erigon-lib v0.0.0-20230602025103-2ae7b51de9a5/go.mod h1:R1Wsn0BxmEUZOIcAeGJmaqiXSdegEQ/+GfdjFruu+jQ= +github.com/ledgerwatch/erigon-lib v0.0.0-20230602033528-6cde9b90462a h1:l3e31IOpNvqH6LMipirdcyRKZXZS2xMJAd19EmumLzE= +github.com/ledgerwatch/erigon-lib v0.0.0-20230602033528-6cde9b90462a/go.mod h1:R1Wsn0BxmEUZOIcAeGJmaqiXSdegEQ/+GfdjFruu+jQ= github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20230404044759-5dec854ce336 h1:Yxmt4Wyd0RCLr7UJJAl0ApCP/f5qkWfvHfgPbnI8ghM= github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20230404044759-5dec854ce336/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo= github.com/ledgerwatch/log/v3 v3.8.0 h1:gCpp7uGtIerEz1jKVPeDnbIopFPud9ZnCpBLlLBGqPU= diff --git a/tests/state_test.go b/tests/state_test.go index a8c79b2c4..db3a64a6e 100644 --- a/tests/state_test.go +++ b/tests/state_test.go @@ -27,9 +27,9 @@ import ( "runtime" "testing" - "github.com/ledgerwatch/erigon-lib/kv/memdb" "github.com/ledgerwatch/erigon/core/vm" "github.com/ledgerwatch/erigon/eth/tracers/logger" + "github.com/ledgerwatch/erigon/turbo/stages" "github.com/ledgerwatch/log/v3" ) @@ -51,7 +51,8 @@ func TestState(t *testing.T) { st.skipLoad(`.*vmPerformance/loop.*`) st.walk(t, stateTestDir, func(t *testing.T, name string, test *StateTest) { - db := memdb.NewTestDB(t) + m := stages.Mock(t) + db := m.DB for _, subtest := range test.Subtests() { subtest := subtest key := fmt.Sprintf("%s/%d", subtest.Fork, subtest.Index) diff --git a/tests/state_test_util.go b/tests/state_test_util.go index dec99bdcb..58cf69808 100644 --- a/tests/state_test_util.go +++ b/tests/state_test_util.go @@ -26,6 +26,7 @@ import ( "strings" "github.com/holiman/uint256" + "github.com/ledgerwatch/erigon/eth/ethconfig" "golang.org/x/crypto/sha3" "github.com/ledgerwatch/erigon-lib/chain" @@ -192,8 +193,21 @@ func (t *StateTest) RunNoVerify(tx kv.RwTx, subtest StateSubtest, vmconfig vm.Co if err != nil { return nil, libcommon.Hash{}, UnsupportedForkError{subtest.Fork} } - statedb := state.New(state.NewPlainStateReader(tx)) - w := state.NewPlainStateWriter(tx, nil, writeBlockNr) + + var r state.StateReader + if ethconfig.EnableHistoryV4InTest { + panic("implement me") + } else { + r = state.NewPlainStateReader(tx) + } + statedb := state.New(r) + + var w state.StateWriter + if ethconfig.EnableHistoryV4InTest { + panic("implement me") + } else { + w = state.NewPlainStateWriter(tx, nil, writeBlockNr) + } var baseFee *big.Int if config.IsLondon(0) { @@ -293,12 +307,16 @@ func (t *StateTest) RunNoVerify(tx kv.RwTx, subtest StateSubtest, vmconfig vm.Co if err != nil { return nil, libcommon.Hash{}, fmt.Errorf("error calculating state root: %w", err) } - return statedb, root, nil } func MakePreState(rules *chain.Rules, tx kv.RwTx, accounts types.GenesisAlloc, blockNr uint64) (*state.IntraBlockState, error) { - r := state.NewPlainStateReader(tx) + var r state.StateReader + if ethconfig.EnableHistoryV4InTest { + panic("implement me") + } else { + r = state.NewPlainStateReader(tx) + } statedb := state.New(r) for addr, a := range accounts { statedb.SetCode(addr, a.Code) @@ -325,11 +343,17 @@ func MakePreState(rules *chain.Rules, tx kv.RwTx, accounts types.GenesisAlloc, b } } + var w state.StateWriter + if ethconfig.EnableHistoryV4InTest { + panic("implement me") + } else { + w = state.NewPlainStateWriter(tx, nil, blockNr+1) + } // Commit and re-open to start with a clean state. - if err := statedb.FinalizeTx(rules, state.NewPlainStateWriter(tx, nil, blockNr+1)); err != nil { + if err := statedb.FinalizeTx(rules, w); err != nil { return nil, err } - if err := statedb.CommitBlock(rules, state.NewPlainStateWriter(tx, nil, blockNr+1)); err != nil { + if err := statedb.CommitBlock(rules, w); err != nil { return nil, err } return statedb, nil diff --git a/turbo/stages/blockchain_test.go b/turbo/stages/blockchain_test.go index d82f901a7..403e45d1c 100644 --- a/turbo/stages/blockchain_test.go +++ b/turbo/stages/blockchain_test.go @@ -809,20 +809,45 @@ func doModesTest(t *testing.T, pm prune.Mode) error { require.Equal(uint64(0), receiptsAvailable) } - if pm.History.Enabled() { - afterPrune := uint64(0) - err := tx.ForEach(kv.AccountsHistory, nil, func(k, _ []byte) error { - n := binary.BigEndian.Uint64(k[length.Addr:]) - require.Greater(n, pm.History.PruneTo(head)) - afterPrune++ - return nil - }) - require.Greater(afterPrune, uint64(0)) - assert.NoError(t, err) + if m.HistoryV3 { + //TODO: e3 not implemented Prune feature yet + /* + if pm.History.Enabled() { + it, err := tx.(kv.TemporalTx).HistoryRange(temporal.AccountsHistory, 0, int(pm.History.PruneTo(head)), order.Asc, -1) + require.NoError(err) + count, err := iter.CountKV(it) + require.NoError(err) + require.Zero(count) + + it, err = tx.(kv.TemporalTx).HistoryRange(temporal.AccountsHistory, int(pm.History.PruneTo(head)), -1, order.Asc, -1) + require.NoError(err) + count, err = iter.CountKV(it) + require.NoError(err) + require.Equal(3, count) + } else { + it, err := tx.(kv.TemporalTx).HistoryRange(temporal.AccountsHistory, 0, -1, order.Asc, -1) + require.NoError(err) + count, err := iter.CountKV(it) + require.NoError(err) + require.Equal(3, count) + } + */ } else { - found, err := bitmapdb.Get64(tx, kv.AccountsHistory, address[:], 0, 1024) - require.NoError(err) - require.Equal(uint64(0), found.Minimum()) + if pm.History.Enabled() { + afterPrune := uint64(0) + err := tx.ForEach(kv.AccountsHistory, nil, func(k, _ []byte) error { + n := binary.BigEndian.Uint64(k[length.Addr:]) + require.Greater(n, pm.History.PruneTo(head)) + afterPrune++ + return nil + }) + require.Greater(afterPrune, uint64(0)) + assert.NoError(t, err) + } else { + found, err := bitmapdb.Get64(tx, kv.AccountsHistory, address[:], 0, 1024) + require.NoError(err) + require.Equal(uint64(0), found.Minimum()) + } } br, _ := m.NewBlocksIO() @@ -1042,7 +1067,7 @@ func TestDoubleAccountRemoval(t *testing.T) { assert.NoError(t, err) err = m.DB.View(m.Ctx, func(tx kv.Tx) error { - st := state.New(state.NewDbStateReader(tx)) + st := state.New(m.NewStateReader(tx)) assert.NoError(t, err) assert.False(t, st.Exist(theAddr), "Contract should've been removed") return nil @@ -1798,7 +1823,7 @@ func TestDeleteRecreateSlotsAcrossManyBlocks(t *testing.T) { } err = m.DB.View(m.Ctx, func(tx kv.Tx) error { - statedb := state.New(state.NewDbStateReader(tx)) + statedb := state.New(m.NewStateReader(tx)) // If all is correct, then slot 1 and 2 are zero key1 := libcommon.HexToHash("01") var got uint256.Int diff --git a/turbo/stages/genesis_test.go b/turbo/stages/genesis_test.go index 2c482e964..c4e60d52b 100644 --- a/turbo/stages/genesis_test.go +++ b/turbo/stages/genesis_test.go @@ -25,10 +25,11 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/ledgerwatch/erigon-lib/chain" libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/common/datadir" "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon-lib/kv/memdb" "github.com/ledgerwatch/erigon/core" "github.com/ledgerwatch/erigon/core/rawdb" + "github.com/ledgerwatch/erigon/core/state/temporal" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/crypto" "github.com/ledgerwatch/erigon/params" @@ -141,7 +142,7 @@ func TestSetupGenesis(t *testing.T) { for _, test := range tests { test := test t.Run(test.name, func(t *testing.T) { - db := memdb.NewTestDB(t) + _, _, db, _ := temporal.NewTestDB(t, context.Background(), datadir.New(tmpdir), nil, log.New()) config, genesis, err := test.fn(db) // Check the return values. if !reflect.DeepEqual(err, test.wantErr) { diff --git a/turbo/stages/sentry_mock_test.go b/turbo/stages/sentry_mock_test.go index d558a5ab4..3e708162e 100644 --- a/turbo/stages/sentry_mock_test.go +++ b/turbo/stages/sentry_mock_test.go @@ -57,7 +57,7 @@ func TestHeaderStep(t *testing.T) { } m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceed - initialCycle := true + initialCycle := stages.MockInsertAsInitialCycle if _, err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, initialCycle, m.Log, m.BlockSnapshots, nil); err != nil { t.Fatal(err) } @@ -95,7 +95,7 @@ func TestMineBlockWith1Tx(t *testing.T) { } m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed - initialCycle := true + initialCycle := stages.MockInsertAsInitialCycle if _, err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, initialCycle, log.New(), m.BlockSnapshots, nil); err != nil { t.Fatal(err) } @@ -514,7 +514,7 @@ func TestForkchoiceToGenesis(t *testing.T) { } m.SendForkChoiceRequest(&forkChoiceMessage) - initialCycle := false + initialCycle := stages.MockInsertAsInitialCycle headBlockHash, err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, initialCycle, m.Log, m.BlockSnapshots, nil) require.NoError(t, err) stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err) @@ -536,7 +536,7 @@ func TestBogusForkchoice(t *testing.T) { } m.SendForkChoiceRequest(&forkChoiceMessage) - initialCycle := false + initialCycle := stages.MockInsertAsInitialCycle headBlockHash, err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, initialCycle, m.Log, m.BlockSnapshots, nil) require.NoError(t, err) stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err) @@ -571,7 +571,7 @@ func TestPoSDownloader(t *testing.T) { // Send a payload whose parent isn't downloaded yet m.SendPayloadRequest(chain.TopBlock) - initialCycle := false + initialCycle := stages.MockInsertAsInitialCycle headBlockHash, err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, initialCycle, m.Log, m.BlockSnapshots, nil) require.NoError(t, err) stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err) @@ -639,7 +639,7 @@ func TestPoSSyncWithInvalidHeader(t *testing.T) { payloadMessage := types.NewBlockFromStorage(invalidTip.Hash(), invalidTip, chain.TopBlock.Transactions(), nil, nil) m.SendPayloadRequest(payloadMessage) - initialCycle := false + initialCycle := stages.MockInsertAsInitialCycle headBlockHash, err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, initialCycle, m.Log, m.BlockSnapshots, nil) require.NoError(t, err) stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err) @@ -725,7 +725,7 @@ func TestPOSWrongTrieRootReorgs(t *testing.T) { //------------------------------------------ m.SendPayloadRequest(chain0.TopBlock) - initialCycle := false + initialCycle := stages.MockInsertAsInitialCycle headBlockHash, err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, initialCycle, m.Log, m.BlockSnapshots, nil) require.NoError(err) stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err)