move DictAggregator to erigon-lib (#2933)

This commit is contained in:
Alex Sharov 2021-11-09 10:12:25 +07:00 committed by GitHub
parent 3d3a1d30cc
commit 3842351293
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 20 additions and 97 deletions

View File

@ -13,6 +13,7 @@ import (
"io"
"io/ioutil"
"math/big"
"net/http"
_ "net/http/pprof" //nolint:gosec
"os"
"os/signal"
@ -55,7 +56,6 @@ import (
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/ethdb"
"github.com/ledgerwatch/erigon/ethdb/cbor"
"github.com/ledgerwatch/erigon/metrics/exp"
"github.com/ledgerwatch/erigon/migrations"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/rlp"
@ -64,7 +64,7 @@ import (
"github.com/wcharczuk/go-chart/v2"
)
const ASSERT = true
const ASSERT = false
var (
verbosity = flag.Uint("verbosity", 3, "Logging verbosity: 0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=detail (default 3)")
@ -1311,7 +1311,7 @@ func mphf(chaindata string, block int) error {
if rs, err = recsplit.NewRecSplit(recsplit.RecSplitArgs{
KeyCount: int(count),
BucketSize: 2000,
Salt: 0,
Salt: 1,
LeafSize: 8,
TmpDir: "",
StartSeed: []uint64{0x106393c187cae21a, 0x6453cec3f7376937, 0x643e521ddbd2be98, 0x3740c6412f6572cb, 0x717d47562f1ce470, 0x4cd6eb4c63befb7c, 0x9bfd8c5e18c8da73,
@ -1492,6 +1492,7 @@ func processSuperstring(superstringCh chan []byte, dictCollector *etl.Collector,
}
//log.Info("Kasai algorithm finished")
// Checking LCP array
if ASSERT {
for i := 0; i < n-1; i++ {
var prefixLen int
@ -1502,6 +1503,7 @@ func processSuperstring(superstringCh chan []byte, dictCollector *etl.Collector,
}
if prefixLen != int(lcp[i]) {
log.Error("Mismatch", "prefixLen", prefixLen, "lcp[i]", lcp[i])
break
}
l := int(lcp[i]) // Length of potential dictionary word
if l < 2 {
@ -1511,7 +1513,7 @@ func processSuperstring(superstringCh chan []byte, dictCollector *etl.Collector,
for s := 0; s < l; s++ {
dictKey[s] = superstring[(filtered[i]+s)*2+1]
}
fmt.Printf("%d %d %s\n", filtered[i], lcp[i], dictKey)
//fmt.Printf("%d %d %s\n", filtered[i], lcp[i], dictKey)
}
}
//log.Info("LCP array checked")
@ -1567,73 +1569,6 @@ func processSuperstring(superstringCh chan []byte, dictCollector *etl.Collector,
completion.Done()
}
type DictionaryItem struct {
word []byte
score uint64
}
type DictionaryBuilder struct {
limit int
lastWord []byte
lastWordScore uint64
items []DictionaryItem
}
func (db DictionaryBuilder) Len() int {
return len(db.items)
}
func (db DictionaryBuilder) Less(i, j int) bool {
if db.items[i].score == db.items[j].score {
return bytes.Compare(db.items[i].word, db.items[j].word) < 0
}
return db.items[i].score < db.items[j].score
}
func (db *DictionaryBuilder) Swap(i, j int) {
db.items[i], db.items[j] = db.items[j], db.items[i]
}
func (db *DictionaryBuilder) Push(x interface{}) {
db.items = append(db.items, x.(DictionaryItem))
}
func (db *DictionaryBuilder) Pop() interface{} {
old := db.items
n := len(old)
x := old[n-1]
db.items = old[0 : n-1]
return x
}
func (db *DictionaryBuilder) processWord(word []byte, score uint64) {
heap.Push(db, DictionaryItem{word: word, score: score})
if db.Len() > db.limit {
// Remove the element with smallest score
heap.Pop(db)
}
}
func (db *DictionaryBuilder) compressLoadFunc(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error {
score := binary.BigEndian.Uint64(v)
if bytes.Equal(k, db.lastWord) {
db.lastWordScore += score
} else {
if db.lastWord != nil {
db.processWord(db.lastWord, db.lastWordScore)
}
db.lastWord = common.CopyBytes(k)
db.lastWordScore = score
}
return nil
}
func (db *DictionaryBuilder) finish() {
if db.lastWord != nil {
db.processWord(db.lastWord, db.lastWordScore)
}
}
type DictAggregator struct {
lastWord []byte
lastWordScore uint64
@ -1751,35 +1686,19 @@ func compress1(chaindata string, name string) error {
}
close(ch)
wg.Wait()
dictCollector := etl.NewCollector(CompressLogPrefix, tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
dictAggregator := &DictAggregator{collector: dictCollector}
for _, collector := range collectors {
if err = collector.Load(nil, "", dictAggregator.aggLoadFunc, etl.TransformArgs{}); err != nil {
return err
}
collector.Close()
}
if err = dictAggregator.finish(); err != nil {
db, err := compress.DictionaryBuilderFromCollectors(context.Background(), CompressLogPrefix, tmpDir, collectors)
if err != nil {
return err
}
db := &DictionaryBuilder{limit: maxDictPatterns} // Only collect 1m words with highest scores
if err = dictCollector.Load(nil, "", db.compressLoadFunc, etl.TransformArgs{}); err != nil {
return err
}
db.finish()
dictCollector.Close()
var df *os.File
df, err = os.Create(name + ".dictionary.txt")
if err != nil {
return err
}
w := bufio.NewWriterSize(df, etl.BufIOSize)
// Sort dictionary builder
sort.Sort(db)
for i := len(db.items); i > 0; i-- {
fmt.Fprintf(w, "%d %x\n", db.items[i-1].score, db.items[i-1].word)
}
db.ForEach(func(score uint64, word []byte) { fmt.Fprintf(w, "%d %x\n", score, word) })
if err = w.Flush(); err != nil {
return err
}
@ -2920,7 +2839,7 @@ RETRY:
if err = rs.Build(); err != nil {
if errors.Is(err, recsplit.ErrCollision) {
log.Info("Building recsplit. Collision happened. It's ok. Restarting...")
log.Info("Building recsplit. Collision happened. It's ok. Restarting...", "err", err)
rs.ResetNextSalt()
goto RETRY
}
@ -4142,7 +4061,11 @@ func main() {
}
defer pprof.StopCPUProfile()
}
exp.Setup("0.0.0.0:6060")
go func() {
if err := http.ListenAndServe("localhost:6060", nil); err != nil {
log.Error("Failure in running pprof server", "err", err)
}
}()
var err error
switch *action {

2
go.mod
View File

@ -35,7 +35,7 @@ require (
github.com/json-iterator/go v1.1.12
github.com/julienschmidt/httprouter v1.3.0
github.com/kevinburke/go-bindata v3.21.0+incompatible
github.com/ledgerwatch/erigon-lib v0.0.0-20211108033615-b426ff2271f0
github.com/ledgerwatch/erigon-lib v0.0.0-20211109030232-5677f0c2bd53
github.com/ledgerwatch/log/v3 v3.4.0
github.com/ledgerwatch/secp256k1 v1.0.0
github.com/logrusorgru/aurora/v3 v3.0.0

4
go.sum
View File

@ -596,8 +596,8 @@ github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3P
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/ledgerwatch/erigon-lib v0.0.0-20211108033615-b426ff2271f0 h1:uCFtgY4IegzfrNvfAwQNIebBk74I9folhLyyB4lSOjY=
github.com/ledgerwatch/erigon-lib v0.0.0-20211108033615-b426ff2271f0/go.mod h1:CuEZROm43MykZT5CjCj02jw0FOwaDl8Nh+PZkTEGopg=
github.com/ledgerwatch/erigon-lib v0.0.0-20211109030232-5677f0c2bd53 h1:SRxoOSlbv2o1qzU5Cqx59ZVBKh7x8baEF2taRySNubk=
github.com/ledgerwatch/erigon-lib v0.0.0-20211109030232-5677f0c2bd53/go.mod h1:CuEZROm43MykZT5CjCj02jw0FOwaDl8Nh+PZkTEGopg=
github.com/ledgerwatch/log/v3 v3.4.0 h1:SEIOcv5a2zkG3PmoT5jeTU9m/0nEUv0BJS5bzsjwKCI=
github.com/ledgerwatch/log/v3 v3.4.0/go.mod h1:VXcz6Ssn6XEeU92dCMc39/g1F0OYAjw1Mt+dGP5DjXY=
github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ=