diff --git a/go.mod b/go.mod index 4ff54c24f..70f5ca8d1 100644 --- a/go.mod +++ b/go.mod @@ -41,6 +41,7 @@ require ( github.com/valyala/histogram v1.2.0 // indirect golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 // indirect + golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/tools v0.1.10 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect diff --git a/go.sum b/go.sum index ff6e87f8d..feee6ab88 100644 --- a/go.sum +++ b/go.sum @@ -163,6 +163,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8= +golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/kv/mdbx/kv_mdbx.go b/kv/mdbx/kv_mdbx.go index 1aae79097..99a1fa1fe 100644 --- a/kv/mdbx/kv_mdbx.go +++ b/kv/mdbx/kv_mdbx.go @@ -31,10 +31,12 @@ import ( "github.com/c2h5oh/datasize" stack2 "github.com/go-stack/stack" - "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/log/v3" "github.com/torquem-ch/mdbx-go/mdbx" "go.uber.org/atomic" + "golang.org/x/sync/semaphore" + + "github.com/ledgerwatch/erigon-lib/kv" ) const NonExistingDBI kv.DBI = 999_999_999 @@ -57,7 +59,7 @@ type MdbxOpts struct { syncPeriod time.Duration augumentLimit uint64 pageSize uint64 - roTxsLimiter chan struct{} + roTxsLimiter *semaphore.Weighted } func testKVPath() string { @@ -86,7 +88,7 @@ func (opts MdbxOpts) Label(label kv.Label) MdbxOpts { return opts } -func (opts MdbxOpts) RoTxsLimiter(l chan struct{}) MdbxOpts { +func (opts MdbxOpts) RoTxsLimiter(l *semaphore.Weighted) MdbxOpts { opts.roTxsLimiter = l return opts } @@ -254,7 +256,7 @@ func (opts MdbxOpts) Open() (kv.RwDB, error) { } if opts.roTxsLimiter == nil { - opts.roTxsLimiter = make(chan struct{}, runtime.GOMAXPROCS(-1)) + opts.roTxsLimiter = semaphore.NewWeighted(int64(runtime.GOMAXPROCS(-1))) } db := &MdbxKV{ opts: opts, @@ -328,7 +330,7 @@ type MdbxKV struct { buckets kv.TableCfg opts MdbxOpts txSize uint64 - roTxsLimiter chan struct{} // does limit amount of concurrent Ro transactions - in most casess runtime.NumCPU() is good value for this channel capacity - this channel can be shared with other components (like Decompressor) + roTxsLimiter *semaphore.Weighted // does limit amount of concurrent Ro transactions - in most casess runtime.NumCPU() is good value for this channel capacity - this channel can be shared with other components (like Decompressor) closed atomic.Bool } @@ -392,10 +394,16 @@ func (db *MdbxKV) BeginRo(ctx context.Context) (txn kv.Tx, err error) { if db.closed.Load() { return nil, fmt.Errorf("db closed") } - select { - case <-ctx.Done(): + + // will return nil err if context is cancelled (may appear to acquire the semaphore) + if semErr := db.roTxsLimiter.Acquire(ctx, 1); semErr != nil { + return nil, semErr + } + + // if context cancelled as we acquire the sempahore, it may succeed without blocking + // in this case we should return + if ctx.Err() != nil { return nil, ctx.Err() - case db.roTxsLimiter <- struct{}{}: } defer func() { @@ -405,7 +413,7 @@ func (db *MdbxKV) BeginRo(ctx context.Context) (txn kv.Tx, err error) { if txn == nil { // on error, or if there is whatever reason that we don't return a tx, // we need to free up the limiter slot, otherwise it could lead to deadlocks - <-db.roTxsLimiter + db.roTxsLimiter.Release(1) } }() @@ -770,10 +778,7 @@ func (tx *MdbxTx) Commit() error { tx.tx = nil tx.db.wg.Done() if tx.readOnly { - select { - case <-tx.db.roTxsLimiter: - default: - } + tx.db.roTxsLimiter.Release(1) } else { runtime.UnlockOSThread() } @@ -828,10 +833,7 @@ func (tx *MdbxTx) Rollback() { tx.tx = nil tx.db.wg.Done() if tx.readOnly { - select { - case <-tx.db.roTxsLimiter: - default: - } + tx.db.roTxsLimiter.Release(1) } else { runtime.UnlockOSThread() }