// erigon-pulse/common/etl/etl_test.go (422 lines, 10 KiB, Go)

package etl
import (
"bytes"
"fmt"
"io"
"os"
"strings"
"testing"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/stretchr/testify/assert"
"github.com/ugorji/go/codec"
)
// TestWriteAndReadBufferEntry round-trips 100 key/value pairs through
// writeToDisk and readElementFromDisk over an in-memory buffer. It checks that
// the pairs come back in the order they were written and that one further read
// reports io.EOF.
func TestWriteAndReadBufferEntry(t *testing.T) {
	const entryCount = 100

	storage := new(bytes.Buffer)
	enc := codec.NewEncoder(storage, &cbor)

	// Prepare the expected keys and values up front.
	expectedKeys := make([]string, 0, entryCount)
	expectedVals := make([]string, 0, entryCount)
	for n := 0; n < entryCount; n++ {
		expectedKeys = append(expectedKeys, fmt.Sprintf("key-%d", n))
		expectedVals = append(expectedVals, fmt.Sprintf("value-%d", n))
	}

	// Write phase.
	for n, key := range expectedKeys {
		if err := writeToDisk(enc, []byte(key), []byte(expectedVals[n])); err != nil {
			t.Error(err)
		}
	}

	// Read phase: decode from a snapshot of everything written so far.
	dec := codec.NewDecoder(bytes.NewReader(storage.Bytes()), &cbor)
	for n, wantKey := range expectedKeys {
		gotKey, gotVal, err := readElementFromDisk(dec)
		if err != nil {
			t.Error(err)
		}
		assert.Equal(t, wantKey, string(gotKey))
		assert.Equal(t, expectedVals[n], string(gotVal))
	}

	// Reading past the last entry must signal the end of the stream.
	_, _, err := readElementFromDisk(dec)
	assert.Equal(t, io.EOF, err)
}
// TestNextKey checks NextKey on inputs for which it must succeed. Each case is
// written as "<input hex>-><expected hex>"; in every case the expected output
// is the input incremented by one when read as a big-endian number.
func TestNextKey(t *testing.T) {
	cases := []string{
		"00000001->00000002",
		"000000FF->00000100",
		"FEFFFFFF->FF000000",
	}
	for _, c := range cases {
		fields := strings.Split(c, "->")
		in, want := common.Hex2Bytes(fields[0]), common.Hex2Bytes(fields[1])

		got, err := NextKey(in)
		assert.NoError(t, err)
		assert.Equal(t, want, got)
	}
}
func TestNextKeyErr(t *testing.T) {
for _, tc := range []string{
"",
"FFFFFF",
} {
input := common.Hex2Bytes(tc)
_, err := NextKey(input)
assert.Error(t, err)
}
}
// TestFileDataProviders exercises the path where extraction goes through
// files (more than one buffer). With a sortable buffer of size 1 and 10 source
// entries it expects 10 file-backed data providers, checks that each backing
// file exists, and checks that disposeProviders removes all of them.
func TestFileDataProviders(t *testing.T) {
	db := ethdb.NewMemDatabase()
	sourceBucket := dbutils.Buckets[0]
	generateTestData(t, db, sourceBucket, 10)

	collector := NewCollector("", NewSortableBuffer(1))
	err := extractBucketIntoFiles(db, sourceBucket, nil, nil, 0, collector, testExtractToMapFunc, nil)
	assert.NoError(t, err)
	assert.Equal(t, 10, len(collector.dataProviders))

	for _, p := range collector.dataProviders {
		fp, ok := p.(*fileDataProvider)
		// On a failed type assertion fp is nil; skip it so the test reports
		// a failure instead of panicking on fp.file.
		if !assert.True(t, ok) {
			continue
		}
		_, err = os.Stat(fp.file.Name())
		assert.NoError(t, err)
	}

	disposeProviders(collector.dataProviders)

	// After disposal none of the backing files may remain on disk.
	for _, p := range collector.dataProviders {
		fp, ok := p.(*fileDataProvider)
		if !assert.True(t, ok) {
			continue
		}
		_, err = os.Stat(fp.file.Name())
		assert.True(t, os.IsNotExist(err))
	}
}
// TestRAMDataProviders exercises the path where extraction stays in memory
// (one buffer). With a sortable buffer of BufferOptimalSize and 10 source
// entries it expects a single memory-backed data provider whose buffer holds
// all 10 entries.
func TestRAMDataProviders(t *testing.T) {
	db := ethdb.NewMemDatabase()
	sourceBucket := dbutils.Buckets[0]
	generateTestData(t, db, sourceBucket, 10)

	collector := NewCollector("", NewSortableBuffer(BufferOptimalSize))
	err := extractBucketIntoFiles(db, sourceBucket, nil, nil, 0, collector, testExtractToMapFunc, nil)
	assert.NoError(t, err)
	assert.Equal(t, 1, len(collector.dataProviders))

	for _, p := range collector.dataProviders {
		mp, ok := p.(*memoryDataProvider)
		// On a failed type assertion mp is nil; skip it so the test reports
		// a failure instead of panicking on mp.buffer.
		if !assert.True(t, ok) {
			continue
		}
		assert.Equal(t, 10, mp.buffer.Len())
	}
}
// TestTransformRAMOnly runs Transform with default TransformArgs over 20
// generated entries and checks that the destination bucket ends up equal to
// the source bucket. The intent is to cover the case where all data fits into
// exactly one in-memory buffer.
func TestTransformRAMOnly(t *testing.T) {
	db := ethdb.NewMemDatabase()
	src, dst := dbutils.Buckets[0], dbutils.Buckets[1]
	generateTestData(t, db, src, 20)

	const tmpDir = "" // temp dir
	err := Transform(db, src, dst, tmpDir, testExtractToMapFunc, testLoadFromMapFunc, TransformArgs{})

	assert.Nil(t, err)
	compareBuckets(t, db, src, dst, nil)
}
// TestTransformOnLoadCommitCustomBatchSize runs Transform over 20 generated
// entries with loadBatchSize set to 1 and an OnLoadCommit callback. It expects
// the callback to be invoked 21 times, at least one of them with isDone set
// (presumably once per entry plus a final call - confirm against Transform),
// and the destination bucket to equal the source bucket.
func TestTransformOnLoadCommitCustomBatchSize(t *testing.T) {
	db := ethdb.NewMemDatabase()
	src, dst := dbutils.Buckets[0], dbutils.Buckets[1]
	generateTestData(t, db, src, 20)

	commits := 0
	sawDone := false
	args := TransformArgs{
		OnLoadCommit: func(_ ethdb.Putter, _ []byte, isDone bool) error {
			commits++
			sawDone = sawDone || isDone
			return nil
		},
		loadBatchSize: 1,
	}

	const tmpDir = "" // temp dir
	err := Transform(db, src, dst, tmpDir, testExtractToMapFunc, testLoadFromMapFunc, args)

	assert.Nil(t, err)
	compareBuckets(t, db, src, dst, nil)
	assert.Equal(t, 21, commits)
	assert.True(t, sawDone)
}
// TestTransformOnLoadCommitDefaultBatchSize runs Transform over 20 generated
// entries with an OnLoadCommit callback and no explicit load batch size. It
// expects the callback to be invoked exactly once, with isDone set, and the
// destination bucket to equal the source bucket.
func TestTransformOnLoadCommitDefaultBatchSize(t *testing.T) {
	db := ethdb.NewMemDatabase()
	src, dst := dbutils.Buckets[0], dbutils.Buckets[1]
	generateTestData(t, db, src, 20)

	commits := 0
	sawDone := false
	args := TransformArgs{
		OnLoadCommit: func(_ ethdb.Putter, _ []byte, isDone bool) error {
			commits++
			sawDone = sawDone || isDone
			return nil
		},
	}

	const tmpDir = "" // temp dir
	err := Transform(db, src, dst, tmpDir, testExtractToMapFunc, testLoadFromMapFunc, args)

	assert.Nil(t, err)
	compareBuckets(t, db, src, dst, nil)
	assert.Equal(t, 1, commits)
	assert.True(t, sawDone)
}
// TestEmptySourceBucket runs Transform without generating any test data first
// and checks that it returns no error and that the source and destination
// buckets compare equal afterwards.
func TestEmptySourceBucket(t *testing.T) {
	db := ethdb.NewMemDatabase()
	src, dst := dbutils.Buckets[0], dbutils.Buckets[1]

	const tmpDir = "" // temp dir
	err := Transform(db, src, dst, tmpDir, testExtractToMapFunc, testLoadFromMapFunc, TransformArgs{})

	assert.Nil(t, err)
	compareBuckets(t, db, src, dst, nil)
}
// TestTransformExtractStartKey runs Transform over 10 generated entries with
// ExtractStartKey set to the key generated for index 5. It checks that the
// destination bucket equals what compareBuckets reads from the source bucket
// when walking from that same key.
func TestTransformExtractStartKey(t *testing.T) {
	db := ethdb.NewMemDatabase()
	src, dst := dbutils.Buckets[0], dbutils.Buckets[1]
	generateTestData(t, db, src, 10)

	// Same key layout that generateTestData uses, for index 5.
	startKey := fmt.Sprintf("%10d-key-%010d", 5, 5)

	const tmpDir = "" // temp dir
	err := Transform(
		db, src, dst, tmpDir,
		testExtractToMapFunc,
		testLoadFromMapFunc,
		TransformArgs{ExtractStartKey: []byte(startKey)},
	)

	assert.Nil(t, err)
	compareBuckets(t, db, src, dst, []byte(startKey))
}
// TestTransformThroughFiles runs Transform over 10 generated entries with
// BufferSize set to 1, intended to push the data through files (more than one
// buffer), and checks that the destination bucket equals the source bucket.
func TestTransformThroughFiles(t *testing.T) {
	db := ethdb.NewMemDatabase()
	src, dst := dbutils.Buckets[0], dbutils.Buckets[1]
	generateTestData(t, db, src, 10)

	args := TransformArgs{BufferSize: 1}

	const tmpDir = "" // temp dir
	err := Transform(db, src, dst, tmpDir, testExtractToMapFunc, testLoadFromMapFunc, args)

	assert.Nil(t, err)
	compareBuckets(t, db, src, dst, nil)
}
// TestTransformDoubleOnExtract covers the case where the extract function
// emits two entries per source entry. It runs Transform with
// testExtractDoubleToMapFunc over 10 generated entries and verifies the
// destination bucket with compareBucketsDouble.
func TestTransformDoubleOnExtract(t *testing.T) {
	db := ethdb.NewMemDatabase()
	src, dst := dbutils.Buckets[0], dbutils.Buckets[1]
	generateTestData(t, db, src, 10)

	const tmpDir = "" // temp dir
	err := Transform(db, src, dst, tmpDir, testExtractDoubleToMapFunc, testLoadFromMapFunc, TransformArgs{})

	assert.Nil(t, err)
	compareBucketsDouble(t, db, src, dst)
}
// TestTransformDoubleOnLoad covers the case where the load function emits two
// entries per extracted entry. It runs Transform with
// testLoadFromMapDoubleFunc over 10 generated entries and verifies the
// destination bucket with compareBucketsDouble.
func TestTransformDoubleOnLoad(t *testing.T) {
	db := ethdb.NewMemDatabase()
	src, dst := dbutils.Buckets[0], dbutils.Buckets[1]
	generateTestData(t, db, src, 10)

	const tmpDir = "" // temp dir
	err := Transform(db, src, dst, tmpDir, testExtractToMapFunc, testLoadFromMapDoubleFunc, TransformArgs{})

	assert.Nil(t, err)
	compareBucketsDouble(t, db, src, dst)
}
// generateTestData puts count entries into bucket. Entry i has the key
// fmt.Sprintf("%10d-key-%010d", i, i) and the value fmt.Sprintf("val-%099d", i).
// A failing Put is reported through t, and generation carries on with the
// remaining entries.
func generateTestData(t *testing.T, db ethdb.Putter, bucket []byte, count int) {
	// Mark as a helper so a failed Put is attributed to the calling test,
	// as the other helpers in this file that take t already do.
	t.Helper()
	for i := 0; i < count; i++ {
		k := []byte(fmt.Sprintf("%10d-key-%010d", i, i))
		v := []byte(fmt.Sprintf("val-%099d", i))
		err := db.Put(bucket, k, v)
		assert.NoError(t, err)
	}
}
// testExtractToMapFunc is an extract callback for the tests. It wraps v in a
// single-entry map under the key "value", encodes that map with the cbor
// handle, and forwards it to next with k as both the original and the emitted
// key. Encoding failures panic (MustEncode).
func testExtractToMapFunc(k, v []byte, next ExtractNextFunc) error {
	wrapped := map[string][]byte{"value": v}

	out := new(bytes.Buffer)
	enc := codec.NewEncoder(out, &cbor)
	enc.MustEncode(wrapped)

	return next(k, k, out.Bytes())
}
// testExtractDoubleToMapFunc is an extract callback that emits two entries for
// every source pair. For each suffix byte (0xAA, then 0xBB) it appends the
// suffix to both the key and the value, wraps the value in a single-entry map
// under "value", encodes it with the cbor handle, and passes the result to
// next. k is passed unchanged as the first argument of next. The first error
// returned by next stops the emission; encoding failures panic (MustEncode).
func testExtractDoubleToMapFunc(k, v []byte, next ExtractNextFunc) error {
	// Cap both slices at their length so each append below allocates a fresh
	// backing array. Without this, append could write into memory owned by
	// the caller, and the 0xBB key could overwrite the 0xAA key in place.
	k = k[:len(k):len(k)]
	v = v[:len(v):len(v)]

	for _, suffix := range []byte{0xAA, 0xBB} {
		// A separate buffer per entry keeps the bytes already handed to next
		// from being overwritten by the following encode.
		buf := bytes.NewBuffer(nil)
		encoder := codec.NewEncoder(buf, &cbor)
		encoder.MustEncode(map[string][]byte{"value": append(v, suffix)})

		if err := next(k, append(k, suffix), buf.Bytes()); err != nil {
			return err
		}
	}
	return nil
}
// testLoadFromMapFunc is a load callback for the tests. It decodes v with the
// cbor handle into a map, takes the entry stored under "value", and forwards
// it to next with k as both the original and the emitted key. A decoding error
// is returned as is.
func testLoadFromMapFunc(k []byte, v []byte, _ State, next LoadNextFunc) error {
	dec := codec.NewDecoder(nil, &cbor)
	dec.ResetBytes(v)

	decoded := make(map[string][]byte)
	if err := dec.Decode(&decoded); err != nil {
		return err
	}

	return next(k, k, decoded["value"])
}
// testLoadFromMapDoubleFunc is a load callback that emits two entries for
// every extracted pair. It decodes v with the cbor handle into a map, takes
// the entry stored under "value", and for each suffix byte (0xAA, then 0xBB)
// calls next with the suffix appended to both the key and the value. k is
// passed unchanged as the first argument of next. A decoding error, or the
// first error returned by next, is returned as is.
func testLoadFromMapDoubleFunc(k []byte, v []byte, _ State, next LoadNextFunc) error {
	decoder := codec.NewDecoder(nil, &cbor)
	decoder.ResetBytes(v)
	valueMap := make(map[string][]byte)
	// Decode through a pointer, matching testLoadFromMapFunc.
	if err := decoder.Decode(&valueMap); err != nil {
		return err
	}
	realValue := valueMap["value"]

	// Cap both slices at their length so each append below allocates a fresh
	// backing array. Without this, the second pair could overwrite the first
	// one in place if next keeps a reference to what it was given.
	k = k[:len(k):len(k)]
	realValue = realValue[:len(realValue):len(realValue)]

	for _, suffix := range []byte{0xAA, 0xBB} {
		if err := next(k, append(k, suffix), append(realValue, suffix)); err != nil {
			return err
		}
	}
	return nil
}
// compareBuckets asserts that bucket b2 holds the same key/value pairs that
// walking bucket b1 from startKey yields. Both buckets are read into maps of
// hex-encoded keys to hex-encoded values, and the maps are compared.
//
// NOTE(review): the third Walk argument for b1 is len(startKey); confirm
// whether Walk expects that count in bits or in bytes.
func compareBuckets(t *testing.T, db ethdb.Database, b1, b2 []byte, startKey []byte) {
	t.Helper()

	// recordInto builds a walker that stores every visited pair in dst and
	// always asks the walk to continue.
	recordInto := func(dst map[string]string) func(k, v []byte) (bool, error) {
		return func(k, v []byte) (bool, error) {
			dst[fmt.Sprintf("%x", k)] = fmt.Sprintf("%x", v)
			return true, nil
		}
	}

	expected := make(map[string]string)
	walkErr := db.Walk(b1, startKey, len(startKey), recordInto(expected))
	assert.NoError(t, walkErr)

	actual := make(map[string]string)
	walkErr = db.Walk(b2, nil, 0, recordInto(actual))
	assert.NoError(t, walkErr)

	assert.Equal(t, expected, actual)
}
// compareBucketsDouble asserts that bucket b2 holds exactly two entries for
// every entry of bucket b1: one with 0xAA appended to both key and value and
// one with 0xBB appended to both. Keys and values are compared hex-encoded.
func compareBucketsDouble(t *testing.T, db ethdb.Database, b1, b2 []byte) {
	t.Helper()
	b1Map := make(map[string]string)
	err := db.Walk(b1, nil, 0, func(k, v []byte) (bool, error) {
		// Cap the slices at their length so append copies instead of writing
		// into whatever memory backs the slices handed to the walker.
		k = k[:len(k):len(k)]
		v = v[:len(v):len(v)]
		for _, suffix := range []byte{0xAA, 0xBB} {
			b1Map[fmt.Sprintf("%x", append(k, suffix))] = fmt.Sprintf("%x", append(v, suffix))
		}
		return true, nil
	})
	assert.NoError(t, err)
	b2Map := make(map[string]string)
	err = db.Walk(b2, nil, 0, func(k, v []byte) (bool, error) {
		b2Map[fmt.Sprintf("%x", k)] = fmt.Sprintf("%x", v)
		return true, nil
	})
	assert.NoError(t, err)
	assert.Equal(t, b1Map, b2Map)
}