From 51b120abdd73a78d65577e2f5cbe6caffa5c6477 Mon Sep 17 00:00:00 2001 From: ledgerwatch Date: Fri, 14 Oct 2022 23:11:37 +0100 Subject: [PATCH] [txpool] Best function to accept onTopOf argument and indicate whenever it is too early to gather tx for a block (#685) Co-authored-by: Alexey Sharp --- txpool/pool.go | 10 +++++++--- txpool/txpool_grpc_server.go | 4 ++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/txpool/pool.go b/txpool/pool.go index c55125202..0ccbef124 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -605,7 +605,11 @@ func (p *TxPool) Started() bool { return p.started.Load() } // Best - returns top `n` elements of pending queue // id doesn't perform full copy of txs, hovewer underlying elements are immutable -func (p *TxPool) Best(n uint16, txs *types.TxsRlp, tx kv.Tx) error { +func (p *TxPool) Best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf uint64) (bool, error) { + // First wait for the corresponding block to arrive + if p.lastSeenBlock.Load() < onTopOf { + return false, nil // Too early + } p.lock.RLock() defer p.lock.RUnlock() @@ -621,7 +625,7 @@ func (p *TxPool) Best(n uint16, txs *types.TxsRlp, tx kv.Tx) error { } rlpTx, sender, isLocal, err := p.getRlpLocked(tx, mt.Tx.IDHash[:]) if err != nil { - return err + return false, err } if len(rlpTx) == 0 { p.pending.Remove(mt) @@ -633,7 +637,7 @@ func (p *TxPool) Best(n uint16, txs *types.TxsRlp, tx kv.Tx) error { j++ } txs.Resize(uint(j)) - return nil + return true, nil } func (p *TxPool) CountContent() (int, int, int) { diff --git a/txpool/txpool_grpc_server.go b/txpool/txpool_grpc_server.go index 8507b6df1..40ae62722 100644 --- a/txpool/txpool_grpc_server.go +++ b/txpool/txpool_grpc_server.go @@ -50,7 +50,7 @@ var TxPoolAPIVersion = &types2.VersionReply{Major: 1, Minor: 0, Patch: 0} type txPool interface { ValidateSerializedTxn(serializedTxn []byte) error - Best(n uint16, txs *types.TxsRlp, tx kv.Tx) error + Best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf uint64) (bool, error) GetRlp(tx kv.Tx, hash []byte) ([]byte, error) AddLocalTxs(ctx context.Context, newTxs types.TxSlots, tx kv.Tx) ([]DiscardReason, error) deprecatedForEach(_ context.Context, f func(rlp, sender []byte, t SubPoolType), tx kv.Tx) @@ -154,7 +154,7 @@ func (s *GrpcServer) Pending(ctx context.Context, _ *emptypb.Empty) (*txpool_pro reply := &txpool_proto.PendingReply{} reply.Txs = make([]*txpool_proto.PendingReply_Tx, 0, 32) txSlots := types.TxsRlp{} - if err := s.txPool.Best(math.MaxInt16, &txSlots, tx); err != nil { + if _, err := s.txPool.Best(math.MaxInt16, &txSlots, tx, 0 /* onTopOf */); err != nil { return nil, err } var senderArr [20]byte