From 34e6ca3c3532af948592854c657bd63b380b91af Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Thu, 25 Nov 2021 00:13:17 +0700 Subject: [PATCH] Pool: support multi sentry (#191) * save * save * save --- txpool/fetch.go | 105 ++++++++++++++++++++++++++++++------------------ 1 file changed, 66 insertions(+), 39 deletions(-) diff --git a/txpool/fetch.go b/txpool/fetch.go index fc2683967..0af5b2d96 100644 --- a/txpool/fetch.go +++ b/txpool/fetch.go @@ -42,15 +42,18 @@ import ( // genesis hash and list of forks, but with zero max block and total difficulty // Sentry should have a logic not to overwrite statusData with messages from tx pool type Fetch struct { - ctx context.Context // Context used for cancellation and closing of the fetcher - sentryClients []direct.SentryClient // sentry clients that will be used for accessing the network - pool Pool // Transaction pool implementation - coreDB kv.RoDB - db kv.RwDB - wg *sync.WaitGroup // used for synchronisation in the tests (nil when not in tests) - stateChangesClient StateChangesClient - stateChangesParseCtx *TxParseContext - pooledTxsParseCtx *TxParseContext + ctx context.Context // Context used for cancellation and closing of the fetcher + sentryClients []direct.SentryClient // sentry clients that will be used for accessing the network + pool Pool // Transaction pool implementation + coreDB kv.RoDB + db kv.RwDB + wg *sync.WaitGroup // used for synchronisation in the tests (nil when not in tests) + stateChangesClient StateChangesClient + + stateChangesParseCtx *TxParseContext + stateChangesParseCtxLock sync.Mutex + pooledTxsParseCtx *TxParseContext + pooledTxsParseCtxLock sync.Mutex } type StateChangesClient interface { @@ -81,25 +84,28 @@ func (f *Fetch) SetWaitGroup(wg *sync.WaitGroup) { f.wg = wg } +func (f *Fetch) threadSafeParsePooledTxn(cb func(*TxParseContext) error) error { + f.pooledTxsParseCtxLock.Lock() + defer f.pooledTxsParseCtxLock.Unlock() + return cb(f.pooledTxsParseCtx) +} + +func (f *Fetch) threadSafeParseStateChangeTxn(cb func(*TxParseContext) error) error { + f.stateChangesParseCtxLock.Lock() + defer f.stateChangesParseCtxLock.Unlock() + return cb(f.stateChangesParseCtx) +} + // ConnectSentries initialises connection to the sentry func (f *Fetch) ConnectSentries() { - //TODO: fix race in parse ctx - 2 sentries causing it - go func(i int) { - f.receiveMessageLoop(f.sentryClients[i]) - }(0) - go func(i int) { - f.receivePeerLoop(f.sentryClients[i]) - }(0) - /* - for i := range f.sentryClients { - go func(i int) { - f.receiveMessageLoop(f.sentryClients[i]) - }(i) - go func(i int) { - f.receivePeerLoop(f.sentryClients[i]) - }(i) - } - */ + for i := range f.sentryClients { + go func(i int) { + f.receiveMessageLoop(f.sentryClients[i]) + }(i) + go func(i int) { + f.receivePeerLoop(f.sentryClients[i]) + }(i) + } } func (f *Fetch) ConnectCore() { go func() { @@ -321,24 +327,39 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes } case sentry.MessageId_POOLED_TRANSACTIONS_65, sentry.MessageId_POOLED_TRANSACTIONS_66, sentry.MessageId_TRANSACTIONS_65, sentry.MessageId_TRANSACTIONS_66: txs := TxSlots{} - f.pooledTxsParseCtx.ValidateHash(func(hash []byte) error { - known, err := f.pool.IdHashKnown(tx, hash) - if err != nil { - return err - } - if known { - return ErrRejected - } + if err := f.threadSafeParsePooledTxn(func(parseContext *TxParseContext) error { + parseContext.ValidateHash(func(hash []byte) error { + known, err := f.pool.IdHashKnown(tx, hash) + if err != nil { + return err + } + if known { + return ErrRejected + } + return nil + }) return nil - }) + }); err != nil { + return err + } switch req.Id { case sentry.MessageId_POOLED_TRANSACTIONS_65, sentry.MessageId_TRANSACTIONS_65, sentry.MessageId_TRANSACTIONS_66: - if _, err := ParsePooledTransactions65(req.Data, 0, f.pooledTxsParseCtx, &txs); err != nil { + if err := f.threadSafeParsePooledTxn(func(parseContext *TxParseContext) error { + if _, err := ParsePooledTransactions65(req.Data, 0, parseContext, &txs); err != nil { + return err + } + return nil + }); err != nil { return err } case sentry.MessageId_POOLED_TRANSACTIONS_66: - if _, _, err := ParsePooledTransactions66(req.Data, 0, f.pooledTxsParseCtx, &txs); err != nil { + if err := f.threadSafeParsePooledTxn(func(parseContext *TxParseContext) error { + if _, _, err := ParsePooledTransactions66(req.Data, 0, parseContext, &txs); err != nil { + return err + } + return nil + }); err != nil { return err } default: @@ -457,7 +478,10 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client StateChangesClien minedTxs.Resize(uint(len(change.Txs))) for i := range change.Txs { minedTxs.txs[i] = &TxSlot{} - if _, err := f.stateChangesParseCtx.ParseTransaction(change.Txs[i], 0, minedTxs.txs[i], minedTxs.senders.At(i)); err != nil { + if err = f.threadSafeParseStateChangeTxn(func(parseContext *TxParseContext) error { + _, err := parseContext.ParseTransaction(change.Txs[i], 0, minedTxs.txs[i], minedTxs.senders.At(i)) + return err + }); err != nil { log.Warn("stream.Recv", "err", err) continue } @@ -467,7 +491,10 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client StateChangesClien unwindTxs.Resize(uint(len(change.Txs))) for i := range change.Txs { unwindTxs.txs[i] = &TxSlot{} - if _, err := f.stateChangesParseCtx.ParseTransaction(change.Txs[i], 0, unwindTxs.txs[i], unwindTxs.senders.At(i)); err != nil { + if err = f.threadSafeParseStateChangeTxn(func(parseContext *TxParseContext) error { + _, err = parseContext.ParseTransaction(change.Txs[i], 0, unwindTxs.txs[i], unwindTxs.senders.At(i)) + return err + }); err != nil { log.Warn("stream.Recv", "err", err) continue }