fix integration (#999)

This commit is contained in:
Alex Sharov 2020-08-29 15:27:20 +07:00 committed by GitHub
parent 8c9a55bb21
commit 5ce73d438a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 17 additions and 63 deletions

View File

@ -85,7 +85,6 @@ func syncBySmallSteps(ctx context.Context, chaindata string) error {
}
}
log.Info("cycle1: begin transaction")
tx, errBegin := db.Begin()
if errBegin != nil {
return errBegin
@ -95,47 +94,10 @@ func syncBySmallSteps(ctx context.Context, chaindata string) error {
bc, st, progress := newSync(ch, db, tx, changeSetHook)
defer bc.Stop()
log.Info("cycle1: commit transaction")
if _, err := tx.Commit(); err != nil {
if err := tx.CommitAndBegin(); err != nil {
return err
}
st.BeforeStageRun(stages.Execution, func() error {
if hasTx, ok := tx.(ethdb.HasTx); ok && hasTx.Tx() != nil {
return nil
}
log.Info("cycle: begin transaction")
var errTx error
tx, errTx = tx.Begin()
return errTx
})
st.BeforeStageRun(stages.TxPool, func() error {
log.Info("cycle: commit transaction")
var errTx error
_, errTx = tx.Commit()
return errTx
})
st.OnBeforeUnwind(func(id stages.SyncStage) error {
if hasTx, ok := tx.(ethdb.HasTx); ok && hasTx.Tx() != nil {
return nil
}
if id < stages.Bodies || id >= stages.TxPool {
return nil
}
log.Info("cycle unwind: begin transaction")
var errTx error
tx, errTx = tx.Begin()
return errTx
})
st.BeforeStageUnwind(stages.TxPool, func() error {
if hasTx, ok := tx.(ethdb.HasTx); ok && hasTx.Tx() == nil {
return nil
}
log.Info("cycle unwind: commit transaction")
_, errCommit := tx.Commit()
return errCommit
})
st.DisableStages(stages.Headers, stages.BlockHashes, stages.Bodies, stages.Senders)
senderStageProgress := progress(stages.Senders).BlockNumber
@ -152,6 +114,10 @@ func syncBySmallSteps(ctx context.Context, chaindata string) error {
default:
}
if err := tx.CommitAndBegin(); err != nil {
return err
}
// All stages forward to `execStage + unwindEvery` block
execToBlock := progress(stages.Execution).BlockNumber + unwindEvery
if execToBlock > stopAt {
@ -172,7 +138,7 @@ func syncBySmallSteps(ctx context.Context, chaindata string) error {
}
for blockN := range expectedAccountChanges {
if err := checkChangeSet(db, blockN, expectedAccountChanges[blockN], expectedStorageChanges[blockN]); err != nil {
if err := checkChangeSet(tx, blockN, expectedAccountChanges[blockN], expectedStorageChanges[blockN]); err != nil {
return err
}
delete(expectedAccountChanges, blockN)
@ -187,7 +153,7 @@ func syncBySmallSteps(ctx context.Context, chaindata string) error {
execStage := progress(stages.Execution)
to := execStage.BlockNumber - unwind
if err := st.UnwindTo(to, db); err != nil {
if err := st.UnwindTo(to, tx); err != nil {
return err
}
}
@ -195,8 +161,8 @@ func syncBySmallSteps(ctx context.Context, chaindata string) error {
return nil
}
func checkChangeSet(db *ethdb.ObjectDatabase, blockNum uint64, expectedAccountChanges []byte, expectedStorageChanges []byte) error {
dbAccountChanges, err := db.GetChangeSetByBlock(false /* storage */, blockNum)
func checkChangeSet(db ethdb.Getter, blockNum uint64, expectedAccountChanges []byte, expectedStorageChanges []byte) error {
dbAccountChanges, err := ethdb.GetChangeSetByBlock(db, false /* storage */, blockNum)
if err != nil {
return err
}
@ -219,7 +185,7 @@ func checkChangeSet(db *ethdb.ObjectDatabase, blockNum uint64, expectedAccountCh
return fmt.Errorf("check change set failed")
}
dbStorageChanges, err := db.GetChangeSetByBlock(true /* storage */, blockNum)
dbStorageChanges, err := ethdb.GetChangeSetByBlock(db, true /* storage */, blockNum)
if err != nil {
return err
}

View File

@ -107,7 +107,7 @@ func CheckChangeSets(genesis *core.Genesis, blockNum uint64, chaindata string, h
return err
}
dbAccountChanges, err := historyDb.GetChangeSetByBlock(false /* storage */, blockNum)
dbAccountChanges, err := ethdb.GetChangeSetByBlock(historyDb, false /* storage */, blockNum)
if err != nil {
return err
}
@ -142,7 +142,7 @@ func CheckChangeSets(genesis *core.Genesis, blockNum uint64, chaindata string, h
}
}
dbStorageChanges, err := historyDb.GetChangeSetByBlock(true /* storage */, blockNum)
dbStorageChanges, err := ethdb.GetChangeSetByBlock(historyDb, true /* storage */, blockNum)
if err != nil {
return err
}

View File

@ -20,6 +20,7 @@ package ethdb
import (
"bytes"
"context"
"errors"
"fmt"
"strings"
"time"
@ -188,26 +189,13 @@ func (db *ObjectDatabase) GetIndexChunk(bucket string, key []byte, timestamp uin
return dat, err
}
// getChangeSetByBlockNoLock returns changeset by block and dbi
func (db *ObjectDatabase) GetChangeSetByBlock(storage bool, timestamp uint64) ([]byte, error) {
func GetChangeSetByBlock(db Getter, storage bool, timestamp uint64) ([]byte, error) {
key := dbutils.EncodeTimestamp(timestamp)
var dat []byte
err := db.kv.View(context.Background(), func(tx Tx) error {
v, err := tx.Get(dbutils.ChangeSetByIndexBucket(storage), key)
if err != nil {
return err
}
if v != nil {
dat = make([]byte, len(v))
copy(dat, v)
}
return nil
})
if err != nil {
v, err := db.Get(dbutils.ChangeSetByIndexBucket(storage), key)
if err != nil && !errors.Is(ErrKeyNotFound, err) {
return nil, err
}
return dat, nil
return v, nil
}
func (db *ObjectDatabase) Walk(bucket string, startkey []byte, fixedbits int, walker func(k, v []byte) (bool, error)) error {