fix(#4543): BeginRo use semaphore (#520)

This commit is contained in:
Max Revitt 2022-07-13 13:37:45 +01:00 committed by GitHub
parent 21c6baf287
commit d629e31df7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 22 additions and 17 deletions

1
go.mod
View File

@ -41,6 +41,7 @@ require (
github.com/valyala/histogram v1.2.0 // indirect
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 // indirect
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/tools v0.1.10 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect

2
go.sum
View File

@ -163,6 +163,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8=
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

View File

@ -31,10 +31,12 @@ import (
"github.com/c2h5oh/datasize"
stack2 "github.com/go-stack/stack"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/log/v3"
"github.com/torquem-ch/mdbx-go/mdbx"
"go.uber.org/atomic"
"golang.org/x/sync/semaphore"
"github.com/ledgerwatch/erigon-lib/kv"
)
const NonExistingDBI kv.DBI = 999_999_999
@ -57,7 +59,7 @@ type MdbxOpts struct {
syncPeriod time.Duration
augumentLimit uint64
pageSize uint64
roTxsLimiter chan struct{}
roTxsLimiter *semaphore.Weighted
}
func testKVPath() string {
@ -86,7 +88,7 @@ func (opts MdbxOpts) Label(label kv.Label) MdbxOpts {
return opts
}
func (opts MdbxOpts) RoTxsLimiter(l chan struct{}) MdbxOpts {
func (opts MdbxOpts) RoTxsLimiter(l *semaphore.Weighted) MdbxOpts {
opts.roTxsLimiter = l
return opts
}
@ -254,7 +256,7 @@ func (opts MdbxOpts) Open() (kv.RwDB, error) {
}
if opts.roTxsLimiter == nil {
opts.roTxsLimiter = make(chan struct{}, runtime.GOMAXPROCS(-1))
opts.roTxsLimiter = semaphore.NewWeighted(int64(runtime.GOMAXPROCS(-1)))
}
db := &MdbxKV{
opts: opts,
@ -328,7 +330,7 @@ type MdbxKV struct {
buckets kv.TableCfg
opts MdbxOpts
txSize uint64
roTxsLimiter chan struct{} // does limit amount of concurrent Ro transactions - in most casess runtime.NumCPU() is good value for this channel capacity - this channel can be shared with other components (like Decompressor)
roTxsLimiter *semaphore.Weighted // does limit amount of concurrent Ro transactions - in most casess runtime.NumCPU() is good value for this channel capacity - this channel can be shared with other components (like Decompressor)
closed atomic.Bool
}
@ -392,10 +394,16 @@ func (db *MdbxKV) BeginRo(ctx context.Context) (txn kv.Tx, err error) {
if db.closed.Load() {
return nil, fmt.Errorf("db closed")
}
select {
case <-ctx.Done():
// will return nil err if context is cancelled (may appear to acquire the semaphore)
if semErr := db.roTxsLimiter.Acquire(ctx, 1); semErr != nil {
return nil, semErr
}
// if context cancelled as we acquire the sempahore, it may succeed without blocking
// in this case we should return
if ctx.Err() != nil {
return nil, ctx.Err()
case db.roTxsLimiter <- struct{}{}:
}
defer func() {
@ -405,7 +413,7 @@ func (db *MdbxKV) BeginRo(ctx context.Context) (txn kv.Tx, err error) {
if txn == nil {
// on error, or if there is whatever reason that we don't return a tx,
// we need to free up the limiter slot, otherwise it could lead to deadlocks
<-db.roTxsLimiter
db.roTxsLimiter.Release(1)
}
}()
@ -770,10 +778,7 @@ func (tx *MdbxTx) Commit() error {
tx.tx = nil
tx.db.wg.Done()
if tx.readOnly {
select {
case <-tx.db.roTxsLimiter:
default:
}
tx.db.roTxsLimiter.Release(1)
} else {
runtime.UnlockOSThread()
}
@ -828,10 +833,7 @@ func (tx *MdbxTx) Rollback() {
tx.tx = nil
tx.db.wg.Done()
if tx.readOnly {
select {
case <-tx.db.roTxsLimiter:
default:
}
tx.db.roTxsLimiter.Release(1)
} else {
runtime.UnlockOSThread()
}