erigon-pulse/kv/kvcache/cache_test.go
2021-09-10 14:18:00 +07:00

153 lines
4.1 KiB
Go

package kvcache
import (
"context"
"encoding/binary"
"fmt"
"sync"
"testing"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
"github.com/stretchr/testify/require"
)
func bigEndian(n uint64) []byte {
num := [8]byte{}
binary.BigEndian.PutUint64(num[:], n)
return num[:]
}
func TestAPI(t *testing.T) {
require := require.New(t)
c := New()
k1, k2 := [20]byte{1}, [20]byte{2}
db := memdb.NewTestDB(t)
get := func(key [20]byte) (res [10]chan []byte) {
wg := sync.WaitGroup{}
for i := 0; i < len(res); i++ {
wg.Add(1)
res[i] = make(chan []byte)
go func(out chan []byte) {
require.NoError(db.View(context.Background(), func(tx kv.Tx) error {
wg.Done()
cache, err := c.View(tx)
if err != nil {
panic(err)
}
v, err := cache.Get(key[:], tx)
if err != nil {
panic(err)
}
out <- copyBytes(v)
return nil
}))
}(res[i])
}
wg.Wait() // ensure that all goroutines started their transactions
return res
}
put := func(blockNum uint64, blockHash [32]byte, k, v []byte) {
require.NoError(db.Update(context.Background(), func(tx kv.RwTx) error {
_ = tx.Put(kv.SyncStageProgress, []byte("Finish"), bigEndian(blockNum))
_ = tx.Put(kv.HeaderCanonical, bigEndian(blockNum), blockHash[:])
_ = tx.Put(kv.PlainState, k, v)
return nil
}))
}
// block 1 - represents existing state (no notifications about this data will come to client)
put(1, [32]byte{}, k2[:], []byte{42})
res1, res2 := get(k1), get(k2) // will return immediately
for i := range res1 {
require.Nil(<-res1[i])
}
for i := range res2 {
require.Equal([]byte{42}, <-res2[i])
}
put(2, [32]byte{}, k1[:], []byte{2})
fmt.Printf("-----1\n")
res3, res4 := get(k1), get(k2) // will see View of transaction 2
put(3, [32]byte{}, k1[:], []byte{3}) // even if core already on block 3
c.OnNewBlock(&remote.StateChange{
Direction: remote.Direction_FORWARD,
BlockHeight: 2,
BlockHash: gointerfaces.ConvertHashToH256([32]byte{}),
Changes: []*remote.AccountChange{{
Action: remote.Action_UPSERT,
Address: gointerfaces.ConvertAddressToH160(k1),
Data: []byte{1},
}},
})
for i := range res3 {
require.Equal([]byte{2}, <-res3[i])
}
for i := range res4 {
require.Equal([]byte{42}, <-res4[i])
}
fmt.Printf("-----2\n")
res5, res6 := get(k1), get(k2) // will see View of transaction 3, even if notification has not enough changes
c.OnNewBlock(&remote.StateChange{
Direction: remote.Direction_FORWARD,
BlockHeight: 3,
BlockHash: gointerfaces.ConvertHashToH256([32]byte{}),
Changes: []*remote.AccountChange{{
Action: remote.Action_UPSERT,
Address: gointerfaces.ConvertAddressToH160(k1),
Data: []byte{3},
}},
})
fmt.Printf("-----20\n")
for i := range res5 {
require.Equal([]byte{3}, <-res5[i])
}
fmt.Printf("-----21\n")
for i := range res6 {
require.Equal([]byte{42}, <-res6[i])
}
fmt.Printf("-----3\n")
put(2, [32]byte{}, k1[:], []byte{2})
c.OnNewBlock(&remote.StateChange{
Direction: remote.Direction_UNWIND,
BlockHeight: 2,
BlockHash: gointerfaces.ConvertHashToH256([32]byte{}),
Changes: []*remote.AccountChange{{
Action: remote.Action_UPSERT,
Address: gointerfaces.ConvertAddressToH160(k1),
Data: []byte{2},
}},
})
fmt.Printf("-----4\n")
put(3, [32]byte{2}, k1[:], []byte{4}) // reorg to new chain
c.OnNewBlock(&remote.StateChange{
Direction: remote.Direction_FORWARD,
BlockHeight: 3,
BlockHash: gointerfaces.ConvertHashToH256([32]byte{2}),
Changes: []*remote.AccountChange{{
Action: remote.Action_UPSERT,
Address: gointerfaces.ConvertAddressToH160(k1),
Data: []byte{4},
}},
})
fmt.Printf("-----5\n")
res7, res8 := get(k1), get(k2) // will see View of transaction 3, even if notification has not enough changes
for i := range res7 {
require.Equal([]byte{4}, <-res7[i])
}
for i := range res8 {
require.Equal([]byte{42}, <-res8[i])
}
_ = db.View(context.Background(), func(tx kv.Tx) error {
require.NoError(AssertCheckValues(tx, c))
return nil
})
}