From fed19d55914038fdda740a789123eacbf845480f Mon Sep 17 00:00:00 2001 From: primal_concrete_sledge Date: Wed, 23 Mar 2022 10:35:04 +0300 Subject: [PATCH] feat/rpcadaemon_logs_sub (#3751) --- cmd/rpcdaemon/commands/eth_filters.go | 35 ++++++++++++++ cmd/rpcdaemon/filters/filters.go | 68 +++++++++++++++++++++++++++ cmd/rpcdaemon/services/eth_backend.go | 23 +++++++++ 3 files changed, 126 insertions(+) diff --git a/cmd/rpcdaemon/commands/eth_filters.go b/cmd/rpcdaemon/commands/eth_filters.go index ef2a8399e..125213c1b 100644 --- a/cmd/rpcdaemon/commands/eth_filters.go +++ b/cmd/rpcdaemon/commands/eth_filters.go @@ -110,3 +110,38 @@ func (api *APIImpl) NewPendingTransactions(ctx context.Context) (*rpc.Subscripti return rpcSub, nil } + +// SubscribeLogs send a notification each time a new log appears. +func (api *APIImpl) SubscribeLogs(ctx context.Context) (*rpc.Subscription, error) { + if api.filters == nil { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + + go func() { + defer debug.LogPanic() + logs := make(chan *types.Log, 1) + defer close(logs) + id := api.filters.SubscribeLogs(logs) + defer api.filters.UnsubscribeLogs(id) + + for { + select { + case h := <-logs: + err := notifier.Notify(rpcSub.ID, h) + if err != nil { + log.Warn("error while notifying subscription", "err", err) + } + case <-rpcSub.Err(): + return + } + } + }() + + return rpcSub, nil +} diff --git a/cmd/rpcdaemon/filters/filters.go b/cmd/rpcdaemon/filters/filters.go index b1e1baf2b..e1208e3a4 100644 --- a/cmd/rpcdaemon/filters/filters.go +++ b/cmd/rpcdaemon/filters/filters.go @@ -11,6 +11,9 @@ import ( "sync" "time" + "github.com/ledgerwatch/erigon-lib/gointerfaces" + "github.com/ledgerwatch/erigon/common" + "github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil" "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" "github.com/ledgerwatch/erigon-lib/gointerfaces/txpool" @@ -28,6 +31,7 @@ type ( PendingLogsSubID SubscriptionID PendingBlockSubID SubscriptionID PendingTxsSubID SubscriptionID + LogsSubID SubscriptionID ) type Filters struct { @@ -39,6 +43,7 @@ type Filters struct { pendingLogsSubs map[PendingLogsSubID]chan types.Logs pendingBlockSubs map[PendingBlockSubID]chan *types.Block pendingTxsSubs map[PendingTxsSubID]chan []types.Transaction + logsSubs map[LogsSubID]chan *types.Log onNewSnapshot func() } @@ -50,6 +55,7 @@ func New(ctx context.Context, ethBackend services.ApiBackend, txPool txpool.Txpo pendingTxsSubs: make(map[PendingTxsSubID]chan []types.Transaction), pendingLogsSubs: make(map[PendingLogsSubID]chan types.Logs), pendingBlockSubs: make(map[PendingBlockSubID]chan *types.Block), + logsSubs: make(map[LogsSubID]chan *types.Log), onNewSnapshot: onNewSnapshot, } @@ -79,6 +85,31 @@ func New(ctx context.Context, ethBackend services.ApiBackend, txPool txpool.Txpo } }() + go func() { + if ethBackend == nil { + return + } + for { + select { + case <-ctx.Done(): + return + default: + } + if err := ethBackend.SubscribeLogs(ctx, ff.OnNewLogs); err != nil { + select { + case <-ctx.Done(): + return + default: + } + if grpcutil.IsEndOfStream(err) || grpcutil.IsRetryLater(err) { + time.Sleep(3 * time.Second) + continue + } + log.Warn("rpc filters: error subscribing to logs", "err", err) + } + } + }() + if txPool != nil { go func() { for { @@ -318,6 +349,20 @@ func (ff *Filters) UnsubscribePendingTxs(id PendingTxsSubID) { delete(ff.pendingTxsSubs, id) } +func (ff *Filters) SubscribeLogs(out chan *types.Log) LogsSubID { + ff.mu.Lock() + defer ff.mu.Unlock() + id := LogsSubID(generateSubscriptionID()) + ff.logsSubs[id] = out + return id +} + +func (ff *Filters) UnsubscribeLogs(id LogsSubID) { + ff.mu.Lock() + defer ff.mu.Unlock() + delete(ff.logsSubs, id) +} + func (ff *Filters) OnNewEvent(event *remote.SubscribeReply) { ff.mu.RLock() defer ff.mu.RUnlock() @@ -394,6 +439,29 @@ func (ff *Filters) OnNewTx(reply *txpool.OnAddReply) { } } +func (ff *Filters) OnNewLogs(reply *remote.SubscribeLogsReply) { + lg := &types.Log{ + Address: gointerfaces.ConvertH160toAddress(reply.Address), + Data: reply.Data, + BlockNumber: reply.BlockNumber, + TxHash: gointerfaces.ConvertH256ToHash(reply.TransactionHash), + TxIndex: uint(reply.TransactionIndex), + BlockHash: gointerfaces.ConvertH256ToHash(reply.BlockHash), + Index: uint(reply.LogIndex), + Removed: reply.Removed, + } + t := make([]common.Hash, 0) + for _, v := range reply.Topics { + t = append(t, gointerfaces.ConvertH256ToHash(v)) + } + lg.Topics = t + ff.mu.RLock() + defer ff.mu.RUnlock() + for _, v := range ff.logsSubs { + v <- lg + } +} + func generateSubscriptionID() SubscriptionID { var id [32]byte diff --git a/cmd/rpcdaemon/services/eth_backend.go b/cmd/rpcdaemon/services/eth_backend.go index 9d52e7bd8..bb99942bc 100644 --- a/cmd/rpcdaemon/services/eth_backend.go +++ b/cmd/rpcdaemon/services/eth_backend.go @@ -32,6 +32,7 @@ type ApiBackend interface { ProtocolVersion(ctx context.Context) (uint64, error) ClientVersion(ctx context.Context) (string, error) Subscribe(ctx context.Context, cb func(*remote.SubscribeReply)) error + SubscribeLogs(ctx context.Context, cb func(*remote.SubscribeLogsReply)) error BlockWithSenders(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (block *types.Block, senders []common.Address, err error) EngineNewPayloadV1(ctx context.Context, payload *types2.ExecutionPayload) (*remote.EnginePayloadStatus, error) EngineForkchoiceUpdatedV1(ctx context.Context, request *remote.EngineForkChoiceUpdatedRequest) (*remote.EngineForkChoiceUpdatedReply, error) @@ -157,6 +158,28 @@ func (back *RemoteBackend) Subscribe(ctx context.Context, onNewEvent func(*remot return nil } +func (back *RemoteBackend) SubscribeLogs(ctx context.Context, onNewLogs func(reply *remote.SubscribeLogsReply)) error { + subscription, err := back.remoteEthBackend.SubscribeLogs(ctx, grpc.WaitForReady(true)) + if err != nil { + if s, ok := status.FromError(err); ok { + return errors.New(s.Message()) + } + return err + } + for { + logs, err := subscription.Recv() + if errors.Is(err, io.EOF) { + log.Info("rpcdaemon: the logs subscription channel was closed") + break + } + if err != nil { + return err + } + onNewLogs(logs) + } + return nil +} + func (back *RemoteBackend) TxnLookup(ctx context.Context, tx kv.Getter, txnHash common.Hash) (uint64, bool, error) { return back.blockReader.TxnLookup(ctx, tx, txnHash) }