mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-07 11:32:20 +00:00
94f4ea805d
the root cause is that when `inMemoryExecution` lambda gets created in the `eth/backend.go`, it captures the reference of `backend.notifications`, and so the execution of side-forks actually adds notifications to there, and it all gets sent out to tx pool (and RPC daemon) at the end of the stage loop (regardless of whether there was forkchoice update or not) so we can create a separate notification, but then somehow flush it to the "main" nofitications when the in-memory exec state is flushed Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local> Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro.local>
64 lines
2.2 KiB
Go
64 lines
2.2 KiB
Go
package commands
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"testing"
|
|
|
|
"github.com/ledgerwatch/erigon-lib/direct"
|
|
"github.com/ledgerwatch/erigon-lib/gointerfaces/sentry"
|
|
"github.com/ledgerwatch/erigon/cmd/rpcdaemon/rpcservices"
|
|
"github.com/ledgerwatch/erigon/common"
|
|
"github.com/ledgerwatch/erigon/core"
|
|
"github.com/ledgerwatch/erigon/core/types"
|
|
"github.com/ledgerwatch/erigon/eth/protocols/eth"
|
|
"github.com/ledgerwatch/erigon/ethdb/privateapi"
|
|
"github.com/ledgerwatch/erigon/rlp"
|
|
"github.com/ledgerwatch/erigon/turbo/rpchelper"
|
|
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
|
|
"github.com/ledgerwatch/erigon/turbo/stages"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestEthSubscribe(t *testing.T) {
|
|
m, require := stages.Mock(t), require.New(t)
|
|
chain, err := core.GenerateChain(m.ChainConfig, m.Genesis, m.Engine, m.DB, 7, func(i int, b *core.BlockGen) {
|
|
b.SetCoinbase(common.Address{1})
|
|
}, false /* intermediateHashes */)
|
|
require.NoError(err)
|
|
|
|
b, err := rlp.EncodeToBytes(ð.BlockHeadersPacket66{
|
|
RequestId: 1,
|
|
BlockHeadersPacket: chain.Headers,
|
|
})
|
|
require.NoError(err)
|
|
|
|
m.ReceiveWg.Add(1)
|
|
for _, err = range m.Send(&sentry.InboundMessage{Id: sentry.MessageId_BLOCK_HEADERS_66, Data: b, PeerId: m.PeerId}) {
|
|
require.NoError(err)
|
|
}
|
|
m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed
|
|
|
|
ctx := context.Background()
|
|
backendServer := privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, false)
|
|
backendClient := direct.NewEthBackendClientDirect(backendServer)
|
|
backend := rpcservices.NewRemoteBackend(backendClient, m.DB, snapshotsync.NewBlockReader())
|
|
ff := rpchelper.New(ctx, backend, nil, nil, func() {})
|
|
|
|
newHeads := make(chan *types.Header)
|
|
id := ff.SubscribeNewHeads(newHeads)
|
|
defer ff.UnsubscribeHeads(id)
|
|
|
|
initialCycle := true
|
|
highestSeenHeader := chain.TopBlock.NumberU64()
|
|
if _, err := stages.StageLoopStep(m.Ctx, m.ChainConfig, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
for i := uint64(1); i <= highestSeenHeader; i++ {
|
|
header := <-newHeads
|
|
fmt.Printf("Got header %d\n", header.Number.Uint64())
|
|
require.Equal(i, header.Number.Uint64())
|
|
}
|
|
}
|