mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-22 03:30:37 +00:00
Caplin<->Erigon: Added Range methods (#8087)
This commit is contained in:
parent
8e653b54ee
commit
93cdbae2e5
@ -91,6 +91,7 @@ func (b *SignedBeaconBlock) EncodeForStorage(buf []byte) ([]byte, error) {
|
||||
}
|
||||
|
||||
func (b *SignedBeaconBlock) DecodeForStorage(buf []byte, s int) error {
|
||||
b.Block.Body.Version = clparams.StateVersion(s)
|
||||
if len(buf) < b.EncodingSizeSSZ() {
|
||||
return fmt.Errorf("[BeaconBody] err: %s", ssz.ErrLowBufferSize)
|
||||
}
|
||||
|
@ -238,7 +238,7 @@ func (b *Eth1Block) RlpHeader() (*types.Header, error) {
|
||||
|
||||
// If the header hash does not match the block hash, return an error.
|
||||
if header.Hash() != b.BlockHash {
|
||||
return nil, fmt.Errorf("cannot derive rlp header: mismatching hash")
|
||||
return nil, fmt.Errorf("cannot derive rlp header: mismatching hash: %s != %s", header.Hash(), b.BlockHash)
|
||||
}
|
||||
|
||||
return header, nil
|
||||
|
@ -75,7 +75,7 @@ func GenerateBlockIndicies(ctx context.Context, db SQLObject, block *cltypes.Bea
|
||||
return fmt.Errorf("failed to write block root to beacon_indicies: %v", err)
|
||||
}
|
||||
}
|
||||
_, err = db.ExecContext(ctx, "INSERT OR IGNORE INTO beacon_indicies (slot, proposer_index, beacon_block_root, state_root, parent_block_root, canonical) VALUES (?, ?, ?, ?, ?, 0);", block.Slot, block.ProposerIndex, blockRoot[:], block.StateRoot[:], block.ParentRoot[:])
|
||||
_, err = db.ExecContext(ctx, "INSERT OR IGNORE INTO beacon_indicies (slot, proposer_index, beacon_block_root, state_root, parent_block_root, canonical) VALUES (?, ?, ?, ?, ?, ?);", block.Slot, block.ProposerIndex, blockRoot[:], block.StateRoot[:], block.ParentRoot[:], forceCanonical)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to write block root to beacon_indicies: %v", err)
|
||||
@ -125,7 +125,7 @@ func PruneIndicies(ctx context.Context, db SQLObject, fromSlot, toSlot uint64) e
|
||||
}
|
||||
|
||||
func IterateBeaconIndicies(ctx context.Context, db SQLObject, fromSlot, toSlot uint64, fn func(slot uint64, beaconBlockRoot, parentBlockRoot, stateRoot libcommon.Hash, canonical bool) bool) error {
|
||||
rows, err := db.QueryContext(ctx, "SELECT slot, beacon_block_root, state_root, parent_block_root, canonical FROM beacon_indicies WHERE slot >= ? AND slot <= ?", fromSlot, toSlot)
|
||||
rows, err := db.QueryContext(ctx, "SELECT slot, beacon_block_root, state_root, parent_block_root, canonical FROM beacon_indicies WHERE slot BETWEEN ? AND ?", fromSlot, toSlot)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -153,3 +153,29 @@ func IterateBeaconIndicies(ctx context.Context, db SQLObject, fromSlot, toSlot u
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func ReadBeaconBlockRootsInSlotRange(ctx context.Context, db SQLObject, fromSlot, count uint64) ([]libcommon.Hash, []uint64, error) {
|
||||
rows, err := db.QueryContext(ctx, "SELECT slot, beacon_block_root FROM beacon_indicies WHERE slot >= ? AND canonical > 0 LIMIT ?", fromSlot, count)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
roots := []libcommon.Hash{}
|
||||
slots := []uint64{}
|
||||
for rows.Next() {
|
||||
var beaconBlockRoot libcommon.Hash
|
||||
var slot uint64
|
||||
err := rows.Scan(&slot, &beaconBlockRoot)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
roots = append(roots, beaconBlockRoot)
|
||||
slots = append(slots, slot)
|
||||
}
|
||||
|
||||
if err = rows.Err(); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return roots, slots, nil
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ package persistence
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
@ -11,12 +12,14 @@ import (
|
||||
libcommon "github.com/ledgerwatch/erigon-lib/common"
|
||||
"github.com/ledgerwatch/erigon/cl/clparams"
|
||||
"github.com/ledgerwatch/erigon/cl/cltypes"
|
||||
"github.com/ledgerwatch/erigon/cl/cltypes/solid"
|
||||
"github.com/ledgerwatch/erigon/cl/persistence/beacon_indicies"
|
||||
"github.com/ledgerwatch/erigon/cl/phase1/execution_client"
|
||||
"github.com/ledgerwatch/erigon/cl/sentinel/peers"
|
||||
"github.com/ledgerwatch/erigon/cl/utils"
|
||||
"github.com/ledgerwatch/erigon/cmd/sentinel/sentinel/communication/ssz_snappy"
|
||||
"github.com/ledgerwatch/erigon/common/dbutils"
|
||||
"github.com/ledgerwatch/erigon/core/types"
|
||||
"github.com/spf13/afero"
|
||||
)
|
||||
|
||||
@ -30,7 +33,7 @@ type beaconChainDatabaseFilesystem struct {
|
||||
indiciesDB *sql.DB
|
||||
}
|
||||
|
||||
func NewbeaconChainDatabaseFilesystem(fs afero.Fs, executionEngine execution_client.ExecutionEngine, fullBlocks bool, cfg *clparams.BeaconChainConfig, indiciesDB *sql.DB) BeaconChainDatabase {
|
||||
func NewBeaconChainDatabaseFilesystem(fs afero.Fs, executionEngine execution_client.ExecutionEngine, fullBlocks bool, cfg *clparams.BeaconChainConfig, indiciesDB *sql.DB) BeaconChainDatabase {
|
||||
return beaconChainDatabaseFilesystem{
|
||||
fs: fs,
|
||||
cfg: cfg,
|
||||
@ -41,7 +44,128 @@ func NewbeaconChainDatabaseFilesystem(fs afero.Fs, executionEngine execution_cli
|
||||
}
|
||||
|
||||
func (b beaconChainDatabaseFilesystem) GetRange(ctx context.Context, from uint64, count uint64) ([]*peers.PeeredObject[*cltypes.SignedBeaconBlock], error) {
|
||||
panic("not imlemented")
|
||||
tx, err := b.indiciesDB.Begin()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
// Retrieve block roots for each ranged slot
|
||||
beaconBlockRooots, slots, err := beacon_indicies.ReadBeaconBlockRootsInSlotRange(ctx, tx, from, count)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(beaconBlockRooots) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
var startELNumber *uint64
|
||||
var firstPostBellatrixBlock *int
|
||||
|
||||
elBlockCount := uint64(0)
|
||||
blocks := []*peers.PeeredObject[*cltypes.SignedBeaconBlock]{}
|
||||
for idx, blockRoot := range beaconBlockRooots {
|
||||
slot := slots[idx]
|
||||
_, path := RootToPaths(blockRoot, b.cfg)
|
||||
|
||||
fp, err := b.fs.OpenFile(path, os.O_RDONLY, 0o755)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer fp.Close()
|
||||
block := cltypes.NewSignedBeaconBlock(b.cfg)
|
||||
version := b.cfg.GetCurrentStateVersion(slot / b.cfg.SlotsPerEpoch)
|
||||
if b.fullBlocks {
|
||||
if err := ssz_snappy.DecodeAndReadNoForkDigest(fp, block, version); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
// Below is a frankenstein monster, i am sorry.
|
||||
executionPayloadHeader := cltypes.NewEth1Header(version)
|
||||
if version >= clparams.BellatrixVersion {
|
||||
elBlockCount++
|
||||
|
||||
// If there is no execution engine, abort.
|
||||
if b.executionEngine == nil {
|
||||
return nil, nil
|
||||
}
|
||||
executionPayloadLengthBytes := make([]byte, 8)
|
||||
if _, err := fp.Read(executionPayloadLengthBytes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
executionPayloadLength := binary.BigEndian.Uint64(executionPayloadLengthBytes)
|
||||
|
||||
executionPayloadBytes := make([]byte, executionPayloadLength)
|
||||
if _, err := fp.Read(executionPayloadBytes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := executionPayloadHeader.DecodeSSZ(executionPayloadBytes, int(version)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if startELNumber == nil {
|
||||
startELNumber = new(uint64)
|
||||
*startELNumber = executionPayloadHeader.BlockNumber
|
||||
firstPostBellatrixBlock = new(int)
|
||||
*firstPostBellatrixBlock = len(blocks)
|
||||
}
|
||||
}
|
||||
// Read beacon part of the block
|
||||
beaconBlockLengthBytes := make([]byte, 8)
|
||||
if _, err := fp.Read(beaconBlockLengthBytes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
beaconBlockLength := binary.BigEndian.Uint64(beaconBlockLengthBytes)
|
||||
|
||||
beaconBlockBytes := make([]byte, beaconBlockLength)
|
||||
if _, err := fp.Read(beaconBlockBytes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if beaconBlockBytes, err = utils.DecompressSnappy(beaconBlockBytes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := block.DecodeForStorage(beaconBlockBytes, int(version)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Write execution payload except for body part (withdrawals and transactions)
|
||||
if version >= clparams.BellatrixVersion {
|
||||
block.Block.Body.ExecutionPayload = cltypes.NewEth1Block(block.Version(), b.cfg)
|
||||
block.Block.Body.ExecutionPayload.ParentHash = executionPayloadHeader.ParentHash
|
||||
block.Block.Body.ExecutionPayload.FeeRecipient = executionPayloadHeader.FeeRecipient
|
||||
block.Block.Body.ExecutionPayload.StateRoot = executionPayloadHeader.StateRoot
|
||||
block.Block.Body.ExecutionPayload.ReceiptsRoot = executionPayloadHeader.ReceiptsRoot
|
||||
block.Block.Body.ExecutionPayload.LogsBloom = executionPayloadHeader.LogsBloom
|
||||
block.Block.Body.ExecutionPayload.PrevRandao = executionPayloadHeader.PrevRandao
|
||||
block.Block.Body.ExecutionPayload.BlockNumber = executionPayloadHeader.BlockNumber
|
||||
block.Block.Body.ExecutionPayload.GasLimit = executionPayloadHeader.GasLimit
|
||||
block.Block.Body.ExecutionPayload.GasUsed = executionPayloadHeader.GasUsed
|
||||
block.Block.Body.ExecutionPayload.Time = executionPayloadHeader.Time
|
||||
block.Block.Body.ExecutionPayload.Extra = executionPayloadHeader.Extra
|
||||
block.Block.Body.ExecutionPayload.BaseFeePerGas = executionPayloadHeader.BaseFeePerGas
|
||||
block.Block.Body.ExecutionPayload.BlockHash = executionPayloadHeader.BlockHash
|
||||
block.Block.Body.ExecutionPayload.BlobGasUsed = executionPayloadHeader.BlobGasUsed
|
||||
block.Block.Body.ExecutionPayload.ExcessBlobGas = executionPayloadHeader.ExcessBlobGas
|
||||
}
|
||||
}
|
||||
blocks = append(blocks, &peers.PeeredObject[*cltypes.SignedBeaconBlock]{Data: block})
|
||||
}
|
||||
if startELNumber != nil {
|
||||
bodies, err := b.executionEngine.GetBodiesByRange(*startELNumber, count)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(bodies) != int(elBlockCount) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
for beaconBlockIdx, bodyIdx := *firstPostBellatrixBlock, 0; beaconBlockIdx < len(blocks); beaconBlockIdx, bodyIdx = beaconBlockIdx+1, bodyIdx+1 {
|
||||
body := bodies[bodyIdx]
|
||||
blocks[beaconBlockIdx].Data.Block.Body.ExecutionPayload.Transactions = solid.NewTransactionsSSZFromTransactions(bodies[bodyIdx].Transactions)
|
||||
blocks[beaconBlockIdx].Data.Block.Body.ExecutionPayload.Withdrawals = solid.NewDynamicListSSZFromList[*types.Withdrawal](body.Withdrawals, int(b.cfg.MaxWithdrawalsPerPayload))
|
||||
}
|
||||
}
|
||||
return blocks, nil
|
||||
|
||||
}
|
||||
|
||||
func (b beaconChainDatabaseFilesystem) PurgeRange(ctx context.Context, from uint64, count uint64) error {
|
||||
@ -103,23 +227,24 @@ func (b beaconChainDatabaseFilesystem) WriteBlock(ctx context.Context, block *cl
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Need to reference EL somehow on read.
|
||||
if _, err := fp.Write(dbutils.EncodeBlockNumber(uint64(len(encodedPayloadHeader)))); err != nil {
|
||||
return err
|
||||
}
|
||||
// Need to reference EL somehow on read.
|
||||
if _, err := fp.Write(encodedPayloadHeader); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := fp.Write(dbutils.EncodeBlockNumber(block.Block.Body.ExecutionPayload.BlockNumber)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
encoded, err := block.EncodeForStorage(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := fp.Write(utils.CompressSnappy(encoded)); err != nil {
|
||||
compressedEncoded := utils.CompressSnappy(encoded)
|
||||
if _, err := fp.Write(dbutils.EncodeBlockNumber(uint64(len(compressedEncoded)))); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := fp.Write(compressedEncoded); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
175
cl/persistence/block_saver_test.go
Normal file
175
cl/persistence/block_saver_test.go
Normal file
@ -0,0 +1,175 @@
|
||||
package persistence
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"testing"
|
||||
|
||||
_ "embed"
|
||||
|
||||
libcommon "github.com/ledgerwatch/erigon-lib/common"
|
||||
"github.com/ledgerwatch/erigon/cl/clparams"
|
||||
"github.com/ledgerwatch/erigon/cl/cltypes"
|
||||
"github.com/ledgerwatch/erigon/cl/cltypes/solid"
|
||||
"github.com/ledgerwatch/erigon/cl/persistence/sql_migrations"
|
||||
"github.com/ledgerwatch/erigon/cl/phase1/execution_client"
|
||||
"github.com/ledgerwatch/erigon/cl/utils"
|
||||
"github.com/ledgerwatch/erigon/core/types"
|
||||
"github.com/spf13/afero"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type mockEngine struct {
|
||||
blocks map[uint64]*types.Block
|
||||
}
|
||||
|
||||
func newMockEngine() execution_client.ExecutionEngine {
|
||||
return &mockEngine{
|
||||
blocks: make(map[uint64]*types.Block),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockEngine) ForkChoiceUpdate(finalized libcommon.Hash, head libcommon.Hash) error {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
func (m *mockEngine) NewPayload(payload *cltypes.Eth1Block, beaconParentRoot *libcommon.Hash) (bool, error) {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
func (m *mockEngine) SupportInsertion() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (m *mockEngine) InsertBlocks([]*types.Block) error {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
func (m *mockEngine) IsCanonicalHash(libcommon.Hash) (bool, error) {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
func (m *mockEngine) Ready() (bool, error) {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (m *mockEngine) InsertBlock(b *types.Block) error {
|
||||
m.blocks[b.NumberU64()] = b
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockEngine) GetBodiesByRange(start, count uint64) ([]*types.RawBody, error) {
|
||||
bds := []*types.RawBody{}
|
||||
for i := start; i < start+count; i++ {
|
||||
|
||||
blk, ok := m.blocks[i]
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
bds = append(bds, blk.RawBody())
|
||||
}
|
||||
return bds, nil
|
||||
}
|
||||
|
||||
func (m *mockEngine) GetBodiesByHashes(hashes []libcommon.Hash) ([]*types.RawBody, error) {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
//go:embed test_data/test_block.ssz_snappy
|
||||
var testBlock []byte
|
||||
|
||||
func getTestBlock() *cltypes.SignedBeaconBlock {
|
||||
enc, err := utils.DecompressSnappy(testBlock)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
bcBlock := cltypes.NewSignedBeaconBlock(&clparams.MainnetBeaconConfig)
|
||||
if err := bcBlock.DecodeSSZ(enc, int(clparams.CapellaVersion)); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
bcBlock.Block.Slot = (clparams.MainnetBeaconConfig.CapellaForkEpoch + 1) * 32
|
||||
bcBlock.Block.Body.ExecutionPayload.Transactions = solid.NewTransactionsSSZFromTransactions(nil)
|
||||
bcBlock.Block.Body.ExecutionPayload.BlockNumber = 100
|
||||
bcBlock.Block.Body.ExecutionPayload.BlockHash = libcommon.HexToHash("0x78e6ce0d5a80c7416138af475d20c0a0a22124ae67b6dc5a0d0d0fe6f95e365d")
|
||||
return bcBlock
|
||||
}
|
||||
|
||||
func setupStore(t *testing.T, full bool) (BeaconChainDatabase, *sql.DB, execution_client.ExecutionEngine) {
|
||||
// Open an in-memory SQLite database for testing
|
||||
db, err := sql.Open("sqlite3", ":memory:")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to open database: %v", err)
|
||||
}
|
||||
|
||||
// Start a transaction for testing
|
||||
tx, err := db.Begin()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to start transaction: %v", err)
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
// Call ApplyMigrations with the test transaction
|
||||
err = sql_migrations.ApplyMigrations(context.Background(), tx)
|
||||
if err != nil {
|
||||
t.Fatalf("ApplyMigrations failed: %v", err)
|
||||
}
|
||||
tx.Commit()
|
||||
// Create an in-memory filesystem
|
||||
fs := afero.NewMemMapFs()
|
||||
engine := newMockEngine()
|
||||
return NewBeaconChainDatabaseFilesystem(fs, engine, full, &clparams.MainnetBeaconConfig, db), db, engine
|
||||
}
|
||||
|
||||
func TestBlockSaverStoreLoadPurgeFull(t *testing.T) {
|
||||
store, db, _ := setupStore(t, true)
|
||||
defer db.Close()
|
||||
|
||||
ctx := context.Background()
|
||||
block := getTestBlock()
|
||||
require.NoError(t, store.WriteBlock(ctx, block, true))
|
||||
|
||||
blks, err := store.GetRange(context.Background(), block.Block.Slot, 1)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(blks), 1)
|
||||
|
||||
expectedRoot, err := block.HashSSZ()
|
||||
require.NoError(t, err)
|
||||
|
||||
haveRoot, err := blks[0].Data.HashSSZ()
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, expectedRoot, haveRoot)
|
||||
|
||||
require.NoError(t, store.PurgeRange(ctx, 0, 99999999999)) // THE PUURGE
|
||||
|
||||
newBlks, err := store.GetRange(context.Background(), block.Block.Slot, 1)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(newBlks), 0)
|
||||
}
|
||||
|
||||
func TestBlockSaverStoreAndLoadPartial(t *testing.T) {
|
||||
store, db, engine := setupStore(t, false)
|
||||
defer db.Close()
|
||||
|
||||
ctx := context.Background()
|
||||
block := getTestBlock()
|
||||
require.NoError(t, store.WriteBlock(ctx, block, true))
|
||||
|
||||
eth1Block := block.Block.Body.ExecutionPayload
|
||||
header, err := eth1Block.RlpHeader()
|
||||
require.NoError(t, err)
|
||||
|
||||
engine.InsertBlock(types.NewBlock(header, nil, nil, nil, eth1Block.Body().Withdrawals))
|
||||
|
||||
blks, err := store.GetRange(context.Background(), block.Block.Slot, 1)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(blks), 1)
|
||||
|
||||
expectedRoot, err := block.HashSSZ()
|
||||
require.NoError(t, err)
|
||||
|
||||
haveRoot, err := blks[0].Data.HashSSZ()
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, expectedRoot, haveRoot)
|
||||
}
|
BIN
cl/persistence/test_data/test_block.ssz_snappy
Normal file
BIN
cl/persistence/test_data/test_block.ssz_snappy
Normal file
Binary file not shown.
@ -85,3 +85,14 @@ func (cc *ExecutionClientDirect) IsCanonicalHash(hash libcommon.Hash) (bool, err
|
||||
func (cc *ExecutionClientDirect) Ready() (bool, error) {
|
||||
return cc.chainRW.Ready()
|
||||
}
|
||||
|
||||
// GetBodiesByRange gets block bodies in given block range
|
||||
func (cc *ExecutionClientDirect) GetBodiesByRange(start, count uint64) ([]*types.RawBody, error) {
|
||||
return cc.chainRW.GetBodiesByRange(start, count), nil
|
||||
|
||||
}
|
||||
|
||||
// GetBodiesByHashes gets block bodies with given hashes
|
||||
func (cc *ExecutionClientDirect) GetBodiesByHashes(hashes []libcommon.Hash) ([]*types.RawBody, error) {
|
||||
return cc.chainRW.GetBodiesByHases(hashes), nil
|
||||
}
|
||||
|
@ -184,3 +184,43 @@ func (cc *ExecutionClientRpc) IsCanonicalHash(libcommon.Hash) (bool, error) {
|
||||
func (cc *ExecutionClientRpc) Ready() (bool, error) {
|
||||
return true, nil // Engine API is always ready
|
||||
}
|
||||
|
||||
// Range methods
|
||||
|
||||
// GetBodiesByRange gets block bodies in given block range
|
||||
func (cc *ExecutionClientRpc) GetBodiesByRange(start, count uint64) ([]*types.RawBody, error) {
|
||||
result := []*engine_types.ExecutionPayloadBodyV1{}
|
||||
|
||||
if err := cc.client.CallContext(cc.ctx, &result, rpc_helper.GetPayloadBodiesByRangeV1, hexutil.Uint64(start), hexutil.Uint64(count)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ret := make([]*types.RawBody, len(result))
|
||||
for i := range result {
|
||||
ret[i] = &types.RawBody{
|
||||
Withdrawals: result[i].Withdrawals,
|
||||
}
|
||||
for _, txn := range result[i].Transactions {
|
||||
ret[i].Transactions = append(ret[i].Transactions, txn)
|
||||
}
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// GetBodiesByHashes gets block bodies with given hashes
|
||||
func (cc *ExecutionClientRpc) GetBodiesByHashes(hashes []libcommon.Hash) ([]*types.RawBody, error) {
|
||||
result := []*engine_types.ExecutionPayloadBodyV1{}
|
||||
|
||||
if err := cc.client.CallContext(cc.ctx, &result, rpc_helper.GetPayloadBodiesByHashV1, hashes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ret := make([]*types.RawBody, len(result))
|
||||
for i := range result {
|
||||
ret[i] = &types.RawBody{
|
||||
Withdrawals: result[i].Withdrawals,
|
||||
}
|
||||
for _, txn := range result[i].Transactions {
|
||||
ret[i].Transactions = append(ret[i].Transactions, txn)
|
||||
}
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
@ -17,8 +17,9 @@ type ExecutionEngine interface {
|
||||
SupportInsertion() bool
|
||||
InsertBlocks([]*types.Block) error
|
||||
InsertBlock(*types.Block) error
|
||||
// GetBodiesByRange(start, count uint64) *types.RawBody TODO(Giulio2002): implement
|
||||
// GetBodiesByHashes([]) *types.RawBody TODO(Giulio2002): implement
|
||||
IsCanonicalHash(libcommon.Hash) (bool, error)
|
||||
Ready() (bool, error)
|
||||
// Range methods
|
||||
GetBodiesByRange(start, count uint64) ([]*types.RawBody, error)
|
||||
GetBodiesByHashes(hashes []libcommon.Hash) ([]*types.RawBody, error)
|
||||
}
|
||||
|
@ -7,3 +7,6 @@ const EngineNewPayloadV3 = "engine_newPayloadV3"
|
||||
const ForkChoiceUpdatedV1 = "engine_forkchoiceUpdatedV1"
|
||||
const ForkChoiceUpdatedV2 = "engine_forkchoiceUpdatedV2"
|
||||
const ForkChoiceUpdatedV3 = "engine_forkchoiceUpdatedV3"
|
||||
|
||||
const GetPayloadBodiesByHashV1 = "engine_getPayloadBodiesByHashV1"
|
||||
const GetPayloadBodiesByRangeV1 = "engine_getPayloadBodiesByRangeV1"
|
||||
|
@ -108,7 +108,7 @@ func (b *Blocks) Run(ctx *Context) error {
|
||||
return err
|
||||
}
|
||||
defer sqlDB.Close()
|
||||
beaconDB := persistence.NewbeaconChainDatabaseFilesystem(aferoFS, nil, false, beaconConfig, sqlDB)
|
||||
beaconDB := persistence.NewBeaconChainDatabaseFilesystem(aferoFS, nil, false, beaconConfig, sqlDB)
|
||||
for _, vv := range resp {
|
||||
err := beaconDB.WriteBlock(ctx, vv, true)
|
||||
if err != nil {
|
||||
@ -149,7 +149,7 @@ func (b *Epochs) Run(cctx *Context) error {
|
||||
return err
|
||||
}
|
||||
defer sqlDB.Close()
|
||||
beaconDB := persistence.NewbeaconChainDatabaseFilesystem(aferoFS, nil, false, beaconConfig, sqlDB)
|
||||
beaconDB := persistence.NewBeaconChainDatabaseFilesystem(aferoFS, nil, false, beaconConfig, sqlDB)
|
||||
|
||||
beacon := rpc.NewBeaconRpcP2P(ctx, s, beaconConfig, genesisConfig)
|
||||
rpcSource := persistence.NewBeaconRpcSource(beacon)
|
||||
|
@ -102,7 +102,7 @@ func RunCaplinPhase1(ctx context.Context, sentinel sentinel.SentinelClient,
|
||||
}
|
||||
}()
|
||||
}
|
||||
beaconDB := persistence.NewbeaconChainDatabaseFilesystem(afero.NewBasePathFs(dataDirFs, dirs.DataDir), engine, haveDatabaseConfig.FullBlocks, beaconConfig, db)
|
||||
beaconDB := persistence.NewBeaconChainDatabaseFilesystem(afero.NewBasePathFs(dataDirFs, dirs.DataDir), engine, haveDatabaseConfig.FullBlocks, beaconConfig, db)
|
||||
|
||||
if cfg.Active {
|
||||
apiHandler := handler.NewApiHandler(genesisConfig, beaconConfig, beaconDB, db)
|
||||
|
@ -36,11 +36,18 @@ var writerPool = sync.Pool{
|
||||
}
|
||||
|
||||
func EncodeAndWrite(w io.Writer, val ssz.Marshaler, prefix ...byte) error {
|
||||
enc := make([]byte, 0, val.EncodingSizeSSZ())
|
||||
var err error
|
||||
enc, err = val.EncodeSSZ(enc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// create prefix for length of packet
|
||||
lengthBuf := make([]byte, 10)
|
||||
vin := binary.PutUvarint(lengthBuf, uint64(val.EncodingSizeSSZ()))
|
||||
vin := binary.PutUvarint(lengthBuf, uint64(len(enc)))
|
||||
|
||||
// Create writer size
|
||||
wr := bufio.NewWriterSize(w, 10+val.EncodingSizeSSZ())
|
||||
wr := bufio.NewWriterSize(w, 10+len(enc))
|
||||
defer wr.Flush()
|
||||
// Write length of packet
|
||||
wr.Write(prefix)
|
||||
@ -53,12 +60,6 @@ func EncodeAndWrite(w io.Writer, val ssz.Marshaler, prefix ...byte) error {
|
||||
writerPool.Put(sw)
|
||||
}()
|
||||
// Marshall and snap it
|
||||
enc := make([]byte, 0, val.EncodingSizeSSZ())
|
||||
var err error
|
||||
enc, err = val.EncodeSSZ(enc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = sw.Write(enc)
|
||||
return err
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user