mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-05 02:24:29 +00:00
94f4ea805d
the root cause is that when `inMemoryExecution` lambda gets created in the `eth/backend.go`, it captures the reference of `backend.notifications`, and so the execution of side-forks actually adds notifications to there, and it all gets sent out to tx pool (and RPC daemon) at the end of the stage loop (regardless of whether there was forkchoice update or not) so we can create a separate notification, but then somehow flush it to the "main" nofitications when the in-memory exec state is flushed Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local> Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro.local>
146 lines
3.7 KiB
Go
146 lines
3.7 KiB
Go
package shards
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"github.com/ledgerwatch/erigon-lib/common"
|
|
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
|
|
"github.com/ledgerwatch/erigon/core/types"
|
|
)
|
|
|
|
type RpcEventType uint64
|
|
|
|
type NewSnapshotSubscription func() error
|
|
type HeaderSubscription func(headerRLP []byte) error
|
|
type PendingLogsSubscription func(types.Logs) error
|
|
type PendingBlockSubscription func(*types.Block) error
|
|
type PendingTxsSubscription func([]types.Transaction) error
|
|
type LogsSubscription func([]*remote.SubscribeLogsReply) error
|
|
|
|
// Events manages event subscriptions and dissimination. Thread-safe
|
|
type Events struct {
|
|
id int
|
|
headerSubscriptions map[int]chan [][]byte
|
|
newSnapshotSubscription map[int]chan struct{}
|
|
pendingLogsSubscriptions map[int]PendingLogsSubscription
|
|
pendingBlockSubscriptions map[int]PendingBlockSubscription
|
|
pendingTxsSubscriptions map[int]PendingTxsSubscription
|
|
logsSubscriptions map[int]chan []*remote.SubscribeLogsReply
|
|
hasLogSubscriptions bool
|
|
lock sync.RWMutex
|
|
}
|
|
|
|
func NewEvents() *Events {
|
|
return &Events{
|
|
headerSubscriptions: map[int]chan [][]byte{},
|
|
pendingLogsSubscriptions: map[int]PendingLogsSubscription{},
|
|
pendingBlockSubscriptions: map[int]PendingBlockSubscription{},
|
|
pendingTxsSubscriptions: map[int]PendingTxsSubscription{},
|
|
logsSubscriptions: map[int]chan []*remote.SubscribeLogsReply{},
|
|
newSnapshotSubscription: map[int]chan struct{}{},
|
|
}
|
|
}
|
|
|
|
func (e *Events) AddHeaderSubscription() (chan [][]byte, func()) {
|
|
e.lock.Lock()
|
|
defer e.lock.Unlock()
|
|
ch := make(chan [][]byte, 8)
|
|
e.id++
|
|
id := e.id
|
|
e.headerSubscriptions[id] = ch
|
|
return ch, func() {
|
|
delete(e.headerSubscriptions, id)
|
|
close(ch)
|
|
}
|
|
}
|
|
|
|
func (e *Events) AddNewSnapshotSubscription() (chan struct{}, func()) {
|
|
e.lock.Lock()
|
|
defer e.lock.Unlock()
|
|
ch := make(chan struct{}, 8)
|
|
e.id++
|
|
id := e.id
|
|
e.newSnapshotSubscription[id] = ch
|
|
return ch, func() {
|
|
delete(e.newSnapshotSubscription, id)
|
|
close(ch)
|
|
}
|
|
}
|
|
|
|
func (e *Events) AddLogsSubscription() (chan []*remote.SubscribeLogsReply, func()) {
|
|
e.lock.Lock()
|
|
defer e.lock.Unlock()
|
|
ch := make(chan []*remote.SubscribeLogsReply, 8)
|
|
e.id++
|
|
id := e.id
|
|
e.logsSubscriptions[id] = ch
|
|
return ch, func() {
|
|
delete(e.logsSubscriptions, id)
|
|
close(ch)
|
|
}
|
|
}
|
|
|
|
func (e *Events) EmptyLogSubsctiption(empty bool) {
|
|
e.lock.Lock()
|
|
defer e.lock.Unlock()
|
|
e.hasLogSubscriptions = !empty
|
|
}
|
|
|
|
func (e *Events) HasLogSubsriptions() bool {
|
|
e.lock.RLock()
|
|
defer e.lock.RUnlock()
|
|
return e.hasLogSubscriptions
|
|
}
|
|
|
|
func (e *Events) AddPendingLogsSubscription(s PendingLogsSubscription) {
|
|
e.lock.Lock()
|
|
defer e.lock.Unlock()
|
|
e.pendingLogsSubscriptions[len(e.pendingLogsSubscriptions)] = s
|
|
}
|
|
|
|
func (e *Events) AddPendingBlockSubscription(s PendingBlockSubscription) {
|
|
e.lock.Lock()
|
|
defer e.lock.Unlock()
|
|
e.pendingBlockSubscriptions[len(e.pendingBlockSubscriptions)] = s
|
|
}
|
|
|
|
func (e *Events) OnNewSnapshot() {
|
|
e.lock.Lock()
|
|
defer e.lock.Unlock()
|
|
for _, ch := range e.newSnapshotSubscription {
|
|
common.PrioritizedSend(ch, struct{}{})
|
|
}
|
|
}
|
|
|
|
func (e *Events) OnNewHeader(newHeadersRlp [][]byte) {
|
|
e.lock.Lock()
|
|
defer e.lock.Unlock()
|
|
for _, ch := range e.headerSubscriptions {
|
|
common.PrioritizedSend(ch, newHeadersRlp)
|
|
}
|
|
}
|
|
|
|
func (e *Events) OnNewPendingLogs(logs types.Logs) {
|
|
e.lock.Lock()
|
|
defer e.lock.Unlock()
|
|
for i, sub := range e.pendingLogsSubscriptions {
|
|
if err := sub(logs); err != nil {
|
|
delete(e.pendingLogsSubscriptions, i)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *Events) OnLogs(logs []*remote.SubscribeLogsReply) {
|
|
e.lock.Lock()
|
|
defer e.lock.Unlock()
|
|
for _, ch := range e.logsSubscriptions {
|
|
common.PrioritizedSend(ch, logs)
|
|
}
|
|
}
|
|
|
|
type Notifications struct {
|
|
Events *Events
|
|
Accumulator *Accumulator
|
|
StateChangesConsumer StateChangeConsumer
|
|
}
|