kv_remote: server to support thread-safe multi-streams per 1 tx (#6402)

This commit is contained in:
Alex Sharov 2022-12-24 13:11:15 +07:00 committed by GitHub
parent d2bda25b29
commit ade933be6b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 129 additions and 55 deletions

View File

@ -15,6 +15,7 @@ package lightclient
import (
"context"
"errors"
"runtime"
"time"
@ -206,12 +207,18 @@ func (l *LightClient) Start() {
case <-logPeers.C:
peers, err := l.rpc.Peers()
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
log.Warn("could not read peers", "err", err)
continue
}
log.Info("[LightClient] P2P", "peers", peers)
case <-updateStatusSentinel.C:
if err := l.updateStatus(); err != nil {
if errors.Is(err, context.Canceled) {
return
}
log.Error("Could not update sentinel status", "err", err)
return
}

View File

@ -13,7 +13,6 @@ import (
common2 "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/bitmapdb"
libstate "github.com/ledgerwatch/erigon-lib/state"
"github.com/ledgerwatch/erigon/core/state/temporal"
"github.com/ledgerwatch/log/v3"
@ -132,7 +131,7 @@ func (api *APIImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria) (t
}
if api.historyV3(tx) {
return api.getLogsV3(ctx, tx, begin, end, crit)
return api.getLogsV3(ctx, tx.(kv.TemporalTx), begin, end, crit)
}
blockNumbers := bitmapdb.NewBitmap()
@ -276,7 +275,7 @@ func getTopicsBitmap(c kv.Tx, topics [][]common.Hash, from, to uint32) (*roaring
return result, nil
}
func (api *APIImpl) getLogsV3(ctx context.Context, tx kv.Tx, begin, end uint64, crit filters.FilterCriteria) ([]*types.Log, error) {
func (api *APIImpl) getLogsV3(ctx context.Context, tx kv.TemporalTx, begin, end uint64, crit filters.FilterCriteria) ([]*types.Log, error) {
logs := []*types.Log{}
var fromTxNum, toTxNum uint64
@ -295,10 +294,7 @@ func (api *APIImpl) getLogsV3(ctx context.Context, tx kv.Tx, begin, end uint64,
txNumbers := roaring64.New()
txNumbers.AddRange(fromTxNum, toTxNum) // [min,max)
ac := api._agg.MakeContext()
ac.SetTx(tx)
topicsBitmap, err := getTopicsBitmapV3(ac, tx, crit.Topics, fromTxNum, toTxNum)
topicsBitmap, err := getTopicsBitmapV3(tx, crit.Topics, fromTxNum, toTxNum)
if err != nil {
return nil, err
}
@ -310,7 +306,10 @@ func (api *APIImpl) getLogsV3(ctx context.Context, tx kv.Tx, begin, end uint64,
var addrBitmap *roaring64.Bitmap
for _, addr := range crit.Addresses {
var bitmapForORing roaring64.Bitmap
it := ac.LogAddrIterator(addr.Bytes(), fromTxNum, toTxNum, tx)
it, err := tx.IndexRange(temporal.LogAddr, addr.Bytes(), fromTxNum, toTxNum)
if err != nil {
return nil, err
}
for it.HasNext() {
n, err := it.NextBatch()
if err != nil {
@ -338,8 +337,9 @@ func (api *APIImpl) getLogsV3(ctx context.Context, tx kv.Tx, begin, end uint64,
var signer *types.Signer
var rules *params.Rules
var skipAnalysis bool
stateReader := state.NewHistoryReaderV3(ac)
stateReader := state.NewHistoryReaderV3()
stateReader.SetTx(tx)
//stateReader.SetAc(ac)
ibs := state.New(stateReader)
//stateReader.SetTrace(true)
@ -425,10 +425,13 @@ func (api *APIImpl) getLogsV3(ctx context.Context, tx kv.Tx, begin, end uint64,
gp := new(core.GasPool).AddGas(msg.Gas())
_, err = core.ApplyMessage(evm, msg, gp, true /* refunds */, false /* gasBailout */)
if err != nil {
return nil, fmt.Errorf("%w: blockNum=%d, txNum=%d", err, blockNum, txNum)
return nil, fmt.Errorf("%w: blockNum=%d, txNum=%d, %s", err, blockNum, txNum, ibs.Error())
}
rawLogs := ibs.GetLogs(txHash)
var logIndex uint
//TODO: logIndex within the block! no way to calc it now
logIndex := uint(0)
for _, log := range rawLogs {
log.Index = logIndex
logIndex++
@ -458,26 +461,21 @@ func (api *APIImpl) getLogsV3(ctx context.Context, tx kv.Tx, begin, end uint64,
// {{}, {B}} matches any topic in first position AND B in second position
// {{A}, {B}} matches topic A in first position AND B in second position
// {{A, B}, {C, D}} matches topic (A OR B) in first position AND (C OR D) in second position
func getTopicsBitmapV3(ac *libstate.Aggregator22Context, tx kv.Tx, topics [][]common.Hash, from, to uint64) (*roaring64.Bitmap, error) {
func getTopicsBitmapV3(tx kv.TemporalTx, topics [][]common.Hash, from, to uint64) (*roaring64.Bitmap, error) {
var result *roaring64.Bitmap
for _, sub := range topics {
var bitmapForORing roaring64.Bitmap
for _, topic := range sub {
if ttx, casted := tx.(kv.TemporalTx); casted {
it, err := ttx.InvertedIndexRange(temporal.LogTopic, topic.Bytes(), from, to)
it, err := tx.IndexRange(temporal.LogTopic, topic.Bytes(), from, to)
if err != nil {
return nil, err
}
for it.HasNext() {
n, err := it.NextBatch()
if err != nil {
return nil, err
}
for it.HasNext() {
n, err := it.NextBatch()
if err != nil {
return nil, err
}
bitmapForORing.AddMany(n)
}
} else {
it := ac.LogTopicIterator(topic.Bytes(), from, to, tx)
bitmapForORing.Or(it.ToBitamp())
bitmapForORing.AddMany(n)
}
}

View File

@ -624,8 +624,9 @@ func (api *TraceAPIImpl) filterV3(ctx context.Context, dbtx kv.Tx, fromBlock, to
var lastHeader *types.Header
var lastSigner *types.Signer
var lastRules *params.Rules
stateReader := state.NewHistoryReaderV3(ac)
stateReader := state.NewHistoryReaderV3()
stateReader.SetTx(dbtx)
stateReader.SetAc(ac)
noop := state.NewNoopWriter()
for it.HasNext() {
txNum := it.Next()

View File

@ -59,7 +59,10 @@ func (fw *FillWorker) FillAccounts(plainStateCollector *etl.Collector) {
it := fw.as.IterateAccountsHistory(fw.txNum)
value := make([]byte, 1024)
for it.HasNext() {
key, val := it.Next()
key, val, err := it.Next()
if err != nil {
panic(err)
}
if len(val) > 0 {
var a accounts.Account
a.Reset()
@ -109,7 +112,10 @@ func (fw *FillWorker) FillStorage(plainStateCollector *etl.Collector) {
var compositeKey = make([]byte, length.Addr+length.Incarnation+length.Hash)
binary.BigEndian.PutUint64(compositeKey[20:], state.FirstContractIncarnation)
for it.HasNext() {
key, val := it.Next()
key, val, err := it.Next()
if err != nil {
panic(err)
}
copy(compositeKey[:20], key[:20])
copy(compositeKey[20+8:], key[20:])
if len(val) > 0 {
@ -131,7 +137,10 @@ func (fw *FillWorker) FillCode(codeCollector, plainContractCollector *etl.Collec
binary.BigEndian.PutUint64(compositeKey[length.Addr:], state.FirstContractIncarnation)
for it.HasNext() {
key, val := it.Next()
key, val, err := it.Next()
if err != nil {
panic(err)
}
copy(compositeKey, key)
if len(val) > 0 {
@ -157,7 +166,11 @@ func (fw *FillWorker) FillCode(codeCollector, plainContractCollector *etl.Collec
func (sw *ScanWorker) BitmapAccounts() error {
it := sw.as.IterateAccountsTxs()
for it.HasNext() {
sw.bitmap.Add(it.Next())
v, err := it.Next()
if err != nil {
return err
}
sw.bitmap.Add(v)
}
return nil
}
@ -165,7 +178,11 @@ func (sw *ScanWorker) BitmapAccounts() error {
func (sw *ScanWorker) BitmapStorage() error {
it := sw.as.IterateStorageTxs()
for it.HasNext() {
sw.bitmap.Add(it.Next())
v, err := it.Next()
if err != nil {
return err
}
sw.bitmap.Add(v)
}
return nil
}
@ -173,7 +190,11 @@ func (sw *ScanWorker) BitmapStorage() error {
func (sw *ScanWorker) BitmapCode() error {
it := sw.as.IterateCodeTxs()
for it.HasNext() {
sw.bitmap.Add(it.Next())
v, err := it.Next()
if err != nil {
return err
}
sw.bitmap.Add(v)
}
return nil
}

View File

@ -20,10 +20,13 @@ type HistoryReaderV3 struct {
ttx kv.TemporalTx
}
func NewHistoryReaderV3(ac *libstate.Aggregator22Context) *HistoryReaderV3 {
return &HistoryReaderV3{ac: ac}
func NewHistoryReaderV3() *HistoryReaderV3 {
return &HistoryReaderV3{}
}
func (hr *HistoryReaderV3) SetAc(ac *libstate.Aggregator22Context) {
hr.ac = ac
}
func (hr *HistoryReaderV3) SetTx(tx kv.Tx) {
if ttx, casted := tx.(kv.TemporalTx); casted {
hr.ttx = ttx
@ -38,7 +41,7 @@ func (hr *HistoryReaderV3) ReadAccountData(address common.Address) (*accounts.Ac
var ok bool
var err error
if hr.ttx != nil {
enc, ok, err = hr.ttx.HistoryGetNoState(temporal.Accounts, address.Bytes(), hr.txNum)
enc, ok, err = hr.ttx.HistoryGet(temporal.Accounts, address.Bytes(), hr.txNum)
} else {
enc, ok, err = hr.ac.ReadAccountDataNoStateWithRecent(address.Bytes(), hr.txNum)
}
@ -65,6 +68,7 @@ func (hr *HistoryReaderV3) ReadAccountData(address common.Address) (*accounts.Ac
if err != nil {
return nil, err
}
if len(enc) == 0 {
if hr.trace {
fmt.Printf("ReadAccountData [%x] => []\n", address)
@ -87,7 +91,7 @@ func (hr *HistoryReaderV3) ReadAccountStorage(address common.Address, incarnatio
var ok bool
var err error
if hr.ttx != nil {
enc, ok, err = hr.ttx.HistoryGetNoState(temporal.Storage, append(address.Bytes(), key.Bytes()...), hr.txNum)
enc, ok, err = hr.ttx.HistoryGet(temporal.Storage, append(address.Bytes(), key.Bytes()...), hr.txNum)
} else {
enc, ok, err = hr.ac.ReadAccountStorageNoStateWithRecent(address.Bytes(), key.Bytes(), hr.txNum)
}
@ -122,7 +126,7 @@ func (hr *HistoryReaderV3) ReadAccountCode(address common.Address, incarnation u
var ok bool
var err error
if hr.ttx != nil {
enc, ok, err = hr.ttx.HistoryGetNoState(temporal.Code, address.Bytes(), hr.txNum)
enc, ok, err = hr.ttx.HistoryGet(temporal.Code, address.Bytes(), hr.txNum)
} else {
enc, ok, err = hr.ac.ReadAccountCodeNoStateWithRecent(address.Bytes(), hr.txNum)
}
@ -147,7 +151,7 @@ func (hr *HistoryReaderV3) ReadAccountCodeSize(address common.Address, incarnati
var size int
var err error
if hr.ttx != nil {
enc, ok, err = hr.ttx.HistoryGetNoState(temporal.Code, address.Bytes(), hr.txNum)
enc, ok, err = hr.ttx.HistoryGet(temporal.Code, address.Bytes(), hr.txNum)
} else {
enc, ok, err = hr.ac.ReadAccountCodeNoStateWithRecent(address.Bytes(), hr.txNum)
}

View File

@ -2,6 +2,7 @@ package historyv2read
import (
"encoding/binary"
"fmt"
"github.com/ledgerwatch/erigon-lib/common/length"
"github.com/ledgerwatch/erigon-lib/kv"
@ -15,13 +16,14 @@ const DefaultIncarnation = uint64(1)
func GetAsOfV3(tx kv.TemporalTx, storage bool, key []byte, timestamp uint64, histV3 bool) (v []byte, err error) {
var ok bool
if storage {
v, ok, err = tx.HistoryGetNoState(temporal.Storage, key, timestamp)
v, ok, err = tx.HistoryGet(temporal.Storage, key, timestamp)
} else {
v, ok, err = tx.HistoryGetNoState(temporal.Accounts, key, timestamp)
v, ok, err = tx.HistoryGet(temporal.Accounts, key, timestamp)
}
if err != nil {
return nil, err
}
fmt.Printf("GetAsOfV3: %x, %d\n", key, timestamp)
if !ok {
return tx.GetOne(kv.PlainState, key)
}

View File

@ -11,7 +11,7 @@ import (
"github.com/ledgerwatch/erigon-lib/state"
)
//Naming:
//Variables Naming:
// ts - TimeStamp
// tx - Database Transaction
// txn - Ethereum Transaction (and TxNum - is also number of Etherum Transaction)
@ -20,6 +20,13 @@ import (
// k - key
// v - value
//Methods Naming:
// Get: exact match of criterias
// Range: [from, to)
// Each: [from, INF)
// Prefix: Has(k, prefix)
// Amount: [from, INF) AND maximum N records
type DB struct {
kv.RwDB
agg *state.Aggregator22
@ -37,6 +44,7 @@ func (db *DB) BeginTemporalRo(ctx context.Context) (kv.TemporalTx, error) {
tx := &Tx{Tx: kvTx, hitoryV3: db.hitoryV3}
if db.hitoryV3 {
tx.agg = db.agg.MakeContext()
tx.agg.SetTx(kvTx)
} else {
tx.accHistoryC, _ = tx.Cursor(kv.AccountsHistory)
tx.storageHistoryC, _ = tx.Cursor(kv.StorageHistory)
@ -76,7 +84,23 @@ type Tx struct {
accChangesC, storageChangesC kv.CursorDupSort
//HistoryV3 fields
hitoryV3 bool
hitoryV3 bool
resourcesToClose []kv.Closer
}
func (tx *Tx) Rollback() {
for _, closer := range tx.resourcesToClose {
closer.Close()
}
tx.agg.Close()
tx.Tx.Rollback()
}
func (tx *Tx) Commit() error {
for _, closer := range tx.resourcesToClose {
closer.Close()
}
return tx.Tx.Commit()
}
const (
@ -92,7 +116,7 @@ const (
TracesTo kv.InvertedIdx = "TracesTo"
)
func (tx *Tx) HistoryGetNoState(name kv.History, key []byte, ts uint64) (v []byte, ok bool, err error) {
func (tx *Tx) HistoryGet(name kv.History, key []byte, ts uint64) (v []byte, ok bool, err error) {
if tx.hitoryV3 {
switch name {
case Accounts:
@ -131,20 +155,24 @@ type Cursor struct {
}
// [fromTs, toTs)
func (tx *Tx) InvertedIndexRange(name kv.InvertedIdx, key []byte, fromTs, toTs uint64) (timestamps kv.Iter[uint64], err error) {
func (tx *Tx) IndexRange(name kv.InvertedIdx, key []byte, fromTs, toTs uint64) (timestamps kv.UnaryStream[uint64], err error) {
if tx.hitoryV3 {
switch name {
case LogTopic:
t := tx.agg.LogTopicIterator(key, fromTs, toTs, tx)
tx.resourcesToClose = append(tx.resourcesToClose, t)
return t, nil
case LogAddr:
t := tx.agg.LogAddrIterator(key, fromTs, toTs, tx)
tx.resourcesToClose = append(tx.resourcesToClose, t)
return t, nil
case TracesFrom:
t := tx.agg.TraceFromIterator(key, fromTs, toTs, tx)
tx.resourcesToClose = append(tx.resourcesToClose, t)
return t, nil
case TracesTo:
t := tx.agg.TraceToIterator(key, fromTs, toTs, tx)
tx.resourcesToClose = append(tx.resourcesToClose, t)
return t, nil
default:
panic(fmt.Sprintf("unexpected: %s", name))
@ -167,6 +195,6 @@ func (tx *Tx) InvertedIndexRange(name kv.InvertedIdx, key []byte, fromTs, toTs u
if err != nil {
return nil, err
}
return kv.IterFromArray(bm.ToArray()), nil
return kv.StreamArray(bm.ToArray()), nil
}
}

View File

@ -286,6 +286,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
if config.HistoryV3 {
backend.chainDB = temporal.New(backend.chainDB, agg)
chainKv = backend.chainDB
}
kvRPC := remotedbserver.NewKvServer(ctx, chainKv, allSnapshots, agg)

View File

@ -464,10 +464,10 @@ Loop:
if err = batch.Commit(); err != nil {
return err
}
if err = s.Update(tx, stageProgress); err != nil {
return err
}
if !useExternalTx {
if err = s.Update(tx, stageProgress); err != nil {
return err
}
if err = tx.Commit(); err != nil {
return err
}
@ -502,7 +502,7 @@ Loop:
_, err = rawdb.IncrementStateVersion(tx)
if err != nil {
log.Error("writing plain state version", "err", err)
return fmt.Errorf("writing plain state version: %w", err)
}
if !useExternalTx {

2
go.mod
View File

@ -3,7 +3,7 @@ module github.com/ledgerwatch/erigon
go 1.18
require (
github.com/ledgerwatch/erigon-lib v0.0.0-20221222181157-afe3703d2830
github.com/ledgerwatch/erigon-lib v0.0.0-20221224050815-338bc497b314
github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20221223003841-487873d31492
github.com/ledgerwatch/log/v3 v3.6.0
github.com/ledgerwatch/secp256k1 v1.0.0

4
go.sum
View File

@ -561,8 +561,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/ledgerwatch/erigon-lib v0.0.0-20221222181157-afe3703d2830 h1:h3h0lrCTHd0U/PWQFbLoLWJ43jAVLhnemvn6J5VFBVU=
github.com/ledgerwatch/erigon-lib v0.0.0-20221222181157-afe3703d2830/go.mod h1:LyB6kRLWw+SyFJq1DFSiK+HB0hhYUoKg/dlR1xKPZ/g=
github.com/ledgerwatch/erigon-lib v0.0.0-20221224050815-338bc497b314 h1:bAN9oPyH3l9a0G7w5DfXa6uQLP4ktfFJdLz02DEXgsM=
github.com/ledgerwatch/erigon-lib v0.0.0-20221224050815-338bc497b314/go.mod h1:sA6VSmEch99WjPSMNlwQ9uElzIVKbDbjWudDkgjARkk=
github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20221223003841-487873d31492 h1:SSYvbAzdreVrXdy8z8A92ug36c7zsGQLzXFrSiw92Zc=
github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20221223003841-487873d31492/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo=
github.com/ledgerwatch/log/v3 v3.6.0 h1:JBUSK1epPyutUrz7KYDTcJtQLEHnehECRpKbM1ugy5M=

View File

@ -17,10 +17,13 @@
package node
import (
"context"
"errors"
"net"
"net/http"
"time"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon/rpc"
"github.com/ledgerwatch/erigon/rpc/rpccfg"
"github.com/ledgerwatch/log/v3"
@ -48,7 +51,7 @@ func StartHTTPEndpoint(endpoint string, timeouts rpccfg.HTTPTimeouts, handler ht
}
go func() {
serveErr := httpSrv.Serve(listener)
if serveErr != nil {
if serveErr != nil && !errors.Is(serveErr, context.Canceled) && !errors.Is(serveErr, libcommon.ErrStopped) {
log.Warn("Failed to serve http endpoint", "err", serveErr)
}
}()

View File

@ -120,12 +120,14 @@ func CreateHistoryStateReader(tx kv.Tx, blockNumber, txnIndex uint64, agg *state
}
aggCtx := agg.MakeContext()
aggCtx.SetTx(tx)
r := state.NewHistoryReaderV3(aggCtx)
r := state.NewHistoryReaderV3()
r.SetTx(tx)
r.SetAc(aggCtx)
minTxNum, err := rawdb.TxNums.Min(tx, blockNumber)
if err != nil {
return nil, err
}
fmt.Printf("hist reader v3: bn=%d, txNum=%d\n", tx, blockNumber)
r.SetTxNum(minTxNum + txnIndex)
return r, nil
}

View File

@ -176,7 +176,13 @@ func (back *RemoteBlockReader) TxnByIdxInBlock(ctx context.Context, tx kv.Getter
if b == nil {
return nil, nil
}
return b.Transactions[1+i], nil
if i < 0 {
return nil, nil
}
if len(b.Transactions) <= i {
return nil, nil
}
return b.Transactions[i], nil
}
func (back *RemoteBlockReader) BlockWithSenders(ctx context.Context, _ kv.Getter, hash common.Hash, blockHeight uint64) (block *types.Block, senders []common.Address, err error) {

View File

@ -651,8 +651,9 @@ func (ms *MockSentry) NewHistoricalStateReader(blockNum uint64, tx kv.Tx) state.
if ms.HistoryV3 {
aggCtx := ms.agg.MakeContext()
aggCtx.SetTx(tx)
r := state.NewHistoryReaderV3(aggCtx)
r := state.NewHistoryReaderV3()
r.SetTx(tx)
r.SetAc(aggCtx)
minTxNum, err := rawdb.TxNums.Min(tx, blockNum)
if err != nil {
panic(err)