stop writing to closed channel panic (#6763)

This commit is contained in:
hexoscott 2023-02-06 11:17:32 +00:00 committed by GitHub
parent 966be04e6d
commit 80a37eb209
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 43 additions and 39 deletions

View File

@ -268,30 +268,17 @@ func convertPayloadStatus(payloadStatus *engineapi.PayloadStatus) *remote.Engine
}
func (s *EthBackendServer) stageLoopIsBusy() bool {
waiter := make(chan struct{})
defer libcommon.SafeClose(waiter)
busy := true
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
wait, ok := s.hd.BeaconRequestList.WaitForWaiting(ctx)
if !ok {
select {
case <-time.After(1 * time.Second):
// timed out so just call done
wg.Done()
case <-waiter:
// state is now waiting so we're not busy
busy = false
wg.Done()
case <-wait:
case <-ctx.Done():
}
}()
}
s.hd.BeaconRequestList.WaitForWaiting(waiter)
wg.Wait()
return busy
return !s.hd.BeaconRequestList.IsWaiting()
}
// EngineNewPayload validates and possibly executes payload

View File

@ -1,8 +1,8 @@
package engineapi
import (
"context"
"sync"
"sync/atomic"
"github.com/emirpasic/gods/maps/treemap"
@ -10,6 +10,7 @@ import (
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon/core/types"
"go.uber.org/atomic"
)
// This is the status of a newly execute block.
@ -53,10 +54,16 @@ type RequestList struct {
requestId int
requests *treemap.Map // map[int]*RequestWithStatus
interrupt Interrupt
waiting uint32
waiting atomic.Uint32
syncCond *sync.Cond
// waiterMtx is used to place locks around `waiters` so the slice can be read/modified safely
// and also serves as a lock around the `waiting` field so that this can't be modified whilst it
// is being read. waiters is modified as part of updating waiter so we need to ensure these
// two things lock at the same time
waiterMtx sync.Mutex
waiters []chan struct{}
waiters []chan struct{}
}
func NewRequestList() *RequestList {
@ -65,6 +72,7 @@ func NewRequestList() *RequestList {
syncCond: sync.NewCond(&sync.Mutex{}),
waiterMtx: sync.Mutex{},
waiters: make([]chan struct{}, 0),
waiting: atomic.Uint32{},
}
return rl
}
@ -122,8 +130,14 @@ func (rl *RequestList) WaitForRequest(onlyNew bool, noWait bool) (interrupt Inte
rl.syncCond.L.Lock()
defer rl.syncCond.L.Unlock()
rl.waiterMtx.Lock()
rl.updateWaiting(1)
defer rl.updateWaiting(0)
rl.waiterMtx.Unlock()
defer func() {
rl.waiterMtx.Lock()
rl.updateWaiting(0)
rl.waiterMtx.Unlock()
}()
for {
interrupt = rl.interrupt
@ -143,33 +157,36 @@ func (rl *RequestList) WaitForRequest(onlyNew bool, noWait bool) (interrupt Inte
}
func (rl *RequestList) IsWaiting() bool {
return atomic.LoadUint32(&rl.waiting) != 0
return rl.waiting.Load() != 0
}
// update waiting should always be called from a locked context using rl.waiterMtx as it
// updates rl.waiters in certain scenarios
func (rl *RequestList) updateWaiting(val uint32) {
rl.waiterMtx.Lock()
defer rl.waiterMtx.Unlock()
atomic.StoreUint32(&rl.waiting, val)
rl.waiting.Store(val)
if val == 1 {
// something might be waiting to be notified of the waiting state being ready
for _, c := range rl.waiters {
c <- struct{}{}
for i, c := range rl.waiters {
close(c)
rl.waiters[i] = nil
}
rl.waiters = make([]chan struct{}, 0)
rl.waiters = rl.waiters[:0]
}
}
func (rl *RequestList) WaitForWaiting(c chan struct{}) {
func (rl *RequestList) WaitForWaiting(ctx context.Context) (chan struct{}, bool) {
rl.waiterMtx.Lock()
defer rl.waiterMtx.Unlock()
val := atomic.LoadUint32(&rl.waiting)
if val == 1 {
// we are already waiting so just send to the channel and quit
c <- struct{}{}
isWaiting := rl.waiting.Load()
if isWaiting == 1 {
// we are already waiting so just return
return nil, true
} else {
// we need to register a waiter now to be notified when we are ready
c := make(chan struct{}, 1)
rl.waiters = append(rl.waiters, c)
return c, false
}
}