State aggregator (#114)

* State aggregator

* Compile fix

* More

* Add

* More

* More on aggregator

* Writes (still incorrect)

* Move table names

* More

* Start of aggregation

* Change files instead of db

* More on change files

* More

* More

* Dealing with state and change files

* More

* More

* More boilerplate

* More

* More

* Iteration over storage

* More boilerplate

* More fixes

* Insert flag

* More

* Unit test

* Add more to the test

* Expand the test a bit

* More testing

* Keep fixing the test

* More fixes to the test

* Clean up DB tables upon aggregation

* More fixes

* Remove update/insert indicator from returned values

* Add assertions

* close files before deleting

* close files before deleting

* close files before deleting

Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro.local>
This commit is contained in:
ledgerwatch 2021-11-13 12:12:29 +00:00 committed by GitHub
parent 7faa84fe9f
commit fd19ad8148
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 1673 additions and 0 deletions

1500
aggregator/aggregator.go Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,156 @@
/*
Copyright 2021 Erigon contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aggregator
import (
"bytes"
"context"
"encoding/binary"
"testing"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
)
func int160(i uint64) []byte {
b := make([]byte, 20)
binary.BigEndian.PutUint64(b[12:], i)
return b
}
func int256(i uint64) []byte {
b := make([]byte, 32)
binary.BigEndian.PutUint64(b[24:], i)
return b
}
func TestSimpleAggregator(t *testing.T) {
tmpDir := t.TempDir()
db := memdb.New()
defer db.Close()
a, err := NewAggregator(tmpDir, 16, 4)
if err != nil {
t.Fatal(err)
}
var rwTx kv.RwTx
if rwTx, err = db.BeginRw(context.Background()); err != nil {
t.Fatal(err)
}
defer rwTx.Rollback()
var w *Writer
if w, err = a.MakeStateWriter(rwTx, 0); err != nil {
t.Fatal(err)
}
var account1 = int256(1)
if err = w.UpdateAccountData(int160(1), account1); err != nil {
t.Fatal(err)
}
if err = w.Finish(); err != nil {
t.Fatal(err)
}
if err = rwTx.Commit(); err != nil {
t.Fatal(err)
}
var tx kv.Tx
if tx, err = db.BeginRo(context.Background()); err != nil {
t.Fatal(err)
}
defer tx.Rollback()
r := a.MakeStateReader(tx, 2)
var acc []byte
if acc, err = r.ReadAccountData(int160(1)); err != nil {
t.Fatal(err)
}
if !bytes.Equal(acc, account1) {
t.Errorf("read account %x, expected account %x", acc, account1)
}
a.Close()
}
func TestLoopAggregator(t *testing.T) {
tmpDir := t.TempDir()
db := memdb.New()
defer db.Close()
a, err := NewAggregator(tmpDir, 16, 4)
if err != nil {
t.Fatal(err)
}
var account1 = int256(1)
var rwTx kv.RwTx
defer func() {
rwTx.Rollback()
}()
var tx kv.Tx
defer func() {
tx.Rollback()
}()
for blockNum := uint64(0); blockNum < 1000; blockNum++ {
accountKey := int160(blockNum/10 + 1)
//fmt.Printf("blockNum = %d\n", blockNum)
if rwTx, err = db.BeginRw(context.Background()); err != nil {
t.Fatal(err)
}
var w *Writer
if w, err = a.MakeStateWriter(rwTx, blockNum); err != nil {
t.Fatal(err)
}
if err = w.UpdateAccountData(accountKey, account1); err != nil {
t.Fatal(err)
}
if err = w.Finish(); err != nil {
t.Fatal(err)
}
if err = rwTx.Commit(); err != nil {
t.Fatal(err)
}
if tx, err = db.BeginRo(context.Background()); err != nil {
t.Fatal(err)
}
r := a.MakeStateReader(tx, blockNum+1)
var acc []byte
if acc, err = r.ReadAccountData(accountKey); err != nil {
t.Fatal(err)
}
tx.Rollback()
if !bytes.Equal(acc, account1) {
t.Errorf("read account %x, expected account %x for block %d", acc, account1, blockNum)
}
account1 = int256(blockNum + 2)
}
if tx, err = db.BeginRo(context.Background()); err != nil {
t.Fatal(err)
}
blockNum := uint64(1000)
r := a.MakeStateReader(tx, blockNum)
for i := uint64(0); i < blockNum/10+1; i++ {
accountKey := int160(i)
var expected []byte
if i > 0 {
expected = int256(i * 10)
}
var acc []byte
if acc, err = r.ReadAccountData(accountKey); err != nil {
t.Fatal(err)
}
if !bytes.Equal(acc, expected) {
t.Errorf("read account %x, expected account %x for block %d", acc, expected, i)
}
}
tx.Rollback()
a.Close()
}

View File

@ -273,6 +273,10 @@ const (
PendingEpoch = "DevPendingEpoch" // block_num_u64+block_hash->transition_proof
Issuance = "Issuance" // block_num_u64->RLP(issuance+burnt[0 if < london])
StateAccounts = "StateAccounts"
StateStorage = "StateStorage"
StateCode = "StateCode"
)
// Keys
@ -349,6 +353,9 @@ var ChaindataTables = []string{
Epoch,
PendingEpoch,
Issuance,
StateAccounts,
StateStorage,
StateCode,
}
const (

View File

@ -163,7 +163,17 @@ func (idx *Index) golombParam(m uint16) int {
return int(idx.golombRice[m] >> 27)
}
func (idx Index) Empty() bool {
return idx.keyCount == 0
}
func (idx Index) Lookup(key []byte) uint64 {
if idx.keyCount == 0 {
panic("no Lookup should be done when keyCount==0, please use Empty function to guard")
}
if idx.keyCount == 1 {
return 0
}
var gr GolombRiceReader
gr.data = idx.grData
idx.hasher.Reset()