lmdb append (#678)

This commit is contained in:
Alex Sharov 2020-06-19 18:19:29 +07:00 committed by GitHub
parent fd98914c28
commit e3755a0df2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -19,7 +19,7 @@ import (
var (
lmdbKvTxPool = sync.Pool{New: func() interface{} { return &lmdbTx{} }}
lmdbKvCursorPool = sync.Pool{New: func() interface{} { return &lmdbCursor{} }}
lmdbKvCursorPool = sync.Pool{New: func() interface{} { return &LmdbCursor{} }}
lmdbKvBucketPool = sync.Pool{New: func() interface{} { return &lmdbBucket{} }}
)
@ -297,7 +297,7 @@ type lmdbTx struct {
tx *lmdb.Txn
ctx context.Context
db *LmdbKV
cursors []*lmdbCursor
cursors []*LmdbCursor
buckets []*lmdbBucket
}
@ -307,7 +307,7 @@ type lmdbBucket struct {
dbi lmdb.DBI
}
type lmdbCursor struct {
type LmdbCursor struct {
ctx context.Context
bucket *lmdbBucket
prefix []byte
@ -390,23 +390,23 @@ func (tx *lmdbTx) closeCursors() {
tx.buckets = tx.buckets[:0]
}
func (c *lmdbCursor) Prefix(v []byte) Cursor {
func (c *LmdbCursor) Prefix(v []byte) Cursor {
c.prefix = v
return c
}
func (c *lmdbCursor) MatchBits(n uint) Cursor {
func (c *LmdbCursor) MatchBits(n uint) Cursor {
panic("not implemented yet")
}
func (c *lmdbCursor) Prefetch(v uint) Cursor {
func (c *LmdbCursor) Prefetch(v uint) Cursor {
//c.cursorOpts.PrefetchSize = int(v)
return c
}
func (c *lmdbCursor) NoValues() NoValuesCursor {
func (c *LmdbCursor) NoValues() NoValuesCursor {
//c.cursorOpts.PrefetchValues = false
return &lmdbNoValuesCursor{lmdbCursor: c}
return &lmdbNoValuesCursor{LmdbCursor: c}
}
func (b lmdbBucket) Get(key []byte) (val []byte, err error) {
@ -469,20 +469,20 @@ func (b *lmdbBucket) Clear() error {
func (b *lmdbBucket) Cursor() Cursor {
tx := b.tx
c := lmdbKvCursorPool.Get().(*lmdbCursor)
c := lmdbKvCursorPool.Get().(*LmdbCursor)
c.ctx = tx.ctx
c.bucket = b
c.prefix = nil
c.cursor = nil
// add to auto-close on end of transactions
if tx.cursors == nil {
tx.cursors = make([]*lmdbCursor, 0, 1)
tx.cursors = make([]*LmdbCursor, 0, 1)
}
tx.cursors = append(tx.cursors, c)
return c
}
func (c *lmdbCursor) initCursor() error {
func (c *LmdbCursor) initCursor() error {
if c.cursor != nil {
return nil
}
@ -509,7 +509,7 @@ func (c *lmdbCursor) initCursor() error {
return nil
}
func (c *lmdbCursor) First() ([]byte, []byte, error) {
func (c *LmdbCursor) First() ([]byte, []byte, error) {
if c.cursor == nil {
if err := c.initCursor(); err != nil {
return []byte{}, nil, err
@ -519,7 +519,7 @@ func (c *lmdbCursor) First() ([]byte, []byte, error) {
return c.Seek(c.prefix)
}
func (c *lmdbCursor) Seek(seek []byte) (k, v []byte, err error) {
func (c *LmdbCursor) Seek(seek []byte) (k, v []byte, err error) {
if c.cursor == nil {
if err := c.initCursor(); err != nil {
return []byte{}, nil, err
@ -544,11 +544,11 @@ func (c *lmdbCursor) Seek(seek []byte) (k, v []byte, err error) {
return k, v, nil
}
func (c *lmdbCursor) SeekTo(seek []byte) ([]byte, []byte, error) {
func (c *LmdbCursor) SeekTo(seek []byte) ([]byte, []byte, error) {
return c.Seek(seek)
}
func (c *lmdbCursor) Next() (k, v []byte, err error) {
func (c *LmdbCursor) Next() (k, v []byte, err error) {
select {
case <-c.ctx.Done():
return []byte{}, nil, c.ctx.Err()
@ -569,7 +569,7 @@ func (c *lmdbCursor) Next() (k, v []byte, err error) {
return k, v, nil
}
func (c *lmdbCursor) Delete(key []byte) error {
func (c *LmdbCursor) Delete(key []byte) error {
select {
case <-c.ctx.Done():
return c.ctx.Err()
@ -593,7 +593,7 @@ func (c *lmdbCursor) Delete(key []byte) error {
return c.cursor.Del(0)
}
func (c *lmdbCursor) Put(key []byte, value []byte) error {
func (c *LmdbCursor) Put(key []byte, value []byte) error {
select {
case <-c.ctx.Done():
return c.ctx.Err()
@ -612,7 +612,24 @@ func (c *lmdbCursor) Put(key []byte, value []byte) error {
return c.cursor.Put(key, value, 0)
}
func (c *lmdbCursor) Walk(walker func(k, v []byte) (bool, error)) error {
// Append - speedy feature of lmdb which is not part of KV interface.
// Cast your cursor to *LmdbCursor to use this method.
// Danger: if provided data will not sorted (or bucket have old records which mess with new in sorting manner) - db will corrupt.
func (c *LmdbCursor) Append(key []byte, value []byte) error {
if len(key) == 0 {
return fmt.Errorf("lmdb doesn't support empty keys. bucket: %s", dbutils.Buckets[c.bucket.id])
}
if c.cursor == nil {
if err := c.initCursor(); err != nil {
return err
}
}
return c.cursor.Put(key, value, lmdb.Append)
}
func (c *LmdbCursor) Walk(walker func(k, v []byte) (bool, error)) error {
for k, v, err := c.First(); k != nil; k, v, err = c.Next() {
if err != nil {
return err
@ -629,7 +646,7 @@ func (c *lmdbCursor) Walk(walker func(k, v []byte) (bool, error)) error {
}
type lmdbNoValuesCursor struct {
*lmdbCursor
*LmdbCursor
}
func (c *lmdbNoValuesCursor) Walk(walker func(k []byte, vSize uint32) (bool, error)) error {