erigon-pulse/consensus/bor/heimdall/client.go
ledgerwatch 52aef48748
Workaround for BSC nodes not propagating new block hashes and blocks (#6777)
It turns out that "standard" BSC nodes based on Geth do not propagate
new block hashes and blocks, at least towards Erigon nodes. This is a
workaround.

---------

Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
2023-02-05 18:42:51 +00:00

331 lines
7.6 KiB
Go

package heimdall
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"path"
	"sort"
	"time"

	"github.com/ledgerwatch/erigon/consensus/bor/clerk"
	"github.com/ledgerwatch/erigon/consensus/bor/heimdall/checkpoint"
	"github.com/ledgerwatch/erigon/consensus/bor/heimdall/span"
	"github.com/ledgerwatch/log/v3"
)
// Sentinel errors returned by the Heimdall client.
var (
// ErrShutdownDetected is returned if a shutdown was detected
// (FetchWithRetry returns it when the close channel is signalled).
ErrShutdownDetected = errors.New("shutdown detected")
// ErrNoResponse is returned by Fetch when the request produced no body.
ErrNoResponse = errors.New("got a nil response")
// ErrNotSuccessfulResponse is wrapped by internalFetch when the HTTP status
// code is neither 200 nor 204.
ErrNotSuccessfulResponse = errors.New("error while fetching data from Heimdall")
)
// Tuning constants for requests to Heimdall.
const (
// stateFetchLimit is the page size sent as the "limit" query parameter when
// fetching state sync events; a shorter page ends pagination.
stateFetchLimit = 50
// apiHeimdallTimeout bounds a single request: it is used as the http.Client
// timeout and as the per-call context timeout.
apiHeimdallTimeout = 5 * time.Second
// retryCall is the ticker period between retries in FetchWithRetry.
retryCall = 5 * time.Second
)
// StateSyncEventsResponse is the JSON envelope decoded by StateSyncEvents for
// one page of state sync events.
type StateSyncEventsResponse struct {
// Height is decoded from the "height" field of the response.
Height string `json:"height"`
// Result is the page of event records; it stays nil when "result" is null
// or absent in the response.
Result []*clerk.EventRecordWithTime `json:"result"`
}
// SpanResponse is the JSON envelope decoded by Span.
type SpanResponse struct {
// Height is decoded from the "height" field of the response.
Height string `json:"height"`
// Result is the span payload; Span returns a pointer to this field.
Result span.HeimdallSpan `json:"result"`
}
// HeimdallClient fetches data from a Heimdall endpoint over HTTP.
type HeimdallClient struct {
// urlString is the base URL; it is parsed again for every request.
urlString string
// client is the HTTP client used for all requests (passed by value to
// FetchWithRetry).
client http.Client
// closeCh is closed by Close; FetchWithRetry selects on it to stop retrying.
closeCh chan struct{}
}
// Request bundles what Fetch needs to perform one call to Heimdall.
type Request struct {
// client is the HTTP client used to send the request.
client http.Client
// url is the fully built request URL.
url *url.URL
// start is set to time.Now() by FetchWithRetry when the request is created;
// in this file it is only referenced by the commented-out metrics code in Fetch.
start time.Time
}
// NewHeimdallClient returns a HeimdallClient that talks to the Heimdall
// endpoint at urlString. The embedded HTTP client uses apiHeimdallTimeout as
// its timeout, and a fresh close channel is allocated for shutdown signalling.
func NewHeimdallClient(urlString string) *HeimdallClient {
	httpClient := http.Client{Timeout: apiHeimdallTimeout}
	shutdown := make(chan struct{})

	return &HeimdallClient{
		urlString: urlString,
		client:    httpClient,
		closeCh:   shutdown,
	}
}
// URL paths and query formats for the Heimdall endpoints used by this client.
// Note that the checkpoint paths carry a leading slash while the others do not.
const (
// fetchStateSyncEventsFormat is the query string for state sync events:
// from-id, to-time and limit, in that order.
fetchStateSyncEventsFormat = "from-id=%d&to-time=%d&limit=%d"
// fetchStateSyncEventsPath is the path of the state sync events endpoint.
fetchStateSyncEventsPath = "clerk/event-record/list"
// fetchCheckpoint is the checkpoint path; %s is a checkpoint number or "latest".
fetchCheckpoint = "/checkpoints/%s"
// fetchCheckpointCount is the path of the checkpoint count endpoint.
fetchCheckpointCount = "/checkpoints/count"
// fetchSpanFormat is the span path; %d is the span ID.
fetchSpanFormat = "bor/span/%d"
)
// StateSyncEvents fetches state sync event records from Heimdall page by page,
// starting at fromID and passing `to` as the to-time query parameter.
//
// Pages of stateFetchLimit records are requested until a page comes back
// short or without records. The accumulated records are returned sorted by
// ascending ID (stable sort). If building the URL or fetching any page fails,
// the error is returned together with nil records.
func (h *HeimdallClient) StateSyncEvents(ctx context.Context, fromID uint64, to int64) ([]*clerk.EventRecordWithTime, error) {
	// Tag the context once, before paging. Doing this inside the loop would
	// wrap the context again for every page that is fetched.
	ctx = withRequestType(ctx, stateSyncRequest)

	eventRecords := make([]*clerk.EventRecordWithTime, 0)

	for {
		pageURL, err := stateSyncURL(h.urlString, fromID, to)
		if err != nil {
			return nil, err
		}

		log.Debug("Fetching state sync events", "queryParams", pageURL.RawQuery)

		response, err := FetchWithRetry[StateSyncEventsResponse](ctx, h.client, pageURL, h.closeCh)
		if err != nil {
			return nil, err
		}

		// A nil Result means the JSON "result" was null or missing.
		// NOTE(review): Fetch turns an empty (204) body into ErrNoResponse, so
		// a nil response is not expected here; the check is kept defensively.
		if response == nil || response.Result == nil {
			break
		}

		eventRecords = append(eventRecords, response.Result...)

		// A short page means there is nothing more to fetch.
		if len(response.Result) < stateFetchLimit {
			break
		}

		// Advance to the next page of IDs.
		fromID += uint64(stateFetchLimit)
	}

	sort.SliceStable(eventRecords, func(i, j int) bool {
		return eventRecords[i].ID < eventRecords[j].ID
	})

	return eventRecords, nil
}
// Span fetches the span with the given spanID from Heimdall and returns a
// pointer to the decoded span. URL-building and fetch errors are returned
// unchanged with a nil span.
func (h *HeimdallClient) Span(ctx context.Context, spanID uint64) (*span.HeimdallSpan, error) {
	endpoint, err := spanURL(h.urlString, spanID)
	if err != nil {
		return nil, err
	}

	// mark the request type on the context before fetching
	spanCtx := withRequestType(ctx, spanRequest)

	resp, err := FetchWithRetry[SpanResponse](spanCtx, h.client, endpoint, h.closeCh)
	if err != nil {
		return nil, err
	}

	heimdallSpan := &resp.Result

	return heimdallSpan, nil
}
// FetchCheckpoint fetches the checkpoint identified by number from Heimdall.
// Passing -1 requests the "latest" checkpoint (see checkpointURL). Errors from
// building the URL or fetching are returned unchanged with a nil checkpoint.
func (h *HeimdallClient) FetchCheckpoint(ctx context.Context, number int64) (*checkpoint.Checkpoint, error) {
	endpoint, err := checkpointURL(h.urlString, number)
	if err != nil {
		return nil, err
	}

	// mark the request type on the context before fetching
	checkpointCtx := withRequestType(ctx, checkpointRequest)

	resp, err := FetchWithRetry[checkpoint.CheckpointResponse](checkpointCtx, h.client, endpoint, h.closeCh)
	if err != nil {
		return nil, err
	}

	fetched := &resp.Result

	return fetched, nil
}
// FetchCheckpointCount fetches the checkpoint count from Heimdall. On any
// URL-building or fetch error it returns 0 together with that error.
func (h *HeimdallClient) FetchCheckpointCount(ctx context.Context) (int64, error) {
	endpoint, err := checkpointCountURL(h.urlString)
	if err != nil {
		return 0, err
	}

	// mark the request type on the context before fetching
	countCtx := withRequestType(ctx, checkpointCountRequest)

	resp, err := FetchWithRetry[checkpoint.CheckpointCountResponse](countCtx, h.client, endpoint, h.closeCh)
	if err != nil {
		return 0, err
	}

	count := resp.Result.Result

	return count, nil
}
// FetchWithRetry returns data from heimdall with retry
func FetchWithRetry[T any](ctx context.Context, client http.Client, url *url.URL, closeCh chan struct{}) (*T, error) {
// request data once
request := &Request{client: client, url: url, start: time.Now()}
result, err := Fetch[T](ctx, request)
if err == nil {
return result, nil
}
// attempt counter
attempt := 1
log.Warn("an error while trying fetching from Heimdall", "attempt", attempt, "error", err)
// create a new ticker for retrying the request
ticker := time.NewTicker(retryCall)
defer ticker.Stop()
const logEach = 5
retryLoop:
for {
log.Info("Retrying again in 5 seconds to fetch data from Heimdall", "path", url.Path, "attempt", attempt)
attempt++
select {
case <-ctx.Done():
log.Debug("Shutdown detected, terminating request by context.Done")
return nil, ctx.Err()
case <-closeCh:
log.Debug("Shutdown detected, terminating request by closing")
return nil, ErrShutdownDetected
case <-ticker.C:
request = &Request{client: client, url: url, start: time.Now()}
result, err = Fetch[T](ctx, request)
if err != nil {
if attempt%logEach == 0 {
log.Warn("an error while trying fetching from Heimdall", "attempt", attempt, "error", err)
}
continue retryLoop
}
return result, nil
}
}
}
// Fetch performs a single request against Heimdall and decodes the JSON
// response body into a newly allocated T.
//
// It returns the error from internalFetchWithTimeout unchanged, ErrNoResponse
// when no body was returned (internalFetch yields a nil body for status 204),
// or the json.Unmarshal error when decoding fails.
//
// TODO: Uncomment once metrics are added
func Fetch[T any](ctx context.Context, request *Request) (*T, error) {
	// isSuccessful := false
	// defer func() {
	// if metrics.EnabledExpensive {
	// sendMetrics(ctx, request.start, isSuccessful)
	// }
	// }()

	raw, err := internalFetchWithTimeout(ctx, request.client, request.url)

	switch {
	case err != nil:
		return nil, err
	case raw == nil:
		return nil, ErrNoResponse
	}

	decoded := new(T)
	if err := json.Unmarshal(raw, decoded); err != nil {
		return nil, err
	}

	// isSuccessful = true
	return decoded, nil
}
func spanURL(urlString string, spanID uint64) (*url.URL, error) {
return makeURL(urlString, fmt.Sprintf(fetchSpanFormat, spanID), "")
}
func stateSyncURL(urlString string, fromID uint64, to int64) (*url.URL, error) {
queryParams := fmt.Sprintf(fetchStateSyncEventsFormat, fromID, to, stateFetchLimit)
return makeURL(urlString, fetchStateSyncEventsPath, queryParams)
}
func checkpointURL(urlString string, number int64) (*url.URL, error) {
url := ""
if number == -1 {
url = fmt.Sprintf(fetchCheckpoint, "latest")
} else {
url = fmt.Sprintf(fetchCheckpoint, fmt.Sprint(number))
}
return makeURL(urlString, url, "")
}
// checkpointCountURL builds the request URL for the checkpoint count
// endpoint; the request has no query string.
func checkpointCountURL(urlString string) (*url.URL, error) {
	const noQuery = ""

	return makeURL(urlString, fetchCheckpointCount, noQuery)
}
// makeURL parses urlString as the base URL and returns a URL that points at
// rawPath below it, with rawQuery as the query string.
//
// rawPath is joined onto whatever path the base URL already has, so a base
// such as "https://host/heimdall" keeps its "/heimdall" prefix instead of
// having it overwritten. For a base without a path the result is the same as
// assigning rawPath directly. A parse error is returned with a nil URL.
func makeURL(urlString, rawPath, rawQuery string) (*url.URL, error) {
	u, err := url.Parse(urlString)
	if err != nil {
		return nil, err
	}

	// path.Join copes with missing or duplicate slashes between the two parts.
	u.Path = path.Join(u.Path, rawPath)
	u.RawQuery = rawQuery

	return u, nil
}
// internalFetch sends a GET request for u with the given client and returns
// the raw response body.
//
// Status 200 yields the body, status 204 yields (nil, nil), and any other
// status yields an error wrapping ErrNotSuccessfulResponse that includes the
// status code. Errors from building the request, sending it, or reading the
// body are returned unchanged with a nil body.
func internalFetch(ctx context.Context, client http.Client, u *url.URL) ([]byte, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
	if err != nil {
		return nil, err
	}

	res, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	switch res.StatusCode {
	case 200:
		// success with content: read the body below
	case 204:
		// success without content: nothing to read
		return nil, nil
	default:
		return nil, fmt.Errorf("%w: response code %d", ErrNotSuccessfulResponse, res.StatusCode)
	}

	payload, readErr := io.ReadAll(res.Body)
	if readErr != nil {
		return nil, readErr
	}

	return payload, nil
}
// internalFetchWithTimeout runs internalFetch once under a context derived
// from ctx that expires after apiHeimdallTimeout. The derived context is
// cancelled when the call returns.
func internalFetchWithTimeout(ctx context.Context, client http.Client, url *url.URL) ([]byte, error) {
	timeoutCtx, cancel := context.WithTimeout(ctx, apiHeimdallTimeout)
	defer cancel()

	return internalFetch(timeoutCtx, client, url)
}
// Close sends a signal to stop the running process: it closes closeCh, which
// makes a FetchWithRetry loop waiting on it return ErrShutdownDetected, and
// then closes the HTTP client's idle connections.
// NOTE(review): closing an already-closed channel panics, so Close looks
// single-use — confirm that callers invoke it at most once.
func (h *HeimdallClient) Close() {
close(h.closeCh)
h.client.CloseIdleConnections()
}