2020-03-11 11:02:37 +00:00
package ethdb_test
import (
"context"
"errors"
2020-04-04 07:18:10 +00:00
"fmt"
2020-03-11 11:02:37 +00:00
"testing"
"time"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/ethdb"
2020-07-27 12:15:48 +00:00
"github.com/ledgerwatch/turbo-geth/ethdb/remote"
2020-04-04 07:18:10 +00:00
"github.com/ledgerwatch/turbo-geth/ethdb/remote/remotedbserver"
2020-07-27 12:15:48 +00:00
"github.com/ledgerwatch/turbo-geth/log"
2020-03-11 11:02:37 +00:00
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
2020-07-27 12:15:48 +00:00
"google.golang.org/grpc"
"google.golang.org/grpc/test/bufconn"
2020-03-11 11:02:37 +00:00
)
2020-11-14 13:48:29 +00:00
func TestSequence ( t * testing . T ) {
writeDBs , _ , closeAll := setupDatabases ( func ( defaultBuckets dbutils . BucketsCfg ) dbutils . BucketsCfg {
return defaultBuckets
} )
defer closeAll ( )
ctx := context . Background ( )
for _ , db := range writeDBs {
db := db
2021-02-10 17:04:22 +00:00
tx , err := db . Begin ( ctx , ethdb . RW )
2020-11-14 13:48:29 +00:00
require . NoError ( t , err )
defer tx . Rollback ( )
i , err := tx . Sequence ( dbutils . Buckets [ 0 ] , 0 )
require . NoError ( t , err )
require . Equal ( t , uint64 ( 0 ) , i )
i , err = tx . Sequence ( dbutils . Buckets [ 0 ] , 1 )
require . NoError ( t , err )
require . Equal ( t , uint64 ( 0 ) , i )
i , err = tx . Sequence ( dbutils . Buckets [ 0 ] , 6 )
require . NoError ( t , err )
require . Equal ( t , uint64 ( 1 ) , i )
i , err = tx . Sequence ( dbutils . Buckets [ 0 ] , 1 )
require . NoError ( t , err )
require . Equal ( t , uint64 ( 7 ) , i )
i , err = tx . Sequence ( dbutils . Buckets [ 1 ] , 0 )
require . NoError ( t , err )
require . Equal ( t , uint64 ( 0 ) , i )
i , err = tx . Sequence ( dbutils . Buckets [ 1 ] , 1 )
require . NoError ( t , err )
require . Equal ( t , uint64 ( 0 ) , i )
i , err = tx . Sequence ( dbutils . Buckets [ 1 ] , 6 )
require . NoError ( t , err )
require . Equal ( t , uint64 ( 1 ) , i )
i , err = tx . Sequence ( dbutils . Buckets [ 1 ] , 1 )
require . NoError ( t , err )
require . Equal ( t , uint64 ( 7 ) , i )
}
}
2020-03-11 11:02:37 +00:00
func TestManagedTx ( t * testing . T ) {
2020-08-30 17:34:18 +00:00
defaultConfig := dbutils . BucketsConfigs
2020-08-02 11:53:08 +00:00
defer func ( ) {
2020-08-30 17:34:18 +00:00
dbutils . BucketsConfigs = defaultConfig
2020-08-02 11:53:08 +00:00
} ( )
2020-05-30 08:12:21 +00:00
2020-07-20 09:11:47 +00:00
bucketID := 0
bucket1 := dbutils . Buckets [ bucketID ]
bucket2 := dbutils . Buckets [ bucketID + 1 ]
2020-08-30 17:34:18 +00:00
writeDBs , readDBs , closeAll := setupDatabases ( func ( defaultBuckets dbutils . BucketsCfg ) dbutils . BucketsCfg {
return map [ string ] dbutils . BucketConfigItem {
bucket1 : {
2020-10-28 03:18:10 +00:00
Flags : dbutils . DupSort ,
2020-09-04 03:54:15 +00:00
AutoDupSortKeysConversion : true ,
DupToLen : 4 ,
DupFromLen : 6 ,
2020-08-30 17:34:18 +00:00
} ,
bucket2 : {
Flags : 0 ,
} ,
}
} )
2020-08-02 11:53:08 +00:00
defer closeAll ( )
2020-03-11 11:02:37 +00:00
ctx := context . Background ( )
2020-05-30 08:12:21 +00:00
for _ , db := range writeDBs {
db := db
2021-02-10 17:04:22 +00:00
tx , err := db . Begin ( ctx , ethdb . RW )
2020-10-28 03:18:10 +00:00
require . NoError ( t , err )
defer tx . Rollback ( )
c := tx . Cursor ( bucket1 )
c1 := tx . Cursor ( bucket2 )
require . NoError ( t , c . Append ( [ ] byte { 0 } , [ ] byte { 1 } ) )
require . NoError ( t , c1 . Append ( [ ] byte { 0 } , [ ] byte { 1 } ) )
require . NoError ( t , c . Append ( [ ] byte { 0 , 0 , 0 , 0 , 0 , 1 } , [ ] byte { 1 } ) ) // prefixes of len=FromLen for DupSort test (other keys must be <ToLen)
require . NoError ( t , c1 . Append ( [ ] byte { 0 , 0 , 0 , 0 , 0 , 1 } , [ ] byte { 1 } ) )
require . NoError ( t , c . Append ( [ ] byte { 0 , 0 , 0 , 0 , 0 , 2 } , [ ] byte { 1 } ) )
require . NoError ( t , c1 . Append ( [ ] byte { 0 , 0 , 0 , 0 , 0 , 2 } , [ ] byte { 1 } ) )
require . NoError ( t , c . Append ( [ ] byte { 0 , 0 , 1 } , [ ] byte { 1 } ) )
require . NoError ( t , c1 . Append ( [ ] byte { 0 , 0 , 1 } , [ ] byte { 1 } ) )
for i := uint8 ( 1 ) ; i < 10 ; i ++ {
require . NoError ( t , c . Append ( [ ] byte { i } , [ ] byte { 1 } ) )
require . NoError ( t , c1 . Append ( [ ] byte { i } , [ ] byte { 1 } ) )
2020-05-30 08:12:21 +00:00
}
2020-10-28 03:18:10 +00:00
require . NoError ( t , c . Put ( [ ] byte { 0 , 0 , 0 , 0 , 0 , 1 } , [ ] byte { 2 } ) )
require . NoError ( t , c1 . Put ( [ ] byte { 0 , 0 , 0 , 0 , 0 , 1 } , [ ] byte { 2 } ) )
err = tx . Commit ( context . Background ( ) )
require . NoError ( t , err )
2020-05-30 08:12:21 +00:00
}
for _ , db := range readDBs {
db := db
msg := fmt . Sprintf ( "%T" , db )
2020-10-10 12:24:56 +00:00
switch db . ( type ) {
2020-10-24 06:54:03 +00:00
case * ethdb . RemoteKV :
2020-10-10 12:24:56 +00:00
default :
continue
}
2020-05-30 08:12:21 +00:00
t . Run ( "ctx cancel " + msg , func ( t * testing . T ) {
t . Skip ( "probably need enable after go 1.4" )
2020-07-20 09:11:47 +00:00
testCtxCancel ( t , db , bucket1 )
2020-05-30 08:12:21 +00:00
} )
t . Run ( "filter " + msg , func ( t * testing . T ) {
2020-10-10 12:24:56 +00:00
//testPrefixFilter(t, db, bucket1)
2020-05-30 08:12:21 +00:00
} )
2020-06-04 11:19:59 +00:00
t . Run ( "multiple cursors " + msg , func ( t * testing . T ) {
2020-07-20 09:11:47 +00:00
testMultiCursor ( t , db , bucket1 , bucket2 )
2020-06-04 11:19:59 +00:00
} )
2020-05-30 08:12:21 +00:00
}
}
2020-08-30 17:34:18 +00:00
func setupDatabases ( f ethdb . BucketConfigsFunc ) ( writeDBs [ ] ethdb . KV , readDBs [ ] ethdb . KV , close func ( ) ) {
2020-05-30 08:12:21 +00:00
writeDBs = [ ] ethdb . KV {
2020-08-30 17:34:18 +00:00
ethdb . NewLMDB ( ) . InMem ( ) . WithBucketsConfig ( f ) . MustOpen ( ) ,
2020-10-28 03:18:10 +00:00
ethdb . NewMDBX ( ) . InMem ( ) . WithBucketsConfig ( f ) . MustOpen ( ) ,
2020-08-30 17:34:18 +00:00
ethdb . NewLMDB ( ) . InMem ( ) . WithBucketsConfig ( f ) . MustOpen ( ) , // for remote db
2020-04-04 07:18:10 +00:00
}
2020-03-11 11:02:37 +00:00
2020-07-27 12:15:48 +00:00
conn := bufconn . Listen ( 1024 * 1024 )
2020-03-11 11:02:37 +00:00
2020-10-24 06:54:03 +00:00
rdb , _ := ethdb . NewRemote ( ) . InMem ( conn ) . MustOpen ( )
2020-05-30 08:12:21 +00:00
readDBs = [ ] ethdb . KV {
2020-04-04 07:18:10 +00:00
writeDBs [ 0 ] ,
2020-08-05 15:33:45 +00:00
writeDBs [ 1 ] ,
2020-08-11 21:09:30 +00:00
rdb ,
2020-04-04 07:18:10 +00:00
}
2020-03-11 11:02:37 +00:00
2020-07-27 12:15:48 +00:00
grpcServer := grpc . NewServer ( )
go func ( ) {
2020-10-24 06:54:03 +00:00
remote . RegisterKVServer ( grpcServer , remotedbserver . NewKvServer ( writeDBs [ 1 ] ) )
2020-07-27 12:15:48 +00:00
if err := grpcServer . Serve ( conn ) ; err != nil {
log . Error ( "private RPC server fail" , "err" , err )
2020-03-11 11:02:37 +00:00
}
2020-07-27 12:15:48 +00:00
} ( )
2020-03-11 11:02:37 +00:00
2020-07-27 12:15:48 +00:00
return writeDBs , readDBs , func ( ) {
grpcServer . Stop ( )
2020-08-15 07:11:40 +00:00
if err := conn . Close ( ) ; err != nil {
panic ( err )
}
2020-03-11 11:02:37 +00:00
2020-07-27 12:15:48 +00:00
for _ , db := range readDBs {
db . Close ( )
}
for _ , db := range writeDBs {
db . Close ( )
}
2020-03-11 11:02:37 +00:00
}
}
2020-08-10 23:55:32 +00:00
func testCtxCancel ( t * testing . T , db ethdb . KV , bucket1 string ) {
2020-04-04 07:18:10 +00:00
assert := assert . New ( t )
cancelableCtx , cancel := context . WithTimeout ( context . Background ( ) , time . Microsecond )
defer cancel ( )
2020-03-11 11:02:37 +00:00
2020-04-04 07:18:10 +00:00
if err := db . View ( cancelableCtx , func ( tx ethdb . Tx ) error {
2020-08-14 06:41:18 +00:00
c := tx . Cursor ( bucket1 )
2020-04-04 07:18:10 +00:00
for {
2020-05-30 08:12:21 +00:00
for k , _ , err := c . First ( ) ; k != nil ; k , _ , err = c . Next ( ) {
2020-04-04 07:18:10 +00:00
if err != nil {
return err
}
}
}
} ) ; err != nil {
assert . True ( errors . Is ( context . DeadlineExceeded , err ) )
}
}
2020-03-11 11:02:37 +00:00
2020-08-10 23:55:32 +00:00
func testMultiCursor ( t * testing . T , db ethdb . KV , bucket1 , bucket2 string ) {
2020-06-04 11:19:59 +00:00
assert , ctx := assert . New ( t ) , context . Background ( )
if err := db . View ( ctx , func ( tx ethdb . Tx ) error {
2020-08-14 06:41:18 +00:00
c1 := tx . Cursor ( bucket1 )
c2 := tx . Cursor ( bucket2 )
2020-06-04 11:19:59 +00:00
k1 , v1 , err := c1 . First ( )
assert . NoError ( err )
k2 , v2 , err := c2 . First ( )
assert . NoError ( err )
assert . Equal ( k1 , k2 )
assert . Equal ( v1 , v2 )
k1 , v1 , err = c1 . Next ( )
assert . NoError ( err )
k2 , v2 , err = c2 . Next ( )
assert . NoError ( err )
assert . Equal ( k1 , k2 )
assert . Equal ( v1 , v2 )
2020-07-20 09:11:47 +00:00
k1 , v1 , err = c1 . Seek ( [ ] byte { 0 } )
assert . NoError ( err )
k2 , v2 , err = c2 . Seek ( [ ] byte { 0 } )
assert . NoError ( err )
assert . Equal ( k1 , k2 )
assert . Equal ( v1 , v2 )
k1 , v1 , err = c1 . Seek ( [ ] byte { 0 , 0 } )
assert . NoError ( err )
k2 , v2 , err = c2 . Seek ( [ ] byte { 0 , 0 } )
assert . NoError ( err )
assert . Equal ( k1 , k2 )
assert . Equal ( v1 , v2 )
k1 , v1 , err = c1 . Seek ( [ ] byte { 0 , 0 , 0 , 0 } )
assert . NoError ( err )
k2 , v2 , err = c2 . Seek ( [ ] byte { 0 , 0 , 0 , 0 } )
assert . NoError ( err )
assert . Equal ( k1 , k2 )
assert . Equal ( v1 , v2 )
2020-08-02 11:53:08 +00:00
k1 , v1 , err = c1 . Next ( )
assert . NoError ( err )
k2 , v2 , err = c2 . Next ( )
assert . NoError ( err )
assert . Equal ( k1 , k2 )
assert . Equal ( v1 , v2 )
k1 , v1 , err = c1 . Seek ( [ ] byte { 0 } )
assert . NoError ( err )
k2 , v2 , err = c2 . Seek ( [ ] byte { 0 } )
assert . NoError ( err )
assert . Equal ( k1 , k2 )
assert . Equal ( v1 , v2 )
k1 , v1 , err = c1 . Seek ( [ ] byte { 0 , 0 } )
assert . NoError ( err )
k2 , v2 , err = c2 . Seek ( [ ] byte { 0 , 0 } )
assert . NoError ( err )
assert . Equal ( k1 , k2 )
assert . Equal ( v1 , v2 )
k1 , v1 , err = c1 . Seek ( [ ] byte { 0 , 0 , 0 , 0 } )
assert . NoError ( err )
k2 , v2 , err = c2 . Seek ( [ ] byte { 0 , 0 , 0 , 0 } )
assert . NoError ( err )
assert . Equal ( k1 , k2 )
assert . Equal ( v1 , v2 )
2020-07-20 09:11:47 +00:00
k1 , v1 , err = c1 . Next ( )
assert . NoError ( err )
k2 , v2 , err = c2 . Next ( )
assert . NoError ( err )
assert . Equal ( k1 , k2 )
assert . Equal ( v1 , v2 )
2020-06-04 11:19:59 +00:00
k1 , v1 , err = c1 . Seek ( [ ] byte { 2 } )
assert . NoError ( err )
k2 , v2 , err = c2 . Seek ( [ ] byte { 2 } )
assert . NoError ( err )
assert . Equal ( k1 , k2 )
assert . Equal ( v1 , v2 )
return nil
} ) ; err != nil {
assert . NoError ( err )
}
}
2020-10-29 13:19:31 +00:00
//func TestMultipleBuckets(t *testing.T) {
// writeDBs, readDBs, closeAll := setupDatabases(ethdb.DefaultBucketConfigs)
// defer closeAll()
//
// ctx := context.Background()
//
// for _, db := range writeDBs {
// db := db
// msg := fmt.Sprintf("%T", db)
// t.Run("FillBuckets "+msg, func(t *testing.T) {
// if err := db.Update(ctx, func(tx ethdb.Tx) error {
// c := tx.Cursor(dbutils.Buckets[0])
// for i := uint8(0); i < 10; i++ {
// require.NoError(t, c.Put([]byte{i}, []byte{i}))
// }
// c2 := tx.Cursor(dbutils.Buckets[1])
// for i := uint8(0); i < 12; i++ {
// require.NoError(t, c2.Put([]byte{i}, []byte{i}))
// }
//
// // delete from first bucket key 5, then will seek on it and expect to see key 6
// if err := c.Delete([]byte{5}, nil); err != nil {
// return err
// }
// // delete non-existing key
// if err := c.Delete([]byte{6, 1}, nil); err != nil {
// return err
// }
//
// return nil
// }); err != nil {
// require.NoError(t, err)
// }
// })
// }
//
// for _, db := range readDBs {
// db := db
// msg := fmt.Sprintf("%T", db)
// t.Run("MultipleBuckets "+msg, func(t *testing.T) {
// counter2, counter := 0, 0
// var key, value []byte
// err := db.View(ctx, func(tx ethdb.Tx) error {
// c := tx.Cursor(dbutils.Buckets[0])
// for k, _, err := c.First(); k != nil; k, _, err = c.Next() {
// if err != nil {
// return err
// }
// counter++
// }
//
// c2 := tx.Cursor(dbutils.Buckets[1])
// for k, _, err := c2.First(); k != nil; k, _, err = c2.Next() {
// if err != nil {
// return err
// }
// counter2++
// }
//
// c3 := tx.Cursor(dbutils.Buckets[0])
// k, v, err := c3.Seek([]byte{5})
// if err != nil {
// return err
// }
// key = common.CopyBytes(k)
// value = common.CopyBytes(v)
//
// return nil
// })
// require.NoError(t, err)
// assert.Equal(t, 9, counter)
// assert.Equal(t, 12, counter2)
// assert.Equal(t, []byte{6}, key)
// assert.Equal(t, []byte{6}, value)
// })
// }
//}
//func TestReadAfterPut(t *testing.T) {
// writeDBs, _, closeAll := setupDatabases(ethdb.DefaultBucketConfigs)
// defer closeAll()
//
// ctx := context.Background()
//
// for _, db := range writeDBs {
// db := db
// msg := fmt.Sprintf("%T", db)
// t.Run("GetAfterPut "+msg, func(t *testing.T) {
// if err := db.Update(ctx, func(tx ethdb.Tx) error {
// c := tx.Cursor(dbutils.Buckets[0])
// for i := uint8(0); i < 10; i++ { // don't read in same loop to check that writes don't affect each other (for example by sharing bucket.prefix buffer)
// require.NoError(t, c.Put([]byte{i}, []byte{i}))
// }
//
// for i := uint8(0); i < 10; i++ {
// v, err := c.SeekExact([]byte{i})
// require.NoError(t, err)
// require.Equal(t, []byte{i}, v)
// }
//
// c2 := tx.Cursor(dbutils.Buckets[1])
// for i := uint8(0); i < 12; i++ {
// require.NoError(t, c2.Put([]byte{i}, []byte{i}))
// }
//
// for i := uint8(0); i < 12; i++ {
// v, err := c2.SeekExact([]byte{i})
// require.NoError(t, err)
// require.Equal(t, []byte{i}, v)
// }
//
// {
// require.NoError(t, c2.Delete([]byte{5}, nil))
// v, err := c2.SeekExact([]byte{5})
// require.NoError(t, err)
// require.Nil(t, v)
//
// require.NoError(t, c2.Delete([]byte{255}, nil)) // delete non-existing key
// }
//
// return nil
// }); err != nil {
// require.NoError(t, err)
// }
// })
//
// t.Run("cursor put and delete"+msg, func(t *testing.T) {
// if err := db.Update(ctx, func(tx ethdb.Tx) error {
// c3 := tx.Cursor(dbutils.Buckets[2])
// for i := uint8(0); i < 10; i++ { // don't read in same loop to check that writes don't affect each other (for example by sharing bucket.prefix buffer)
// require.NoError(t, c3.Put([]byte{i}, []byte{i}))
// }
// for i := uint8(0); i < 10; i++ {
// v, err := tx.GetOne(dbutils.Buckets[2], []byte{i})
// require.NoError(t, err)
// require.Equal(t, []byte{i}, v)
// }
//
// require.NoError(t, c3.Delete([]byte{255}, nil)) // delete non-existing key
// return nil
// }); err != nil {
// t.Error(err)
// }
//
// if err := db.Update(ctx, func(tx ethdb.Tx) error {
// c3 := tx.Cursor(dbutils.Buckets[2])
// require.NoError(t, c3.Delete([]byte{5}, nil))
// v, err := tx.GetOne(dbutils.Buckets[2], []byte{5})
// require.NoError(t, err)
// require.Nil(t, v)
// return nil
// }); err != nil {
// t.Error(err)
// }
// })
// }
//}