Snapshots: save initial list to db, to avoid future snapshots downloading #4625

This commit is contained in:
Alex Sharov 2022-07-04 18:44:15 +06:00 committed by GitHub
parent 99d9535fd8
commit ff847cd459
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 50 additions and 15 deletions

View File

@ -109,13 +109,6 @@ func (d *Downloader) SnapDir() string {
return d.cfg.DataDir
}
func (d *Downloader) IsInitialSync() bool {
d.clientLock.RLock()
defer d.clientLock.RUnlock()
_, lastPart := filepath.Split(d.cfg.DataDir)
return lastPart == "tmp"
}
func (d *Downloader) ReCalcStats(interval time.Duration) {
d.statsLock.Lock()
defer d.statsLock.Unlock()

View File

@ -28,8 +28,6 @@ type GrpcServer struct {
// Download - create new .torrent ONLY if initialSync, everything else Erigon can generate by itself
func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.DownloadRequest) (*emptypb.Empty, error) {
isInitialSync := s.d.IsInitialSync()
torrentClient := s.d.Torrent()
mi := &metainfo.MetaInfo{AnnounceList: Trackers}
for _, it := range request.Items {
@ -38,6 +36,7 @@ func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.Dow
return nil, err
}
}
ok, err := AddSegment(it.Path, s.d.SnapDir(), torrentClient)
if err != nil {
return nil, fmt.Errorf("AddSegment: %w", err)
@ -46,10 +45,6 @@ func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.Dow
continue
}
if !isInitialSync {
continue
}
hash := Proto2InfoHash(it.TorrentHash)
if _, ok := torrentClient.Torrent(hash); ok {
continue

View File

@ -1616,3 +1616,30 @@ func IsPosBlock(db kv.Getter, blockHash common.Hash) (trans bool, err error) {
return header.Difficulty.Cmp(common.Big0) == 0, nil
}
func ReadSnapshots(tx kv.Tx) (map[string]string, error) {
res := map[string]string{}
if err := tx.ForEach(kv.Snapshots, nil, func(k, v []byte) error {
res[string(k)] = string(v)
return nil
}); err != nil {
return nil, err
}
return res, nil
}
func WriteSnapshots(tx kv.RwTx, list map[string]string) error {
for k, v := range list {
has, err := tx.Has(kv.Snapshots, []byte(k))
if err != nil {
return err
}
if has {
continue
}
if err = tx.Put(kv.Snapshots, []byte(k), []byte(v)); err != nil {
return err
}
}
return nil
}

View File

@ -1165,7 +1165,7 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R
return nil
}
if err := WaitForDownloader(ctx, cfg); err != nil {
if err := WaitForDownloader(ctx, cfg, tx); err != nil {
return err
}
if err := cfg.snapshots.Reopen(); err != nil {
@ -1280,16 +1280,30 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R
// WaitForDownloader - wait for Downloader service to download all expected snapshots
// for MVP we sync with Downloader only once, in future will send new snapshots also
func WaitForDownloader(ctx context.Context, cfg HeadersCfg) error {
func WaitForDownloader(ctx context.Context, cfg HeadersCfg, tx kv.RwTx) error {
if cfg.snapshots.Cfg().NoDownloader {
return nil
}
snInDB, err := rawdb.ReadSnapshots(tx)
if err != nil {
return err
}
dbEmpty := len(snInDB) == 0
// send all hashes to the Downloader service
preverified := snapshothashes.KnownConfig(cfg.chainConfig.ChainName).Preverified
req := &proto_downloader.DownloadRequest{Items: make([]*proto_downloader.DownloadItem, 0, len(preverified))}
i := 0
for _, p := range preverified {
_, has := snInDB[p.Name]
if !dbEmpty && !has {
continue
}
if dbEmpty {
snInDB[p.Name] = p.Hash
}
req.Items = append(req.Items, &proto_downloader.DownloadItem{
TorrentHash: downloadergrpc.String2Proto(p.Hash),
Path: p.Name,
@ -1361,5 +1375,11 @@ Finish:
return err
}
}
if dbEmpty {
if err = rawdb.WriteSnapshots(tx, snInDB); err != nil {
return err
}
}
return nil
}