erigon-pulse/cmd/headers/download/downloader.go
ledgerwatch 14288b8cd0
Headers poc 5 - Intermediate (#1145)
* Add headers persistence

* Print flushBuffer

* Fix indexing problem

* Do not skip hard-coded headers if the files are empty

* Fix lint

* print anchor state after init

* Properly construct file names

* Add check sub-command

* Fix lint

* Fix lint

* Fix lint

* Print more info when recovering

* Fix recovering

* Fix recovery

* Add anchors also as tips

* 2-level priority queue for tips

* Initialise tipQueue in anchor

* update maxTipHeight

* fix type

* Add anchors to the anchorQueue and rebuild anchorQueue on deletion

* Fix NPE

* fix recovery

* Use buffersize, add hard-coded headers to buffer

* Reinit anchorQueue

* Schedule requests after recovery

* No fix

* Remove duplicates

* Report duplicate headers

* Log buffer additions

* Fix for duplicate headers

* Try to fix duplicate headers again

* remove TODO comment

* Use LLRB instead of heap for anchors

* Print reserveTip info

* Correctly replace anchors in the tree

* Remove excessive logging

* Print tips attached to the anchor

* Print tips better

* limitTips instead of reserveTip

* Print forked headers

* Use pointers in llrb

* Print tipStretch

* Print limitTips

* Minimise AnchorItem

* Put anchors into the tree

* Fix totalDiff calculation, but is it needed?

* Remove totalDifficulty from anchors

* CheckInitiation

* Fix tests

* Fix lint, print more at check initiation

* Better output for hard-coded anchors

* Print hard-coded anchors better

* Prioritise anchors with short stretches

* Prioritise by chainSize

* Use linked list instead of heap for requestQueue

* Fix problem of no requests

* Push front

* Fix lint

* Do not verify PoW for headers too far in the past or future

* Fix calculation of totalDifficulty in Connect

* Fix hard tips

* Another fix for tips
2020-09-27 21:32:05 +01:00


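// Package download drives the header download proof-of-concept: it feeds
// messages arriving from the sentry into the headerdownload state machine
// and sends header requests back out.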
package download

import (
	"context"
	"errors"
	"fmt"
	"io"
	"math/big"
	"os"
	"time"

	"github.com/ledgerwatch/turbo-geth/common"
	"github.com/ledgerwatch/turbo-geth/consensus/ethash"
	"github.com/ledgerwatch/turbo-geth/core/types"
	"github.com/ledgerwatch/turbo-geth/log"
	"github.com/ledgerwatch/turbo-geth/params"
	"github.com/ledgerwatch/turbo-geth/turbo/stages/headerdownload"
)
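
// chainReader is a minimal stub that gives the ethash engine access to the
// chain configuration. Only Config is expected to be called in this context;
// the header lookup methods panic to make any unexpected use obvious.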
type chainReader struct {
	config *params.ChainConfig
}

func (cr chainReader) Config() *params.ChainConfig { return cr.config }
func (cr chainReader) CurrentHeader() *types.Header { panic("") }
func (cr chainReader) GetHeader(hash common.Hash, number uint64) *types.Header { panic("") }
func (cr chainReader) GetHeaderByNumber(number uint64) *types.Header { panic("") }
func (cr chainReader) GetHeaderByHash(hash common.Hash) *types.Header { panic("") }
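
// processSegment attaches a newly received chain segment to the anchor/tip
// structure maintained by the HeaderDownload object. Depending on whether the
// segment's ends match an existing anchor and/or tip, it performs one of four
// operations: Connect, ExtendDown, ExtendUp, or NewAnchor.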
func processSegment(hd *headerdownload.HeaderDownload, segment *headerdownload.ChainSegment) {
	log.Info(hd.AnchorState())
	log.Info("processSegment", "from", segment.Headers[0].Number.Uint64(), "to", segment.Headers[len(segment.Headers)-1].Number.Uint64())
	foundAnchor, start, anchorParent, invalidAnchors := hd.FindAnchors(segment)
	if len(invalidAnchors) > 0 {
		if _, err1 := hd.InvalidateAnchors(anchorParent, invalidAnchors); err1 != nil {
			log.Error("Invalidation of anchor failed", "error", err1)
		}
		log.Warn(fmt.Sprintf("Invalidated anchors %v for %x", invalidAnchors, anchorParent))
	}
	// No penalty is forwarded to the sentry here; the segment is simply
	// skipped, since the PoW check below covers the same ground
	foundTip, end, penalty := hd.FindTip(segment, start)
	if penalty != headerdownload.NoPenalty {
		log.Error(fmt.Sprintf("FindTip penalty %d", penalty))
		return
	}
	currentTime := uint64(time.Now().Unix())
	var powDepth int
	if powDepth1, err1 := hd.VerifySeals(segment, foundAnchor, foundTip, start, end, currentTime); err1 == nil {
		powDepth = powDepth1
	} else {
		log.Error("VerifySeals", "error", err1)
		return
	}
	if err1 := hd.FlushBuffer(); err1 != nil {
		log.Error("Could not flush the buffer, will discard the data", "error", err1)
		return
	}
	// There are 4 cases, depending on whether the segment attaches to an
	// existing anchor, an existing tip, both, or neither
	if foundAnchor {
		if foundTip {
			// Connect: the segment fills the gap between an existing tip and an existing anchor
			if err1 := hd.Connect(segment, start, end, currentTime); err1 != nil {
				log.Error("Connect failed", "error", err1)
			} else {
				hd.AddSegmentToBuffer(segment, start, end)
				log.Info("Connected", "start", start, "end", end)
			}
		} else {
			// ExtendDown: the segment hangs below an existing anchor
			if err1 := hd.ExtendDown(segment, start, end, powDepth, currentTime); err1 != nil {
				log.Error("ExtendDown failed", "error", err1)
			} else {
				hd.AddSegmentToBuffer(segment, start, end)
				log.Info("Extended Down", "start", start, "end", end)
			}
		}
	} else if foundTip {
		if end == 0 {
			log.Info("No action needed, tip already exists")
		} else {
			// ExtendUp: the segment grows an existing tip upwards
			if err1 := hd.ExtendUp(segment, start, end, currentTime); err1 != nil {
				log.Error("ExtendUp failed", "error", err1)
			} else {
				hd.AddSegmentToBuffer(segment, start, end)
				log.Info("Extended Up", "start", start, "end", end)
			}
		}
	} else {
		// NewAnchor: the segment attaches to nothing and starts a new anchor
		if err1 := hd.NewAnchor(segment, start, end, currentTime); err1 != nil {
			log.Error("NewAnchor failed", "error", err1)
		} else {
			hd.AddSegmentToBuffer(segment, start, end)
			log.Info("NewAnchor", "start", start, "end", end)
		}
	}
	if start == 0 || end > 0 {
		hd.CheckInitiation(segment, params.MainnetGenesisHash)
	}
}
// Downloader needs to be run from a goroutine, and it has sole control of the HeaderDownload object.
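//
// A typical wiring might look like this (a sketch only; the channel
// capacities, files directory, and buffer limit are illustrative assumptions,
// not values taken from this code):
//
//	newBlockCh := make(chan NewBlockFromSentry, 256)
//	newBlockHashCh := make(chan NewBlockHashFromSentry, 256)
//	headersCh := make(chan BlockHeadersFromSentry, 256)
//	penaltyCh := make(chan PenaltyMsg, 256)
//	reqHeadersCh := make(chan headerdownload.HeaderRequest, 256)
//	go Downloader(ctx, "headers", 512*1024*1024, newBlockCh, newBlockHashCh, headersCh, penaltyCh, reqHeadersCh)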
func Downloader(
	ctx context.Context,
	filesDir string,
	bufferLimit int,
	newBlockCh chan NewBlockFromSentry,
	newBlockHashCh chan NewBlockHashFromSentry,
	headersCh chan BlockHeadersFromSentry,
	penaltyCh chan PenaltyMsg,
	reqHeadersCh chan headerdownload.HeaderRequest,
) {
	//config := eth.DefaultConfig.Ethash
	engine := ethash.New(ethash.Config{
		CachesInMem:      1,
		CachesLockMmap:   false,
		DatasetDir:       "ethash",
		DatasetsInMem:    1,
		DatasetsOnDisk:   0,
		DatasetsLockMmap: false,
	}, nil, false)
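	// Wrap the engine methods in plain function values, which is the form
	// the headerdownload package takes them in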
	cr := chainReader{config: params.MainnetChainConfig}
	calcDiffFunc := func(childTimestamp uint64, parentTime uint64, parentDifficulty, parentNumber *big.Int, parentHash, parentUncleHash common.Hash) *big.Int {
		return engine.CalcDifficulty(cr, childTimestamp, parentTime, parentDifficulty, parentNumber, parentHash, parentUncleHash)
	}
	verifySealFunc := func(header *types.Header) error {
		return engine.VerifySeal(cr, header)
	}
	hd := headerdownload.NewHeaderDownload(
		filesDir,
		bufferLimit, /* bufferLimit */
		16*1024,     /* tipLimit */
		1024,        /* initPowDepth */
		calcDiffFunc,
		verifySealFunc,
		3600, /* newAnchor future limit */
		3600, /* newAnchor past limit */
	)
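	// Load the hard-coded tips first, then try to restore the download state
	// persisted by a previous run; if recovery fails or yields nothing, fall
	// back to reading the hard-coded headers from scratch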
	hd.InitHardCodedTips("hard-coded-headers.dat")
	if recovered, err := hd.RecoverFromFiles(uint64(time.Now().Unix())); err != nil || !recovered {
		if err != nil {
			log.Error("Recovery from file failed, will start from scratch", "error", err)
		}
		// Insert hard-coded headers if present
		if _, err := os.Stat("hard-coded-headers.dat"); err == nil {
			if f, err1 := os.Open("hard-coded-headers.dat"); err1 == nil {
				var hBuffer [headerdownload.HeaderSerLength]byte
				i := 0
				for {
					var h types.Header
					if _, err2 := io.ReadFull(f, hBuffer[:]); err2 == nil {
						headerdownload.DeserialiseHeader(&h, hBuffer[:])
					} else if errors.Is(err2, io.EOF) {
						break
					} else {
						log.Error("Failed to read hard coded header", "i", i, "error", err2)
						break
					}
					if err2 := hd.HardCodedHeader(&h, uint64(time.Now().Unix())); err2 != nil {
						log.Error("Failed to insert hard coded header", "i", i, "block", h.Number.Uint64(), "error", err2)
					} else {
						hd.AddHeaderToBuffer(&h)
					}
					i++
				}
				f.Close() // release the file once all hard-coded headers have been read
			}
		}
	}
	log.Info(hd.AnchorState())
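	// Main event loop: react to messages from the sentry, and after each
	// event send out any header requests that have become due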
	for {
		select {
		case newBlockReq := <-newBlockCh:
			if segments, penalty, err := hd.SingleHeaderAsSegment(newBlockReq.Block.Header()); err == nil {
				if penalty == headerdownload.NoPenalty {
					processSegment(hd, segments[0]) // There is only one segment in this case
				} else {
					// Send penalty back to the sentry
					penaltyCh <- PenaltyMsg{SentryMsg: newBlockReq.SentryMsg, penalty: penalty}
				}
			} else {
				log.Error("SingleHeaderAsSegment failed", "error", err)
				continue
			}
			log.Info(fmt.Sprintf("NewBlockMsg{blockNumber: %d}", newBlockReq.Block.NumberU64()))
		case newBlockHashReq := <-newBlockHashCh:
			for _, announce := range newBlockHashReq.NewBlockHashesData {
				if !hd.HasTip(announce.Hash) {
					log.Info(fmt.Sprintf("Sending header request {hash: %x, height: %d, length: %d}", announce.Hash, announce.Number, 1))
					reqHeadersCh <- headerdownload.HeaderRequest{
						Hash:   announce.Hash,
						Number: announce.Number,
						Length: 1,
					}
				}
			}
		case headersReq := <-headersCh:
			if segments, penalty, err := hd.SplitIntoSegments(headersReq.headers); err == nil {
				if penalty == headerdownload.NoPenalty {
					for _, segment := range segments {
						processSegment(hd, segment)
					}
				} else {
					penaltyCh <- PenaltyMsg{SentryMsg: headersReq.SentryMsg, penalty: penalty}
				}
			} else {
				log.Error("SplitIntoSegments failed", "error", err)
			}
			log.Info("HeadersMsg processed")
		case <-hd.RequestQueueTimer.C:
			fmt.Printf("RequestQueueTimer ticked\n")
		case <-ctx.Done():
			return
		}
		reqs := hd.RequestMoreHeaders(uint64(time.Now().Unix()), 5 /* timeout */)
		for _, req := range reqs {
			//log.Info(fmt.Sprintf("Sending header request {hash: %x, height: %d, length: %d}", req.Hash, req.Number, req.Length))
			reqHeadersCh <- *req
		}
	}
}