2020-02-13 14:45:02 +00:00
package migrations
import (
2020-08-05 10:13:35 +00:00
"bytes"
2020-09-28 17:18:36 +00:00
"context"
2020-08-05 10:13:35 +00:00
"errors"
2020-08-11 11:23:41 +00:00
"fmt"
2020-10-22 08:30:04 +00:00
"path"
2020-08-10 17:46:06 +00:00
2020-08-05 10:13:35 +00:00
"github.com/ledgerwatch/turbo-geth/common"
2020-02-13 14:45:02 +00:00
"github.com/ledgerwatch/turbo-geth/common/dbutils"
2020-08-05 10:13:35 +00:00
"github.com/ledgerwatch/turbo-geth/common/etl"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
2020-02-13 14:45:02 +00:00
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
2020-08-05 10:13:35 +00:00
"github.com/ugorji/go/codec"
2020-02-13 14:45:02 +00:00
)
2020-08-05 10:13:35 +00:00
// migrations apply sequentially in order of this array, skips applied migrations
// it allows - don't worry about merge conflicts and use switch branches
// see also dbutils.Migrations - it stores context in which each transaction was exectured - useful for bug-reports
//
// Idempotency is expected
// Best practices to achieve Idempotency:
// - in dbutils/bucket.go add suffix for existing bucket variable, create new bucket with same variable name.
// Example:
// - SyncStageProgress = []byte("SSP1")
// + SyncStageProgressOld1 = []byte("SSP1")
// + SyncStageProgress = []byte("SSP2")
// - in the beginning of migration: check that old bucket exists, clear new bucket
// - in the end:drop old bucket (not in defer!).
// Example:
2020-10-23 11:18:45 +00:00
// Up: func(db ethdb.Database, tmpdir string, OnLoadCommit etl.LoadCommitHandler) error {
2020-09-08 19:39:43 +00:00
// if exists, err := db.(ethdb.BucketsMigrator).BucketExists(dbutils.SyncStageProgressOld1); err != nil {
2020-08-05 10:13:35 +00:00
// return err
// } else if !exists {
2020-08-10 09:16:14 +00:00
// return OnLoadCommit(db, nil, true)
2020-08-05 10:13:35 +00:00
// }
//
2020-09-08 19:39:43 +00:00
// if err := db.(ethdb.BucketsMigrator).ClearBuckets(dbutils.SyncStageProgress); err != nil {
2020-08-05 10:13:35 +00:00
// return err
// }
//
// extractFunc := func(k []byte, v []byte, next etl.ExtractNextFunc) error {
// ... // migration logic
// }
// if err := etl.Transform(...); err != nil {
// return err
// }
//
2020-09-08 19:39:43 +00:00
// if err := db.(ethdb.BucketsMigrator).DropBuckets(dbutils.SyncStageProgressOld1); err != nil { // clear old bucket
2020-08-05 10:13:35 +00:00
// return err
// }
// },
// - if you need migrate multiple buckets - create separate migration for each bucket
// - write test where apply migration twice
var migrations = [ ] Migration {
stagesToUseNamedKeys ,
unwindStagesToUseNamedKeys ,
2020-08-10 17:46:06 +00:00
stagedsyncToUseStageBlockhashes ,
unwindStagedsyncToUseStageBlockhashes ,
2020-08-15 07:11:40 +00:00
dupSortHashState ,
dupSortPlainState ,
2020-09-10 12:35:58 +00:00
dupSortIH ,
2020-09-28 17:18:36 +00:00
clearIndices ,
2020-09-29 09:28:49 +00:00
resetIHBucketToRecoverDB ,
2020-10-02 12:51:20 +00:00
receiptsCborEncode ,
2020-10-25 08:38:55 +00:00
receiptsOnePerTx ,
2020-11-16 12:08:28 +00:00
accChangeSetDupSort ,
storageChangeSetDupSort ,
2020-11-22 21:25:26 +00:00
transactionsTable ,
2020-12-04 21:16:51 +00:00
historyAccBitmap ,
historyStorageBitmap ,
2021-02-21 18:41:59 +00:00
splitHashStateBucket ,
splitIHBucket ,
2020-08-05 10:13:35 +00:00
}
2020-02-13 14:45:02 +00:00
type Migration struct {
Name string
2020-10-23 11:18:45 +00:00
Up func ( db ethdb . Database , tmpdir string , progress [ ] byte , OnLoadCommitOnLoadCommit etl . LoadCommitHandler ) error
2020-02-13 14:45:02 +00:00
}
2020-08-11 11:23:41 +00:00
// Sentinel errors returned by the migration machinery. Compare them with
// errors.Is, since Migrator.Apply wraps them with the offending migration name.
var (
	// ErrMigrationNonUniqueName is returned (wrapped) by Migrator.Apply when
	// two registered migrations share the same Name.
	ErrMigrationNonUniqueName = errors.New("please provide unique migration name")
	// ErrMigrationCommitNotCalled is returned (wrapped) by Migrator.Apply when
	// a migration's Up returned without error but never invoked its
	// OnLoadCommit callback with isDone=true.
	ErrMigrationCommitNotCalled = errors.New("migration commit function was not called")
	// ErrMigrationETLFilesDeleted reports that a migration was interrupted after
	// its ETL extraction step and the intermediate files are gone, so it cannot
	// resume. NOTE(review): not referenced in this file; presumably returned by
	// individual migrations — confirm.
	ErrMigrationETLFilesDeleted = errors.New("db migration progress was interrupted after extraction step and ETL files was deleted, please contact development team for help or re-sync from scratch")
)
2020-02-13 14:45:02 +00:00
// Migrator applies an ordered list of database migrations.
type Migrator struct {
	// Migrations are applied in slice order; see Migrator.Apply.
	Migrations []Migration
}

