Lmdb: avoid empty key values in put and seek (#673)

* avoid 0 length key/values and seek arguments

* less abstractions in .get method, and copy memory from C space without unsafe conversion into go memory

* fix test
This commit is contained in:
Alex Sharov 2020-06-17 16:57:48 +07:00 committed by GitHub
parent 8112cf1f9f
commit 5b051d9ab3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 30 additions and 34 deletions

View File

@ -169,10 +169,11 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, blockchain B
return err
}
start := time.Now()
sz := batch.BatchSize()
if _, err = batch.Commit(); err != nil {
return err
}
log.Info("Batch committed", "in", time.Since(start), "size", common.StorageSize(batch.BatchSize()))
log.Info("Batch committed", "in", time.Since(start), "size", common.StorageSize(sz))
}
if prof {

View File

@ -95,7 +95,7 @@ func testPutGet(db MinDatabase, t *testing.T) {
t.Parallel()
for _, k := range testValues {
err := db.Put(testBucket, []byte(k), []byte{})
err := db.Put(testBucket, []byte(k), []byte{1})
if err != nil {
t.Fatalf("put failed: %v", err)
}
@ -106,8 +106,8 @@ func testPutGet(db MinDatabase, t *testing.T) {
if err != nil {
t.Fatalf("get failed: %v", err)
}
if len(data) != 0 {
t.Fatalf("get returned wrong result, got %q expected nil", string(data))
if len(data) != 1 {
t.Fatalf("get returned wrong result, got %q expected 1", string(data))
}
}

View File

@ -422,6 +422,13 @@ func (b *lmdbBucket) Put(key []byte, value []byte) error {
default:
}
if len(key) == 0 {
return fmt.Errorf("lmdb doesn't support empty keys. bucket: %s", dbutils.Buckets[b.id])
}
if len(value) == 0 {
return fmt.Errorf("lmdb doesn't support empty values. bucket: %s", dbutils.Buckets[b.id])
}
err := b.tx.tx.Put(b.dbi, key, value, 0)
if err != nil {
return fmt.Errorf("failed LmdbKV.Put: %w", err)
@ -517,7 +524,7 @@ func (c *lmdbCursor) Seek(seek []byte) (k, v []byte, err error) {
}
}
if seek == nil {
if len(seek) == 0 {
k, v, err = c.cursor.Get(nil, nil, lmdb.First)
} else {
k, v, err = c.cursor.Get(seek, nil, lmdb.SetRange)
@ -591,6 +598,12 @@ func (c *lmdbCursor) Put(key []byte, value []byte) error {
default:
}
if len(key) == 0 {
return fmt.Errorf("lmdb doesn't support empty keys. bucket: %s", dbutils.Buckets[c.bucket.id])
}
if len(value) == 0 {
return fmt.Errorf("lmdb doesn't support empty values. bucket: %s", dbutils.Buckets[c.bucket.id])
}
if c.cursor == nil {
if err := c.initCursor(); err != nil {
return err
@ -637,51 +650,33 @@ func (c *lmdbNoValuesCursor) Walk(walker func(k []byte, vSize uint32) (bool, err
}
func (c *lmdbNoValuesCursor) First() (k []byte, v uint32, err error) {
if c.cursor == nil {
if err := c.initCursor(); err != nil {
return []byte{}, 0, err
}
}
var val []byte
if len(c.prefix) == 0 {
k, val, err = c.cursor.Get(nil, nil, lmdb.First)
} else {
k, val, err = c.cursor.Get(c.prefix, nil, lmdb.SetKey)
}
if err != nil {
if lmdb.IsNotFound(err) {
return []byte{}, uint32(len(val)), nil
}
return []byte{}, 0, err
}
if c.prefix != nil && !bytes.HasPrefix(k, c.prefix) {
k, val = nil, nil
}
return k, uint32(len(val)), err
return c.Seek(c.prefix)
}
func (c *lmdbNoValuesCursor) Seek(seek []byte) (k []byte, v uint32, err error) {
func (c *lmdbNoValuesCursor) Seek(seek []byte) (k []byte, vSize uint32, err error) {
if c.cursor == nil {
if err := c.initCursor(); err != nil {
return []byte{}, 0, err
}
}
var val []byte
k, val, err = c.cursor.Get(seek, nil, lmdb.SetKey)
var v []byte
if len(seek) == 0 {
k, v, err = c.cursor.Get(nil, nil, lmdb.First)
} else {
k, v, err = c.cursor.Get(seek, nil, lmdb.SetRange)
}
if err != nil {
if lmdb.IsNotFound(err) {
return []byte{}, uint32(len(val)), nil
return []byte{}, uint32(len(v)), nil
}
return []byte{}, 0, err
}
if c.prefix != nil && !bytes.HasPrefix(k, c.prefix) {
k, val = nil, nil
k, v = nil, nil
}
return k, uint32(len(val)), err
return k, uint32(len(v)), err
}
func (c *lmdbNoValuesCursor) SeekTo(seek []byte) ([]byte, uint32, error) {