diff --git a/compress/parallel_compress.go b/compress/parallel_compress.go
index 3ed36370f..835151689 100644
--- a/compress/parallel_compress.go
+++ b/compress/parallel_compress.go
@@ -238,7 +238,7 @@ func (cq *CompressionQueue) Pop() interface{} {
 // reduceDict reduces the dictionary by trying the substitutions and counting frequency for each word
 func reducedict(ctx context.Context, trace bool, logPrefix, segmentFilePath string, datFile *DecompressedFile, workers int, dictBuilder *DictionaryBuilder, lvl log.Lvl) error {
-	logEvery := time.NewTicker(30 * time.Second)
+	logEvery := time.NewTicker(60 * time.Second)
 	defer logEvery.Stop()
 
 	// DictionaryBuilder is for sorting words by their freuency (to assign codes)
diff --git a/kv/helpers.go b/kv/helpers.go
index a012c0807..d506299d6 100644
--- a/kv/helpers.go
+++ b/kv/helpers.go
@@ -146,3 +146,17 @@ func ReadAhead(ctx context.Context, db RoDB, progress *atomic.Bool, table string
 		})
 	}()
 }
+
+// FirstKey - candidate on move to kv.Tx interface
+func FirstKey(tx Tx, table string) ([]byte, error) {
+	c, err := tx.Cursor(table)
+	if err != nil {
+		return nil, err
+	}
+	defer c.Close()
+	k, _, err := c.First()
+	if err != nil {
+		return nil, err
+	}
+	return k, nil
+}
diff --git a/state/aggregator22.go b/state/aggregator22.go
index c65837daa..d108439eb 100644
--- a/state/aggregator22.go
+++ b/state/aggregator22.go
@@ -471,6 +471,9 @@ func (a *Aggregator22) warmup(txFrom, limit uint64) {
 	if a.db == nil {
 		return
 	}
+	if limit < 10_000 {
+		return
+	}
 
 	go func() {
 		if err := a.db.View(context.Background(), func(tx kv.Tx) error {
diff --git a/state/history.go b/state/history.go
index 7cca4c7e6..e2347c7b9 100644
--- a/state/history.go
+++ b/state/history.go
@@ -465,22 +465,29 @@ func (h *History) Flush() error {
 type historyWriter struct {
 	h                *History
+	historyVals      *etl.Collector
 	tmpdir           string
 	autoIncrementBuf []byte
 	autoIncrement    uint64
+	buffered         bool
 }
 
 func (h *historyWriter) close() {
 	if h == nil { // allow dobule-close
 		return
 	}
+	h.historyVals.Close()
 }
 
 func (h *History) newWriter(tmpdir string) *historyWriter {
 	w := &historyWriter{h: h, tmpdir: tmpdir,
 		autoIncrementBuf: make([]byte, 8),
+
+		buffered:    true,
+		historyVals: etl.NewCollector(h.historyValsTable, tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize/16)),
 	}
+	w.historyVals.LogLvl(log.LvlTrace)
 
 	val, err := h.tx.GetOne(h.settingsTable, historyValCountKey)
 	if err != nil {
@@ -500,6 +507,10 @@ func (h *historyWriter) flush(tx kv.RwTx) error {
 	if err := tx.Put(h.h.settingsTable, historyValCountKey, h.autoIncrementBuf); err != nil {
 		return err
 	}
+	if err := h.historyVals.Load(tx, h.h.historyValsTable, loadFunc, etl.TransformArgs{}); err != nil {
+		return err
+	}
+	h.close()
 	return nil
 }
@@ -543,11 +554,15 @@ func (h *historyWriter) addPrevValue(key1, key2, original []byte) error {
 		//if err := h.h.tx.Put(h.h.settingsTable, historyValCountKey, historyKey[lk:]); err != nil {
 		//	return err
 		//}
-		//if err := h.historyVals.Collect(historyKey[lk:], original); err != nil {
-		//	return err
-		//}
-		if err := h.h.tx.Put(h.h.historyValsTable, historyKey[lk:], original); err != nil {
-			return err
+
+		if h.buffered {
+			if err := h.historyVals.Collect(historyKey[lk:], original); err != nil {
+				return err
+			}
+		} else {
+			if err := h.h.tx.Put(h.h.historyValsTable, historyKey[lk:], original); err != nil {
+				return err
+			}
 		}
 	}
@@ -881,16 +896,6 @@ func (h *History) prune(txFrom, txTo, limit uint64) error {
 	defer historyKeysCursor.Close()
 	var txKey [8]byte
 	binary.BigEndian.PutUint64(txKey[:], txFrom)
-	idxC, err := h.tx.RwCursorDupSort(h.indexTable)
-	if err != nil {
-		return err
-	}
-	defer idxC.Close()
-	valsC, err := h.tx.RwCursor(h.historyValsTable)
-	if err != nil {
-		return err
-	}
-	defer valsC.Close()
 	k, v, err := historyKeysCursor.Seek(txKey[:])
 	if err != nil {
 		return err
@@ -903,9 +908,23 @@ func (h *History) prune(txFrom, txTo, limit uint64) error {
 	if limit != math.MaxUint64 && limit != 0 {
 		txTo = cmp.Min(txTo, txFrom+limit)
 	}
-	if txFrom-txTo > 10_000 {
-		log.Info("[snapshots] prune old history", "name", h.filenameBase, "range", fmt.Sprintf("%.1fm-%.1fm", float64(txFrom)/1_000_000, float64(txTo)/1_000_000))
+	if txFrom >= txTo {
+		return nil
 	}
+	if txTo-txFrom > 10_000 {
+		log.Info("[snapshots] prune old history", "name", h.filenameBase, "range", fmt.Sprintf("%.2fm-%.2fm", float64(txFrom)/1_000_000, float64(txTo)/1_000_000))
+	}
+
+	valsC, err := h.tx.RwCursor(h.historyValsTable)
+	if err != nil {
+		return err
+	}
+	defer valsC.Close()
+	idxC, err := h.tx.RwCursorDupSort(h.indexTable)
+	if err != nil {
+		return err
+	}
+	defer idxC.Close()
 	for ; err == nil && k != nil; k, v, err = historyKeysCursor.Next() {
 		txNum := binary.BigEndian.Uint64(k)
 		if txNum >= txTo {
diff --git a/state/inverted_index.go b/state/inverted_index.go
index 5d742f2e9..9db21f288 100644
--- a/state/inverted_index.go
+++ b/state/inverted_index.go
@@ -334,6 +334,7 @@ type invertedIndexWriter struct {
 	index     *etl.Collector
 	indexKeys *etl.Collector
 	tmpdir    string
+	buffered  bool
 }
 
 // loadFunc - is analog of etl.Identity, but it signaling to etl - use .Put instead of .AppendDup - to allow duplicates
@@ -363,7 +364,8 @@ func (ii *invertedIndexWriter) close() {
 
 func (ii *InvertedIndex) newWriter(tmpdir string) *invertedIndexWriter {
 	w := &invertedIndexWriter{ii: ii,
-		tmpdir: tmpdir,
+		buffered: true,
+		tmpdir:   tmpdir,
 		// 3 history + 4 indices = 10 etl collectors, 10*256Mb/16 = 256mb - for all indices buffers
 		// etl collector doesn't fsync: means if have enough ram, all files produced by all collectors will be in ram
 		index:     etl.NewCollector(ii.indexTable, tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize/16)),
@@ -375,18 +377,21 @@ func (ii *InvertedIndex) newWriter(tmpdir string) *invertedIndexWriter {
 }
 
 func (ii *invertedIndexWriter) add(key, indexKey []byte) error {
-	//if err := ii.ii.tx.Put(ii.ii.indexKeysTable, ii.ii.txNumBytes[:], key); err != nil {
-	//	return err
-	//}
-	if err := ii.indexKeys.Collect(ii.ii.txNumBytes[:], key); err != nil {
-		return err
-	}
+	if ii.buffered {
+		if err := ii.indexKeys.Collect(ii.ii.txNumBytes[:], key); err != nil {
+			return err
+		}
 
-	//if err := ii.ii.tx.Put(ii.ii.indexTable, indexKey, ii.ii.txNumBytes[:]); err != nil {
-	//	return err
-	//}
-	if err := ii.index.Collect(indexKey, ii.ii.txNumBytes[:]); err != nil {
-		return err
+		if err := ii.index.Collect(indexKey, ii.ii.txNumBytes[:]); err != nil {
+			return err
+		}
+	} else {
+		if err := ii.ii.tx.Put(ii.ii.indexKeysTable, ii.ii.txNumBytes[:], key); err != nil {
+			return err
+		}
+		if err := ii.ii.tx.Put(ii.ii.indexTable, indexKey, ii.ii.txNumBytes[:]); err != nil {
+			return err
+		}
 	}
 	return nil
 }
@@ -882,11 +887,6 @@ func (ii *InvertedIndex) prune(txFrom, txTo, limit uint64) error {
 	defer keysCursor.Close()
 	var txKey [8]byte
 	binary.BigEndian.PutUint64(txKey[:], txFrom)
-	idxC, err := ii.tx.RwCursorDupSort(ii.indexTable)
-	if err != nil {
-		return err
-	}
-	defer idxC.Close()
 	k, v, err := keysCursor.Seek(txKey[:])
 	if err != nil {
 		return err
 	}
@@ -898,9 +898,17 @@ func (ii *InvertedIndex) prune(txFrom, txTo, limit uint64) error {
 	if limit != math.MaxUint64 && limit != 0 {
 		txTo = cmp.Min(txTo, txFrom+limit)
 	}
-	if txFrom-txTo > 10_000 {
-		log.Info("[snapshots] prune old history", "name", ii.filenameBase, "range", fmt.Sprintf("%.1fm-%.1fm", float64(txFrom)/1_000_000, float64(txTo)/1_000_000))
+	if txFrom >= txTo {
+		return nil
 	}
+	if txTo-txFrom > 10_000 {
+		log.Info("[snapshots] prune old history", "name", ii.filenameBase, "range", fmt.Sprintf("%.2fm-%.2fm", float64(txFrom)/1_000_000, float64(txTo)/1_000_000))
+	}
+	idxC, err := ii.tx.RwCursorDupSort(ii.indexTable)
+	if err != nil {
+		return err
+	}
+	defer idxC.Close()
 	for ; err == nil && k != nil; k, v, err = keysCursor.Next() {
 		txNum := binary.BigEndian.Uint64(k)
 		if txNum >= txTo {
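For the new `kv.FirstKey` helper in `kv/helpers.go`, a minimal usage sketch (not part of the diff) could look like the following. The wrapper function, the erigon-lib import path, and the caller-supplied `db`/`table` are assumptions for illustration; only `FirstKey` itself comes from this change.

```go
package example

import (
	"context"

	"github.com/ledgerwatch/erigon-lib/kv"
)

// firstKeyOf is a hypothetical wrapper showing how kv.FirstKey could be called
// from a read-only view; db and table are whatever the caller already has.
func firstKeyOf(ctx context.Context, db kv.RoDB, table string) ([]byte, error) {
	var first []byte
	if err := db.View(ctx, func(tx kv.Tx) error {
		k, err := kv.FirstKey(tx, table)
		if err != nil {
			return err
		}
		if k != nil {
			first = append([]byte{}, k...) // copy: k is only valid inside the transaction
		}
		return nil
	}); err != nil {
		return nil, err
	}
	return first, nil // nil when the table is empty
}
```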
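The `historyWriter`/`invertedIndexWriter` changes replace direct `tx.Put` calls with a buffered path through `etl.Collector`: values are collected while executing and only loaded into the target table at flush time, inside a `kv.RwTx`. A simplified sketch of that collect-then-load pattern is below; the `MyTable` name, the standalone type, and the use of `etl.IdentityLoadFunc` (instead of the package's own `loadFunc`) are assumptions for illustration.

```go
package example

import (
	"github.com/ledgerwatch/erigon-lib/etl"
	"github.com/ledgerwatch/erigon-lib/kv"
	"github.com/ledgerwatch/log/v3"
)

// bufferedWriter sketches the collect-then-load pattern used by the writers in
// this change: writes go into an in-memory etl buffer (spilling to tmpdir files
// if it overflows) and reach the DB only when flush runs inside a kv.RwTx.
type bufferedWriter struct {
	vals *etl.Collector
}

func newBufferedWriter(tmpdir string) *bufferedWriter {
	c := etl.NewCollector("MyTable", tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize/16))
	c.LogLvl(log.LvlTrace) // keep collector progress out of the default logs
	return &bufferedWriter{vals: c}
}

func (w *bufferedWriter) add(k, v []byte) error {
	return w.vals.Collect(k, v) // buffered, no DB write yet
}

func (w *bufferedWriter) flush(tx kv.RwTx, table string) error {
	defer w.vals.Close()
	// Load writes the collected pairs into the table in sorted key order.
	return w.vals.Load(tx, table, etl.IdentityLoadFunc, etl.TransformArgs{})
}
```

The usual motivation for etl collectors in this codebase is that keys reach the database already sorted, which is cheaper for MDBX than unsorted per-key puts; the new `buffered` flag keeps the old direct-write path selectable.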