Automation tool to automatically upload caplin's snapshot files to R2 (#8747)

Upload beacon snapshots to R2 every week by default
This commit is contained in:
Giulio rebuffo 2023-11-16 20:59:43 +01:00 committed by GitHub
parent a6b5297b3e
commit 274f84598c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 271 additions and 56 deletions

View File

@ -121,6 +121,8 @@ COMMANDS += evm
COMMANDS += sentinel
COMMANDS += caplin
COMMANDS += caplin-regression
COMMANDS += tooling
# build each command using %.cmd rule

View File

@ -173,11 +173,11 @@ func (a *Antiquary) Loop() error {
if from >= to {
continue
}
to = utils.Min64(to, to-safetyMargin) // We don't want to retire snapshots that are too close to the finalized head
to = (to / snaptype.Erigon2RecentMergeLimit) * snaptype.Erigon2RecentMergeLimit
if to-from < snaptype.Erigon2RecentMergeLimit {
continue
}
to = utils.Min64(to, to-safetyMargin) // We don't want to retire snapshots that are too close to the finalized head
to = (to / snaptype.Erigon2RecentMergeLimit) * snaptype.Erigon2RecentMergeLimit
if err := a.antiquate(from, to); err != nil {
return err
}

View File

@ -17,7 +17,6 @@ import (
"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/erigon-lib/chain/snapcfg"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/downloader"
"github.com/ledgerwatch/erigon/cl/abstract"
@ -464,8 +463,6 @@ type CheckSnapshots struct {
chainCfg
outputFolder
withPPROF
Slot uint64 `name:"slot" help:"slot to check"`
}
func (c *CheckSnapshots) Run(ctx *Context) error {
@ -480,7 +477,7 @@ func (c *CheckSnapshots) Run(ctx *Context) error {
log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler))
rawDB := persistence.AferoRawBeaconBlockChainFromOsPath(beaconConfig, dirs.CaplinHistory)
beaconDB, db, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, rawDB, dirs.CaplinIndexing, nil, false)
_, db, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, rawDB, dirs.CaplinIndexing, nil, false)
if err != nil {
return err
}
@ -503,51 +500,38 @@ func (c *CheckSnapshots) Run(ctx *Context) error {
return err
}
br := &snapshot_format.MockBlockReader{}
snReader := freezeblocks.NewBeaconSnapshotReader(csn, br, beaconDB, beaconConfig)
for i := c.Slot; i < to; i++ {
// Read the original canonical slot
data, err := beaconDB.GetBlock(ctx, tx, i)
genesisHeader, _, _, err := csn.ReadHeader(0)
if err != nil {
return err
}
previousBlockRoot, err := genesisHeader.Header.HashSSZ()
if err != nil {
return err
}
previousBlockSlot := genesisHeader.Header.Slot
for i := uint64(1); i < to; i++ {
if utils.Min64(0, i-320) > previousBlockSlot {
return fmt.Errorf("snapshot %d has invalid slot", i)
}
// Checking of snapshots is a chain contiguity problem
currentHeader, _, _, err := csn.ReadHeader(i)
if err != nil {
return err
}
if data == nil {
if currentHeader == nil {
continue
}
blk := data.Data
if blk == nil {
continue
if currentHeader.Header.ParentRoot != previousBlockRoot {
return fmt.Errorf("snapshot %d has invalid parent root", i)
}
// first thing if the block is bellatrix update the mock block reader
if blk.Version() >= clparams.BellatrixVersion {
br.Block = blk.Block.Body.ExecutionPayload
}
blk2, err := snReader.ReadBlockBySlot(ctx, tx, i)
if err != nil {
log.Error("Error detected in decoding snapshots", "err", err, "slot", i)
return nil
}
if blk2 == nil {
log.Error("Block not found in snapshot", "slot", i)
return nil
}
hash1, _ := blk.Block.HashSSZ()
hash2, _ := blk2.Block.HashSSZ()
if hash1 != hash2 {
log.Error("Mismatching blocks", "slot", i, "gotSlot", blk2.Block.Slot, "datadir", libcommon.Hash(hash1), "snapshot", libcommon.Hash(hash2))
return nil
}
header, _, _, err := csn.ReadHeader(i)
previousBlockRoot, err = currentHeader.Header.HashSSZ()
if err != nil {
return err
}
hash3, _ := header.Header.HashSSZ()
if hash3 != hash2 {
log.Error("Mismatching blocks", "slot", i, "gotSlot", blk2.Block.Slot, "datadir", libcommon.Hash(hash1), "snapshot", libcommon.Hash(hash3))
return nil
previousBlockSlot = currentHeader.Header.Slot
if i%2000 == 0 {
log.Info("Successfully checked", "slot", i)
}
log.Info("Successfully checked", "slot", i)
}
return nil
}

3
cmd/tooling/README.md Normal file
View File

@ -0,0 +1,3 @@
# Tooling
this are a bunch of tools for our scripting necessities

174
cmd/tooling/cli.go Normal file
View File

@ -0,0 +1,174 @@
package main
import (
"fmt"
"math"
"os/exec"
"time"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cmd/caplin/caplin1"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/freezeblocks"
"golang.org/x/net/context"
"github.com/ledgerwatch/erigon-lib/common/datadir"
"github.com/ledgerwatch/erigon-lib/downloader/snaptype"
"github.com/ledgerwatch/erigon/cl/persistence"
"github.com/ledgerwatch/erigon/cl/persistence/beacon_indicies"
"github.com/ledgerwatch/erigon/cl/persistence/db_config"
"github.com/ledgerwatch/erigon/cl/utils"
"github.com/ledgerwatch/log/v3"
)
var CLI struct {
BucketCaplinAutomation BucketCaplinAutomation `cmd:"" help:"migrate from one state to another"`
}
type chainCfg struct {
Chain string `help:"chain" default:"mainnet"`
}
// func (c *chainCfg) configs() (beaconConfig *clparams.BeaconChainConfig, genesisConfig *clparams.GenesisConfig, err error) {
// genesisConfig, _, beaconConfig, _, err = clparams.GetConfigsByNetworkName(c.Chain)
// return
// }
type withDatadir struct {
Datadir string `help:"datadir" default:"~/.local/share/erigon" type:"existingdir"`
}
// func (w *withPPROF) withProfile() {
// if w.Pprof {
// debug.StartPProf("localhost:6060", metrics.Setup("localhost:6060", log.Root()))
// }
// }
// func (w *withSentinel) connectSentinel() (sentinel.SentinelClient, error) {
// // YOLO message size
// gconn, err := grpc.Dial(w.Sentinel, grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt)))
// if err != nil {
// return nil, err
// }
// return sentinel.NewSentinelClient(gconn), nil
// }
// func openFs(fsName string, path string) (afero.Fs, error) {
// return afero.NewBasePathFs(afero.NewBasePathFs(afero.NewOsFs(), fsName), path), nil
// }
type BucketCaplinAutomation struct {
withDatadir
chainCfg
UploadPeriod time.Duration `help:"upload period" default:"1440h"`
Bucket string `help:"r2 address" default:"http://localhost:8080"`
}
func (c *BucketCaplinAutomation) Run(ctx *Context) error {
_, _, beaconConfig, _, err := clparams.GetConfigsByNetworkName(c.Chain)
if err != nil {
return err
}
log.Root().SetHandler(log.LvlFilterHandler(log.LvlDebug, log.StderrHandler))
log.Info("Started the automation tool for automatic snapshot sanity check and R2 uploading (caplin only)", "chain", c.Chain)
dirs := datadir.New(c.Datadir)
log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler))
tickerTriggerer := time.NewTicker(c.UploadPeriod)
defer tickerTriggerer.Stop()
// do the checking at first run
if err := checkSnapshots(ctx, beaconConfig, dirs); err != nil {
return err
}
log.Info("Uploading snapshots to R2 bucket")
// next upload to R2
command := "rclone"
args := []string{"sync", dirs.Snap, c.Bucket, "--include", "*beaconblocks*"}
if err := exec.Command(command, args...).Run(); err != nil {
return fmt.Errorf("rclone failed, make sure rclone is installed and is properly configured: %s", err)
}
log.Info("Finished snapshots to R2 bucket")
for {
select {
case <-tickerTriggerer.C:
log.Info("Checking snapshots")
if err := checkSnapshots(ctx, beaconConfig, dirs); err != nil {
return err
}
log.Info("Finishing snapshots")
// next upload to R2
command := "rclone"
args := []string{"sync", dirs.Snap, c.Bucket, "--include", "*beaconblocks*"}
log.Info("Uploading snapshots to R2 bucket")
if err := exec.Command(command, args...).Run(); err != nil {
return fmt.Errorf("rclone failed, make sure rclone is installed and is properly configured: %s", err)
}
log.Info("Finished snapshots to R2 bucket")
case <-ctx.Done():
return nil
}
}
}
func checkSnapshots(ctx context.Context, beaconConfig *clparams.BeaconChainConfig, dirs datadir.Dirs) error {
rawDB := persistence.AferoRawBeaconBlockChainFromOsPath(beaconConfig, dirs.CaplinHistory)
_, db, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, rawDB, dirs.CaplinIndexing, nil, false)
if err != nil {
return err
}
defer db.Close()
var to uint64
tx, err := db.BeginRo(ctx)
if err != nil {
return err
}
defer tx.Rollback()
to, err = beacon_indicies.ReadHighestFinalized(tx)
if err != nil {
return err
}
to = (to / snaptype.Erigon2RecentMergeLimit) * snaptype.Erigon2RecentMergeLimit
csn := freezeblocks.NewCaplinSnapshots(ethconfig.BlocksFreezing{}, dirs.Snap, log.Root())
if err := csn.ReopenFolder(); err != nil {
return err
}
genesisHeader, _, _, err := csn.ReadHeader(0)
if err != nil {
return err
}
previousBlockRoot, err := genesisHeader.Header.HashSSZ()
if err != nil {
return err
}
previousBlockSlot := genesisHeader.Header.Slot
for i := uint64(1); i < to; i++ {
if utils.Min64(0, i-320) > previousBlockSlot {
return fmt.Errorf("snapshot %d has invalid slot", i)
}
// Checking of snapshots is a chain contiguity problem
currentHeader, _, _, err := csn.ReadHeader(i)
if err != nil {
return err
}
if currentHeader == nil {
continue
}
if currentHeader.Header.ParentRoot != previousBlockRoot {
return fmt.Errorf("snapshot %d has invalid parent root", i)
}
previousBlockRoot, err = currentHeader.Header.HashSSZ()
if err != nil {
return err
}
previousBlockSlot = currentHeader.Header.Slot
if i%20000 == 0 {
log.Info("Successfully checked", "slot", i)
}
}
return nil
}

