ledgerwatch 9ea6398524
Fixes to subscribe logs (#3769)
* Fixes to subscribe logs

* Add criteria to logs subscription

* Skeleton of RPC daemon event log distribution

* Simplify

* Send aggregated filter to Erigon

* Change API

* Print

* Fixes

* Fix topics filtering

* Fill txHash and blockHash

* Timing logs, fill tx index

* Print

* More print

* Print

* Asynchronous sending of log events to RPC daemon

* Remove prints

* Only extract logs if there are subscribers

* Check empty when RPC daemon is removed

Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro.local>
Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
2022-03-26 18:21:31 +00:00

169 lines
4.1 KiB
Go

package privateapi
import (
"sync"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon/core/types"
)
// Type definitions used by the event subscription machinery in this package.
type (
	// RpcEventType is a uint64-backed event type tag.
	// NOTE(review): no values of this type are declared in this part of the
	// file — confirm where it is used.
	RpcEventType uint64

	// NewSnapshotSubscription is a callback that takes no arguments and
	// reports failure through its error result.
	NewSnapshotSubscription func() error

	// HeaderSubscription is a callback that receives one header as raw bytes
	// (parameter headerRLP) and reports failure through its error result.
	HeaderSubscription func(headerRLP []byte) error

	// PendingLogsSubscription is a callback that receives a batch of logs.
	// A non-nil error causes Events.OnNewPendingLogs to drop the subscription.
	PendingLogsSubscription func(types.Logs) error

	// PendingBlockSubscription is a callback that receives a block and
	// reports failure through its error result.
	PendingBlockSubscription func(*types.Block) error

	// PendingTxsSubscription is a callback that receives a slice of
	// transactions and reports failure through its error result.
	PendingTxsSubscription func([]types.Transaction) error

	// LogsSubscription is a callback that receives a batch of log replies
	// and reports failure through its error result.
	LogsSubscription func([]*remote.SubscribeLogsReply) error
)
// Events manages event subscriptions and dissemination.
//
// Its methods serialize access to the fields below through lock. Subscriptions
// come in two flavours: buffered channels (headers, snapshots, logs) that the
// On* methods push into, and callbacks (pending logs/blocks/txs).
type Events struct {
	// id is a counter that is incremented to produce map keys for new
	// subscriptions.
	id int
	// headerSubscriptions holds the channels that OnNewHeader sends to.
	headerSubscriptions map[int]chan [][]byte
	// newSnapshotSubscription holds the channels that OnNewSnapshot signals.
	newSnapshotSubscription map[int]chan struct{}
	// pendingLogsSubscriptions holds the callbacks invoked by OnNewPendingLogs;
	// a callback is removed once it returns an error.
	pendingLogsSubscriptions map[int]PendingLogsSubscription
	// pendingBlockSubscriptions holds the callbacks registered through
	// AddPendingBlockSubscription.
	pendingBlockSubscriptions map[int]PendingBlockSubscription
	// pendingTxsSubscriptions is initialised by NewEvents.
	// NOTE(review): nothing in this part of the file adds to or reads from it.
	pendingTxsSubscriptions map[int]PendingTxsSubscription
	// logsSubscriptions holds the channels that OnLogs sends to.
	logsSubscriptions map[int]chan []*remote.SubscribeLogsReply
	// hasLogSubscriptions is set by EmptyLogSubsctiption and read by
	// HasLogSubsriptions.
	hasLogSubscriptions bool
	// lock guards every field above.
	lock sync.RWMutex
}
// NewEvents returns an Events value whose subscription maps are all allocated
// and empty, so subscriptions can be added without further initialisation.
func NewEvents() *Events {
	ev := new(Events)
	ev.headerSubscriptions = make(map[int]chan [][]byte)
	ev.newSnapshotSubscription = make(map[int]chan struct{})
	ev.logsSubscriptions = make(map[int]chan []*remote.SubscribeLogsReply)
	ev.pendingLogsSubscriptions = make(map[int]PendingLogsSubscription)
	ev.pendingBlockSubscriptions = make(map[int]PendingBlockSubscription)
	ev.pendingTxsSubscriptions = make(map[int]PendingTxsSubscription)
	return ev
}
// AddHeaderSubscription registers a new header subscriber.
//
// It returns a buffered channel (capacity 8) that receives every batch passed
// to OnNewHeader, and a function that cancels the subscription. The cancel
// function removes the subscription and closes the channel. It acquires the
// Events lock, so it cannot run concurrently with OnNewHeader (which would
// otherwise risk a send on a closed channel), and it is safe to call more
// than once.
func (e *Events) AddHeaderSubscription() (chan [][]byte, func()) {
	e.lock.Lock()
	defer e.lock.Unlock()
	ch := make(chan [][]byte, 8)
	e.id++
	id := e.id
	e.headerSubscriptions[id] = ch
	return ch, func() {
		e.lock.Lock()
		defer e.lock.Unlock()
		if _, ok := e.headerSubscriptions[id]; !ok {
			// Already cancelled; closing the channel again would panic.
			return
		}
		delete(e.headerSubscriptions, id)
		close(ch)
	}
}
// AddNewSnapshotSubscription registers a new snapshot subscriber.
//
// It returns a buffered channel (capacity 8) that receives one empty struct
// per OnNewSnapshot call, and a function that cancels the subscription. The
// cancel function removes the subscription and closes the channel. It
// acquires the Events lock, so it cannot run concurrently with OnNewSnapshot
// (which would otherwise risk a send on a closed channel), and it is safe to
// call more than once.
func (e *Events) AddNewSnapshotSubscription() (chan struct{}, func()) {
	e.lock.Lock()
	defer e.lock.Unlock()
	ch := make(chan struct{}, 8)
	e.id++
	id := e.id
	e.newSnapshotSubscription[id] = ch
	return ch, func() {
		e.lock.Lock()
		defer e.lock.Unlock()
		if _, ok := e.newSnapshotSubscription[id]; !ok {
			// Already cancelled; closing the channel again would panic.
			return
		}
		delete(e.newSnapshotSubscription, id)
		close(ch)
	}
}
// AddLogsSubscription registers a new logs subscriber.
//
// It returns a buffered channel (capacity 8) that receives every batch passed
// to OnLogs, and a function that cancels the subscription. The cancel
// function removes the subscription and closes the channel. It acquires the
// Events lock, so it cannot run concurrently with OnLogs (which would
// otherwise risk a send on a closed channel), and it is safe to call more
// than once.
func (e *Events) AddLogsSubscription() (chan []*remote.SubscribeLogsReply, func()) {
	e.lock.Lock()
	defer e.lock.Unlock()
	ch := make(chan []*remote.SubscribeLogsReply, 8)
	e.id++
	id := e.id
	e.logsSubscriptions[id] = ch
	return ch, func() {
		e.lock.Lock()
		defer e.lock.Unlock()
		if _, ok := e.logsSubscriptions[id]; !ok {
			// Already cancelled; closing the channel again would panic.
			return
		}
		delete(e.logsSubscriptions, id)
		close(ch)
	}
}
// EmptyLogSubsctiption records whether the set of log subscriptions is empty.
// The stored flag is the negation of empty, i.e. "there are log subscribers".
func (e *Events) EmptyLogSubsctiption(empty bool) {
	hasSubscribers := !empty

	e.lock.Lock()
	e.hasLogSubscriptions = hasSubscribers
	e.lock.Unlock()
}
// HasLogSubsriptions reports the flag last stored by EmptyLogSubsctiption:
// true when log subscribers exist, false otherwise (including before the
// first call).
func (e *Events) HasLogSubsriptions() bool {
	e.lock.RLock()
	has := e.hasLogSubscriptions
	e.lock.RUnlock()
	return has
}
// AddPendingLogsSubscription registers s to be invoked by OnNewPendingLogs.
//
// The subscription is keyed by a fresh value of the shared id counter rather
// than by the current map length: OnNewPendingLogs deletes failed
// subscriptions, after which the map length can equal the key of a live
// entry and a length-based key would silently overwrite it.
func (e *Events) AddPendingLogsSubscription(s PendingLogsSubscription) {
	e.lock.Lock()
	defer e.lock.Unlock()
	e.id++
	e.pendingLogsSubscriptions[e.id] = s
}
// AddPendingBlockSubscription registers s as a pending-block callback.
//
// The subscription is keyed by a fresh value of the shared id counter, the
// same scheme the channel-based subscriptions use. A key derived from the
// current map length would collide with a live entry as soon as any
// subscription is removed from the map.
func (e *Events) AddPendingBlockSubscription(s PendingBlockSubscription) {
	e.lock.Lock()
	defer e.lock.Unlock()
	e.id++
	e.pendingBlockSubscriptions[e.id] = s
}
// OnNewSnapshot signals every new-snapshot subscriber by sending an empty
// struct on its channel. The whole fan-out runs under the Events lock.
//
// When a subscriber's channel is full (slow consumer), up to half of its
// capacity worth of queued signals is discarded to make room before the new
// signal is enqueued.
func (e *Events) OnNewSnapshot() {
	e.lock.Lock()
	defer e.lock.Unlock()

	for _, sub := range e.newSnapshotSubscription {
		select {
		case sub <- struct{}{}:
			// Delivered without waiting; move on to the next subscriber.
			continue
		default:
		}

		// Channel is full: drop old messages, stopping early if the consumer
		// empties the channel in the meantime.
		for dropped, limit := 0, cap(sub)/2; dropped < limit; dropped++ {
			select {
			case <-sub:
			default:
			}
		}
		sub <- struct{}{}
	}
}
// OnNewHeader delivers newHeadersRlp to every header subscriber's channel.
// The whole fan-out runs under the Events lock.
//
// When a subscriber's channel is full (slow consumer), up to half of its
// capacity worth of queued batches is discarded to make room before the new
// batch is enqueued.
func (e *Events) OnNewHeader(newHeadersRlp [][]byte) {
	e.lock.Lock()
	defer e.lock.Unlock()

	for _, sub := range e.headerSubscriptions {
		select {
		case sub <- newHeadersRlp:
			// Delivered without waiting; move on to the next subscriber.
			continue
		default:
		}

		// Channel is full: drop old messages, stopping early if the consumer
		// empties the channel in the meantime.
		for dropped, limit := 0, cap(sub)/2; dropped < limit; dropped++ {
			select {
			case <-sub:
			default:
			}
		}
		sub <- newHeadersRlp
	}
}
// OnNewPendingLogs passes logs to every registered pending-logs callback
// while holding the Events lock. A callback that returns an error is
// unsubscribed on the spot.
func (e *Events) OnNewPendingLogs(logs types.Logs) {
	e.lock.Lock()
	defer e.lock.Unlock()

	for key, notify := range e.pendingLogsSubscriptions {
		err := notify(logs)
		if err == nil {
			continue
		}
		// Deleting the current key while ranging over a map is permitted in Go.
		delete(e.pendingLogsSubscriptions, key)
	}
}
// OnLogs delivers logs to every logs subscriber's channel. The whole fan-out
// runs under the Events lock.
//
// When a subscriber's channel is full (slow consumer), up to half of its
// capacity worth of queued batches is discarded to make room before the new
// batch is enqueued.
func (e *Events) OnLogs(logs []*remote.SubscribeLogsReply) {
	e.lock.Lock()
	defer e.lock.Unlock()

	for _, sub := range e.logsSubscriptions {
		select {
		case sub <- logs:
			// Delivered without waiting; move on to the next subscriber.
			continue
		default:
		}

		// Channel is full: drop old messages, stopping early if the consumer
		// empties the channel in the meantime.
		for dropped, limit := 0, cap(sub)/2; dropped < limit; dropped++ {
			select {
			case <-sub:
			default:
			}
		}
		sub <- logs
	}
}