This commit is contained in:
alex.sharov 2021-08-21 15:24:03 +07:00
parent 256dfd818a
commit 8170635b5c
2 changed files with 35 additions and 29 deletions

View File

@ -40,13 +40,15 @@ import (
// genesis hash and list of forks, but with zero max block and total difficulty
// Sentry should have a logic not to overwrite statusData with messages from tx pool
type Fetch struct {
ctx context.Context // Context used for cancellation and closing of the fetcher
sentryClients []sentry.SentryClient // sentry clients that will be used for accessing the network
pool Pool // Transaction pool implementation
senders *SendersCache
coreDB kv.RoDB
wg *sync.WaitGroup // used for synchronisation in the tests (nil when not in tests)
stateChangesClient remote.KVClient
ctx context.Context // Context used for cancellation and closing of the fetcher
sentryClients []sentry.SentryClient // sentry clients that will be used for accessing the network
pool Pool // Transaction pool implementation
senders *SendersCache
coreDB kv.RoDB
wg *sync.WaitGroup // used for synchronisation in the tests (nil when not in tests)
stateChangesClient remote.KVClient
stateChangesParseCtx *TxParseContext
pooledTxsParseCtx *TxParseContext
}
type Timings struct {
@ -63,13 +65,17 @@ var DefaultTimings = Timings{
// SentryClient here is an interface, it is suitable for mocking in tests (mock will need
// to implement all the functions of the SentryClient interface).
func NewFetch(ctx context.Context, sentryClients []sentry.SentryClient, pool Pool, senders *SendersCache, stateChangesClient remote.KVClient, db kv.RoDB) *Fetch {
pooledTxsParseCtx := NewTxParseContext()
pooledTxsParseCtx.Reject(func(hash []byte) bool { return pool.IdHashKnown(hash) })
return &Fetch{
ctx: ctx,
sentryClients: sentryClients,
pool: pool,
senders: senders,
coreDB: db,
stateChangesClient: stateChangesClient,
ctx: ctx,
sentryClients: sentryClients,
pool: pool,
senders: senders,
coreDB: db,
stateChangesClient: stateChangesClient,
stateChangesParseCtx: NewTxParseContext(),
pooledTxsParseCtx: pooledTxsParseCtx,
}
}
@ -281,19 +287,13 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
if !f.pool.Started() {
return nil
}
parseCtx := NewTxParseContext()
parseCtx.Reject(func(hash []byte) bool {
//fmt.Printf("check: %t\n", f.pool.IdHashKnown(hash))
return f.pool.IdHashKnown(hash)
})
txs := TxSlots{}
if req.Id == sentry.MessageId_GET_POOLED_TRANSACTIONS_66 {
if _, err := ParsePooledTransactions65(req.Data, 0, parseCtx, &txs); err != nil {
if _, err := ParsePooledTransactions65(req.Data, 0, f.pooledTxsParseCtx, &txs); err != nil {
return err
}
} else {
if _, _, err := ParsePooledTransactions66(req.Data, 0, parseCtx, &txs); err != nil {
if _, _, err := ParsePooledTransactions66(req.Data, 0, f.pooledTxsParseCtx, &txs); err != nil {
return err
}
}
@ -403,13 +403,12 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client remote.KVClient)
return nil
}
parseCtx := NewTxParseContext()
var unwindTxs, minedTxs TxSlots
if req.Direction == remote.Direction_FORWARD {
minedTxs.Growth(len(req.Txs))
for i := range req.Txs {
minedTxs.txs[i] = &TxSlot{}
if _, err := parseCtx.ParseTransaction(req.Txs[i], 0, minedTxs.txs[i], minedTxs.senders.At(i)); err != nil {
if _, err := f.stateChangesParseCtx.ParseTransaction(req.Txs[i], 0, minedTxs.txs[i], minedTxs.senders.At(i)); err != nil {
log.Warn("stream.Recv", "err", err)
continue
}
@ -419,7 +418,7 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client remote.KVClient)
unwindTxs.Growth(len(req.Txs))
for i := range req.Txs {
unwindTxs.txs[i] = &TxSlot{}
if _, err := parseCtx.ParseTransaction(req.Txs[i], 0, unwindTxs.txs[i], unwindTxs.senders.At(i)); err != nil {
if _, err := f.stateChangesParseCtx.ParseTransaction(req.Txs[i], 0, unwindTxs.txs[i], unwindTxs.senders.At(i)); err != nil {
log.Warn("stream.Recv", "err", err)
continue
}

View File

@ -291,6 +291,10 @@ func loadSenders(coreDB kv.Tx, toLoad map[uint64]string) (map[uint64]*senderInfo
if err != nil {
return nil, err
}
if len(encoded) == 0 {
diff[id] = newSenderInfo(0, *uint256.NewInt(0))
continue
}
nonce, balance, err := DecodeSender(encoded)
if err != nil {
return nil, err
@ -586,7 +590,9 @@ func onNewBlock(senders *SendersCache, unwindTxs TxSlots, minedTxs []*TxSlot, pr
localsHistory.Add(i.Tx.idHash, struct{}{})
}
})
//log.Info("remove mined", "removed", j, "minedTxsLen", len(minedTxs))
if j > 0 {
log.Info("remove mined", "removed", j, "minedTxsLen", len(minedTxs))
}
// This can be thought of a reverse operation from the one described before.
// When a block that was deemed "the best" of its height, is no longer deemed "the best", the
@ -654,13 +660,13 @@ func onNewBlock(senders *SendersCache, unwindTxs TxSlots, minedTxs []*TxSlot, pr
func removeMined(senders *SendersCache, minedTxs []*TxSlot, pending, baseFee, queued *SubPool, discard func(tx *metaTx)) {
for _, tx := range minedTxs {
sender := senders.get(tx.senderID)
if sender.txNonce2Tx.Len() > 0 {
log.Debug("[txpool] removing mined", "senderID", tx.senderID, "sender.txNonce2Tx.len()", sender.txNonce2Tx.Len())
}
//if sender.txNonce2Tx.Len() > 0 {
//log.Debug("[txpool] removing mined", "senderID", tx.senderID, "sender.txNonce2Tx.len()", sender.txNonce2Tx.Len())
//}
// delete mined transactions from everywhere
sender.txNonce2Tx.Ascend(func(i btree.Item) bool {
it := i.(*nonce2TxItem)
log.Debug("[txpool] removing mined, cmp nonces", "tx.nonce", it.metaTx.Tx.nonce, "sender.nonce", sender.nonce)
//log.Debug("[txpool] removing mined, cmp nonces", "tx.nonce", it.metaTx.Tx.nonce, "sender.nonce", sender.nonce)
if it.metaTx.Tx.nonce > sender.nonce {
return false
}
@ -679,6 +685,7 @@ func removeMined(senders *SendersCache, minedTxs []*TxSlot, pending, baseFee, qu
queued.UnsafeRemove(it.metaTx)
discard(it.metaTx)
default:
fmt.Printf("aaaaaaa\n")
//already removed
}
return true