mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-01 00:31:21 +00:00
save
This commit is contained in:
parent
4c765516ba
commit
256dfd818a
@ -193,6 +193,9 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl
|
||||
func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMessage, sentryClient sentry.SentryClient) error {
|
||||
switch req.Id {
|
||||
case sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66, sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_65:
|
||||
if !f.pool.Started() {
|
||||
return nil
|
||||
}
|
||||
hashCount, pos, err := ParseHashesCount(req.Data, 0)
|
||||
if err != nil {
|
||||
return fmt.Errorf("parsing NewPooledTransactionHashes: %w", err)
|
||||
@ -228,6 +231,9 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
|
||||
}
|
||||
}
|
||||
case sentry.MessageId_GET_POOLED_TRANSACTIONS_66, sentry.MessageId_GET_POOLED_TRANSACTIONS_65:
|
||||
if !f.pool.Started() {
|
||||
return nil
|
||||
}
|
||||
//TODO: handleInboundMessage is single-threaded - means it can accept as argument couple buffers (or analog of txParseContext). Protobuf encoding will copy data anyway, but DirectClient doesn't
|
||||
var encodedRequest []byte
|
||||
messageId := sentry.MessageId_POOLED_TRANSACTIONS_66
|
||||
|
@ -188,7 +188,6 @@ func ParsePooledTransactions65(payload []byte, pos int, ctx *TxParseContext, txS
|
||||
pos, err = ctx.ParseTransaction(payload, pos, txSlots.txs[i], txSlots.senders.At(i))
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrRejected) {
|
||||
fmt.Printf("rejected\n")
|
||||
continue
|
||||
}
|
||||
return 0, err
|
||||
@ -217,7 +216,6 @@ func ParsePooledTransactions66(payload []byte, pos int, ctx *TxParseContext, txS
|
||||
pos, err = ctx.ParseTransaction(payload, pos, txSlots.txs[i], txSlots.senders.At(i))
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrRejected) {
|
||||
fmt.Printf("rejected\n")
|
||||
continue
|
||||
}
|
||||
return requestID, 0, err
|
||||
|
@ -146,6 +146,11 @@ func (sc *SendersCache) forEach(f func(info *senderInfo)) {
|
||||
f(sc.senderInfo[i])
|
||||
}
|
||||
}
|
||||
func (sc *SendersCache) len() int {
|
||||
sc.lock.RLock()
|
||||
defer sc.lock.RUnlock()
|
||||
return len(sc.senderInfo)
|
||||
}
|
||||
|
||||
func (sc *SendersCache) evict() int {
|
||||
sc.lock.Lock()
|
||||
@ -502,7 +507,6 @@ func (p *TxPool) setBaseFee(protocolBaseFee, pendingBaseFee uint64) (uint64, uin
|
||||
if hasNewVal {
|
||||
p.pendingBaseFee.Store(pendingBaseFee)
|
||||
}
|
||||
log.Debug("set base fee", "protocol", protocolBaseFee, "pending", pendingBaseFee)
|
||||
return protocolBaseFee, p.pendingBaseFee.Load()
|
||||
}
|
||||
|
||||
@ -510,7 +514,7 @@ func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, un
|
||||
if err := senders.onNewBlock(coreDB, stateChanges, unwindTxs, minedTxs, blockHeight); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debug("[txpool.onNewBlock]", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "protocolBaseFee", protocolBaseFee, "blockHeight", blockHeight)
|
||||
log.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "protocolBaseFee", protocolBaseFee, "blockHeight", blockHeight)
|
||||
protocolBaseFee, pendingBaseFee = p.setBaseFee(protocolBaseFee, pendingBaseFee)
|
||||
if err := unwindTxs.Valid(); err != nil {
|
||||
return err
|
||||
@ -521,7 +525,6 @@ func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, un
|
||||
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
//log.Debug("[txpool.onNewBlock]", "senderInfo", len(p.senderInfo))
|
||||
if err := onNewBlock(senders, unwindTxs, minedTxs.txs, protocolBaseFee, pendingBaseFee, p.pending, p.baseFee, p.queued, p.byHash, p.localsHistory); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -1019,7 +1022,9 @@ func BroadcastLoop(ctx context.Context, db kv.RwDB, p *TxPool, senders *SendersC
|
||||
case <-evictSendersEvery.C:
|
||||
// evict sendersInfo without txs
|
||||
count := senders.evict()
|
||||
log.Debug("evicted senders", "amount", count)
|
||||
if count > 0 {
|
||||
log.Debug("evicted senders", "amount", count)
|
||||
}
|
||||
if db != nil {
|
||||
if err := db.Update(ctx, func(tx kv.RwTx) error {
|
||||
return p.flushIsLocalHistory(tx)
|
||||
|
Loading…
Reference in New Issue
Block a user