mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-22 19:50:36 +00:00
fix RPC notification, add boilerplate for tests (#1936)
* fix RPC notification, add boilerplate for tests * Fix test Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro.local>
This commit is contained in:
parent
c4f746f54f
commit
7eefcbe6a6
@ -319,7 +319,7 @@ func NewStagedSync(
|
||||
controlServer.sendBodyRequest,
|
||||
controlServer.penalize,
|
||||
controlServer.updateHead,
|
||||
controlServer,
|
||||
controlServer.BroadcastNewBlock,
|
||||
bodyDownloadTimeout,
|
||||
*controlServer.chainConfig,
|
||||
batchSize,
|
||||
|
@ -70,7 +70,7 @@ func setupTxPool() (*TxPool, *ecdsa.PrivateKey, func()) {
|
||||
diskdb := ethdb.NewMemDatabase()
|
||||
|
||||
key, _ := crypto.GenerateKey()
|
||||
txCacher := NewTxSenderCacher(runtime.NumCPU())
|
||||
txCacher := NewTxSenderCacher(1)
|
||||
pool := NewTxPool(TestTxPoolConfig, params.TestChainConfig, diskdb, txCacher)
|
||||
//nolint:errcheck
|
||||
pool.Start(1000000000, 0)
|
||||
|
@ -504,7 +504,10 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, blockNumb
|
||||
}
|
||||
notifyFrom := finishAtBefore
|
||||
if len(v) > 0 {
|
||||
notifyFrom = binary.BigEndian.Uint64(v)
|
||||
n := binary.BigEndian.Uint64(v)
|
||||
if n != 0 {
|
||||
notifyFrom = binary.BigEndian.Uint64(v)
|
||||
}
|
||||
}
|
||||
|
||||
err = d.stagedSyncState.Run(d.stateDB, nil)
|
||||
|
@ -487,7 +487,7 @@ func DefaultUnwindOrder() UnwindOrder {
|
||||
3, 4,
|
||||
// Unwinding of IHashes needs to happen after unwinding HashState
|
||||
6, 5,
|
||||
7, 8, 9, 10, 11,
|
||||
7, 8, 9, 10, 11, 13,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3,9 +3,10 @@ package stagedsync
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"github.com/stretchr/testify/require"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
|
||||
"github.com/ledgerwatch/turbo-geth/ethdb"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@ -736,7 +737,7 @@ func TestSnapshotUnwindOrderEqualDefault(t *testing.T) {
|
||||
for _, i := range snUnwindOrder {
|
||||
snUnwindIDs = append(snUnwindIDs, stagesWithSnapshots[len(stagesWithSnapshots)-i-2].ID)
|
||||
}
|
||||
for _, i := range unwindOrder {
|
||||
for _, i := range unwindOrder[:len(unwindOrder)-1] {
|
||||
unwindIDs = append(unwindIDs, defaultStages[len(defaultStages)-i-2].ID)
|
||||
}
|
||||
|
||||
|
@ -7,6 +7,4 @@ import (
|
||||
"github.com/ledgerwatch/turbo-geth/core/types"
|
||||
)
|
||||
|
||||
type BlockPropagator interface {
|
||||
BroadcastNewBlock(ctx context.Context, block *types.Block, td *big.Int)
|
||||
}
|
||||
type BlockPropagator func(ctx context.Context, block *types.Block, td *big.Int)
|
||||
|
@ -128,7 +128,7 @@ func (bd *BodyDownload) RequestMoreBodies(db ethdb.Tx, blockNum uint64, currentT
|
||||
log.Error("Failed to ReadTd", "err", err, "number", block.NumberU64()-1, "hash", block.ParentHash())
|
||||
} else if parent != nil {
|
||||
td = new(big.Int).Add(block.Difficulty(), parent)
|
||||
go blockPropagator.BroadcastNewBlock(context.Background(), block, td)
|
||||
go blockPropagator(context.Background(), block, td)
|
||||
} else {
|
||||
log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
|
||||
}
|
||||
|
114
turbo/stages/sentry_mock_test.go
Normal file
114
turbo/stages/sentry_mock_test.go
Normal file
@ -0,0 +1,114 @@
|
||||
package stages
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math/big"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/c2h5oh/datasize"
|
||||
"github.com/holiman/uint256"
|
||||
"github.com/ledgerwatch/turbo-geth/common"
|
||||
"github.com/ledgerwatch/turbo-geth/consensus/ethash"
|
||||
"github.com/ledgerwatch/turbo-geth/core"
|
||||
"github.com/ledgerwatch/turbo-geth/core/types"
|
||||
"github.com/ledgerwatch/turbo-geth/core/vm"
|
||||
"github.com/ledgerwatch/turbo-geth/eth/stagedsync"
|
||||
"github.com/ledgerwatch/turbo-geth/ethdb"
|
||||
"github.com/ledgerwatch/turbo-geth/params"
|
||||
"github.com/ledgerwatch/turbo-geth/turbo/stages/bodydownload"
|
||||
"github.com/ledgerwatch/turbo-geth/turbo/stages/headerdownload"
|
||||
)
|
||||
|
||||
// passing tmpdir because it is renponsibility of the caller to clean it up
|
||||
func testStagedSync(tmpdir string) *stagedsync.StagedSync {
|
||||
ctx := context.Background()
|
||||
memDb := ethdb.NewMemDatabase()
|
||||
defer memDb.Close()
|
||||
db := memDb.RwKV()
|
||||
sm := ethdb.DefaultStorageMode
|
||||
engine := ethash.NewFaker()
|
||||
hd := headerdownload.NewHeaderDownload(1024 /* anchorLimit */, 1024 /* linkLimit */, engine)
|
||||
chainConfig := params.AllEthashProtocolChanges
|
||||
sendHeaderRequest := func(context.Context, *headerdownload.HeaderRequest) []byte {
|
||||
return nil
|
||||
}
|
||||
propagateNewBlockHashes := func(context.Context, []headerdownload.Announce) {
|
||||
}
|
||||
penalize := func(context.Context, []headerdownload.PenaltyItem) {
|
||||
}
|
||||
batchSize := 1 * datasize.MB
|
||||
increment := uint64(0)
|
||||
bd := bodydownload.NewBodyDownload(1024 /* outstandingLimit */, engine)
|
||||
sendBodyRequest := func(context.Context, *bodydownload.BodyRequest) []byte {
|
||||
return nil
|
||||
}
|
||||
updateHead := func(ctx context.Context, head uint64, hash common.Hash, td *uint256.Int) {
|
||||
}
|
||||
blockPropagator := func(ctx context.Context, block *types.Block, td *big.Int) {
|
||||
}
|
||||
blockDowloadTimeout := 10
|
||||
txCacher := core.NewTxSenderCacher(1)
|
||||
txPoolConfig := core.DefaultTxPoolConfig
|
||||
txPoolConfig.Journal = ""
|
||||
txPoolConfig.StartOnInit = true
|
||||
txPool := core.NewTxPool(txPoolConfig, chainConfig, memDb, txCacher)
|
||||
return NewStagedSync(ctx, sm,
|
||||
stagedsync.StageHeadersCfg(
|
||||
db,
|
||||
hd,
|
||||
*chainConfig,
|
||||
sendHeaderRequest,
|
||||
propagateNewBlockHashes,
|
||||
penalize,
|
||||
batchSize,
|
||||
increment,
|
||||
),
|
||||
stagedsync.StageBodiesCfg(
|
||||
db,
|
||||
bd,
|
||||
sendBodyRequest,
|
||||
penalize,
|
||||
updateHead,
|
||||
blockPropagator,
|
||||
blockDowloadTimeout,
|
||||
*chainConfig,
|
||||
batchSize,
|
||||
),
|
||||
stagedsync.StageSendersCfg(db, chainConfig),
|
||||
stagedsync.StageExecuteBlocksCfg(
|
||||
db,
|
||||
sm.Receipts,
|
||||
sm.CallTraces,
|
||||
batchSize,
|
||||
nil,
|
||||
nil,
|
||||
nil,
|
||||
nil,
|
||||
chainConfig,
|
||||
engine,
|
||||
&vm.Config{NoReceipts: !sm.Receipts},
|
||||
tmpdir,
|
||||
),
|
||||
stagedsync.StageHashStateCfg(db, tmpdir),
|
||||
stagedsync.StageTrieCfg(db, true, true, tmpdir),
|
||||
stagedsync.StageHistoryCfg(db, tmpdir),
|
||||
stagedsync.StageLogIndexCfg(db, tmpdir),
|
||||
stagedsync.StageCallTracesCfg(db, 0, batchSize, tmpdir, chainConfig, engine),
|
||||
stagedsync.StageTxLookupCfg(db, tmpdir),
|
||||
stagedsync.StageTxPoolCfg(db, txPool),
|
||||
stagedsync.StageFinishCfg(db, tmpdir),
|
||||
)
|
||||
}
|
||||
|
||||
func TestEmptyStageSync(t *testing.T) {
|
||||
tmpdir, err := ioutil.TempDir("", "stagesync-test")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
defer os.RemoveAll(tmpdir) // clean up
|
||||
testStagedSync(tmpdir)
|
||||
}
|
Loading…
Reference in New Issue
Block a user