package blocks import ( "context" ethereum "github.com/ledgerwatch/erigon" libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon/cmd/devnet/devnet" "github.com/ledgerwatch/erigon/cmd/devnet/requests" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/rpc" "github.com/ledgerwatch/erigon/turbo/jsonrpc" "github.com/ledgerwatch/log/v3" ) type BlockHandler interface { Handle(ctx context.Context, node devnet.Node, block *requests.Block, transaction *jsonrpc.RPCTransaction) error } type BlockHandlerFunc func(ctx context.Context, node devnet.Node, block *requests.Block, transaction *jsonrpc.RPCTransaction) error func (f BlockHandlerFunc) Handle(ctx context.Context, node devnet.Node, block *requests.Block, transaction *jsonrpc.RPCTransaction) error { return f(ctx, node, block, transaction) } type BlockMap map[libcommon.Hash]*requests.Block type waitResult struct { err error blockMap BlockMap } type blockWaiter struct { result chan waitResult hashes chan map[libcommon.Hash]struct{} waitHashes map[libcommon.Hash]struct{} headersSub ethereum.Subscription handler BlockHandler logger log.Logger } type Waiter interface { Await(libcommon.Hash) (*requests.Block, error) AwaitMany(...libcommon.Hash) (BlockMap, error) } type waitError struct { err error } func (w waitError) Await(libcommon.Hash) (*requests.Block, error) { return nil, w.err } func (w waitError) AwaitMany(...libcommon.Hash) (BlockMap, error) { return nil, w.err } type wait struct { waiter *blockWaiter } func (w wait) Await(hash libcommon.Hash) (*requests.Block, error) { w.waiter.hashes <- map[libcommon.Hash]struct{}{hash: {}} res := <-w.waiter.result if len(res.blockMap) > 0 { for _, block := range res.blockMap { return block, res.err } } return nil, res.err } func (w wait) AwaitMany(hashes ...libcommon.Hash) (BlockMap, error) { if len(hashes) == 0 { return nil, nil } hashMap := map[libcommon.Hash]struct{}{} for _, hash := range hashes { hashMap[hash] = struct{}{} } w.waiter.hashes <- hashMap res := <-w.waiter.result return res.blockMap, res.err } func BlockWaiter(ctx context.Context, handler BlockHandler) (Waiter, context.CancelFunc) { ctx, cancel := context.WithCancel(ctx) node := devnet.SelectBlockProducer(ctx) waiter := &blockWaiter{ result: make(chan waitResult, 1), hashes: make(chan map[libcommon.Hash]struct{}, 1), handler: handler, logger: devnet.Logger(ctx), } var err error headers := make(chan types.Header) waiter.headersSub, err = node.Subscribe(ctx, requests.Methods.ETHNewHeads, headers) if err != nil { defer close(waiter.result) return waitError{err}, cancel } go waiter.receive(ctx, node, headers) return wait{waiter}, cancel } func (c *blockWaiter) receive(ctx context.Context, node devnet.Node, headers chan types.Header) { blockMap := map[libcommon.Hash]*requests.Block{} defer close(c.result) for header := range headers { select { case <-ctx.Done(): c.headersSub.Unsubscribe() c.result <- waitResult{blockMap: blockMap, err: ctx.Err()} return default: } block, err := node.GetBlockByNumber(ctx, rpc.AsBlockNumber(header.Number), true) if err != nil { c.logger.Error("Block waiter failed to get block", "err", err) continue } if len(block.Transactions) > 0 && c.waitHashes == nil { c.waitHashes = <-c.hashes } for i := range block.Transactions { tx := block.Transactions[i] // avoid implicit memory aliasing if _, ok := c.waitHashes[tx.Hash]; ok { c.logger.Info("Tx included into block", "txHash", tx.Hash, "blockNum", block.Number) blockMap[tx.Hash] = block delete(c.waitHashes, tx.Hash) if len(c.waitHashes) == 0 { c.headersSub.Unsubscribe() res := waitResult{ err: c.handler.Handle(ctx, node, block, tx), } if res.err == nil { res.blockMap = blockMap } c.result <- res return } } } } }