From f13016c7ec03a3ef04e9b18a65d9887b036c2191 Mon Sep 17 00:00:00 2001 From: hexoscott <70711990+hexoscott@users.noreply.github.com> Date: Mon, 30 Jan 2023 08:52:29 +0100 Subject: [PATCH] Fork choice is waiting fix (#6711) --- ethdb/privateapi/ethbackend.go | 41 ++++++++++++++++++++++----------- turbo/engineapi/request_list.go | 40 ++++++++++++++++++++++++++++---- 2 files changed, 63 insertions(+), 18 deletions(-) diff --git a/ethdb/privateapi/ethbackend.go b/ethdb/privateapi/ethbackend.go index b93ace6f9..a70c76c20 100644 --- a/ethdb/privateapi/ethbackend.go +++ b/ethdb/privateapi/ethbackend.go @@ -10,14 +10,15 @@ import ( "time" "github.com/holiman/uint256" + "github.com/ledgerwatch/log/v3" + "google.golang.org/protobuf/types/known/emptypb" + "github.com/ledgerwatch/erigon-lib/chain" libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/gointerfaces" "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" types2 "github.com/ledgerwatch/erigon-lib/gointerfaces/types" "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/log/v3" - "google.golang.org/protobuf/types/known/emptypb" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/consensus/serenity" @@ -267,20 +268,32 @@ func convertPayloadStatus(payloadStatus *engineapi.PayloadStatus) *remote.Engine } func (s *EthBackendServer) stageLoopIsBusy() bool { - for i := 0; i < 20; i++ { - if !s.hd.BeaconRequestList.IsWaiting() { - // This might happen, for example, in the following scenario: - // 1) CL sends NewPayload and immediately after that ForkChoiceUpdated. - // 2) We happily process NewPayload and stage loop is at the end. - // 3) We start processing ForkChoiceUpdated, - // but the stage loop hasn't moved yet from the end to the beginning of HeadersPOS - // and thus requestList.WaitForRequest() is not called yet. + waiter := make(chan struct{}) + defer libcommon.SafeClose(waiter) - // TODO(yperbasis): find a more elegant solution - time.Sleep(5 * time.Millisecond) + busy := true + wg := sync.WaitGroup{} + wg.Add(1) + + go func() { + select { + case <-time.After(1 * time.Second): + // timed out so just call done + fmt.Println("hexo: timed out") + wg.Done() + case <-waiter: + // state is now waiting so we're not busy + fmt.Println("hexo: finished waiting") + busy = false + wg.Done() } - } - return !s.hd.BeaconRequestList.IsWaiting() + }() + + s.hd.BeaconRequestList.WaitForWaiting(waiter) + + wg.Wait() + + return busy } // EngineNewPayload validates and possibly executes payload diff --git a/turbo/engineapi/request_list.go b/turbo/engineapi/request_list.go index b9dd7cdae..7c622d1c1 100644 --- a/turbo/engineapi/request_list.go +++ b/turbo/engineapi/request_list.go @@ -5,6 +5,7 @@ import ( "sync/atomic" "github.com/emirpasic/gods/maps/treemap" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" @@ -54,12 +55,16 @@ type RequestList struct { interrupt Interrupt waiting uint32 syncCond *sync.Cond + waiterMtx sync.Mutex + waiters []chan struct{} } func NewRequestList() *RequestList { rl := &RequestList{ - requests: treemap.NewWithIntComparator(), - syncCond: sync.NewCond(&sync.Mutex{}), + requests: treemap.NewWithIntComparator(), + syncCond: sync.NewCond(&sync.Mutex{}), + waiterMtx: sync.Mutex{}, + waiters: make([]chan struct{}, 0), } return rl } @@ -117,8 +122,8 @@ func (rl *RequestList) WaitForRequest(onlyNew bool, noWait bool) (interrupt Inte rl.syncCond.L.Lock() defer rl.syncCond.L.Unlock() - atomic.StoreUint32(&rl.waiting, 1) - defer atomic.StoreUint32(&rl.waiting, 0) + rl.updateWaiting(1) + defer rl.updateWaiting(0) for { interrupt = rl.interrupt @@ -141,6 +146,33 @@ func (rl *RequestList) IsWaiting() bool { return atomic.LoadUint32(&rl.waiting) != 0 } +func (rl *RequestList) updateWaiting(val uint32) { + rl.waiterMtx.Lock() + defer rl.waiterMtx.Unlock() + atomic.StoreUint32(&rl.waiting, val) + + if val == 1 { + // something might be waiting to be notified of the waiting state being ready + for _, c := range rl.waiters { + c <- struct{}{} + } + rl.waiters = make([]chan struct{}, 0) + } +} + +func (rl *RequestList) WaitForWaiting(c chan struct{}) { + rl.waiterMtx.Lock() + defer rl.waiterMtx.Unlock() + val := atomic.LoadUint32(&rl.waiting) + if val == 1 { + // we are already waiting so just send to the channel and quit + c <- struct{}{} + } else { + // we need to register a waiter now to be notified when we are ready + rl.waiters = append(rl.waiters, c) + } +} + func (rl *RequestList) Interrupt(kind Interrupt) { rl.syncCond.L.Lock() defer rl.syncCond.L.Unlock()