erigon-pulse/common/etl/dataprovider.go
ledgerwatch 7ab10c85af
Separate Collector pattern out of ETL, hash collector for rebuilding Intermediate Hashes (#597)
* Introduce hashCollector

* Add HashCollector to SubTrieLoader

* Fix linter

* Reset hashed state

* Not to regenerate the hashed state

* Not to delete state

* Fix linter

* Print expected hash in the beginning

* Simplify

* Remove initialTrie

* Use etl to buffer intermediate hashes

* Copy values, not insert empty key

* Compress instead of decompress

* Enhance file buffer logging, fix linter

* Fix compile errors

* Fix log

* Fix logging

* Exclude zero key again

* Add rewind

* Restrict timestamps

* Fix

* Fix formatting

* Incorporate separation

* Extract identityLoadFunction

* Fix formatting
2020-05-31 13:23:34 +01:00

134 lines
3.0 KiB
Go

package etl
import (
"bufio"
"fmt"
"io"
"io/ioutil"
"os"
"runtime"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ugorji/go/codec"
)
// dataProvider is a source of key/value pairs for the ETL merge phase.
// It abstracts over two backings: a spill file on disk (fileDataProvider)
// and an in-memory sorted buffer (memoryDataProvider).
type dataProvider interface {
	// Next returns the next key/value pair, or io.EOF when the provider
	// is exhausted. The decoder argument is reused by the file-backed
	// implementation to avoid per-call allocation; the in-memory
	// implementation ignores it.
	Next(*codec.Decoder) ([]byte, []byte, error)
	// Dispose releases any resources held by the provider (e.g. closes
	// and removes the backing temp file).
	Dispose() error
}
// fileDataProvider reads back entries previously spilled to a temp file
// by FlushToDisk.
type fileDataProvider struct {
	file   *os.File  // backing temp file; removed on Dispose
	reader io.Reader // lazily-initialized buffered reader over file; nil until first Next
}
// FlushToDisk serializes the entries of b into a fresh temp file under
// datadir and returns a fileDataProvider for reading them back. It returns
// (nil, nil) when the buffer is empty. On success the buffer is reset
// (entries truncated, size zeroed) but its capacity is kept for reuse.
// currentKey is only used for logging (nil means the final flush).
func FlushToDisk(currentKey []byte, b *sortableBuffer, datadir string) (dataProvider, error) {
	if len(b.entries) == 0 {
		return nil, nil
	}
	bufferFile, err := ioutil.TempFile(datadir, "tg-sync-sortable-buf")
	if err != nil {
		return nil, err
	}
	// On any failure below, don't leak the open temp file: close it and
	// remove it from disk before returning the error.
	cleanup := func() {
		bufferFile.Close()           //nolint:errcheck
		os.Remove(bufferFile.Name()) //nolint:errcheck
	}
	w := bufio.NewWriter(bufferFile)
	b.encoder.Reset(w)
	for i := range b.entries {
		if err = writeToDisk(b.encoder, b.entries[i].key, b.entries[i].value); err != nil {
			cleanup()
			return nil, fmt.Errorf("error writing entries to disk: %v", err)
		}
	}
	// Flush the bufio writer BEFORE syncing the file. The previous version
	// deferred both, and LIFO defer order ran Sync before Flush, so bytes
	// still sitting in the bufio buffer were written after the sync and
	// never made durable.
	if err = w.Flush(); err != nil {
		cleanup()
		return nil, fmt.Errorf("error flushing buffer file: %v", err)
	}
	bufferFile.Sync() //nolint:errcheck
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	// Abbreviate the key for logging; nil marks the final flush.
	var currentKeyStr string
	if currentKey == nil {
		currentKeyStr = "final"
	} else if len(currentKey) < 4 {
		currentKeyStr = fmt.Sprintf("%x", currentKey)
	} else {
		currentKeyStr = fmt.Sprintf("%x...", currentKey[:4])
	}
	log.Info(
		"Flushed buffer file",
		"current key", currentKeyStr,
		"name", bufferFile.Name(),
		"alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys), "numGC", int(m.NumGC))
	b.entries = b.entries[:0] // keep the capacity
	b.size = 0
	return &fileDataProvider{bufferFile, nil}, nil
}
// Next decodes and returns the next key/value pair from the spill file.
// On the first call it rewinds the file and wraps it in a buffered reader;
// subsequent calls continue from the current position. Returns io.EOF
// (via the decoder) once the file is exhausted.
func (p *fileDataProvider) Next(decoder *codec.Decoder) ([]byte, []byte, error) {
	// Lazy init: rewind to the start and buffer reads on first use.
	if p.reader == nil {
		if _, seekErr := p.file.Seek(0, 0); seekErr != nil {
			return nil, nil, seekErr
		}
		p.reader = bufio.NewReader(p.file)
	}
	decoder.Reset(p.reader)
	return readElementFromDisk(decoder)
}
// Dispose closes the backing temp file and removes it from disk. Both
// operations are always attempted; the close error takes precedence over
// the remove error when both fail.
func (p *fileDataProvider) Dispose() error {
	closeErr := p.file.Close()
	removeErr := os.Remove(p.file.Name())
	switch {
	case closeErr != nil:
		return closeErr
	case removeErr != nil:
		return removeErr
	default:
		return nil
	}
}
// String describes the provider by its concrete type and backing file name,
// for use in logs.
func (p *fileDataProvider) String() string {
	fileName := p.file.Name()
	return fmt.Sprintf("%T(file: %s)", p, fileName)
}
// writeToDisk encodes a single key/value pair as a two-element slice of
// byte slices, the on-disk record format read back by readElementFromDisk.
func writeToDisk(encoder *codec.Encoder, key []byte, value []byte) error {
	return encoder.Encode([][]byte{key, value})
}
func readElementFromDisk(decoder Decoder) ([]byte, []byte, error) {
result := make([][]byte, 2)
err := decoder.Decode(&result)
return result[0], result[1], err
}
// memoryDataProvider serves entries directly from an in-memory sorted
// buffer, used when the data never needed to spill to disk.
type memoryDataProvider struct {
	buffer       *sortableBuffer // source of entries; not owned, never mutated here
	currentIndex int             // index of the next entry to hand out
}
// KeepInRAM wraps buffer in a dataProvider that iterates it in memory,
// starting from the first entry, without touching disk.
func KeepInRAM(buffer *sortableBuffer) dataProvider {
	provider := &memoryDataProvider{
		buffer:       buffer,
		currentIndex: 0,
	}
	return provider
}
// Next returns the next buffered key/value pair and advances the cursor,
// or io.EOF once all entries have been consumed. The decoder is unused by
// this in-memory implementation.
func (p *memoryDataProvider) Next(decoder *codec.Decoder) ([]byte, []byte, error) {
	if p.currentIndex < p.buffer.Len() {
		entry := p.buffer.Get(p.currentIndex)
		p.currentIndex++
		return entry.key, entry.value, nil
	}
	return nil, nil, io.EOF
}
// Dispose is a no-op: an in-memory provider holds no external resources.
func (p *memoryDataProvider) Dispose() error {
	return nil
}
// String describes the provider by its concrete type and the number of
// buffered entries, for use in logs.
func (p *memoryDataProvider) String() string {
	count := p.buffer.Len()
	return fmt.Sprintf("%T(buffer.Len: %d)", p, count)
}