From 2dcec832229b030cd9d5d5210546b90e7cc0792d Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Wed, 7 Dec 2022 11:19:08 +0700 Subject: [PATCH] mdbx: add BeginRwAsync method (#767) --- kv/kv_interface.go | 1 + kv/mdbx/kv_mdbx.go | 7 +++++-- kv/mdbx/kv_mdbx_temporary.go | 17 ++++++++++------- kv/remotedb/kv_remote.go | 3 +++ 4 files changed, 19 insertions(+), 9 deletions(-) diff --git a/kv/kv_interface.go b/kv/kv_interface.go index b80f98db7..fec3708b5 100644 --- a/kv/kv_interface.go +++ b/kv/kv_interface.go @@ -180,6 +180,7 @@ type RwDB interface { Update(ctx context.Context, f func(tx RwTx) error) error BeginRw(ctx context.Context) (RwTx, error) + BeginRwAsync(ctx context.Context) (RwTx, error) } type StatelessReadTx interface { diff --git a/kv/mdbx/kv_mdbx.go b/kv/mdbx/kv_mdbx.go index 9aa368314..927ed3039 100644 --- a/kv/mdbx/kv_mdbx.go +++ b/kv/mdbx/kv_mdbx.go @@ -471,7 +471,10 @@ func (db *MdbxKV) BeginRo(ctx context.Context) (txn kv.Tx, err error) { }, nil } -func (db *MdbxKV) BeginRw(_ context.Context) (txn kv.RwTx, err error) { +func (db *MdbxKV) BeginRw(_ context.Context) (kv.RwTx, error) { return db.beginRw(0) } +func (db *MdbxKV) BeginRwAsync(_ context.Context) (kv.RwTx, error) { return db.beginRw(mdbx.TxNoSync) } + +func (db *MdbxKV) beginRw(flags uint) (txn kv.RwTx, err error) { if db.closed.Load() { return nil, fmt.Errorf("db closed") } @@ -482,7 +485,7 @@ func (db *MdbxKV) BeginRw(_ context.Context) (txn kv.RwTx, err error) { } }() - tx, err := db.env.BeginTxn(nil, 0) + tx, err := db.env.BeginTxn(nil, flags) if err != nil { runtime.UnlockOSThread() // unlock only in case of error. normal flow is "defer .Rollback()" return nil, fmt.Errorf("%w, lable: %s, trace: %s", err, db.opts.label.String(), stack2.Trace().String()) diff --git a/kv/mdbx/kv_mdbx_temporary.go b/kv/mdbx/kv_mdbx_temporary.go index 7ef92e647..2c2a5535e 100644 --- a/kv/mdbx/kv_mdbx_temporary.go +++ b/kv/mdbx/kv_mdbx_temporary.go @@ -25,24 +25,24 @@ import ( ) type TemporaryMdbx struct { - db kv.RwDB - chaindata string + db kv.RwDB + path string } func NewTemporaryMdbx() (kv.RwDB, error) { - chaindata, err := os.MkdirTemp("", "mdbx-temp") + path, err := os.MkdirTemp("", "mdbx-temp") if err != nil { return &TemporaryMdbx{}, err } - db, err := Open(chaindata, log.Root(), false) + db, err := Open(path, log.Root(), false) if err != nil { return &TemporaryMdbx{}, err } return &TemporaryMdbx{ - db: db, - chaindata: chaindata, + db: db, + path: path, }, nil } @@ -53,6 +53,9 @@ func (t *TemporaryMdbx) Update(ctx context.Context, f func(kv.RwTx) error) error func (t *TemporaryMdbx) BeginRw(ctx context.Context) (kv.RwTx, error) { return t.db.BeginRw(ctx) } +func (t *TemporaryMdbx) BeginRwAsync(ctx context.Context) (kv.RwTx, error) { + return t.db.BeginRwAsync(ctx) +} func (t *TemporaryMdbx) View(ctx context.Context, f func(kv.Tx) error) error { return t.db.View(ctx, f) @@ -72,5 +75,5 @@ func (t *TemporaryMdbx) PageSize() uint64 { func (t *TemporaryMdbx) Close() { t.db.Close() - os.RemoveAll(t.chaindata) + os.RemoveAll(t.path) } diff --git a/kv/remotedb/kv_remote.go b/kv/remotedb/kv_remote.go index 1af32cbe6..771bccfdd 100644 --- a/kv/remotedb/kv_remote.go +++ b/kv/remotedb/kv_remote.go @@ -160,6 +160,9 @@ func (db *RemoteKV) BeginRo(ctx context.Context) (txn kv.Tx, err error) { func (db *RemoteKV) BeginRw(ctx context.Context) (kv.RwTx, error) { return nil, fmt.Errorf("remote db provider doesn't support .BeginRw method") } +func (db *RemoteKV) BeginRwAsync(ctx context.Context) (kv.RwTx, error) { + return nil, fmt.Errorf("remote db provider doesn't support .BeginRw method") +} func (db *RemoteKV) View(ctx context.Context, f func(tx kv.Tx) error) (err error) { tx, err := db.BeginRo(ctx)