This commit is contained in:
alex.sharov 2021-08-11 11:32:21 +07:00
parent 7a48420c8f
commit 5fb24053dd
2 changed files with 11 additions and 11 deletions

View File

@ -44,7 +44,7 @@ type Fetch struct {
sentryClients []sentry.SentryClient // sentry clients that will be used for accessing the network
statusData *sentry.StatusData // Status data used for "handshaking" with sentries
pool Pool // Transaction pool implementation
db kv.RoDB
coreDB kv.RoDB
wg *sync.WaitGroup // used for synchronisation in the tests (nil when not in tests)
stateChangesClient remote.KVClient
}
@ -80,7 +80,7 @@ func NewFetch(ctx context.Context, sentryClients []sentry.SentryClient, genesisH
sentryClients: sentryClients,
statusData: statusData,
pool: pool,
db: db,
coreDB: db,
stateChangesClient: stateChangesClient,
}
}
@ -282,7 +282,7 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
return err
}
}
if err := f.db.View(ctx, func(tx kv.Tx) error {
if err := f.coreDB.View(ctx, func(tx kv.Tx) error {
return f.pool.Add(tx, txs)
}); err != nil {
return err
@ -443,7 +443,7 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client remote.KVClient)
diff[string(addr[:])] = senderInfo{nonce: nonce, balance: balance}
}
if err := f.db.View(ctx, func(tx kv.Tx) error {
if err := f.coreDB.View(ctx, func(tx kv.Tx) error {
return f.pool.OnNewBlock(tx, diff, unwindTxs, minedTxs, req.ProtocolBaseFee, req.BlockBaseFee, req.BlockHeight)
}); err != nil {
log.Warn("onNewBlock", "err", err)

View File

@ -206,7 +206,7 @@ func (p *TxPool) IdHashIsLocal(hash []byte) bool {
}
func (p *TxPool) OnNewPeer(peerID PeerID) { p.recentlyConnectedPeers.AddPeer(peerID) }
func (p *TxPool) Add(tx kv.Tx, newTxs TxSlots) error {
func (p *TxPool) Add(coreDB kv.Tx, newTxs TxSlots) error {
p.lock.Lock()
defer p.lock.Unlock()
if err := newTxs.Valid(); err != nil {
@ -218,7 +218,7 @@ func (p *TxPool) Add(tx kv.Tx, newTxs TxSlots) error {
return fmt.Errorf("non-zero base fee")
}
if err := setTxSenderID(tx, &p.senderID, p.senderIDs, p.senderInfo, newTxs); err != nil {
if err := setTxSenderID(coreDB, &p.senderID, p.senderIDs, p.senderInfo, newTxs); err != nil {
return err
}
if err := onNewTxs(p.senderInfo, newTxs, protocolBaseFee, blockBaseFee, p.pending, p.baseFee, p.queued, p.byHash, p.localsHistory); err != nil {
@ -292,7 +292,7 @@ func onNewTxs(senderInfo map[uint64]*senderInfo, newTxs TxSlots, protocolBaseFee
return nil
}
func (p *TxPool) OnNewBlock(tx kv.Tx, stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, protocolBaseFee, blockBaseFee, blockHeight uint64) error {
func (p *TxPool) OnNewBlock(coreDB kv.Tx, stateChanges map[string]senderInfo, unwindTxs, minedTxs TxSlots, protocolBaseFee, blockBaseFee, blockHeight uint64) error {
p.lock.Lock()
defer p.lock.Unlock()
if err := unwindTxs.Valid(); err != nil {
@ -306,10 +306,10 @@ func (p *TxPool) OnNewBlock(tx kv.Tx, stateChanges map[string]senderInfo, unwind
p.protocolBaseFee.Store(protocolBaseFee)
p.blockBaseFee.Store(blockBaseFee)
if err := setTxSenderID(tx, &p.senderID, p.senderIDs, p.senderInfo, unwindTxs); err != nil {
if err := setTxSenderID(coreDB, &p.senderID, p.senderIDs, p.senderInfo, unwindTxs); err != nil {
return err
}
if err := setTxSenderID(tx, &p.senderID, p.senderIDs, p.senderInfo, minedTxs); err != nil {
if err := setTxSenderID(coreDB, &p.senderID, p.senderIDs, p.senderInfo, minedTxs); err != nil {
return err
}
for addr, id := range p.senderIDs { // merge state changes
@ -357,7 +357,7 @@ func (p *TxPool) OnNewBlock(tx kv.Tx, stateChanges map[string]senderInfo, unwind
return nil
}
func setTxSenderID(tx kv.Tx, senderIDSequence *uint64, senderIDs map[string]uint64, sendersInfo map[uint64]*senderInfo, txs TxSlots) error {
func setTxSenderID(coreDB kv.Tx, senderIDSequence *uint64, senderIDs map[string]uint64, sendersInfo map[uint64]*senderInfo, txs TxSlots) error {
for i := range txs.txs {
addr := string(txs.senders.At(i))
@ -372,7 +372,7 @@ func setTxSenderID(tx kv.Tx, senderIDSequence *uint64, senderIDs map[string]uint
// load data from db if need
_, ok = sendersInfo[txs.txs[i].senderID]
if !ok {
encoded, err := tx.GetOne(kv.PlainState, txs.senders.At(i))
encoded, err := coreDB.GetOne(kv.PlainState, txs.senders.At(i))
if err != nil {
return err
}