package migrations import ( "bytes" "context" "encoding/binary" "fmt" "runtime" "time" common2 "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/dbutils" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/node/nodecfg/datadir" "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/log/v3" ) const ASSERT = false var ErrTxsBeginEndNoMigration = fmt.Errorf("in this Erigon version DB format was changed: added additional first/last system-txs to blocks. There is no DB migration for this change. Please re-sync or switch to earlier version") var txsBeginEnd = Migration{ Name: "txs_begin_end", Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback) (err error) { logEvery := time.NewTicker(10 * time.Second) defer logEvery.Stop() var latestBlock uint64 if err := db.View(context.Background(), func(tx kv.Tx) error { bodiesProgress, err := stages.GetStageProgress(tx, stages.Bodies) if err != nil { return err } if progress != nil { latestBlock = binary.BigEndian.Uint64(progress) log.Info("[database version migration] Continue migration", "from_block", latestBlock) } else { latestBlock = bodiesProgress + 1 // include block 0 } return nil }); err != nil { return err } tx, err := db.BeginRw(context.Background()) if err != nil { return err } defer tx.Rollback() numBuf := make([]byte, 8) numHashBuf := make([]byte, 8+32) for i := int(latestBlock); i >= 0; i-- { blockNum := uint64(i) select { case <-logEvery.C: var m runtime.MemStats common2.ReadMemStats(&m) log.Info("[database version migration] Adding system-txs", "progress", fmt.Sprintf("%.2f%%", 100-100*float64(blockNum)/float64(latestBlock)), "block_num", blockNum, "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys)) default: } canonicalHash, err := rawdb.ReadCanonicalHash(tx, blockNum) if err != nil { return err } var oldBlock *types.Body if ASSERT { oldBlock = readCanonicalBodyWithTransactionsDeprecated(tx, canonicalHash, blockNum) } binary.BigEndian.PutUint64(numHashBuf[:8], blockNum) copy(numHashBuf[8:], canonicalHash[:]) b, err := rawdb.ReadBodyForStorageByKey(tx, numHashBuf) if err != nil { return err } if b == nil { continue } txs, err := rawdb.CanonicalTransactions(tx, b.BaseTxId, b.TxAmount) if err != nil { return err } b.BaseTxId += (blockNum) * 2 b.TxAmount += 2 if err := rawdb.WriteBodyForStorage(tx, canonicalHash, blockNum, b); err != nil { return fmt.Errorf("failed to write body: %w", err) } // del first tx in block if err = tx.Delete(kv.EthTx, dbutils.EncodeBlockNumber(b.BaseTxId)); err != nil { return err } if err := writeTransactionsNewDeprecated(tx, txs, b.BaseTxId+1); err != nil { return fmt.Errorf("failed to write body txs: %w", err) } // del last tx in block if err = tx.Delete(kv.EthTx, dbutils.EncodeBlockNumber(b.BaseTxId+uint64(b.TxAmount)-1)); err != nil { return err } if ASSERT { newBlock, baseTxId, txAmount := rawdb.ReadBody(tx, canonicalHash, blockNum) newBlock.Transactions, err = rawdb.CanonicalTransactions(tx, baseTxId, txAmount) for i, oldTx := range oldBlock.Transactions { newTx := newBlock.Transactions[i] if oldTx.GetNonce() != newTx.GetNonce() { panic(blockNum) } } } if err = tx.ForPrefix(kv.BlockBody, numHashBuf[:8], func(k, v []byte) error { if bytes.Equal(k, numHashBuf) { // don't delete canonical blocks return nil } bodyForStorage := new(types.BodyForStorage) if err := rlp.DecodeBytes(v, bodyForStorage); err != nil { return err } for i := bodyForStorage.BaseTxId; i < bodyForStorage.BaseTxId+uint64(bodyForStorage.TxAmount); i++ { binary.BigEndian.PutUint64(numBuf, i) if err = tx.Delete(kv.NonCanonicalTxs, numBuf); err != nil { return err } } if err = tx.Delete(kv.BlockBody, k); err != nil { return err } if err = tx.Delete(kv.Headers, k); err != nil { return err } if err = tx.Delete(kv.HeaderTD, k); err != nil { return err } if err = tx.Delete(kv.HeaderNumber, k[8:]); err != nil { return err } if err = tx.Delete(kv.HeaderNumber, k[8:]); err != nil { return err } return nil }); err != nil { return err } binary.BigEndian.PutUint64(numBuf, blockNum) if err := BeforeCommit(tx, numBuf, false); err != nil { return err } if blockNum%10_000 == 0 { if err := tx.Commit(); err != nil { return err } tx, err = db.BeginRw(context.Background()) if err != nil { return err } } } if err := tx.Commit(); err != nil { return err } return db.Update(context.Background(), func(tx kv.RwTx) error { // reset non-canonical sequence to 0 v, err := tx.ReadSequence(kv.NonCanonicalTxs) if err != nil { return err } if _, err := tx.IncrementSequence(kv.NonCanonicalTxs, -v); err != nil { return err } { c, err := tx.Cursor(kv.HeaderCanonical) if err != nil { return err } k, v, err := c.Last() if err != nil { return err } data, err := tx.GetOne(kv.BlockBody, append(k, v...)) if err != nil { return err } var newSeqValue uint64 if len(data) > 0 { bodyForStorage := new(types.BodyForStorage) if err := rlp.DecodeBytes(data, bodyForStorage); err != nil { return fmt.Errorf("rlp.DecodeBytes(bodyForStorage): %w", err) } currentSeq, err := tx.ReadSequence(kv.EthTx) if err != nil { return err } newSeqValue = bodyForStorage.BaseTxId + uint64(bodyForStorage.TxAmount) - currentSeq } if _, err := tx.IncrementSequence(kv.EthTx, newSeqValue); err != nil { return err } } // This migration is no-op, but it forces the migration mechanism to apply it and thus write the DB schema version info return BeforeCommit(tx, nil, true) }) }, } func writeRawBodyDeprecated(db kv.RwTx, hash common.Hash, number uint64, body *types.RawBody) error { baseTxId, err := db.IncrementSequence(kv.EthTx, uint64(len(body.Transactions))) if err != nil { return err } data := types.BodyForStorage{ BaseTxId: baseTxId, TxAmount: uint32(len(body.Transactions)), Uncles: body.Uncles, } if err = rawdb.WriteBodyForStorage(db, hash, number, &data); err != nil { return fmt.Errorf("failed to write body: %w", err) } if err = rawdb.WriteRawTransactions(db, body.Transactions, baseTxId); err != nil { return fmt.Errorf("failed to WriteRawTransactions: %w, blockNum=%d", err, number) } return nil } func writeTransactionsNewDeprecated(db kv.RwTx, txs []types.Transaction, baseTxId uint64) error { txId := baseTxId buf := bytes.NewBuffer(nil) for _, tx := range txs { txIdKey := make([]byte, 8) binary.BigEndian.PutUint64(txIdKey, txId) buf.Reset() if err := rlp.Encode(buf, tx); err != nil { return fmt.Errorf("broken tx rlp: %w", err) } // If next Append returns KeyExists error - it means you need to open transaction in App code before calling this func. Batch is also fine. if err := db.Put(kv.EthTx, txIdKey, common.CopyBytes(buf.Bytes())); err != nil { return err } txId++ } return nil } func readCanonicalBodyWithTransactionsDeprecated(db kv.Getter, hash common.Hash, number uint64) *types.Body { data := rawdb.ReadStorageBodyRLP(db, hash, number) if len(data) == 0 { return nil } bodyForStorage := new(types.BodyForStorage) err := rlp.DecodeBytes(data, bodyForStorage) if err != nil { log.Error("Invalid block body RLP", "hash", hash, "err", err) return nil } body := new(types.Body) body.Uncles = bodyForStorage.Uncles body.Transactions, err = rawdb.CanonicalTransactions(db, bodyForStorage.BaseTxId, bodyForStorage.TxAmount) if err != nil { log.Error("failed ReadTransactionByHash", "hash", hash, "block", number, "err", err) return nil } return body } func makeBodiesNonCanonicalDeprecated(tx kv.RwTx, from uint64, ctx context.Context, logPrefix string, logEvery *time.Ticker) error { for blockNum := from; ; blockNum++ { h, err := rawdb.ReadCanonicalHash(tx, blockNum) if err != nil { return err } if h == (common.Hash{}) { break } data := rawdb.ReadStorageBodyRLP(tx, h, blockNum) if len(data) == 0 { break } bodyForStorage := new(types.BodyForStorage) if err := rlp.DecodeBytes(data, bodyForStorage); err != nil { return err } // move txs to NonCanonical bucket, it has own sequence newBaseId, err := tx.IncrementSequence(kv.NonCanonicalTxs, uint64(bodyForStorage.TxAmount)) if err != nil { return err } id := newBaseId if err := tx.ForAmount(kv.EthTx, dbutils.EncodeBlockNumber(bodyForStorage.BaseTxId), bodyForStorage.TxAmount, func(k, v []byte) error { if err := tx.Put(kv.NonCanonicalTxs, dbutils.EncodeBlockNumber(id), v); err != nil { return err } id++ return tx.Delete(kv.EthTx, k) }); err != nil { return err } bodyForStorage.BaseTxId = newBaseId if err := rawdb.WriteBodyForStorage(tx, h, blockNum, bodyForStorage); err != nil { return err } select { case <-ctx.Done(): return ctx.Err() case <-logEvery.C: log.Info(fmt.Sprintf("[%s] Unwinding transactions...", logPrefix), "current block", blockNum) default: } } // EthTx must have canonical id's - means need decrement it's sequence on unwind c, err := tx.Cursor(kv.EthTx) if err != nil { return err } defer c.Close() k, _, err := c.Last() if err != nil { return err } var nextTxID uint64 if k != nil { nextTxID = binary.BigEndian.Uint64(k) + 1 } if err := rawdb.ResetSequence(tx, kv.EthTx, nextTxID); err != nil { return err } return nil }