feat/rpcadaemon_logs_sub (#3751)

This commit is contained in:
primal_concrete_sledge 2022-03-23 10:35:04 +03:00 committed by GitHub
parent 74a7d7c75a
commit fed19d5591
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 126 additions and 0 deletions

View File

@ -110,3 +110,38 @@ func (api *APIImpl) NewPendingTransactions(ctx context.Context) (*rpc.Subscripti
return rpcSub, nil
}
// SubscribeLogs send a notification each time a new log appears.
func (api *APIImpl) SubscribeLogs(ctx context.Context) (*rpc.Subscription, error) {
if api.filters == nil {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
rpcSub := notifier.CreateSubscription()
go func() {
defer debug.LogPanic()
logs := make(chan *types.Log, 1)
defer close(logs)
id := api.filters.SubscribeLogs(logs)
defer api.filters.UnsubscribeLogs(id)
for {
select {
case h := <-logs:
err := notifier.Notify(rpcSub.ID, h)
if err != nil {
log.Warn("error while notifying subscription", "err", err)
}
case <-rpcSub.Err():
return
}
}
}()
return rpcSub, nil
}

View File

@ -11,6 +11,9 @@ import (
"sync"
"time"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/gointerfaces/txpool"
@ -28,6 +31,7 @@ type (
PendingLogsSubID SubscriptionID
PendingBlockSubID SubscriptionID
PendingTxsSubID SubscriptionID
LogsSubID SubscriptionID
)
type Filters struct {
@ -39,6 +43,7 @@ type Filters struct {
pendingLogsSubs map[PendingLogsSubID]chan types.Logs
pendingBlockSubs map[PendingBlockSubID]chan *types.Block
pendingTxsSubs map[PendingTxsSubID]chan []types.Transaction
logsSubs map[LogsSubID]chan *types.Log
onNewSnapshot func()
}
@ -50,6 +55,7 @@ func New(ctx context.Context, ethBackend services.ApiBackend, txPool txpool.Txpo
pendingTxsSubs: make(map[PendingTxsSubID]chan []types.Transaction),
pendingLogsSubs: make(map[PendingLogsSubID]chan types.Logs),
pendingBlockSubs: make(map[PendingBlockSubID]chan *types.Block),
logsSubs: make(map[LogsSubID]chan *types.Log),
onNewSnapshot: onNewSnapshot,
}
@ -79,6 +85,31 @@ func New(ctx context.Context, ethBackend services.ApiBackend, txPool txpool.Txpo
}
}()
go func() {
if ethBackend == nil {
return
}
for {
select {
case <-ctx.Done():
return
default:
}
if err := ethBackend.SubscribeLogs(ctx, ff.OnNewLogs); err != nil {
select {
case <-ctx.Done():
return
default:
}
if grpcutil.IsEndOfStream(err) || grpcutil.IsRetryLater(err) {
time.Sleep(3 * time.Second)
continue
}
log.Warn("rpc filters: error subscribing to logs", "err", err)
}
}
}()
if txPool != nil {
go func() {
for {
@ -318,6 +349,20 @@ func (ff *Filters) UnsubscribePendingTxs(id PendingTxsSubID) {
delete(ff.pendingTxsSubs, id)
}
func (ff *Filters) SubscribeLogs(out chan *types.Log) LogsSubID {
ff.mu.Lock()
defer ff.mu.Unlock()
id := LogsSubID(generateSubscriptionID())
ff.logsSubs[id] = out
return id
}
func (ff *Filters) UnsubscribeLogs(id LogsSubID) {
ff.mu.Lock()
defer ff.mu.Unlock()
delete(ff.logsSubs, id)
}
func (ff *Filters) OnNewEvent(event *remote.SubscribeReply) {
ff.mu.RLock()
defer ff.mu.RUnlock()
@ -394,6 +439,29 @@ func (ff *Filters) OnNewTx(reply *txpool.OnAddReply) {
}
}
func (ff *Filters) OnNewLogs(reply *remote.SubscribeLogsReply) {
lg := &types.Log{
Address: gointerfaces.ConvertH160toAddress(reply.Address),
Data: reply.Data,
BlockNumber: reply.BlockNumber,
TxHash: gointerfaces.ConvertH256ToHash(reply.TransactionHash),
TxIndex: uint(reply.TransactionIndex),
BlockHash: gointerfaces.ConvertH256ToHash(reply.BlockHash),
Index: uint(reply.LogIndex),
Removed: reply.Removed,
}
t := make([]common.Hash, 0)
for _, v := range reply.Topics {
t = append(t, gointerfaces.ConvertH256ToHash(v))
}
lg.Topics = t
ff.mu.RLock()
defer ff.mu.RUnlock()
for _, v := range ff.logsSubs {
v <- lg
}
}
func generateSubscriptionID() SubscriptionID {
var id [32]byte

View File

@ -32,6 +32,7 @@ type ApiBackend interface {
ProtocolVersion(ctx context.Context) (uint64, error)
ClientVersion(ctx context.Context) (string, error)
Subscribe(ctx context.Context, cb func(*remote.SubscribeReply)) error
SubscribeLogs(ctx context.Context, cb func(*remote.SubscribeLogsReply)) error
BlockWithSenders(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (block *types.Block, senders []common.Address, err error)
EngineNewPayloadV1(ctx context.Context, payload *types2.ExecutionPayload) (*remote.EnginePayloadStatus, error)
EngineForkchoiceUpdatedV1(ctx context.Context, request *remote.EngineForkChoiceUpdatedRequest) (*remote.EngineForkChoiceUpdatedReply, error)
@ -157,6 +158,28 @@ func (back *RemoteBackend) Subscribe(ctx context.Context, onNewEvent func(*remot
return nil
}
func (back *RemoteBackend) SubscribeLogs(ctx context.Context, onNewLogs func(reply *remote.SubscribeLogsReply)) error {
subscription, err := back.remoteEthBackend.SubscribeLogs(ctx, grpc.WaitForReady(true))
if err != nil {
if s, ok := status.FromError(err); ok {
return errors.New(s.Message())
}
return err
}
for {
logs, err := subscription.Recv()
if errors.Is(err, io.EOF) {
log.Info("rpcdaemon: the logs subscription channel was closed")
break
}
if err != nil {
return err
}
onNewLogs(logs)
}
return nil
}
func (back *RemoteBackend) TxnLookup(ctx context.Context, tx kv.Getter, txnHash common.Hash) (uint64, bool, error) {
return back.blockReader.TxnLookup(ctx, tx, txnHash)
}