remove grpc from remote db

This commit is contained in:
alex.sharov 2021-08-15 11:58:07 +07:00
parent 99d57caee2
commit d8973e89cc
2 changed files with 21 additions and 127 deletions

View File

@ -19,6 +19,7 @@ package mdbx_test
import ( import (
"context" "context"
"fmt" "fmt"
"net"
"runtime" "runtime"
"testing" "testing"
@ -179,7 +180,10 @@ func TestRemoteKvVersion(t *testing.T) {
// Different Major versions // Different Major versions
v1 := v v1 := v
v1.Major++ v1.Major++
a, err := remotedb.NewRemote(v1, logger).InMem(conn).Open("", "", "")
cc, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithContextDialer(func(ctx context.Context, url string) (net.Conn, error) { return conn.Dial() }))
assert.NoError(t, err)
a, err := remotedb.NewRemote(v1, logger, remote.NewKVClient(cc)).Open()
if err != nil { if err != nil {
t.Fatalf("%v", err) t.Fatalf("%v", err)
} }
@ -187,7 +191,7 @@ func TestRemoteKvVersion(t *testing.T) {
// Different Minor versions // Different Minor versions
v2 := v v2 := v
v2.Minor++ v2.Minor++
_, err = remotedb.NewRemote(v2, logger).InMem(conn).Open("", "", "") a, err = remotedb.NewRemote(v2, logger, remote.NewKVClient(cc)).Open()
if err != nil { if err != nil {
t.Fatalf("%v", err) t.Fatalf("%v", err)
} }
@ -195,11 +199,11 @@ func TestRemoteKvVersion(t *testing.T) {
// Different Patch versions // Different Patch versions
v3 := v v3 := v
v3.Patch++ v3.Patch++
_, err = remotedb.NewRemote(v3, logger).InMem(conn).Open("", "", "") a, err = remotedb.NewRemote(v3, logger, remote.NewKVClient(cc)).Open()
if err != nil { if err != nil {
t.Fatalf("%v", err) t.Fatalf("%v", err)
} }
require.False(t, a.EnsureVersionCompatibility()) require.True(t, a.EnsureVersionCompatibility())
} }
func setupDatabases(t *testing.T, logger log.Logger, f mdbx.TableCfgFunc) (writeDBs []kv.RwDB, readDBs []kv.RwDB) { func setupDatabases(t *testing.T, logger log.Logger, f mdbx.TableCfgFunc) (writeDBs []kv.RwDB, readDBs []kv.RwDB) {
@ -219,7 +223,9 @@ func setupDatabases(t *testing.T, logger log.Logger, f mdbx.TableCfgFunc) (write
} }
go f2() go f2()
v := gointerfaces.VersionFromProto(remotedbserver.KvServiceAPIVersion) v := gointerfaces.VersionFromProto(remotedbserver.KvServiceAPIVersion)
rdb := remotedb.NewRemote(v, logger).InMem(conn).MustOpen() cc, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithContextDialer(func(ctx context.Context, url string) (net.Conn, error) { return conn.Dial() }))
assert.NoError(t, err)
rdb, err := remotedb.NewRemote(v, logger, remote.NewKVClient(cc)).Open()
readDBs = []kv.RwDB{ readDBs = []kv.RwDB{
writeDBs[0], writeDBs[0],
writeDBs[1], writeDBs[1],

View File

@ -1,58 +1,31 @@
/*
Copyright 2021 Erigon contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remotedb package remotedb
import ( import (
"bytes" "bytes"
"context" "context"
"crypto/tls"
"crypto/x509"
"errors" "errors"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"net"
"time"
"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/erigon-lib/gointerfaces" "github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote" "github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/log/v3" "github.com/ledgerwatch/log/v3"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/test/bufconn"
"google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/emptypb"
) )
// generate the messages and services // generate the messages and services
type remoteOpts struct { type remoteOpts struct {
bucketsCfg mdbx.TableCfgFunc bucketsCfg mdbx.TableCfgFunc
inMemConn *bufconn.Listener // for tests
DialAddress string DialAddress string
version gointerfaces.Version version gointerfaces.Version
remoteKV remote.KVClient
log log.Logger log log.Logger
} }
type RemoteKV struct { type RemoteKV struct {
conn *grpc.ClientConn
remoteKV remote.KVClient remoteKV remote.KVClient
log log.Logger log log.Logger
buckets kv.TableCfg buckets kv.TableCfg
@ -86,89 +59,16 @@ func (opts remoteOpts) ReadOnly() remoteOpts {
return opts return opts
} }
func (opts remoteOpts) Path(path string) remoteOpts {
opts.DialAddress = path
return opts
}
func (opts remoteOpts) WithBucketsConfig(f mdbx.TableCfgFunc) remoteOpts { func (opts remoteOpts) WithBucketsConfig(f mdbx.TableCfgFunc) remoteOpts {
opts.bucketsCfg = f opts.bucketsCfg = f
return opts return opts
} }
func (opts remoteOpts) InMem(listener *bufconn.Listener) remoteOpts { func (opts remoteOpts) Open() (*RemoteKV, error) {
opts.inMemConn = listener
return opts
}
func (opts remoteOpts) Open(certFile, keyFile, caCert string) (*RemoteKV, error) {
var dialOpts []grpc.DialOption
backoffCfg := backoff.DefaultConfig
backoffCfg.BaseDelay = 500 * time.Millisecond
backoffCfg.MaxDelay = 10 * time.Second
dialOpts = []grpc.DialOption{
grpc.WithConnectParams(grpc.ConnectParams{Backoff: backoffCfg, MinConnectTimeout: 10 * time.Minute}),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(15 * datasize.MB))),
grpc.WithKeepaliveParams(keepalive.ClientParameters{}),
}
if certFile == "" {
dialOpts = append(dialOpts, grpc.WithInsecure())
} else {
var creds credentials.TransportCredentials
var err error
if caCert == "" {
creds, err = credentials.NewClientTLSFromFile(certFile, "")
if err != nil {
return nil, err
}
} else {
// load peer cert/key, ca cert
peerCert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
opts.log.Error("load peer cert/key: %s", err)
return nil, err
}
caCert, err := ioutil.ReadFile(caCert)
if err != nil {
opts.log.Error("read ca cert: %s", err)
return nil, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
creds = credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{peerCert},
ClientCAs: caCertPool,
ClientAuth: tls.RequireAndVerifyClientCert,
//nolint:gosec
InsecureSkipVerify: true, // This is to make it work when Common Name does not match - remove when procedure is updated for common name
})
}
dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
}
if opts.inMemConn != nil {
dialOpts = append(dialOpts, grpc.WithContextDialer(func(ctx context.Context, url string) (net.Conn, error) {
return opts.inMemConn.Dial()
}))
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, opts.DialAddress, dialOpts...)
if err != nil {
return nil, err
}
kvClient := remote.NewKVClient(conn)
db := &RemoteKV{ db := &RemoteKV{
opts: opts, opts: opts,
conn: conn, remoteKV: opts.remoteKV,
remoteKV: kvClient, log: log.New("remote_db", opts.DialAddress),
log: opts.log,
buckets: kv.TableCfg{}, buckets: kv.TableCfg{},
} }
customBuckets := opts.bucketsCfg(kv.ChaindataTablesCfg) customBuckets := opts.bucketsCfg(kv.ChaindataTablesCfg)
@ -180,7 +80,7 @@ func (opts remoteOpts) Open(certFile, keyFile, caCert string) (*RemoteKV, error)
} }
func (opts remoteOpts) MustOpen() kv.RwDB { func (opts remoteOpts) MustOpen() kv.RwDB {
db, err := opts.Open("", "", "") db, err := opts.Open()
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -190,22 +90,18 @@ func (opts remoteOpts) MustOpen() kv.RwDB {
// NewRemote defines new remove KV connection (without actually opening it) // NewRemote defines new remove KV connection (without actually opening it)
// version parameters represent the version the KV client is expecting, // version parameters represent the version the KV client is expecting,
// compatibility check will be performed when the KV connection opens // compatibility check will be performed when the KV connection opens
func NewRemote(v gointerfaces.Version, logger log.Logger) remoteOpts { func NewRemote(v gointerfaces.Version, logger log.Logger, remoteKV remote.KVClient) remoteOpts {
return remoteOpts{bucketsCfg: mdbx.WithChaindataTables, version: v, log: logger} return remoteOpts{bucketsCfg: mdbx.WithChaindataTables, version: v, log: logger, remoteKV: remoteKV}
} }
func (db *RemoteKV) AllBuckets() kv.TableCfg { func (db *RemoteKV) AllBuckets() kv.TableCfg {
return db.buckets return db.buckets
} }
func (db *RemoteKV) GrpcConn() *grpc.ClientConn {
return db.conn
}
func (db *RemoteKV) EnsureVersionCompatibility() bool { func (db *RemoteKV) EnsureVersionCompatibility() bool {
versionReply, err := db.remoteKV.Version(context.Background(), &emptypb.Empty{}, grpc.WaitForReady(true)) versionReply, err := db.remoteKV.Version(context.Background(), &emptypb.Empty{}, grpc.WaitForReady(true))
if err != nil { if err != nil {
db.log.Error("getting Version", "err", err) db.log.Error("getting Version", "error", err)
return false return false
} }
if !gointerfaces.EnsureVersion(db.opts.version, versionReply) { if !gointerfaces.EnsureVersion(db.opts.version, versionReply) {
@ -219,14 +115,6 @@ func (db *RemoteKV) EnsureVersionCompatibility() bool {
} }
func (db *RemoteKV) Close() { func (db *RemoteKV) Close() {
if db.conn != nil {
if err := db.conn.Close(); err != nil {
db.log.Warn("close remote DB", "err", err)
} else {
db.log.Info("remote database closed")
}
db.conn = nil
}
} }
func (db *RemoteKV) BeginRo(ctx context.Context) (kv.Tx, error) { func (db *RemoteKV) BeginRo(ctx context.Context) (kv.Tx, error) {
@ -598,12 +486,12 @@ func (tx *remoteTx) closeGrpcStream() {
err := tx.stream.CloseSend() err := tx.stream.CloseSend()
if err != nil { if err != nil {
if !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) { if !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
tx.db.log.Warn("couldn't send msg CloseSend to server", "err", err) log.Warn("couldn't send msg CloseSend to server", "err", err)
} }
} else { } else {
_, err = tx.stream.Recv() _, err = tx.stream.Recv()
if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) { if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
tx.db.log.Warn("received unexpected error from server after CloseSend", "err", err) log.Warn("received unexpected error from server after CloseSend", "err", err)
} }
} }
} }