Fork choice is waiting fix (#6711)

This commit is contained in:
hexoscott 2023-01-30 08:52:29 +01:00 committed by GitHub
parent 8c2713b0b8
commit f13016c7ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 63 additions and 18 deletions

View File

@ -10,14 +10,15 @@ import (
"time" "time"
"github.com/holiman/uint256" "github.com/holiman/uint256"
"github.com/ledgerwatch/log/v3"
"google.golang.org/protobuf/types/known/emptypb"
"github.com/ledgerwatch/erigon-lib/chain" "github.com/ledgerwatch/erigon-lib/chain"
libcommon "github.com/ledgerwatch/erigon-lib/common" libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/gointerfaces" "github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote" "github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
types2 "github.com/ledgerwatch/erigon-lib/gointerfaces/types" types2 "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/log/v3"
"google.golang.org/protobuf/types/known/emptypb"
"github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/consensus/serenity" "github.com/ledgerwatch/erigon/consensus/serenity"
@ -267,20 +268,32 @@ func convertPayloadStatus(payloadStatus *engineapi.PayloadStatus) *remote.Engine
} }
func (s *EthBackendServer) stageLoopIsBusy() bool { func (s *EthBackendServer) stageLoopIsBusy() bool {
for i := 0; i < 20; i++ { waiter := make(chan struct{})
if !s.hd.BeaconRequestList.IsWaiting() { defer libcommon.SafeClose(waiter)
// This might happen, for example, in the following scenario:
// 1) CL sends NewPayload and immediately after that ForkChoiceUpdated.
// 2) We happily process NewPayload and stage loop is at the end.
// 3) We start processing ForkChoiceUpdated,
// but the stage loop hasn't moved yet from the end to the beginning of HeadersPOS
// and thus requestList.WaitForRequest() is not called yet.
// TODO(yperbasis): find a more elegant solution busy := true
time.Sleep(5 * time.Millisecond) wg := sync.WaitGroup{}
wg.Add(1)
go func() {
select {
case <-time.After(1 * time.Second):
// timed out so just call done
fmt.Println("hexo: timed out")
wg.Done()
case <-waiter:
// state is now waiting so we're not busy
fmt.Println("hexo: finished waiting")
busy = false
wg.Done()
} }
} }()
return !s.hd.BeaconRequestList.IsWaiting()
s.hd.BeaconRequestList.WaitForWaiting(waiter)
wg.Wait()
return busy
} }
// EngineNewPayload validates and possibly executes payload // EngineNewPayload validates and possibly executes payload

View File

@ -5,6 +5,7 @@ import (
"sync/atomic" "sync/atomic"
"github.com/emirpasic/gods/maps/treemap" "github.com/emirpasic/gods/maps/treemap"
libcommon "github.com/ledgerwatch/erigon-lib/common" libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote" "github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
@ -54,12 +55,16 @@ type RequestList struct {
interrupt Interrupt interrupt Interrupt
waiting uint32 waiting uint32
syncCond *sync.Cond syncCond *sync.Cond
waiterMtx sync.Mutex
waiters []chan struct{}
} }
func NewRequestList() *RequestList { func NewRequestList() *RequestList {
rl := &RequestList{ rl := &RequestList{
requests: treemap.NewWithIntComparator(), requests: treemap.NewWithIntComparator(),
syncCond: sync.NewCond(&sync.Mutex{}), syncCond: sync.NewCond(&sync.Mutex{}),
waiterMtx: sync.Mutex{},
waiters: make([]chan struct{}, 0),
} }
return rl return rl
} }
@ -117,8 +122,8 @@ func (rl *RequestList) WaitForRequest(onlyNew bool, noWait bool) (interrupt Inte
rl.syncCond.L.Lock() rl.syncCond.L.Lock()
defer rl.syncCond.L.Unlock() defer rl.syncCond.L.Unlock()
atomic.StoreUint32(&rl.waiting, 1) rl.updateWaiting(1)
defer atomic.StoreUint32(&rl.waiting, 0) defer rl.updateWaiting(0)
for { for {
interrupt = rl.interrupt interrupt = rl.interrupt
@ -141,6 +146,33 @@ func (rl *RequestList) IsWaiting() bool {
return atomic.LoadUint32(&rl.waiting) != 0 return atomic.LoadUint32(&rl.waiting) != 0
} }
func (rl *RequestList) updateWaiting(val uint32) {
rl.waiterMtx.Lock()
defer rl.waiterMtx.Unlock()
atomic.StoreUint32(&rl.waiting, val)
if val == 1 {
// something might be waiting to be notified of the waiting state being ready
for _, c := range rl.waiters {
c <- struct{}{}
}
rl.waiters = make([]chan struct{}, 0)
}
}
func (rl *RequestList) WaitForWaiting(c chan struct{}) {
rl.waiterMtx.Lock()
defer rl.waiterMtx.Unlock()
val := atomic.LoadUint32(&rl.waiting)
if val == 1 {
// we are already waiting so just send to the channel and quit
c <- struct{}{}
} else {
// we need to register a waiter now to be notified when we are ready
rl.waiters = append(rl.waiters, c)
}
}
func (rl *RequestList) Interrupt(kind Interrupt) { func (rl *RequestList) Interrupt(kind Interrupt) {
rl.syncCond.L.Lock() rl.syncCond.L.Lock()
defer rl.syncCond.L.Unlock() defer rl.syncCond.L.Unlock()