mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-27 22:28:21 +00:00
e3: better prune logs (#703)
This commit is contained in:
parent
bfdf6d80c0
commit
4fec09660e
@ -238,7 +238,7 @@ func (cq *CompressionQueue) Pop() interface{} {
|
||||
|
||||
// reduceDict reduces the dictionary by trying the substitutions and counting frequency for each word
|
||||
func reducedict(ctx context.Context, trace bool, logPrefix, segmentFilePath string, datFile *DecompressedFile, workers int, dictBuilder *DictionaryBuilder, lvl log.Lvl) error {
|
||||
logEvery := time.NewTicker(30 * time.Second)
|
||||
logEvery := time.NewTicker(60 * time.Second)
|
||||
defer logEvery.Stop()
|
||||
|
||||
// DictionaryBuilder is for sorting words by their freuency (to assign codes)
|
||||
|
@ -146,3 +146,17 @@ func ReadAhead(ctx context.Context, db RoDB, progress *atomic.Bool, table string
|
||||
})
|
||||
}()
|
||||
}
|
||||
|
||||
// FirstKey - candidate on move to kv.Tx interface
|
||||
func FirstKey(tx Tx, table string) ([]byte, error) {
|
||||
c, err := tx.Cursor(table)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer c.Close()
|
||||
k, _, err := c.First()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return k, nil
|
||||
}
|
||||
|
@ -471,6 +471,9 @@ func (a *Aggregator22) warmup(txFrom, limit uint64) {
|
||||
if a.db == nil {
|
||||
return
|
||||
}
|
||||
if limit < 10_000 {
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
if err := a.db.View(context.Background(), func(tx kv.Tx) error {
|
||||
|
@ -465,22 +465,29 @@ func (h *History) Flush() error {
|
||||
|
||||
type historyWriter struct {
|
||||
h *History
|
||||
historyVals *etl.Collector
|
||||
tmpdir string
|
||||
autoIncrementBuf []byte
|
||||
autoIncrement uint64
|
||||
buffered bool
|
||||
}
|
||||
|
||||
func (h *historyWriter) close() {
|
||||
if h == nil { // allow dobule-close
|
||||
return
|
||||
}
|
||||
h.historyVals.Close()
|
||||
}
|
||||
|
||||
func (h *History) newWriter(tmpdir string) *historyWriter {
|
||||
w := &historyWriter{h: h,
|
||||
tmpdir: tmpdir,
|
||||
autoIncrementBuf: make([]byte, 8),
|
||||
|
||||
buffered: true,
|
||||
historyVals: etl.NewCollector(h.historyValsTable, tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize/16)),
|
||||
}
|
||||
w.historyVals.LogLvl(log.LvlTrace)
|
||||
|
||||
val, err := h.tx.GetOne(h.settingsTable, historyValCountKey)
|
||||
if err != nil {
|
||||
@ -500,6 +507,10 @@ func (h *historyWriter) flush(tx kv.RwTx) error {
|
||||
if err := tx.Put(h.h.settingsTable, historyValCountKey, h.autoIncrementBuf); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := h.historyVals.Load(tx, h.h.historyValsTable, loadFunc, etl.TransformArgs{}); err != nil {
|
||||
return err
|
||||
}
|
||||
h.close()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -543,11 +554,15 @@ func (h *historyWriter) addPrevValue(key1, key2, original []byte) error {
|
||||
//if err := h.h.tx.Put(h.h.settingsTable, historyValCountKey, historyKey[lk:]); err != nil {
|
||||
// return err
|
||||
//}
|
||||
//if err := h.historyVals.Collect(historyKey[lk:], original); err != nil {
|
||||
// return err
|
||||
//}
|
||||
if err := h.h.tx.Put(h.h.historyValsTable, historyKey[lk:], original); err != nil {
|
||||
return err
|
||||
|
||||
if h.buffered {
|
||||
if err := h.historyVals.Collect(historyKey[lk:], original); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := h.h.tx.Put(h.h.historyValsTable, historyKey[lk:], original); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -881,16 +896,6 @@ func (h *History) prune(txFrom, txTo, limit uint64) error {
|
||||
defer historyKeysCursor.Close()
|
||||
var txKey [8]byte
|
||||
binary.BigEndian.PutUint64(txKey[:], txFrom)
|
||||
idxC, err := h.tx.RwCursorDupSort(h.indexTable)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer idxC.Close()
|
||||
valsC, err := h.tx.RwCursor(h.historyValsTable)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer valsC.Close()
|
||||
|
||||
k, v, err := historyKeysCursor.Seek(txKey[:])
|
||||
if err != nil {
|
||||
@ -903,9 +908,23 @@ func (h *History) prune(txFrom, txTo, limit uint64) error {
|
||||
if limit != math.MaxUint64 && limit != 0 {
|
||||
txTo = cmp.Min(txTo, txFrom+limit)
|
||||
}
|
||||
if txFrom-txTo > 10_000 {
|
||||
log.Info("[snapshots] prune old history", "name", h.filenameBase, "range", fmt.Sprintf("%.1fm-%.1fm", float64(txFrom)/1_000_000, float64(txTo)/1_000_000))
|
||||
if txFrom >= txTo {
|
||||
return nil
|
||||
}
|
||||
if txTo-txFrom > 10_000 {
|
||||
log.Info("[snapshots] prune old history", "name", h.filenameBase, "range", fmt.Sprintf("%.2fm-%.2fm", float64(txFrom)/1_000_000, float64(txTo)/1_000_000))
|
||||
}
|
||||
|
||||
valsC, err := h.tx.RwCursor(h.historyValsTable)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer valsC.Close()
|
||||
idxC, err := h.tx.RwCursorDupSort(h.indexTable)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer idxC.Close()
|
||||
for ; err == nil && k != nil; k, v, err = historyKeysCursor.Next() {
|
||||
txNum := binary.BigEndian.Uint64(k)
|
||||
if txNum >= txTo {
|
||||
|
@ -334,6 +334,7 @@ type invertedIndexWriter struct {
|
||||
index *etl.Collector
|
||||
indexKeys *etl.Collector
|
||||
tmpdir string
|
||||
buffered bool
|
||||
}
|
||||
|
||||
// loadFunc - is analog of etl.Identity, but it signaling to etl - use .Put instead of .AppendDup - to allow duplicates
|
||||
@ -363,7 +364,8 @@ func (ii *invertedIndexWriter) close() {
|
||||
|
||||
func (ii *InvertedIndex) newWriter(tmpdir string) *invertedIndexWriter {
|
||||
w := &invertedIndexWriter{ii: ii,
|
||||
tmpdir: tmpdir,
|
||||
buffered: true,
|
||||
tmpdir: tmpdir,
|
||||
// 3 history + 4 indices = 10 etl collectors, 10*256Mb/16 = 256mb - for all indices buffers
|
||||
// etl collector doesn't fsync: means if have enough ram, all files produced by all collectors will be in ram
|
||||
index: etl.NewCollector(ii.indexTable, tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize/16)),
|
||||
@ -375,18 +377,21 @@ func (ii *InvertedIndex) newWriter(tmpdir string) *invertedIndexWriter {
|
||||
}
|
||||
|
||||
func (ii *invertedIndexWriter) add(key, indexKey []byte) error {
|
||||
//if err := ii.ii.tx.Put(ii.ii.indexKeysTable, ii.ii.txNumBytes[:], key); err != nil {
|
||||
// return err
|
||||
//}
|
||||
if err := ii.indexKeys.Collect(ii.ii.txNumBytes[:], key); err != nil {
|
||||
return err
|
||||
}
|
||||
if ii.buffered {
|
||||
if err := ii.indexKeys.Collect(ii.ii.txNumBytes[:], key); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
//if err := ii.ii.tx.Put(ii.ii.indexTable, indexKey, ii.ii.txNumBytes[:]); err != nil {
|
||||
// return err
|
||||
//}
|
||||
if err := ii.index.Collect(indexKey, ii.ii.txNumBytes[:]); err != nil {
|
||||
return err
|
||||
if err := ii.index.Collect(indexKey, ii.ii.txNumBytes[:]); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := ii.ii.tx.Put(ii.ii.indexKeysTable, ii.ii.txNumBytes[:], key); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := ii.ii.tx.Put(ii.ii.indexTable, indexKey, ii.ii.txNumBytes[:]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -882,11 +887,6 @@ func (ii *InvertedIndex) prune(txFrom, txTo, limit uint64) error {
|
||||
defer keysCursor.Close()
|
||||
var txKey [8]byte
|
||||
binary.BigEndian.PutUint64(txKey[:], txFrom)
|
||||
idxC, err := ii.tx.RwCursorDupSort(ii.indexTable)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer idxC.Close()
|
||||
k, v, err := keysCursor.Seek(txKey[:])
|
||||
if err != nil {
|
||||
return err
|
||||
@ -898,9 +898,17 @@ func (ii *InvertedIndex) prune(txFrom, txTo, limit uint64) error {
|
||||
if limit != math.MaxUint64 && limit != 0 {
|
||||
txTo = cmp.Min(txTo, txFrom+limit)
|
||||
}
|
||||
if txFrom-txTo > 10_000 {
|
||||
log.Info("[snapshots] prune old history", "name", ii.filenameBase, "range", fmt.Sprintf("%.1fm-%.1fm", float64(txFrom)/1_000_000, float64(txTo)/1_000_000))
|
||||
if txFrom >= txTo {
|
||||
return nil
|
||||
}
|
||||
if txTo-txFrom > 10_000 {
|
||||
log.Info("[snapshots] prune old history", "name", ii.filenameBase, "range", fmt.Sprintf("%.2fm-%.2fm", float64(txFrom)/1_000_000, float64(txTo)/1_000_000))
|
||||
}
|
||||
idxC, err := ii.tx.RwCursorDupSort(ii.indexTable)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer idxC.Close()
|
||||
for ; err == nil && k != nil; k, v, err = keysCursor.Next() {
|
||||
txNum := binary.BigEndian.Uint64(k)
|
||||
if txNum >= txTo {
|
||||
|
Loading…
Reference in New Issue
Block a user