Pool: support multi sentry (#191)

* save

* save

* save
This commit is contained in:
Alex Sharov 2021-11-25 00:13:17 +07:00 committed by GitHub
parent 6d88f06d4d
commit 34e6ca3c35
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -42,15 +42,18 @@ import (
// genesis hash and list of forks, but with zero max block and total difficulty // genesis hash and list of forks, but with zero max block and total difficulty
// Sentry should have a logic not to overwrite statusData with messages from tx pool // Sentry should have a logic not to overwrite statusData with messages from tx pool
type Fetch struct { type Fetch struct {
ctx context.Context // Context used for cancellation and closing of the fetcher ctx context.Context // Context used for cancellation and closing of the fetcher
sentryClients []direct.SentryClient // sentry clients that will be used for accessing the network sentryClients []direct.SentryClient // sentry clients that will be used for accessing the network
pool Pool // Transaction pool implementation pool Pool // Transaction pool implementation
coreDB kv.RoDB coreDB kv.RoDB
db kv.RwDB db kv.RwDB
wg *sync.WaitGroup // used for synchronisation in the tests (nil when not in tests) wg *sync.WaitGroup // used for synchronisation in the tests (nil when not in tests)
stateChangesClient StateChangesClient stateChangesClient StateChangesClient
stateChangesParseCtx *TxParseContext
pooledTxsParseCtx *TxParseContext stateChangesParseCtx *TxParseContext
stateChangesParseCtxLock sync.Mutex
pooledTxsParseCtx *TxParseContext
pooledTxsParseCtxLock sync.Mutex
} }
type StateChangesClient interface { type StateChangesClient interface {
@ -81,25 +84,28 @@ func (f *Fetch) SetWaitGroup(wg *sync.WaitGroup) {
f.wg = wg f.wg = wg
} }
func (f *Fetch) threadSafeParsePooledTxn(cb func(*TxParseContext) error) error {
f.pooledTxsParseCtxLock.Lock()
defer f.pooledTxsParseCtxLock.Unlock()
return cb(f.pooledTxsParseCtx)
}
func (f *Fetch) threadSafeParseStateChangeTxn(cb func(*TxParseContext) error) error {
f.stateChangesParseCtxLock.Lock()
defer f.stateChangesParseCtxLock.Unlock()
return cb(f.stateChangesParseCtx)
}
// ConnectSentries initialises connection to the sentry // ConnectSentries initialises connection to the sentry
func (f *Fetch) ConnectSentries() { func (f *Fetch) ConnectSentries() {
//TODO: fix race in parse ctx - 2 sentries causing it for i := range f.sentryClients {
go func(i int) { go func(i int) {
f.receiveMessageLoop(f.sentryClients[i]) f.receiveMessageLoop(f.sentryClients[i])
}(0) }(i)
go func(i int) { go func(i int) {
f.receivePeerLoop(f.sentryClients[i]) f.receivePeerLoop(f.sentryClients[i])
}(0) }(i)
/* }
for i := range f.sentryClients {
go func(i int) {
f.receiveMessageLoop(f.sentryClients[i])
}(i)
go func(i int) {
f.receivePeerLoop(f.sentryClients[i])
}(i)
}
*/
} }
func (f *Fetch) ConnectCore() { func (f *Fetch) ConnectCore() {
go func() { go func() {
@ -321,24 +327,39 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
} }
case sentry.MessageId_POOLED_TRANSACTIONS_65, sentry.MessageId_POOLED_TRANSACTIONS_66, sentry.MessageId_TRANSACTIONS_65, sentry.MessageId_TRANSACTIONS_66: case sentry.MessageId_POOLED_TRANSACTIONS_65, sentry.MessageId_POOLED_TRANSACTIONS_66, sentry.MessageId_TRANSACTIONS_65, sentry.MessageId_TRANSACTIONS_66:
txs := TxSlots{} txs := TxSlots{}
f.pooledTxsParseCtx.ValidateHash(func(hash []byte) error { if err := f.threadSafeParsePooledTxn(func(parseContext *TxParseContext) error {
known, err := f.pool.IdHashKnown(tx, hash) parseContext.ValidateHash(func(hash []byte) error {
if err != nil { known, err := f.pool.IdHashKnown(tx, hash)
return err if err != nil {
} return err
if known { }
return ErrRejected if known {
} return ErrRejected
}
return nil
})
return nil return nil
}) }); err != nil {
return err
}
switch req.Id { switch req.Id {
case sentry.MessageId_POOLED_TRANSACTIONS_65, sentry.MessageId_TRANSACTIONS_65, sentry.MessageId_TRANSACTIONS_66: case sentry.MessageId_POOLED_TRANSACTIONS_65, sentry.MessageId_TRANSACTIONS_65, sentry.MessageId_TRANSACTIONS_66:
if _, err := ParsePooledTransactions65(req.Data, 0, f.pooledTxsParseCtx, &txs); err != nil { if err := f.threadSafeParsePooledTxn(func(parseContext *TxParseContext) error {
if _, err := ParsePooledTransactions65(req.Data, 0, parseContext, &txs); err != nil {
return err
}
return nil
}); err != nil {
return err return err
} }
case sentry.MessageId_POOLED_TRANSACTIONS_66: case sentry.MessageId_POOLED_TRANSACTIONS_66:
if _, _, err := ParsePooledTransactions66(req.Data, 0, f.pooledTxsParseCtx, &txs); err != nil { if err := f.threadSafeParsePooledTxn(func(parseContext *TxParseContext) error {
if _, _, err := ParsePooledTransactions66(req.Data, 0, parseContext, &txs); err != nil {
return err
}
return nil
}); err != nil {
return err return err
} }
default: default:
@ -457,7 +478,10 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client StateChangesClien
minedTxs.Resize(uint(len(change.Txs))) minedTxs.Resize(uint(len(change.Txs)))
for i := range change.Txs { for i := range change.Txs {
minedTxs.txs[i] = &TxSlot{} minedTxs.txs[i] = &TxSlot{}
if _, err := f.stateChangesParseCtx.ParseTransaction(change.Txs[i], 0, minedTxs.txs[i], minedTxs.senders.At(i)); err != nil { if err = f.threadSafeParseStateChangeTxn(func(parseContext *TxParseContext) error {
_, err := parseContext.ParseTransaction(change.Txs[i], 0, minedTxs.txs[i], minedTxs.senders.At(i))
return err
}); err != nil {
log.Warn("stream.Recv", "err", err) log.Warn("stream.Recv", "err", err)
continue continue
} }
@ -467,7 +491,10 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client StateChangesClien
unwindTxs.Resize(uint(len(change.Txs))) unwindTxs.Resize(uint(len(change.Txs)))
for i := range change.Txs { for i := range change.Txs {
unwindTxs.txs[i] = &TxSlot{} unwindTxs.txs[i] = &TxSlot{}
if _, err := f.stateChangesParseCtx.ParseTransaction(change.Txs[i], 0, unwindTxs.txs[i], unwindTxs.senders.At(i)); err != nil { if err = f.threadSafeParseStateChangeTxn(func(parseContext *TxParseContext) error {
_, err = parseContext.ParseTransaction(change.Txs[i], 0, unwindTxs.txs[i], unwindTxs.senders.At(i))
return err
}); err != nil {
log.Warn("stream.Recv", "err", err) log.Warn("stream.Recv", "err", err)
continue continue
} }