[devnet] separate logging - headers download (#7551)

Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro-2.local>
This commit is contained in:
ledgerwatch 2023-05-20 07:00:19 +01:00 committed by GitHub
parent c919283b0c
commit 2a872b4d54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 57 additions and 43 deletions

View File

@ -296,6 +296,7 @@ func NewMultiClient(
1024*1024, /* linkLimit */
engine,
blockReader,
logger,
)
if chainConfig.TerminalTotalDifficultyPassed {
hd.SetPOSSync(true)

View File

@ -88,15 +88,16 @@ func makeTestDb(ctx context.Context, db kv.RwDB) {
}
func TestMockDownloadRequest(t *testing.T) {
logger := log.New()
db := memdb.NewTestDB(t)
ctx := context.Background()
require := require.New(t)
makeTestDb(ctx, db)
hd := headerdownload.NewHeaderDownload(0, 0, nil, nil)
hd := headerdownload.NewHeaderDownload(0, 0, nil, nil, logger)
hd.SetPOSSync(true)
events := shards.NewEvents()
backend := NewEthBackendServer(ctx, nil, db, events, nil, &chain.Config{TerminalTotalDifficulty: libcommon.Big1}, nil, hd, false, log.New())
backend := NewEthBackendServer(ctx, nil, db, events, nil, &chain.Config{TerminalTotalDifficulty: libcommon.Big1}, nil, hd, false, logger)
var err error
var reply *remote.EnginePayloadStatus
@ -145,17 +146,18 @@ func TestMockDownloadRequest(t *testing.T) {
}
func TestMockValidExecution(t *testing.T) {
logger := log.New()
db := memdb.NewTestDB(t)
ctx := context.Background()
require := require.New(t)
makeTestDb(ctx, db)
hd := headerdownload.NewHeaderDownload(0, 0, nil, nil)
hd := headerdownload.NewHeaderDownload(0, 0, nil, nil, logger)
hd.SetPOSSync(true)
events := shards.NewEvents()
backend := NewEthBackendServer(ctx, nil, db, events, nil, &chain.Config{TerminalTotalDifficulty: libcommon.Big1}, nil, hd, false, log.New())
backend := NewEthBackendServer(ctx, nil, db, events, nil, &chain.Config{TerminalTotalDifficulty: libcommon.Big1}, nil, hd, false, logger)
var err error
var reply *remote.EnginePayloadStatus
@ -181,17 +183,18 @@ func TestMockValidExecution(t *testing.T) {
}
func TestMockInvalidExecution(t *testing.T) {
logger := log.New()
db := memdb.NewTestDB(t)
ctx := context.Background()
require := require.New(t)
makeTestDb(ctx, db)
hd := headerdownload.NewHeaderDownload(0, 0, nil, nil)
hd := headerdownload.NewHeaderDownload(0, 0, nil, nil, logger)
hd.SetPOSSync(true)
events := shards.NewEvents()
backend := NewEthBackendServer(ctx, nil, db, events, nil, &chain.Config{TerminalTotalDifficulty: libcommon.Big1}, nil, hd, false, log.New())
backend := NewEthBackendServer(ctx, nil, db, events, nil, &chain.Config{TerminalTotalDifficulty: libcommon.Big1}, nil, hd, false, logger)
var err error
var reply *remote.EnginePayloadStatus
@ -217,16 +220,17 @@ func TestMockInvalidExecution(t *testing.T) {
}
func TestNoTTD(t *testing.T) {
logger := log.New()
db := memdb.NewTestDB(t)
ctx := context.Background()
require := require.New(t)
makeTestDb(ctx, db)
hd := headerdownload.NewHeaderDownload(0, 0, nil, nil)
hd := headerdownload.NewHeaderDownload(0, 0, nil, nil, logger)
events := shards.NewEvents()
backend := NewEthBackendServer(ctx, nil, db, events, nil, &chain.Config{}, nil, hd, false, log.New())
backend := NewEthBackendServer(ctx, nil, db, events, nil, &chain.Config{}, nil, hd, false, logger)
var err error

View File

@ -11,6 +11,7 @@ import (
"github.com/ledgerwatch/erigon/p2p/discover/v5wire"
"github.com/ledgerwatch/erigon/p2p/enode"
"github.com/ledgerwatch/log/v3"
)
// This test checks that lookup works.
@ -19,7 +20,8 @@ func TestUDPv5_lookup(t *testing.T) {
t.Skip("fix me on win please")
}
t.Parallel()
test := newUDPV5Test(t)
logger := log.New()
test := newUDPV5Test(t, logger)
t.Cleanup(test.close)
// Lookup on empty table returns no nodes.
@ -31,7 +33,7 @@ func TestUDPv5_lookup(t *testing.T) {
for d, nn := range lookupTestnet.dists {
for i, key := range nn {
n := lookupTestnet.node(d, i)
test.getNode(key, &net.UDPAddr{IP: n.IP(), Port: n.UDP()})
test.getNode(key, &net.UDPAddr{IP: n.IP(), Port: n.UDP()}, logger)
}
}
@ -53,7 +55,7 @@ func TestUDPv5_lookup(t *testing.T) {
recipient, key := lookupTestnet.nodeByAddr(to)
switch p := p.(type) {
case *v5wire.Ping:
test.packetInFrom(key, to, &v5wire.Pong{ReqID: p.ReqID})
test.packetInFrom(key, to, &v5wire.Pong{ReqID: p.ReqID}, logger)
case *v5wire.Findnode:
if asked[recipient.ID()] {
t.Error("Asked node", recipient.ID(), "twice")
@ -62,7 +64,7 @@ func TestUDPv5_lookup(t *testing.T) {
nodes := lookupTestnet.neighborsAtDistances(recipient, p.Distances, 16)
t.Logf("Got FINDNODE for %v, returning %d nodes", p.Distances, len(nodes))
for _, resp := range packNodes(p.ReqID, nodes) {
test.packetInFrom(key, to, resp)
test.packetInFrom(key, to, resp, logger)
}
}
})
@ -79,8 +81,9 @@ func TestUDPv5_lookupE2E(t *testing.T) {
t.Skip("fix me on win please")
}
t.Parallel()
logger := log.New()
bootNode := startLocalhostV5(t, Config{})
bootNode := startLocalhostV5(t, Config{}, logger)
bootNodeRec := bootNode.Self()
const N = 5
@ -89,7 +92,7 @@ func TestUDPv5_lookupE2E(t *testing.T) {
cfg := Config{
Bootnodes: []*enode.Node{bootNodeRec},
}
node := startLocalhostV5(t, cfg)
node := startLocalhostV5(t, cfg, logger)
nodes = append(nodes, node)
}

View File

@ -10,6 +10,7 @@ import (
"time"
"github.com/ledgerwatch/erigon/p2p/discover/v5wire"
"github.com/ledgerwatch/log/v3"
)
// This test checks that calls with n replies may take up to n * respTimeout.
@ -18,6 +19,7 @@ func TestUDPv5_callTimeoutReset(t *testing.T) {
t.Skip("fix me on win please")
}
t.Parallel()
logger := log.New()
replyTimeout := respTimeoutV5
// This must be significantly lower than replyTimeout to not get "RPC timeout" error.
@ -32,13 +34,13 @@ func TestUDPv5_callTimeoutReset(t *testing.T) {
ctx := context.Background()
ctx = contextWithReplyTimeout(ctx, replyTimeout)
test := newUDPV5TestContext(ctx, t)
test := newUDPV5TestContext(ctx, t, logger)
t.Cleanup(test.close)
// Launch the request:
var (
distance = uint(230)
remote = test.getNode(test.remotekey, test.remoteaddr).Node()
remote = test.getNode(test.remotekey, test.remoteaddr, logger).Node()
nodes = nodesAtDistance(remote.ID(), int(distance), totalNodesResponseLimit)
done = make(chan error, 1)
)
@ -55,7 +57,7 @@ func TestUDPv5_callTimeoutReset(t *testing.T) {
ReqID: p.ReqID,
Total: totalNodesResponseLimit,
Nodes: nodesToRecords(nodes[i : i+1]),
})
}, logger)
}
})
if err := <-done; err != nil {

View File

@ -102,7 +102,7 @@ func (hd *HeaderDownload) SingleHeaderAsSegment(headerRaw []byte, header *types.
headerHash := types.RawRlpHash(headerRaw)
if _, bad := hd.badHeaders[headerHash]; bad {
log.Warn("[downloader] Rejected header marked as bad", "hash", headerHash, "height", header.Number.Uint64())
hd.logger.Warn("[downloader] Rejected header marked as bad", "hash", headerHash, "height", header.Number.Uint64())
return nil, BadBlockPenalty, nil
}
if penalizePoSBlocks && header.Difficulty.Sign() == 0 {
@ -300,7 +300,7 @@ func (hd *HeaderDownload) logAnchorState() {
ss = append(ss, sb.String())
}
sort.Strings(ss)
log.Debug("[downloader] Queue sizes", "anchors", hd.anchorTree.Len(), "links", hd.linkQueue.Len(), "persisted", hd.persistedLinkQueue.Len())
hd.logger.Debug("[downloader] Queue sizes", "anchors", hd.anchorTree.Len(), "links", hd.linkQueue.Len(), "persisted", hd.persistedLinkQueue.Len())
for _, s := range ss {
log.Debug(s)
}
@ -350,7 +350,7 @@ func (hd *HeaderDownload) RecoverFromDb(db kv.RoDB) error {
select {
case <-logEvery.C:
log.Info("[downloader] recover headers from db", "left", hd.persistedLinkLimit-hd.persistedLinkQueue.Len())
hd.logger.Info("[downloader] recover headers from db", "left", hd.persistedLinkLimit-hd.persistedLinkQueue.Len())
default:
}
}
@ -377,7 +377,7 @@ func (hd *HeaderDownload) ReadProgressFromDb(tx kv.RwTx) (err error) {
}
func (hd *HeaderDownload) invalidateAnchor(anchor *Anchor, reason string) {
log.Debug("[downloader] Invalidating anchor", "height", anchor.blockHeight, "hash", anchor.parentHash, "reason", reason)
hd.logger.Debug("[downloader] Invalidating anchor", "height", anchor.blockHeight, "hash", anchor.parentHash, "reason", reason)
hd.removeAnchor(anchor)
for child := anchor.fLink; child != nil; child, child.next = child.next, nil {
hd.removeUpwards(child)
@ -415,7 +415,7 @@ func (hd *HeaderDownload) RequestMoreHeaders(currentTime time.Time) (*HeaderRequ
func (hd *HeaderDownload) requestMoreHeadersForPOS(currentTime time.Time) (timeout bool, request *HeaderRequest, penalties []PenaltyItem) {
anchor := hd.posAnchor
if anchor == nil {
log.Debug("[downloader] No PoS anchor")
hd.logger.Debug("[downloader] No PoS anchor")
return
}
@ -427,7 +427,7 @@ func (hd *HeaderDownload) requestMoreHeadersForPOS(currentTime time.Time) (timeo
// TODO: [pos-downloader-tweaks] - we could reduce this number, or config it
timeout = anchor.timeouts >= 3
if timeout {
log.Warn("[downloader] Timeout", "requestId", hd.requestId, "peerID", common.Bytes2Hex(anchor.peerID[:]))
hd.logger.Warn("[downloader] Timeout", "requestId", hd.requestId, "peerID", common.Bytes2Hex(anchor.peerID[:]))
penalties = []PenaltyItem{{Penalty: AbandonedAnchorPenalty, PeerID: anchor.peerID}}
return
}
@ -480,7 +480,7 @@ func (hd *HeaderDownload) UpdateRetryTime(req *HeaderRequest, currentTime time.T
func (hd *HeaderDownload) RequestSkeleton() *HeaderRequest {
hd.lock.RLock()
defer hd.lock.RUnlock()
log.Debug("[downloader] Request skeleton", "anchors", len(hd.anchors), "highestInDb", hd.highestInDb)
hd.logger.Debug("[downloader] Request skeleton", "anchors", len(hd.anchors), "highestInDb", hd.highestInDb)
var stride uint64
if hd.initialCycle {
stride = 192
@ -519,7 +519,7 @@ func (hd *HeaderDownload) InsertHeader(hf FeedHeaderFunc, terminalTotalDifficult
hd.moveLinkToQueue(link, NoQueue)
delete(hd.links, link.hash)
hd.removeUpwards(link)
log.Warn("[downloader] Rejected header marked as bad", "hash", link.hash, "height", link.blockHeight)
hd.logger.Warn("[downloader] Rejected header marked as bad", "hash", link.hash, "height", link.blockHeight)
return true, false, 0, lastTime, nil
}
if !link.verified {
@ -527,10 +527,10 @@ func (hd *HeaderDownload) InsertHeader(hf FeedHeaderFunc, terminalTotalDifficult
hd.badPoSHeaders[link.hash] = link.header.ParentHash
if errors.Is(err, consensus.ErrFutureBlock) {
// This may become valid later
log.Warn("[downloader] Added future link", "hash", link.hash, "height", link.blockHeight, "timestamp", link.header.Time)
hd.logger.Warn("[downloader] Added future link", "hash", link.hash, "height", link.blockHeight, "timestamp", link.header.Time)
return false, false, 0, lastTime, nil // prevent removal of the link from the hd.linkQueue
} else {
log.Debug("[downloader] Verification failed for header", "hash", link.hash, "height", link.blockHeight, "err", err)
hd.logger.Debug("[downloader] Verification failed for header", "hash", link.hash, "height", link.blockHeight, "err", err)
hd.moveLinkToQueue(link, NoQueue)
delete(hd.links, link.hash)
hd.removeUpwards(link)
@ -542,7 +542,7 @@ func (hd *HeaderDownload) InsertHeader(hf FeedHeaderFunc, terminalTotalDifficult
// Make sure long insertions do not appear as a stuck stage 1
select {
case <-logChannel:
log.Info(fmt.Sprintf("[%s] Inserting headers", logPrefix), "progress", hd.highestInDb, "queue", hd.insertQueue.Len())
hd.logger.Info(fmt.Sprintf("[%s] Inserting headers", logPrefix), "progress", hd.highestInDb, "queue", hd.insertQueue.Len())
default:
}
td, err := hf(link.header, link.headerRaw, link.hash, link.blockHeight)
@ -569,7 +569,7 @@ func (hd *HeaderDownload) InsertHeader(hf FeedHeaderFunc, terminalTotalDifficult
if link.blockHeight > hd.highestInDb {
if hd.trace {
log.Info("[downloader] Highest in DB change", "number", link.blockHeight, "hash", link.hash)
hd.logger.Info("[downloader] Highest in DB change", "number", link.blockHeight, "hash", link.hash)
}
hd.highestInDb = link.blockHeight
}
@ -623,7 +623,7 @@ func (hd *HeaderDownload) InsertHeaders(hf FeedHeaderFunc, terminalTotalDifficul
}
}
if blocksToTTD > 0 {
log.Info("Estimated to reaching TTD", "blocks", blocksToTTD)
hd.logger.Info("Estimated to reaching TTD", "blocks", blocksToTTD)
}
hd.lock.RLock()
defer hd.lock.RUnlock()
@ -636,7 +636,7 @@ func (hd *HeaderDownload) SetHeaderToDownloadPoS(hash libcommon.Hash, height uin
hd.lock.Lock()
defer hd.lock.Unlock()
log.Debug("[downloader] Set posAnchor", "blockHeight", height+1)
hd.logger.Debug("[downloader] Set posAnchor", "blockHeight", height+1)
hd.posAnchor = &Anchor{
parentHash: hash,
blockHeight: height + 1,
@ -647,12 +647,12 @@ func (hd *HeaderDownload) ProcessHeadersPOS(csHeaders []ChainSegmentHeader, tx k
if len(csHeaders) == 0 {
return nil, nil
}
log.Debug("[downloader] Collecting...", "from", csHeaders[0].Number, "to", csHeaders[len(csHeaders)-1].Number, "len", len(csHeaders))
hd.logger.Debug("[downloader] Collecting...", "from", csHeaders[0].Number, "to", csHeaders[len(csHeaders)-1].Number, "len", len(csHeaders))
hd.lock.Lock()
defer hd.lock.Unlock()
if hd.posAnchor == nil {
// May happen if peers are sending unrequested header packets after we've synced
log.Debug("[downloader] posAnchor is nil")
hd.logger.Debug("[downloader] posAnchor is nil")
return nil, nil
}
@ -670,11 +670,11 @@ func (hd *HeaderDownload) ProcessHeadersPOS(csHeaders []ChainSegmentHeader, tx k
// With this code commented out, the sync proceeds but very slowly (getting 1 header from the response of 192 headers)
/*
if hd.posAnchor.blockHeight != 1 && sh.Number != hd.posAnchor.blockHeight-1 {
log.Debug("[downloader] posAnchor", "blockHeight", hd.posAnchor.blockHeight)
hd.logger.Debug("[downloader] posAnchor", "blockHeight", hd.posAnchor.blockHeight)
//return nil, nil
}
*/
log.Debug("[downloader] Unexpected header", "hash", headerHash, "expected", hd.posAnchor.parentHash, "peerID", common.Bytes2Hex(peerId[:]))
hd.logger.Debug("[downloader] Unexpected header", "hash", headerHash, "expected", hd.posAnchor.parentHash, "peerID", common.Bytes2Hex(peerId[:]))
// Not penalise because we might have sent request twice
continue
}
@ -689,7 +689,7 @@ func (hd *HeaderDownload) ProcessHeadersPOS(csHeaders []ChainSegmentHeader, tx k
return nil, err
}
if hh != nil {
log.Debug("[downloader] Synced", "requestId", hd.requestId)
hd.logger.Debug("[downloader] Synced", "requestId", hd.requestId)
if headerNumber != hh.Number.Uint64()+1 {
hd.badPoSHeaders[headerHash] = header.ParentHash
return nil, fmt.Errorf("invalid PoS segment detected: invalid block number. got %d, expected %d", headerNumber, hh.Number.Uint64()+1)
@ -967,11 +967,11 @@ func (hd *HeaderDownload) ProcessHeader(sh ChainSegmentHeader, newBlock bool, pe
anchor, foundAnchor := hd.anchors[sh.Hash]
if !foundParent && !foundAnchor {
if sh.Number < hd.highestInDb {
log.Debug(fmt.Sprintf("[downloader] new anchor too far in the past: %d, latest header in db: %d", sh.Number, hd.highestInDb))
hd.logger.Debug(fmt.Sprintf("[downloader] new anchor too far in the past: %d, latest header in db: %d", sh.Number, hd.highestInDb))
return false
}
if len(hd.anchors) >= hd.anchorLimit {
log.Debug(fmt.Sprintf("[downloader] too many anchors: %d, limit %d", len(hd.anchors), hd.anchorLimit))
hd.logger.Debug(fmt.Sprintf("[downloader] too many anchors: %d, limit %d", len(hd.anchors), hd.anchorLimit))
return false
}
}
@ -999,7 +999,7 @@ func (hd *HeaderDownload) ProcessHeader(sh ChainSegmentHeader, newBlock bool, pe
} else {
// The link has not known parent, therefore it becomes an anchor, unless it is too far in the past
if sh.Number+params.FullImmutabilityThreshold < hd.highestInDb {
log.Debug("[downloader] Remove upwards", "height", link.blockHeight, "hash", link.blockHeight)
hd.logger.Debug("[downloader] Remove upwards", "height", link.blockHeight, "hash", link.blockHeight)
hd.removeUpwards(link)
return false
}
@ -1028,9 +1028,9 @@ func (hd *HeaderDownload) ProcessHeaders(csHeaders []ChainSegmentHeader, newBloc
hd.lock.Lock()
defer hd.lock.Unlock()
hd.stats.Responses++
log.Trace("[downloader] Link queue", "size", hd.linkQueue.Len())
hd.logger.Trace("[downloader] Link queue", "size", hd.linkQueue.Len())
if hd.linkQueue.Len() > hd.linkLimit {
log.Trace("[downloader] Too many links, cutting down", "count", hd.linkQueue.Len(), "tried to add", len(csHeaders), "limit", hd.linkLimit)
hd.logger.Trace("[downloader] Too many links, cutting down", "count", hd.linkQueue.Len(), "tried to add", len(csHeaders), "limit", hd.linkLimit)
hd.pruneLinkQueue()
}
// Wake up stage loop if it is outside any of the stages
@ -1289,7 +1289,7 @@ func (hd *HeaderDownload) StartPoSDownloader(
if sentToPeer {
// If request was actually sent to a peer, we update retry time to be 5 seconds in the future
hd.UpdateRetryTime(req, currentTime, 30*time.Second /* timeout */)
log.Debug("[downloader] Sent request", "height", req.Number)
hd.logger.Debug("[downloader] Sent request", "height", req.Number)
}
}
if len(penalties) > 0 {
@ -1312,7 +1312,7 @@ func (hd *HeaderDownload) StartPoSDownloader(
prevProgress = progress
} else if progress <= prevProgress {
diff := prevProgress - progress
log.Info("[downloader] Downloaded PoS Headers", "now", progress,
hd.logger.Info("[downloader] Downloaded PoS Headers", "now", progress,
"blk/sec", float64(diff)/float64(logInterval/time.Second))
prevProgress = progress
}

View File

@ -17,6 +17,7 @@ import (
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/turbo/engineapi"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/log/v3"
)
type QueueID uint8
@ -270,6 +271,7 @@ type HeaderDownload struct {
unsettledHeadHeight uint64 // Height of unsettledForkChoice.headBlockHash
posDownloaderTip common.Hash // See https://hackmd.io/GDc0maGsQeKfP8o2C7L52w
badPoSHeaders map[common.Hash]common.Hash // Invalid Tip -> Last Valid Ancestor
logger log.Logger
}
// HeaderRecord encapsulates two forms of the same header - raw RLP encoding (to avoid duplicated decodings and encodings), and parsed value types.Header
@ -283,6 +285,7 @@ func NewHeaderDownload(
linkLimit int,
engine consensus.Engine,
headerReader services.HeaderAndCanonicalReader,
logger log.Logger,
) *HeaderDownload {
persistentLinkLimit := linkLimit / 16
hd := &HeaderDownload{
@ -303,6 +306,7 @@ func NewHeaderDownload(
ShutdownCh: make(chan struct{}),
headerReader: headerReader,
badPoSHeaders: make(map[common.Hash]common.Hash),
logger: logger,
}
heap.Init(&hd.persistedLinkQueue)
heap.Init(&hd.linkQueue)