From 274f84598c518fedc431f6193c1aa141bdee1e4d Mon Sep 17 00:00:00 2001 From: Giulio rebuffo Date: Thu, 16 Nov 2023 20:59:43 +0100 Subject: [PATCH] Automation tool to automatically upload caplin's snapshot files to R2 (#8747) Upload beacon snapshots to R2 every week by default --- Makefile | 2 + cl/antiquary/antiquary.go | 4 +- cmd/capcli/cli.go | 62 +++++--------- cmd/tooling/README.md | 3 + cmd/tooling/cli.go | 174 ++++++++++++++++++++++++++++++++++++++ cmd/tooling/main.go | 35 ++++++++ p2p/netutil/net.go | 47 ++++++---- 7 files changed, 271 insertions(+), 56 deletions(-) create mode 100644 cmd/tooling/README.md create mode 100644 cmd/tooling/cli.go create mode 100644 cmd/tooling/main.go diff --git a/Makefile b/Makefile index c7f1cab16..01c7b21c1 100644 --- a/Makefile +++ b/Makefile @@ -121,6 +121,8 @@ COMMANDS += evm COMMANDS += sentinel COMMANDS += caplin COMMANDS += caplin-regression +COMMANDS += tooling + # build each command using %.cmd rule diff --git a/cl/antiquary/antiquary.go b/cl/antiquary/antiquary.go index 89bfc1bcf..ca0656782 100644 --- a/cl/antiquary/antiquary.go +++ b/cl/antiquary/antiquary.go @@ -173,11 +173,11 @@ func (a *Antiquary) Loop() error { if from >= to { continue } + to = utils.Min64(to, to-safetyMargin) // We don't want to retire snapshots that are too close to the finalized head + to = (to / snaptype.Erigon2RecentMergeLimit) * snaptype.Erigon2RecentMergeLimit if to-from < snaptype.Erigon2RecentMergeLimit { continue } - to = utils.Min64(to, to-safetyMargin) // We don't want to retire snapshots that are too close to the finalized head - to = (to / snaptype.Erigon2RecentMergeLimit) * snaptype.Erigon2RecentMergeLimit if err := a.antiquate(from, to); err != nil { return err } diff --git a/cmd/capcli/cli.go b/cmd/capcli/cli.go index 8807a9dd4..c3f8726ea 100644 --- a/cmd/capcli/cli.go +++ b/cmd/capcli/cli.go @@ -17,7 +17,6 @@ import ( "github.com/c2h5oh/datasize" "github.com/ledgerwatch/erigon-lib/chain/snapcfg" - libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/downloader" "github.com/ledgerwatch/erigon/cl/abstract" @@ -464,8 +463,6 @@ type CheckSnapshots struct { chainCfg outputFolder withPPROF - - Slot uint64 `name:"slot" help:"slot to check"` } func (c *CheckSnapshots) Run(ctx *Context) error { @@ -480,7 +477,7 @@ func (c *CheckSnapshots) Run(ctx *Context) error { log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler)) rawDB := persistence.AferoRawBeaconBlockChainFromOsPath(beaconConfig, dirs.CaplinHistory) - beaconDB, db, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, rawDB, dirs.CaplinIndexing, nil, false) + _, db, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, rawDB, dirs.CaplinIndexing, nil, false) if err != nil { return err } @@ -503,51 +500,38 @@ func (c *CheckSnapshots) Run(ctx *Context) error { return err } - br := &snapshot_format.MockBlockReader{} - snReader := freezeblocks.NewBeaconSnapshotReader(csn, br, beaconDB, beaconConfig) - for i := c.Slot; i < to; i++ { - // Read the original canonical slot - data, err := beaconDB.GetBlock(ctx, tx, i) + genesisHeader, _, _, err := csn.ReadHeader(0) + if err != nil { + return err + } + previousBlockRoot, err := genesisHeader.Header.HashSSZ() + if err != nil { + return err + } + previousBlockSlot := genesisHeader.Header.Slot + for i := uint64(1); i < to; i++ { + if utils.Min64(0, i-320) > previousBlockSlot { + return fmt.Errorf("snapshot %d has invalid slot", i) + } + // Checking of snapshots is a chain contiguity problem + currentHeader, _, _, err := csn.ReadHeader(i) if err != nil { return err } - if data == nil { + if currentHeader == nil { continue } - blk := data.Data - if blk == nil { - continue + if currentHeader.Header.ParentRoot != previousBlockRoot { + return fmt.Errorf("snapshot %d has invalid parent root", i) } - // first thing if the block is bellatrix update the mock block reader - if blk.Version() >= clparams.BellatrixVersion { - br.Block = blk.Block.Body.ExecutionPayload - } - blk2, err := snReader.ReadBlockBySlot(ctx, tx, i) - if err != nil { - log.Error("Error detected in decoding snapshots", "err", err, "slot", i) - return nil - } - if blk2 == nil { - log.Error("Block not found in snapshot", "slot", i) - return nil - } - - hash1, _ := blk.Block.HashSSZ() - hash2, _ := blk2.Block.HashSSZ() - if hash1 != hash2 { - log.Error("Mismatching blocks", "slot", i, "gotSlot", blk2.Block.Slot, "datadir", libcommon.Hash(hash1), "snapshot", libcommon.Hash(hash2)) - return nil - } - header, _, _, err := csn.ReadHeader(i) + previousBlockRoot, err = currentHeader.Header.HashSSZ() if err != nil { return err } - hash3, _ := header.Header.HashSSZ() - if hash3 != hash2 { - log.Error("Mismatching blocks", "slot", i, "gotSlot", blk2.Block.Slot, "datadir", libcommon.Hash(hash1), "snapshot", libcommon.Hash(hash3)) - return nil + previousBlockSlot = currentHeader.Header.Slot + if i%2000 == 0 { + log.Info("Successfully checked", "slot", i) } - log.Info("Successfully checked", "slot", i) } return nil } diff --git a/cmd/tooling/README.md b/cmd/tooling/README.md new file mode 100644 index 000000000..222adcade --- /dev/null +++ b/cmd/tooling/README.md @@ -0,0 +1,3 @@ +# Tooling + +this are a bunch of tools for our scripting necessities \ No newline at end of file diff --git a/cmd/tooling/cli.go b/cmd/tooling/cli.go new file mode 100644 index 000000000..91f141cc4 --- /dev/null +++ b/cmd/tooling/cli.go @@ -0,0 +1,174 @@ +package main + +import ( + "fmt" + "math" + "os/exec" + "time" + + "github.com/ledgerwatch/erigon/cl/clparams" + "github.com/ledgerwatch/erigon/cmd/caplin/caplin1" + "github.com/ledgerwatch/erigon/eth/ethconfig" + "github.com/ledgerwatch/erigon/turbo/snapshotsync/freezeblocks" + "golang.org/x/net/context" + + "github.com/ledgerwatch/erigon-lib/common/datadir" + "github.com/ledgerwatch/erigon-lib/downloader/snaptype" + "github.com/ledgerwatch/erigon/cl/persistence" + "github.com/ledgerwatch/erigon/cl/persistence/beacon_indicies" + "github.com/ledgerwatch/erigon/cl/persistence/db_config" + "github.com/ledgerwatch/erigon/cl/utils" + + "github.com/ledgerwatch/log/v3" +) + +var CLI struct { + BucketCaplinAutomation BucketCaplinAutomation `cmd:"" help:"migrate from one state to another"` +} + +type chainCfg struct { + Chain string `help:"chain" default:"mainnet"` +} + +// func (c *chainCfg) configs() (beaconConfig *clparams.BeaconChainConfig, genesisConfig *clparams.GenesisConfig, err error) { +// genesisConfig, _, beaconConfig, _, err = clparams.GetConfigsByNetworkName(c.Chain) +// return +// } + +type withDatadir struct { + Datadir string `help:"datadir" default:"~/.local/share/erigon" type:"existingdir"` +} + +// func (w *withPPROF) withProfile() { +// if w.Pprof { +// debug.StartPProf("localhost:6060", metrics.Setup("localhost:6060", log.Root())) +// } +// } + +// func (w *withSentinel) connectSentinel() (sentinel.SentinelClient, error) { +// // YOLO message size +// gconn, err := grpc.Dial(w.Sentinel, grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt))) +// if err != nil { +// return nil, err +// } +// return sentinel.NewSentinelClient(gconn), nil +// } + +// func openFs(fsName string, path string) (afero.Fs, error) { +// return afero.NewBasePathFs(afero.NewBasePathFs(afero.NewOsFs(), fsName), path), nil +// } + +type BucketCaplinAutomation struct { + withDatadir + chainCfg + + UploadPeriod time.Duration `help:"upload period" default:"1440h"` + Bucket string `help:"r2 address" default:"http://localhost:8080"` +} + +func (c *BucketCaplinAutomation) Run(ctx *Context) error { + _, _, beaconConfig, _, err := clparams.GetConfigsByNetworkName(c.Chain) + if err != nil { + return err + } + log.Root().SetHandler(log.LvlFilterHandler(log.LvlDebug, log.StderrHandler)) + log.Info("Started the automation tool for automatic snapshot sanity check and R2 uploading (caplin only)", "chain", c.Chain) + dirs := datadir.New(c.Datadir) + log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler)) + tickerTriggerer := time.NewTicker(c.UploadPeriod) + defer tickerTriggerer.Stop() + // do the checking at first run + if err := checkSnapshots(ctx, beaconConfig, dirs); err != nil { + return err + } + log.Info("Uploading snapshots to R2 bucket") + // next upload to R2 + command := "rclone" + args := []string{"sync", dirs.Snap, c.Bucket, "--include", "*beaconblocks*"} + if err := exec.Command(command, args...).Run(); err != nil { + return fmt.Errorf("rclone failed, make sure rclone is installed and is properly configured: %s", err) + } + log.Info("Finished snapshots to R2 bucket") + for { + select { + case <-tickerTriggerer.C: + log.Info("Checking snapshots") + if err := checkSnapshots(ctx, beaconConfig, dirs); err != nil { + return err + } + log.Info("Finishing snapshots") + // next upload to R2 + command := "rclone" + args := []string{"sync", dirs.Snap, c.Bucket, "--include", "*beaconblocks*"} + log.Info("Uploading snapshots to R2 bucket") + if err := exec.Command(command, args...).Run(); err != nil { + return fmt.Errorf("rclone failed, make sure rclone is installed and is properly configured: %s", err) + } + log.Info("Finished snapshots to R2 bucket") + case <-ctx.Done(): + return nil + } + } +} + +func checkSnapshots(ctx context.Context, beaconConfig *clparams.BeaconChainConfig, dirs datadir.Dirs) error { + rawDB := persistence.AferoRawBeaconBlockChainFromOsPath(beaconConfig, dirs.CaplinHistory) + _, db, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, rawDB, dirs.CaplinIndexing, nil, false) + if err != nil { + return err + } + defer db.Close() + var to uint64 + tx, err := db.BeginRo(ctx) + if err != nil { + return err + } + defer tx.Rollback() + + to, err = beacon_indicies.ReadHighestFinalized(tx) + if err != nil { + return err + } + + to = (to / snaptype.Erigon2RecentMergeLimit) * snaptype.Erigon2RecentMergeLimit + + csn := freezeblocks.NewCaplinSnapshots(ethconfig.BlocksFreezing{}, dirs.Snap, log.Root()) + if err := csn.ReopenFolder(); err != nil { + return err + } + + genesisHeader, _, _, err := csn.ReadHeader(0) + if err != nil { + return err + } + previousBlockRoot, err := genesisHeader.Header.HashSSZ() + if err != nil { + return err + } + previousBlockSlot := genesisHeader.Header.Slot + for i := uint64(1); i < to; i++ { + if utils.Min64(0, i-320) > previousBlockSlot { + return fmt.Errorf("snapshot %d has invalid slot", i) + } + // Checking of snapshots is a chain contiguity problem + currentHeader, _, _, err := csn.ReadHeader(i) + if err != nil { + return err + } + if currentHeader == nil { + continue + } + if currentHeader.Header.ParentRoot != previousBlockRoot { + return fmt.Errorf("snapshot %d has invalid parent root", i) + } + previousBlockRoot, err = currentHeader.Header.HashSSZ() + if err != nil { + return err + } + previousBlockSlot = currentHeader.Header.Slot + if i%20000 == 0 { + log.Info("Successfully checked", "slot", i) + } + } + return nil +} diff --git a/cmd/tooling/main.go b/cmd/tooling/main.go new file mode 100644 index 000000000..d2d04e6dc --- /dev/null +++ b/cmd/tooling/main.go @@ -0,0 +1,35 @@ +/* + Copyright 2022 Erigon-Lightclient contributors + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package main + +import ( + "context" + + "github.com/alecthomas/kong" +) + +type Context struct { + context.Context + kctx *kong.Context +} + +func main() { + ctx := kong.Parse(&CLI) + // Call the Run() method of the selected parsed command. + err := ctx.Run(&Context{ + kctx: ctx, + Context: context.TODO(), + }) + ctx.FatalIfErrorf(err) +} diff --git a/p2p/netutil/net.go b/p2p/netutil/net.go index d5da3c694..2c9818c8f 100644 --- a/p2p/netutil/net.go +++ b/p2p/netutil/net.go @@ -24,6 +24,7 @@ import ( "net" "sort" "strings" + "sync" ) var lan4, lan6, special4, special6 Netlist @@ -221,7 +222,7 @@ type DistinctNetSet struct { Subnet uint // number of common prefix bits Limit uint // maximum number of IPs in each subnet - members map[string]uint + members *sync.Map buf net.IP } @@ -229,9 +230,12 @@ type DistinctNetSet struct { // number of existing IPs in the defined range exceeds the limit. func (s *DistinctNetSet) Add(ip net.IP) bool { key := s.key(ip) - n := s.members[string(key)] + n := uint(0) + if value, ok := s.members.Load(string(key)); ok { + n = value.(uint) + } if n < s.Limit { - s.members[string(key)] = n + 1 + s.members.Store(string(key), n+1) return true } return false @@ -240,11 +244,11 @@ func (s *DistinctNetSet) Add(ip net.IP) bool { // Remove removes an IP from the set. func (s *DistinctNetSet) Remove(ip net.IP) { key := s.key(ip) - if n, ok := s.members[string(key)]; ok { - if n == 1 { - delete(s.members, string(key)) + if n, ok := s.members.Load(string(key)); ok { + if n.(uint) == 1 { + s.members.Delete(string(key)) } else { - s.members[string(key)] = n - 1 + s.members.Store(string(key), n.(uint)-1) } } } @@ -252,16 +256,20 @@ func (s *DistinctNetSet) Remove(ip net.IP) { // Contains whether the given IP is contained in the set. func (s DistinctNetSet) Contains(ip net.IP) bool { key := s.key(ip) - _, ok := s.members[string(key)] + _, ok := s.members.Load(string(key)) return ok } // Len returns the number of tracked IPs. func (s DistinctNetSet) Len() int { n := uint(0) - for _, i := range s.members { - n += i + if s.members == nil { + return 0 } + s.members.Range(func(_, v interface{}) bool { + n += v.(uint) + return true + }) return int(n) } @@ -272,7 +280,7 @@ func (s DistinctNetSet) Len() int { func (s *DistinctNetSet) key(ip net.IP) net.IP { // Lazily initialize storage. if s.members == nil { - s.members = make(map[string]uint) + s.members = &sync.Map{} s.buf = make(net.IP, 17) } // Canonicalize ip and bits. @@ -299,10 +307,14 @@ func (s *DistinctNetSet) key(ip net.IP) net.IP { func (s DistinctNetSet) String() string { var buf bytes.Buffer buf.WriteString("{") - keys := make([]string, 0, len(s.members)) - for k := range s.members { - keys = append(keys, k) + if s.members == nil { + return "{}" } + keys := []string{} + s.members.Range(func(k, v interface{}) bool { + keys = append(keys, k.(string)) + return true + }) sort.Strings(keys) for i, k := range keys { var ip net.IP @@ -312,7 +324,12 @@ func (s DistinctNetSet) String() string { ip = make(net.IP, 16) } copy(ip, k[1:]) - fmt.Fprintf(&buf, "%v×%d", ip, s.members[k]) + v, ok := s.members.Load(k) + vs := uint(0) + if ok { + vs = v.(uint) + } + fmt.Fprintf(&buf, "%v×%d", ip, vs) if i != len(keys)-1 { buf.WriteString(" ") }