erigon-pulse/common/etl/buffers.go
Alex Sharov e02d6acc7d
bitmap indices for logs (#1124)
* save progress

* try now

* don't create bloom inside rlpDecode

* don't create bloom inside ApplyTransaction

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* rename method

* print timings

* print timings

* print timings

* sort before flush

* fix err lint

* clean

* move tests to transactions

* compressed version

* up bound

* up bound

* more tests

* more tests

* more tests

* more tests

* better removal

* clean

* better performance of get/put methods

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* optimize rpcdaemon

* fix test

* fix rpcdaemon

* fix test

* simplify

* simplify

* fix nil pointer

* clean

* revert some changes

* add some logs

* clean

* try without optimize

* clean

* clean

* clean

* clean

* try

* move log_index to own stage

* move log_index to own stage

* integration add log_index stage

* integration add log_index stage

* clean

* clean

* print timing

* remove duplicates at unwind

* extract truncateBitmaps func

* try detect

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* add blackList of topics

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* clean

* sharding 1

* sharded 2

* sharded 2

* sharded 2

* sharded 2

* sharded 2

* sharded 2

* sharded 2

* sharded 2

* sharded 2

* sharded 2

* sharded 2

* sharded 2

* sharded 2

* sharded 2

* sharded 2

* sharded 2

* sharded 3

* sharded 3

* sharded 3

* speedup things by putCurrent and putReserve

* clean

* optimize trim

* clean

* remove blacklist

* add more info to err

* ?

* clean

* clean

* clean

* clean

* clean

* working version

* switch to cgo version of roaring bitmaps

* clean

* clean

* clean

* clean

* more docs

* clean

* clean

* fix logs bloom field

* Fix debug_getModifiedAccountsByNumber

* Try to fix crash

* fix problem with "absent block"

* fix problem with "absent block"

* remove optimize method call

* remove roaring iterator

* fix problem with rebuilding indices

* remove debug prints

* tests for eth_getLogs involving topics

* add tests for new stage, separate topics into 2 buckets

* version up

* remove debug logs

* remove debug logs

* remove bloom filter implementation

* Optimisation

* Optimisation not required, make rpctest lenient to geth errors

* Lenient to geth failures

Co-authored-by: Alexey Akhunov <akhounov@gmail.com>
2020-09-28 18:18:36 +01:00

275 lines
6.2 KiB
Go

package etl
import (
"bytes"
"sort"
"strconv"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
)
// Buffer kinds accepted by getBufferByType, followed by size defaults.
const (
	// SortableSliceBuffer - a plain slice of entries: every Put is kept,
	// including repeated keys.
	SortableSliceBuffer = iota
	// SortableAppendBuffer - map[k] [v1 v2 v3]: values put under the same key
	// are concatenated byte-wise into a single value.
	SortableAppendBuffer
	// SortableOldestAppearedBuffer - buffer that keeps only the oldest entries.
	// if first v1 was added under key K, then v2; only v1 will stay
	SortableOldestAppearedBuffer
	// BufferOptimalSize is 256 MiB, a default flush threshold. It is a const,
	// so tests cannot override it; buffer constructors take the threshold as an
	// argument instead, and tests can pass a smaller value there.
	BufferOptimalSize = 256 * 1024 * 1024
	BufIOSize = 64 * 4096 // 64 pages | default is 1 page | increasing further doesn't show speedup on SSD
)
type Buffer interface {
Put(k, v []byte)
Get(i int) sortableBufferEntry
Len() int
Reset()
GetEntries() []sortableBufferEntry
Sort()
CheckFlushSize() bool
SetComparator(cmp dbutils.CmpFunc)
}
// sortableBufferEntry is a single key/value pair held by a Buffer.
type sortableBufferEntry struct {
	key, value []byte
}
// Compile-time checks that every buffer implementation satisfies Buffer.
var (
	_ Buffer = (*sortableBuffer)(nil)
	_ Buffer = (*appendSortableBuffer)(nil)
	_ Buffer = (*oldestEntrySortableBuffer)(nil)
)
// NewSortableBuffer returns an empty slice-backed buffer whose CheckFlushSize
// reports true once the accumulated key+value bytes reach bufferOptimalSize.
func NewSortableBuffer(bufferOptimalSize int) *sortableBuffer {
	buf := &sortableBuffer{optimalSize: bufferOptimalSize}
	buf.entries = []sortableBufferEntry{}
	return buf
}

// sortableBuffer records every Put in insertion order, repeated keys included.
// The slices passed to Put are retained as-is (not copied), so later mutation
// of them by the caller is visible through this buffer.
type sortableBuffer struct {
	entries     []sortableBufferEntry
	size        int // running sum of len(key)+len(value) over all entries
	optimalSize int // flush threshold compared against size
	comparator  dbutils.CmpFunc
}

// Put appends the pair and accounts its bytes towards the flush threshold.
func (b *sortableBuffer) Put(k, v []byte) {
	b.entries = append(b.entries, sortableBufferEntry{key: k, value: v})
	b.size += len(k) + len(v)
}

// Size returns the number of key+value bytes accumulated so far.
func (b *sortableBuffer) Size() int {
	return b.size
}

// Len returns the number of buffered entries (sort.Interface).
func (b *sortableBuffer) Len() int {
	return len(b.entries)
}

// SetComparator installs a custom ordering; nil restores bytes.Compare on keys.
func (b *sortableBuffer) SetComparator(cmp dbutils.CmpFunc) {
	b.comparator = cmp
}

// Less orders entries by the comparator if set, otherwise by key bytes (sort.Interface).
func (b *sortableBuffer) Less(i, j int) bool {
	x, y := b.entries[i], b.entries[j]
	if b.comparator == nil {
		return bytes.Compare(x.key, y.key) < 0
	}
	return b.comparator(x.key, y.key, x.value, y.value) < 0
}

// Swap exchanges two entries (sort.Interface).
func (b *sortableBuffer) Swap(i, j int) {
	b.entries[i], b.entries[j] = b.entries[j], b.entries[i]
}

// Get returns the i-th entry.
func (b *sortableBuffer) Get(i int) sortableBufferEntry {
	return b.entries[i]
}

// Reset drops all entries and zeroes the size counter.
func (b *sortableBuffer) Reset() {
	b.entries, b.size = nil, 0
}

// Sort orders entries stably, so entries that compare equal keep insertion order.
func (b *sortableBuffer) Sort() {
	sort.Stable(b)
}

// GetEntries returns the underlying entry slice (not a copy).
func (b *sortableBuffer) GetEntries() []sortableBufferEntry {
	return b.entries
}

// CheckFlushSize reports whether the accumulated size reached optimalSize.
func (b *sortableBuffer) CheckFlushSize() bool {
	return b.optimalSize <= b.size
}
// NewAppendBuffer returns an empty map-backed buffer that concatenates values
// put under the same key. CheckFlushSize reports true once the accounted size
// reaches bufferOptimalSize.
func NewAppendBuffer(bufferOptimalSize int) *appendSortableBuffer {
	return &appendSortableBuffer{
		entries:     make(map[string][]byte),
		size:        0,
		optimalSize: bufferOptimalSize,
	}
}

// appendSortableBuffer accumulates values per key: each Put appends v's bytes
// to whatever is already stored under k. Entries become visible through Get
// and GetEntries only after Sort materializes them into sortedBuf.
type appendSortableBuffer struct {
	entries     map[string][]byte     // key -> concatenation of all values put under it
	size        int                   // each distinct key counted once, plus every value
	optimalSize int                   // flush threshold compared against size
	sortedBuf   []sortableBufferEntry // sorted snapshot of entries, rebuilt by Sort
	comparator  dbutils.CmpFunc       // optional ordering; nil means bytes.Compare on keys
}

// Put appends v to the value stored under k. The key's length is counted
// towards size only the first time the key is seen. Both key and value bytes
// are copied, so the caller may reuse k and v after Put returns.
func (b *appendSortableBuffer) Put(k, v []byte) {
	ks := string(k)
	stored, ok := b.entries[ks]
	if !ok {
		b.size += len(k)
	}
	b.size += len(v)
	stored = append(stored, v...)
	b.entries[ks] = stored
}

// SetComparator installs a custom ordering; nil restores bytes.Compare on keys.
func (b *appendSortableBuffer) SetComparator(cmp dbutils.CmpFunc) {
	b.comparator = cmp
}

// Size returns the number of bytes accounted so far.
func (b *appendSortableBuffer) Size() int {
	return b.size
}

// Len returns the number of distinct keys (sort.Interface).
func (b *appendSortableBuffer) Len() int {
	return len(b.entries)
}

// Sort snapshots the map into sortedBuf and orders it by key (or comparator).
// sortedBuf is rebuilt from scratch on every call. Len reports len(b.entries),
// so appending to a previous snapshot would leave duplicate entries that
// sort.Stable never visits but GetEntries would still return.
func (b *appendSortableBuffer) Sort() {
	b.sortedBuf = make([]sortableBufferEntry, 0, len(b.entries))
	for k, v := range b.entries {
		b.sortedBuf = append(b.sortedBuf, sortableBufferEntry{key: []byte(k), value: v})
	}
	sort.Stable(b)
}

// Less orders the snapshot by the comparator if set, otherwise by key bytes (sort.Interface).
func (b *appendSortableBuffer) Less(i, j int) bool {
	if b.comparator != nil {
		return b.comparator(b.sortedBuf[i].key, b.sortedBuf[j].key, b.sortedBuf[i].value, b.sortedBuf[j].value) < 0
	}
	return bytes.Compare(b.sortedBuf[i].key, b.sortedBuf[j].key) < 0
}

// Swap exchanges two snapshot entries (sort.Interface).
func (b *appendSortableBuffer) Swap(i, j int) {
	b.sortedBuf[i], b.sortedBuf[j] = b.sortedBuf[j], b.sortedBuf[i]
}

// Get returns the i-th entry of the sorted snapshot; valid only after Sort.
func (b *appendSortableBuffer) Get(i int) sortableBufferEntry {
	return b.sortedBuf[i]
}

// Reset drops the snapshot and the map, and zeroes the size counter.
func (b *appendSortableBuffer) Reset() {
	b.sortedBuf = nil
	b.entries = make(map[string][]byte)
	b.size = 0
}

// GetEntries returns the sorted snapshot (not a copy); empty until Sort is called.
func (b *appendSortableBuffer) GetEntries() []sortableBufferEntry {
	return b.sortedBuf
}

// CheckFlushSize reports whether the accounted size reached optimalSize.
func (b *appendSortableBuffer) CheckFlushSize() bool {
	return b.size >= b.optimalSize
}
// NewOldestEntryBuffer returns an empty map-backed buffer that keeps only the
// first value put under each key. CheckFlushSize reports true once the
// accounted size reaches bufferOptimalSize.
func NewOldestEntryBuffer(bufferOptimalSize int) *oldestEntrySortableBuffer {
	return &oldestEntrySortableBuffer{
		entries:     make(map[string][]byte),
		size:        0,
		optimalSize: bufferOptimalSize,
	}
}

// oldestEntrySortableBuffer keeps the first value seen for each key and
// ignores later ones. Entries become visible through Get and GetEntries only
// after Sort materializes them into sortedBuf.
type oldestEntrySortableBuffer struct {
	entries     map[string][]byte     // key -> first value put under it (copied)
	size        int                   // len(key)+len(value) of every retained entry
	optimalSize int                   // flush threshold compared against size
	sortedBuf   []sortableBufferEntry // sorted snapshot of entries, rebuilt by Sort
	comparator  dbutils.CmpFunc       // optional ordering; nil means bytes.Compare on keys
}

// SetComparator installs a custom ordering; nil restores bytes.Compare on keys.
func (b *oldestEntrySortableBuffer) SetComparator(cmp dbutils.CmpFunc) {
	b.comparator = cmp
}

// Put stores v under k unless k is already present, in which case the
// existing (older) value is kept and v is ignored. A non-nil v is copied;
// a nil v is stored as nil.
func (b *oldestEntrySortableBuffer) Put(k, v []byte) {
	// The string(k) conversion inside a map index expression does not
	// allocate, so rejecting duplicate keys costs no garbage.
	if _, ok := b.entries[string(k)]; ok {
		return
	}
	b.size += len(k)
	b.size += len(v)
	if v != nil {
		v = common.CopyBytes(v)
	}
	b.entries[string(k)] = v
}

// Size returns the number of bytes accounted so far.
func (b *oldestEntrySortableBuffer) Size() int {
	return b.size
}

// Len returns the number of distinct keys (sort.Interface).
func (b *oldestEntrySortableBuffer) Len() int {
	return len(b.entries)
}

// Sort snapshots the map into sortedBuf and orders it by key (or comparator).
// sortedBuf is rebuilt from scratch on every call. Len reports len(b.entries),
// so appending to a previous snapshot would leave duplicate entries that
// sort.Stable never visits but GetEntries would still return.
func (b *oldestEntrySortableBuffer) Sort() {
	b.sortedBuf = make([]sortableBufferEntry, 0, len(b.entries))
	for k, v := range b.entries {
		b.sortedBuf = append(b.sortedBuf, sortableBufferEntry{key: []byte(k), value: v})
	}
	sort.Stable(b)
}

// Less orders the snapshot by the comparator if set, otherwise by key bytes (sort.Interface).
func (b *oldestEntrySortableBuffer) Less(i, j int) bool {
	if b.comparator != nil {
		return b.comparator(b.sortedBuf[i].key, b.sortedBuf[j].key, b.sortedBuf[i].value, b.sortedBuf[j].value) < 0
	}
	return bytes.Compare(b.sortedBuf[i].key, b.sortedBuf[j].key) < 0
}

// Swap exchanges two snapshot entries (sort.Interface).
func (b *oldestEntrySortableBuffer) Swap(i, j int) {
	b.sortedBuf[i], b.sortedBuf[j] = b.sortedBuf[j], b.sortedBuf[i]
}

// Get returns the i-th entry of the sorted snapshot; valid only after Sort.
func (b *oldestEntrySortableBuffer) Get(i int) sortableBufferEntry {
	return b.sortedBuf[i]
}

// Reset drops the snapshot and the map, and zeroes the size counter.
func (b *oldestEntrySortableBuffer) Reset() {
	b.sortedBuf = nil
	b.entries = make(map[string][]byte)
	b.size = 0
}

// GetEntries returns the sorted snapshot (not a copy); empty until Sort is called.
func (b *oldestEntrySortableBuffer) GetEntries() []sortableBufferEntry {
	return b.sortedBuf
}

// CheckFlushSize reports whether the accounted size reached optimalSize.
func (b *oldestEntrySortableBuffer) CheckFlushSize() bool {
	return b.size >= b.optimalSize
}
// getBufferByType constructs the Buffer implementation selected by tp
// (SortableSliceBuffer, SortableAppendBuffer or SortableOldestAppearedBuffer),
// passing size through as its flush threshold. Any other tp panics.
func getBufferByType(tp int, size int) Buffer {
	var buf Buffer
	switch tp {
	case SortableSliceBuffer:
		buf = NewSortableBuffer(size)
	case SortableAppendBuffer:
		buf = NewAppendBuffer(size)
	case SortableOldestAppearedBuffer:
		buf = NewOldestEntryBuffer(size)
	default:
		panic("unknown buffer type " + strconv.Itoa(tp))
	}
	return buf
}