Merge pull request #32 from ledgerwatch/remote_db_no_grpc

Remote grpc connection code from remotedb
This commit is contained in:
Alex Sharov 2021-08-15 15:31:09 +07:00 committed by GitHub
commit 5244e51fc0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 22 additions and 127 deletions

View File

@ -19,6 +19,7 @@ package mdbx_test
import (
"context"
"fmt"
"net"
"runtime"
"testing"
@ -179,7 +180,10 @@ func TestRemoteKvVersion(t *testing.T) {
// Different Major versions
v1 := v
v1.Major++
a, err := remotedb.NewRemote(v1, logger).InMem(conn).Open("", "", "")
cc, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithContextDialer(func(ctx context.Context, url string) (net.Conn, error) { return conn.Dial() }))
assert.NoError(t, err)
a, err := remotedb.NewRemote(v1, logger, remote.NewKVClient(cc)).Open()
if err != nil {
t.Fatalf("%v", err)
}
@ -187,7 +191,7 @@ func TestRemoteKvVersion(t *testing.T) {
// Different Minor versions
v2 := v
v2.Minor++
_, err = remotedb.NewRemote(v2, logger).InMem(conn).Open("", "", "")
a, err = remotedb.NewRemote(v2, logger, remote.NewKVClient(cc)).Open()
if err != nil {
t.Fatalf("%v", err)
}
@ -195,11 +199,11 @@ func TestRemoteKvVersion(t *testing.T) {
// Different Patch versions
v3 := v
v3.Patch++
_, err = remotedb.NewRemote(v3, logger).InMem(conn).Open("", "", "")
a, err = remotedb.NewRemote(v3, logger, remote.NewKVClient(cc)).Open()
if err != nil {
t.Fatalf("%v", err)
}
require.False(t, a.EnsureVersionCompatibility())
require.True(t, a.EnsureVersionCompatibility())
}
func setupDatabases(t *testing.T, logger log.Logger, f mdbx.TableCfgFunc) (writeDBs []kv.RwDB, readDBs []kv.RwDB) {
@ -219,7 +223,10 @@ func setupDatabases(t *testing.T, logger log.Logger, f mdbx.TableCfgFunc) (write
}
go f2()
v := gointerfaces.VersionFromProto(remotedbserver.KvServiceAPIVersion)
rdb := remotedb.NewRemote(v, logger).InMem(conn).MustOpen()
cc, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithContextDialer(func(ctx context.Context, url string) (net.Conn, error) { return conn.Dial() }))
assert.NoError(t, err)
rdb, err := remotedb.NewRemote(v, logger, remote.NewKVClient(cc)).Open()
assert.NoError(t, err)
readDBs = []kv.RwDB{
writeDBs[0],
writeDBs[1],

View File

@ -1,58 +1,31 @@
/*
Copyright 2021 Erigon contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remotedb
import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"time"
"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/log/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/test/bufconn"
"google.golang.org/protobuf/types/known/emptypb"
)
// generate the messages and services
type remoteOpts struct {
bucketsCfg mdbx.TableCfgFunc
inMemConn *bufconn.Listener // for tests
DialAddress string
version gointerfaces.Version
remoteKV remote.KVClient
log log.Logger
}
type RemoteKV struct {
conn *grpc.ClientConn
remoteKV remote.KVClient
log log.Logger
buckets kv.TableCfg
@ -86,89 +59,16 @@ func (opts remoteOpts) ReadOnly() remoteOpts {
return opts
}
func (opts remoteOpts) Path(path string) remoteOpts {
opts.DialAddress = path
return opts
}
func (opts remoteOpts) WithBucketsConfig(f mdbx.TableCfgFunc) remoteOpts {
opts.bucketsCfg = f
return opts
}
func (opts remoteOpts) InMem(listener *bufconn.Listener) remoteOpts {
opts.inMemConn = listener
return opts
}
func (opts remoteOpts) Open(certFile, keyFile, caCert string) (*RemoteKV, error) {
var dialOpts []grpc.DialOption
backoffCfg := backoff.DefaultConfig
backoffCfg.BaseDelay = 500 * time.Millisecond
backoffCfg.MaxDelay = 10 * time.Second
dialOpts = []grpc.DialOption{
grpc.WithConnectParams(grpc.ConnectParams{Backoff: backoffCfg, MinConnectTimeout: 10 * time.Minute}),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(15 * datasize.MB))),
grpc.WithKeepaliveParams(keepalive.ClientParameters{}),
}
if certFile == "" {
dialOpts = append(dialOpts, grpc.WithInsecure())
} else {
var creds credentials.TransportCredentials
var err error
if caCert == "" {
creds, err = credentials.NewClientTLSFromFile(certFile, "")
if err != nil {
return nil, err
}
} else {
// load peer cert/key, ca cert
peerCert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
opts.log.Error("load peer cert/key: %s", err)
return nil, err
}
caCert, err := ioutil.ReadFile(caCert)
if err != nil {
opts.log.Error("read ca cert: %s", err)
return nil, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
creds = credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{peerCert},
ClientCAs: caCertPool,
ClientAuth: tls.RequireAndVerifyClientCert,
//nolint:gosec
InsecureSkipVerify: true, // This is to make it work when Common Name does not match - remove when procedure is updated for common name
})
}
dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
}
if opts.inMemConn != nil {
dialOpts = append(dialOpts, grpc.WithContextDialer(func(ctx context.Context, url string) (net.Conn, error) {
return opts.inMemConn.Dial()
}))
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, opts.DialAddress, dialOpts...)
if err != nil {
return nil, err
}
kvClient := remote.NewKVClient(conn)
func (opts remoteOpts) Open() (*RemoteKV, error) {
db := &RemoteKV{
opts: opts,
conn: conn,
remoteKV: kvClient,
log: opts.log,
remoteKV: opts.remoteKV,
log: log.New("remote_db", opts.DialAddress),
buckets: kv.TableCfg{},
}
customBuckets := opts.bucketsCfg(kv.ChaindataTablesCfg)
@ -180,7 +80,7 @@ func (opts remoteOpts) Open(certFile, keyFile, caCert string) (*RemoteKV, error)
}
func (opts remoteOpts) MustOpen() kv.RwDB {
db, err := opts.Open("", "", "")
db, err := opts.Open()
if err != nil {
panic(err)
}
@ -190,22 +90,18 @@ func (opts remoteOpts) MustOpen() kv.RwDB {
// NewRemote defines new remove KV connection (without actually opening it)
// version parameters represent the version the KV client is expecting,
// compatibility check will be performed when the KV connection opens
func NewRemote(v gointerfaces.Version, logger log.Logger) remoteOpts {
return remoteOpts{bucketsCfg: mdbx.WithChaindataTables, version: v, log: logger}
func NewRemote(v gointerfaces.Version, logger log.Logger, remoteKV remote.KVClient) remoteOpts {
return remoteOpts{bucketsCfg: mdbx.WithChaindataTables, version: v, log: logger, remoteKV: remoteKV}
}
func (db *RemoteKV) AllBuckets() kv.TableCfg {
return db.buckets
}
func (db *RemoteKV) GrpcConn() *grpc.ClientConn {
return db.conn
}
func (db *RemoteKV) EnsureVersionCompatibility() bool {
versionReply, err := db.remoteKV.Version(context.Background(), &emptypb.Empty{}, grpc.WaitForReady(true))
if err != nil {
db.log.Error("getting Version", "err", err)
db.log.Error("getting Version", "error", err)
return false
}
if !gointerfaces.EnsureVersion(db.opts.version, versionReply) {
@ -219,14 +115,6 @@ func (db *RemoteKV) EnsureVersionCompatibility() bool {
}
func (db *RemoteKV) Close() {
if db.conn != nil {
if err := db.conn.Close(); err != nil {
db.log.Warn("close remote DB", "err", err)
} else {
db.log.Info("remote database closed")
}
db.conn = nil
}
}
func (db *RemoteKV) BeginRo(ctx context.Context) (kv.Tx, error) {
@ -598,12 +486,12 @@ func (tx *remoteTx) closeGrpcStream() {
err := tx.stream.CloseSend()
if err != nil {
if !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
tx.db.log.Warn("couldn't send msg CloseSend to server", "err", err)
log.Warn("couldn't send msg CloseSend to server", "err", err)
}
} else {
_, err = tx.stream.Recv()
if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
tx.db.log.Warn("received unexpected error from server after CloseSend", "err", err)
log.Warn("received unexpected error from server after CloseSend", "err", err)
}
}
}