mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-07 19:41:19 +00:00
391 lines
9.7 KiB
Go
391 lines
9.7 KiB
Go
package etl
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"strings"
|
|
"testing"
|
|
|
|
"github.com/ledgerwatch/turbo-geth/common"
|
|
"github.com/ledgerwatch/turbo-geth/common/dbutils"
|
|
"github.com/ledgerwatch/turbo-geth/ethdb"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/ugorji/go/codec"
|
|
)
|
|
|
|
func TestWriteAndReadBufferEntry(t *testing.T) {
|
|
buffer := bytes.NewBuffer(make([]byte, 0))
|
|
encoder := codec.NewEncoder(buffer, &cbor)
|
|
|
|
keys := make([]string, 100)
|
|
vals := make([]string, 100)
|
|
|
|
for i := range keys {
|
|
keys[i] = fmt.Sprintf("key-%d", i)
|
|
vals[i] = fmt.Sprintf("value-%d", i)
|
|
}
|
|
|
|
for i := range keys {
|
|
if err := writeToDisk(encoder, []byte(keys[i]), []byte(vals[i])); err != nil {
|
|
t.Error(err)
|
|
}
|
|
}
|
|
|
|
bb := buffer.Bytes()
|
|
|
|
readBuffer := bytes.NewReader(bb)
|
|
|
|
decoder := codec.NewDecoder(readBuffer, &cbor)
|
|
|
|
for i := range keys {
|
|
k, v, err := readElementFromDisk(decoder)
|
|
if err != nil {
|
|
t.Error(err)
|
|
}
|
|
assert.Equal(t, keys[i], string(k))
|
|
assert.Equal(t, vals[i], string(v))
|
|
}
|
|
|
|
_, _, err := readElementFromDisk(decoder)
|
|
assert.Equal(t, io.EOF, err)
|
|
}
|
|
|
|
func TestNextKey(t *testing.T) {
|
|
for _, tc := range []string{
|
|
"00000001->00000002",
|
|
"000000FF->00000100",
|
|
"FEFFFFFF->FF000000",
|
|
} {
|
|
parts := strings.Split(tc, "->")
|
|
input := common.Hex2Bytes(parts[0])
|
|
expectedOutput := common.Hex2Bytes(parts[1])
|
|
actualOutput, err := NextKey(input)
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, expectedOutput, actualOutput)
|
|
}
|
|
}
|
|
|
|
func TestNextKeyErr(t *testing.T) {
|
|
for _, tc := range []string{
|
|
"",
|
|
"FFFFFF",
|
|
} {
|
|
input := common.Hex2Bytes(tc)
|
|
_, err := NextKey(input)
|
|
assert.Error(t, err)
|
|
}
|
|
}
|
|
|
|
func TestFileDataProviders(t *testing.T) {
|
|
// test invariant when we go through files (> 1 buffer)
|
|
_, tx := ethdb.NewTestTx(t)
|
|
sourceBucket := dbutils.Buckets[0]
|
|
|
|
generateTestData(t, tx, sourceBucket, 10)
|
|
|
|
collector := NewCollector("", NewSortableBuffer(1))
|
|
|
|
err := extractBucketIntoFiles("logPrefix", tx, sourceBucket, nil, nil, 0, collector, testExtractToMapFunc, nil, nil)
|
|
assert.NoError(t, err)
|
|
|
|
assert.Equal(t, 10, len(collector.dataProviders))
|
|
|
|
for _, p := range collector.dataProviders {
|
|
fp, ok := p.(*fileDataProvider)
|
|
assert.True(t, ok)
|
|
_, err = os.Stat(fp.file.Name())
|
|
assert.NoError(t, err)
|
|
}
|
|
|
|
disposeProviders("logPrefix", collector.dataProviders)
|
|
|
|
for _, p := range collector.dataProviders {
|
|
fp, ok := p.(*fileDataProvider)
|
|
assert.True(t, ok)
|
|
_, err = os.Stat(fp.file.Name())
|
|
assert.True(t, os.IsNotExist(err))
|
|
}
|
|
}
|
|
|
|
func TestRAMDataProviders(t *testing.T) {
|
|
// test invariant when we go through memory (1 buffer)
|
|
_, tx := ethdb.NewTestTx(t)
|
|
sourceBucket := dbutils.Buckets[0]
|
|
generateTestData(t, tx, sourceBucket, 10)
|
|
|
|
collector := NewCollector("", NewSortableBuffer(BufferOptimalSize))
|
|
err := extractBucketIntoFiles("logPrefix", tx, sourceBucket, nil, nil, 0, collector, testExtractToMapFunc, nil, nil)
|
|
assert.NoError(t, err)
|
|
|
|
assert.Equal(t, 1, len(collector.dataProviders))
|
|
|
|
for _, p := range collector.dataProviders {
|
|
mp, ok := p.(*memoryDataProvider)
|
|
assert.True(t, ok)
|
|
assert.Equal(t, 10, mp.buffer.Len())
|
|
}
|
|
}
|
|
|
|
func TestTransformRAMOnly(t *testing.T) {
|
|
// test invariant when we only have one buffer and it fits into RAM (exactly 1 buffer)
|
|
db := ethdb.NewTestDB(t)
|
|
tx, err := db.Begin(context.Background(), ethdb.RW)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
sourceBucket := dbutils.Buckets[0]
|
|
destBucket := dbutils.Buckets[1]
|
|
generateTestData(t, tx, sourceBucket, 20)
|
|
err = Transform(
|
|
"logPrefix",
|
|
tx.(ethdb.HasTx).Tx().(ethdb.RwTx),
|
|
sourceBucket,
|
|
destBucket,
|
|
"", // temp dir
|
|
testExtractToMapFunc,
|
|
testLoadFromMapFunc,
|
|
TransformArgs{},
|
|
)
|
|
assert.Nil(t, err)
|
|
compareBuckets(t, tx, sourceBucket, destBucket, nil)
|
|
}
|
|
|
|
func TestEmptySourceBucket(t *testing.T) {
|
|
db := ethdb.NewTestDB(t)
|
|
tx, err := db.Begin(context.Background(), ethdb.RW)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer tx.Rollback()
|
|
sourceBucket := dbutils.Buckets[0]
|
|
destBucket := dbutils.Buckets[1]
|
|
err = Transform(
|
|
"logPrefix",
|
|
tx.(ethdb.HasTx).Tx().(ethdb.RwTx),
|
|
sourceBucket,
|
|
destBucket,
|
|
"", // temp dir
|
|
testExtractToMapFunc,
|
|
testLoadFromMapFunc,
|
|
TransformArgs{},
|
|
)
|
|
assert.Nil(t, err)
|
|
compareBuckets(t, tx, sourceBucket, destBucket, nil)
|
|
}
|
|
|
|
func TestTransformExtractStartKey(t *testing.T) {
|
|
// test invariant when we only have one buffer and it fits into RAM (exactly 1 buffer)
|
|
db := ethdb.NewTestDB(t)
|
|
tx, err := db.Begin(context.Background(), ethdb.RW)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer tx.Rollback()
|
|
sourceBucket := dbutils.Buckets[0]
|
|
destBucket := dbutils.Buckets[1]
|
|
generateTestData(t, tx, sourceBucket, 10)
|
|
err = Transform(
|
|
"logPrefix",
|
|
tx.(ethdb.HasTx).Tx().(ethdb.RwTx),
|
|
sourceBucket,
|
|
destBucket,
|
|
"", // temp dir
|
|
testExtractToMapFunc,
|
|
testLoadFromMapFunc,
|
|
TransformArgs{ExtractStartKey: []byte(fmt.Sprintf("%10d-key-%010d", 5, 5))},
|
|
)
|
|
assert.Nil(t, err)
|
|
compareBuckets(t, tx, sourceBucket, destBucket, []byte(fmt.Sprintf("%10d-key-%010d", 5, 5)))
|
|
}
|
|
|
|
func TestTransformThroughFiles(t *testing.T) {
|
|
// test invariant when we go through files (> 1 buffer)
|
|
db := ethdb.NewTestDB(t)
|
|
tx, err := db.Begin(context.Background(), ethdb.RW)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer tx.Rollback()
|
|
sourceBucket := dbutils.Buckets[0]
|
|
destBucket := dbutils.Buckets[1]
|
|
generateTestData(t, tx, sourceBucket, 10)
|
|
err = Transform(
|
|
"logPrefix",
|
|
tx.(ethdb.HasTx).Tx().(ethdb.RwTx),
|
|
sourceBucket,
|
|
destBucket,
|
|
"", // temp dir
|
|
testExtractToMapFunc,
|
|
testLoadFromMapFunc,
|
|
TransformArgs{
|
|
BufferSize: 1,
|
|
},
|
|
)
|
|
assert.Nil(t, err)
|
|
compareBuckets(t, tx, sourceBucket, destBucket, nil)
|
|
}
|
|
|
|
func TestTransformDoubleOnExtract(t *testing.T) {
|
|
// test invariant when extractFunc multiplies the data 2x
|
|
db := ethdb.NewTestDB(t)
|
|
tx, err := db.Begin(context.Background(), ethdb.RW)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer tx.Rollback()
|
|
sourceBucket := dbutils.Buckets[0]
|
|
destBucket := dbutils.Buckets[1]
|
|
generateTestData(t, tx, sourceBucket, 10)
|
|
err = Transform(
|
|
"logPrefix",
|
|
tx.(ethdb.HasTx).Tx().(ethdb.RwTx),
|
|
sourceBucket,
|
|
destBucket,
|
|
"", // temp dir
|
|
testExtractDoubleToMapFunc,
|
|
testLoadFromMapFunc,
|
|
TransformArgs{},
|
|
)
|
|
assert.Nil(t, err)
|
|
compareBucketsDouble(t, tx, sourceBucket, destBucket)
|
|
}
|
|
|
|
func TestTransformDoubleOnLoad(t *testing.T) {
|
|
// test invariant when loadFunc multiplies the data 2x
|
|
db := ethdb.NewTestDB(t)
|
|
tx, err := db.Begin(context.Background(), ethdb.RW)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer tx.Rollback()
|
|
sourceBucket := dbutils.Buckets[0]
|
|
destBucket := dbutils.Buckets[1]
|
|
generateTestData(t, tx, sourceBucket, 10)
|
|
err = Transform(
|
|
"logPrefix",
|
|
tx.(ethdb.HasTx).Tx().(ethdb.RwTx),
|
|
sourceBucket,
|
|
destBucket,
|
|
"", // temp dir
|
|
testExtractToMapFunc,
|
|
testLoadFromMapDoubleFunc,
|
|
TransformArgs{},
|
|
)
|
|
assert.Nil(t, err)
|
|
compareBucketsDouble(t, tx, sourceBucket, destBucket)
|
|
}
|
|
|
|
func generateTestData(t *testing.T, db ethdb.Putter, bucket string, count int) {
|
|
for i := 0; i < count; i++ {
|
|
k := []byte(fmt.Sprintf("%10d-key-%010d", i, i))
|
|
v := []byte(fmt.Sprintf("val-%099d", i))
|
|
err := db.Put(bucket, k, v)
|
|
assert.NoError(t, err)
|
|
}
|
|
}
|
|
|
|
func testExtractToMapFunc(k, v []byte, next ExtractNextFunc) error {
|
|
buf := bytes.NewBuffer(nil)
|
|
encoder := codec.NewEncoder(nil, &cbor)
|
|
|
|
valueMap := make(map[string][]byte)
|
|
valueMap["value"] = v
|
|
encoder.Reset(buf)
|
|
encoder.MustEncode(valueMap)
|
|
return next(k, k, buf.Bytes())
|
|
}
|
|
|
|
func testExtractDoubleToMapFunc(k, v []byte, next ExtractNextFunc) error {
|
|
buf := bytes.NewBuffer(nil)
|
|
encoder := codec.NewEncoder(nil, &cbor)
|
|
|
|
valueMap := make(map[string][]byte)
|
|
valueMap["value"] = append(v, 0xAA)
|
|
k1 := append(k, 0xAA)
|
|
encoder.Reset(buf)
|
|
encoder.MustEncode(valueMap)
|
|
|
|
err := next(k, k1, buf.Bytes())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
valueMap = make(map[string][]byte)
|
|
valueMap["value"] = append(v, 0xBB)
|
|
k2 := append(k, 0xBB)
|
|
buf.Reset()
|
|
encoder.Reset(buf)
|
|
encoder.MustEncode(valueMap)
|
|
return next(k, k2, buf.Bytes())
|
|
}
|
|
|
|
func testLoadFromMapFunc(k []byte, v []byte, _ CurrentTableReader, next LoadNextFunc) error {
|
|
decoder := codec.NewDecoder(nil, &cbor)
|
|
decoder.ResetBytes(v)
|
|
valueMap := make(map[string][]byte)
|
|
err := decoder.Decode(&valueMap)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
realValue := valueMap["value"]
|
|
return next(k, k, realValue)
|
|
}
|
|
|
|
func testLoadFromMapDoubleFunc(k []byte, v []byte, _ CurrentTableReader, next LoadNextFunc) error {
|
|
decoder := codec.NewDecoder(nil, &cbor)
|
|
decoder.ResetBytes(v)
|
|
|
|
valueMap := make(map[string][]byte)
|
|
err := decoder.Decode(valueMap)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
realValue := valueMap["value"]
|
|
|
|
err = next(k, append(k, 0xAA), append(realValue, 0xAA))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return next(k, append(k, 0xBB), append(realValue, 0xBB))
|
|
}
|
|
|
|
func compareBuckets(t *testing.T, db ethdb.Database, b1, b2 string, startKey []byte) {
|
|
t.Helper()
|
|
b1Map := make(map[string]string)
|
|
err := db.Walk(b1, startKey, len(startKey), func(k, v []byte) (bool, error) {
|
|
b1Map[fmt.Sprintf("%x", k)] = fmt.Sprintf("%x", v)
|
|
return true, nil
|
|
})
|
|
assert.NoError(t, err)
|
|
b2Map := make(map[string]string)
|
|
err = db.Walk(b2, nil, 0, func(k, v []byte) (bool, error) {
|
|
b2Map[fmt.Sprintf("%x", k)] = fmt.Sprintf("%x", v)
|
|
return true, nil
|
|
})
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, b1Map, b2Map)
|
|
}
|
|
|
|
func compareBucketsDouble(t *testing.T, db ethdb.Database, b1, b2 string) {
|
|
t.Helper()
|
|
b1Map := make(map[string]string)
|
|
err := db.Walk(b1, nil, 0, func(k, v []byte) (bool, error) {
|
|
b1Map[fmt.Sprintf("%x", append(k, 0xAA))] = fmt.Sprintf("%x", append(v, 0xAA))
|
|
b1Map[fmt.Sprintf("%x", append(k, 0xBB))] = fmt.Sprintf("%x", append(v, 0xBB))
|
|
return true, nil
|
|
})
|
|
assert.NoError(t, err)
|
|
b2Map := make(map[string]string)
|
|
err = db.Walk(b2, nil, 0, func(k, v []byte) (bool, error) {
|
|
b2Map[fmt.Sprintf("%x", k)] = fmt.Sprintf("%x", v)
|
|
return true, nil
|
|
})
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, b1Map, b2Map)
|
|
}
|