mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-22 03:30:37 +00:00
remotedbserver: add support for bor snapshots (#9180)
This commit is contained in:
parent
74ec3a9db7
commit
e25b15b00e
@ -27,6 +27,7 @@ require (
|
||||
github.com/edsrzf/mmap-go v1.1.0
|
||||
github.com/go-stack/stack v1.8.1
|
||||
github.com/gofrs/flock v0.8.1
|
||||
github.com/golang/mock v1.6.0
|
||||
github.com/google/btree v1.1.2
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.6
|
||||
|
@ -215,6 +215,8 @@ github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfU
|
||||
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
|
||||
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
|
||||
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
|
||||
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
|
||||
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
|
||||
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
@ -464,6 +466,7 @@ github.com/willf/bitset v1.1.9/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPyS
|
||||
github.com/willf/bitset v1.1.10/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
|
||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
|
||||
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
|
||||
go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU=
|
||||
go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=
|
||||
@ -500,6 +503,7 @@ golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHl
|
||||
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
|
||||
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
|
||||
golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0=
|
||||
golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
|
||||
@ -520,6 +524,7 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY
|
||||
golang.org/x/net v0.0.0-20201201195509-5d6afe98e0b7/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
|
||||
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
|
||||
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/net v0.0.0-20211201190559-0a0e4e1bb54c/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
@ -542,6 +547,7 @@ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJ
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
|
||||
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||
@ -565,7 +571,9 @@ golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
@ -611,6 +619,7 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn
|
||||
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
|
||||
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
|
||||
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
|
||||
golang.org/x/tools v0.16.0 h1:GO788SKMRunPIBCXiQyo2AaexLstOrVhuAL5YwsckQM=
|
||||
golang.org/x/tools v0.16.0/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0=
|
||||
|
@ -23,6 +23,12 @@ import (
|
||||
"runtime"
|
||||
"testing"
|
||||
|
||||
"github.com/ledgerwatch/log/v3"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/test/bufconn"
|
||||
|
||||
"github.com/ledgerwatch/erigon-lib/gointerfaces"
|
||||
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
|
||||
"github.com/ledgerwatch/erigon-lib/kv"
|
||||
@ -30,11 +36,6 @@ import (
|
||||
"github.com/ledgerwatch/erigon-lib/kv/memdb"
|
||||
"github.com/ledgerwatch/erigon-lib/kv/remotedb"
|
||||
"github.com/ledgerwatch/erigon-lib/kv/remotedbserver"
|
||||
"github.com/ledgerwatch/log/v3"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/test/bufconn"
|
||||
)
|
||||
|
||||
func TestSequence(t *testing.T) {
|
||||
@ -169,7 +170,7 @@ func TestRemoteKvVersion(t *testing.T) {
|
||||
conn := bufconn.Listen(1024 * 1024)
|
||||
grpcServer := grpc.NewServer()
|
||||
go func() {
|
||||
remote.RegisterKVServer(grpcServer, remotedbserver.NewKvServer(ctx, writeDB, nil, nil, logger))
|
||||
remote.RegisterKVServer(grpcServer, remotedbserver.NewKvServer(ctx, writeDB, nil, nil, nil, logger))
|
||||
if err := grpcServer.Serve(conn); err != nil {
|
||||
log.Error("private RPC server fail", "err", err)
|
||||
}
|
||||
@ -210,7 +211,7 @@ func TestRemoteKvRange(t *testing.T) {
|
||||
ctx, writeDB := context.Background(), memdb.NewTestDB(t)
|
||||
grpcServer, conn := grpc.NewServer(), bufconn.Listen(1024*1024)
|
||||
go func() {
|
||||
kvServer := remotedbserver.NewKvServer(ctx, writeDB, nil, nil, logger)
|
||||
kvServer := remotedbserver.NewKvServer(ctx, writeDB, nil, nil, nil, logger)
|
||||
remote.RegisterKVServer(grpcServer, kvServer)
|
||||
if err := grpcServer.Serve(conn); err != nil {
|
||||
log.Error("private RPC server fail", "err", err)
|
||||
@ -344,7 +345,7 @@ func setupDatabases(t *testing.T, logger log.Logger, f mdbx.TableCfgFunc) (write
|
||||
|
||||
grpcServer := grpc.NewServer()
|
||||
f2 := func() {
|
||||
remote.RegisterKVServer(grpcServer, remotedbserver.NewKvServer(ctx, writeDBs[1], nil, nil, logger))
|
||||
remote.RegisterKVServer(grpcServer, remotedbserver.NewKvServer(ctx, writeDBs[1], nil, nil, nil, logger))
|
||||
if err := grpcServer.Serve(conn); err != nil {
|
||||
logger.Error("private RPC server fail", "err", err)
|
||||
}
|
||||
|
48
erigon-lib/kv/remotedbserver/mock/snapshots_mock.go
Normal file
48
erigon-lib/kv/remotedbserver/mock/snapshots_mock.go
Normal file
@ -0,0 +1,48 @@
|
||||
// Code generated by MockGen. DO NOT EDIT.
|
||||
// Source: github.com/ledgerwatch/erigon-lib/kv/remotedbserver (interfaces: Snapshots)
|
||||
|
||||
// Package mock is a generated GoMock package.
|
||||
package mock
|
||||
|
||||
import (
|
||||
reflect "reflect"
|
||||
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
)
|
||||
|
||||
// MockSnapshots is a mock of Snapshots interface.
|
||||
type MockSnapshots struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockSnapshotsMockRecorder
|
||||
}
|
||||
|
||||
// MockSnapshotsMockRecorder is the mock recorder for MockSnapshots.
|
||||
type MockSnapshotsMockRecorder struct {
|
||||
mock *MockSnapshots
|
||||
}
|
||||
|
||||
// NewMockSnapshots creates a new mock instance.
|
||||
func NewMockSnapshots(ctrl *gomock.Controller) *MockSnapshots {
|
||||
mock := &MockSnapshots{ctrl: ctrl}
|
||||
mock.recorder = &MockSnapshotsMockRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use.
|
||||
func (m *MockSnapshots) EXPECT() *MockSnapshotsMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// Files mocks base method.
|
||||
func (m *MockSnapshots) Files() []string {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "Files")
|
||||
ret0, _ := ret[0].([]string)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Files indicates an expected call of Files.
|
||||
func (mr *MockSnapshotsMockRecorder) Files() *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Files", reflect.TypeOf((*MockSnapshots)(nil).Files))
|
||||
}
|
@ -71,8 +71,9 @@ type KvServer struct {
|
||||
|
||||
kv kv.RoDB
|
||||
stateChangeStreams *StateChangePubSub
|
||||
blockSnapshots Snapsthots
|
||||
historySnapshots Snapsthots
|
||||
blockSnapshots Snapshots
|
||||
borSnapshots Snapshots
|
||||
historySnapshots Snapshots
|
||||
ctx context.Context
|
||||
|
||||
//v3 fields
|
||||
@ -90,18 +91,24 @@ type threadSafeTx struct {
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
type Snapsthots interface {
|
||||
//go:generate mockgen -destination=./mock/snapshots_mock.go -package=mock . Snapshots
|
||||
type Snapshots interface {
|
||||
Files() []string
|
||||
}
|
||||
|
||||
func NewKvServer(ctx context.Context, db kv.RoDB, snapshots Snapsthots, historySnapshots Snapsthots, logger log.Logger) *KvServer {
|
||||
func NewKvServer(ctx context.Context, db kv.RoDB, snapshots Snapshots, borSnapshots Snapshots, historySnapshots Snapshots, logger log.Logger) *KvServer {
|
||||
return &KvServer{
|
||||
trace: false,
|
||||
rangeStep: 1024,
|
||||
kv: db, stateChangeStreams: newStateChangeStreams(), ctx: ctx,
|
||||
blockSnapshots: snapshots, historySnapshots: historySnapshots,
|
||||
txs: map[uint64]*threadSafeTx{}, txsMapLock: &sync.RWMutex{},
|
||||
logger: logger,
|
||||
trace: false,
|
||||
rangeStep: 1024,
|
||||
kv: db,
|
||||
stateChangeStreams: newStateChangeStreams(),
|
||||
ctx: ctx,
|
||||
blockSnapshots: snapshots,
|
||||
borSnapshots: borSnapshots,
|
||||
historySnapshots: historySnapshots,
|
||||
txs: map[uint64]*threadSafeTx{},
|
||||
txsMapLock: &sync.RWMutex{},
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
@ -430,7 +437,7 @@ func bytesCopy(b []byte) []byte {
|
||||
return copiedBytes
|
||||
}
|
||||
|
||||
func (s *KvServer) StateChanges(req *remote.StateChangeRequest, server remote.KV_StateChangesServer) error {
|
||||
func (s *KvServer) StateChanges(_ *remote.StateChangeRequest, server remote.KV_StateChangesServer) error {
|
||||
ch, remove := s.stateChangeStreams.Sub()
|
||||
defer remove()
|
||||
for {
|
||||
@ -447,16 +454,21 @@ func (s *KvServer) StateChanges(req *remote.StateChangeRequest, server remote.KV
|
||||
}
|
||||
}
|
||||
|
||||
func (s *KvServer) SendStateChanges(ctx context.Context, sc *remote.StateChangeBatch) {
|
||||
func (s *KvServer) SendStateChanges(_ context.Context, sc *remote.StateChangeBatch) {
|
||||
s.stateChangeStreams.Pub(sc)
|
||||
}
|
||||
|
||||
func (s *KvServer) Snapshots(ctx context.Context, _ *remote.SnapshotsRequest) (*remote.SnapshotsReply, error) {
|
||||
func (s *KvServer) Snapshots(_ context.Context, _ *remote.SnapshotsRequest) (*remote.SnapshotsReply, error) {
|
||||
if s.blockSnapshots == nil || reflect.ValueOf(s.blockSnapshots).IsNil() { // nolint
|
||||
return &remote.SnapshotsReply{BlocksFiles: []string{}, HistoryFiles: []string{}}, nil
|
||||
}
|
||||
|
||||
return &remote.SnapshotsReply{BlocksFiles: s.blockSnapshots.Files(), HistoryFiles: s.historySnapshots.Files()}, nil
|
||||
blockFiles := s.blockSnapshots.Files()
|
||||
if s.borSnapshots != nil {
|
||||
blockFiles = append(blockFiles, s.borSnapshots.Files()...)
|
||||
}
|
||||
|
||||
return &remote.SnapshotsReply{BlocksFiles: blockFiles, HistoryFiles: s.historySnapshots.Files()}, nil
|
||||
}
|
||||
|
||||
type StateChangePubSub struct {
|
||||
@ -507,8 +519,11 @@ func (s *StateChangePubSub) remove(id uint) {
|
||||
delete(s.chans, id)
|
||||
}
|
||||
|
||||
//
|
||||
// Temporal methods
|
||||
func (s *KvServer) DomainGet(ctx context.Context, req *remote.DomainGetReq) (reply *remote.DomainGetReply, err error) {
|
||||
//
|
||||
|
||||
func (s *KvServer) DomainGet(_ context.Context, req *remote.DomainGetReq) (reply *remote.DomainGetReply, err error) {
|
||||
reply = &remote.DomainGetReply{}
|
||||
if err := s.with(req.TxId, func(tx kv.Tx) error {
|
||||
ttx, ok := tx.(kv.TemporalTx)
|
||||
@ -532,7 +547,7 @@ func (s *KvServer) DomainGet(ctx context.Context, req *remote.DomainGetReq) (rep
|
||||
}
|
||||
return reply, nil
|
||||
}
|
||||
func (s *KvServer) HistoryGet(ctx context.Context, req *remote.HistoryGetReq) (reply *remote.HistoryGetReply, err error) {
|
||||
func (s *KvServer) HistoryGet(_ context.Context, req *remote.HistoryGetReq) (reply *remote.HistoryGetReply, err error) {
|
||||
reply = &remote.HistoryGetReply{}
|
||||
if err := s.with(req.TxId, func(tx kv.Tx) error {
|
||||
ttx, ok := tx.(kv.TemporalTx)
|
||||
@ -552,7 +567,7 @@ func (s *KvServer) HistoryGet(ctx context.Context, req *remote.HistoryGetReq) (r
|
||||
|
||||
const PageSizeLimit = 4 * 4096
|
||||
|
||||
func (s *KvServer) IndexRange(ctx context.Context, req *remote.IndexRangeReq) (*remote.IndexRangeReply, error) {
|
||||
func (s *KvServer) IndexRange(_ context.Context, req *remote.IndexRangeReq) (*remote.IndexRangeReply, error) {
|
||||
reply := &remote.IndexRangeReply{}
|
||||
from, limit := int(req.FromTs), int(req.Limit)
|
||||
if req.PageToken != "" {
|
||||
@ -600,7 +615,7 @@ func (s *KvServer) IndexRange(ctx context.Context, req *remote.IndexRangeReq) (*
|
||||
return reply, nil
|
||||
}
|
||||
|
||||
func (s *KvServer) Range(ctx context.Context, req *remote.RangeReq) (*remote.Pairs, error) {
|
||||
func (s *KvServer) Range(_ context.Context, req *remote.RangeReq) (*remote.Pairs, error) {
|
||||
from, limit := req.FromPrefix, int(req.Limit)
|
||||
if req.PageToken != "" {
|
||||
var pagination remote.ParisPagination
|
||||
|
@ -21,14 +21,18 @@ import (
|
||||
"runtime"
|
||||
"testing"
|
||||
|
||||
"github.com/ledgerwatch/erigon-lib/kv"
|
||||
"github.com/ledgerwatch/erigon-lib/kv/memdb"
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/ledgerwatch/log/v3"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/ledgerwatch/erigon-lib/kv"
|
||||
"github.com/ledgerwatch/erigon-lib/kv/memdb"
|
||||
"github.com/ledgerwatch/erigon-lib/kv/remotedbserver/mock"
|
||||
)
|
||||
|
||||
func TestKvServer_renew(t *testing.T) {
|
||||
//goland:noinspection GoBoolExpressions
|
||||
if runtime.GOOS == "windows" {
|
||||
t.Skip("fix me on win please")
|
||||
}
|
||||
@ -44,7 +48,7 @@ func TestKvServer_renew(t *testing.T) {
|
||||
return nil
|
||||
}))
|
||||
|
||||
s := NewKvServer(ctx, db, nil, nil, log.New())
|
||||
s := NewKvServer(ctx, db, nil, nil, nil, log.New())
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
testCase := func() error {
|
||||
id, err := s.begin(ctx)
|
||||
@ -95,3 +99,44 @@ func TestKvServer_renew(t *testing.T) {
|
||||
}
|
||||
require.NoError(g.Wait())
|
||||
}
|
||||
|
||||
func TestKVServerSnapshotsReturnsSnapshots(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ctrl := gomock.NewController(t)
|
||||
blockSnapshots := mock.NewMockSnapshots(ctrl)
|
||||
blockSnapshots.EXPECT().Files().Return([]string{"headers.seg", "bodies.seg"}).Times(1)
|
||||
historySnapshots := mock.NewMockSnapshots(ctrl)
|
||||
historySnapshots.EXPECT().Files().Return([]string{"history"}).Times(1)
|
||||
|
||||
s := NewKvServer(ctx, nil, blockSnapshots, nil, historySnapshots, log.New())
|
||||
reply, err := s.Snapshots(ctx, nil)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []string{"headers.seg", "bodies.seg"}, reply.BlocksFiles)
|
||||
require.Equal(t, []string{"history"}, reply.HistoryFiles)
|
||||
}
|
||||
|
||||
func TestKVServerSnapshotsReturnsBorSnapshots(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ctrl := gomock.NewController(t)
|
||||
blockSnapshots := mock.NewMockSnapshots(ctrl)
|
||||
blockSnapshots.EXPECT().Files().Return([]string{"headers.seg", "bodies.seg"}).Times(1)
|
||||
borSnapshots := mock.NewMockSnapshots(ctrl)
|
||||
borSnapshots.EXPECT().Files().Return([]string{"borevents.seg", "borspans.seg"}).Times(1)
|
||||
historySnapshots := mock.NewMockSnapshots(ctrl)
|
||||
historySnapshots.EXPECT().Files().Return([]string{"history"}).Times(1)
|
||||
|
||||
s := NewKvServer(ctx, nil, blockSnapshots, borSnapshots, historySnapshots, log.New())
|
||||
reply, err := s.Snapshots(ctx, nil)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []string{"headers.seg", "bodies.seg", "borevents.seg", "borspans.seg"}, reply.BlocksFiles)
|
||||
require.Equal(t, []string{"history"}, reply.HistoryFiles)
|
||||
}
|
||||
|
||||
func TestKVServerSnapshotsReturnsEmptyIfNoBlockSnapshots(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
s := NewKvServer(ctx, nil, nil, nil, nil, log.New())
|
||||
reply, err := s.Snapshots(ctx, nil)
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, reply.BlocksFiles)
|
||||
require.Empty(t, reply.HistoryFiles)
|
||||
}
|
@ -17,6 +17,7 @@ package tools
|
||||
// build tag 'trick_go_mod_tidy' - is used to hide warnings of IDEA (because we can't import `main` packages in go)
|
||||
|
||||
import (
|
||||
_ "github.com/golang/mock/mockgen/model"
|
||||
_ "github.com/ledgerwatch/interfaces"
|
||||
_ "github.com/ledgerwatch/interfaces/downloader"
|
||||
_ "github.com/ledgerwatch/interfaces/execution"
|
||||
|
@ -321,7 +321,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
|
||||
|
||||
// Check if we have an already initialized chain and fall back to
|
||||
// that if so. Otherwise we need to generate a new genesis spec.
|
||||
blockReader, blockWriter, allSnapshots, agg, err := setUpBlockReader(ctx, chainKv, config.Dirs, snapshotVersion, config.Snapshot, config.HistoryV3, chainConfig.Bor != nil, logger)
|
||||
blockReader, blockWriter, allSnapshots, allBorSnapshots, agg, err := setUpBlockReader(ctx, chainKv, config.Dirs, snapshotVersion, config.Snapshot, config.HistoryV3, chainConfig.Bor != nil, logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -339,7 +339,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
|
||||
return nil, err
|
||||
}
|
||||
|
||||
kvRPC := remotedbserver.NewKvServer(ctx, backend.chainDB, allSnapshots, agg, logger)
|
||||
kvRPC := remotedbserver.NewKvServer(ctx, backend.chainDB, allSnapshots, allBorSnapshots, agg, logger)
|
||||
backend.notifications.StateChangesConsumer = kvRPC
|
||||
backend.kvRPC = kvRPC
|
||||
|
||||
@ -1242,7 +1242,7 @@ func (s *Ethereum) setUpSnapDownloader(ctx context.Context, downloaderCfg *downl
|
||||
return err
|
||||
}
|
||||
|
||||
func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snashotVersion uint8, snConfig ethconfig.BlocksFreezing, histV3 bool, isBor bool, logger log.Logger) (services.FullBlockReader, *blockio.BlockWriter, *freezeblocks.RoSnapshots, *libstate.AggregatorV3, error) {
|
||||
func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snashotVersion uint8, snConfig ethconfig.BlocksFreezing, histV3 bool, isBor bool, logger log.Logger) (services.FullBlockReader, *blockio.BlockWriter, *freezeblocks.RoSnapshots, *freezeblocks.BorRoSnapshots, *libstate.AggregatorV3, error) {
|
||||
allSnapshots := freezeblocks.NewRoSnapshots(snConfig, dirs.Snap, snashotVersion, logger)
|
||||
|
||||
var allBorSnapshots *freezeblocks.BorRoSnapshots
|
||||
@ -1267,12 +1267,12 @@ func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snasho
|
||||
|
||||
agg, err := libstate.NewAggregatorV3(ctx, dirs.SnapHistory, dirs.Tmp, ethconfig.HistoryV3AggregationStep, db, logger)
|
||||
if err != nil {
|
||||
return nil, nil, nil, nil, err
|
||||
return nil, nil, nil, nil, nil, err
|
||||
}
|
||||
if err = agg.OpenFolder(); err != nil {
|
||||
return nil, nil, nil, nil, err
|
||||
return nil, nil, nil, nil, nil, err
|
||||
}
|
||||
return blockReader, blockWriter, allSnapshots, agg, nil
|
||||
return blockReader, blockWriter, allSnapshots, allBorSnapshots, agg, nil
|
||||
}
|
||||
|
||||
func (s *Ethereum) Peers(ctx context.Context) (*remote.PeersReply, error) {
|
||||
|
@ -34,7 +34,6 @@ import (
|
||||
"github.com/ledgerwatch/erigon-lib/txpool/txpoolcfg"
|
||||
types2 "github.com/ledgerwatch/erigon-lib/types"
|
||||
"github.com/ledgerwatch/erigon-lib/wrap"
|
||||
|
||||
"github.com/ledgerwatch/erigon/consensus"
|
||||
"github.com/ledgerwatch/erigon/consensus/bor"
|
||||
"github.com/ledgerwatch/erigon/consensus/ethash"
|
||||
@ -260,7 +259,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
|
||||
histV3, db, agg := temporal.NewTestDB(nil, dirs, nil)
|
||||
cfg.HistoryV3 = histV3
|
||||
|
||||
erigonGrpcServeer := remotedbserver.NewKvServer(ctx, db, nil, nil, logger)
|
||||
erigonGrpcServeer := remotedbserver.NewKvServer(ctx, db, nil, nil, nil, logger)
|
||||
allSnapshots := freezeblocks.NewRoSnapshots(ethconfig.Defaults.Snapshot, dirs.Snap, 1, logger)
|
||||
allBorSnapshots := freezeblocks.NewBorRoSnapshots(ethconfig.Defaults.Snapshot, dirs.Snap, 1, logger)
|
||||
mock := &MockSentry{
|
||||
|
Loading…
Reference in New Issue
Block a user