35
cmd/tooling/main.go Normal file
View File

@ -0,0 +1,35 @@
/*
Copyright 2022 Erigon-Lightclient contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"context"
"github.com/alecthomas/kong"
)
type Context struct {
context.Context
kctx *kong.Context
}
func main() {
ctx := kong.Parse(&CLI)
// Call the Run() method of the selected parsed command.
err := ctx.Run(&Context{
kctx: ctx,
Context: context.TODO(),
})
ctx.FatalIfErrorf(err)
}

View File

@ -24,6 +24,7 @@ import (
"net"
"sort"
"strings"
"sync"
)
var lan4, lan6, special4, special6 Netlist
@ -221,7 +222,7 @@ type DistinctNetSet struct {
Subnet uint // number of common prefix bits
Limit uint // maximum number of IPs in each subnet
members map[string]uint
members *sync.Map
buf net.IP
}
@ -229,9 +230,12 @@ type DistinctNetSet struct {
// number of existing IPs in the defined range exceeds the limit.
func (s *DistinctNetSet) Add(ip net.IP) bool {
key := s.key(ip)
n := s.members[string(key)]
n := uint(0)
if value, ok := s.members.Load(string(key)); ok {
n = value.(uint)
}
if n < s.Limit {
s.members[string(key)] = n + 1
s.members.Store(string(key), n+1)
return true
}
return false
@ -240,11 +244,11 @@ func (s *DistinctNetSet) Add(ip net.IP) bool {
// Remove removes an IP from the set.
func (s *DistinctNetSet) Remove(ip net.IP) {
key := s.key(ip)
if n, ok := s.members[string(key)]; ok {
if n == 1 {
delete(s.members, string(key))
if n, ok := s.members.Load(string(key)); ok {
if n.(uint) == 1 {
s.members.Delete(string(key))
} else {
s.members[string(key)] = n - 1
s.members.Store(string(key), n.(uint)-1)
}
}
}
@ -252,16 +256,20 @@ func (s *DistinctNetSet) Remove(ip net.IP) {
// Contains whether the given IP is contained in the set.
func (s DistinctNetSet) Contains(ip net.IP) bool {
key := s.key(ip)
_, ok := s.members[string(key)]
_, ok := s.members.Load(string(key))
return ok
}
// Len returns the number of tracked IPs.
func (s DistinctNetSet) Len() int {
n := uint(0)
for _, i := range s.members {
n += i
if s.members == nil {
return 0
}
s.members.Range(func(_, v interface{}) bool {
n += v.(uint)
return true
})
return int(n)
}
@ -272,7 +280,7 @@ func (s DistinctNetSet) Len() int {
func (s *DistinctNetSet) key(ip net.IP) net.IP {
// Lazily initialize storage.
if s.members == nil {
s.members = make(map[string]uint)
s.members = &sync.Map{}
s.buf = make(net.IP, 17)
}
// Canonicalize ip and bits.
@ -299,10 +307,14 @@ func (s *DistinctNetSet) key(ip net.IP) net.IP {
func (s DistinctNetSet) String() string {
var buf bytes.Buffer
buf.WriteString("{")
keys := make([]string, 0, len(s.members))
for k := range s.members {
keys = append(keys, k)
if s.members == nil {
return "{}"
}
keys := []string{}
s.members.Range(func(k, v interface{}) bool {
keys = append(keys, k.(string))
return true
})
sort.Strings(keys)
for i, k := range keys {
var ip net.IP
@ -312,7 +324,12 @@ func (s DistinctNetSet) String() string {
ip = make(net.IP, 16)
}
copy(ip, k[1:])
fmt.Fprintf(&buf, "%v×%d", ip, s.members[k])
v, ok := s.members.Load(k)
vs := uint(0)
if ok {
vs = v.(uint)
}
fmt.Fprintf(&buf, "%v×%d", ip, vs)
if i != len(keys)-1 {
buf.WriteString(" ")
}