mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-14 23:08:20 +00:00
331dcd45eb
* squash * add --database flag to integration * clean * split to 2 buckets * split to 2 buckets * split to 2 buckets * split to 2 buckets * split to 2 buckets * save progress * save progress * improve test * improve test * save progress * change app logic * change app logic * return err from rawdb package * don't clean automatically * don't clean automatically * clean * clean * clean * don't rely on `make clean` * improve cbor code * clean * clean * clean * fix tests * rebase master * stop on error: headers stage * make TxDb walk and multiwalk safe * Fix panics Co-authored-by: Alexey Akhunov <akhounov@gmail.com>
232 lines
6.4 KiB
Go
232 lines
6.4 KiB
Go
package migrations
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"path"
|
|
|
|
"github.com/ledgerwatch/turbo-geth/common"
|
|
"github.com/ledgerwatch/turbo-geth/common/dbutils"
|
|
"github.com/ledgerwatch/turbo-geth/common/etl"
|
|
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
|
|
"github.com/ledgerwatch/turbo-geth/ethdb"
|
|
"github.com/ledgerwatch/turbo-geth/log"
|
|
"github.com/ugorji/go/codec"
|
|
)
|
|
|
|
// migrations apply sequentially in order of this array, skips applied migrations
|
|
// this means you don't have to worry about merge conflicts and can freely switch branches
|
|
// see also dbutils.Migrations - it stores context in which each transaction was executed - useful for bug-reports
|
|
//
|
|
// Idempotency is expected
|
|
// Best practices to achieve Idempotency:
|
|
// - in dbutils/bucket.go add suffix for existing bucket variable, create new bucket with same variable name.
|
|
// Example:
|
|
// - SyncStageProgress = []byte("SSP1")
|
|
// + SyncStageProgressOld1 = []byte("SSP1")
|
|
// + SyncStageProgress = []byte("SSP2")
|
|
// - in the beginning of migration: check that old bucket exists, clear new bucket
|
|
// - in the end: drop old bucket (not in defer!).
|
|
// Example:
|
|
// Up: func(db ethdb.Database, tmpdir string, progress []byte, OnLoadCommit etl.LoadCommitHandler) error {
|
|
// if exists, err := db.(ethdb.BucketsMigrator).BucketExists(dbutils.SyncStageProgressOld1); err != nil {
|
|
// return err
|
|
// } else if !exists {
|
|
// return OnLoadCommit(db, nil, true)
|
|
// }
|
|
//
|
|
// if err := db.(ethdb.BucketsMigrator).ClearBuckets(dbutils.SyncStageProgress); err != nil {
|
|
// return err
|
|
// }
|
|
//
|
|
// extractFunc := func(k []byte, v []byte, next etl.ExtractNextFunc) error {
|
|
// ... // migration logic
|
|
// }
|
|
// if err := etl.Transform(...); err != nil {
|
|
// return err
|
|
// }
|
|
//
|
|
// if err := db.(ethdb.BucketsMigrator).DropBuckets(dbutils.SyncStageProgressOld1); err != nil { // clear old bucket
|
|
// return err
|
|
// }
|
|
// },
|
|
// - if you need to migrate multiple buckets - create a separate migration for each bucket
|
|
// - write a test that applies the migration twice
|
|
var migrations = []Migration{
|
|
stagesToUseNamedKeys,
|
|
unwindStagesToUseNamedKeys,
|
|
stagedsyncToUseStageBlockhashes,
|
|
unwindStagedsyncToUseStageBlockhashes,
|
|
dupSortHashState,
|
|
dupSortPlainState,
|
|
dupSortIH,
|
|
clearIndices,
|
|
resetIHBucketToRecoverDB,
|
|
receiptsCborEncode,
|
|
receiptsOnePerTx,
|
|
}
|
|
|
|
type Migration struct {
|
|
Name string
|
|
Up func(db ethdb.Database, tmpdir string, progress []byte, OnLoadCommitOnLoadCommit etl.LoadCommitHandler) error
|
|
}
|
|
|
|
// Sentinel errors of the migrations package. Compare with errors.Is: callers
// may receive them wrapped with additional context.
var (
	// ErrMigrationNonUniqueName is returned by Migrator.Apply when two
	// migrations share the same Name.
	ErrMigrationNonUniqueName = errors.New("please provide unique migration name")
	// ErrMigrationCommitNotCalled is returned by Migrator.Apply when a
	// migration's Up returns nil without having invoked the commit callback
	// with isDone=true.
	ErrMigrationCommitNotCalled = errors.New("migration commit function was not called")
	// ErrMigrationETLFilesDeleted reports that a migration was interrupted after
	// its extraction step and the ETL files it needs to resume are gone.
	// NOTE(review): not referenced in the visible part of this file; presumably
	// returned by individual migrations.
	ErrMigrationETLFilesDeleted = errors.New("db migration progress was interrupted after extraction step and ETL files were deleted, please contact development team for help or re-sync from scratch")
)
|
|
|
|
func NewMigrator() *Migrator {
|
|
return &Migrator{
|
|
Migrations: migrations,
|
|
}
|
|
}
|
|
|
|
type Migrator struct {
|
|
Migrations []Migration
|
|
}
|
|
|
|
func AppliedMigrations(db ethdb.Database, withPayload bool) (map[string][]byte, error) {
|
|
applied := map[string][]byte{}
|
|
err := db.Walk(dbutils.Migrations, nil, 0, func(k []byte, v []byte) (bool, error) {
|
|
if bytes.HasPrefix(k, []byte("_progress_")) {
|
|
return true, nil
|
|
}
|
|
if withPayload {
|
|
applied[string(common.CopyBytes(k))] = common.CopyBytes(v)
|
|
} else {
|
|
applied[string(common.CopyBytes(k))] = []byte{}
|
|
}
|
|
return true, nil
|
|
})
|
|
return applied, err
|
|
}
|
|
|
|
func (m *Migrator) Apply(db ethdb.Database, tmpdir string) error {
|
|
if len(m.Migrations) == 0 {
|
|
return nil
|
|
}
|
|
|
|
applied, err1 := AppliedMigrations(db, false)
|
|
if err1 != nil {
|
|
return err1
|
|
}
|
|
|
|
// migration names must be unique, protection against people's mistake
|
|
uniqueNameCheck := map[string]bool{}
|
|
for i := range m.Migrations {
|
|
_, ok := uniqueNameCheck[m.Migrations[i].Name]
|
|
if ok {
|
|
return fmt.Errorf("%w, duplicate: %s", ErrMigrationNonUniqueName, m.Migrations[i].Name)
|
|
}
|
|
uniqueNameCheck[m.Migrations[i].Name] = true
|
|
}
|
|
|
|
tx, err1 := db.Begin(context.Background(), ethdb.RW)
|
|
if err1 != nil {
|
|
return err1
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
for i := range m.Migrations {
|
|
v := m.Migrations[i]
|
|
if _, ok := applied[v.Name]; ok {
|
|
continue
|
|
}
|
|
|
|
commitFuncCalled := false // commit function must be called if no error, protection against people's mistake
|
|
|
|
log.Info("Apply migration", "name", v.Name)
|
|
progress, err := tx.Get(dbutils.Migrations, []byte("_progress_"+v.Name))
|
|
if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) {
|
|
return err
|
|
}
|
|
|
|
if err = v.Up(tx, path.Join(tmpdir, "migrations", v.Name), progress, func(_ ethdb.Putter, key []byte, isDone bool) error {
|
|
if !isDone {
|
|
if key != nil {
|
|
err = tx.Put(dbutils.Migrations, []byte("_progress_"+v.Name), key)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
// do commit, but don't save partial progress
|
|
if err := tx.CommitAndBegin(context.Background()); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
commitFuncCalled = true
|
|
|
|
stagesProgress, err := MarshalMigrationPayload(tx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = tx.Put(dbutils.Migrations, []byte(v.Name), stagesProgress)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = tx.Delete(dbutils.Migrations, []byte("_progress_"+v.Name))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := tx.CommitAndBegin(context.Background()); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
if !commitFuncCalled {
|
|
return fmt.Errorf("%w: %s", ErrMigrationCommitNotCalled, v.Name)
|
|
}
|
|
log.Info("Applied migration", "name", v.Name)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func MarshalMigrationPayload(db ethdb.Getter) ([]byte, error) {
|
|
s := map[string][]byte{}
|
|
|
|
buf := bytes.NewBuffer(nil)
|
|
encoder := codec.NewEncoder(buf, &codec.CborHandle{})
|
|
|
|
for _, stage := range stages.AllStages {
|
|
v, err := db.Get(dbutils.SyncStageProgress, stage)
|
|
if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) {
|
|
return nil, err
|
|
}
|
|
if len(v) > 0 {
|
|
s[string(stage)] = common.CopyBytes(v)
|
|
}
|
|
|
|
v, err = db.Get(dbutils.SyncStageUnwind, stage)
|
|
if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) {
|
|
return nil, err
|
|
}
|
|
if len(v) > 0 {
|
|
s["unwind_"+string(stage)] = common.CopyBytes(v)
|
|
}
|
|
}
|
|
|
|
if err := encoder.Encode(s); err != nil {
|
|
return nil, err
|
|
}
|
|
return buf.Bytes(), nil
|
|
}
|
|
|
|
func UnmarshalMigrationPayload(data []byte) (map[string][]byte, error) {
|
|
s := map[string][]byte{}
|
|
|
|
if err := codec.NewDecoder(bytes.NewReader(data), &codec.CborHandle{}).Decode(&s); err != nil {
|
|
return nil, err
|
|
}
|
|
return s, nil
|
|
}
|