diff --git a/direct/sentry_client.go b/direct/sentry_client.go index f0f1bf4d6..ec30ca0f3 100644 --- a/direct/sentry_client.go +++ b/direct/sentry_client.go @@ -130,12 +130,7 @@ func (c *SentryClientRemote) HandShake(ctx context.Context, in *emptypb.Empty, o return reply, nil } func (c *SentryClientRemote) SetStatus(ctx context.Context, in *sentry.StatusData, opts ...grpc.CallOption) (*sentry.SetStatusReply, error) { - reply, err := c.SentryClient.SetStatus(ctx, in, opts...) - if err != nil { - return nil, err - } - - return reply, nil + return c.SentryClient.SetStatus(ctx, in, opts...) } func (c *SentryClientRemote) Messages(ctx context.Context, in *sentry.MessagesRequest, opts ...grpc.CallOption) (sentry.Sentry_MessagesClient, error) { diff --git a/txpool/fetch.go b/txpool/fetch.go index e738a15ff..0392eb36e 100644 --- a/txpool/fetch.go +++ b/txpool/fetch.go @@ -24,7 +24,6 @@ import ( "sync" "time" - "github.com/holiman/uint256" "github.com/ledgerwatch/erigon-lib/gointerfaces" "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry" @@ -33,6 +32,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/emptypb" ) // Fetch connects to sentry and implements eth/65 or eth/66 protocol regarding the transaction @@ -42,7 +42,6 @@ import ( type Fetch struct { ctx context.Context // Context used for cancellation and closing of the fetcher sentryClients []sentry.SentryClient // sentry clients that will be used for accessing the network - statusData *sentry.StatusData // Status data used for "handshaking" with sentries pool Pool // Transaction pool implementation coreDB kv.RoDB wg *sync.WaitGroup // used for synchronisation in the tests (nil when not in tests) @@ -50,35 +49,22 @@ type Fetch struct { } type Timings struct { - propagateAllNewTxsEvery time.Duration - syncToNewPeersEvery time.Duration - broadcastLocalTransactionsEvery time.Duration + syncToNewPeersEvery time.Duration + logEvery time.Duration } var DefaultTimings = Timings{ - propagateAllNewTxsEvery: 5 * time.Second, - broadcastLocalTransactionsEvery: 2 * time.Minute, - syncToNewPeersEvery: 2 * time.Minute, + syncToNewPeersEvery: 2 * time.Minute, + logEvery: 30 * time.Second, } // NewFetch creates a new fetch object that will work with given sentry clients. Since the // SentryClient here is an interface, it is suitable for mocking in tests (mock will need // to implement all the functions of the SentryClient interface). -func NewFetch(ctx context.Context, sentryClients []sentry.SentryClient, genesisHash [32]byte, networkId uint64, forks []uint64, pool Pool, stateChangesClient remote.KVClient, db kv.RoDB) *Fetch { - statusData := &sentry.StatusData{ - NetworkId: networkId, - TotalDifficulty: gointerfaces.ConvertUint256IntToH256(uint256.NewInt(0)), - BestHash: gointerfaces.ConvertHashToH256(genesisHash), - MaxBlock: 0, - ForkData: &sentry.Forks{ - Genesis: gointerfaces.ConvertHashToH256(genesisHash), - Forks: forks, - }, - } +func NewFetch(ctx context.Context, sentryClients []sentry.SentryClient, pool Pool, stateChangesClient remote.KVClient, db kv.RoDB) *Fetch { return &Fetch{ ctx: ctx, sentryClients: sentryClients, - statusData: statusData, pool: pool, coreDB: db, stateChangesClient: stateChangesClient, @@ -108,7 +94,15 @@ func (f *Fetch) ConnectCore() { return default: } - f.handleStateChanges(f.ctx, f.stateChangesClient) + if err := f.handleStateChanges(f.ctx, f.stateChangesClient); err != nil { + s, ok := status.FromError(err) + retryLater := (ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) + if retryLater { + time.Sleep(time.Second) + continue + } + log.Warn("[txpool.handleStateChanges]", "err", err) + } } }() } @@ -120,8 +114,7 @@ func (f *Fetch) receiveMessageLoop(sentryClient sentry.SentryClient) { return default: } - _, err := sentryClient.SetStatus(f.ctx, f.statusData, grpc.WaitForReady(true)) - if err != nil { + if _, err := sentryClient.HandShake(f.ctx, &emptypb.Empty{}, grpc.WaitForReady(true)); err != nil { s, ok := status.FromError(err) retryLater := (ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) if retryLater { @@ -301,8 +294,7 @@ func (f *Fetch) receivePeerLoop(sentryClient sentry.SentryClient) { return default: } - _, err := sentryClient.SetStatus(f.ctx, f.statusData, grpc.WaitForReady(true)) - if err != nil { + if _, err := sentryClient.HandShake(f.ctx, &emptypb.Empty{}, grpc.WaitForReady(true)); err != nil { s, ok := status.FromError(err) retryLater := (ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) if retryLater { @@ -371,41 +363,19 @@ func (f *Fetch) handleNewPeer(req *sentry.PeersReply) error { return nil } -func (f *Fetch) handleStateChanges(ctx context.Context, client remote.KVClient) { +func (f *Fetch) handleStateChanges(ctx context.Context, client remote.KVClient) error { streamCtx, cancel := context.WithCancel(ctx) defer cancel() stream, err := client.StateChanges(streamCtx, &remote.StateChangeRequest{WithStorage: false, WithTransactions: true}, grpc.WaitForReady(true)) if err != nil { - select { - case <-ctx.Done(): - return - default: - } - s, ok := status.FromError(err) - terminated := (ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) - if terminated { - return - } - time.Sleep(time.Second) - log.Warn("state changes", "err", err) + return err } for req, err := stream.Recv(); ; req, err = stream.Recv() { if err != nil { - select { - case <-ctx.Done(): - return - default: - } - s, ok := status.FromError(err) - terminated := (ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) - if terminated { - return - } - log.Warn("stream.Recv", "err", err) - return + return err } if req == nil { - return + return nil } parseCtx := NewTxParseContext() diff --git a/txpool/fetch_test.go b/txpool/fetch_test.go index ff6cdceda..61d06e151 100644 --- a/txpool/fetch_test.go +++ b/txpool/fetch_test.go @@ -28,7 +28,6 @@ import ( "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry" "github.com/ledgerwatch/erigon-lib/gointerfaces/types" "github.com/ledgerwatch/erigon-lib/kv/memdb" - "github.com/ledgerwatch/log/v3" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" @@ -38,15 +37,11 @@ func TestFetch(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - var genesisHash [32]byte - var networkId uint64 = 1 - forks := []uint64{1, 5, 10} - m := NewMockSentry(ctx) sentryClient := direct.NewSentryClientDirect(direct.ETH66, m) pool := &PoolMock{} - fetch := NewFetch(ctx, []sentry.SentryClient{sentryClient}, genesisHash, networkId, forks, pool, &remote.KVClientMock{}, nil) + fetch := NewFetch(ctx, []sentry.SentryClient{sentryClient}, pool, &remote.KVClientMock{}, nil) var wg sync.WaitGroup fetch.SetWaitGroup(&wg) m.StreamWg.Add(2) @@ -69,13 +64,11 @@ func TestFetch(t *testing.T) { } func TestSendTxPropagate(t *testing.T) { - logger := log.New() - ctx, cancelFn := context.WithCancel(context.Background()) defer cancelFn() t.Run("few remote byHash", func(t *testing.T) { m := NewMockSentry(ctx) - send := NewSend(ctx, []SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil, logger) + send := NewSend(ctx, []SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil) send.BroadcastRemotePooledTxs(toHashes([32]byte{1}, [32]byte{42})) calls := m.SendMessageToRandomPeersCalls() @@ -86,7 +79,7 @@ func TestSendTxPropagate(t *testing.T) { }) t.Run("much remote byHash", func(t *testing.T) { m := NewMockSentry(ctx) - send := NewSend(ctx, []SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil, logger) + send := NewSend(ctx, []SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil) list := make(Hashes, p2pTxPacketLimit*3) for i := 0; i < len(list); i += 32 { b := []byte(fmt.Sprintf("%x", i)) @@ -106,7 +99,7 @@ func TestSendTxPropagate(t *testing.T) { m.SendMessageToAllFunc = func(contextMoqParam context.Context, outboundMessageData *sentry.OutboundMessageData) (*sentry.SentPeers, error) { return &sentry.SentPeers{Peers: make([]*types.H512, 5)}, nil } - send := NewSend(ctx, []SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil, logger) + send := NewSend(ctx, []SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil) send.BroadcastLocalPooledTxs(toHashes([32]byte{1}, [32]byte{42})) calls := m.SendMessageToAllCalls() @@ -121,7 +114,7 @@ func TestSendTxPropagate(t *testing.T) { m.SendMessageToAllFunc = func(contextMoqParam context.Context, outboundMessageData *sentry.OutboundMessageData) (*sentry.SentPeers, error) { return &sentry.SentPeers{Peers: make([]*types.H512, 5)}, nil } - send := NewSend(ctx, []SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil, logger) + send := NewSend(ctx, []SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil) expectPeers := toPeerIDs(1, 2, 42) send.PropagatePooledTxsToPeersList(expectPeers, toHashes([32]byte{1}, [32]byte{42})) @@ -140,8 +133,6 @@ func TestOnNewBlock(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() db := memdb.NewTestDB(t) - var genesisHash [32]byte - var networkId uint64 = 1 i := 0 stream := &remote.KV_StateChangesClientMock{ @@ -159,8 +150,9 @@ func TestOnNewBlock(t *testing.T) { }, } pool := &PoolMock{} - fetch := NewFetch(ctx, nil, genesisHash, networkId, nil, pool, stateChanges, db) - fetch.handleStateChanges(ctx, stateChanges) + fetch := NewFetch(ctx, nil, pool, stateChanges, db) + err := fetch.handleStateChanges(ctx, stateChanges) + assert.ErrorIs(t, io.EOF, err) assert.Equal(t, 1, len(pool.OnNewBlockCalls())) assert.Equal(t, 3, len(pool.OnNewBlockCalls()[0].MinedTxs.txs)) } diff --git a/txpool/pool.go b/txpool/pool.go index ba661f7bd..8270d18dd 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -29,6 +29,7 @@ import ( "github.com/hashicorp/golang-lru/simplelru" "github.com/holiman/uint256" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/log/v3" "go.uber.org/atomic" ) @@ -160,6 +161,11 @@ func New(newTxs chan Hashes, db kv.RwDB) (*TxPool, error) { }, nil } +func (p *TxPool) logStats() { + p.lock.RLock() + defer p.lock.RUnlock() + log.Info(fmt.Sprintf("[txpool] queues size: pending=%d/%d, baseFee=%d/%d, queued=%d/%d", p.pending.Len(), PendingSubPoolLimit, p.baseFee.Len(), BaseFeeSubPoolLimit, p.pending.Len(), PendingSubPoolLimit)) +} func (p *TxPool) GetRlp(hash []byte) []byte { p.lock.RLock() defer p.lock.RUnlock() @@ -837,15 +843,12 @@ func (p *WorstQueue) Pop() interface{} { // promote/demote transactions // reorgs func BroadcastLoop(ctx context.Context, p *TxPool, newTxs chan Hashes, send *Send, timings Timings) { - propagateAllNewTxsEvery := time.NewTicker(timings.propagateAllNewTxsEvery) - defer propagateAllNewTxsEvery.Stop() + logEvery := time.NewTicker(timings.logEvery) + defer logEvery.Stop() syncToNewPeersEvery := time.NewTicker(timings.syncToNewPeersEvery) defer syncToNewPeersEvery.Stop() - broadcastLocalTransactionsEvery := time.NewTicker(timings.broadcastLocalTransactionsEvery) - defer broadcastLocalTransactionsEvery.Stop() - localTxHashes := make([]byte, 0, 128) remoteTxHashes := make([]byte, 0, 128) @@ -853,6 +856,8 @@ func BroadcastLoop(ctx context.Context, p *TxPool, newTxs chan Hashes, send *Sen select { case <-ctx.Done(): return + case <-logEvery.C: + p.logStats() case h := <-newTxs: // first broadcast all local txs to all peers, then non-local to random sqrt(peersAmount) peers localTxHashes = localTxHashes[:0] @@ -879,6 +884,7 @@ func BroadcastLoop(ctx context.Context, p *TxPool, newTxs chan Hashes, send *Sen } } +// commitIsLocalHistory - use u64 sequence as keys to preserve order func commitIsLocalHistory(db kv.RwDB, commited time.Time, localsHistory *simplelru.LRU) error { if db == nil || time.Since(commited) < 30*time.Second { return nil diff --git a/txpool/send.go b/txpool/send.go index a1e91cc72..a8591feda 100644 --- a/txpool/send.go +++ b/txpool/send.go @@ -38,16 +38,14 @@ type Send struct { sentryClients []SentryClient // sentry clients that will be used for accessing the network pool Pool - logger log.Logger - wg *sync.WaitGroup + wg *sync.WaitGroup } -func NewSend(ctx context.Context, sentryClients []SentryClient, pool Pool, logger log.Logger) *Send { +func NewSend(ctx context.Context, sentryClients []SentryClient, pool Pool) *Send { return &Send{ ctx: ctx, pool: pool, sentryClients: sentryClients, - logger: logger.New("at", "TxPool.Send"), } } @@ -102,7 +100,7 @@ func (f *Send) BroadcastLocalPooledTxs(txs Hashes) (sentToPeers int) { peers, err := sentryClient.SendMessageToAll(f.ctx, req65, &grpc.EmptyCallOption{}) if err != nil { - f.logger.Warn("sentry response", "err", err) + log.Warn("[txpool.send] BroadcastLocalPooledTxs", "err", err) } avgPeersPerSent65 += len(peers.Peers) @@ -115,7 +113,7 @@ func (f *Send) BroadcastLocalPooledTxs(txs Hashes) (sentToPeers int) { } peers, err := sentryClient.SendMessageToAll(f.ctx, req66, &grpc.EmptyCallOption{}) if err != nil { - f.logger.Warn("sentry response", "err", err) + log.Warn("[txpool.send] BroadcastLocalPooledTxs", "err", err) } avgPeersPerSent66 += len(peers.Peers) } @@ -161,7 +159,7 @@ func (f *Send) BroadcastRemotePooledTxs(txs Hashes) { } if _, err := sentryClient.SendMessageToRandomPeers(f.ctx, req65, &grpc.EmptyCallOption{}); err != nil { - f.logger.Warn("sentry response", "err", err) + log.Warn("[txpool.send] BroadcastRemotePooledTxs", "err", err) } case direct.ETH66: @@ -175,7 +173,7 @@ func (f *Send) BroadcastRemotePooledTxs(txs Hashes) { } } if _, err := sentryClient.SendMessageToRandomPeers(f.ctx, req66, &grpc.EmptyCallOption{}); err != nil { - f.logger.Warn("sentry response", "err", err) + log.Warn("[txpool.send] BroadcastRemotePooledTxs", "err", err) } } } @@ -217,7 +215,7 @@ func (f *Send) PropagatePooledTxsToPeersList(peers []PeerID, txs []byte) { } if _, err := sentryClient.SendMessageById(f.ctx, req65, &grpc.EmptyCallOption{}); err != nil { - f.logger.Warn("sentry response", "err", err) + log.Warn("[txpool.send] PropagatePooledTxsToPeersList", "err", err) } case direct.ETH66: @@ -229,7 +227,7 @@ func (f *Send) PropagatePooledTxsToPeersList(peers []PeerID, txs []byte) { }, } if _, err := sentryClient.SendMessageById(f.ctx, req66, &grpc.EmptyCallOption{}); err != nil { - f.logger.Warn("sentry response", "err", err) + log.Warn("[txpool.send] PropagatePooledTxsToPeersList", "err", err) } } }