package migrations

import (
	"bytes"
	"context"
	"encoding/binary"
	"fmt"
	"path"

	"github.com/ledgerwatch/erigon/common"
	"github.com/ledgerwatch/erigon/common/dbutils"
	"github.com/ledgerwatch/erigon/common/etl"
	"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
	"github.com/ledgerwatch/erigon/ethdb"
	kv2 "github.com/ledgerwatch/erigon/ethdb/kv"
	"github.com/ledgerwatch/erigon/log"
	"github.com/ugorji/go/codec"
)

// migrations are applied sequentially in the order of this array; already-applied migrations are skipped.
// This makes it safe to switch branches and merge without worrying about conflicts.
// See also the dbutils.Migrations bucket - it stores the context in which each migration was executed - useful for bug reports.
//
// Idempotency is expected.
// Best practices to achieve idempotency:
// - in dbutils/bucket.go add a suffix to the existing bucket variable and create a new bucket with the same variable name.
//   Example:
//	- SyncStageProgress = []byte("SSP1")
//	+ SyncStageProgressOld1 = []byte("SSP1")
//	+ SyncStageProgress = []byte("SSP2")
// - at the beginning of the migration: check that the old bucket exists, clear the new bucket
// - at the end: drop the old bucket (not in defer!).
//   Example:
//	Up: func(db ethdb.Database, tmpdir string, progress []byte, OnLoadCommit etl.LoadCommitHandler) error {
//		if exists, err := db.(ethdb.BucketsMigrator).BucketExists(dbutils.SyncStageProgressOld1); err != nil {
//			return err
//		} else if !exists {
//			return OnLoadCommit(db, nil, true)
//		}
//
//		if err := db.(ethdb.BucketsMigrator).ClearBuckets(dbutils.SyncStageProgress); err != nil {
//			return err
//		}
//
//		extractFunc := func(k []byte, v []byte, next etl.ExtractNextFunc) error {
//			... // migration logic
//		}
//		if err := etl.Transform(...); err != nil {
//			return err
//		}
//
//		if err := db.(ethdb.BucketsMigrator).DropBuckets(dbutils.SyncStageProgressOld1); err != nil { // drop the old bucket
//			return err
//		}
//	},
// - if you need to migrate multiple buckets - create a separate migration for each bucket
// - write a test that applies the migration twice (see the sketch after the migrations list below)
var migrations = map[ethdb.Label][]Migration{
	ethdb.Chain: {
		headerPrefixToSeparateBuckets,
		removeCliqueBucket,
		dbSchemaVersion,
		rebuilCallTraceIndex,
		fixSequences,
	},
	ethdb.TxPool: {},
	ethdb.Sentry: {},
}
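// A minimal idempotency-test sketch (for a _test.go file; newTestKV is a hypothetical helper
// returning an empty in-memory ethdb.RwKV - an assumption, not an existing helper in this package):
//
//	func TestApplyTwice(t *testing.T) {
//		kv, datadir := newTestKV(t), t.TempDir()
//		m := NewMigrator(ethdb.Chain)
//		if err := m.Apply(kv, datadir); err != nil {
//			t.Fatal(err)
//		}
//		// second run must be a no-op: Apply skips every name already recorded in dbutils.Migrations
//		if err := m.Apply(kv, datadir); err != nil {
//			t.Fatal(err)
//		}
//	}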
type Migration struct {
	Name string
	Up   func(db ethdb.Database, tmpdir string, progress []byte, OnLoadCommit etl.LoadCommitHandler) error
}
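// A minimal sketch of a Migration value (hypothetical name, not registered in the migrations map above).
// The commit handler must be called with isDone=true before Up returns, otherwise Apply fails with
// ErrMigrationCommitNotCalled; a nil key means there is no partial progress to record:
//
//	var exampleNoop = Migration{
//		Name: "example_noop",
//		Up: func(db ethdb.Database, tmpdir string, progress []byte, OnLoadCommit etl.LoadCommitHandler) error {
//			return OnLoadCommit(db, nil, true)
//		},
//	}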
var (
	ErrMigrationNonUniqueName   = fmt.Errorf("please provide unique migration name")
	ErrMigrationCommitNotCalled = fmt.Errorf("migration commit function was not called")
	ErrMigrationETLFilesDeleted = fmt.Errorf("db migration progress was interrupted after extraction step and ETL files were deleted, please contact development team for help or re-sync from scratch")
)

func NewMigrator(label ethdb.Label) *Migrator {
	return &Migrator{
		Migrations: migrations[label],
	}
}

type Migrator struct {
	Migrations []Migration
}

func AppliedMigrations(tx ethdb.Tx, withPayload bool) (map[string][]byte, error) {
	applied := map[string][]byte{}
	err := tx.ForEach(dbutils.Migrations, nil, func(k []byte, v []byte) error {
		if bytes.HasPrefix(k, []byte("_progress_")) {
			return nil
		}
		if withPayload {
			applied[string(common.CopyBytes(k))] = common.CopyBytes(v)
		} else {
			applied[string(common.CopyBytes(k))] = []byte{}
		}
		return nil
	})
	return applied, err
}
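// A minimal usage sketch (it mirrors what Apply does below; assumes an open ethdb.RwKV named kv):
//
//	var applied map[string][]byte
//	if err := kv.View(context.Background(), func(tx ethdb.Tx) error {
//		var err error
//		applied, err = AppliedMigrations(tx, false)
//		return err
//	}); err != nil {
//		return err
//	}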
func (m *Migrator) HasPendingMigrations(db ethdb.RwKV) (bool, error) {
	var has bool
	if err := db.View(context.Background(), func(tx ethdb.Tx) error {
		pending, err := m.PendingMigrations(tx)
		if err != nil {
			return err
		}
		has = len(pending) > 0
		return nil
	}); err != nil {
		return false, err
	}
	return has, nil
}

func (m *Migrator) PendingMigrations(tx ethdb.Tx) ([]Migration, error) {
	applied, err := AppliedMigrations(tx, false)
	if err != nil {
		return nil, err
	}

	counter := 0
	for i := range m.Migrations {
		v := m.Migrations[i]
		if _, ok := applied[v.Name]; ok {
			continue
		}
		counter++
	}

	pending := make([]Migration, 0, counter)
	for i := range m.Migrations {
		v := m.Migrations[i]
		if _, ok := applied[v.Name]; ok {
			continue
		}
		pending = append(pending, v)
	}
	return pending, nil
}

func (m *Migrator) Apply(kv ethdb.RwKV, datadir string) error {
	if len(m.Migrations) == 0 {
		return nil
	}

	var applied map[string][]byte
	if err := kv.View(context.Background(), func(tx ethdb.Tx) error {
		var err error
		applied, err = AppliedMigrations(tx, false)
		return err
	}); err != nil {
		return err
	}

	db := kv2.NewObjectDatabase(kv)
	tx, err1 := db.Begin(context.Background(), ethdb.RW)
	if err1 != nil {
		return err1
	}
	defer tx.Rollback()

	// migration names must be unique, protection against people's mistakes
	uniqueNameCheck := map[string]bool{}
	for i := range m.Migrations {
		_, ok := uniqueNameCheck[m.Migrations[i].Name]
		if ok {
			return fmt.Errorf("%w, duplicate: %s", ErrMigrationNonUniqueName, m.Migrations[i].Name)
		}
		uniqueNameCheck[m.Migrations[i].Name] = true
	}

	for i := range m.Migrations {
		v := m.Migrations[i]
		if _, ok := applied[v.Name]; ok {
			continue
		}

		commitFuncCalled := false // the commit function must be called if there is no error, protection against people's mistakes

		log.Info("Apply migration", "name", v.Name)
		progress, err := tx.GetOne(dbutils.Migrations, []byte("_progress_"+v.Name))
		if err != nil {
			return err
		}

		if err = v.Up(tx, path.Join(datadir, "migrations", v.Name), progress, func(_ ethdb.Putter, key []byte, isDone bool) error {
			if !isDone {
				if key != nil {
					err = tx.Put(dbutils.Migrations, []byte("_progress_"+v.Name), key)
					if err != nil {
						return err
					}
				}
				// do commit, but don't save partial progress
				if err := tx.CommitAndBegin(context.Background()); err != nil {
					return err
				}
				return nil
			}
			commitFuncCalled = true

			stagesProgress, err := MarshalMigrationPayload(tx)
			if err != nil {
				return err
			}
			err = tx.Put(dbutils.Migrations, []byte(v.Name), stagesProgress)
			if err != nil {
				return err
			}

			err = tx.Delete(dbutils.Migrations, []byte("_progress_"+v.Name), nil)
			if err != nil {
				return err
			}

			if err := tx.CommitAndBegin(context.Background()); err != nil {
				return err
			}
			return nil
		}); err != nil {
			return err
		}

		if !commitFuncCalled {
			return fmt.Errorf("%w: %s", ErrMigrationCommitNotCalled, v.Name)
		}
		log.Info("Applied migration", "name", v.Name)
	}

	// Write DB schema version
	var version [12]byte
	binary.BigEndian.PutUint32(version[:], dbutils.DBSchemaVersion.Major)
	binary.BigEndian.PutUint32(version[4:], dbutils.DBSchemaVersion.Minor)
	binary.BigEndian.PutUint32(version[8:], dbutils.DBSchemaVersion.Patch)
	if err := tx.Put(dbutils.DatabaseInfoBucket, dbutils.DBSchemaVersionKey, version[:]); err != nil {
		return fmt.Errorf("writing DB schema version: %w", err)
	}
	if err := tx.Commit(); err != nil {
		return fmt.Errorf("committing DB version update: %w", err)
	}
	log.Info("Updated DB schema to", "version", fmt.Sprintf("%d.%d.%d", dbutils.DBSchemaVersion.Major, dbutils.DBSchemaVersion.Minor, dbutils.DBSchemaVersion.Patch))
	return nil
}
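// The schema version written above is 12 bytes: big-endian uint32 Major, Minor and Patch.
// A minimal read-back sketch (assuming v holds the value stored under dbutils.DBSchemaVersionKey):
//
//	major := binary.BigEndian.Uint32(v[:4])
//	minor := binary.BigEndian.Uint32(v[4:8])
//	patch := binary.BigEndian.Uint32(v[8:12])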
func MarshalMigrationPayload(db ethdb.KVGetter) ([]byte, error) {
	s := map[string][]byte{}

	buf := bytes.NewBuffer(nil)
	encoder := codec.NewEncoder(buf, &codec.CborHandle{})
	for _, stage := range stages.AllStages {
		v, err := db.GetOne(dbutils.SyncStageProgress, []byte(stage))
		if err != nil {
			return nil, err
		}
		if len(v) > 0 {
			s[string(stage)] = common.CopyBytes(v)
		}

		v, err = db.GetOne(dbutils.SyncStageUnwind, []byte(stage))
		if err != nil {
			return nil, err
		}
		if len(v) > 0 {
			s["unwind_"+string(stage)] = common.CopyBytes(v)
		}
	}
	if err := encoder.Encode(s); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func UnmarshalMigrationPayload(data []byte) (map[string][]byte, error) {
	s := map[string][]byte{}
	if err := codec.NewDecoder(bytes.NewReader(data), &codec.CborHandle{}).Decode(&s); err != nil {
		return nil, err
	}
	return s, nil
}
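// A minimal round-trip sketch for the payload helpers above (assuming tx is an open transaction
// that satisfies ethdb.KVGetter):
//
//	payload, err := MarshalMigrationPayload(tx)
//	if err != nil {
//		return err
//	}
//	stagesSnapshot, err := UnmarshalMigrationPayload(payload)
//	if err != nil {
//		return err
//	}
//	// stagesSnapshot maps stage name (and "unwind_"+name) to the progress bytes captured at migration time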