[download-v2] Use Clique instead of EtHash for clique networks (#1832)

* Use Clique instead of EtHash for clique networks

* Start tx fetcher, enable GetPooledTx in sentry

Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
This commit is contained in:
ledgerwatch 2021-04-28 14:44:29 +01:00 committed by GitHub
parent 03beaf4df7
commit 6cbfff354d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 12 additions and 12 deletions

View File

@ -687,6 +687,8 @@ func (ss *SentryServerImpl) SendMessageById(_ context.Context, inreq *proto_sent
msgcode = eth.ReceiptsMsg
case proto_sentry.MessageId_PooledTransactions:
msgcode = eth.PooledTransactionsMsg
case proto_sentry.MessageId_GetPooledTransactions:
msgcode = eth.GetPooledTransactionsMsg
default:
return &proto_sentry.SentPeers{}, fmt.Errorf("sendMessageById not implemented for message Id: %s", inreq.Data.Id)
}

View File

@ -36,7 +36,7 @@ func NewTxPoolServer(sentries []proto_sentry.SentryClient, txPool *core.TxPool)
func (tp *TxPoolServer) newPooledTransactionHashes(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry proto_sentry.SentryClient) error {
var query eth.NewPooledTransactionHashesPacket
if err := rlp.DecodeBytes(inreq.Data, &query); err != nil {
return fmt.Errorf("decoding GetBlockHeader: %v, data: %x", err, inreq.Data)
return fmt.Errorf("decoding NewPooledTransactionHashesPacket: %v, data: %x", err, inreq.Data)
}
return tp.TxFetcher.Notify(string(gointerfaces.ConvertH512ToBytes(inreq.PeerId)), query)
}
@ -44,7 +44,7 @@ func (tp *TxPoolServer) newPooledTransactionHashes(ctx context.Context, inreq *p
func (tp *TxPoolServer) pooledTransactions(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry proto_sentry.SentryClient) error {
var query eth.PooledTransactionsPacket66
if err := rlp.DecodeBytes(inreq.Data, &query); err != nil {
return fmt.Errorf("decoding GetBlockHeader: %v, data: %x", err, inreq.Data)
return fmt.Errorf("decoding PooledTransactionsPacket66: %v, data: %x", err, inreq.Data)
}
return tp.TxFetcher.Enqueue(string(gointerfaces.ConvertH512ToBytes(inreq.PeerId)), query.PooledTransactionsPacket, true)
@ -56,7 +56,7 @@ func (tp *TxPoolServer) transactions(ctx context.Context, inreq *proto_sentry.In
}
var query eth.TransactionsPacket
if err := rlp.DecodeBytes(inreq.Data, &query); err != nil {
return fmt.Errorf("decoding GetBlockHeader: %v, data: %x", err, inreq.Data)
return fmt.Errorf("decoding TransactionsPacket: %v, data: %x", err, inreq.Data)
}
return tp.TxFetcher.Enqueue(string(gointerfaces.ConvertH512ToBytes(inreq.PeerId)), query, false)
}
@ -67,7 +67,7 @@ func (tp *TxPoolServer) getPooledTransactions(ctx context.Context, inreq *proto_
}
var query eth.GetPooledTransactionsPacket66
if err := rlp.DecodeBytes(inreq.Data, &query); err != nil {
return fmt.Errorf("decoding GetBlockHeader: %v, data: %x", err, inreq.Data)
return fmt.Errorf("decoding GetPooledTransactionsPacket66: %v, data: %x", err, inreq.Data)
}
_, txs := eth.AnswerGetPooledTransactions(tp.txPool, query.GetPooledTransactionsPacket)
b, err := rlp.EncodeToBytes(&eth.PooledTransactionsRLPPacket66{
@ -75,7 +75,7 @@ func (tp *TxPoolServer) getPooledTransactions(ctx context.Context, inreq *proto_
PooledTransactionsRLPPacket: txs,
})
if err != nil {
return fmt.Errorf("encode header response: %v", err)
return fmt.Errorf("encode GetPooledTransactionsPacket66 response: %v", err)
}
// TODO: implement logic from perr.ReplyPooledTransactionsRLP - to remember tx ids
outreq := proto_sentry.SendMessageByIdRequest{
@ -108,7 +108,7 @@ func (tp *TxPoolServer) SendTxsRequest(ctx context.Context, peerID string, hashe
for i, ok, next := tp.randSentryIndex(); ok; i, ok = next() {
sentPeers, err1 := tp.sentries[i].SendMessageById(ctx, &outreq, &grpc.EmptyCallOption{})
if err1 != nil {
log.Error("Could not send block bodies request", "err", err1)
log.Error("Could not send get pooled tx request", "err", err1)
continue
}
if sentPeers == nil || len(sentPeers.Peers) == 0 {
@ -152,14 +152,14 @@ func RecvTxMessage(ctx context.Context, sentry proto_sentry.SentryClient, handle
receiveClient, err2 := sentry.ReceiveTxMessages(streamCtx, &empty.Empty{}, &grpc.EmptyCallOption{})
if err2 != nil {
log.Error("Receive messages failed", "error", err2)
log.Error("ReceiveTx messages failed", "error", err2)
return
}
for req, err := receiveClient.Recv(); ; req, err = receiveClient.Recv() {
if err != nil {
if !errors.Is(err, io.EOF) {
log.Error("Receive loop terminated", "error", err)
log.Error("ReceiveTx loop terminated", "error", err)
return
}
return

View File

@ -271,7 +271,6 @@ func New(stack *node.Node, config *ethconfig.Config, gitCommit string) (*Ethereu
config: config,
chainDB: chainDb,
chainKV: chainDb.(ethdb.HasRwKV).RwKV(),
engine: ethconfig.CreateConsensusEngine(chainConfig, &config.Ethash, config.Miner.Notify, config.Miner.Noverify),
networkID: config.NetworkID,
etherbase: config.Miner.Etherbase,
p2pServer: stack.Server(),
@ -289,9 +288,7 @@ func New(stack *node.Node, config *ethconfig.Config, gitCommit string) (*Ethereu
consensusConfig = &config.Ethash
}
if !eth.config.EnableDownloadV2 {
eth.engine = ethconfig.CreateConsensusEngine(chainConfig, consensusConfig, config.Miner.Notify, config.Miner.Noverify)
}
eth.engine = ethconfig.CreateConsensusEngine(chainConfig, consensusConfig, config.Miner.Notify, config.Miner.Noverify)
log.Info("Initialising Ethereum protocol", "network", config.NetworkID)
@ -432,6 +429,7 @@ func New(stack *node.Node, config *ethconfig.Config, gitCommit string) (*Ethereu
}
eth.txPoolServer.TxFetcher = fetcher.NewTxFetcher(eth.txPool.Has, eth.txPool.AddRemotes, fetchTx)
eth.txPoolServer.TxFetcher.Start()
bodyDownloadTimeoutSeconds := 30 // TODO: convert to duration, make configurable
eth.stagedSync2, err = download.NewStagedSync(