mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-22 03:30:37 +00:00
rawdb methods to rw db schema version (#7653)
This commit is contained in:
parent
21625652f3
commit
c8e717c957
@ -348,15 +348,10 @@ func (b *SimulatedBackend) BlockByHash(ctx context.Context, hash libcommon.Hash)
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
number := rawdb.ReadHeaderNumber(tx, hash)
|
||||
if number == nil {
|
||||
return nil, nil
|
||||
}
|
||||
block, _, err := b.BlockReader().BlockWithSenders(ctx, tx, hash, *number)
|
||||
block, err := b.BlockReader().BlockByHash(ctx, tx, hash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if block != nil {
|
||||
return block, nil
|
||||
}
|
||||
|
@ -3,7 +3,6 @@ package cli
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
@ -204,38 +203,35 @@ func subscribeToStateChanges(ctx context.Context, client StateChangesClient, cac
|
||||
|
||||
func checkDbCompatibility(ctx context.Context, db kv.RoDB) error {
|
||||
// DB schema version compatibility check
|
||||
var version []byte
|
||||
var compatErr error
|
||||
var compatTx kv.Tx
|
||||
if compatTx, compatErr = db.BeginRo(ctx); compatErr != nil {
|
||||
return fmt.Errorf("open Ro Tx for DB schema compability check: %w", compatErr)
|
||||
}
|
||||
defer compatTx.Rollback()
|
||||
if version, compatErr = compatTx.GetOne(kv.DatabaseInfo, kv.DBSchemaVersionKey); compatErr != nil {
|
||||
major, minor, patch, ok, err := rawdb.ReadDBSchemaVersion(compatTx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("read version for DB schema compability check: %w", compatErr)
|
||||
}
|
||||
if len(version) != 12 {
|
||||
return fmt.Errorf("database does not have major schema version. upgrade and restart Erigon core")
|
||||
if ok {
|
||||
var compatible bool
|
||||
dbSchemaVersion := &kv.DBSchemaVersion
|
||||
if major != dbSchemaVersion.Major {
|
||||
compatible = false
|
||||
} else if minor != dbSchemaVersion.Minor {
|
||||
compatible = false
|
||||
} else {
|
||||
compatible = true
|
||||
}
|
||||
if !compatible {
|
||||
return fmt.Errorf("incompatible DB Schema versions: reader %d.%d.%d, database %d.%d.%d",
|
||||
dbSchemaVersion.Major, dbSchemaVersion.Minor, dbSchemaVersion.Patch,
|
||||
major, minor, patch)
|
||||
}
|
||||
log.Info("DB schemas compatible", "reader", fmt.Sprintf("%d.%d.%d", dbSchemaVersion.Major, dbSchemaVersion.Minor, dbSchemaVersion.Patch),
|
||||
"database", fmt.Sprintf("%d.%d.%d", major, minor, patch))
|
||||
}
|
||||
major := binary.BigEndian.Uint32(version)
|
||||
minor := binary.BigEndian.Uint32(version[4:])
|
||||
patch := binary.BigEndian.Uint32(version[8:])
|
||||
var compatible bool
|
||||
dbSchemaVersion := &kv.DBSchemaVersion
|
||||
if major != dbSchemaVersion.Major {
|
||||
compatible = false
|
||||
} else if minor != dbSchemaVersion.Minor {
|
||||
compatible = false
|
||||
} else {
|
||||
compatible = true
|
||||
}
|
||||
if !compatible {
|
||||
return fmt.Errorf("incompatible DB Schema versions: reader %d.%d.%d, database %d.%d.%d",
|
||||
dbSchemaVersion.Major, dbSchemaVersion.Minor, dbSchemaVersion.Patch,
|
||||
major, minor, patch)
|
||||
}
|
||||
log.Info("DB schemas compatible", "reader", fmt.Sprintf("%d.%d.%d", dbSchemaVersion.Major, dbSchemaVersion.Minor, dbSchemaVersion.Patch),
|
||||
"database", fmt.Sprintf("%d.%d.%d", major, minor, patch))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1789,3 +1789,30 @@ func ReadVerkleNode(tx kv.RwTx, root libcommon.Hash) (verkle.VerkleNode, error)
|
||||
}
|
||||
return verkle.ParseNode(encoded, 0, root[:])
|
||||
}
|
||||
func WriteDBSchemaVersion(tx kv.RwTx) error {
|
||||
var version [12]byte
|
||||
binary.BigEndian.PutUint32(version[:], kv.DBSchemaVersion.Major)
|
||||
binary.BigEndian.PutUint32(version[4:], kv.DBSchemaVersion.Minor)
|
||||
binary.BigEndian.PutUint32(version[8:], kv.DBSchemaVersion.Patch)
|
||||
if err := tx.Put(kv.DatabaseInfo, kv.DBSchemaVersionKey, version[:]); err != nil {
|
||||
return fmt.Errorf("writing DB schema version: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func ReadDBSchemaVersion(tx kv.Tx) (major, minor, patch uint32, ok bool, err error) {
|
||||
existingVersion, err := tx.GetOne(kv.DatabaseInfo, kv.DBSchemaVersionKey)
|
||||
if err != nil {
|
||||
return 0, 0, 0, false, fmt.Errorf("reading DB schema version: %w", err)
|
||||
}
|
||||
if len(existingVersion) == 0 {
|
||||
return 0, 0, 0, false, nil
|
||||
}
|
||||
if len(existingVersion) != 12 {
|
||||
return 0, 0, 0, false, fmt.Errorf("incorrect length of DB schema version: %d", len(existingVersion))
|
||||
}
|
||||
|
||||
major = binary.BigEndian.Uint32(existingVersion)
|
||||
minor = binary.BigEndian.Uint32(existingVersion[4:])
|
||||
patch = binary.BigEndian.Uint32(existingVersion[8:])
|
||||
return major, minor, patch, true, nil
|
||||
}
|
||||
|
@ -14,18 +14,22 @@
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package rawdb
|
||||
package rawdb_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/big"
|
||||
"testing"
|
||||
|
||||
"github.com/holiman/uint256"
|
||||
libcommon "github.com/ledgerwatch/erigon-lib/common"
|
||||
"github.com/ledgerwatch/erigon-lib/kv"
|
||||
"github.com/ledgerwatch/erigon-lib/kv/memdb"
|
||||
|
||||
"github.com/ledgerwatch/erigon/core/rawdb"
|
||||
"github.com/ledgerwatch/erigon/core/types"
|
||||
"github.com/ledgerwatch/erigon/turbo/services"
|
||||
"github.com/ledgerwatch/erigon/turbo/stages"
|
||||
"github.com/ledgerwatch/log/v3"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// Tests that positional lookup metadata can be stored and retrieved.
|
||||
@ -37,7 +41,7 @@ func TestLookupStorage(t *testing.T) {
|
||||
{
|
||||
"DatabaseV6",
|
||||
func(db kv.Putter, block *types.Block) {
|
||||
WriteTxLookupEntries(db, block)
|
||||
rawdb.WriteTxLookupEntries(db, block)
|
||||
},
|
||||
},
|
||||
// Erigon: older databases are removed, no backward compatibility
|
||||
@ -45,7 +49,11 @@ func TestLookupStorage(t *testing.T) {
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
_, tx := memdb.NewTestTx(t)
|
||||
m := stages.Mock(t)
|
||||
br, bw := m.NewBlocksIO()
|
||||
tx, err := m.DB.BeginRw(m.Ctx)
|
||||
require.NoError(t, err)
|
||||
defer tx.Rollback()
|
||||
|
||||
tx1 := types.NewTransaction(1, libcommon.BytesToAddress([]byte{0x11}), uint256.NewInt(111), 1111, uint256.NewInt(11111), []byte{0x11, 0x11, 0x11})
|
||||
tx2 := types.NewTransaction(2, libcommon.BytesToAddress([]byte{0x22}), uint256.NewInt(222), 2222, uint256.NewInt(22222), []byte{0x22, 0x22, 0x22})
|
||||
@ -56,24 +64,24 @@ func TestLookupStorage(t *testing.T) {
|
||||
|
||||
// Check that no transactions entries are in a pristine database
|
||||
for i, txn := range txs {
|
||||
if txn2, _, _, _, _ := ReadTransactionByHash(tx, txn.Hash()); txn2 != nil {
|
||||
if txn2, _, _, _, _ := readTransactionByHash(tx, txn.Hash(), br); txn2 != nil {
|
||||
t.Fatalf("txn #%d [%x]: non existent transaction returned: %v", i, txn.Hash(), txn2)
|
||||
}
|
||||
}
|
||||
// Insert all the transactions into the database, and verify contents
|
||||
if err := WriteCanonicalHash(tx, block.Hash(), block.NumberU64()); err != nil {
|
||||
if err := rawdb.WriteCanonicalHash(tx, block.Hash(), block.NumberU64()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := WriteBlock(tx, block); err != nil {
|
||||
if err := bw.WriteBlock(tx, block); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := WriteSenders(tx, block.Hash(), block.NumberU64(), block.Body().SendersFromTxs()); err != nil {
|
||||
if err := rawdb.WriteSenders(tx, block.Hash(), block.NumberU64(), block.Body().SendersFromTxs()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
tc.writeTxLookupEntries(tx, block)
|
||||
|
||||
for i, txn := range txs {
|
||||
if txn2, hash, number, index, _ := ReadTransactionByHash(tx, txn.Hash()); txn2 == nil {
|
||||
if txn2, hash, number, index, _ := readTransactionByHash(tx, txn.Hash(), br); txn2 == nil {
|
||||
t.Fatalf("txn #%d [%x]: transaction not found", i, txn.Hash())
|
||||
} else {
|
||||
if hash != block.Hash() || number != block.NumberU64() || index != uint64(i) {
|
||||
@ -86,13 +94,49 @@ func TestLookupStorage(t *testing.T) {
|
||||
}
|
||||
// Delete the transactions and check purge
|
||||
for i, txn := range txs {
|
||||
if err := DeleteTxLookupEntry(tx, txn.Hash()); err != nil {
|
||||
if err := rawdb.DeleteTxLookupEntry(tx, txn.Hash()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if txn2, _, _, _, _ := ReadTransactionByHash(tx, txn.Hash()); txn2 != nil {
|
||||
if txn2, _, _, _, _ := readTransactionByHash(tx, txn.Hash(), br); txn2 != nil {
|
||||
t.Fatalf("txn #%d [%x]: deleted transaction returned: %v", i, txn.Hash(), txn2)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// ReadTransactionByHash retrieves a specific transaction from the database, along with
|
||||
// its added positional metadata.
|
||||
func readTransactionByHash(db kv.Tx, hash libcommon.Hash, br services.FullBlockReader) (types.Transaction, libcommon.Hash, uint64, uint64, error) {
|
||||
blockNumber, err := rawdb.ReadTxLookupEntry(db, hash)
|
||||
if err != nil {
|
||||
return nil, libcommon.Hash{}, 0, 0, err
|
||||
}
|
||||
if blockNumber == nil {
|
||||
return nil, libcommon.Hash{}, 0, 0, nil
|
||||
}
|
||||
blockHash, err := rawdb.ReadCanonicalHash(db, *blockNumber)
|
||||
if err != nil {
|
||||
return nil, libcommon.Hash{}, 0, 0, err
|
||||
}
|
||||
if blockHash == (libcommon.Hash{}) {
|
||||
return nil, libcommon.Hash{}, 0, 0, nil
|
||||
}
|
||||
body, _ := br.BodyWithTransactions(context.Background(), db, blockHash, *blockNumber)
|
||||
if body == nil {
|
||||
log.Error("Transaction referenced missing", "number", blockNumber, "hash", blockHash)
|
||||
return nil, libcommon.Hash{}, 0, 0, nil
|
||||
}
|
||||
senders, err1 := rawdb.ReadSenders(db, blockHash, *blockNumber)
|
||||
if err1 != nil {
|
||||
return nil, libcommon.Hash{}, 0, 0, err1
|
||||
}
|
||||
body.SendersToTxs(senders)
|
||||
for txIndex, tx := range body.Transactions {
|
||||
if tx.Hash() == hash {
|
||||
return tx, blockHash, *blockNumber, uint64(txIndex), nil
|
||||
}
|
||||
}
|
||||
log.Error("Transaction not found", "number", blockNumber, "hash", blockHash, "txhash", hash)
|
||||
return nil, libcommon.Hash{}, 0, 0, nil
|
||||
}
|
||||
|
@ -139,7 +139,7 @@ func executeBlock(
|
||||
stateStream bool,
|
||||
) error {
|
||||
blockNum := block.NumberU64()
|
||||
stateReader, stateWriter, err := newStateReaderWriter(batch, tx, block, writeChangesets, cfg.accumulator, initialCycle, stateStream)
|
||||
stateReader, stateWriter, err := newStateReaderWriter(batch, tx, block, writeChangesets, cfg.accumulator, cfg.blockReader, initialCycle, stateStream)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -204,6 +204,7 @@ func newStateReaderWriter(
|
||||
block *types.Block,
|
||||
writeChangesets bool,
|
||||
accumulator *shards.Accumulator,
|
||||
br services.FullBlockReader,
|
||||
initialCycle bool,
|
||||
stateStream bool,
|
||||
) (state.StateReader, state.WriterWithChangeSets, error) {
|
||||
@ -214,7 +215,7 @@ func newStateReaderWriter(
|
||||
stateReader = state.NewPlainStateReader(batch)
|
||||
|
||||
if !initialCycle && stateStream {
|
||||
txs, err := rawdb.RawTransactionsRange(tx, block.NumberU64(), block.NumberU64())
|
||||
txs, err := br.RawTransactions(context.Background(), tx, block.NumberU64(), block.NumberU64())
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
@ -3,13 +3,13 @@ package migrations
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/ledgerwatch/erigon-lib/common/datadir"
|
||||
"github.com/ledgerwatch/erigon-lib/kv"
|
||||
"github.com/ledgerwatch/erigon/common"
|
||||
"github.com/ledgerwatch/erigon/core/rawdb"
|
||||
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
|
||||
"github.com/ledgerwatch/log/v3"
|
||||
"github.com/ugorji/go/codec"
|
||||
@ -120,17 +120,11 @@ func (m *Migrator) PendingMigrations(tx kv.Tx) ([]Migration, error) {
|
||||
|
||||
func (m *Migrator) VerifyVersion(db kv.RwDB) error {
|
||||
if err := db.View(context.Background(), func(tx kv.Tx) error {
|
||||
var err error
|
||||
existingVersion, err := tx.GetOne(kv.DatabaseInfo, kv.DBSchemaVersionKey)
|
||||
major, minor, _, ok, err := rawdb.ReadDBSchemaVersion(tx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("reading DB schema version: %w", err)
|
||||
}
|
||||
if len(existingVersion) != 0 && len(existingVersion) != 12 {
|
||||
return fmt.Errorf("incorrect length of DB schema version: %d", len(existingVersion))
|
||||
}
|
||||
if len(existingVersion) == 12 {
|
||||
major := binary.BigEndian.Uint32(existingVersion)
|
||||
minor := binary.BigEndian.Uint32(existingVersion[4:])
|
||||
if ok {
|
||||
if major > kv.DBSchemaVersion.Major {
|
||||
return fmt.Errorf("cannot downgrade major DB version from %d to %d", major, kv.DBSchemaVersion.Major)
|
||||
} else if major == kv.DBSchemaVersion.Major {
|
||||
@ -236,16 +230,8 @@ func (m *Migrator) Apply(db kv.RwDB, dataDir string, logger log.Logger) error {
|
||||
}
|
||||
logger.Info("Applied migration", "name", v.Name)
|
||||
}
|
||||
// Write DB schema version
|
||||
var version [12]byte
|
||||
binary.BigEndian.PutUint32(version[:], kv.DBSchemaVersion.Major)
|
||||
binary.BigEndian.PutUint32(version[4:], kv.DBSchemaVersion.Minor)
|
||||
binary.BigEndian.PutUint32(version[8:], kv.DBSchemaVersion.Patch)
|
||||
if err := db.Update(context.Background(), func(tx kv.RwTx) error {
|
||||
if err := tx.Put(kv.DatabaseInfo, kv.DBSchemaVersionKey, version[:]); err != nil {
|
||||
return fmt.Errorf("writing DB schema version: %w", err)
|
||||
}
|
||||
return nil
|
||||
return rawdb.WriteDBSchemaVersion(tx)
|
||||
}); err != nil {
|
||||
return fmt.Errorf("migrator.Apply: %w", err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user