Combine stage5 and stage6 (#679)

* Handle bucket error

* Replace with 0

* SetMaxDBs

* Set MaxDBs before opening

* Merge stage5 and stage6

* Fix lint

* Make downloader tests not parallel
This commit is contained in:
ledgerwatch 2020-06-20 10:07:22 +01:00 committed by GitHub
parent e3755a0df2
commit 5812a649ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 23 additions and 79 deletions

View File

@ -385,7 +385,6 @@ func bucketStats(chaindata string) {
case ethdb.Bolt:
db, err := bolt.Open(chaindata, 0600, &bolt.Options{ReadOnly: true})
check(err)
//bucketList := [][]byte{dbutils.IntermediateTrieHashBucket}
fmt.Printf(",BranchPageN,BranchOverflowN,LeafPageN,LeafOverflowN,KeyN,Depth,BranchAlloc,BranchInuse,LeafAlloc,LeafInuse,BucketN,InlineBucketN,InlineBucketInuse\n")
_ = db.View(func(tx *bolt.Tx) error {
@ -401,14 +400,19 @@ func bucketStats(chaindata string) {
case ethdb.Lmdb:
env, err := lmdb.NewEnv()
check(err)
err = env.SetMaxDBs(100)
check(err)
err = env.Open(chaindata, lmdb.Readonly, 0664)
check(err)
fmt.Printf(",BranchPageN,LeafPageN,OverflowN,Entries\n")
_ = env.View(func(tx *lmdb.Txn) error {
for _, bucket := range bucketList {
dbi, bucketErr := tx.OpenDBI(string(bucket), lmdb.Readonly)
check(bucketErr)
dbi, bucketErr := tx.OpenDBI(string(bucket), 0)
if bucketErr != nil {
fmt.Printf("opening bucket %s: %v\n", bucket, bucketErr)
continue
}
bs, statErr := tx.Stat(dbi)
check(statErr)
fmt.Printf("%s,%d,%d,%d,%d\n", string(bucket),
@ -2409,6 +2413,7 @@ func testUnwind5(chaindata string, rewind uint64) error {
return nil
}
/*
func testStage6(chaindata string, reset bool) error {
db := ethdb.MustOpen(chaindata)
defer db.Close()
@ -2463,6 +2468,7 @@ func testUnwind6(chaindata string, rewind uint64) error {
close(ch)
return nil
}
*/
func printStages(chaindata string) error {
db := ethdb.MustOpen(chaindata)
@ -2693,21 +2699,6 @@ func main() {
fmt.Printf("Error: %v\n", err)
}
}
if *action == "stage6" {
if err := testStage6(*chaindata, false /* reset */); err != nil {
fmt.Printf("Error: %v\n", err)
}
}
if *action == "reset6" {
if err := testStage6(*chaindata, true /* reset */); err != nil {
fmt.Printf("Error: %v\n", err)
}
}
if *action == "unwind6" {
if err := testUnwind6(*chaindata, uint64(*rewind)); err != nil {
fmt.Printf("Error: %v\n", err)
}
}
if *action == "stageLoop" {
if err := testStageLoop(*chaindata); err != nil {
fmt.Printf("Error: %v\n", err)

View File

@ -515,8 +515,6 @@ func TestCanonicalSynchronisation64Light(t *testing.T) {
}
func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
tester := newTester()
defer tester.terminate()
defer tester.peerDb.Close()
@ -543,7 +541,6 @@ func TestThrottling64Full(t *testing.T) { testThrottling(t, 64, FullSync) }
//func TestThrottling64Fast(t *testing.T) { testThrottling(t, 64, FastSync) }
func testThrottling(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
tester := newTester()
defer tester.terminate()
defer tester.peerDb.Close()
@ -633,8 +630,6 @@ func TestForkedSync64Full(t *testing.T) { testForkedSync(t, 64, FullSync) }
func TestForkedSync64Light(t *testing.T) { testForkedSync(t, 64, LightSync) }
func testForkedSync(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
tester := newTester()
defer tester.terminate()
defer tester.peerDb.Close()
@ -669,8 +664,6 @@ func TestHeavyForkedSync64Full(t *testing.T) { testHeavyForkedSync(t, 64, FullSy
func TestHeavyForkedSync64Light(t *testing.T) { testHeavyForkedSync(t, 64, LightSync) }
func testHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
tester := newTester()
defer tester.terminate()
defer tester.peerDb.Close()
@ -706,8 +699,6 @@ func TestBoundedForkedSync64Full(t *testing.T) { testBoundedForkedSync(t, 64, Fu
func TestBoundedForkedSync64Light(t *testing.T) { testBoundedForkedSync(t, 64, LightSync) }
func testBoundedForkedSync(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
tester := newTester()
defer tester.terminate()
defer tester.peerDb.Close()
@ -742,8 +733,6 @@ func TestBoundedHeavyForkedSync64Full(t *testing.T) { testBoundedHeavyForkedSync
func TestBoundedHeavyForkedSync64Light(t *testing.T) { testBoundedHeavyForkedSync(t, 64, LightSync) }
func testBoundedHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
tester := newTester()
defer tester.terminate()
defer tester.peerDb.Close()
@ -769,8 +758,6 @@ func testBoundedHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
// Tests that an inactive downloader will not accept incoming block headers and
// bodies.
func TestInactiveDownloader62(t *testing.T) {
t.Parallel()
tester := newTester()
defer tester.terminate()
defer tester.peerDb.Close()
@ -787,8 +774,6 @@ func TestInactiveDownloader62(t *testing.T) {
// Tests that an inactive downloader will not accept incoming block headers,
// bodies and receipts.
func TestInactiveDownloader63(t *testing.T) {
t.Parallel()
tester := newTester()
defer tester.terminate()
defer tester.peerDb.Close()
@ -816,8 +801,6 @@ func TestCancel64Full(t *testing.T) { testCancel(t, 64, FullSync) }
func TestCancel64Light(t *testing.T) { testCancel(t, 64, LightSync) }
func testCancel(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
tester := newTester()
defer tester.terminate()
defer tester.peerDb.Close()
@ -851,8 +834,6 @@ func TestMultiSynchronisation64Full(t *testing.T) { testMultiSynchronisation(t,
func TestMultiSynchronisation64Light(t *testing.T) { testMultiSynchronisation(t, 64, LightSync) }
func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
tester := newTester()
defer tester.terminate()
@ -882,8 +863,6 @@ func TestMultiProtoSynchronisation64Full(t *testing.T) { testMultiProtoSync(t, 6
func TestMultiProtoSynchronisation64Light(t *testing.T) { testMultiProtoSync(t, 64, LightSync) }
func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
tester := newTester()
defer tester.terminate()
defer tester.peerDb.Close()
@ -923,8 +902,6 @@ func TestEmptyShortCircuit64Full(t *testing.T) { testEmptyShortCircuit(t, 64, Fu
func TestEmptyShortCircuit64Light(t *testing.T) { testEmptyShortCircuit(t, 64, LightSync) }
func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
tester := newTester()
defer tester.terminate()
@ -978,8 +955,6 @@ func TestMissingHeaderAttack64Full(t *testing.T) { testMissingHeaderAttack(t, 64
func TestMissingHeaderAttack64Light(t *testing.T) { testMissingHeaderAttack(t, 64, LightSync) }
func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
tester := newTester()
defer tester.terminate()
defer tester.peerDb.Close()
@ -1012,8 +987,6 @@ func TestShiftedHeaderAttack64Full(t *testing.T) { testShiftedHeaderAttack(t, 64
//func TestShiftedHeaderAttack64Light(t *testing.T) { testShiftedHeaderAttack(t, 64, LightSync) }
func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
tester := newTester()
defer tester.terminate()
defer tester.peerDb.Close()
@ -1046,8 +1019,6 @@ func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
func TestInvalidHeaderRollback64Light(t *testing.T) { testInvalidHeaderRollback(t, 64, LightSync) }
func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
tester := newTester()
defer tester.terminate()
defer tester.peerDb.Close()
@ -1144,8 +1115,6 @@ func TestHighTDStarvationAttack64Full(t *testing.T) { testHighTDStarvationAttack
func TestHighTDStarvationAttack64Light(t *testing.T) { testHighTDStarvationAttack(t, 64, LightSync) }
func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
tester := newTester()
defer tester.terminate()
defer tester.peerDb.Close()
@ -1163,8 +1132,6 @@ func TestBlockHeaderAttackerDropping63(t *testing.T) { testBlockHeaderAttackerDr
func TestBlockHeaderAttackerDropping64(t *testing.T) { testBlockHeaderAttackerDropping(t, 64) }
func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
t.Parallel()
// Define the disconnection requirement for individual hash fetch errors
tests := []struct {
result error
@ -1224,8 +1191,6 @@ func TestSyncProgress64Full(t *testing.T) { testSyncProgress(t, 64, FullSync) }
func TestSyncProgress64Light(t *testing.T) { testSyncProgress(t, 64, LightSync) }
func testSyncProgress(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
tester := newTester()
defer tester.terminate()
defer tester.peerDb.Close()
@ -1310,8 +1275,6 @@ func TestForkedSyncProgress64Full(t *testing.T) { testForkedSyncProgress(t, 64,
func TestForkedSyncProgress64Light(t *testing.T) { testForkedSyncProgress(t, 64, LightSync) }
func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
tester := newTester()
defer tester.terminate()
defer tester.peerDb.Close()
@ -1388,8 +1351,6 @@ func TestFailedSyncProgress64Full(t *testing.T) { testFailedSyncProgress(t, 64,
func TestFailedSyncProgress64Light(t *testing.T) { testFailedSyncProgress(t, 64, LightSync) }
func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
tester := newTester()
defer tester.terminate()
defer tester.peerDb.Close()
@ -1464,8 +1425,6 @@ func TestFakedSyncProgress64Full(t *testing.T) { testFakedSyncProgress(t, 64, Fu
func TestFakedSyncProgress64Light(t *testing.T) { testFakedSyncProgress(t, 64, LightSync) }
func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
tester := newTester()
defer tester.terminate()
defer tester.peerDb.Close()
@ -1534,8 +1493,6 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
// This test reproduces an issue where unexpected deliveries would
// block indefinitely if they arrived at the right time.
func TestDeliverHeadersHang(t *testing.T) {
t.Parallel()
testCases := []struct {
protocol int
syncMode SyncMode
@ -1549,7 +1506,6 @@ func TestDeliverHeadersHang(t *testing.T) {
}
for _, tc := range testCases {
t.Run(fmt.Sprintf("protocol %d mode %v", tc.protocol, tc.syncMode), func(t *testing.T) {
t.Parallel()
testDeliverHeadersHang(t, tc.protocol, tc.syncMode)
})
}
@ -1713,8 +1669,6 @@ func TestCheckpointEnforcement64Fast(t *testing.T) { testCheckpointEnforcement(
func TestCheckpointEnforcement64Light(t *testing.T) { testCheckpointEnforcement(t, 64, LightSync) }
func testCheckpointEnforcement(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
// Create a new tester with a particular hard coded checkpoint block
tester := newTester()
defer tester.terminate()

View File

@ -11,6 +11,7 @@ import (
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/common/etl"
"github.com/ledgerwatch/turbo-geth/core"
"github.com/ledgerwatch/turbo-geth/core/rawdb"
"github.com/ledgerwatch/turbo-geth/core/types/accounts"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
@ -40,14 +41,23 @@ func SpawnHashStateStage(s *StageState, stateDB ethdb.Database, datadir string,
return err
}
}
if err := updateIntermediateHashes(s, stateDB, s.BlockNumber, syncHeadNumber, datadir, quit); err != nil {
return err
}
return s.DoneAndUpdate(stateDB, syncHeadNumber)
}
func UnwindHashStateStage(u *UnwindState, s *StageState, stateDB ethdb.Database, datadir string, quit chan struct{}) error {
if err := unwindHashStateStageImpl(u, s, stateDB, datadir, quit); err != nil {
func UnwindHashStateStage(u *UnwindState, s *StageState, db ethdb.Database, datadir string, quit chan struct{}) error {
if err := unwindHashStateStageImpl(u, s, db, datadir, quit); err != nil {
return err
}
if err := u.Done(stateDB); err != nil {
hash := rawdb.ReadCanonicalHash(db, u.UnwindPoint)
syncHeadHeader := rawdb.ReadHeader(db, hash, u.UnwindPoint)
expectedRootHash := syncHeadHeader.Root
if err := unwindIntermediateHashesStageImpl(u, s, db, datadir, expectedRootHash, quit); err != nil {
return err
}
if err := u.Done(db); err != nil {
return fmt.Errorf("unwind HashState: reset: %v", err)
}
return nil

View File

@ -74,16 +74,6 @@ func PrepareStagedSync(
return UnwindHashStateStage(u, s, stateDB, datadir, quitCh)
},
},
{
ID: stages.IntermediateHashes,
Description: "Generating intermediate hashes and validating final hash",
ExecFunc: func(s *StageState, u Unwinder) error {
return SpawnIntermediateHashesStage(s, stateDB, datadir, quitCh)
},
UnwindFunc: func(u *UnwindState, s *StageState) error {
return UnwindIntermediateHashesStage(u, s, stateDB, datadir, quitCh)
},
},
{
ID: stages.AccountHistoryIndex,
Description: "Generating account history index",

View File

@ -33,8 +33,7 @@ const (
Bodies // Block bodies are downloaded, TxHash and UncleHash are getting verified
Senders // "From" recovered from signatures, bodies re-written
Execution // Executing each block w/o building a trie
HashState // Apply Keccak256 to all the keys in the state
IntermediateHashes // Generate intermediate hashes
HashState // Apply Keccak256 to all the keys in the state, generate intermediate hashes
AccountHistoryIndex // Generating history index for accounts
StorageHistoryIndex // Generating history index for storage
TxLookup // Generating transactions lookup index