2020-02-13 14:45:02 +00:00
package migrations
import (
2020-08-05 10:13:35 +00:00
"bytes"
2020-09-28 17:18:36 +00:00
"context"
2021-04-24 15:46:29 +00:00
"encoding/binary"
2020-08-11 11:23:41 +00:00
"fmt"
2022-02-12 13:33:09 +00:00
"path/filepath"
2020-08-10 17:46:06 +00:00
2021-07-29 11:53:13 +00:00
"github.com/ledgerwatch/erigon-lib/kv"
2021-05-20 18:25:53 +00:00
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
2021-07-29 10:23:23 +00:00
"github.com/ledgerwatch/log/v3"
2020-08-05 10:13:35 +00:00
"github.com/ugorji/go/codec"
2020-02-13 14:45:02 +00:00
)
2020-08-05 10:13:35 +00:00
// migrations apply sequentially in order of this array, skips applied migrations
// it allows - don't worry about merge conflicts and use switch branches
// see also dbutils.Migrations - it stores context in which each transaction was exectured - useful for bug-reports
//
// Idempotency is expected
// Best practices to achieve Idempotency:
// - in dbutils/bucket.go add suffix for existing bucket variable, create new bucket with same variable name.
// Example:
// - SyncStageProgress = []byte("SSP1")
// + SyncStageProgressOld1 = []byte("SSP1")
// + SyncStageProgress = []byte("SSP2")
// - in the beginning of migration: check that old bucket exists, clear new bucket
// - in the end:drop old bucket (not in defer!).
// - if you need migrate multiple buckets - create separate migration for each bucket
2021-08-02 12:04:51 +00:00
// - write test - and check that it's safe to apply same migration twice
2021-07-28 02:47:38 +00:00
var migrations = map [ kv . Label ] [ ] Migration {
kv . ChainDB : {
2021-11-21 03:32:14 +00:00
dbSchemaVersion5 ,
2022-03-10 07:48:58 +00:00
txsBeginEnd ,
2021-06-04 14:56:49 +00:00
} ,
2021-07-28 02:47:38 +00:00
kv . TxPoolDB : { } ,
kv . SentryDB : { } ,
2020-08-05 10:13:35 +00:00
}
2021-07-28 02:47:38 +00:00
type Callback func ( tx kv . RwTx , progress [ ] byte , isDone bool ) error
2020-02-13 14:45:02 +00:00
type Migration struct {
Name string
2021-07-28 02:47:38 +00:00
Up func ( db kv . RwDB , tmpdir string , progress [ ] byte , BeforeCommit Callback ) error
2020-02-13 14:45:02 +00:00
}
2020-08-11 11:23:41 +00:00
var (
ErrMigrationNonUniqueName = fmt . Errorf ( "please provide unique migration name" )
2022-03-10 07:48:58 +00:00
ErrMigrationCommitNotCalled = fmt . Errorf ( "migration before-commit function was not called" )
2020-10-19 19:20:18 +00:00
ErrMigrationETLFilesDeleted = fmt . Errorf ( "db migration progress was interrupted after extraction step and ETL files was deleted, please contact development team for help or re-sync from scratch" )
2020-08-11 11:23:41 +00:00
)
2021-07-28 02:47:38 +00:00
func NewMigrator ( label kv . Label ) * Migrator {
2020-02-13 14:45:02 +00:00
return & Migrator {
2021-06-04 14:56:49 +00:00
Migrations : migrations [ label ] ,
2020-02-13 14:45:02 +00:00
}
}
type Migrator struct {
Migrations [ ] Migration
}
2021-07-28 02:47:38 +00:00
func AppliedMigrations ( tx kv . Tx , withPayload bool ) ( map [ string ] [ ] byte , error ) {
2020-08-05 10:13:35 +00:00
applied := map [ string ] [ ] byte { }
2021-07-28 02:47:38 +00:00
err := tx . ForEach ( kv . Migrations , nil , func ( k [ ] byte , v [ ] byte ) error {
2020-10-19 19:20:18 +00:00
if bytes . HasPrefix ( k , [ ] byte ( "_progress_" ) ) {
2021-06-04 14:56:49 +00:00
return nil
2020-10-19 19:20:18 +00:00
}
2020-08-05 10:13:35 +00:00
if withPayload {
applied [ string ( common . CopyBytes ( k ) ) ] = common . CopyBytes ( v )
} else {
applied [ string ( common . CopyBytes ( k ) ) ] = [ ] byte { }
}
2021-06-04 14:56:49 +00:00
return nil
2020-08-05 10:13:35 +00:00
} )
return applied , err
}
2021-07-28 02:47:38 +00:00
func ( m * Migrator ) HasPendingMigrations ( db kv . RwDB ) ( bool , error ) {
2021-06-04 14:56:49 +00:00
var has bool
2021-07-28 02:47:38 +00:00
if err := db . View ( context . Background ( ) , func ( tx kv . Tx ) error {
2021-06-04 14:56:49 +00:00
pending , err := m . PendingMigrations ( tx )
if err != nil {
return err
}
has = len ( pending ) > 0
return nil
} ) ; err != nil {
2020-10-28 09:52:15 +00:00
return false , err
}
2021-06-04 14:56:49 +00:00
return has , nil
2020-10-28 09:52:15 +00:00
}
2021-07-28 02:47:38 +00:00
func ( m * Migrator ) PendingMigrations ( tx kv . Tx ) ( [ ] Migration , error ) {
2021-06-04 14:56:49 +00:00
applied , err := AppliedMigrations ( tx , false )
2020-10-28 09:52:15 +00:00
if err != nil {
return nil , err
}
counter := 0
for i := range m . Migrations {
v := m . Migrations [ i ]
if _ , ok := applied [ v . Name ] ; ok {
continue
}
counter ++
}
pending := make ( [ ] Migration , 0 , counter )
for i := range m . Migrations {
v := m . Migrations [ i ]
if _ , ok := applied [ v . Name ] ; ok {
continue
}
pending = append ( pending , v )
}
return pending , nil
}
2022-03-08 03:02:35 +00:00
func ( m * Migrator ) VerifyVersion ( db kv . RwDB ) error {
2021-07-28 02:47:38 +00:00
if err := db . View ( context . Background ( ) , func ( tx kv . Tx ) error {
2021-06-04 14:56:49 +00:00
var err error
2022-03-08 03:02:35 +00:00
existingVersion , err := tx . GetOne ( kv . DatabaseInfo , kv . DBSchemaVersionKey )
2021-12-15 21:03:04 +00:00
if err != nil {
return fmt . Errorf ( "reading DB schema version: %w" , err )
}
if len ( existingVersion ) != 0 && len ( existingVersion ) != 12 {
return fmt . Errorf ( "incorrect length of DB schema version: %d" , len ( existingVersion ) )
}
if len ( existingVersion ) == 12 {
major := binary . BigEndian . Uint32 ( existingVersion )
minor := binary . BigEndian . Uint32 ( existingVersion [ 4 : ] )
if major > kv . DBSchemaVersion . Major {
return fmt . Errorf ( "cannot downgrade major DB version from %d to %d" , major , kv . DBSchemaVersion . Major )
} else if major == kv . DBSchemaVersion . Major {
if minor > kv . DBSchemaVersion . Minor {
return fmt . Errorf ( "cannot downgrade minor DB version from %d.%d to %d.%d" , major , minor , kv . DBSchemaVersion . Major , kv . DBSchemaVersion . Major )
}
} else {
2022-02-04 02:21:10 +00:00
// major < kv.DBSchemaVersion.Major
if kv . DBSchemaVersion . Major - major > 1 {
return fmt . Errorf ( "cannot upgrade major DB version for more than 1 version from %d to %d, use integration tool if you know what you are doing" , major , kv . DBSchemaVersion . Major )
2021-12-15 21:03:04 +00:00
}
}
}
return nil
2021-06-04 14:56:49 +00:00
} ) ; err != nil {
2022-03-08 03:02:35 +00:00
return fmt . Errorf ( "migrator.VerifyVersion: %w" , err )
}
return nil
}
func ( m * Migrator ) Apply ( db kv . RwDB , datadir string ) error {
if len ( m . Migrations ) == 0 {
return nil
}
var applied map [ string ] [ ] byte
2022-03-09 09:46:48 +00:00
if err := db . View ( context . Background ( ) , func ( tx kv . Tx ) error {
var err error
applied , err = AppliedMigrations ( tx , false )
if err != nil {
return fmt . Errorf ( "reading applied migrations: %w" , err )
}
return nil
} ) ; err != nil {
return err
}
2022-03-08 03:02:35 +00:00
if err := m . VerifyVersion ( db ) ; err != nil {
2022-03-10 07:48:58 +00:00
return fmt . Errorf ( "migrator.Apply: %w" , err )
2021-06-04 14:56:49 +00:00
}
2020-08-11 11:23:41 +00:00
// migration names must be unique, protection against people's mistake
uniqueNameCheck := map [ string ] bool { }
for i := range m . Migrations {
_ , ok := uniqueNameCheck [ m . Migrations [ i ] . Name ]
if ok {
return fmt . Errorf ( "%w, duplicate: %s" , ErrMigrationNonUniqueName , m . Migrations [ i ] . Name )
}
uniqueNameCheck [ m . Migrations [ i ] . Name ] = true
}
2020-08-05 10:13:35 +00:00
for i := range m . Migrations {
v := m . Migrations [ i ]
if _ , ok := applied [ v . Name ] ; ok {
continue
}
2020-08-11 11:23:41 +00:00
2021-07-24 04:28:05 +00:00
callbackCalled := false // commit function must be called if no error, protection against people's mistake
2020-08-11 11:23:41 +00:00
2020-08-05 10:13:35 +00:00
log . Info ( "Apply migration" , "name" , v . Name )
2021-07-24 04:28:05 +00:00
var progress [ ] byte
2021-07-28 02:47:38 +00:00
if err := db . View ( context . Background ( ) , func ( tx kv . Tx ) ( err error ) {
progress , err = tx . GetOne ( kv . Migrations , [ ] byte ( "_progress_" + v . Name ) )
2021-07-24 04:28:05 +00:00
return err
} ) ; err != nil {
2022-03-10 07:48:58 +00:00
return fmt . Errorf ( "migrator.Apply: %w" , err )
2020-10-19 19:20:18 +00:00
}
2022-02-12 13:33:09 +00:00
if err := v . Up ( db , filepath . Join ( datadir , "migrations" , v . Name ) , progress , func ( tx kv . RwTx , key [ ] byte , isDone bool ) error {
2020-08-05 10:13:35 +00:00
if ! isDone {
2020-10-19 19:20:18 +00:00
if key != nil {
2021-07-28 02:47:38 +00:00
if err := tx . Put ( kv . Migrations , [ ] byte ( "_progress_" + v . Name ) , key ) ; err != nil {
2020-10-19 19:20:18 +00:00
return err
}
}
2020-10-19 06:43:30 +00:00
return nil
2020-08-05 10:13:35 +00:00
}
2021-07-24 04:28:05 +00:00
callbackCalled = true
2020-08-11 11:23:41 +00:00
2020-09-08 19:39:43 +00:00
stagesProgress , err := MarshalMigrationPayload ( tx )
2020-08-05 10:13:35 +00:00
if err != nil {
return err
}
2021-07-28 02:47:38 +00:00
err = tx . Put ( kv . Migrations , [ ] byte ( v . Name ) , stagesProgress )
2020-10-19 19:20:18 +00:00
if err != nil {
return err
}
2021-07-28 02:47:38 +00:00
err = tx . Delete ( kv . Migrations , [ ] byte ( "_progress_" + v . Name ) , nil )
2020-08-05 10:13:35 +00:00
if err != nil {
return err
}
2020-09-08 19:39:43 +00:00
2020-08-05 10:13:35 +00:00
return nil
} ) ; err != nil {
2022-03-10 07:48:58 +00:00
return fmt . Errorf ( "migrator.Apply.Up: %s, %w" , v . Name , err )
2020-02-13 14:45:02 +00:00
}
2020-08-05 10:13:35 +00:00
2021-07-24 04:28:05 +00:00
if ! callbackCalled {
2020-08-12 02:57:55 +00:00
return fmt . Errorf ( "%w: %s" , ErrMigrationCommitNotCalled , v . Name )
2020-08-11 11:23:41 +00:00
}
2020-08-05 10:13:35 +00:00
log . Info ( "Applied migration" , "name" , v . Name )
2020-02-13 14:45:02 +00:00
}
2021-04-24 15:46:29 +00:00
// Write DB schema version
var version [ 12 ] byte
2021-07-28 02:47:38 +00:00
binary . BigEndian . PutUint32 ( version [ : ] , kv . DBSchemaVersion . Major )
binary . BigEndian . PutUint32 ( version [ 4 : ] , kv . DBSchemaVersion . Minor )
binary . BigEndian . PutUint32 ( version [ 8 : ] , kv . DBSchemaVersion . Patch )
if err := db . Update ( context . Background ( ) , func ( tx kv . RwTx ) error {
if err := tx . Put ( kv . DatabaseInfo , kv . DBSchemaVersionKey , version [ : ] ) ; err != nil {
2021-07-24 04:28:05 +00:00
return fmt . Errorf ( "writing DB schema version: %w" , err )
}
return nil
} ) ; err != nil {
2022-03-10 07:48:58 +00:00
return fmt . Errorf ( "migrator.Apply: %w" , err )
2021-04-24 15:46:29 +00:00
}
2021-07-28 02:47:38 +00:00
log . Info ( "Updated DB schema to" , "version" , fmt . Sprintf ( "%d.%d.%d" , kv . DBSchemaVersion . Major , kv . DBSchemaVersion . Minor , kv . DBSchemaVersion . Patch ) )
2020-08-05 10:13:35 +00:00
return nil
}
2020-08-04 09:25:28 +00:00
2021-07-28 02:47:38 +00:00
func MarshalMigrationPayload ( db kv . Getter ) ( [ ] byte , error ) {
2020-08-05 10:13:35 +00:00
s := map [ string ] [ ] byte { }
buf := bytes . NewBuffer ( nil )
encoder := codec . NewEncoder ( buf , & codec . CborHandle { } )
2020-09-05 16:07:27 +00:00
for _ , stage := range stages . AllStages {
2021-07-28 02:47:38 +00:00
v , err := db . GetOne ( kv . SyncStageProgress , [ ] byte ( stage ) )
2021-04-05 13:04:58 +00:00
if err != nil {
2020-08-05 10:13:35 +00:00
return nil , err
2020-08-04 09:25:28 +00:00
}
2020-08-05 10:13:35 +00:00
if len ( v ) > 0 {
2020-09-05 16:07:27 +00:00
s [ string ( stage ) ] = common . CopyBytes ( v )
2020-08-05 10:13:35 +00:00
}
2020-02-13 14:45:02 +00:00
}
2020-08-05 10:13:35 +00:00
if err := encoder . Encode ( s ) ; err != nil {
return nil , err
}
return buf . Bytes ( ) , nil
2020-02-13 14:45:02 +00:00
}
2020-08-05 10:13:35 +00:00
func UnmarshalMigrationPayload ( data [ ] byte ) ( map [ string ] [ ] byte , error ) {
s := map [ string ] [ ] byte { }
if err := codec . NewDecoder ( bytes . NewReader ( data ) , & codec . CborHandle { } ) . Decode ( & s ) ; err != nil {
return nil , err
}
return s , nil
}