mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-03 09:37:38 +00:00
15b4095718
* Move ETL to erigon-lib * Update link in the readme * go mod tidy * Use common/chan.go from erigon-lib * Clean up * Fix lint * Fix test * Fix compilation Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro.local>
178 lines
5.0 KiB
Go
178 lines
5.0 KiB
Go
package migrations
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
|
|
"github.com/ledgerwatch/erigon-lib/etl"
|
|
"github.com/ledgerwatch/erigon-lib/kv"
|
|
"github.com/ledgerwatch/erigon/common"
|
|
"github.com/ledgerwatch/erigon/common/dbutils"
|
|
)
|
|
|
|
var headerPrefixToSeparateBuckets = Migration{
|
|
Name: "header_prefix_to_separate_buckets",
|
|
Up: func(db kv.RwDB, tmpdir string, progress []byte, BeforeCommit Callback) (err error) {
|
|
tx, err := db.BeginRw(context.Background())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
exists, err := tx.ExistsBucket(kv.HeaderPrefixOld)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !exists {
|
|
if err := BeforeCommit(tx, nil, true); err != nil {
|
|
return err
|
|
}
|
|
return tx.Commit()
|
|
}
|
|
|
|
if err = tx.ClearBucket(kv.HeaderCanonical); err != nil {
|
|
return err
|
|
}
|
|
if err = tx.ClearBucket(kv.HeaderTD); err != nil {
|
|
return err
|
|
}
|
|
logPrefix := "split_header_prefix_bucket"
|
|
const loadStep = "load"
|
|
|
|
canonicalCollector, err := etl.NewCollectorFromFiles(tmpdir + "canonical")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
tdCollector, err := etl.NewCollectorFromFiles(tmpdir + "td")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
headersCollector, err := etl.NewCollectorFromFiles(tmpdir + "headers")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
switch string(progress) {
|
|
case "":
|
|
// can't use files if progress field not set, clear them
|
|
if canonicalCollector != nil {
|
|
canonicalCollector.Close(logPrefix)
|
|
canonicalCollector = nil
|
|
}
|
|
|
|
if tdCollector != nil {
|
|
tdCollector.Close(logPrefix)
|
|
tdCollector = nil
|
|
}
|
|
if headersCollector != nil {
|
|
headersCollector.Close(logPrefix)
|
|
headersCollector = nil
|
|
}
|
|
case loadStep:
|
|
if headersCollector == nil || canonicalCollector == nil || tdCollector == nil {
|
|
return ErrMigrationETLFilesDeleted
|
|
}
|
|
defer func() {
|
|
// don't clean if error or panic happened
|
|
if err != nil {
|
|
return
|
|
}
|
|
if rec := recover(); rec != nil {
|
|
panic(rec)
|
|
}
|
|
canonicalCollector.Close(logPrefix)
|
|
tdCollector.Close(logPrefix)
|
|
headersCollector.Close(logPrefix)
|
|
}()
|
|
goto LoadStep
|
|
}
|
|
|
|
canonicalCollector = etl.NewCriticalCollector(tmpdir+"canonical", etl.NewSortableBuffer(etl.BufferOptimalSize*4))
|
|
tdCollector = etl.NewCriticalCollector(tmpdir+"td", etl.NewSortableBuffer(etl.BufferOptimalSize*4))
|
|
headersCollector = etl.NewCriticalCollector(tmpdir+"headers", etl.NewSortableBuffer(etl.BufferOptimalSize*4))
|
|
defer func() {
|
|
// don't clean if error or panic happened
|
|
if err != nil {
|
|
return
|
|
}
|
|
if rec := recover(); rec != nil {
|
|
panic(rec)
|
|
}
|
|
canonicalCollector.Close(logPrefix)
|
|
tdCollector.Close(logPrefix)
|
|
headersCollector.Close(logPrefix)
|
|
}()
|
|
|
|
err = tx.ForEach(kv.HeaderPrefixOld, []byte{}, func(k, v []byte) error {
|
|
var innerErr error
|
|
switch {
|
|
case IsHeaderKey(k):
|
|
innerErr = headersCollector.Collect(k, v)
|
|
case IsHeaderTDKey(k):
|
|
innerErr = tdCollector.Collect(bytes.TrimSuffix(k, HeaderTDSuffix), v)
|
|
case IsHeaderHashKey(k):
|
|
innerErr = canonicalCollector.Collect(bytes.TrimSuffix(k, HeaderHashSuffix), v)
|
|
default:
|
|
return fmt.Errorf("incorrect header prefix key: %v", common.Bytes2Hex(k))
|
|
}
|
|
if innerErr != nil {
|
|
return innerErr
|
|
}
|
|
return nil
|
|
})
|
|
if err = tx.DropBucket(kv.HeaderPrefixOld); err != nil {
|
|
return err
|
|
}
|
|
|
|
LoadStep:
|
|
// Now transaction would have been re-opened, and we should be re-using the space
|
|
if err = canonicalCollector.Load(logPrefix, tx, kv.HeaderCanonical, etl.IdentityLoadFunc, etl.TransformArgs{}); err != nil {
|
|
return fmt.Errorf("loading the transformed data back into the storage table: %w", err)
|
|
}
|
|
if err = tdCollector.Load(logPrefix, tx, kv.HeaderTD, etl.IdentityLoadFunc, etl.TransformArgs{}); err != nil {
|
|
return fmt.Errorf("loading the transformed data back into the acc table: %w", err)
|
|
}
|
|
if err = headersCollector.Load(logPrefix, tx, kv.Headers, etl.IdentityLoadFunc, etl.TransformArgs{}); err != nil {
|
|
return fmt.Errorf("loading the transformed data back into the acc table: %w", err)
|
|
}
|
|
if err := BeforeCommit(tx, nil, true); err != nil {
|
|
return err
|
|
}
|
|
return tx.Commit()
|
|
},
|
|
}
|
|
|
|
func IsHeaderKey(k []byte) bool {
|
|
l := common.BlockNumberLength + common.HashLength
|
|
if len(k) != l {
|
|
return false
|
|
}
|
|
|
|
return !IsHeaderHashKey(k) && !IsHeaderTDKey(k)
|
|
}
|
|
|
|
func IsHeaderTDKey(k []byte) bool {
|
|
l := common.BlockNumberLength + common.HashLength + 1
|
|
return len(k) == l && bytes.Equal(k[l-1:], HeaderTDSuffix)
|
|
}
|
|
|
|
// headerHashKey = headerPrefix + num (uint64 big endian) + headerHashSuffix
|
|
func HeaderHashKey(number uint64) []byte {
|
|
return append(dbutils.EncodeBlockNumber(number), HeaderHashSuffix...)
|
|
}
|
|
|
|
func CheckCanonicalKey(k []byte) bool {
|
|
return len(k) == 8+len(HeaderHashSuffix) && bytes.Equal(k[8:], HeaderHashSuffix)
|
|
}
|
|
|
|
func IsHeaderHashKey(k []byte) bool {
|
|
l := common.BlockNumberLength + 1
|
|
return len(k) == l && bytes.Equal(k[l-1:], HeaderHashSuffix)
|
|
}
|
|
|
|
// Key suffixes used by the legacy combined header bucket to discriminate
// entry kinds; they are stripped when splitting into the new buckets.
var (
	HeaderTDSuffix = []byte("t") // block_num_u64 + hash + HeaderTDSuffix -> td
	HeaderHashSuffix = []byte("n") // block_num_u64 + HeaderHashSuffix -> canonical hash
)
|