From e3755a0df293ad35438f2c02937603395ce91d38 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Fri, 19 Jun 2020 18:19:29 +0700 Subject: [PATCH] lmdb append (#678) --- ethdb/kv_lmdb.go | 55 +++++++++++++++++++++++++++++++----------------- 1 file changed, 36 insertions(+), 19 deletions(-) diff --git a/ethdb/kv_lmdb.go b/ethdb/kv_lmdb.go index e68c8da0c..a65e91896 100644 --- a/ethdb/kv_lmdb.go +++ b/ethdb/kv_lmdb.go @@ -19,7 +19,7 @@ import ( var ( lmdbKvTxPool = sync.Pool{New: func() interface{} { return &lmdbTx{} }} - lmdbKvCursorPool = sync.Pool{New: func() interface{} { return &lmdbCursor{} }} + lmdbKvCursorPool = sync.Pool{New: func() interface{} { return &LmdbCursor{} }} lmdbKvBucketPool = sync.Pool{New: func() interface{} { return &lmdbBucket{} }} ) @@ -297,7 +297,7 @@ type lmdbTx struct { tx *lmdb.Txn ctx context.Context db *LmdbKV - cursors []*lmdbCursor + cursors []*LmdbCursor buckets []*lmdbBucket } @@ -307,7 +307,7 @@ type lmdbBucket struct { dbi lmdb.DBI } -type lmdbCursor struct { +type LmdbCursor struct { ctx context.Context bucket *lmdbBucket prefix []byte @@ -390,23 +390,23 @@ func (tx *lmdbTx) closeCursors() { tx.buckets = tx.buckets[:0] } -func (c *lmdbCursor) Prefix(v []byte) Cursor { +func (c *LmdbCursor) Prefix(v []byte) Cursor { c.prefix = v return c } -func (c *lmdbCursor) MatchBits(n uint) Cursor { +func (c *LmdbCursor) MatchBits(n uint) Cursor { panic("not implemented yet") } -func (c *lmdbCursor) Prefetch(v uint) Cursor { +func (c *LmdbCursor) Prefetch(v uint) Cursor { //c.cursorOpts.PrefetchSize = int(v) return c } -func (c *lmdbCursor) NoValues() NoValuesCursor { +func (c *LmdbCursor) NoValues() NoValuesCursor { //c.cursorOpts.PrefetchValues = false - return &lmdbNoValuesCursor{lmdbCursor: c} + return &lmdbNoValuesCursor{LmdbCursor: c} } func (b lmdbBucket) Get(key []byte) (val []byte, err error) { @@ -469,20 +469,20 @@ func (b *lmdbBucket) Clear() error { func (b *lmdbBucket) Cursor() Cursor { tx := b.tx - c := lmdbKvCursorPool.Get().(*lmdbCursor) + c := lmdbKvCursorPool.Get().(*LmdbCursor) c.ctx = tx.ctx c.bucket = b c.prefix = nil c.cursor = nil // add to auto-close on end of transactions if tx.cursors == nil { - tx.cursors = make([]*lmdbCursor, 0, 1) + tx.cursors = make([]*LmdbCursor, 0, 1) } tx.cursors = append(tx.cursors, c) return c } -func (c *lmdbCursor) initCursor() error { +func (c *LmdbCursor) initCursor() error { if c.cursor != nil { return nil } @@ -509,7 +509,7 @@ func (c *lmdbCursor) initCursor() error { return nil } -func (c *lmdbCursor) First() ([]byte, []byte, error) { +func (c *LmdbCursor) First() ([]byte, []byte, error) { if c.cursor == nil { if err := c.initCursor(); err != nil { return []byte{}, nil, err @@ -519,7 +519,7 @@ func (c *lmdbCursor) First() ([]byte, []byte, error) { return c.Seek(c.prefix) } -func (c *lmdbCursor) Seek(seek []byte) (k, v []byte, err error) { +func (c *LmdbCursor) Seek(seek []byte) (k, v []byte, err error) { if c.cursor == nil { if err := c.initCursor(); err != nil { return []byte{}, nil, err @@ -544,11 +544,11 @@ func (c *lmdbCursor) Seek(seek []byte) (k, v []byte, err error) { return k, v, nil } -func (c *lmdbCursor) SeekTo(seek []byte) ([]byte, []byte, error) { +func (c *LmdbCursor) SeekTo(seek []byte) ([]byte, []byte, error) { return c.Seek(seek) } -func (c *lmdbCursor) Next() (k, v []byte, err error) { +func (c *LmdbCursor) Next() (k, v []byte, err error) { select { case <-c.ctx.Done(): return []byte{}, nil, c.ctx.Err() @@ -569,7 +569,7 @@ func (c *lmdbCursor) Next() (k, v []byte, err error) { return k, v, nil } -func (c *lmdbCursor) Delete(key []byte) error { +func (c *LmdbCursor) Delete(key []byte) error { select { case <-c.ctx.Done(): return c.ctx.Err() @@ -593,7 +593,7 @@ func (c *lmdbCursor) Delete(key []byte) error { return c.cursor.Del(0) } -func (c *lmdbCursor) Put(key []byte, value []byte) error { +func (c *LmdbCursor) Put(key []byte, value []byte) error { select { case <-c.ctx.Done(): return c.ctx.Err() @@ -612,7 +612,24 @@ func (c *lmdbCursor) Put(key []byte, value []byte) error { return c.cursor.Put(key, value, 0) } -func (c *lmdbCursor) Walk(walker func(k, v []byte) (bool, error)) error { +// Append - speedy feature of lmdb which is not part of KV interface. +// Cast your cursor to *LmdbCursor to use this method. +// Danger: if provided data will not sorted (or bucket have old records which mess with new in sorting manner) - db will corrupt. +func (c *LmdbCursor) Append(key []byte, value []byte) error { + if len(key) == 0 { + return fmt.Errorf("lmdb doesn't support empty keys. bucket: %s", dbutils.Buckets[c.bucket.id]) + } + + if c.cursor == nil { + if err := c.initCursor(); err != nil { + return err + } + } + + return c.cursor.Put(key, value, lmdb.Append) +} + +func (c *LmdbCursor) Walk(walker func(k, v []byte) (bool, error)) error { for k, v, err := c.First(); k != nil; k, v, err = c.Next() { if err != nil { return err @@ -629,7 +646,7 @@ func (c *lmdbCursor) Walk(walker func(k, v []byte) (bool, error)) error { } type lmdbNoValuesCursor struct { - *lmdbCursor + *LmdbCursor } func (c *lmdbNoValuesCursor) Walk(walker func(k []byte, vSize uint32) (bool, error)) error {