Switch to cbor (#1172)

* switch receipts to cbor

* switch receipts to cbor

* rpcdaemon to cache chainconfig

* rpcdaemon to cache chainconfig

* rpcdaemon to cache chainconfig

* rpcdaemon to cache chainconfig
This commit is contained in:
Alex Sharov 2020-10-02 19:51:20 +07:00 committed by GitHub
parent 6ccf57dace
commit 809d79b15f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 87 additions and 65 deletions

View File

@ -1679,7 +1679,6 @@ func zstd(chaindata string) error {
count, _ := c.Count() count, _ := c.Count()
blockNBytes := make([]byte, 8) blockNBytes := make([]byte, 8)
trainFrom := count - 2_000_000 trainFrom := count - 2_000_000
readerForRlp := bytes.NewReader(nil)
for blockN := trainFrom; blockN < count; blockN += 2_000_000 / 4_000 { for blockN := trainFrom; blockN < count; blockN += 2_000_000 / 4_000 {
binary.BigEndian.PutUint64(blockNBytes, blockN) binary.BigEndian.PutUint64(blockNBytes, blockN)
var v []byte var v []byte
@ -1688,9 +1687,8 @@ func zstd(chaindata string) error {
return err return err
} }
storageReceipts := []*types.ReceiptForStorage{} storageReceipts := types.Receipts{}
readerForRlp.Reset(v) err = cbor.Unmarshal(&storageReceipts, v)
err = rlp.Decode(readerForRlp, &storageReceipts)
check(err) check(err)
samples1 = append(samples1, v) samples1 = append(samples1, v)
@ -1869,16 +1867,10 @@ func benchRlp(chaindata string) error {
fmt.Printf("bucket: %s\n", bucket) fmt.Printf("bucket: %s\n", bucket)
c := tx.(ethdb.HasTx).Tx().Cursor(bucket) c := tx.(ethdb.HasTx).Tx().Cursor(bucket)
//total_rlp := 0
total_compress_rlp := 0
total_cbor := 0 total_cbor := 0
total_compress_cbor := 0 total_compress_cbor := 0
total = 0 total = 0
//var rlp_encode time.Duration
var rlp_decode time.Duration
var rlp_compress time.Duration
var cbor_encode time.Duration var cbor_encode time.Duration
var cbor_decode time.Duration var cbor_decode time.Duration
var cbor_decode2 time.Duration var cbor_decode2 time.Duration
@ -1890,8 +1882,6 @@ func benchRlp(chaindata string) error {
var samplesCbor [][]byte var samplesCbor [][]byte
readerForRlp := bytes.NewReader(nil)
count, _ := c.Count() count, _ := c.Count()
blockNBytes := make([]byte, 8) blockNBytes := make([]byte, 8)
trainFrom := count - 2_000_000 trainFrom := count - 2_000_000
@ -1903,14 +1893,10 @@ func benchRlp(chaindata string) error {
return err return err
} }
storageReceipts := []*types.ReceiptForStorage{} receipts := types.Receipts{}
readerForRlp.Reset(v) err = cbor.Unmarshal(&receipts, v)
err = rlp.Decode(readerForRlp, &storageReceipts)
check(err) check(err)
cbor.MustMarshal(&bufSlice, storageReceipts)
samplesCbor = append(samplesCbor, common.CopyBytes(bufSlice))
select { select {
default: default:
case <-logEvery.C: case <-logEvery.C:
@ -1930,14 +1916,11 @@ func benchRlp(chaindata string) error {
total += len(v) total += len(v)
blockNum := binary.BigEndian.Uint64(k) blockNum := binary.BigEndian.Uint64(k)
storageReceipts := make([]*types.ReceiptForStorage, 0, 1024) storageReceipts := types.Receipts{}
readerForRlp.Reset(v) err = cbor.Unmarshal(&storageReceipts, v)
t := time.Now()
err = rlp.Decode(readerForRlp, &storageReceipts)
rlp_decode += time.Since(t)
check(err) check(err)
t = time.Now() t := time.Now()
err = cbor.Marshal(&bufSlice, storageReceipts) err = cbor.Marshal(&bufSlice, storageReceipts)
cbor_encode += time.Since(t) cbor_encode += time.Since(t)
total_cbor += len(bufSlice) total_cbor += len(bufSlice)
@ -1959,10 +1942,10 @@ func benchRlp(chaindata string) error {
case <-logEvery.C: case <-logEvery.C:
totalf := float64(total) totalf := float64(total)
log.Info("Progress 8", "blockNum", blockNum, "before", common.StorageSize(total), log.Info("Progress 8", "blockNum", blockNum, "before", common.StorageSize(total),
"rlp_decode", rlp_decode, //"rlp_decode", rlp_decode,
"total_cbor", fmt.Sprintf("%.2f", float64(total_cbor)/totalf), "cbor_encode", cbor_encode, "cbor_decode", cbor_decode, "total_cbor", fmt.Sprintf("%.2f", float64(total_cbor)/totalf), "cbor_encode", cbor_encode, "cbor_decode", cbor_decode,
"cbor_decode2", cbor_decode2, "cbor_decode3", cbor_decode3, "cbor_decode2", cbor_decode2, "cbor_decode3", cbor_decode3,
"compress_rlp_ratio", fmt.Sprintf("%.2f", totalf/float64(total_compress_rlp)), "rlp_compress", rlp_compress, //"compress_rlp_ratio", fmt.Sprintf("%.2f", totalf/float64(total_compress_rlp)), "rlp_compress", rlp_compress,
"compress_cbor_ratio", fmt.Sprintf("%.2f", totalf/float64(total_compress_cbor)), "cbor_compress", cbor_compress, "compress_cbor_ratio", fmt.Sprintf("%.2f", totalf/float64(total_compress_cbor)), "cbor_compress", cbor_compress,
) )
} }

View File

@ -20,6 +20,7 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/binary" "encoding/binary"
"github.com/ledgerwatch/turbo-geth/ethdb/cbor"
"math/big" "math/big"
"github.com/ledgerwatch/turbo-geth/common" "github.com/ledgerwatch/turbo-geth/common"
@ -407,16 +408,11 @@ func ReadRawReceipts(db DatabaseReader, hash common.Hash, number uint64) types.R
if len(data) == 0 { if len(data) == 0 {
return nil return nil
} }
// Convert the receipts from their storage form to their internal representation receipts := types.Receipts{}
storageReceipts := []*types.ReceiptForStorage{} if err := cbor.Unmarshal(&receipts, data); err != nil {
if err := rlp.DecodeBytes(data, &storageReceipts); err != nil { log.Error("receipt unmarshal failed", "hash", hash, "err", err)
log.Error("Invalid receipt array RLP", "hash", hash, "err", err)
return nil return nil
} }
receipts := make(types.Receipts, len(storageReceipts))
for i, storageReceipt := range storageReceipts {
receipts[i] = (*types.Receipt)(storageReceipt)
}
return receipts return receipts
} }
@ -448,17 +444,14 @@ func ReadReceipts(db DatabaseReader, hash common.Hash, number uint64) types.Rece
// WriteReceipts stores all the transaction receipts belonging to a block. // WriteReceipts stores all the transaction receipts belonging to a block.
func WriteReceipts(db DatabaseWriter, hash common.Hash, number uint64, receipts types.Receipts) { func WriteReceipts(db DatabaseWriter, hash common.Hash, number uint64, receipts types.Receipts) {
// Convert the receipts into their storage form and serialize them newV := make([]byte, 0, 1024)
storageReceipts := make([]*types.ReceiptForStorage, len(receipts)) err := cbor.Marshal(&newV, receipts)
for i, receipt := range receipts {
storageReceipts[i] = (*types.ReceiptForStorage)(receipt)
}
bytes, err := rlp.EncodeToBytes(storageReceipts)
if err != nil { if err != nil {
log.Crit("Failed to encode block receipts", "err", err) log.Crit("Failed to encode block receipts", "err", err)
} }
// Store the flattened receipt slice // Store the flattened receipt slice
if err := db.Put(dbutils.BlockReceiptsPrefix, dbutils.BlockReceiptsKey(number, hash), bytes); err != nil { if err := db.Put(dbutils.BlockReceiptsPrefix, dbutils.BlockReceiptsKey(number, hash), newV); err != nil {
log.Crit("Failed to store block receipts", "err", err) log.Crit("Failed to store block receipts", "err", err)
} }
} }

View File

@ -31,7 +31,7 @@ import (
) )
// go:generate gencodec -type Receipt -field-override receiptMarshaling -out gen_receipt_json.go // go:generate gencodec -type Receipt -field-override receiptMarshaling -out gen_receipt_json.go
//go:generate codecgen -o receipt_codecgen_gen.go -r="ReceiptsForStorage|ReceiptForStorage|Receipts|Receipt|Log" -rt="codec" -nx=true -d=2 receipt.go log.go //go:generate codecgen -o receipt_codecgen_gen.go -r="Receipts|Receipt|Log" -rt="codec" -nx=true -d=2 receipt.go log.go
var ( var (
receiptStatusFailedRLP = []byte{} receiptStatusFailedRLP = []byte{}

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/ledgerwatch/turbo-geth/ethdb/cbor"
"os" "os"
"runtime" "runtime"
"runtime/pprof" "runtime/pprof"
@ -21,7 +22,6 @@ import (
"github.com/ledgerwatch/turbo-geth/ethdb" "github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log" "github.com/ledgerwatch/turbo-geth/log"
"github.com/ledgerwatch/turbo-geth/params" "github.com/ledgerwatch/turbo-geth/params"
"github.com/ledgerwatch/turbo-geth/rlp"
) )
const ( const (
@ -201,18 +201,13 @@ func logProgress(prev, now uint64, batch ethdb.DbWithPendingMutations) uint64 {
} }
func appendReceipts(tx ethdb.DbWithPendingMutations, receipts types.Receipts, blockNumber uint64, blockHash common.Hash) error { func appendReceipts(tx ethdb.DbWithPendingMutations, receipts types.Receipts, blockNumber uint64, blockHash common.Hash) error {
// Convert the receipts into their storage form and serialize them newV := make([]byte, 0, 1024)
storageReceipts := make([]*types.ReceiptForStorage, len(receipts)) err := cbor.Marshal(&newV, receipts)
for i, receipt := range receipts { if err != nil {
storageReceipts[i] = (*types.ReceiptForStorage)(receipt)
}
var bytes []byte
var err error
if bytes, err = rlp.EncodeToBytes(storageReceipts); err != nil {
return fmt.Errorf("encode block receipts for block %d: %v", blockNumber, err) return fmt.Errorf("encode block receipts for block %d: %v", blockNumber, err)
} }
// Store the flattened receipt slice // Store the flattened receipt slice
if err = tx.Append(dbutils.BlockReceiptsPrefix, dbutils.BlockReceiptsKey(blockNumber, blockHash), bytes); err != nil { if err = tx.Append(dbutils.BlockReceiptsPrefix, dbutils.BlockReceiptsKey(blockNumber, blockHash), newV); err != nil {
return fmt.Errorf("writing receipts for block %d: %v", blockNumber, err) return fmt.Errorf("writing receipts for block %d: %v", blockNumber, err)
} }
return nil return nil

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"github.com/ledgerwatch/turbo-geth/ethdb/cbor"
"runtime" "runtime"
"sort" "sort"
"time" "time"
@ -16,7 +17,6 @@ import (
"github.com/ledgerwatch/turbo-geth/ethdb" "github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/ethdb/bitmapdb" "github.com/ledgerwatch/turbo-geth/ethdb/bitmapdb"
"github.com/ledgerwatch/turbo-geth/log" "github.com/ledgerwatch/turbo-geth/log"
"github.com/ledgerwatch/turbo-geth/rlp"
) )
const ( const (
@ -125,13 +125,12 @@ func promoteLogIndex(db ethdb.DbWithPendingMutations, start uint64, quit <-chan
} }
} }
// Convert the receipts from their storage form to their internal representation receipts := types.Receipts{}
storageReceipts := []*types.ReceiptForStorage{} if err := cbor.Unmarshal(&receipts, v); err != nil {
if err := rlp.DecodeBytes(v, &storageReceipts); err != nil { return fmt.Errorf("receipt unmarshal failed: %w, blocl=%d", err, blockNum)
return fmt.Errorf("invalid receipt array RLP: %w, blocl=%d", err, blockNum)
} }
for _, receipt := range storageReceipts { for _, receipt := range receipts {
for _, log := range receipt.Logs { for _, log := range receipt.Logs {
for _, topic := range log.Topics { for _, topic := range log.Topics {
topicStr := string(topic.Bytes()) topicStr := string(topic.Bytes())
@ -210,14 +209,13 @@ func unwindLogIndex(db ethdb.DbWithPendingMutations, from, to uint64, quitCh <-c
if err := common.Stopped(quitCh); err != nil { if err := common.Stopped(quitCh); err != nil {
return err return err
} }
// Convert the receipts from their storage form to their internal representation receipts := types.Receipts{}
storageReceipts := []*types.ReceiptForStorage{} if err := cbor.Unmarshal(&receipts, v); err != nil {
if err := rlp.DecodeBytes(v, &storageReceipts); err != nil { return fmt.Errorf("receipt unmarshal failed: %w, k=%x", err, k)
return fmt.Errorf("invalid receipt array RLP: %w, k=%x", err, k)
} }
for _, storageReceipt := range storageReceipts { for _, receipt := range receipts {
for _, log := range storageReceipt.Logs { for _, log := range receipt.Logs {
for _, topic := range log.Topics { for _, topic := range log.Topics {
topics[string(topic.Bytes())] = true topics[string(topic.Bytes())] = true
} }

View File

@ -2,7 +2,6 @@ package migrations
import ( import (
"fmt" "fmt"
"github.com/ledgerwatch/turbo-geth/common/dbutils" "github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/common/etl" "github.com/ledgerwatch/turbo-geth/common/etl"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages" "github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"

View File

@ -63,6 +63,7 @@ var migrations = []Migration{
dupSortIH, dupSortIH,
clearIndices, clearIndices,
resetIHBucketToRecoverDB, resetIHBucketToRecoverDB,
receiptsCborEncode,
} }
type Migration struct { type Migration struct {

53
migrations/receipts.go Normal file
View File

@ -0,0 +1,53 @@
package migrations
import (
"encoding/binary"
"fmt"
"runtime"
"time"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/common/etl"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/ethdb/cbor"
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ledgerwatch/turbo-geth/rlp"
)
var receiptsCborEncode = Migration{
Name: "receipts_cbor_encode",
Up: func(db ethdb.Database, datadir string, OnLoadCommit etl.LoadCommitHandler) error {
logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()
buf := make([]byte, 0, 100_000)
if err := db.Walk(dbutils.BlockReceiptsPrefix, nil, 0, func(k, v []byte) (bool, error) {
blockNum := binary.BigEndian.Uint64(k[:8])
select {
default:
case <-logEvery.C:
var m runtime.MemStats
runtime.ReadMemStats(&m)
log.Info("Migration progress", "blockNum", blockNum, "alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys))
}
// Convert the receipts from their storage form to their internal representation
storageReceipts := []*types.ReceiptForStorage{}
if err := rlp.DecodeBytes(v, &storageReceipts); err != nil {
return false, fmt.Errorf("invalid receipt array RLP: %w, k=%x", err, k)
}
buf = buf[:0]
if err := cbor.Marshal(&buf, storageReceipts); err != nil {
return false, err
}
return true, db.Put(dbutils.BlockReceiptsPrefix, common.CopyBytes(k), common.CopyBytes(buf))
}); err != nil {
return err
}
return OnLoadCommit(db, nil, true)
},
}