mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-28 14:47:16 +00:00
semaphore for remote kv and reduce semaphore max count (#639)
This commit is contained in:
parent
aad257bc0c
commit
6f6b03d7f0
@ -281,7 +281,7 @@ func (opts MdbxOpts) Open() (kv.RwDB, error) {
|
||||
}
|
||||
|
||||
if opts.roTxsLimiter == nil {
|
||||
opts.roTxsLimiter = semaphore.NewWeighted(int64(runtime.GOMAXPROCS(-1)))
|
||||
opts.roTxsLimiter = semaphore.NewWeighted(int64(runtime.GOMAXPROCS(-1)) - 1) // 1 less than max to allow unlocking to happen
|
||||
}
|
||||
db := &MdbxKV{
|
||||
opts: opts,
|
||||
@ -420,6 +420,15 @@ func (db *MdbxKV) BeginRo(ctx context.Context) (txn kv.Tx, err error) {
|
||||
return nil, fmt.Errorf("db closed")
|
||||
}
|
||||
|
||||
// don't try to acquire if the context is already done
|
||||
done := ctx.Done()
|
||||
select {
|
||||
case <-done:
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
// otherwise carry on
|
||||
}
|
||||
|
||||
// will return nil err if context is cancelled (may appear to acquire the semaphore)
|
||||
if semErr := db.roTxsLimiter.Acquire(ctx, 1); semErr != nil {
|
||||
return nil, semErr
|
||||
|
@ -4,15 +4,18 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"runtime"
|
||||
|
||||
"github.com/ledgerwatch/log/v3"
|
||||
"golang.org/x/sync/semaphore"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/protobuf/types/known/emptypb"
|
||||
|
||||
"github.com/ledgerwatch/erigon-lib/gointerfaces"
|
||||
"github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil"
|
||||
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
|
||||
"github.com/ledgerwatch/erigon-lib/kv"
|
||||
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
|
||||
"github.com/ledgerwatch/log/v3"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/protobuf/types/known/emptypb"
|
||||
)
|
||||
|
||||
// generate the messages and services
|
||||
@ -25,10 +28,11 @@ type remoteOpts struct {
|
||||
}
|
||||
|
||||
type RemoteKV struct {
|
||||
remoteKV remote.KVClient
|
||||
log log.Logger
|
||||
buckets kv.TableCfg
|
||||
opts remoteOpts
|
||||
remoteKV remote.KVClient
|
||||
log log.Logger
|
||||
buckets kv.TableCfg
|
||||
opts remoteOpts
|
||||
roTxsLimiter *semaphore.Weighted
|
||||
}
|
||||
|
||||
type remoteTx struct {
|
||||
@ -66,10 +70,11 @@ func (opts remoteOpts) WithBucketsConfig(f mdbx.TableCfgFunc) remoteOpts {
|
||||
|
||||
func (opts remoteOpts) Open() (*RemoteKV, error) {
|
||||
db := &RemoteKV{
|
||||
opts: opts,
|
||||
remoteKV: opts.remoteKV,
|
||||
log: log.New("remote_db", opts.DialAddress),
|
||||
buckets: kv.TableCfg{},
|
||||
opts: opts,
|
||||
remoteKV: opts.remoteKV,
|
||||
log: log.New("remote_db", opts.DialAddress),
|
||||
buckets: kv.TableCfg{},
|
||||
roTxsLimiter: semaphore.NewWeighted(int64(runtime.GOMAXPROCS(-1)) - 1), // 1 less than max to allow unlocking
|
||||
}
|
||||
customBuckets := opts.bucketsCfg(kv.ChaindataTablesCfg)
|
||||
for name, cfg := range customBuckets { // copy map to avoid changing global variable
|
||||
@ -115,13 +120,24 @@ func (db *RemoteKV) EnsureVersionCompatibility() bool {
|
||||
|
||||
func (db *RemoteKV) Close() {}
|
||||
|
||||
func (db *RemoteKV) BeginRo(ctx context.Context) (kv.Tx, error) {
|
||||
func (db *RemoteKV) BeginRo(ctx context.Context) (txn kv.Tx, err error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if semErr := db.roTxsLimiter.Acquire(ctx, 1); semErr != nil {
|
||||
return nil, semErr
|
||||
}
|
||||
|
||||
defer func() {
|
||||
// ensure we release the semaphore on error
|
||||
if txn == nil {
|
||||
db.roTxsLimiter.Release(1)
|
||||
}
|
||||
}()
|
||||
|
||||
streamCtx, streamCancelFn := context.WithCancel(ctx) // We create child context for the stream so we can cancel it to prevent leak
|
||||
stream, err := db.remoteKV.Tx(streamCtx)
|
||||
if err != nil {
|
||||
@ -172,6 +188,7 @@ func (tx *remoteTx) Commit() error {
|
||||
func (tx *remoteTx) Rollback() {
|
||||
// don't close opened cursors - just close stream, server will cleanup everything well
|
||||
tx.closeGrpcStream()
|
||||
tx.db.roTxsLimiter.Release(1)
|
||||
}
|
||||
func (tx *remoteTx) DBSize() (uint64, error) { panic("not implemented") }
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user