From 5b7f67deaefe57e4497905fff4933688298b645a Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Mon, 15 Nov 2021 21:19:56 +0700 Subject: [PATCH] Snapshot naming (#163) * save * save * save * save * save * save --- aggregator/aggregator.go | 8 ++++---- compress/compress.go | 15 +++++++++------ compress/decompress.go | 4 ++++ recsplit/index.go | 4 ++-- recsplit/recsplit.go | 12 ++++++++++++ 5 files changed, 31 insertions(+), 12 deletions(-) diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index 01500923b..1be9fc059 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -424,7 +424,7 @@ func buildIndex(datPath, idxPath, tmpDir string, count int) (*compress.Decompres } } var idx *recsplit.Index - if idx, err = recsplit.NewIndex(idxPath); err != nil { + if idx, err = recsplit.OpenIndex(idxPath); err != nil { return nil, nil, err } return d, idx, nil @@ -694,19 +694,19 @@ func NewAggregator(diffDir string, unwindLimit uint64, aggregationStep uint64) ( if item.accountsD, err = compress.NewDecompressor(path.Join(diffDir, fmt.Sprintf("accounts.%d-%d.dat", item.startBlock, item.endBlock))); err != nil { return false } - if item.accountsIdx, err = recsplit.NewIndex(path.Join(diffDir, fmt.Sprintf("accounts.%d-%d.idx", item.startBlock, item.endBlock))); err != nil { + if item.accountsIdx, err = recsplit.OpenIndex(path.Join(diffDir, fmt.Sprintf("accounts.%d-%d.idx", item.startBlock, item.endBlock))); err != nil { return false } if item.codeD, err = compress.NewDecompressor(path.Join(diffDir, fmt.Sprintf("code.%d-%d.dat", item.startBlock, item.endBlock))); err != nil { return false } - if item.codeIdx, err = recsplit.NewIndex(path.Join(diffDir, fmt.Sprintf("code.%d-%d.idx", item.startBlock, item.endBlock))); err != nil { + if item.codeIdx, err = recsplit.OpenIndex(path.Join(diffDir, fmt.Sprintf("code.%d-%d.idx", item.startBlock, item.endBlock))); err != nil { return false } if item.storageD, err = compress.NewDecompressor(path.Join(diffDir, fmt.Sprintf("storage.%d-%d.dat", item.startBlock, item.endBlock))); err != nil { return false } - if item.storageIdx, err = recsplit.NewIndex(path.Join(diffDir, fmt.Sprintf("storage.%d-%d.idx", item.startBlock, item.endBlock))); err != nil { + if item.storageIdx, err = recsplit.OpenIndex(path.Join(diffDir, fmt.Sprintf("storage.%d-%d.idx", item.startBlock, item.endBlock))); err != nil { return false } return true diff --git a/compress/compress.go b/compress/compress.go index 19ec9f610..2c346bdbc 100644 --- a/compress/compress.go +++ b/compress/compress.go @@ -560,6 +560,12 @@ func (c *Compressor) Compress() error { return nil } +func (c *Compressor) Close() { + c.collector.Close() + c.wordFile.Close() + c.interFile.Close() +} + func (c *Compressor) findMatches() error { // Build patricia tree out of the patterns in the dictionary, for further matching in individual words // Allocate temporary initial codes to the patterns so that patterns with higher scores get smaller code @@ -800,7 +806,10 @@ func (c *Compressor) optimiseCodes() error { if err != nil { return err } + defer cf.Close() + defer cf.Sync() cw := bufio.NewWriterSize(cf, etl.BufIOSize) + defer cw.Flush() // First, output dictionary size binary.BigEndian.PutUint64(c.numBuf[:], offset) // Dictionary size if _, err = cw.Write(c.numBuf[:8]); err != nil { @@ -1035,12 +1044,6 @@ func (c *Compressor) optimiseCodes() error { if e != nil && !errors.Is(e, io.EOF) { return e } - if err = cw.Flush(); err != nil { - return err - } - if err = cf.Close(); err != nil { - return err - } return nil } diff --git a/compress/decompress.go b/compress/decompress.go index 9442e0e02..7ae3c516f 100644 --- a/compress/decompress.go +++ b/compress/decompress.go @@ -18,6 +18,7 @@ package compress import ( "encoding/binary" + "fmt" "os" "github.com/ledgerwatch/erigon-lib/mmap" @@ -49,6 +50,9 @@ func NewDecompressor(compressedFile string) (*Decompressor, error) { return nil, err } size := int(stat.Size()) + if size < 24 { + return nil, fmt.Errorf("compressed file is too short") + } if d.mmapHandle1, d.mmapHandle2, err = mmap.Mmap(d.f, size); err != nil { return nil, err } diff --git a/recsplit/index.go b/recsplit/index.go index ad5e61b7b..0c35c7f2b 100644 --- a/recsplit/index.go +++ b/recsplit/index.go @@ -54,14 +54,14 @@ type Index struct { } func MustOpen(indexFile string) *Index { - idx, err := NewIndex(indexFile) + idx, err := OpenIndex(indexFile) if err != nil { panic(err) } return idx } -func NewIndex(indexFile string) (*Index, error) { +func OpenIndex(indexFile string) (*Index, error) { idx := &Index{ indexFile: indexFile, } diff --git a/recsplit/recsplit.go b/recsplit/recsplit.go index 837f54168..2fd087b5c 100644 --- a/recsplit/recsplit.go +++ b/recsplit/recsplit.go @@ -146,6 +146,18 @@ func NewRecSplit(args RecSplitArgs) (*RecSplit, error) { return rs, nil } +func (rs *RecSplit) Close() { + if rs.indexF != nil { + rs.indexF.Close() + } + if rs.bucketCollector != nil { + rs.bucketCollector.Close() + } + if rs.offsetCollector != nil { + rs.offsetCollector.Close() + } +} + func (rs *RecSplit) SetTrace(trace bool) { rs.trace = trace }