Request Missing Logs (#3173)

* add request for missing logs

* fix formatting

* add metric

* add test

* fix test
This commit is contained in:
Nishant Das 2019-08-12 16:29:57 +05:30 committed by Preston Van Loon
parent 06950907c8
commit 22df351e89
3 changed files with 123 additions and 13 deletions

View File

@ -35,11 +35,13 @@ func (w *Web3Service) ETH2GenesisTime() (uint64, *big.Int) {
// ProcessLog is the main method which handles the processing of all
// logs from the deposit contract on the ETH1.0 chain.
func (w *Web3Service) ProcessLog(depositLog gethTypes.Log) error {
w.processingLock.Lock()
defer w.processingLock.Unlock()
w.processingLock.RLock()
defer w.processingLock.RUnlock()
// Process logs according to their event signature.
if depositLog.Topics[0] == hashutil.HashKeccak256(depositEventSignature) {
w.ProcessDepositLog(depositLog)
if err := w.ProcessDepositLog(depositLog); err != nil {
return errors.Wrap(err, "Could not process deposit log")
}
if !w.chainStarted {
if depositLog.BlockHash == [32]byte{} {
return errors.New("got empty blockhash from powchain service")
@ -67,11 +69,10 @@ func (w *Web3Service) ProcessLog(depositLog gethTypes.Log) error {
// ProcessDepositLog processes the log which had been received from
// the ETH1.0 chain by trying to ascertain which participant deposited
// in the contract.
func (w *Web3Service) ProcessDepositLog(depositLog gethTypes.Log) {
func (w *Web3Service) ProcessDepositLog(depositLog gethTypes.Log) error {
pubkey, withdrawalCredentials, amount, signature, merkleTreeIndex, err := contracts.UnpackDepositLogData(depositLog.Data)
if err != nil {
log.Errorf("Could not unpack log: %v", err)
return
return errors.Wrap(err, "Could not unpack log")
}
// If we have already seen this Merkle index, skip processing the log.
// This can happen sometimes when we receive the same log twice from the
@ -79,7 +80,14 @@ func (w *Web3Service) ProcessDepositLog(depositLog gethTypes.Log) {
// with the same log twice, causing an inconsistent state root.
index := binary.LittleEndian.Uint64(merkleTreeIndex)
if int64(index) <= w.lastReceivedMerkleIndex {
return
return nil
}
if int64(index) != w.lastReceivedMerkleIndex+1 {
missedDepositLogsCount.Inc()
if err := w.requestMissingLogs(depositLog.BlockNumber, int64(index-1)); err != nil {
return errors.Wrap(err, "Could not get correct merkle index")
}
}
w.lastReceivedMerkleIndex = int64(index)
@ -95,19 +103,16 @@ func (w *Web3Service) ProcessDepositLog(depositLog gethTypes.Log) {
depositHash, err := ssz.HashTreeRoot(depositData)
if err != nil {
log.Errorf("Unable to determine hashed value of deposit %v", err)
return
return errors.Wrap(err, "Unable to determine hashed value of deposit")
}
if err := w.depositTrie.InsertIntoTrie(depositHash[:], int(index)); err != nil {
log.Errorf("Unable to insert deposit into trie %v", err)
return
return errors.Wrap(err, "Unable to insert deposit into trie")
}
proof, err := w.depositTrie.MerkleProof(int(index))
if err != nil {
log.Errorf("Unable to generate merkle proof for deposit %v", err)
return
return errors.Wrap(err, "Unable to generate merkle proof for deposit")
}
deposit := &ethpb.Deposit{
@ -154,6 +159,7 @@ func (w *Web3Service) ProcessDepositLog(depositLog gethTypes.Log) {
"merkleTreeIndex": index,
}).Info("Invalid deposit registered in deposit contract")
}
return nil
}
// ProcessChainStart processes the log which had been received from
@ -276,6 +282,39 @@ func (w *Web3Service) requestBatchedLogs() error {
return nil
}
// requestMissingLogs requests any logs that were missed by requesting from previous blocks
// until the current block(exclusive).
func (w *Web3Service) requestMissingLogs(blkNumber uint64, wantedIndex int64) error {
// We request from the last requested block till the current block(exclusive)
beforeCurrentBlk := big.NewInt(int64(blkNumber) - 1)
query := ethereum.FilterQuery{
Addresses: []common.Address{
w.depositContractAddress,
},
FromBlock: big.NewInt(0).Add(w.lastRequestedBlock, big.NewInt(1)),
ToBlock: beforeCurrentBlk,
}
logs, err := w.httpLogger.FilterLogs(w.ctx, query)
if err != nil {
return err
}
// Only process log slices which are larger than zero.
if len(logs) > 0 {
for _, log := range logs {
if err := w.ProcessLog(log); err != nil {
return errors.Wrap(err, "could not process log")
}
}
}
if w.lastReceivedMerkleIndex != wantedIndex {
return fmt.Errorf("despite requesting missing logs, latest index observed is not accurate. "+
"Wanted %d but got %d", wantedIndex, w.lastReceivedMerkleIndex)
}
return nil
}
// ChainStartDepositHashes returns the hashes of all the chainstart deposits
// stored in memory.
func (w *Web3Service) ChainStartDepositHashes() ([][]byte, error) {

View File

@ -5,6 +5,7 @@ import (
"context"
"encoding/binary"
"io/ioutil"
"math/big"
"testing"
"time"
@ -378,3 +379,69 @@ func TestProcessETH2GenesisLog(t *testing.T) {
hook.Reset()
}
func TestWeb3ServiceProcessDepositLog_RequestMissedDeposits(t *testing.T) {
hook := logTest.NewGlobal()
testAcc, err := contracts.Setup()
if err != nil {
t.Fatalf("Unable to set up simulated backend %v", err)
}
web3Service, err := NewWeb3Service(context.Background(), &Web3ServiceConfig{
Endpoint: endpoint,
DepositContract: testAcc.ContractAddr,
Reader: &goodReader{},
Logger: &goodLogger{},
HTTPLogger: testAcc.Backend,
ContractBackend: testAcc.Backend,
BeaconDB: &db.BeaconDB{},
BlockFetcher: &goodFetcher{},
})
if err != nil {
t.Fatalf("unable to setup web3 ETH1.0 chain service: %v", err)
}
bConfig := params.MinimalSpecConfig()
bConfig.MinGenesisTime = 0
params.OverrideBeaconConfig(bConfig)
testAcc.Backend.Commit()
testAcc.Backend.AdjustTime(time.Duration(int64(time.Now().Nanosecond())))
depositsWanted := 10
deposits, _ := testutil.SetupInitialDeposits(t, uint64(depositsWanted))
for i := 0; i < depositsWanted; i++ {
data := deposits[i].Data
testAcc.TxOpts.Value = contracts.Amount32Eth()
testAcc.TxOpts.GasLimit = 1000000
if _, err := testAcc.Contract.Deposit(testAcc.TxOpts, data.PublicKey, data.WithdrawalCredentials, data.Signature); err != nil {
t.Fatalf("Could not deposit to deposit contract %v", err)
}
testAcc.Backend.Commit()
}
query := ethereum.FilterQuery{
Addresses: []common.Address{
web3Service.depositContractAddress,
},
}
logs, err := testAcc.Backend.FilterLogs(web3Service.ctx, query)
if err != nil {
t.Fatalf("Unable to retrieve logs %v", err)
}
logsToBeProcessed := append(logs[:depositsWanted-3], logs[depositsWanted-2:]...)
// we purposely miss processing the middle two logs so that the service, re-requests them
for _, log := range logsToBeProcessed {
if err := web3Service.ProcessLog(log); err != nil {
t.Fatal(err)
}
web3Service.lastRequestedBlock.Set(big.NewInt(int64(log.BlockNumber)))
}
if web3Service.lastReceivedMerkleIndex != int64(depositsWanted-1) {
t.Errorf("missing logs were not re-requested. Wanted Index %d but got %d", depositsWanted-1, web3Service.lastReceivedMerkleIndex)
}
hook.Reset()
}

View File

@ -38,6 +38,10 @@ var (
Name: "powchain_block_number",
Help: "The current block number in the proof-of-work chain",
})
missedDepositLogsCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "powchain_missed_deposit_logs",
Help: "The number of times a missed deposit log is detected",
})
)
// Reader defines a struct that can fetch latest header events from a web3 endpoint.