2021-07-28 03:37:25 +00:00
/*
   Copyright 2021 Erigon contributors

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/
2021-07-28 03:36:06 +00:00
package mdbx
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io/ioutil"
"os"
"runtime"
"sort"
"strings"
"sync"
2022-03-10 03:25:11 +00:00
"time"
2021-07-28 03:36:06 +00:00
"github.com/c2h5oh/datasize"
2021-10-22 02:12:39 +00:00
stack2 "github.com/go-stack/stack"
2021-07-28 03:36:06 +00:00
"github.com/ledgerwatch/erigon-lib/kv"
2021-07-28 04:50:45 +00:00
"github.com/ledgerwatch/log/v3"
2021-07-28 03:36:06 +00:00
"github.com/torquem-ch/mdbx-go/mdbx"
)
// NonExistingDBI is the sentinel DBI stored in a table's config when the table
// has no open handle: a deprecated table that was not found on open, or a
// table that has been dropped.
const NonExistingDBI kv . DBI = 999_999_999
// TableCfgFunc receives the default table configuration and returns the
// configuration the database should be opened with.
type TableCfgFunc func ( defaultBuckets kv . TableCfg ) kv . TableCfg
// WithChaindataTables is the TableCfgFunc that keeps the default table
// configuration unchanged. NewMDBX installs it as the default.
func WithChaindataTables ( defaultBuckets kv . TableCfg ) kv . TableCfg {
return defaultBuckets
}
type MdbxOpts struct {
2021-09-27 02:49:17 +00:00
bucketsCfg TableCfgFunc
path string
inMem bool
label kv . Label // marker to distinct db instances - one process may open many databases. for example to collect metrics of only 1 database
verbosity kv . DBVerbosityLvl
mapSize datasize . ByteSize
flags uint
log log . Logger
2022-03-10 03:25:11 +00:00
syncPeriod time . Duration
2021-09-27 02:49:17 +00:00
augumentLimit uint64
2022-01-22 05:13:35 +00:00
pageSize uint64
2022-03-07 12:21:59 +00:00
roTxsLimiter chan struct { }
2021-07-28 03:36:06 +00:00
}
// testKVPath creates a fresh temporary directory (prefix "erigon-test-db")
// under the OS temp dir and returns its path. It panics if the directory
// cannot be created.
func testKVPath() string {
	tmpDir, err := ioutil.TempDir(os.TempDir(), "erigon-test-db")
	if err == nil {
		return tmpDir
	}
	panic(err)
}
func NewMDBX ( log log . Logger ) MdbxOpts {
return MdbxOpts {
bucketsCfg : WithChaindataTables ,
flags : mdbx . NoReadahead | mdbx . Coalesce | mdbx . Durable ,
log : log ,
2022-01-22 05:13:35 +00:00
pageSize : 4096 ,
2021-07-28 03:36:06 +00:00
}
}
// Label returns a copy of opts carrying the given instance label.
func (opts MdbxOpts) Label(label kv.Label) MdbxOpts {
	labeled := opts
	labeled.label = label
	return labeled
}
2022-03-07 12:21:59 +00:00
// RoTxsLimiter returns a copy of opts that uses l to bound the number of
// concurrent read-only transactions. When left nil, Open creates a channel
// sized by runtime.GOMAXPROCS.
func (opts MdbxOpts) RoTxsLimiter(l chan struct{}) MdbxOpts {
	limited := opts
	limited.roTxsLimiter = l
	return limited
}
2021-09-27 02:49:17 +00:00
// AugumentLimit returns a copy of opts with the augment limit set to v.
func (opts MdbxOpts) AugumentLimit(v uint64) MdbxOpts {
	updated := opts
	updated.augumentLimit = v
	return updated
}
2022-01-22 05:13:35 +00:00
// PageSize returns a copy of opts with the database page size set to v bytes.
func (opts MdbxOpts) PageSize(v uint64) MdbxOpts {
	updated := opts
	updated.pageSize = v
	return updated
}
2021-07-28 03:36:06 +00:00
// Path returns a copy of opts pointing at the given database directory.
func (opts MdbxOpts) Path(path string) MdbxOpts {
	updated := opts
	updated.path = path
	return updated
}
// Set discards the receiver and returns opt unchanged, i.e. it replaces the
// whole option set in a call chain.
func ( opts MdbxOpts ) Set ( opt MdbxOpts ) MdbxOpts {
return opt
}
func ( opts MdbxOpts ) InMem ( ) MdbxOpts {
opts . inMem = true
2022-01-18 05:55:20 +00:00
opts . flags = mdbx . UtterlyNoSync | mdbx . NoMetaSync | mdbx . LifoReclaim | mdbx . WriteMap
2021-07-28 03:36:06 +00:00
return opts
}
// Exclusive returns a copy of opts with the mdbx.Exclusive flag added.
func (opts MdbxOpts) Exclusive() MdbxOpts {
	opts.flags |= mdbx.Exclusive
	return opts
}
// Flags returns a copy of opts whose flags are the result of applying f to
// the current flags.
func (opts MdbxOpts) Flags(f func(uint) uint) MdbxOpts {
	rewritten := f(opts.flags)
	opts.flags = rewritten
	return opts
}
// Readonly returns a copy of opts with the mdbx.Readonly flag added.
func (opts MdbxOpts) Readonly() MdbxOpts {
	opts.flags |= mdbx.Readonly
	return opts
}
2022-03-10 03:25:11 +00:00
// SyncPeriod returns a copy of opts with the given sync period. A zero period
// makes Open skip the env.SetSyncPeriod call.
func (opts MdbxOpts) SyncPeriod(period time.Duration) MdbxOpts {
	updated := opts
	updated.syncPeriod = period
	return updated
}
2021-07-28 03:36:06 +00:00
// DBVerbosity returns a copy of opts with the MDBX debug verbosity set to v.
// Open skips configuring verbosity when the value is -1.
func (opts MdbxOpts) DBVerbosity(v kv.DBVerbosityLvl) MdbxOpts {
	updated := opts
	updated.verbosity = v
	return updated
}
// MapSize returns a copy of opts with the memory-map size set to sz. A zero
// size makes Open choose a default (64MB in-memory, 3TB otherwise).
func (opts MdbxOpts) MapSize(sz datasize.ByteSize) MdbxOpts {
	updated := opts
	updated.mapSize = sz
	return updated
}
2022-03-23 14:35:13 +00:00
// WriteMap returns a copy of opts with the mdbx.WriteMap flag added.
func (opts MdbxOpts) WriteMap() MdbxOpts {
	opts.flags = opts.flags | mdbx.WriteMap
	return opts
}
2021-07-28 03:36:06 +00:00
// WithTablessCfg returns a copy of opts that uses f to derive the table
// configuration from the defaults.
func (opts MdbxOpts) WithTablessCfg(f TableCfgFunc) MdbxOpts {
	updated := opts
	updated.bucketsCfg = f
	return updated
}
func ( opts MdbxOpts ) Open ( ) ( kv . RwDB , error ) {
var err error
if opts . inMem {
opts . path = testKVPath ( )
}
env , err := mdbx . NewEnv ( )
if err != nil {
return nil , err
}
if opts . verbosity != - 1 {
err = env . SetDebug ( mdbx . LogLvl ( opts . verbosity ) , mdbx . DbgDoNotChange , mdbx . LoggerDoNotChange ) // temporary disable error, because it works if call it 1 time, but returns error if call it twice in same process (what often happening in tests)
if err != nil {
return nil , fmt . Errorf ( "db verbosity set: %w" , err )
}
}
if err = env . SetOption ( mdbx . OptMaxDB , 100 ) ; err != nil {
return nil , err
}
if err = env . SetOption ( mdbx . OptMaxReaders , kv . ReadersLimit ) ; err != nil {
return nil , err
}
if opts . mapSize == 0 {
if opts . inMem {
opts . mapSize = 64 * datasize . MB
} else {
2021-11-30 07:06:57 +00:00
opts . mapSize = 3 * datasize . TB
2021-07-28 03:36:06 +00:00
}
}
if opts . flags & mdbx . Accede == 0 {
if opts . inMem {
if err = env . SetGeometry ( - 1 , - 1 , int ( opts . mapSize ) , int ( 2 * datasize . MB ) , 0 , 4 * 1024 ) ; err != nil {
return nil , err
}
} else {
2022-01-22 05:13:35 +00:00
if err = env . SetGeometry ( - 1 , - 1 , int ( opts . mapSize ) , int ( 2 * datasize . GB ) , - 1 , int ( opts . pageSize ) ) ; err != nil {
2021-07-28 03:36:06 +00:00
return nil , err
}
}
2021-09-27 02:49:17 +00:00
2022-01-11 08:12:20 +00:00
const MAX_AUGMENT_LIMIT = 0x7fffFFFF
if err = env . SetOption ( mdbx . OptRpAugmentLimit , MAX_AUGMENT_LIMIT ) ; err != nil {
2021-07-28 03:36:06 +00:00
return nil , err
}
if err = os . MkdirAll ( opts . path , 0744 ) ; err != nil {
return nil , fmt . Errorf ( "could not create dir: %s, %w" , opts . path , err )
}
}
defaultDirtyPagesLimit , err := env . GetOption ( mdbx . OptTxnDpLimit )
if err != nil {
return nil , err
}
if opts . flags & mdbx . Accede == 0 && opts . flags & mdbx . Readonly == 0 {
// 1/8 is good for transactions with a lot of modifications - to reduce invalidation size.
// But Erigon app now using Batch and etl.Collectors to avoid writing to DB frequently changing data.
// It means most of our writes are: APPEND or "single UPSERT per key during transaction"
//if err = env.SetOption(mdbx.OptSpillMinDenominator, 8); err != nil {
// return nil, err
//}
if err = env . SetOption ( mdbx . OptTxnDpInitial , 16 * 1024 ) ; err != nil {
return nil , err
}
if err = env . SetOption ( mdbx . OptDpReverseLimit , 16 * 1024 ) ; err != nil {
return nil , err
}
if err = env . SetOption ( mdbx . OptTxnDpLimit , defaultDirtyPagesLimit * 2 ) ; err != nil { // default is RAM/42
return nil , err
}
// must be in the range from 12.5% (almost empty) to 50% (half empty)
// which corresponds to the range from 8192 and to 32768 in units respectively
2022-02-03 09:28:43 +00:00
if err = env . SetOption ( mdbx . OptMergeThreshold16dot16Percent , 32768 ) ; err != nil {
return nil , err
}
2021-07-28 03:36:06 +00:00
}
dirtyPagesLimit , err := env . GetOption ( mdbx . OptTxnDpLimit )
if err != nil {
return nil , err
}
2021-10-05 08:30:06 +00:00
err = env . Open ( opts . path , opts . flags , 0664 )
if err != nil {
2022-03-16 11:59:11 +00:00
if err != nil {
return nil , fmt . Errorf ( "%w, label: %s, trace: %s" , err , opts . label . String ( ) , stack2 . Trace ( ) . String ( ) )
}
2021-10-05 08:30:06 +00:00
}
2022-03-10 03:25:11 +00:00
if opts . syncPeriod != 0 {
2022-03-10 12:04:08 +00:00
if err = env . SetSyncPeriod ( opts . syncPeriod ) ; err != nil {
2022-03-10 03:25:11 +00:00
env . Close ( )
return nil , err
}
}
2022-03-07 12:21:59 +00:00
if opts . roTxsLimiter == nil {
2022-04-21 03:43:46 +00:00
opts . roTxsLimiter = make ( chan struct { } , runtime . GOMAXPROCS ( - 1 ) )
2022-03-07 12:21:59 +00:00
}
2021-07-28 03:36:06 +00:00
db := & MdbxKV {
2022-03-07 12:21:59 +00:00
opts : opts ,
env : env ,
log : opts . log ,
wg : & sync . WaitGroup { } ,
buckets : kv . TableCfg { } ,
txSize : dirtyPagesLimit * opts . pageSize ,
roTxsLimiter : opts . roTxsLimiter ,
2021-07-28 03:36:06 +00:00
}
2022-03-07 12:21:59 +00:00
2021-07-28 03:36:06 +00:00
customBuckets := opts . bucketsCfg ( kv . ChaindataTablesCfg )
for name , cfg := range customBuckets { // copy map to avoid changing global variable
db . buckets [ name ] = cfg
}
buckets := bucketSlice ( db . buckets )
2021-10-05 08:30:06 +00:00
if err := db . openDBIs ( buckets ) ; err != nil {
return nil , err
2021-07-28 03:36:06 +00:00
}
// Configure buckets and open deprecated buckets
if err := env . View ( func ( tx * mdbx . Txn ) error {
for _ , name := range buckets {
// Open deprecated buckets if they exist, don't create
if ! db . buckets [ name ] . IsDeprecated {
continue
}
cnfCopy := db . buckets [ name ]
dbi , createErr := tx . OpenDBI ( name , mdbx . DBAccede , nil , nil )
if createErr != nil {
if mdbx . IsNotFound ( createErr ) {
cnfCopy . DBI = NonExistingDBI
db . buckets [ name ] = cnfCopy
continue // if deprecated bucket couldn't be open - then it's deleted and it's fine
} else {
return fmt . Errorf ( "bucket: %s, %w" , name , createErr )
}
}
cnfCopy . DBI = kv . DBI ( dbi )
db . buckets [ name ] = cnfCopy
}
return nil
} ) ; err != nil {
return nil , err
}
if ! opts . inMem {
if staleReaders , err := db . env . ReaderCheck ( ) ; err != nil {
2021-07-29 10:33:03 +00:00
db . log . Error ( "failed ReaderCheck" , "err" , err )
2021-07-28 03:36:06 +00:00
} else if staleReaders > 0 {
2021-07-28 04:50:45 +00:00
db . log . Info ( "[db] cleared reader slots from dead processes" , "amount" , staleReaders )
2021-07-28 03:36:06 +00:00
}
2021-09-07 02:52:17 +00:00
2021-07-28 03:36:06 +00:00
}
return db , nil
}
// MustOpen is like Open but panics (wrapping the cause) when the database
// cannot be opened.
func (opts MdbxOpts) MustOpen() kv.RwDB {
	db, err := opts.Open()
	if err == nil {
		return db
	}
	panic(fmt.Errorf("fail to open mdbx: %w", err))
}
type MdbxKV struct {
2022-03-07 12:21:59 +00:00
env * mdbx . Env
log log . Logger
wg * sync . WaitGroup
buckets kv . TableCfg
opts MdbxOpts
txSize uint64
roTxsLimiter chan struct { } // does limit amount of concurrent Ro transactions - in most casess runtime.NumCPU() is good value for this channel capacity - this channel can be shared with other components (like Decompressor)
2021-07-28 03:36:06 +00:00
}
2021-10-05 08:30:06 +00:00
// openDBIs - first trying to open existing DBI's in RO transaction
// otherwise re-try by RW transaction
// it allow open DB from another process - even if main process holding long RW transaction
func ( db * MdbxKV ) openDBIs ( buckets [ ] string ) error {
2021-10-22 03:13:11 +00:00
if db . opts . flags & mdbx . Readonly != 0 {
if err := db . View ( context . Background ( ) , func ( tx kv . Tx ) error {
for _ , name := range buckets {
if db . buckets [ name ] . IsDeprecated {
continue
}
if err := tx . ( kv . BucketMigrator ) . CreateBucket ( name ) ; err != nil {
return err
}
2021-10-05 08:30:06 +00:00
}
2021-10-22 03:13:11 +00:00
return tx . Commit ( ) // when open db as read-only, commit of this RO transaction is required
} ) ; err != nil {
2021-10-05 08:30:06 +00:00
return err
}
2021-10-22 03:13:11 +00:00
} else {
2021-10-05 08:30:06 +00:00
if err := db . Update ( context . Background ( ) , func ( tx kv . RwTx ) error {
for _ , name := range buckets {
if db . buckets [ name ] . IsDeprecated {
continue
}
if err := tx . ( kv . BucketMigrator ) . CreateBucket ( name ) ; err != nil {
return err
}
}
return nil
} ) ; err != nil {
return err
}
}
return nil
}
2021-07-28 03:36:06 +00:00
// Close closes db
// All transactions must be closed before closing the database.
func ( db * MdbxKV ) Close ( ) {
if db . env == nil {
return
}
db . wg . Wait ( )
db . env . Close ( )
db . env = nil
if db . opts . inMem {
if err := os . RemoveAll ( db . opts . path ) ; err != nil {
2021-07-29 10:33:03 +00:00
db . log . Warn ( "failed to remove in-mem db file" , "err" , err )
2021-07-28 03:36:06 +00:00
}
}
}
2021-09-02 08:41:29 +00:00
func ( db * MdbxKV ) BeginRo ( ctx context . Context ) ( txn kv . Tx , err error ) {
select {
case <- ctx . Done ( ) :
return nil , ctx . Err ( )
2022-03-07 12:21:59 +00:00
case db . roTxsLimiter <- struct { } { } :
2021-09-02 08:41:29 +00:00
}
2021-09-02 09:33:23 +00:00
if db . env == nil {
return nil , fmt . Errorf ( "db closed" )
}
2021-09-02 08:41:29 +00:00
2021-07-28 03:36:06 +00:00
defer func ( ) {
if err == nil {
db . wg . Add ( 1 )
}
} ( )
tx , err := db . env . BeginTxn ( nil , mdbx . Readonly )
if err != nil {
2021-10-22 02:12:39 +00:00
return nil , fmt . Errorf ( "%w, label: %s, trace: %s" , err , db . opts . label . String ( ) , stack2 . Trace ( ) . String ( ) )
2021-07-28 03:36:06 +00:00
}
tx . RawRead = true
return & MdbxTx {
db : db ,
tx : tx ,
readOnly : true ,
} , nil
}
func ( db * MdbxKV ) BeginRw ( _ context . Context ) ( txn kv . RwTx , err error ) {
if db . env == nil {
return nil , fmt . Errorf ( "db closed" )
}
runtime . LockOSThread ( )
defer func ( ) {
if err == nil {
db . wg . Add ( 1 )
}
} ( )
2021-12-27 08:11:24 +00:00
2021-07-28 03:36:06 +00:00
tx , err := db . env . BeginTxn ( nil , 0 )
if err != nil {
runtime . UnlockOSThread ( ) // unlock only in case of error. normal flow is "defer .Rollback()"
2021-10-22 02:12:39 +00:00
return nil , fmt . Errorf ( "%w, lable: %s, trace: %s" , err , db . opts . label . String ( ) , stack2 . Trace ( ) . String ( ) )
2021-07-28 03:36:06 +00:00
}
tx . RawRead = true
return & MdbxTx {
db : db ,
tx : tx ,
} , nil
}
// MdbxTx wraps a native MDBX transaction together with the cursors opened
// through it.
type MdbxTx struct {
// tx is the native transaction; Commit and Rollback reset it to nil.
tx * mdbx . Txn
// db is the database this transaction belongs to.
db * MdbxKV
// cursors holds every native cursor opened by stdCursor, keyed by cursor id,
// so they can be closed when the transaction ends.
cursors map [ uint64 ] * mdbx . Cursor
// statelessCursors caches one cursor per table for Put/Delete/GetOne/Has/...
statelessCursors map [ string ] kv . Cursor
// readOnly is set by BeginRo; it selects whether ending the transaction
// releases a read-limiter slot or unlocks the OS thread.
readOnly bool
// cursorID is the id assigned to the next cursor opened by stdCursor.
cursorID uint64
}
// MdbxCursor is a cursor over a single table within an MdbxTx.
type MdbxCursor struct {
// tx is the transaction that opened the cursor.
tx * MdbxTx
// c is the underlying native cursor.
c * mdbx . Cursor
// bucketName is the table name, used in error messages.
bucketName string
// bucketCfg is the table's configuration captured when the cursor was opened.
bucketCfg kv . TableCfgItem
// dbi is the native handle of the table.
dbi mdbx . DBI
// id is the key of this cursor in MdbxTx.cursors.
id uint64
}
// Env returns the underlying native environment; it is nil after Close.
func ( db * MdbxKV ) Env ( ) * mdbx . Env {
return db . env
}
// AllDBI returns a new map from table name to the DBI handle currently
// recorded for that table.
func (db *MdbxKV) AllDBI() map[string]kv.DBI {
	handles := make(map[string]kv.DBI, len(db.buckets))
	for table := range db.buckets {
		handles[table] = db.buckets[table].DBI
	}
	return handles
}
// AllBuckets returns the database's table configuration. The returned map is
// the live internal map, not a copy.
func ( db * MdbxKV ) AllBuckets ( ) kv . TableCfg {
return db . buckets
}
// ForEach calls walker for every key/value pair of bucket, starting at the
// first key >= fromPrefix and continuing to the end of the table.
//
// Iteration stops at the first error returned by the cursor or by walker, and
// that error is returned. Cursor errors are checked before the key, so an
// error reported together with a nil key is not mistaken for end-of-table.
func (tx *MdbxTx) ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error {
	c, err := tx.Cursor(bucket)
	if err != nil {
		return err
	}
	defer c.Close()

	for k, v, err := c.Seek(fromPrefix); ; k, v, err = c.Next() {
		if err != nil {
			return err
		}
		if k == nil { // end of table
			return nil
		}
		if err := walker(k, v); err != nil {
			return err
		}
	}
}
// ForPrefix calls walker for every key/value pair of bucket whose key starts
// with prefix, in key order.
//
// Iteration stops at the first error returned by the cursor or by walker, and
// that error is returned. Cursor errors are checked before the key, so an
// error reported together with a nil key is not mistaken for end-of-table.
func (tx *MdbxTx) ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error {
	c, err := tx.Cursor(bucket)
	if err != nil {
		return err
	}
	defer c.Close()

	for k, v, err := c.Seek(prefix); ; k, v, err = c.Next() {
		if err != nil {
			return err
		}
		// Stop at end of table or at the first key outside the prefix range.
		if k == nil || !bytes.HasPrefix(k, prefix) {
			return nil
		}
		if err := walker(k, v); err != nil {
			return err
		}
	}
}
func ( tx * MdbxTx ) ForAmount ( bucket string , fromPrefix [ ] byte , amount uint32 , walker func ( k , v [ ] byte ) error ) error {
2022-03-04 02:13:15 +00:00
if amount == 0 {
return nil
}
2021-07-28 03:36:06 +00:00
c , err := tx . Cursor ( bucket )
if err != nil {
return err
}
defer c . Close ( )
for k , v , err := c . Seek ( fromPrefix ) ; k != nil && amount > 0 ; k , v , err = c . Next ( ) {
if err != nil {
return err
}
if err := walker ( k , v ) ; err != nil {
return err
}
amount --
}
return nil
}
2021-09-17 02:56:04 +00:00
// ViewID returns the ID of the underlying native transaction.
func (tx *MdbxTx) ViewID() uint64 {
	return tx.tx.ID()
}
2021-09-15 06:11:34 +00:00
2021-07-28 03:36:06 +00:00
func ( tx * MdbxTx ) CollectMetrics ( ) {
2021-07-29 10:33:03 +00:00
if tx . db . opts . label != kv . ChainDB {
return
}
info , err := tx . db . env . Info ( tx . tx )
if err != nil {
return
}
if info . SinceReaderCheck . Hours ( ) > 1 {
if staleReaders , err := tx . db . env . ReaderCheck ( ) ; err != nil {
tx . db . log . Error ( "failed ReaderCheck" , "err" , err )
} else if staleReaders > 0 {
tx . db . log . Info ( "[db] cleared reader slots from dead processes" , "amount" , staleReaders )
2021-07-28 03:36:06 +00:00
}
2021-07-29 10:33:03 +00:00
}
2021-07-28 03:36:06 +00:00
2021-07-29 10:33:03 +00:00
kv . DbSize . Set ( info . Geo . Current )
kv . DbPgopsNewly . Set ( info . PageOps . Newly )
kv . DbPgopsCow . Set ( info . PageOps . Cow )
kv . DbPgopsClone . Set ( info . PageOps . Clone )
kv . DbPgopsSplit . Set ( info . PageOps . Split )
kv . DbPgopsMerge . Set ( info . PageOps . Merge )
kv . DbPgopsSpill . Set ( info . PageOps . Spill )
kv . DbPgopsUnspill . Set ( info . PageOps . Unspill )
kv . DbPgopsWops . Set ( info . PageOps . Wops )
txInfo , err := tx . tx . Info ( true )
if err != nil {
return
}
kv . TxDirty . Set ( txInfo . SpaceDirty )
kv . TxLimit . Set ( tx . db . txSize )
kv . TxSpill . Set ( txInfo . Spill )
kv . TxUnspill . Set ( txInfo . Unspill )
gc , err := tx . BucketStat ( "gc" )
if err != nil {
return
}
kv . GcLeafMetric . Set ( gc . LeafPages )
kv . GcOverflowMetric . Set ( gc . OverflowPages )
2022-01-22 05:13:35 +00:00
kv . GcPagesMetric . Set ( ( gc . LeafPages + gc . OverflowPages ) * tx . db . opts . pageSize / 8 )
2021-07-29 10:33:03 +00:00
{
st , err := tx . BucketStat ( kv . PlainState )
2021-07-28 03:36:06 +00:00
if err != nil {
return
}
2021-07-29 10:33:03 +00:00
kv . TableStateLeaf . Set ( st . LeafPages )
kv . TableStateBranch . Set ( st . BranchPages )
kv . TableStateEntries . Set ( st . Entries )
2022-01-22 05:13:35 +00:00
kv . TableStateSize . Set ( ( st . LeafPages + st . BranchPages + st . OverflowPages ) * tx . db . opts . pageSize )
2021-07-29 10:33:03 +00:00
}
{
st , err := tx . BucketStat ( kv . StorageChangeSet )
if err != nil {
2021-07-28 03:36:06 +00:00
return
}
2021-07-29 10:33:03 +00:00
kv . TableScsLeaf . Set ( st . LeafPages )
kv . TableScsBranch . Set ( st . BranchPages )
kv . TableScsEntries . Set ( st . Entries )
2022-01-22 05:13:35 +00:00
kv . TableScsSize . Set ( ( st . LeafPages + st . BranchPages + st . OverflowPages ) * tx . db . opts . pageSize )
2021-07-29 10:33:03 +00:00
}
{
st , err := tx . BucketStat ( kv . EthTx )
2021-07-28 03:36:06 +00:00
if err != nil {
return
}
2021-07-29 10:33:03 +00:00
kv . TableTxLeaf . Set ( st . LeafPages )
kv . TableTxBranch . Set ( st . BranchPages )
kv . TableTxOverflow . Set ( st . OverflowPages )
kv . TableTxEntries . Set ( st . Entries )
2022-01-22 05:13:35 +00:00
kv . TableTxSize . Set ( ( st . LeafPages + st . BranchPages + st . OverflowPages ) * tx . db . opts . pageSize )
2021-07-29 10:33:03 +00:00
}
{
st , err := tx . BucketStat ( kv . Log )
2021-07-28 03:36:06 +00:00
if err != nil {
return
}
2021-07-29 10:33:03 +00:00
kv . TableLogLeaf . Set ( st . LeafPages )
kv . TableLogBranch . Set ( st . BranchPages )
kv . TableLogOverflow . Set ( st . OverflowPages )
kv . TableLogEntries . Set ( st . Entries )
2022-01-22 05:13:35 +00:00
kv . TableLogSize . Set ( ( st . LeafPages + st . BranchPages + st . OverflowPages ) * tx . db . opts . pageSize )
2021-07-29 10:33:03 +00:00
}
2021-07-28 03:36:06 +00:00
}
2021-08-22 10:39:47 +00:00
// ListBuckets - all buckets stored as keys of un-named bucket
2021-07-28 03:36:06 +00:00
func ( tx * MdbxTx ) ListBuckets ( ) ( [ ] string , error ) {
return tx . tx . ListDBI ( )
}
// View runs f inside a read-only transaction that is always rolled back
// afterwards, and returns f's error. It fails with "db closed" when the
// environment has been closed.
func (db *MdbxKV) View(ctx context.Context, f func(tx kv.Tx) error) (err error) {
	if db.env == nil {
		return fmt.Errorf("db closed")
	}
	db.wg.Add(1)
	defer db.wg.Done()

	// can't use db.env.View method - because it calls commit for read transactions - it conflicts with write transactions.
	roTx, beginErr := db.BeginRo(ctx)
	if beginErr != nil {
		return beginErr
	}
	defer roTx.Rollback()

	return f(roTx)
}
// Update runs f inside a read-write transaction. The transaction is committed
// when f returns nil and rolled back otherwise; the first error from
// beginning, f or committing is returned.
func (db *MdbxKV) Update(ctx context.Context, f func(tx kv.RwTx) error) (err error) {
	if db.env == nil {
		return fmt.Errorf("db closed")
	}
	db.wg.Add(1)
	defer db.wg.Done()

	rwTx, beginErr := db.BeginRw(ctx)
	if beginErr != nil {
		return beginErr
	}
	// Rollback is a no-op once Commit has ended the transaction.
	defer rwTx.Rollback()

	if err = f(rwTx); err != nil {
		return err
	}
	return rwTx.Commit()
}
func ( tx * MdbxTx ) CreateBucket ( name string ) error {
cnfCopy := tx . db . buckets [ name ]
dbi , err := tx . tx . OpenDBI ( name , mdbx . DBAccede , nil , nil )
if err != nil && ! mdbx . IsNotFound ( err ) {
return fmt . Errorf ( "create bucket: %s, %w" , name , err )
}
if err == nil {
cnfCopy . DBI = kv . DBI ( dbi )
var flags uint
flags , err = tx . tx . Flags ( dbi )
if err != nil {
return err
}
cnfCopy . Flags = kv . TableFlags ( flags )
tx . db . buckets [ name ] = cnfCopy
return nil
}
// if bucket doesn't exists - create it
var flags = tx . db . buckets [ name ] . Flags
var nativeFlags uint
if tx . db . opts . flags & mdbx . Readonly == 0 {
nativeFlags |= mdbx . Create
}
if flags & kv . DupSort != 0 {
nativeFlags |= mdbx . DupSort
flags ^ = kv . DupSort
}
if flags != 0 {
return fmt . Errorf ( "some not supported flag provided for bucket" )
}
dbi , err = tx . tx . OpenDBI ( name , nativeFlags , nil , nil )
if err != nil {
return fmt . Errorf ( "create bucket: %s, %w" , name , err )
}
cnfCopy . DBI = kv . DBI ( dbi )
tx . db . buckets [ name ] = cnfCopy
return nil
}
// dropEvenIfBucketIsNotDeprecated deletes the table called name without
// checking its deprecation status, then marks it as NonExistingDBI in the
// table config. Dropping a table that does not exist is not an error.
func (tx *MdbxTx) dropEvenIfBucketIsNotDeprecated(name string) error {
	handle := tx.db.buckets[name].DBI

	// if bucket was not open on db start, then it's may be deprecated
	// try to open it now without `Create` flag, and if fail then nothing to drop
	if handle == NonExistingDBI {
		opened, err := tx.tx.OpenDBI(name, 0, nil, nil)
		switch {
		case err == nil:
			handle = kv.DBI(opened)
		case mdbx.IsNotFound(err):
			return nil // DBI doesn't exists means no drop needed
		default:
			return fmt.Errorf("bucket: %s, %w", name, err)
		}
	}

	if err := tx.tx.Drop(mdbx.DBI(handle), true); err != nil {
		return err
	}

	cfg := tx.db.buckets[name]
	cfg.DBI = NonExistingDBI
	tx.db.buckets[name] = cfg
	return nil
}
// ClearBucket empties the given table while keeping the table itself. A table
// without an open handle (NonExistingDBI) is left alone.
func (tx *MdbxTx) ClearBucket(bucket string) error {
	if handle := tx.db.buckets[bucket].DBI; handle != NonExistingDBI {
		return tx.tx.Drop(mdbx.DBI(handle), false)
	}
	return nil
}
// DropBucket deletes a table, but only when it is configured and marked as
// deprecated; otherwise it returns an error wrapping
// kv.ErrAttemptToDeleteNonDeprecatedBucket.
func (tx *MdbxTx) DropBucket(bucket string) error {
	cfg, known := tx.db.buckets[bucket]
	if !known || !cfg.IsDeprecated {
		return fmt.Errorf("%w, bucket: %s", kv.ErrAttemptToDeleteNonDeprecatedBucket, bucket)
	}
	return tx.dropEvenIfBucketIsNotDeprecated(bucket)
}
// ExistsBucket reports whether the table is configured and currently has an
// open handle (its DBI is not NonExistingDBI). The error is always nil.
func (tx *MdbxTx) ExistsBucket(bucket string) (bool, error) {
	cfg, known := tx.db.buckets[bucket]
	exists := known && cfg.DBI != NonExistingDBI
	return exists, nil
}
func ( tx * MdbxTx ) Commit ( ) error {
if tx . db . env == nil {
return fmt . Errorf ( "db closed" )
}
if tx . tx == nil {
return nil
}
defer func ( ) {
tx . tx = nil
tx . db . wg . Done ( )
2022-03-07 12:21:59 +00:00
if tx . readOnly {
select {
case <- tx . db . roTxsLimiter :
default :
}
} else {
2021-07-28 03:36:06 +00:00
runtime . UnlockOSThread ( )
}
} ( )
tx . closeCursors ( )
2021-07-29 10:33:03 +00:00
//slowTx := 10 * time.Second
//if debug.SlowCommit() > 0 {
// slowTx = debug.SlowCommit()
//}
//
//if debug.BigRoTxKb() > 0 || debug.BigRwTxKb() > 0 {
// tx.PrintDebugInfo()
//}
tx . CollectMetrics ( )
latency , err := tx . tx . Commit ( )
2021-07-28 03:36:06 +00:00
if err != nil {
return err
}
2021-07-29 10:33:03 +00:00
if tx . db . opts . label == kv . ChainDB {
kv . DbCommitPreparation . Update ( latency . Preparation . Seconds ( ) )
kv . DbCommitGc . Update ( latency . GC . Seconds ( ) )
kv . DbCommitAudit . Update ( latency . Audit . Seconds ( ) )
kv . DbCommitWrite . Update ( latency . Write . Seconds ( ) )
kv . DbCommitSync . Update ( latency . Sync . Seconds ( ) )
kv . DbCommitEnding . Update ( latency . Ending . Seconds ( ) )
kv . DbCommitTotal . Update ( latency . Whole . Seconds ( ) )
}
//if latency.Whole > slowTx {
// log.Info("Commit",
// "preparation", latency.Preparation,
// "gc", latency.GC,
// "audit", latency.Audit,
// "write", latency.Write,
// "fsync", latency.Sync,
// "ending", latency.Ending,
// "whole", latency.Whole,
// )
//}
2021-07-28 03:36:06 +00:00
return nil
}
func ( tx * MdbxTx ) Rollback ( ) {
if tx . db . env == nil {
return
}
if tx . tx == nil {
return
}
defer func ( ) {
tx . tx = nil
tx . db . wg . Done ( )
2022-03-07 12:21:59 +00:00
if tx . readOnly {
select {
case <- tx . db . roTxsLimiter :
default :
}
} else {
2021-07-28 03:36:06 +00:00
runtime . UnlockOSThread ( )
}
} ( )
tx . closeCursors ( )
//tx.printDebugInfo()
tx . tx . Abort ( )
}
// SpaceDirty returns the transaction's dirty space as reported by the native
// transaction info, together with the database's transaction size limit.
func (tx *MdbxTx) SpaceDirty() (uint64, uint64, error) {
	info, err := tx.tx.Info(true)
	if err == nil {
		return info.SpaceDirty, tx.db.txSize, nil
	}
	return 0, 0, err
}
func ( tx * MdbxTx ) PrintDebugInfo ( ) {
/ *
txInfo , err := tx . tx . Info ( true )
if err != nil {
panic ( err )
}
txSize := uint ( txInfo . SpaceDirty / 1024 )
doPrint := debug . BigRoTxKb ( ) == 0 && debug . BigRwTxKb ( ) == 0 ||
tx . readOnly && debug . BigRoTxKb ( ) > 0 && txSize > debug . BigRoTxKb ( ) ||
( ! tx . readOnly && debug . BigRwTxKb ( ) > 0 && txSize > debug . BigRwTxKb ( ) )
if doPrint {
tx . db . log . Info ( "Tx info" ,
"id" , txInfo . Id ,
"read_lag" , txInfo . ReadLag ,
"ro" , tx . readOnly ,
//"space_retired_mb", txInfo.SpaceRetired/1024/1024,
"space_dirty_mb" , txInfo . SpaceDirty / 1024 / 1024 ,
2021-07-29 10:33:03 +00:00
//"callers", debug.Callers(7),
2021-07-28 03:36:06 +00:00
)
}
* /
}
// closeCursors closes every native cursor registered on the transaction and
// forgets both the registered and the cached stateless cursors.
func (tx *MdbxTx) closeCursors() {
	for id := range tx.cursors {
		if cur := tx.cursors[id]; cur != nil {
			cur.Close()
		}
	}
	tx.cursors, tx.statelessCursors = nil, nil
}
// statelessCursor returns the transaction's cached cursor for bucket, opening
// and caching one on first use.
func (tx *MdbxTx) statelessCursor(bucket string) (kv.RwCursor, error) {
	if tx.statelessCursors == nil {
		tx.statelessCursors = make(map[string]kv.Cursor)
	}
	if cached, ok := tx.statelessCursors[bucket]; ok {
		return cached.(kv.RwCursor), nil
	}

	opened, err := tx.Cursor(bucket)
	if err != nil {
		return nil, err
	}
	tx.statelessCursors[bucket] = opened
	return opened.(kv.RwCursor), nil
}
// Put stores the k/v pair in bucket using the table's cached cursor.
func (tx *MdbxTx) Put(bucket string, k, v []byte) error {
	cur, err := tx.statelessCursor(bucket)
	if err == nil {
		return cur.Put(k, v)
	}
	return err
}
// Delete forwards k and v to the Delete method of the table's cached cursor.
func (tx *MdbxTx) Delete(bucket string, k, v []byte) error {
	cur, err := tx.statelessCursor(bucket)
	if err == nil {
		return cur.Delete(k, v)
	}
	return err
}
// GetOne returns the value found by an exact-key lookup of k in bucket,
// together with the lookup error.
func (tx *MdbxTx) GetOne(bucket string, k []byte) ([]byte, error) {
	cur, err := tx.statelessCursor(bucket)
	if err != nil {
		return nil, err
	}
	_, val, seekErr := cur.SeekExact(k)
	return val, seekErr
}
// Has reports whether bucket contains key: it seeks to key and checks that the
// key found there is byte-equal to it.
func (tx *MdbxTx) Has(bucket string, key []byte) (bool, error) {
	cur, err := tx.statelessCursor(bucket)
	if err != nil {
		return false, err
	}
	found, _, err := cur.Seek(key)
	if err != nil {
		return false, err
	}
	present := bytes.Equal(key, found)
	return present, nil
}
// Append forwards k and v to the Append method of the table's cached cursor.
func (tx *MdbxTx) Append(bucket string, k, v []byte) error {
	cur, err := tx.statelessCursor(bucket)
	if err == nil {
		return cur.Append(k, v)
	}
	return err
}
// AppendDup forwards k and v to AppendDup on the table's cached cursor. The
// cursor must be a *MdbxDupSortCursor; the type assertion panics otherwise.
func (tx *MdbxTx) AppendDup(bucket string, k, v []byte) error {
	cur, err := tx.statelessCursor(bucket)
	if err != nil {
		return err
	}
	dupCur := cur.(*MdbxDupSortCursor)
	return dupCur.AppendDup(k, v)
}
// IncrementSequence adds amount to the sequence stored for bucket in the
// kv.Sequence table and returns the value the sequence had BEFORE the
// increment. A missing or empty entry counts as zero. Values are stored as
// 8-byte big-endian integers.
func (tx *MdbxTx) IncrementSequence(bucket string, amount uint64) (uint64, error) {
	seq, err := tx.statelessCursor(kv.Sequence)
	if err != nil {
		return 0, err
	}

	key := []byte(bucket)
	_, stored, err := seq.SeekExact(key)
	if err != nil {
		return 0, err
	}

	var previous uint64
	if len(stored) > 0 {
		previous = binary.BigEndian.Uint64(stored)
	}

	encoded := make([]byte, 8)
	binary.BigEndian.PutUint64(encoded, previous+amount)
	if err = seq.Put(key, encoded); err != nil {
		return 0, err
	}
	return previous, nil
}
func ( tx * MdbxTx ) ReadSequence ( bucket string ) ( uint64 , error ) {
c , err := tx . statelessCursor ( kv . Sequence )
if err != nil {
return 0 , err
}
_ , v , err := c . SeekExact ( [ ] byte ( bucket ) )
if err != nil && ! mdbx . IsNotFound ( err ) {
return 0 , err
}
2022-03-19 04:38:37 +00:00
var currentV uint64
2021-07-28 03:36:06 +00:00
if len ( v ) > 0 {
currentV = binary . BigEndian . Uint64 ( v )
}
return currentV , nil
}
func ( tx * MdbxTx ) BucketSize ( name string ) ( uint64 , error ) {
st , err := tx . BucketStat ( name )
if err != nil {
return 0 , err
}
2022-01-22 05:13:35 +00:00
return ( st . LeafPages + st . BranchPages + st . OverflowPages ) * tx . db . opts . pageSize , nil
2021-07-28 03:36:06 +00:00
}
// BucketStat returns the native statistics of a table. The names "freelist",
// "gc" and "free_list" map to DBI 0 and "root" maps to DBI 1; any other name
// is resolved through the database's table config.
func (tx *MdbxTx) BucketStat(name string) (*mdbx.Stat, error) {
	switch name {
	case "freelist", "gc", "free_list":
		return tx.tx.StatDBI(mdbx.DBI(0))
	case "root":
		return tx.tx.StatDBI(mdbx.DBI(1))
	}

	handle := mdbx.DBI(tx.db.buckets[name].DBI)
	stat, err := tx.tx.StatDBI(handle)
	if err != nil {
		return nil, fmt.Errorf("bucket: %s, %w", name, err)
	}
	return stat, nil
}
// RwCursor opens a cursor over bucket. Tables flagged DupSort get a dup-sort
// cursor, unless they use AutoDupSortKeysConversion, in which case (like all
// other tables) they get a standard cursor.
func (tx *MdbxTx) RwCursor(bucket string) (kv.RwCursor, error) {
	cfg := tx.db.buckets[bucket]
	wantsDupSort := !cfg.AutoDupSortKeysConversion && cfg.Flags&kv.DupSort != 0
	if wantsDupSort {
		return tx.RwCursorDupSort(bucket)
	}
	return tx.stdCursor(bucket)
}
// Cursor opens a cursor over bucket; it is the same cursor RwCursor returns,
// exposed through the read-only interface.
func (tx *MdbxTx) Cursor(bucket string) (kv.Cursor, error) {
	cur, err := tx.RwCursor(bucket)
	return cur, err
}
// stdCursor opens a native cursor on bucket, wraps it in an MdbxCursor with a
// fresh id and registers it on the transaction so it is closed automatically
// when the transaction ends.
func (tx *MdbxTx) stdCursor(bucket string) (kv.RwCursor, error) {
	cfg := tx.db.buckets[bucket]
	cur := &MdbxCursor{
		tx:         tx,
		bucketName: bucket,
		bucketCfg:  cfg,
		dbi:        mdbx.DBI(cfg.DBI),
		id:         tx.cursorID,
	}
	tx.cursorID++

	native, err := tx.tx.OpenCursor(cur.dbi)
	if err != nil {
		return nil, fmt.Errorf("table: %s, %w", cur.bucketName, err)
	}
	cur.c = native

	// add to auto-cleanup on end of transactions
	if tx.cursors == nil {
		tx.cursors = map[uint64]*mdbx.Cursor{}
	}
	tx.cursors[cur.id] = native
	return cur, nil
}
// RwCursorDupSort opens a standard cursor on bucket and wraps it in an
// MdbxDupSortCursor.
func (tx *MdbxTx) RwCursorDupSort(bucket string) (kv.RwCursorDupSort, error) {
	std, err := tx.stdCursor(bucket)
	if err != nil {
		return nil, err
	}
	inner := std.(*MdbxCursor)
	return &MdbxDupSortCursor{MdbxCursor: inner}, nil
}
// CursorDupSort opens a dup-sort cursor over bucket; it is the cursor
// RwCursorDupSort returns, exposed through the read-only interface.
func (tx *MdbxTx) CursorDupSort(bucket string) (kv.CursorDupSort, error) {
	cur, err := tx.RwCursorDupSort(bucket)
	return cur, err
}
// The thin wrappers below each forward to exactly one native cursor operation.
// They exist as separate methods so that every operation shows up under its
// own name in pprof output.

// set positions the cursor with the mdbx.Set operation for key k.
func (c *MdbxCursor) set(k []byte) ([]byte, []byte, error) {
	return c.c.Get(k, nil, mdbx.Set)
}

// getCurrent issues mdbx.GetCurrent.
func (c *MdbxCursor) getCurrent() ([]byte, []byte, error) {
	return c.c.Get(nil, nil, mdbx.GetCurrent)
}

// first issues mdbx.First.
func (c *MdbxCursor) first() ([]byte, []byte, error) {
	return c.c.Get(nil, nil, mdbx.First)
}

// next issues mdbx.Next.
func (c *MdbxCursor) next() ([]byte, []byte, error) {
	return c.c.Get(nil, nil, mdbx.Next)
}

// nextDup issues mdbx.NextDup.
func (c *MdbxCursor) nextDup() ([]byte, []byte, error) {
	return c.c.Get(nil, nil, mdbx.NextDup)
}

// nextNoDup issues mdbx.NextNoDup.
func (c *MdbxCursor) nextNoDup() ([]byte, []byte, error) {
	return c.c.Get(nil, nil, mdbx.NextNoDup)
}

// prev issues mdbx.Prev.
func (c *MdbxCursor) prev() ([]byte, []byte, error) {
	return c.c.Get(nil, nil, mdbx.Prev)
}

// prevDup issues mdbx.PrevDup.
func (c *MdbxCursor) prevDup() ([]byte, []byte, error) {
	return c.c.Get(nil, nil, mdbx.PrevDup)
}

// prevNoDup issues mdbx.PrevNoDup.
func (c *MdbxCursor) prevNoDup() ([]byte, []byte, error) {
	return c.c.Get(nil, nil, mdbx.PrevNoDup)
}

// last issues mdbx.Last.
func (c *MdbxCursor) last() ([]byte, []byte, error) {
	return c.c.Get(nil, nil, mdbx.Last)
}

// delCurrent deletes with the mdbx.Current flag.
func (c *MdbxCursor) delCurrent() error {
	return c.c.Del(mdbx.Current)
}

// delNoDupData deletes with the mdbx.NoDupData flag.
func (c *MdbxCursor) delNoDupData() error {
	return c.c.Del(mdbx.NoDupData)
}

// put writes k/v with no flags.
func (c *MdbxCursor) put(k, v []byte) error {
	return c.c.Put(k, v, 0)
}

// putCurrent writes k/v with the mdbx.Current flag.
func (c *MdbxCursor) putCurrent(k, v []byte) error {
	return c.c.Put(k, v, mdbx.Current)
}

// putNoOverwrite writes k/v with the mdbx.NoOverwrite flag.
func (c *MdbxCursor) putNoOverwrite(k, v []byte) error {
	return c.c.Put(k, v, mdbx.NoOverwrite)
}

// putNoDupData writes k/v with the mdbx.NoDupData flag.
func (c *MdbxCursor) putNoDupData(k, v []byte) error {
	return c.c.Put(k, v, mdbx.NoDupData)
}

// append writes k/v with the mdbx.Append flag.
func (c *MdbxCursor) append(k, v []byte) error {
	return c.c.Put(k, v, mdbx.Append)
}

// appendDup writes k/v with the mdbx.AppendDup flag.
func (c *MdbxCursor) appendDup(k, v []byte) error {
	return c.c.Put(k, v, mdbx.AppendDup)
}
// getBoth calls Get on the underlying cursor with mdbx.GetBoth for the pair
// (k, v) and returns only the value part of the result.
func (c *MdbxCursor) getBoth(k, v []byte) ([]byte, error) {
	_, found, err := c.c.Get(k, v, mdbx.GetBoth)
	return found, err
}
// setRange calls Get on the underlying cursor with mdbx.SetRange for key k
// and returns the resulting key, value and error unchanged.
func (c *MdbxCursor) setRange(k []byte) ([]byte, []byte, error) {
	key, val, err := c.c.Get(k, nil, mdbx.SetRange)
	return key, val, err
}
// getBothRange calls Get on the underlying cursor with mdbx.GetBothRange for
// the pair (k, v) and returns only the value part of the result.
func (c *MdbxCursor) getBothRange(k, v []byte) ([]byte, error) {
	_, found, err := c.c.Get(k, v, mdbx.GetBothRange)
	return found, err
}
// firstDup calls Get on the underlying cursor with mdbx.FirstDup and returns
// only the value part of the result.
func (c *MdbxCursor) firstDup() ([]byte, error) {
	_, val, err := c.c.Get(nil, nil, mdbx.FirstDup)
	return val, err
}
// lastDup calls Get on the underlying cursor with mdbx.LastDup and returns
// only the value part of the result.
func (c *MdbxCursor) lastDup() ([]byte, error) {
	_, val, err := c.c.Get(nil, nil, mdbx.LastDup)
	return val, err
}
// Count returns the Entries field of the statistics reported by the
// transaction's StatDBI for this cursor's dbi.
func (c *MdbxCursor) Count() (uint64, error) {
	stat, err := c.tx.tx.StatDBI(c.dbi)
	if err == nil {
		return stat.Entries, nil
	}
	return 0, err
}
// First is shorthand for Seek(nil).
func (c *MdbxCursor) First() ([]byte, []byte, error) {
	key, val, err := c.Seek(nil)
	return key, val, err
}
// Last reads the record produced by the mdbx.Last cursor operation.
//
// It returns (nil, nil, nil) when the cursor reports not-found. Any other
// error is wrapped with the bucket name and returned together with an empty,
// non-nil key.
//
// For tables with AutoDupSortKeysConversion, when the stored key has length
// DupToLen, the first DupFromLen-DupToLen bytes of the value are moved to the
// end of the returned key.
func (c *MdbxCursor) Last() ([]byte, []byte, error) {
	k, v, err := c.last()
	if err != nil {
		if mdbx.IsNotFound(err) {
			return nil, nil, nil
		}
		err = fmt.Errorf("failed MdbxKV cursor.Last(): %w, bucket: %s", err, c.bucketName)
		return []byte{}, nil, err
	}

	b := c.bucketCfg
	if b.AutoDupSortKeysConversion && len(k) == b.DupToLen {
		keyPart := b.DupFromLen - b.DupToLen
		// Assemble the full key in a fresh buffer (as seekDupSort does) instead
		// of appending to the slice handed back by the cursor, so correctness
		// does not depend on that slice having no spare capacity.
		fullKey := make([]byte, 0, len(k)+keyPart)
		fullKey = append(append(fullKey, k...), v[:keyPart]...)
		k, v = fullKey, v[keyPart:]
	}

	return k, v, nil
}
// Seek looks up seek via setRange, or via first when seek is empty.
// Tables with AutoDupSortKeysConversion are handled by seekDupSort.
//
// It returns (nil, nil, nil) when the cursor reports not-found; any other
// error is wrapped with the bucket name and the requested key.
func (c *MdbxCursor) Seek(seek []byte) (k, v []byte, err error) {
	if c.bucketCfg.AutoDupSortKeysConversion {
		return c.seekDupSort(seek)
	}

	if len(seek) > 0 {
		k, v, err = c.setRange(seek)
	} else {
		k, v, err = c.first()
	}

	switch {
	case err == nil:
		return k, v, nil
	case mdbx.IsNotFound(err):
		return nil, nil, nil
	default:
		return []byte{}, nil, fmt.Errorf("failed MdbxKV cursor.Seek(): %w, bucket: %s, key: %x", err, c.bucketName, seek)
	}
}
// seekDupSort implements Seek for tables with AutoDupSortKeysConversion.
//
// An empty seek reads the first record. Otherwise seek is split at DupToLen:
// the prefix is looked up with setRange and, when a suffix exists and the
// found key equals the prefix, the suffix is looked up among that key's values
// with getBothRange; if that lookup reports not-found, the record returned by
// next is used instead.
//
// When the resulting key has length DupToLen, the first DupFromLen-DupToLen
// bytes of the value are moved to the end of a freshly allocated key.
// Not-found yields (nil, nil, nil); other errors are returned unwrapped with
// an empty, non-nil key.
func (c *MdbxCursor) seekDupSort(seek []byte) (k, v []byte, err error) {
	cfg := c.bucketCfg
	from, to := cfg.DupFromLen, cfg.DupToLen

	if len(seek) == 0 {
		k, v, err = c.first()
	} else {
		var prefix, suffix []byte
		if len(seek) > to {
			prefix, suffix = seek[:to], seek[to:]
		} else {
			prefix = seek
		}

		k, v, err = c.setRange(prefix)
		if err == nil && suffix != nil && bytes.Equal(prefix, k) {
			v, err = c.getBothRange(prefix, suffix)
			if err != nil && mdbx.IsNotFound(err) {
				// No value >= suffix under this key: fall through to the next record.
				k, v, err = c.next()
			}
		}
	}

	// Every lookup above maps its error to the same result.
	if err != nil {
		if mdbx.IsNotFound(err) {
			return nil, nil, nil
		}
		return []byte{}, nil, err
	}

	if len(k) == to {
		suffixLen := from - to
		fullKey := make([]byte, 0, len(k)+suffixLen)
		fullKey = append(append(fullKey, k...), v[:suffixLen]...)
		k, v = fullKey, v[suffixLen:]
	}
	return k, v, nil
}
// Next reads the record produced by the mdbx.Next cursor operation.
//
// It returns (nil, nil, nil) when the cursor reports not-found. Any other
// error is wrapped and returned together with an empty, non-nil key.
//
// For tables with AutoDupSortKeysConversion, when the stored key has length
// DupToLen, the first DupFromLen-DupToLen bytes of the value are moved to the
// end of the returned key.
func (c *MdbxCursor) Next() (k, v []byte, err error) {
	k, v, err = c.next()
	if err != nil {
		if mdbx.IsNotFound(err) {
			return nil, nil, nil
		}
		return []byte{}, nil, fmt.Errorf("failed MdbxKV cursor.Next(): %w", err)
	}

	b := c.bucketCfg
	if b.AutoDupSortKeysConversion && len(k) == b.DupToLen {
		keyPart := b.DupFromLen - b.DupToLen
		// Assemble the full key in a fresh buffer (as seekDupSort does) instead
		// of appending to the slice handed back by the cursor, so correctness
		// does not depend on that slice having no spare capacity.
		fullKey := make([]byte, 0, len(k)+keyPart)
		fullKey = append(append(fullKey, k...), v[:keyPart]...)
		k, v = fullKey, v[keyPart:]
	}

	return k, v, nil
}
// Prev reads the record produced by the mdbx.Prev cursor operation.
//
// It returns (nil, nil, nil) when the cursor reports not-found. Any other
// error is wrapped and returned together with an empty, non-nil key.
//
// For tables with AutoDupSortKeysConversion, when the stored key has length
// DupToLen, the first DupFromLen-DupToLen bytes of the value are moved to the
// end of the returned key.
func (c *MdbxCursor) Prev() (k, v []byte, err error) {
	k, v, err = c.prev()
	if err != nil {
		if mdbx.IsNotFound(err) {
			return nil, nil, nil
		}
		return []byte{}, nil, fmt.Errorf("failed MdbxKV cursor.Prev(): %w", err)
	}

	b := c.bucketCfg
	if b.AutoDupSortKeysConversion && len(k) == b.DupToLen {
		keyPart := b.DupFromLen - b.DupToLen
		// Assemble the full key in a fresh buffer (as seekDupSort does) instead
		// of appending to the slice handed back by the cursor, so correctness
		// does not depend on that slice having no spare capacity.
		fullKey := make([]byte, 0, len(k)+keyPart)
		fullKey = append(append(fullKey, k...), v[:keyPart]...)
		k, v = fullKey, v[keyPart:]
	}

	return k, v, nil
}
// Current returns the key/data at the current cursor position.
//
// It returns (nil, nil, nil) when the cursor reports not-found. Any other
// error is returned unwrapped together with an empty, non-nil key.
//
// For tables with AutoDupSortKeysConversion, when the stored key has length
// DupToLen, the first DupFromLen-DupToLen bytes of the value are moved to the
// end of the returned key.
func (c *MdbxCursor) Current() ([]byte, []byte, error) {
	k, v, err := c.getCurrent()
	if err != nil {
		if mdbx.IsNotFound(err) {
			return nil, nil, nil
		}
		return []byte{}, nil, err
	}

	b := c.bucketCfg
	if b.AutoDupSortKeysConversion && len(k) == b.DupToLen {
		keyPart := b.DupFromLen - b.DupToLen
		// Assemble the full key in a fresh buffer (as seekDupSort does) instead
		// of appending to the slice handed back by the cursor, so correctness
		// does not depend on that slice having no spare capacity.
		fullKey := make([]byte, 0, len(k)+keyPart)
		fullKey = append(append(fullKey, k...), v[:keyPart]...)
		k, v = fullKey, v[keyPart:]
	}

	return k, v, nil
}
// Delete looks up a record and deletes it via delCurrent.
//
// Tables with AutoDupSortKeysConversion are handled by deleteDupSort (v is not
// used on that path). Tables with the mdbx.DupSort flag are looked up by the
// (k, v) pair via getBoth; all other tables by key via set. A not-found lookup
// is not an error: nil is returned and nothing is deleted.
func (c *MdbxCursor) Delete(k, v []byte) error {
	cfg := c.bucketCfg
	if cfg.AutoDupSortKeysConversion {
		return c.deleteDupSort(k)
	}

	var err error
	if cfg.Flags&mdbx.DupSort == 0 {
		_, _, err = c.set(k)
	} else {
		_, err = c.getBoth(k, v)
	}

	switch {
	case err == nil:
		return c.delCurrent()
	case mdbx.IsNotFound(err):
		return nil
	default:
		return err
	}
}
// DeleteCurrent deletes the key/data pair to which the cursor refers.
// This does not invalidate the cursor, so operations such as MDB_NEXT can
// still be used on it. Both MDB_NEXT and MDB_GET_CURRENT will return the same
// record after this operation.
func (c *MdbxCursor) DeleteCurrent() error {
	err := c.delCurrent()
	return err
}
// deleteDupSort implements Delete for tables with AutoDupSortKeysConversion.
//
// Accepted key lengths are exactly DupFromLen or anything shorter than
// DupToLen; other lengths produce an error. A key of length DupFromLen is
// split at DupToLen and the suffix is looked up among the prefix's values with
// getBothRange; the record is deleted only when the stored value starts with
// exactly that suffix. Shorter keys are looked up with set. A not-found lookup
// returns nil without deleting anything.
func (c *MdbxCursor) deleteDupSort(key []byte) error {
	cfg := c.bucketCfg
	from, to := cfg.DupFromLen, cfg.DupToLen

	switch keyLen := len(key); {
	case keyLen == from:
		prefix, suffix := key[:to], key[to:]
		stored, err := c.getBothRange(prefix, suffix)
		if err != nil {
			// Nothing at or after the suffix: nothing to delete.
			if mdbx.IsNotFound(err) {
				return nil
			}
			return err
		}
		// getBothRange may have landed on a different suffix; delete only on an exact match.
		if bytes.Equal(stored[:from-to], suffix) {
			return c.delCurrent()
		}
		return nil

	case keyLen >= to:
		return fmt.Errorf("delete from dupsort bucket: %s, can have keys of len==%d and len<%d. key: %x,%d", c.bucketName, from, to, key, len(key))

	default:
		_, _, err := c.set(key)
		if err != nil {
			if mdbx.IsNotFound(err) {
				return nil
			}
			return err
		}
		return c.delCurrent()
	}
}
// PutNoOverwrite stores the pair via putNoOverwrite.
// An empty key is rejected with an error. The operation is not implemented for
// tables with AutoDupSortKeysConversion and panics there.
func (c *MdbxCursor) PutNoOverwrite(key []byte, value []byte) error {
	switch {
	case len(key) == 0:
		return fmt.Errorf("mdbx doesn't support empty keys. bucket: %s", c.bucketName)
	case c.bucketCfg.AutoDupSortKeysConversion:
		panic("not implemented")
	}
	return c.putNoOverwrite(key, value)
}
func ( c * MdbxCursor ) Put ( key [ ] byte , value [ ] byte ) error {
if len ( key ) == 0 {
return fmt . Errorf ( "mdbx doesn't support empty keys. bucket: %s" , c . bucketName )
}
b := c . bucketCfg
if b . AutoDupSortKeysConversion {
if err := c . putDupSort ( key , value ) ; err != nil {
return err
}
return nil
}
if err := c . put ( key , value ) ; err != nil {
2021-09-27 12:45:10 +00:00
return fmt . Errorf ( "table: %s, err: %w" , c . bucketName , err )
2021-07-28 03:36:06 +00:00
}
return nil
}
// putDupSort implements Put for tables with AutoDupSortKeysConversion.
//
// Accepted key lengths are exactly DupFromLen or anything shorter than
// DupToLen; other lengths produce an error.
//
// Keys shorter than DupToLen are written with putNoOverwrite and, if the key
// already exists, overwritten with putCurrent.
//
// A key of length DupFromLen is split at DupToLen: the suffix is prepended to
// the value and the prefix becomes the stored key. If a value with the same
// suffix already exists it is replaced: in place via putCurrent when the
// lengths match, otherwise by deleting it and inserting the new one.
func (c *MdbxCursor) putDupSort(key []byte, value []byte) error {
	b := c.bucketCfg
	from, to := b.DupFromLen, b.DupToLen
	if len(key) != from && len(key) >= to {
		return fmt.Errorf("put dupsort bucket: %s, can have keys of len==%d and len<%d. key: %x,%d", c.bucketName, from, to, key, len(key))
	}

	if len(key) != from {
		err := c.putNoOverwrite(key, value)
		if err != nil {
			if mdbx.IsKeyExists(err) {
				return c.putCurrent(key, value)
			}
			return fmt.Errorf("putNoOverwrite, bucket: %s, key: %x, val: %x, err: %w", c.bucketName, key, value, err)
		}
		return nil
	}

	// The full slice expression caps the suffix at len(key), so append has to
	// allocate instead of writing past the end of key into the caller's
	// backing array when key has spare capacity.
	value = append(key[to:len(key):len(key)], value...)
	key = key[:to]
	v, err := c.getBothRange(key, value[:from-to])
	if err != nil { // if key not found, or found another one - then just insert
		if mdbx.IsNotFound(err) {
			return c.put(key, value)
		}
		return err
	}

	if bytes.Equal(v[:from-to], value[:from-to]) {
		if len(v) == len(value) { // in DupSort case mdbx.Current works only with values of same length
			return c.putCurrent(key, value)
		}
		err = c.delCurrent()
		if err != nil {
			return err
		}
	}

	return c.put(key, value)
}
// SeekExact looks up key and returns the matching record, or (nil, nil, nil)
// when the cursor reports not-found. Other errors are returned unwrapped with
// an empty, non-nil key.
//
// For tables with AutoDupSortKeysConversion and a key of length DupFromLen,
// the key is split at DupToLen and the suffix is looked up among the prefix's
// values with getBothRange. A hit is reported only when the stored value
// starts with exactly that suffix; the result is then the key prefix and the
// stored value without the suffix. Every other case is looked up with set.
func (c *MdbxCursor) SeekExact(key []byte) ([]byte, []byte, error) {
	cfg := c.bucketCfg
	if !cfg.AutoDupSortKeysConversion || len(key) != cfg.DupFromLen {
		k, v, err := c.set(key)
		switch {
		case err == nil:
			return k, v, nil
		case mdbx.IsNotFound(err):
			return nil, nil, nil
		}
		return []byte{}, nil, err
	}

	from, to := cfg.DupFromLen, cfg.DupToLen
	prefix, suffix := key[:to], key[to:]
	stored, err := c.getBothRange(prefix, suffix)
	if err != nil {
		if mdbx.IsNotFound(err) {
			return nil, nil, nil
		}
		return []byte{}, nil, err
	}

	suffixLen := from - to
	if bytes.Equal(suffix, stored[:suffixLen]) {
		return prefix, stored[suffixLen:], nil
	}
	return nil, nil, nil
}
// Append is a speedy feature of mdbx which is not part of the KV interface.
// Cast your cursor to *MdbxCursor to use this method.
// It returns an error if the provided data is not sorted (or if the bucket has
// old records which interleave with the new ones in sort order).
//
// An empty key is rejected. For tables with AutoDupSortKeysConversion, keys
// must have length DupFromLen or be shorter than DupToLen; a key of length
// DupFromLen is split at DupToLen and its suffix is prepended to the value.
// Tables with the mdbx.DupSort flag are written via appendDup, all others via
// append; failures are wrapped with the bucket name.
func (c *MdbxCursor) Append(k []byte, v []byte) error {
	if len(k) == 0 {
		return fmt.Errorf("mdbx doesn't support empty keys. bucket: %s", c.bucketName)
	}

	b := c.bucketCfg
	if b.AutoDupSortKeysConversion {
		from, to := b.DupFromLen, b.DupToLen
		if len(k) != from && len(k) >= to {
			return fmt.Errorf("append dupsort bucket: %s, can have keys of len==%d and len<%d. key: %x,%d", c.bucketName, from, to, k, len(k))
		}

		if len(k) == from {
			// The full slice expression caps the suffix at len(k), so append
			// has to allocate instead of writing past the end of k into the
			// caller's backing array when k has spare capacity.
			v = append(k[to:len(k):len(k)], v...)
			k = k[:to]
		}
	}

	if b.Flags&mdbx.DupSort != 0 {
		if err := c.appendDup(k, v); err != nil {
			return fmt.Errorf("bucket: %s, %w", c.bucketName, err)
		}
		return nil
	}

	if err := c.append(k, v); err != nil {
		return fmt.Errorf("bucket: %s, %w", c.bucketName, err)
	}
	return nil
}
// Close closes the underlying cursor, removes this cursor's id from the
// owning transaction's cursors map and clears the handle. Calling it again
// afterwards is a no-op.
func (c *MdbxCursor) Close() {
	if c.c == nil {
		return
	}
	c.c.Close()
	delete(c.tx.cursors, c.id)
	c.c = nil
}
// MdbxDupSortCursor embeds *MdbxCursor and adds the duplicate-aware
// operations defined on it below (SeekBothExact, NextDup, CountDuplicates, ...).
type MdbxDupSortCursor struct {
	*MdbxCursor
}
// Internal exposes the underlying *mdbx.Cursor held by the embedded MdbxCursor.
func (c *MdbxDupSortCursor) Internal() *mdbx.Cursor {
	return c.MdbxCursor.c
}
// DeleteExact looks up the exact (k1, k2) pair with getBoth and deletes it via
// delCurrent. If the pair is not found, nothing is deleted and nil is returned.
func (c *MdbxDupSortCursor) DeleteExact(k1, k2 []byte) error {
	_, err := c.getBoth(k1, k2)
	switch {
	case err == nil:
		return c.delCurrent()
	case mdbx.IsNotFound(err):
		// Key not found, or found another one: nothing to delete.
		return nil
	default:
		return err
	}
}
// SeekBothExact looks up the (key, value) pair with getBoth and returns the
// given key together with the value reported by the cursor. It returns
// (nil, nil, nil) on not-found; other errors are wrapped.
func (c *MdbxDupSortCursor) SeekBothExact(key, value []byte) ([]byte, []byte, error) {
	found, err := c.getBoth(key, value)
	if err == nil {
		return key, found, nil
	}
	if mdbx.IsNotFound(err) {
		return nil, nil, nil
	}
	return []byte{}, nil, fmt.Errorf("in SeekBothExact: %w", err)
}
// SeekBothRange looks up (key, value) with getBothRange and returns the value
// reported by the cursor. It returns (nil, nil) on not-found; other errors are
// wrapped.
func (c *MdbxDupSortCursor) SeekBothRange(key, value []byte) ([]byte, error) {
	found, err := c.getBothRange(key, value)
	if err == nil {
		return found, nil
	}
	if mdbx.IsNotFound(err) {
		return nil, nil
	}
	return nil, fmt.Errorf("in SeekBothRange: %w", err)
}
// FirstDup returns the value reported by the mdbx.FirstDup cursor operation.
// It returns (nil, nil) on not-found; other errors are wrapped.
func (c *MdbxDupSortCursor) FirstDup() ([]byte, error) {
	val, err := c.firstDup()
	if err == nil {
		return val, nil
	}
	if mdbx.IsNotFound(err) {
		return nil, nil
	}
	return nil, fmt.Errorf("in FirstDup: %w", err)
}
// NextDup iterates only over the duplicates of the current key.
// It returns (nil, nil, nil) on not-found; other errors are wrapped and
// returned with an empty, non-nil key.
func (c *MdbxDupSortCursor) NextDup() ([]byte, []byte, error) {
	key, val, err := c.nextDup()
	switch {
	case err == nil:
		return key, val, nil
	case mdbx.IsNotFound(err):
		return nil, nil, nil
	default:
		return []byte{}, nil, fmt.Errorf("in NextDup: %w", err)
	}
}
// NextNoDup iterates while skipping all duplicates.
// It returns (nil, nil, nil) on not-found; other errors are wrapped and
// returned with an empty, non-nil key.
func (c *MdbxDupSortCursor) NextNoDup() ([]byte, []byte, error) {
	key, val, err := c.nextNoDup()
	switch {
	case err == nil:
		return key, val, nil
	case mdbx.IsNotFound(err):
		return nil, nil, nil
	default:
		return []byte{}, nil, fmt.Errorf("in NextNoDup: %w", err)
	}
}
// PrevDup returns the record reported by the mdbx.PrevDup cursor operation.
// It returns (nil, nil, nil) on not-found; other errors are wrapped and
// returned with an empty, non-nil key.
func (c *MdbxDupSortCursor) PrevDup() ([]byte, []byte, error) {
	key, val, err := c.prevDup()
	switch {
	case err == nil:
		return key, val, nil
	case mdbx.IsNotFound(err):
		return nil, nil, nil
	default:
		return []byte{}, nil, fmt.Errorf("in PrevDup: %w", err)
	}
}
// PrevNoDup returns the record reported by the mdbx.PrevNoDup cursor operation.
// It returns (nil, nil, nil) on not-found; other errors are wrapped and
// returned with an empty, non-nil key.
func (c *MdbxDupSortCursor) PrevNoDup() ([]byte, []byte, error) {
	key, val, err := c.prevNoDup()
	switch {
	case err == nil:
		return key, val, nil
	case mdbx.IsNotFound(err):
		return nil, nil, nil
	default:
		return []byte{}, nil, fmt.Errorf("in PrevNoDup: %w", err)
	}
}
// LastDup returns the value reported by the mdbx.LastDup cursor operation.
// It returns (nil, nil) on not-found; other errors are wrapped.
func (c *MdbxDupSortCursor) LastDup() ([]byte, error) {
	val, err := c.lastDup()
	if err == nil {
		return val, nil
	}
	if mdbx.IsNotFound(err) {
		return nil, nil
	}
	return nil, fmt.Errorf("in LastDup: %w", err)
}
// Append writes the pair through the underlying cursor's Put with the
// mdbx.Append and mdbx.AppendDup flags combined. Failures are wrapped with the
// bucket name.
func (c *MdbxDupSortCursor) Append(k []byte, v []byte) error {
	err := c.c.Put(k, v, mdbx.Append|mdbx.AppendDup)
	if err == nil {
		return nil
	}
	return fmt.Errorf("in Append: bucket=%s, %w", c.bucketName, err)
}
// AppendDup writes the pair via appendDup (Put with the mdbx.AppendDup flag).
// Failures are wrapped with the bucket name.
func (c *MdbxDupSortCursor) AppendDup(k []byte, v []byte) error {
	err := c.appendDup(k, v)
	if err == nil {
		return nil
	}
	return fmt.Errorf("in AppendDup: bucket=%s, %w", c.bucketName, err)
}
// PutNoDupData writes the pair via putNoDupData (Put with the mdbx.NoDupData
// flag). Failures are wrapped.
func (c *MdbxDupSortCursor) PutNoDupData(key, value []byte) error {
	err := c.putNoDupData(key, value)
	if err == nil {
		return nil
	}
	return fmt.Errorf("in PutNoDupData: %w", err)
}
// DeleteCurrentDuplicates deletes all of the data items for the current key.
// Failures are wrapped.
func (c *MdbxDupSortCursor) DeleteCurrentDuplicates() error {
	err := c.delNoDupData()
	if err == nil {
		return nil
	}
	return fmt.Errorf("in DeleteCurrentDuplicates: %w", err)
}
2021-08-22 10:39:47 +00:00
// CountDuplicates returns the number of duplicates for the current key. See mdb_cursor_count
2021-07-28 03:36:06 +00:00
func ( c * MdbxDupSortCursor ) CountDuplicates ( ) ( uint64 , error ) {
res , err := c . c . Count ( )
if err != nil {
return 0 , fmt . Errorf ( "in CountDuplicates: %w" , err )
}
return res , nil
}
// bucketSlice returns the table names of b sorted in ascending order, so that
// callers get a deterministic sequence regardless of map iteration order.
func bucketSlice(b kv.TableCfg) []string {
	names := make([]string, len(b))
	next := 0
	for tableName := range b {
		names[next] = tableName
		next++
	}

	byName := func(l, r int) bool {
		return strings.Compare(names[l], names[r]) < 0
	}
	sort.Slice(names, byName)
	return names
}