diff --git a/kv/mdbx/kv_abstract_test.go b/kv/mdbx/kv_abstract_test.go index 70822b8ff..cbde41cd3 100644 --- a/kv/mdbx/kv_abstract_test.go +++ b/kv/mdbx/kv_abstract_test.go @@ -19,6 +19,7 @@ package mdbx_test import ( "context" "fmt" + "net" "runtime" "testing" @@ -179,7 +180,10 @@ func TestRemoteKvVersion(t *testing.T) { // Different Major versions v1 := v v1.Major++ - a, err := remotedb.NewRemote(v1, logger).InMem(conn).Open("", "", "") + + cc, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithContextDialer(func(ctx context.Context, url string) (net.Conn, error) { return conn.Dial() })) + assert.NoError(t, err) + a, err := remotedb.NewRemote(v1, logger, remote.NewKVClient(cc)).Open() if err != nil { t.Fatalf("%v", err) } @@ -187,7 +191,7 @@ func TestRemoteKvVersion(t *testing.T) { // Different Minor versions v2 := v v2.Minor++ - _, err = remotedb.NewRemote(v2, logger).InMem(conn).Open("", "", "") + a, err = remotedb.NewRemote(v2, logger, remote.NewKVClient(cc)).Open() if err != nil { t.Fatalf("%v", err) } @@ -195,11 +199,11 @@ func TestRemoteKvVersion(t *testing.T) { // Different Patch versions v3 := v v3.Patch++ - _, err = remotedb.NewRemote(v3, logger).InMem(conn).Open("", "", "") + a, err = remotedb.NewRemote(v3, logger, remote.NewKVClient(cc)).Open() if err != nil { t.Fatalf("%v", err) } - require.False(t, a.EnsureVersionCompatibility()) + require.True(t, a.EnsureVersionCompatibility()) } func setupDatabases(t *testing.T, logger log.Logger, f mdbx.TableCfgFunc) (writeDBs []kv.RwDB, readDBs []kv.RwDB) { @@ -219,7 +223,10 @@ func setupDatabases(t *testing.T, logger log.Logger, f mdbx.TableCfgFunc) (write } go f2() v := gointerfaces.VersionFromProto(remotedbserver.KvServiceAPIVersion) - rdb := remotedb.NewRemote(v, logger).InMem(conn).MustOpen() + cc, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithContextDialer(func(ctx context.Context, url string) (net.Conn, error) { return conn.Dial() })) + assert.NoError(t, err) + rdb, err := remotedb.NewRemote(v, logger, remote.NewKVClient(cc)).Open() + assert.NoError(t, err) readDBs = []kv.RwDB{ writeDBs[0], writeDBs[1], diff --git a/kv/remotedb/kv_remote.go b/kv/remotedb/kv_remote.go index 34195f26f..4b2cacfd5 100644 --- a/kv/remotedb/kv_remote.go +++ b/kv/remotedb/kv_remote.go @@ -1,58 +1,31 @@ -/* - Copyright 2021 Erigon contributors - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - package remotedb import ( "bytes" "context" - "crypto/tls" - "crypto/x509" "errors" "fmt" "io" - "io/ioutil" - "net" - "time" - "github.com/c2h5oh/datasize" "github.com/ledgerwatch/erigon-lib/gointerfaces" "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/log/v3" "google.golang.org/grpc" - "google.golang.org/grpc/backoff" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/keepalive" - "google.golang.org/grpc/test/bufconn" "google.golang.org/protobuf/types/known/emptypb" ) // generate the messages and services type remoteOpts struct { bucketsCfg mdbx.TableCfgFunc - inMemConn *bufconn.Listener // for tests DialAddress string version gointerfaces.Version + remoteKV remote.KVClient log log.Logger } type RemoteKV struct { - conn *grpc.ClientConn remoteKV remote.KVClient log log.Logger buckets kv.TableCfg @@ -86,89 +59,16 @@ func (opts remoteOpts) ReadOnly() remoteOpts { return opts } -func (opts remoteOpts) Path(path string) remoteOpts { - opts.DialAddress = path - return opts -} - func (opts remoteOpts) WithBucketsConfig(f mdbx.TableCfgFunc) remoteOpts { opts.bucketsCfg = f return opts } -func (opts remoteOpts) InMem(listener *bufconn.Listener) remoteOpts { - opts.inMemConn = listener - return opts -} - -func (opts remoteOpts) Open(certFile, keyFile, caCert string) (*RemoteKV, error) { - var dialOpts []grpc.DialOption - - backoffCfg := backoff.DefaultConfig - backoffCfg.BaseDelay = 500 * time.Millisecond - backoffCfg.MaxDelay = 10 * time.Second - dialOpts = []grpc.DialOption{ - grpc.WithConnectParams(grpc.ConnectParams{Backoff: backoffCfg, MinConnectTimeout: 10 * time.Minute}), - grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(15 * datasize.MB))), - grpc.WithKeepaliveParams(keepalive.ClientParameters{}), - } - if certFile == "" { - dialOpts = append(dialOpts, grpc.WithInsecure()) - } else { - var creds credentials.TransportCredentials - var err error - if caCert == "" { - creds, err = credentials.NewClientTLSFromFile(certFile, "") - - if err != nil { - return nil, err - } - } else { - // load peer cert/key, ca cert - peerCert, err := tls.LoadX509KeyPair(certFile, keyFile) - if err != nil { - opts.log.Error("load peer cert/key: %s", err) - return nil, err - } - caCert, err := ioutil.ReadFile(caCert) - if err != nil { - opts.log.Error("read ca cert: %s", err) - return nil, err - } - caCertPool := x509.NewCertPool() - caCertPool.AppendCertsFromPEM(caCert) - creds = credentials.NewTLS(&tls.Config{ - Certificates: []tls.Certificate{peerCert}, - ClientCAs: caCertPool, - ClientAuth: tls.RequireAndVerifyClientCert, - //nolint:gosec - InsecureSkipVerify: true, // This is to make it work when Common Name does not match - remove when procedure is updated for common name - }) - } - - dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds)) - } - - if opts.inMemConn != nil { - dialOpts = append(dialOpts, grpc.WithContextDialer(func(ctx context.Context, url string) (net.Conn, error) { - return opts.inMemConn.Dial() - })) - } - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - conn, err := grpc.DialContext(ctx, opts.DialAddress, dialOpts...) - if err != nil { - return nil, err - } - - kvClient := remote.NewKVClient(conn) +func (opts remoteOpts) Open() (*RemoteKV, error) { db := &RemoteKV{ opts: opts, - conn: conn, - remoteKV: kvClient, - log: opts.log, + remoteKV: opts.remoteKV, + log: log.New("remote_db", opts.DialAddress), buckets: kv.TableCfg{}, } customBuckets := opts.bucketsCfg(kv.ChaindataTablesCfg) @@ -180,7 +80,7 @@ func (opts remoteOpts) Open(certFile, keyFile, caCert string) (*RemoteKV, error) } func (opts remoteOpts) MustOpen() kv.RwDB { - db, err := opts.Open("", "", "") + db, err := opts.Open() if err != nil { panic(err) } @@ -190,22 +90,18 @@ func (opts remoteOpts) MustOpen() kv.RwDB { // NewRemote defines new remove KV connection (without actually opening it) // version parameters represent the version the KV client is expecting, // compatibility check will be performed when the KV connection opens -func NewRemote(v gointerfaces.Version, logger log.Logger) remoteOpts { - return remoteOpts{bucketsCfg: mdbx.WithChaindataTables, version: v, log: logger} +func NewRemote(v gointerfaces.Version, logger log.Logger, remoteKV remote.KVClient) remoteOpts { + return remoteOpts{bucketsCfg: mdbx.WithChaindataTables, version: v, log: logger, remoteKV: remoteKV} } func (db *RemoteKV) AllBuckets() kv.TableCfg { return db.buckets } -func (db *RemoteKV) GrpcConn() *grpc.ClientConn { - return db.conn -} - func (db *RemoteKV) EnsureVersionCompatibility() bool { versionReply, err := db.remoteKV.Version(context.Background(), &emptypb.Empty{}, grpc.WaitForReady(true)) if err != nil { - db.log.Error("getting Version", "err", err) + db.log.Error("getting Version", "error", err) return false } if !gointerfaces.EnsureVersion(db.opts.version, versionReply) { @@ -219,14 +115,6 @@ func (db *RemoteKV) EnsureVersionCompatibility() bool { } func (db *RemoteKV) Close() { - if db.conn != nil { - if err := db.conn.Close(); err != nil { - db.log.Warn("close remote DB", "err", err) - } else { - db.log.Info("remote database closed") - } - db.conn = nil - } } func (db *RemoteKV) BeginRo(ctx context.Context) (kv.Tx, error) { @@ -598,12 +486,12 @@ func (tx *remoteTx) closeGrpcStream() { err := tx.stream.CloseSend() if err != nil { if !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) { - tx.db.log.Warn("couldn't send msg CloseSend to server", "err", err) + log.Warn("couldn't send msg CloseSend to server", "err", err) } } else { _, err = tx.stream.Recv() if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) { - tx.db.log.Warn("received unexpected error from server after CloseSend", "err", err) + log.Warn("received unexpected error from server after CloseSend", "err", err) } } }