[Diagnostics] initial support for body downloader visualisation (#7373)

Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro-2.local>
ledgerwatch authored 2023-04-23 18:56:37 +01:00, committed by GitHub
parent 6588bca40b
commit d70c9f0979
6 changed files with 163 additions and 51 deletions

dataflow/states.go (new file, 106 lines)

@@ -0,0 +1,106 @@
package dataflow

import (
    "fmt"
    "io"
    "sync"

    "github.com/google/btree"
)

var BlockBodyDownloadStates *States = NewStates(64 * 1024)

const (
    BlockBodyCleared byte = iota
    BlockBodyExpired
    BlockBodyRequested
    BlockBodyReceived
    BlockBodyEvicted
    BlockBodySkipped    // Delivery requested but was skipped due to the limitation on the size of the response
    BlockBodyEmpty      // Identified as empty, so it does not need to be requested
    BlockBodyPrefetched
    BlockBodyInDb
)

type SnapshotItem struct {
    id    uint64
    state byte
}

type States struct {
    lock         sync.Mutex
    window       int
    ids          []uint64
    states       []byte
    snapshot     *btree.BTreeG[SnapshotItem]
    snapshotTick int
    idx          int
}

func NewStates(window int) *States {
    s := &States{
        window: window,
        ids:    make([]uint64, window),
        states: make([]byte, window),
        snapshot: btree.NewG[SnapshotItem](16, func(a, b SnapshotItem) bool {
            return a.id < b.id
        }),
        idx: 0,
    }
    return s
}

func (s *States) AddChange(id uint64, state byte) {
    s.lock.Lock()
    defer s.lock.Unlock()
    if s.idx >= s.window {
        s.makeSnapshot()
    }
    i := s.idx
    s.idx++
    s.ids[i] = id
    s.states[i] = state
}

func (s *States) makeSnapshot() {
    newSnapshot := map[uint64]byte{}
    // snapshotTick is now the tick of the latest change
    s.snapshotTick += s.idx
    // Proceed backwards so that the latest state recorded for each id wins
    for i := s.idx - 1; i >= 0; i-- {
        if _, ok := newSnapshot[s.ids[i]]; !ok {
            newSnapshot[s.ids[i]] = s.states[i]
        }
    }
    for id, state := range newSnapshot {
        if state == 0 {
            s.snapshot.Delete(SnapshotItem{id: id})
        } else {
            s.snapshot.ReplaceOrInsert(SnapshotItem{id: id, state: state})
        }
    }
    s.idx = 0
}

func (s *States) ChangesSince(startTick int, w io.Writer) {
    s.lock.Lock()
    defer s.lock.Unlock()
    var startI int
    var tick int
    if startTick <= s.snapshotTick {
        // Include snapshot
        fmt.Fprintf(w, "snapshot %d\n", s.snapshotTick)
        s.snapshot.Ascend(func(a SnapshotItem) bool {
            fmt.Fprintf(w, "%d,%d\n", a.id, a.state)
            return true
        })
        tick = s.snapshotTick + 1
    } else {
        startI = startTick - s.snapshotTick
        tick = startTick
    }
    fmt.Fprintf(w, "changes %d\n", tick)
    for i := startI; i < s.idx; i++ {
        fmt.Fprintf(w, "%d,%d\n", s.ids[i], s.states[i])
    }
}
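A minimal usage sketch (not part of the commit; the tiny window size is only there to force a snapshot quickly) showing how AddChange feeds the ring buffer, how an overflow folds into the btree snapshot, and what ChangesSince writes:

package main

import (
    "os"

    "github.com/ledgerwatch/erigon/dataflow"
)

func main() {
    // Record a plausible lifecycle for block 100: requested, then received,
    // then cleared (state 0, which makeSnapshot treats as a deletion).
    s := dataflow.NewStates(4)
    s.AddChange(100, dataflow.BlockBodyRequested)
    s.AddChange(100, dataflow.BlockBodyReceived)
    s.AddChange(101, dataflow.BlockBodyRequested)
    s.AddChange(100, dataflow.BlockBodyCleared)
    // A fifth change overflows the window of 4, so the first four changes are
    // folded into the btree snapshot before the new change is recorded.
    s.AddChange(101, dataflow.BlockBodyReceived)
    // Emits "snapshot 4" with the surviving entries (block 100 was cleared,
    // so only 101 remains), then "changes 5" with the ring contents.
    s.ChangesSince(0, os.Stdout)
}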

(new file, package diagnostics, 34 lines)

@@ -0,0 +1,34 @@
package diagnostics

import (
    "fmt"
    "io"
    "net/http"
    "strconv"

    "github.com/ledgerwatch/erigon/dataflow"
)

func SetupBlockBodyDownload() {
    http.HandleFunc("/debug/metrics/block_body_download", func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Access-Control-Allow-Origin", "*")
        writeBlockBodyDownload(w, r)
    })
}

func writeBlockBodyDownload(w io.Writer, r *http.Request) {
    if err := r.ParseForm(); err != nil {
        fmt.Fprintf(w, "ERROR: parsing arguments: %v\n", err)
        return
    }
    sinceTickStr := r.Form.Get("sincetick")
    var tick int64
    if sinceTickStr != "" {
        var err error
        if tick, err = strconv.ParseInt(sinceTickStr, 10, 64); err != nil {
            fmt.Fprintf(w, "ERROR: parsing sincetick: %v\n", err)
            return
        }
    }
    fmt.Fprintf(w, "SUCCESS\n")
    dataflow.BlockBodyDownloadStates.ChangesSince(int(tick), w)
}
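For reference, a hypothetical client-side sketch of querying the new endpoint; the address and port are assumptions, standing in for wherever the node's debug HTTP server actually listens:

package main

import (
    "fmt"
    "io"
    "net/http"
)

func main() {
    // Port 6060 is an assumption; use the node's debug/metrics listen address.
    resp, err := http.Get("http://localhost:6060/debug/metrics/block_body_download?sincetick=0")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        panic(err)
    }
    // Expected shape: "SUCCESS", then "snapshot <tick>" with "id,state" pairs,
    // then "changes <tick>" with the changes recorded after the snapshot.
    fmt.Print(string(body))
}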

eth/stagedsync/stage_bodies.go (func BodiesForward)

@@ -14,6 +14,7 @@ import (
 	"github.com/ledgerwatch/log/v3"
 	"github.com/ledgerwatch/erigon/core/rawdb"
+	"github.com/ledgerwatch/erigon/dataflow"
 	"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
 	"github.com/ledgerwatch/erigon/turbo/adapter"
 	"github.com/ledgerwatch/erigon/turbo/services"
@@ -129,39 +130,14 @@ func BodiesForward(
 	prevProgress := bodyProgress
 	var noProgressCount uint = 0 // How many times the progress was printed without actual progress
 	var totalDelivered uint64 = 0
+	cr := ChainReader{Cfg: cfg.chanConfig, Db: tx}
 	loopBody := func() (bool, error) {
-		// always check if a new request is needed at the start of the loop
-		// this will check for timed out old requests and attempt to send them again
-		start := time.Now()
-		currentTime := uint64(time.Now().Unix())
-		req, err = cfg.bd.RequestMoreBodies(tx, cfg.blockReader, currentTime, cfg.blockPropagator)
-		if err != nil {
-			return false, fmt.Errorf("request more bodies: %w", err)
-		}
-		d1 += time.Since(start)
-		peer = [64]byte{}
-		sentToPeer = false
-		if req != nil {
-			start := time.Now()
-			peer, sentToPeer = cfg.bodyReqSend(ctx, req)
-			d2 += time.Since(start)
-		}
-		if req != nil && sentToPeer {
-			start := time.Now()
-			currentTime := uint64(time.Now().Unix())
-			cfg.bd.RequestSent(req, currentTime+uint64(timeout), peer)
-			d3 += time.Since(start)
-		}
 		// loopCount is used here to ensure we don't get caught in a constant loop of making requests
 		// having some time out so requesting again and cycling like that forever. We'll cap it
 		// and break the loop so we can see if there are any records to actually process further down
 		// then come back here again in the next cycle
-		loopCount := 0
-		for req != nil && sentToPeer {
+		for loopCount := 0; loopCount == 0 || (req != nil && sentToPeer && loopCount < requestLoopCutOff); loopCount++ {
 			start := time.Now()
 			currentTime := uint64(time.Now().Unix())
 			req, err = cfg.bd.RequestMoreBodies(tx, cfg.blockReader, currentTime, cfg.blockPropagator)
@@ -181,14 +157,9 @@ func BodiesForward(
 				cfg.bd.RequestSent(req, currentTime+uint64(timeout), peer)
 				d3 += time.Since(start)
 			}
-			loopCount++
-			if loopCount >= requestLoopCutOff {
-				break
-			}
 		}
-		start = time.Now()
+		start := time.Now()
 		requestedLow, delivered, err := cfg.bd.GetDeliveries(tx)
 		if err != nil {
 			return false, err
@@ -196,7 +167,6 @@ func BodiesForward(
 		totalDelivered += delivered
 		d4 += time.Since(start)
 		start = time.Now()
-		cr := ChainReader{Cfg: cfg.chanConfig, Db: tx}
 		toProcess := cfg.bd.NextProcessingCount()
@@ -246,6 +216,9 @@ func BodiesForward(
 				return false, err
 			}
 		}
+		if ok {
+			dataflow.BlockBodyDownloadStates.AddChange(blockHeight, dataflow.BlockBodyCleared)
+		}
 		if blockHeight > bodyProgress {
 			bodyProgress = blockHeight
@@ -296,14 +269,11 @@ func BodiesForward(
 	}
 	// kick off the loop and check for any reason to stop and break early
-	for !stopped {
-		shouldBreak, err := loopBody()
-		if err != nil {
+	var shouldBreak bool
+	for !stopped && !shouldBreak {
+		if shouldBreak, err = loopBody(); err != nil {
 			return err
 		}
-		if shouldBreak {
-			break
-		}
 	}
 	// remove the temporary bucket for bodies stage
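The rewritten loop header above folds the old loopCount/break bookkeeping into a single for clause; the "loopCount == 0 ||" term makes the body run at least once, Go's usual substitute for a do-while. An isolated sketch of the same pattern (names are illustrative, not from the commit):

package main

import "fmt"

func main() {
    const requestLoopCutOff = 1 // mirrors the stage's cap on request retries
    moreWork := true            // stands in for "req != nil && sentToPeer"

    // Runs the body at least once, then continues only while there is still
    // work to do and the iteration cap has not been reached.
    for loopCount := 0; loopCount == 0 || (moreWork && loopCount < requestLoopCutOff); loopCount++ {
        fmt.Println("iteration", loopCount)
        moreWork = false // pretend the request queue drained
    }
}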

(existing file, func connectDiagnostics)

@@ -9,7 +9,6 @@ import (
 	"encoding/binary"
 	"fmt"
 	"io"
-	"io/ioutil"
 	"net/http"
 	"os"
 	"os/signal"
@@ -69,16 +68,6 @@ func connectDiagnostics(cliCtx *cli.Context) error {
 	// Create a pool with the server certificate since it is not signed
 	// by a known CA
 	certPool := x509.NewCertPool()
-	srvCert, err := ioutil.ReadFile("diagnostics.crt")
-	if err != nil {
-		return fmt.Errorf("reading server certificate: %v", err)
-	}
-	caCert, err := ioutil.ReadFile("CA-cert.pem")
-	if err != nil {
-		return fmt.Errorf("reading server certificate: %v", err)
-	}
-	certPool.AppendCertsFromPEM(srvCert)
-	certPool.AppendCertsFromPEM(caCert)
 	// Create TLS configuration with the certificate of the server
 	insecure := cliCtx.Bool(insecureFlag.Name)

(existing file, func Setup)

@@ -193,6 +193,7 @@ func Setup(ctx *cli.Context) error {
 		diagnostics.SetupDbAccess(ctx)
 		diagnostics.SetupCmdLineAccess()
 		diagnostics.SetupVersionAccess()
+		diagnostics.SetupBlockBodyDownload()
 	}
 	// pprof server

(existing file, package bodydownload)

@@ -16,6 +16,7 @@ import (
 	"github.com/ledgerwatch/erigon/core/rawdb"
 	"github.com/ledgerwatch/erigon/core/types"
+	"github.com/ledgerwatch/erigon/dataflow"
 	"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
 	"github.com/ledgerwatch/erigon/turbo/adapter"
 	"github.com/ledgerwatch/erigon/turbo/services"
@@ -88,6 +89,7 @@ func (bd *BodyDownload) RequestMoreBodies(tx kv.RwTx, blockReader services.FullB
 			continue
 		}
 		bd.peerMap[req.peerID]++
+		dataflow.BlockBodyDownloadStates.AddChange(blockNum, dataflow.BlockBodyExpired)
 		delete(bd.requests, blockNum)
 	}
@@ -147,12 +149,14 @@ func (bd *BodyDownload) RequestMoreBodies(tx kv.RwTx, blockReader services.FullB
 				body.Withdrawals = make([]*types.Withdrawal, 0)
 			}
 			bd.addBodyToCache(blockNum, body)
+			dataflow.BlockBodyDownloadStates.AddChange(blockNum, dataflow.BlockBodyEmpty)
 			request = false
 		} else {
 			// Perhaps we already have this block
 			block := rawdb.ReadBlock(tx, hash, blockNum)
 			if block != nil {
 				bd.addBodyToCache(blockNum, block.RawBody())
+				dataflow.BlockBodyDownloadStates.AddChange(blockNum, dataflow.BlockBodyInDb)
 				request = false
 			}
 		}
@@ -192,6 +196,7 @@ func (bd *BodyDownload) checkPrefetchedBlock(hash libcommon.Hash, tx kv.RwTx, bl
 	bd.deliveriesH[blockNum] = header
 	// make sure we have the body in the bucket for later use
+	dataflow.BlockBodyDownloadStates.AddChange(blockNum, dataflow.BlockBodyPrefetched)
 	bd.addBodyToCache(blockNum, body)
 	// Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
@@ -215,6 +220,7 @@ func (bd *BodyDownload) RequestSent(bodyReq *BodyRequest, timeWithTimeout uint64
 	//}
 	for _, num := range bodyReq.BlockNums {
 		bd.requests[num] = bodyReq
+		dataflow.BlockBodyDownloadStates.AddChange(num, dataflow.BlockBodyRequested)
 	}
 	bodyReq.waitUntil = timeWithTimeout
 	bodyReq.peerID = peer
@@ -317,11 +323,16 @@ Loop:
 		bd.addBodyToCache(blockNum, &types.RawBody{Transactions: txs[i], Uncles: uncles[i], Withdrawals: withdrawals[i]})
 		bd.delivered.Add(blockNum)
 		delivered++
+		dataflow.BlockBodyDownloadStates.AddChange(blockNum, dataflow.BlockBodyReceived)
 	}
 	// Clean up the requests
 	//var clearedNums []uint64
 	for blockNum := range toClean {
 		delete(bd.requests, blockNum)
+		if !bd.delivered.Contains(blockNum) {
+			// Delivery was requested but was skipped due to the limitation on the size of the response
+			dataflow.BlockBodyDownloadStates.AddChange(blockNum, dataflow.BlockBodySkipped)
+		}
 		//clearedNums = append(clearedNums, blockNum)
 	}
 	//sort.Slice(deliveredNums, func(i, j int) bool { return deliveredNums[i] < deliveredNums[j] })
@@ -417,6 +428,7 @@ func (bd *BodyDownload) addBodyToCache(key uint64, body *types.RawBody) {
 		item, _ := bd.bodyCache.DeleteMax()
 		bd.bodyCacheSize -= item.payloadSize
 		delete(bd.requests, item.blockNum)
+		dataflow.BlockBodyDownloadStates.AddChange(item.blockNum, dataflow.BlockBodyEvicted)
 	}
 }
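Taken together, these instrumentation points give every block number a visible lifecycle. A hypothetical client-side legend (not part of the commit) mirroring the constant values declared in dataflow/states.go:

package main

import "fmt"

// blockBodyStateNames mirrors the byte values of the constants in
// dataflow/states.go (iota order: BlockBodyCleared=0 through BlockBodyInDb=8).
// A visualiser can use it to label the "id,state" pairs from ChangesSince.
var blockBodyStateNames = map[byte]string{
    0: "Cleared",    // processed and removed from tracking
    1: "Expired",    // request timed out and will be re-sent
    2: "Requested",  // request sent to a peer
    3: "Received",   // body delivered by a peer
    4: "Evicted",    // dropped from the body cache to bound memory
    5: "Skipped",    // requested but omitted from a size-limited response
    6: "Empty",      // known-empty body, never requested
    7: "Prefetched", // arrived via block propagation before being requested
    8: "InDb",       // body already present in the database
}

func main() {
    for state := byte(0); state <= 8; state++ {
        fmt.Printf("%d = %s\n", state, blockBodyStateNames[state])
    }
}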