From cc9fb8e21d579e3ddcfea684841f4f99f8a57dce Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Wed, 18 May 2022 16:33:37 +0200 Subject: [PATCH] eth/catalyst, miner: build the execution payload async (#24866) * eth/catalyst: build the execution payload async * miner: added comment, added test case * eth/catalyst: miner: move async block production to miner * eth/catalyst, miner: support generate seal block async * miner: rework GetSealingBlockAsync to use a passed channel * miner: apply rjl's diff * eth/catalyst: nitpicks Co-authored-by: Gary Rong --- eth/catalyst/api.go | 30 +++++---------- eth/catalyst/api_test.go | 82 ++++++++++++++++++++++++++++++++++++++-- eth/catalyst/queue.go | 54 +++++++++++++++++++++++--- miner/miner.go | 30 ++++++++++++--- miner/worker.go | 35 +++++++++-------- miner/worker_test.go | 8 +++- 6 files changed, 186 insertions(+), 53 deletions(-) diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index 81de68fbe..ab24ea0f5 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -197,18 +197,19 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update beacon.ForkchoiceStateV1, pa // sealed by the beacon client. The payload will be requested later, and we // might replace it arbitrarily many times in between. if payloadAttributes != nil { - log.Info("Creating new payload for sealing") - start := time.Now() - - data, err := api.assembleBlock(update.HeadBlockHash, payloadAttributes) + // Create an empty block first which can be used as a fallback + empty, err := api.eth.Miner().GetSealingBlockSync(update.HeadBlockHash, payloadAttributes.Timestamp, payloadAttributes.SuggestedFeeRecipient, payloadAttributes.Random, true) if err != nil { - log.Error("Failed to create sealing payload", "err", err) - return valid(nil), err // valid setHead, invalid payload + return valid(nil), err + } + // Send a request to generate a full block in the background. + // The result can be obtained via the returned channel. + resCh, err := api.eth.Miner().GetSealingBlockAsync(update.HeadBlockHash, payloadAttributes.Timestamp, payloadAttributes.SuggestedFeeRecipient, payloadAttributes.Random, false) + if err != nil { + return valid(nil), err } id := computePayloadId(update.HeadBlockHash, payloadAttributes) - api.localBlocks.put(id, data) - - log.Info("Created payload for sealing", "id", id, "elapsed", time.Since(start)) + api.localBlocks.put(id, &payload{empty: empty, result: resCh}) return valid(&id), nil } return valid(nil), nil @@ -344,14 +345,3 @@ func (api *ConsensusAPI) invalid(err error) beacon.PayloadStatusV1 { errorMsg := err.Error() return beacon.PayloadStatusV1{Status: beacon.INVALID, LatestValidHash: ¤tHash, ValidationError: &errorMsg} } - -// assembleBlock creates a new block and returns the "execution -// data" required for beacon clients to process the new block. -func (api *ConsensusAPI) assembleBlock(parentHash common.Hash, params *beacon.PayloadAttributesV1) (*beacon.ExecutableDataV1, error) { - log.Info("Producing block", "parentHash", parentHash) - block, err := api.eth.Miner().GetSealingBlock(parentHash, params.Timestamp, params.SuggestedFeeRecipient, params.Random) - if err != nil { - return nil, err - } - return beacon.BlockToExecutableData(block), nil -} diff --git a/eth/catalyst/api_test.go b/eth/catalyst/api_test.go index 657819a11..0b631739a 100644 --- a/eth/catalyst/api_test.go +++ b/eth/catalyst/api_test.go @@ -93,7 +93,7 @@ func TestEth2AssembleBlock(t *testing.T) { blockParams := beacon.PayloadAttributesV1{ Timestamp: blocks[9].Time() + 5, } - execData, err := api.assembleBlock(blocks[9].Hash(), &blockParams) + execData, err := assembleBlock(api, blocks[9].Hash(), &blockParams) if err != nil { t.Fatalf("error producing block, err=%v", err) } @@ -114,7 +114,7 @@ func TestEth2AssembleBlockWithAnotherBlocksTxs(t *testing.T) { blockParams := beacon.PayloadAttributesV1{ Timestamp: blocks[8].Time() + 5, } - execData, err := api.assembleBlock(blocks[8].Hash(), &blockParams) + execData, err := assembleBlock(api, blocks[8].Hash(), &blockParams) if err != nil { t.Fatalf("error producing block, err=%v", err) } @@ -273,7 +273,7 @@ func TestEth2NewBlock(t *testing.T) { tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey) ethservice.TxPool().AddLocal(tx) - execData, err := api.assembleBlock(parent.Hash(), &beacon.PayloadAttributesV1{ + execData, err := assembleBlock(api, parent.Hash(), &beacon.PayloadAttributesV1{ Timestamp: parent.Time() + 5, }) if err != nil { @@ -313,7 +313,7 @@ func TestEth2NewBlock(t *testing.T) { ) parent = preMergeBlocks[len(preMergeBlocks)-1] for i := 0; i < 10; i++ { - execData, err := api.assembleBlock(parent.Hash(), &beacon.PayloadAttributesV1{ + execData, err := assembleBlock(api, parent.Hash(), &beacon.PayloadAttributesV1{ Timestamp: parent.Time() + 6, }) if err != nil { @@ -530,3 +530,77 @@ func TestExchangeTransitionConfig(t *testing.T) { t.Fatalf("expected no error on valid config, got %v", err) } } + +func TestEmptyBlocks(t *testing.T) { + genesis, preMergeBlocks := generatePreMergeChain(10) + n, ethservice := startEthService(t, genesis, preMergeBlocks) + ethservice.Merger().ReachTTD() + defer n.Close() + var ( + api = NewConsensusAPI(ethservice) + parent = ethservice.BlockChain().CurrentBlock() + // This EVM code generates a log when the contract is created. + logCode = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00") + ) + for i := 0; i < 10; i++ { + statedb, _ := ethservice.BlockChain().StateAt(parent.Root()) + nonce := statedb.GetNonce(testAddr) + tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey) + ethservice.TxPool().AddLocal(tx) + + params := beacon.PayloadAttributesV1{ + Timestamp: parent.Time() + 1, + Random: crypto.Keccak256Hash([]byte{byte(i)}), + SuggestedFeeRecipient: parent.Coinbase(), + } + + fcState := beacon.ForkchoiceStateV1{ + HeadBlockHash: parent.Hash(), + SafeBlockHash: common.Hash{}, + FinalizedBlockHash: common.Hash{}, + } + resp, err := api.ForkchoiceUpdatedV1(fcState, ¶ms) + if err != nil { + t.Fatalf("error preparing payload, err=%v", err) + } + if resp.PayloadStatus.Status != beacon.VALID { + t.Fatalf("error preparing payload, invalid status: %v", resp.PayloadStatus.Status) + } + payload, err := api.GetPayloadV1(*resp.PayloadID) + if err != nil { + t.Fatalf("can't get payload: %v", err) + } + // TODO(493456442, marius) this test can be flaky since we rely on a 100ms + // allowance for block generation internally. + if len(payload.Transactions) == 0 { + t.Fatalf("payload should not be empty") + } + execResp, err := api.NewPayloadV1(*payload) + if err != nil { + t.Fatalf("can't execute payload: %v", err) + } + if execResp.Status != beacon.VALID { + t.Fatalf("invalid status: %v", execResp.Status) + } + fcState = beacon.ForkchoiceStateV1{ + HeadBlockHash: payload.BlockHash, + SafeBlockHash: payload.ParentHash, + FinalizedBlockHash: payload.ParentHash, + } + if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil { + t.Fatalf("Failed to insert block: %v", err) + } + if ethservice.BlockChain().CurrentBlock().NumberU64() != payload.Number { + t.Fatalf("Chain head should be updated") + } + parent = ethservice.BlockChain().CurrentBlock() + } +} + +func assembleBlock(api *ConsensusAPI, parentHash common.Hash, params *beacon.PayloadAttributesV1) (*beacon.ExecutableDataV1, error) { + block, err := api.eth.Miner().GetSealingBlockSync(parentHash, params.Timestamp, params.SuggestedFeeRecipient, params.Random, false) + if err != nil { + return nil, err + } + return beacon.BlockToExecutableData(block), nil +} diff --git a/eth/catalyst/queue.go b/eth/catalyst/queue.go index ffb2f56bf..ff8edc120 100644 --- a/eth/catalyst/queue.go +++ b/eth/catalyst/queue.go @@ -18,6 +18,7 @@ package catalyst import ( "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/beacon" @@ -34,11 +35,52 @@ const maxTrackedPayloads = 10 // latest one; but have a slight wiggle room for non-ideal conditions. const maxTrackedHeaders = 10 +// payload wraps the miner's block production channel, allowing the mined block +// to be retrieved later upon the GetPayload engine API call. +type payload struct { + lock sync.Mutex + done bool + empty *types.Block + block *types.Block + result chan *types.Block +} + +// resolve extracts the generated full block from the given channel if possible +// or fallback to empty block as an alternative. +func (req *payload) resolve() *beacon.ExecutableDataV1 { + // this function can be called concurrently, prevent any + // concurrency issue in the first place. + req.lock.Lock() + defer req.lock.Unlock() + + // Try to resolve the full block first if it's not obtained + // yet. The returned block can be nil if the generation fails. + + if !req.done { + timeout := time.NewTimer(500 * time.Millisecond) + defer timeout.Stop() + + select { + case req.block = <-req.result: + req.done = true + case <-timeout.C: + // TODO(rjl49345642, Marius), should we keep this + // 100ms timeout allowance? Why not just use the + // default and then fallback to empty directly? + } + } + + if req.block != nil { + return beacon.BlockToExecutableData(req.block) + } + return beacon.BlockToExecutableData(req.empty) +} + // payloadQueueItem represents an id->payload tuple to store until it's retrieved // or evicted. type payloadQueueItem struct { - id beacon.PayloadID - payload *beacon.ExecutableDataV1 + id beacon.PayloadID + data *payload } // payloadQueue tracks the latest handful of constructed payloads to be retrieved @@ -57,14 +99,14 @@ func newPayloadQueue() *payloadQueue { } // put inserts a new payload into the queue at the given id. -func (q *payloadQueue) put(id beacon.PayloadID, data *beacon.ExecutableDataV1) { +func (q *payloadQueue) put(id beacon.PayloadID, data *payload) { q.lock.Lock() defer q.lock.Unlock() copy(q.payloads[1:], q.payloads) q.payloads[0] = &payloadQueueItem{ - id: id, - payload: data, + id: id, + data: data, } } @@ -78,7 +120,7 @@ func (q *payloadQueue) get(id beacon.PayloadID) *beacon.ExecutableDataV1 { return nil // no more items } if item.id == id { - return item.payload + return item.data.resolve() } } return nil diff --git a/miner/miner.go b/miner/miner.go index 20e12c240..16c3bf19d 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -235,14 +235,32 @@ func (miner *Miner) DisablePreseal() { miner.worker.disablePreseal() } -// GetSealingBlock retrieves a sealing block based on the given parameters. -// The returned block is not sealed but all other fields should be filled. -func (miner *Miner) GetSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash) (*types.Block, error) { - return miner.worker.getSealingBlock(parent, timestamp, coinbase, random) -} - // SubscribePendingLogs starts delivering logs from pending transactions // to the given channel. func (miner *Miner) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscription { return miner.worker.pendingLogsFeed.Subscribe(ch) } + +// GetSealingBlockAsync requests to generate a sealing block according to the +// given parameters. Regardless of whether the generation is successful or not, +// there is always a result that will be returned through the result channel. +// The difference is that if the execution fails, the returned result is nil +// and the concrete error is dropped silently. +func (miner *Miner) GetSealingBlockAsync(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (chan *types.Block, error) { + resCh, _, err := miner.worker.getSealingBlock(parent, timestamp, coinbase, random, noTxs) + if err != nil { + return nil, err + } + return resCh, nil +} + +// GetSealingBlockSync creates a sealing block according to the given parameters. +// If the generation is failed or the underlying work is already closed, an error +// will be returned. +func (miner *Miner) GetSealingBlockSync(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (*types.Block, error) { + resCh, errCh, err := miner.worker.getSealingBlock(parent, timestamp, coinbase, random, noTxs) + if err != nil { + return nil, err + } + return <-resCh, <-errCh +} diff --git a/miner/worker.go b/miner/worker.go index 31022e7e1..ae1b61d42 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -170,8 +170,8 @@ type newWorkReq struct { // getWorkReq represents a request for getting a new sealing work with provided parameters. type getWorkReq struct { params *generateParams - err error - result chan *types.Block + result chan *types.Block // non-blocking channel + err chan error } // intervalAdjust represents a resubmitting interval adjustment. @@ -536,12 +536,12 @@ func (w *worker) mainLoop() { case req := <-w.getWorkCh: block, err := w.generateWork(req.params) if err != nil { - req.err = err + req.err <- err req.result <- nil } else { + req.err <- nil req.result <- block } - case ev := <-w.chainSideCh: // Short circuit for duplicate side blocks if _, exist := w.localUncles[ev.Block.Hash()]; exist { @@ -969,6 +969,7 @@ type generateParams struct { random common.Hash // The randomness generated by beacon chain, empty before the merge noUncle bool // Flag whether the uncle block inclusion is allowed noExtra bool // Flag whether the extra field assignment is allowed + noTxs bool // Flag whether an empty block without any transaction is expected } // prepareWork constructs the sealing task according to the given parameters, @@ -1090,8 +1091,9 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) { } defer work.discard() - w.fillTransactions(nil, work) - + if !params.noTxs { + w.fillTransactions(nil, work) + } return w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts) } @@ -1128,7 +1130,6 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { work.discard() return } - w.commit(work.copy(), w.fullTaskHook, true, start) // Swap out the old work with the new one, terminating any leftover @@ -1177,7 +1178,13 @@ func (w *worker) commit(env *environment, interval func(), update bool, start ti } // getSealingBlock generates the sealing block based on the given parameters. -func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash) (*types.Block, error) { +// The generation result will be passed back via the given channel no matter +// the generation itself succeeds or not. +func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (chan *types.Block, chan error, error) { + var ( + resCh = make(chan *types.Block, 1) + errCh = make(chan error, 1) + ) req := &getWorkReq{ params: &generateParams{ timestamp: timestamp, @@ -1187,18 +1194,16 @@ func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase random: random, noUncle: true, noExtra: true, + noTxs: noTxs, }, - result: make(chan *types.Block, 1), + result: resCh, + err: errCh, } select { case w.getWorkCh <- req: - block := <-req.result - if block == nil { - return nil, req.err - } - return block, nil + return resCh, errCh, nil case <-w.exitCh: - return nil, errors.New("miner closed") + return nil, nil, errors.New("miner closed") } } diff --git a/miner/worker_test.go b/miner/worker_test.go index dd029433b..55361349b 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -638,7 +638,9 @@ func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine co // This API should work even when the automatic sealing is not enabled for _, c := range cases { - block, err := w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random) + resChan, errChan, _ := w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random, false) + block := <-resChan + err := <-errChan if c.expectErr { if err == nil { t.Error("Expect error but get nil") @@ -654,7 +656,9 @@ func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine co // This API should work even when the automatic sealing is enabled w.start() for _, c := range cases { - block, err := w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random) + resChan, errChan, _ := w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random, false) + block := <-resChan + err := <-errChan if c.expectErr { if err == nil { t.Error("Expect error but get nil")