2020-02-13 17:45:02 +03:00
package migrations
import (
2020-08-05 17:13:35 +07:00
"bytes"
2020-09-29 00:18:36 +07:00
"context"
2021-04-24 16:46:29 +01:00
"encoding/binary"
2020-08-11 18:23:41 +07:00
"fmt"
2020-10-22 15:30:04 +07:00
"path"
2020-08-10 19:46:06 +02:00
2020-08-05 17:13:35 +07:00
"github.com/ledgerwatch/turbo-geth/common"
2020-02-13 17:45:02 +03:00
"github.com/ledgerwatch/turbo-geth/common/dbutils"
2020-08-05 17:13:35 +07:00
"github.com/ledgerwatch/turbo-geth/common/etl"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
2020-02-13 17:45:02 +03:00
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
2020-08-05 17:13:35 +07:00
"github.com/ugorji/go/codec"
2020-02-13 17:45:02 +03:00
)
2020-08-05 17:13:35 +07:00
// migrations apply sequentially in order of this array, skips applied migrations
// this means you do not have to worry about merge conflicts and can freely switch branches
// see also dbutils.Migrations - it stores context in which each transaction was executed - useful for bug-reports
//
// Idempotency is expected
// Best practices to achieve Idempotency:
// - in dbutils/bucket.go add suffix for existing bucket variable, create new bucket with same variable name.
// Example:
// - SyncStageProgress = []byte("SSP1")
// + SyncStageProgressOld1 = []byte("SSP1")
// + SyncStageProgress = []byte("SSP2")
// - in the beginning of migration: check that old bucket exists, clear new bucket
// - in the end: drop old bucket (not in defer!).
// Example:
2020-10-23 18:18:45 +07:00
// Up: func(db ethdb.Database, tmpdir string, progress []byte, OnLoadCommit etl.LoadCommitHandler) error {
2020-09-09 02:39:43 +07:00
// if exists, err := db.(ethdb.BucketsMigrator).BucketExists(dbutils.SyncStageProgressOld1); err != nil {
2020-08-05 17:13:35 +07:00
// return err
// } else if !exists {
2020-08-10 16:16:14 +07:00
// return OnLoadCommit(db, nil, true)
2020-08-05 17:13:35 +07:00
// }
//
2020-09-09 02:39:43 +07:00
// if err := db.(ethdb.BucketsMigrator).ClearBuckets(dbutils.SyncStageProgress); err != nil {
2020-08-05 17:13:35 +07:00
// return err
// }
//
// extractFunc := func(k []byte, v []byte, next etl.ExtractNextFunc) error {
// ... // migration logic
// }
// if err := etl.Transform(...); err != nil {
// return err
// }
//
2020-09-09 02:39:43 +07:00
// if err := db.(ethdb.BucketsMigrator).DropBuckets(dbutils.SyncStageProgressOld1); err != nil { // clear old bucket
2020-08-05 17:13:35 +07:00
// return err
// }
// },
// - if you need to migrate multiple buckets - create a separate migration for each bucket
// - write a test that applies the migration twice
var migrations = [ ] Migration {
stagesToUseNamedKeys ,
unwindStagesToUseNamedKeys ,
2020-08-10 19:46:06 +02:00
stagedsyncToUseStageBlockhashes ,
unwindStagedsyncToUseStageBlockhashes ,
2020-08-15 14:11:40 +07:00
dupSortHashState ,
dupSortPlainState ,
2020-09-10 19:35:58 +07:00
dupSortIH ,
2020-09-29 00:18:36 +07:00
clearIndices ,
2020-09-29 16:28:49 +07:00
resetIHBucketToRecoverDB ,
2020-10-02 19:51:20 +07:00
receiptsCborEncode ,
2020-10-25 15:38:55 +07:00
receiptsOnePerTx ,
2020-11-16 19:08:28 +07:00
accChangeSetDupSort ,
storageChangeSetDupSort ,
2020-11-23 04:25:26 +07:00
transactionsTable ,
2020-12-05 04:16:51 +07:00
historyAccBitmap ,
historyStorageBitmap ,
2021-02-22 01:41:59 +07:00
splitHashStateBucket ,
splitIHBucket ,
2021-03-12 16:26:39 +07:00
deleteExtensionHashesFromTrieBucket ,
2021-03-19 15:54:47 +03:00
headerPrefixToSeparateBuckets ,
2021-04-20 00:58:05 +03:00
removeCliqueBucket ,
2021-04-24 16:46:29 +01:00
dbSchemaVersion ,
2020-08-05 17:13:35 +07:00
}
2020-02-13 17:45:02 +03:00
type Migration struct {
Name string
2020-10-23 18:18:45 +07:00
Up func ( db ethdb . Database , tmpdir string , progress [ ] byte , OnLoadCommitOnLoadCommit etl . LoadCommitHandler ) error
2020-02-13 17:45:02 +03:00
}
2020-08-11 18:23:41 +07:00
// Sentinel errors of the migrations package. Callers can match them with
// errors.Is, since Apply wraps them using %w.
var (
	// ErrMigrationNonUniqueName is returned by Apply when two entries of
	// Migrator.Migrations share the same Name.
	ErrMigrationNonUniqueName = fmt.Errorf("please provide unique migration name")

	// ErrMigrationCommitNotCalled is returned by Apply when a migration's Up
	// returned nil without ever invoking the commit callback with isDone == true.
	ErrMigrationCommitNotCalled = fmt.Errorf("migration commit function was not called")

	// ErrMigrationETLFilesDeleted signals, per its message, that a migration was
	// interrupted after the extraction step and its ETL files are gone.
	// NOTE(review): not referenced in this file; presumably returned by
	// individual migrations - confirm against callers.
	ErrMigrationETLFilesDeleted = fmt.Errorf("db migration progress was interrupted after extraction step and ETL files was deleted, please contact development team for help or re-sync from scratch")
)
2020-02-13 17:45:02 +03:00
// NewMigrator returns a Migrator preloaded with the package-level migrations
// list.
func NewMigrator() *Migrator {
	m := new(Migrator)
	m.Migrations = migrations
	return m
}
// Migrator applies an ordered list of migrations to a database and reports
// which of them are still pending.
type Migrator struct {
	// Migrations is the list to process; Apply and PendingMigrations walk it
	// in slice order.
	Migrations []Migration
}
2020-08-05 17:13:35 +07:00
func AppliedMigrations ( db ethdb . Database , withPayload bool ) ( map [ string ] [ ] byte , error ) {
applied := map [ string ] [ ] byte { }
err := db . Walk ( dbutils . Migrations , nil , 0 , func ( k [ ] byte , v [ ] byte ) ( bool , error ) {
2020-10-20 02:20:18 +07:00
if bytes . HasPrefix ( k , [ ] byte ( "_progress_" ) ) {
return true , nil
}
2020-08-05 17:13:35 +07:00
if withPayload {
applied [ string ( common . CopyBytes ( k ) ) ] = common . CopyBytes ( v )
} else {
applied [ string ( common . CopyBytes ( k ) ) ] = [ ] byte { }
}
return true , nil
} )
return applied , err
}
2020-10-28 16:52:15 +07:00
// HasPendingMigrations reports whether at least one migration of m has not
// been applied to db yet. On error it returns false together with the error
// from PendingMigrations.
func (m *Migrator) HasPendingMigrations(db ethdb.Database) (bool, error) {
	todo, err := m.PendingMigrations(db)
	if err != nil {
		return false, err
	}
	return len(todo) != 0, nil
}
// PendingMigrations returns, in their original order, the migrations of m
// whose Name is not among the keys reported by AppliedMigrations for db.
// The returned slice is never nil when err is nil.
func (m *Migrator) PendingMigrations(db ethdb.Database) ([]Migration, error) {
	applied, err := AppliedMigrations(db, false)
	if err != nil {
		return nil, err
	}

	isPending := func(name string) bool {
		_, done := applied[name]
		return !done
	}

	// First pass only counts, so the result can be allocated at its exact size.
	total := 0
	for _, mig := range m.Migrations {
		if isPending(mig.Name) {
			total++
		}
	}

	pending := make([]Migration, 0, total)
	for _, mig := range m.Migrations {
		if isPending(mig.Name) {
			pending = append(pending, mig)
		}
	}
	return pending, nil
}
2021-04-26 19:51:01 +07:00
func ( m * Migrator ) Apply ( db ethdb . Database , datadir string ) error {
2020-02-13 17:45:02 +03:00
if len ( m . Migrations ) == 0 {
return nil
}
2020-10-20 02:20:18 +07:00
applied , err1 := AppliedMigrations ( db , false )
if err1 != nil {
return err1
2020-02-13 17:45:02 +03:00
}
2020-08-11 18:23:41 +07:00
// migration names must be unique, protection against people's mistake
uniqueNameCheck := map [ string ] bool { }
for i := range m . Migrations {
_ , ok := uniqueNameCheck [ m . Migrations [ i ] . Name ]
if ok {
return fmt . Errorf ( "%w, duplicate: %s" , ErrMigrationNonUniqueName , m . Migrations [ i ] . Name )
}
uniqueNameCheck [ m . Migrations [ i ] . Name ] = true
}
2020-10-25 15:38:55 +07:00
tx , err1 := db . Begin ( context . Background ( ) , ethdb . RW )
2020-10-20 02:20:18 +07:00
if err1 != nil {
return err1
2020-09-09 02:39:43 +07:00
}
defer tx . Rollback ( )
2020-08-05 17:13:35 +07:00
for i := range m . Migrations {
v := m . Migrations [ i ]
if _ , ok := applied [ v . Name ] ; ok {
continue
}
2020-08-11 18:23:41 +07:00
commitFuncCalled := false // commit function must be called if no error, protection against people's mistake
2020-08-05 17:13:35 +07:00
log . Info ( "Apply migration" , "name" , v . Name )
2021-04-05 16:04:58 +03:00
progress , err := tx . GetOne ( dbutils . Migrations , [ ] byte ( "_progress_" + v . Name ) )
if err != nil {
2020-10-20 02:20:18 +07:00
return err
}
2021-04-26 19:51:01 +07:00
if err = v . Up ( tx , path . Join ( datadir , "migrations" , v . Name ) , progress , func ( _ ethdb . Putter , key [ ] byte , isDone bool ) error {
2020-08-05 17:13:35 +07:00
if ! isDone {
2020-10-20 02:20:18 +07:00
if key != nil {
err = tx . Put ( dbutils . Migrations , [ ] byte ( "_progress_" + v . Name ) , key )
if err != nil {
return err
}
}
2020-10-19 13:43:30 +07:00
// do commit, but don't save partial progress
if err := tx . CommitAndBegin ( context . Background ( ) ) ; err != nil {
return err
}
return nil
2020-08-05 17:13:35 +07:00
}
2020-08-11 18:23:41 +07:00
commitFuncCalled = true
2020-09-09 02:39:43 +07:00
stagesProgress , err := MarshalMigrationPayload ( tx )
2020-08-05 17:13:35 +07:00
if err != nil {
return err
}
2020-10-20 02:20:18 +07:00
err = tx . Put ( dbutils . Migrations , [ ] byte ( v . Name ) , stagesProgress )
if err != nil {
return err
}
2020-10-29 20:19:31 +07:00
err = tx . Delete ( dbutils . Migrations , [ ] byte ( "_progress_" + v . Name ) , nil )
2020-08-05 17:13:35 +07:00
if err != nil {
return err
}
2020-09-09 02:39:43 +07:00
2020-09-29 00:18:36 +07:00
if err := tx . CommitAndBegin ( context . Background ( ) ) ; err != nil {
2020-09-09 02:39:43 +07:00
return err
}
2020-08-05 17:13:35 +07:00
return nil
} ) ; err != nil {
return err
2020-02-13 17:45:02 +03:00
}
2020-08-05 17:13:35 +07:00
2020-08-11 18:23:41 +07:00
if ! commitFuncCalled {
2020-08-12 09:57:55 +07:00
return fmt . Errorf ( "%w: %s" , ErrMigrationCommitNotCalled , v . Name )
2020-08-11 18:23:41 +07:00
}
2020-08-05 17:13:35 +07:00
log . Info ( "Applied migration" , "name" , v . Name )
2020-02-13 17:45:02 +03:00
}
2021-04-24 16:46:29 +01:00
// Write DB schema version
var version [ 12 ] byte
binary . BigEndian . PutUint32 ( version [ : ] , dbutils . DBSchemaVersion . Major )
binary . BigEndian . PutUint32 ( version [ 4 : ] , dbutils . DBSchemaVersion . Minor )
binary . BigEndian . PutUint32 ( version [ 8 : ] , dbutils . DBSchemaVersion . Patch )
if err := tx . Put ( dbutils . DatabaseInfoBucket , dbutils . DBSchemaVersionKey , version [ : ] ) ; err != nil {
return fmt . Errorf ( "writing DB schema version: %w" , err )
}
if err := tx . Commit ( ) ; err != nil {
return fmt . Errorf ( "committing DB version update: %w" , err )
}
log . Info ( "Updated DB schema to" , "version" , fmt . Sprintf ( "%d.%d.%d" , dbutils . DBSchemaVersion . Major , dbutils . DBSchemaVersion . Minor , dbutils . DBSchemaVersion . Patch ) )
2020-08-05 17:13:35 +07:00
return nil
}
2020-08-04 16:25:28 +07:00
2020-08-05 17:13:35 +07:00
func MarshalMigrationPayload ( db ethdb . Getter ) ( [ ] byte , error ) {
s := map [ string ] [ ] byte { }
buf := bytes . NewBuffer ( nil )
encoder := codec . NewEncoder ( buf , & codec . CborHandle { } )
2020-09-05 17:07:27 +01:00
for _ , stage := range stages . AllStages {
2021-04-21 15:02:21 +07:00
v , err := db . GetOne ( dbutils . SyncStageProgress , [ ] byte ( stage ) )
2021-04-05 16:04:58 +03:00
if err != nil {
2020-08-05 17:13:35 +07:00
return nil , err
2020-08-04 16:25:28 +07:00
}
2020-08-05 17:13:35 +07:00
if len ( v ) > 0 {
2020-09-05 17:07:27 +01:00
s [ string ( stage ) ] = common . CopyBytes ( v )
2020-08-05 17:13:35 +07:00
}
2021-04-21 15:02:21 +07:00
v , err = db . GetOne ( dbutils . SyncStageUnwind , [ ] byte ( stage ) )
2021-04-05 16:04:58 +03:00
if err != nil {
2020-08-05 17:13:35 +07:00
return nil , err
}
if len ( v ) > 0 {
2020-09-05 17:07:27 +01:00
s [ "unwind_" + string ( stage ) ] = common . CopyBytes ( v )
2020-02-13 17:45:02 +03:00
}
}
2020-08-05 17:13:35 +07:00
if err := encoder . Encode ( s ) ; err != nil {
return nil , err
}
return buf . Bytes ( ) , nil
2020-02-13 17:45:02 +03:00
}
2020-08-05 17:13:35 +07:00
// UnmarshalMigrationPayload decodes a CBOR-encoded map[string][]byte, the
// format produced by MarshalMigrationPayload. It returns a nil map when
// decoding fails.
func UnmarshalMigrationPayload(data []byte) (map[string][]byte, error) {
	decoded := make(map[string][]byte)
	decoder := codec.NewDecoder(bytes.NewReader(data), &codec.CborHandle{})
	if err := decoder.Decode(&decoded); err != nil {
		return nil, err
	}
	return decoded, nil
}