mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-03 17:44:29 +00:00
068463dff4
* Store transactions individually * Store transactions individually * save progress * checkIndex * merge
179 lines
4.9 KiB
Go
179 lines
4.9 KiB
Go
package migrations
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/golang/snappy"
|
|
"github.com/ledgerwatch/turbo-geth/common/dbutils"
|
|
"github.com/ledgerwatch/turbo-geth/common/debug"
|
|
"github.com/ledgerwatch/turbo-geth/common/etl"
|
|
"github.com/ledgerwatch/turbo-geth/core/types"
|
|
"github.com/ledgerwatch/turbo-geth/ethdb"
|
|
"github.com/ledgerwatch/turbo-geth/log"
|
|
"github.com/ledgerwatch/turbo-geth/rlp"
|
|
)
|
|
|
|
var transactionsTable = Migration{
|
|
Name: "tx_table_4",
|
|
Up: func(db ethdb.Database, tmpdir string, progress []byte, CommitProgress etl.LoadCommitHandler) (err error) {
|
|
logEvery := time.NewTicker(30 * time.Second)
|
|
defer logEvery.Stop()
|
|
logPrefix := "tx_table"
|
|
|
|
decompressBlockBody := func(compressed []byte) ([]byte, error) {
|
|
if !debug.IsBlockCompressionEnabled() || len(compressed) == 0 {
|
|
return compressed, nil
|
|
}
|
|
|
|
var bodyRlp []byte
|
|
bodyRlp, err = snappy.Decode(nil, compressed)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("err on decode block: %w", err)
|
|
}
|
|
return bodyRlp, nil
|
|
}
|
|
const loadStep = "load"
|
|
reader := bytes.NewReader(nil)
|
|
buf := bytes.NewBuffer(make([]byte, 4096))
|
|
body := new(types.Body)
|
|
newK := make([]byte, 8)
|
|
|
|
collectorB, err1 := etl.NewCollectorFromFiles(tmpdir + "1") // B - stands for blocks
|
|
if err1 != nil {
|
|
return err1
|
|
}
|
|
collectorT, err1 := etl.NewCollectorFromFiles(tmpdir + "2") // T - stands for transactions
|
|
if err1 != nil {
|
|
return err1
|
|
}
|
|
switch string(progress) {
|
|
case "":
|
|
// can't use files if progress field not set, clear them
|
|
if collectorB != nil {
|
|
collectorB.Close(logPrefix)
|
|
collectorB = nil
|
|
}
|
|
|
|
if collectorT != nil {
|
|
collectorT.Close(logPrefix)
|
|
collectorT = nil
|
|
}
|
|
case loadStep:
|
|
if collectorB == nil || collectorT == nil {
|
|
return ErrMigrationETLFilesDeleted
|
|
}
|
|
defer func() {
|
|
// don't clean if error or panic happened
|
|
if err != nil {
|
|
return
|
|
}
|
|
if rec := recover(); rec != nil {
|
|
panic(rec)
|
|
}
|
|
collectorB.Close(logPrefix)
|
|
collectorT.Close(logPrefix)
|
|
}()
|
|
goto LoadStep
|
|
}
|
|
|
|
collectorB = etl.NewCriticalCollector(tmpdir+"1", etl.NewSortableBuffer(etl.BufferOptimalSize*4))
|
|
collectorT = etl.NewCriticalCollector(tmpdir+"2", etl.NewSortableBuffer(etl.BufferOptimalSize*4))
|
|
defer func() {
|
|
// don't clean if error or panic happened
|
|
if err != nil {
|
|
return
|
|
}
|
|
if rec := recover(); rec != nil {
|
|
panic(rec)
|
|
}
|
|
collectorB.Close(logPrefix)
|
|
collectorT.Close(logPrefix)
|
|
}()
|
|
|
|
if err = db.Walk(dbutils.BlockBodyPrefix, nil, 0, func(k, v []byte) (bool, error) {
|
|
select {
|
|
default:
|
|
case <-logEvery.C:
|
|
blockNum := binary.BigEndian.Uint64(k[:8])
|
|
log.Info(fmt.Sprintf("[%s] Progress2", logPrefix), "blockNum", blockNum)
|
|
}
|
|
// don't need canonical check
|
|
|
|
var bodyRlp []byte
|
|
bodyRlp, err = decompressBlockBody(v)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
reader.Reset(bodyRlp)
|
|
if err = rlp.Decode(reader, body); err != nil {
|
|
return false, fmt.Errorf("[%s]: invalid block body RLP: %w", logPrefix, err)
|
|
}
|
|
|
|
txIds := make([]uint64, len(body.Transactions))
|
|
var baseTxId uint64
|
|
baseTxId, err = db.Sequence(dbutils.EthTx, uint64(len(body.Transactions)))
|
|
if err != nil {
|
|
return false, nil
|
|
}
|
|
|
|
txId := baseTxId
|
|
for i, txn := range body.Transactions {
|
|
binary.BigEndian.PutUint64(newK, txId)
|
|
txIds[i] = txId
|
|
txId++
|
|
|
|
buf.Reset()
|
|
if err = rlp.Encode(buf, txn); err != nil {
|
|
return false, err
|
|
}
|
|
|
|
if err = collectorT.Collect(newK, buf.Bytes()); err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
|
|
buf.Reset()
|
|
if err = rlp.Encode(buf, types.BodyForStorage{
|
|
BaseTxId: baseTxId,
|
|
TxAmount: uint32(len(body.Transactions)),
|
|
Uncles: body.Uncles,
|
|
}); err != nil {
|
|
return false, err
|
|
}
|
|
if err = collectorB.Collect(k, buf.Bytes()); err != nil {
|
|
return false, err
|
|
}
|
|
return true, nil
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err = db.(ethdb.BucketsMigrator).ClearBuckets(dbutils.EthTx, dbutils.BlockBodyPrefix); err != nil {
|
|
return fmt.Errorf("clearing the receipt bucket: %w", err)
|
|
}
|
|
|
|
// Commit clearing of the bucket - freelist should now be written to the database
|
|
if err = CommitProgress(db, []byte(loadStep), false); err != nil {
|
|
return fmt.Errorf("committing the removal of receipt table: %w", err)
|
|
}
|
|
|
|
LoadStep:
|
|
// Now transaction would have been re-opened, and we should be re-using the space
|
|
if err = collectorT.Load(logPrefix, db, dbutils.EthTx, etl.IdentityLoadFunc, etl.TransformArgs{
|
|
OnLoadCommit: CommitProgress,
|
|
}); err != nil {
|
|
return fmt.Errorf("loading the transformed data back into the eth_tx table: %w", err)
|
|
}
|
|
if err = collectorB.Load(logPrefix, db, dbutils.BlockBodyPrefix, etl.IdentityLoadFunc, etl.TransformArgs{
|
|
OnLoadCommit: CommitProgress,
|
|
}); err != nil {
|
|
return fmt.Errorf("loading the transformed data back into the bodies table: %w", err)
|
|
}
|
|
return nil
|
|
},
|
|
}
|