// NewMigrator returns a Migrator configured with the package-level migrations
// list (the slice is shared, not copied).
func NewMigrator() *Migrator {
	m := new(Migrator)
	m.Migrations = migrations
	return m
}
2020-08-05 10:13:35 +00:00
func AppliedMigrations ( db ethdb . Database , withPayload bool ) ( map [ string ] [ ] byte , error ) {
applied := map [ string ] [ ] byte { }
err := db . Walk ( dbutils . Migrations , nil , 0 , func ( k [ ] byte , v [ ] byte ) ( bool , error ) {
2020-10-19 19:20:18 +00:00
if bytes . HasPrefix ( k , [ ] byte ( "_progress_" ) ) {
return true , nil
}
2020-08-05 10:13:35 +00:00
if withPayload {
applied [ string ( common . CopyBytes ( k ) ) ] = common . CopyBytes ( v )
} else {
applied [ string ( common . CopyBytes ( k ) ) ] = [ ] byte { }
}
return true , nil
} )
return applied , err
}
2020-10-28 09:52:15 +00:00
// HasPendingMigrations reports whether any entry of m.Migrations has not yet
// been recorded as applied in db. On error it returns false and the error.
func (m *Migrator) HasPendingMigrations(db ethdb.Database) (bool, error) {
	pending, err := m.PendingMigrations(db)
	hasAny := err == nil && len(pending) > 0
	return hasAny, err
}
// PendingMigrations returns, in their original order, the entries of
// m.Migrations whose names are not yet recorded as applied in db. On success
// the result is non-nil (possibly empty) and sized exactly to its contents.
func (m *Migrator) PendingMigrations(db ethdb.Database) ([]Migration, error) {
	applied, err := AppliedMigrations(db, false)
	if err != nil {
		return nil, err
	}

	isPending := func(name string) bool {
		_, done := applied[name]
		return !done
	}

	// First pass only counts, so the result can be allocated at its exact size.
	total := 0
	for _, mig := range m.Migrations {
		if isPending(mig.Name) {
			total++
		}
	}

	result := make([]Migration, 0, total)
	for _, mig := range m.Migrations {
		if isPending(mig.Name) {
			result = append(result, mig)
		}
	}
	return result, nil
}
2020-10-23 11:18:45 +00:00
func ( m * Migrator ) Apply ( db ethdb . Database , tmpdir string ) error {
2020-02-13 14:45:02 +00:00
if len ( m . Migrations ) == 0 {
return nil
}
2020-10-19 19:20:18 +00:00
applied , err1 := AppliedMigrations ( db , false )
if err1 != nil {
return err1
2020-02-13 14:45:02 +00:00
}
2020-08-11 11:23:41 +00:00
// migration names must be unique, protection against people's mistake
uniqueNameCheck := map [ string ] bool { }
for i := range m . Migrations {
_ , ok := uniqueNameCheck [ m . Migrations [ i ] . Name ]
if ok {
return fmt . Errorf ( "%w, duplicate: %s" , ErrMigrationNonUniqueName , m . Migrations [ i ] . Name )
}
uniqueNameCheck [ m . Migrations [ i ] . Name ] = true
}
2020-10-25 08:38:55 +00:00
tx , err1 := db . Begin ( context . Background ( ) , ethdb . RW )
2020-10-19 19:20:18 +00:00
if err1 != nil {
return err1
2020-09-08 19:39:43 +00:00
}
defer tx . Rollback ( )
2020-08-05 10:13:35 +00:00
for i := range m . Migrations {
v := m . Migrations [ i ]
if _ , ok := applied [ v . Name ] ; ok {
continue
}
2020-08-11 11:23:41 +00:00
commitFuncCalled := false // commit function must be called if no error, protection against people's mistake
2020-08-05 10:13:35 +00:00
log . Info ( "Apply migration" , "name" , v . Name )
2020-10-19 19:20:18 +00:00
progress , err := tx . Get ( dbutils . Migrations , [ ] byte ( "_progress_" + v . Name ) )
if err != nil && ! errors . Is ( err , ethdb . ErrKeyNotFound ) {
return err
}
2020-10-23 11:18:45 +00:00
if err = v . Up ( tx , path . Join ( tmpdir , "migrations" , v . Name ) , progress , func ( _ ethdb . Putter , key [ ] byte , isDone bool ) error {
2020-08-05 10:13:35 +00:00
if ! isDone {
2020-10-19 19:20:18 +00:00
if key != nil {
err = tx . Put ( dbutils . Migrations , [ ] byte ( "_progress_" + v . Name ) , key )
if err != nil {
return err
}
}
2020-10-19 06:43:30 +00:00
// do commit, but don't save partial progress
if err := tx . CommitAndBegin ( context . Background ( ) ) ; err != nil {
return err
}
return nil
2020-08-05 10:13:35 +00:00
}
2020-08-11 11:23:41 +00:00
commitFuncCalled = true
2020-09-08 19:39:43 +00:00
stagesProgress , err := MarshalMigrationPayload ( tx )
2020-08-05 10:13:35 +00:00
if err != nil {
return err
}
2020-10-19 19:20:18 +00:00
err = tx . Put ( dbutils . Migrations , [ ] byte ( v . Name ) , stagesProgress )
if err != nil {
return err
}
2020-10-29 13:19:31 +00:00
err = tx . Delete ( dbutils . Migrations , [ ] byte ( "_progress_" + v . Name ) , nil )
2020-08-05 10:13:35 +00:00
if err != nil {
return err
}
2020-09-08 19:39:43 +00:00
2020-09-28 17:18:36 +00:00
if err := tx . CommitAndBegin ( context . Background ( ) ) ; err != nil {
2020-09-08 19:39:43 +00:00
return err
}
2020-08-05 10:13:35 +00:00
return nil
} ) ; err != nil {
return err
2020-02-13 14:45:02 +00:00
}
2020-08-05 10:13:35 +00:00
2020-08-11 11:23:41 +00:00
if ! commitFuncCalled {
2020-08-12 02:57:55 +00:00
return fmt . Errorf ( "%w: %s" , ErrMigrationCommitNotCalled , v . Name )
2020-08-11 11:23:41 +00:00
}
2020-08-05 10:13:35 +00:00
log . Info ( "Applied migration" , "name" , v . Name )
2020-02-13 14:45:02 +00:00
}
2020-08-05 10:13:35 +00:00
return nil
}
2020-08-04 09:25:28 +00:00
2020-08-05 10:13:35 +00:00
func MarshalMigrationPayload ( db ethdb . Getter ) ( [ ] byte , error ) {
s := map [ string ] [ ] byte { }
buf := bytes . NewBuffer ( nil )
encoder := codec . NewEncoder ( buf , & codec . CborHandle { } )
2020-09-05 16:07:27 +00:00
for _ , stage := range stages . AllStages {
v , err := db . Get ( dbutils . SyncStageProgress , stage )
2020-08-05 10:13:35 +00:00
if err != nil && ! errors . Is ( err , ethdb . ErrKeyNotFound ) {
return nil , err
2020-08-04 09:25:28 +00:00
}
2020-08-05 10:13:35 +00:00
if len ( v ) > 0 {
2020-09-05 16:07:27 +00:00
s [ string ( stage ) ] = common . CopyBytes ( v )
2020-08-05 10:13:35 +00:00
}
2020-09-05 16:07:27 +00:00
v , err = db . Get ( dbutils . SyncStageUnwind , stage )
2020-08-05 10:13:35 +00:00
if err != nil && ! errors . Is ( err , ethdb . ErrKeyNotFound ) {
return nil , err
}
if len ( v ) > 0 {
2020-09-05 16:07:27 +00:00
s [ "unwind_" + string ( stage ) ] = common . CopyBytes ( v )
2020-02-13 14:45:02 +00:00
}
}
2020-08-05 10:13:35 +00:00
if err := encoder . Encode ( s ) ; err != nil {
return nil , err
}
return buf . Bytes ( ) , nil
2020-02-13 14:45:02 +00:00
}
2020-08-05 10:13:35 +00:00
// UnmarshalMigrationPayload decodes a CBOR payload produced by
// MarshalMigrationPayload back into a stage-name → value map.
// It returns nil and the decoder's error if data cannot be decoded.
func UnmarshalMigrationPayload(data []byte) (map[string][]byte, error) {
	var handle codec.CborHandle
	decoded := make(map[string][]byte)
	dec := codec.NewDecoder(bytes.NewReader(data), &handle)
	if err := dec.Decode(&decoded); err != nil {
		return nil, err
	}
	return decoded, nil
}