Snapshot naming (#163)

* save

* save

* save

* save

* save

* save
This commit is contained in:
Alex Sharov 2021-11-15 21:19:56 +07:00 committed by GitHub
parent eefbde1443
commit 5b7f67deae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 31 additions and 12 deletions

View File

@ -424,7 +424,7 @@ func buildIndex(datPath, idxPath, tmpDir string, count int) (*compress.Decompres
}
}
var idx *recsplit.Index
if idx, err = recsplit.NewIndex(idxPath); err != nil {
if idx, err = recsplit.OpenIndex(idxPath); err != nil {
return nil, nil, err
}
return d, idx, nil
@ -694,19 +694,19 @@ func NewAggregator(diffDir string, unwindLimit uint64, aggregationStep uint64) (
if item.accountsD, err = compress.NewDecompressor(path.Join(diffDir, fmt.Sprintf("accounts.%d-%d.dat", item.startBlock, item.endBlock))); err != nil {
return false
}
if item.accountsIdx, err = recsplit.NewIndex(path.Join(diffDir, fmt.Sprintf("accounts.%d-%d.idx", item.startBlock, item.endBlock))); err != nil {
if item.accountsIdx, err = recsplit.OpenIndex(path.Join(diffDir, fmt.Sprintf("accounts.%d-%d.idx", item.startBlock, item.endBlock))); err != nil {
return false
}
if item.codeD, err = compress.NewDecompressor(path.Join(diffDir, fmt.Sprintf("code.%d-%d.dat", item.startBlock, item.endBlock))); err != nil {
return false
}
if item.codeIdx, err = recsplit.NewIndex(path.Join(diffDir, fmt.Sprintf("code.%d-%d.idx", item.startBlock, item.endBlock))); err != nil {
if item.codeIdx, err = recsplit.OpenIndex(path.Join(diffDir, fmt.Sprintf("code.%d-%d.idx", item.startBlock, item.endBlock))); err != nil {
return false
}
if item.storageD, err = compress.NewDecompressor(path.Join(diffDir, fmt.Sprintf("storage.%d-%d.dat", item.startBlock, item.endBlock))); err != nil {
return false
}
if item.storageIdx, err = recsplit.NewIndex(path.Join(diffDir, fmt.Sprintf("storage.%d-%d.idx", item.startBlock, item.endBlock))); err != nil {
if item.storageIdx, err = recsplit.OpenIndex(path.Join(diffDir, fmt.Sprintf("storage.%d-%d.idx", item.startBlock, item.endBlock))); err != nil {
return false
}
return true

View File

@ -560,6 +560,12 @@ func (c *Compressor) Compress() error {
return nil
}
func (c *Compressor) Close() {
c.collector.Close()
c.wordFile.Close()
c.interFile.Close()
}
func (c *Compressor) findMatches() error {
// Build patricia tree out of the patterns in the dictionary, for further matching in individual words
// Allocate temporary initial codes to the patterns so that patterns with higher scores get smaller code
@ -800,7 +806,10 @@ func (c *Compressor) optimiseCodes() error {
if err != nil {
return err
}
defer cf.Close()
defer cf.Sync()
cw := bufio.NewWriterSize(cf, etl.BufIOSize)
defer cw.Flush()
// First, output dictionary size
binary.BigEndian.PutUint64(c.numBuf[:], offset) // Dictionary size
if _, err = cw.Write(c.numBuf[:8]); err != nil {
@ -1035,12 +1044,6 @@ func (c *Compressor) optimiseCodes() error {
if e != nil && !errors.Is(e, io.EOF) {
return e
}
if err = cw.Flush(); err != nil {
return err
}
if err = cf.Close(); err != nil {
return err
}
return nil
}

View File

@ -18,6 +18,7 @@ package compress
import (
"encoding/binary"
"fmt"
"os"
"github.com/ledgerwatch/erigon-lib/mmap"
@ -49,6 +50,9 @@ func NewDecompressor(compressedFile string) (*Decompressor, error) {
return nil, err
}
size := int(stat.Size())
if size < 24 {
return nil, fmt.Errorf("compressed file is too short")
}
if d.mmapHandle1, d.mmapHandle2, err = mmap.Mmap(d.f, size); err != nil {
return nil, err
}

View File

@ -54,14 +54,14 @@ type Index struct {
}
func MustOpen(indexFile string) *Index {
idx, err := NewIndex(indexFile)
idx, err := OpenIndex(indexFile)
if err != nil {
panic(err)
}
return idx
}
func NewIndex(indexFile string) (*Index, error) {
func OpenIndex(indexFile string) (*Index, error) {
idx := &Index{
indexFile: indexFile,
}

View File

@ -146,6 +146,18 @@ func NewRecSplit(args RecSplitArgs) (*RecSplit, error) {
return rs, nil
}
func (rs *RecSplit) Close() {
if rs.indexF != nil {
rs.indexF.Close()
}
if rs.bucketCollector != nil {
rs.bucketCollector.Close()
}
if rs.offsetCollector != nil {
rs.offsetCollector.Close()
}
}
func (rs *RecSplit) SetTrace(trace bool) {
rs.trace = trace
}