Merge pull request #31 from ledgerwatch/pool17

remove statusData from fetch
This commit is contained in:
Alex Sharov 2021-08-14 16:16:44 +07:00 committed by GitHub
commit 99d57caee2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 49 additions and 88 deletions

View File

@ -130,12 +130,7 @@ func (c *SentryClientRemote) HandShake(ctx context.Context, in *emptypb.Empty, o
return reply, nil
}
// SetStatus forwards the status-data handshake to the underlying remote
// sentry client, returning its reply and error unchanged.
func (c *SentryClientRemote) SetStatus(ctx context.Context, in *sentry.StatusData, opts ...grpc.CallOption) (*sentry.SetStatusReply, error) {
	// The original contained an expanded err-check followed by an identical
	// unreachable direct return (dead code); a single delegation is equivalent.
	return c.SentryClient.SetStatus(ctx, in, opts...)
}
func (c *SentryClientRemote) Messages(ctx context.Context, in *sentry.MessagesRequest, opts ...grpc.CallOption) (sentry.Sentry_MessagesClient, error) {

View File

@ -24,7 +24,6 @@ import (
"sync"
"time"
"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/gointerfaces/sentry"
@ -33,6 +32,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
)
// Fetch connects to sentry and implements eth/65 or eth/66 protocol regarding the transaction
@ -42,7 +42,6 @@ import (
type Fetch struct {
ctx context.Context // Context used for cancellation and closing of the fetcher
sentryClients []sentry.SentryClient // sentry clients that will be used for accessing the network
statusData *sentry.StatusData // Status data used for "handshaking" with sentries
pool Pool // Transaction pool implementation
coreDB kv.RoDB
wg *sync.WaitGroup // used for synchronisation in the tests (nil when not in tests)
@ -50,35 +49,22 @@ type Fetch struct {
}
type Timings struct {
propagateAllNewTxsEvery time.Duration
syncToNewPeersEvery time.Duration
broadcastLocalTransactionsEvery time.Duration
syncToNewPeersEvery time.Duration
logEvery time.Duration
}
// DefaultTimings are the production intervals for the tx-pool loops.
// The original literal repeated the syncToNewPeersEvery key (duplicate
// composite-literal keys do not compile in Go); the duplicate is removed.
var DefaultTimings = Timings{
	propagateAllNewTxsEvery:         5 * time.Second,
	broadcastLocalTransactionsEvery: 2 * time.Minute,
	syncToNewPeersEvery:             2 * time.Minute,
	logEvery:                        30 * time.Second,
}
// NewFetch creates a new fetch object that will work with given sentry clients. Since the
// SentryClient here is an interface, it is suitable for mocking in tests (mock will need
// to implement all the functions of the SentryClient interface).
func NewFetch(ctx context.Context, sentryClients []sentry.SentryClient, genesisHash [32]byte, networkId uint64, forks []uint64, pool Pool, stateChangesClient remote.KVClient, db kv.RoDB) *Fetch {
statusData := &sentry.StatusData{
NetworkId: networkId,
TotalDifficulty: gointerfaces.ConvertUint256IntToH256(uint256.NewInt(0)),
BestHash: gointerfaces.ConvertHashToH256(genesisHash),
MaxBlock: 0,
ForkData: &sentry.Forks{
Genesis: gointerfaces.ConvertHashToH256(genesisHash),
Forks: forks,
},
}
func NewFetch(ctx context.Context, sentryClients []sentry.SentryClient, pool Pool, stateChangesClient remote.KVClient, db kv.RoDB) *Fetch {
return &Fetch{
ctx: ctx,
sentryClients: sentryClients,
statusData: statusData,
pool: pool,
coreDB: db,
stateChangesClient: stateChangesClient,
@ -108,7 +94,15 @@ func (f *Fetch) ConnectCore() {
return
default:
}
f.handleStateChanges(f.ctx, f.stateChangesClient)
if err := f.handleStateChanges(f.ctx, f.stateChangesClient); err != nil {
s, ok := status.FromError(err)
retryLater := (ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled)
if retryLater {
time.Sleep(time.Second)
continue
}
log.Warn("[txpool.handleStateChanges]", "err", err)
}
}
}()
}
@ -120,8 +114,7 @@ func (f *Fetch) receiveMessageLoop(sentryClient sentry.SentryClient) {
return
default:
}
_, err := sentryClient.SetStatus(f.ctx, f.statusData, grpc.WaitForReady(true))
if err != nil {
if _, err := sentryClient.HandShake(f.ctx, &emptypb.Empty{}, grpc.WaitForReady(true)); err != nil {
s, ok := status.FromError(err)
retryLater := (ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled)
if retryLater {
@ -301,8 +294,7 @@ func (f *Fetch) receivePeerLoop(sentryClient sentry.SentryClient) {
return
default:
}
_, err := sentryClient.SetStatus(f.ctx, f.statusData, grpc.WaitForReady(true))
if err != nil {
if _, err := sentryClient.HandShake(f.ctx, &emptypb.Empty{}, grpc.WaitForReady(true)); err != nil {
s, ok := status.FromError(err)
retryLater := (ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled)
if retryLater {
@ -371,41 +363,19 @@ func (f *Fetch) handleNewPeer(req *sentry.PeersReply) error {
return nil
}
func (f *Fetch) handleStateChanges(ctx context.Context, client remote.KVClient) {
func (f *Fetch) handleStateChanges(ctx context.Context, client remote.KVClient) error {
streamCtx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := client.StateChanges(streamCtx, &remote.StateChangeRequest{WithStorage: false, WithTransactions: true}, grpc.WaitForReady(true))
if err != nil {
select {
case <-ctx.Done():
return
default:
}
s, ok := status.FromError(err)
terminated := (ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF)
if terminated {
return
}
time.Sleep(time.Second)
log.Warn("state changes", "err", err)
return err
}
for req, err := stream.Recv(); ; req, err = stream.Recv() {
if err != nil {
select {
case <-ctx.Done():
return
default:
}
s, ok := status.FromError(err)
terminated := (ok && s.Code() == codes.Canceled) || errors.Is(err, io.EOF)
if terminated {
return
}
log.Warn("stream.Recv", "err", err)
return
return err
}
if req == nil {
return
return nil
}
parseCtx := NewTxParseContext()

View File

@ -28,7 +28,6 @@ import (
"github.com/ledgerwatch/erigon-lib/gointerfaces/sentry"
"github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
"github.com/ledgerwatch/log/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
@ -38,15 +37,11 @@ func TestFetch(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var genesisHash [32]byte
var networkId uint64 = 1
forks := []uint64{1, 5, 10}
m := NewMockSentry(ctx)
sentryClient := direct.NewSentryClientDirect(direct.ETH66, m)
pool := &PoolMock{}
fetch := NewFetch(ctx, []sentry.SentryClient{sentryClient}, genesisHash, networkId, forks, pool, &remote.KVClientMock{}, nil)
fetch := NewFetch(ctx, []sentry.SentryClient{sentryClient}, pool, &remote.KVClientMock{}, nil)
var wg sync.WaitGroup
fetch.SetWaitGroup(&wg)
m.StreamWg.Add(2)
@ -69,13 +64,11 @@ func TestFetch(t *testing.T) {
}
func TestSendTxPropagate(t *testing.T) {
logger := log.New()
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
t.Run("few remote byHash", func(t *testing.T) {
m := NewMockSentry(ctx)
send := NewSend(ctx, []SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil, logger)
send := NewSend(ctx, []SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil)
send.BroadcastRemotePooledTxs(toHashes([32]byte{1}, [32]byte{42}))
calls := m.SendMessageToRandomPeersCalls()
@ -86,7 +79,7 @@ func TestSendTxPropagate(t *testing.T) {
})
t.Run("much remote byHash", func(t *testing.T) {
m := NewMockSentry(ctx)
send := NewSend(ctx, []SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil, logger)
send := NewSend(ctx, []SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil)
list := make(Hashes, p2pTxPacketLimit*3)
for i := 0; i < len(list); i += 32 {
b := []byte(fmt.Sprintf("%x", i))
@ -106,7 +99,7 @@ func TestSendTxPropagate(t *testing.T) {
m.SendMessageToAllFunc = func(contextMoqParam context.Context, outboundMessageData *sentry.OutboundMessageData) (*sentry.SentPeers, error) {
return &sentry.SentPeers{Peers: make([]*types.H512, 5)}, nil
}
send := NewSend(ctx, []SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil, logger)
send := NewSend(ctx, []SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil)
send.BroadcastLocalPooledTxs(toHashes([32]byte{1}, [32]byte{42}))
calls := m.SendMessageToAllCalls()
@ -121,7 +114,7 @@ func TestSendTxPropagate(t *testing.T) {
m.SendMessageToAllFunc = func(contextMoqParam context.Context, outboundMessageData *sentry.OutboundMessageData) (*sentry.SentPeers, error) {
return &sentry.SentPeers{Peers: make([]*types.H512, 5)}, nil
}
send := NewSend(ctx, []SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil, logger)
send := NewSend(ctx, []SentryClient{direct.NewSentryClientDirect(direct.ETH66, m)}, nil)
expectPeers := toPeerIDs(1, 2, 42)
send.PropagatePooledTxsToPeersList(expectPeers, toHashes([32]byte{1}, [32]byte{42}))
@ -140,8 +133,6 @@ func TestOnNewBlock(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
db := memdb.NewTestDB(t)
var genesisHash [32]byte
var networkId uint64 = 1
i := 0
stream := &remote.KV_StateChangesClientMock{
@ -159,8 +150,9 @@ func TestOnNewBlock(t *testing.T) {
},
}
pool := &PoolMock{}
fetch := NewFetch(ctx, nil, genesisHash, networkId, nil, pool, stateChanges, db)
fetch.handleStateChanges(ctx, stateChanges)
fetch := NewFetch(ctx, nil, pool, stateChanges, db)
err := fetch.handleStateChanges(ctx, stateChanges)
assert.ErrorIs(t, io.EOF, err)
assert.Equal(t, 1, len(pool.OnNewBlockCalls()))
assert.Equal(t, 3, len(pool.OnNewBlockCalls()[0].MinedTxs.txs))
}

View File

@ -29,6 +29,7 @@ import (
"github.com/hashicorp/golang-lru/simplelru"
"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/log/v3"
"go.uber.org/atomic"
)
@ -160,6 +161,11 @@ func New(newTxs chan Hashes, db kv.RwDB) (*TxPool, error) {
}, nil
}
// logStats logs the current fill level of each sub-pool under a read lock.
func (p *TxPool) logStats() {
	p.lock.RLock()
	defer p.lock.RUnlock()
	// BUG FIX: the queued=%d/%d slot previously repeated the pending stats
	// (copy-paste). NOTE(review): assumes TxPool has a `queued` sub-pool and a
	// QueuedSubPoolLimit constant mirroring the pending/baseFee pair — confirm
	// against the full file.
	log.Info(fmt.Sprintf("[txpool] queues size: pending=%d/%d, baseFee=%d/%d, queued=%d/%d", p.pending.Len(), PendingSubPoolLimit, p.baseFee.Len(), BaseFeeSubPoolLimit, p.queued.Len(), QueuedSubPoolLimit))
}
func (p *TxPool) GetRlp(hash []byte) []byte {
p.lock.RLock()
defer p.lock.RUnlock()
@ -837,15 +843,12 @@ func (p *WorstQueue) Pop() interface{} {
// promote/demote transactions
// reorgs
func BroadcastLoop(ctx context.Context, p *TxPool, newTxs chan Hashes, send *Send, timings Timings) {
propagateAllNewTxsEvery := time.NewTicker(timings.propagateAllNewTxsEvery)
defer propagateAllNewTxsEvery.Stop()
logEvery := time.NewTicker(timings.logEvery)
defer logEvery.Stop()
syncToNewPeersEvery := time.NewTicker(timings.syncToNewPeersEvery)
defer syncToNewPeersEvery.Stop()
broadcastLocalTransactionsEvery := time.NewTicker(timings.broadcastLocalTransactionsEvery)
defer broadcastLocalTransactionsEvery.Stop()
localTxHashes := make([]byte, 0, 128)
remoteTxHashes := make([]byte, 0, 128)
@ -853,6 +856,8 @@ func BroadcastLoop(ctx context.Context, p *TxPool, newTxs chan Hashes, send *Sen
select {
case <-ctx.Done():
return
case <-logEvery.C:
p.logStats()
case h := <-newTxs:
// first broadcast all local txs to all peers, then non-local to random sqrt(peersAmount) peers
localTxHashes = localTxHashes[:0]
@ -879,6 +884,7 @@ func BroadcastLoop(ctx context.Context, p *TxPool, newTxs chan Hashes, send *Sen
}
}
// commitIsLocalHistory - use u64 sequence as keys to preserve order
func commitIsLocalHistory(db kv.RwDB, commited time.Time, localsHistory *simplelru.LRU) error {
if db == nil || time.Since(commited) < 30*time.Second {
return nil

View File

@ -38,16 +38,14 @@ type Send struct {
sentryClients []SentryClient // sentry clients that will be used for accessing the network
pool Pool
logger log.Logger
wg *sync.WaitGroup
wg *sync.WaitGroup
}
func NewSend(ctx context.Context, sentryClients []SentryClient, pool Pool, logger log.Logger) *Send {
func NewSend(ctx context.Context, sentryClients []SentryClient, pool Pool) *Send {
return &Send{
ctx: ctx,
pool: pool,
sentryClients: sentryClients,
logger: logger.New("at", "TxPool.Send"),
}
}
@ -102,7 +100,7 @@ func (f *Send) BroadcastLocalPooledTxs(txs Hashes) (sentToPeers int) {
peers, err := sentryClient.SendMessageToAll(f.ctx, req65, &grpc.EmptyCallOption{})
if err != nil {
f.logger.Warn("sentry response", "err", err)
log.Warn("[txpool.send] BroadcastLocalPooledTxs", "err", err)
}
avgPeersPerSent65 += len(peers.Peers)
@ -115,7 +113,7 @@ func (f *Send) BroadcastLocalPooledTxs(txs Hashes) (sentToPeers int) {
}
peers, err := sentryClient.SendMessageToAll(f.ctx, req66, &grpc.EmptyCallOption{})
if err != nil {
f.logger.Warn("sentry response", "err", err)
log.Warn("[txpool.send] BroadcastLocalPooledTxs", "err", err)
}
avgPeersPerSent66 += len(peers.Peers)
}
@ -161,7 +159,7 @@ func (f *Send) BroadcastRemotePooledTxs(txs Hashes) {
}
if _, err := sentryClient.SendMessageToRandomPeers(f.ctx, req65, &grpc.EmptyCallOption{}); err != nil {
f.logger.Warn("sentry response", "err", err)
log.Warn("[txpool.send] BroadcastRemotePooledTxs", "err", err)
}
case direct.ETH66:
@ -175,7 +173,7 @@ func (f *Send) BroadcastRemotePooledTxs(txs Hashes) {
}
}
if _, err := sentryClient.SendMessageToRandomPeers(f.ctx, req66, &grpc.EmptyCallOption{}); err != nil {
f.logger.Warn("sentry response", "err", err)
log.Warn("[txpool.send] BroadcastRemotePooledTxs", "err", err)
}
}
}
@ -217,7 +215,7 @@ func (f *Send) PropagatePooledTxsToPeersList(peers []PeerID, txs []byte) {
}
if _, err := sentryClient.SendMessageById(f.ctx, req65, &grpc.EmptyCallOption{}); err != nil {
f.logger.Warn("sentry response", "err", err)
log.Warn("[txpool.send] PropagatePooledTxsToPeersList", "err", err)
}
case direct.ETH66:
@ -229,7 +227,7 @@ func (f *Send) PropagatePooledTxsToPeersList(peers []PeerID, txs []byte) {
},
}
if _, err := sentryClient.SendMessageById(f.ctx, req66, &grpc.EmptyCallOption{}); err != nil {
f.logger.Warn("sentry response", "err", err)
log.Warn("[txpool.send] PropagatePooledTxsToPeersList", "err", err)
}
}
}