erigon-pulse/kv/remotedb/kv_remote.go
2021-07-28 11:50:45 +07:00

665 lines
19 KiB
Go

/*
Copyright 2021 Erigon contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remotedb
import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"time"
"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/log/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/test/bufconn"
"google.golang.org/protobuf/types/known/emptypb"
)
// generate the messages and services
type remoteOpts struct {
bucketsCfg mdbx.TableCfgFunc
inMemConn *bufconn.Listener // for tests
DialAddress string
version gointerfaces.Version
log log.Logger
}
type RemoteKV struct {
conn *grpc.ClientConn
remoteKV remote.KVClient
log log.Logger
buckets kv.TableCfg
opts remoteOpts
}
type remoteTx struct {
stream remote.KV_TxClient
ctx context.Context
streamCancelFn context.CancelFunc
db *RemoteKV
cursors []*remoteCursor
statelessCursors map[string]kv.Cursor
streamingRequested bool
}
type remoteCursor struct {
ctx context.Context
stream remote.KV_TxClient
tx *remoteTx
bucketName string
bucketCfg kv.TableCfgItem
id uint32
}
type remoteCursorDupSort struct {
*remoteCursor
}
func (opts remoteOpts) ReadOnly() remoteOpts {
return opts
}
func (opts remoteOpts) Path(path string) remoteOpts {
opts.DialAddress = path
return opts
}
func (opts remoteOpts) WithBucketsConfig(f mdbx.TableCfgFunc) remoteOpts {
opts.bucketsCfg = f
return opts
}
func (opts remoteOpts) InMem(listener *bufconn.Listener) remoteOpts {
opts.inMemConn = listener
return opts
}
func (opts remoteOpts) Open(certFile, keyFile, caCert string) (*RemoteKV, error) {
var dialOpts []grpc.DialOption
backoffCfg := backoff.DefaultConfig
backoffCfg.BaseDelay = 500 * time.Millisecond
backoffCfg.MaxDelay = 10 * time.Second
dialOpts = []grpc.DialOption{
grpc.WithConnectParams(grpc.ConnectParams{Backoff: backoffCfg, MinConnectTimeout: 10 * time.Minute}),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(15 * datasize.MB))),
grpc.WithKeepaliveParams(keepalive.ClientParameters{}),
}
if certFile == "" {
dialOpts = append(dialOpts, grpc.WithInsecure())
} else {
var creds credentials.TransportCredentials
var err error
if caCert == "" {
creds, err = credentials.NewClientTLSFromFile(certFile, "")
if err != nil {
return nil, err
}
} else {
// load peer cert/key, ca cert
peerCert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
opts.log.Error("load peer cert/key: %s", err)
return nil, err
}
caCert, err := ioutil.ReadFile(caCert)
if err != nil {
opts.log.Error("read ca cert: %s", err)
return nil, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
creds = credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{peerCert},
ClientCAs: caCertPool,
ClientAuth: tls.RequireAndVerifyClientCert,
//nolint:gosec
InsecureSkipVerify: true, // This is to make it work when Common Name does not match - remove when procedure is updated for common name
})
}
dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
}
if opts.inMemConn != nil {
dialOpts = append(dialOpts, grpc.WithContextDialer(func(ctx context.Context, url string) (net.Conn, error) {
return opts.inMemConn.Dial()
}))
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, opts.DialAddress, dialOpts...)
if err != nil {
return nil, err
}
kvClient := remote.NewKVClient(conn)
db := &RemoteKV{
opts: opts,
conn: conn,
remoteKV: kvClient,
log: opts.log,
buckets: kv.TableCfg{},
}
customBuckets := opts.bucketsCfg(kv.ChaindataTablesCfg)
for name, cfg := range customBuckets { // copy map to avoid changing global variable
db.buckets[name] = cfg
}
return db, nil
}
func (opts remoteOpts) MustOpen() kv.RwDB {
db, err := opts.Open("", "", "")
if err != nil {
panic(err)
}
return db
}
// NewRemote defines new remove KV connection (without actually opening it)
// version parameters represent the version the KV client is expecting,
// compatibility check will be performed when the KV connection opens
func NewRemote(v gointerfaces.Version, logger log.Logger) remoteOpts {
return remoteOpts{bucketsCfg: mdbx.WithChaindataTables, version: v, log: logger}
}
func (db *RemoteKV) AllBuckets() kv.TableCfg {
return db.buckets
}
func (db *RemoteKV) GrpcConn() *grpc.ClientConn {
return db.conn
}
func (db *RemoteKV) EnsureVersionCompatibility() bool {
versionReply, err := db.remoteKV.Version(context.Background(), &emptypb.Empty{}, grpc.WaitForReady(true))
if err != nil {
db.log.Error("getting Version", "err", err)
return false
}
if !gointerfaces.EnsureVersion(db.opts.version, versionReply) {
db.log.Error("incompatible interface versions", "client", db.opts.version.String(),
"server", fmt.Sprintf("%d.%d.%d", versionReply.Major, versionReply.Minor, versionReply.Patch))
return false
}
db.log.Info("interfaces compatible", "client", db.opts.version.String(),
"server", fmt.Sprintf("%d.%d.%d", versionReply.Major, versionReply.Minor, versionReply.Patch))
return true
}
func (db *RemoteKV) Close() {
if db.conn != nil {
if err := db.conn.Close(); err != nil {
db.log.Warn("close remote DB", "err", err)
} else {
db.log.Info("remote database closed")
}
db.conn = nil
}
}
func (db *RemoteKV) BeginRo(ctx context.Context) (kv.Tx, error) {
streamCtx, streamCancelFn := context.WithCancel(ctx) // We create child context for the stream so we can cancel it to prevent leak
stream, err := db.remoteKV.Tx(streamCtx)
if err != nil {
streamCancelFn()
return nil, err
}
return &remoteTx{ctx: ctx, db: db, stream: stream, streamCancelFn: streamCancelFn}, nil
}
func (db *RemoteKV) BeginRw(ctx context.Context) (kv.RwTx, error) {
return nil, fmt.Errorf("remote db provider doesn't support .BeginRw method")
}
func (db *RemoteKV) View(ctx context.Context, f func(tx kv.Tx) error) (err error) {
tx, err := db.BeginRo(ctx)
if err != nil {
return err
}
defer tx.Rollback()
return f(tx)
}
func (db *RemoteKV) Update(ctx context.Context, f func(tx kv.RwTx) error) (err error) {
return fmt.Errorf("remote db provider doesn't support .Update method")
}
func (tx *remoteTx) CollectMetrics() {}
func (tx *remoteTx) IncrementSequence(bucket string, amount uint64) (uint64, error) {
panic("not implemented yet")
}
func (tx *remoteTx) ReadSequence(bucket string) (uint64, error) {
panic("not implemented yet")
}
func (tx *remoteTx) Append(bucket string, k, v []byte) error { panic("no write methods") }
func (tx *remoteTx) AppendDup(bucket string, k, v []byte) error { panic("no write methods") }
func (tx *remoteTx) Commit() error {
panic("remote db is read-only")
}
func (tx *remoteTx) Rollback() {
for _, c := range tx.cursors {
c.Close()
}
tx.closeGrpcStream()
}
func (tx *remoteTx) statelessCursor(bucket string) (kv.Cursor, error) {
if tx.statelessCursors == nil {
tx.statelessCursors = make(map[string]kv.Cursor)
}
c, ok := tx.statelessCursors[bucket]
if !ok {
var err error
c, err = tx.Cursor(bucket)
if err != nil {
return nil, err
}
tx.statelessCursors[bucket] = c
}
return c, nil
}
func (tx *remoteTx) BucketSize(name string) (uint64, error) { panic("not implemented") }
// TODO: this must be optimized - and implemented as single command on server, with server-side buffered streaming
func (tx *remoteTx) ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error {
c, err := tx.Cursor(bucket)
if err != nil {
return err
}
defer c.Close()
for k, v, err := c.Seek(fromPrefix); k != nil; k, v, err = c.Next() {
if err != nil {
return err
}
if err := walker(k, v); err != nil {
return err
}
}
return nil
}
// TODO: this must be optimized - and implemented as single command on server, with server-side buffered streaming
func (tx *remoteTx) ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error {
c, err := tx.Cursor(bucket)
if err != nil {
return err
}
defer c.Close()
for k, v, err := c.Seek(prefix); k != nil; k, v, err = c.Next() {
if err != nil {
return err
}
if !bytes.HasPrefix(k, prefix) {
break
}
if err := walker(k, v); err != nil {
return err
}
}
return nil
}
func (tx *remoteTx) ForAmount(bucket string, fromPrefix []byte, amount uint32, walker func(k, v []byte) error) error {
c, err := tx.Cursor(bucket)
if err != nil {
return err
}
defer c.Close()
for k, v, err := c.Seek(fromPrefix); k != nil && amount > 0; k, v, err = c.Next() {
if err != nil {
return err
}
if err := walker(k, v); err != nil {
return err
}
amount--
}
return nil
}
func (tx *remoteTx) GetOne(bucket string, key []byte) (val []byte, err error) {
c, err := tx.statelessCursor(bucket)
if err != nil {
return nil, err
}
_, val, err = c.SeekExact(key)
return val, err
}
func (tx *remoteTx) Has(bucket string, key []byte) (bool, error) {
c, err := tx.statelessCursor(bucket)
if err != nil {
return false, err
}
k, _, err := c.Seek(key)
if err != nil {
return false, err
}
return bytes.Equal(key, k), nil
}
func (c *remoteCursor) SeekExact(key []byte) (k, val []byte, err error) {
return c.seekExact(key)
}
func (c *remoteCursor) Prev() ([]byte, []byte, error) {
return c.prev()
}
func (tx *remoteTx) Cursor(bucket string) (kv.Cursor, error) {
b := tx.db.buckets[bucket]
c := &remoteCursor{tx: tx, ctx: tx.ctx, bucketName: bucket, bucketCfg: b, stream: tx.stream}
tx.cursors = append(tx.cursors, c)
if err := c.stream.Send(&remote.Cursor{Op: remote.Op_OPEN, BucketName: c.bucketName}); err != nil {
return nil, err
}
msg, err := c.stream.Recv()
if err != nil {
return nil, err
}
c.id = msg.CursorID
return c, nil
}
func (c *remoteCursor) Put(key []byte, value []byte) error { panic("not supported") }
func (c *remoteCursor) PutNoOverwrite(key []byte, value []byte) error { panic("not supported") }
func (c *remoteCursor) Append(key []byte, value []byte) error { panic("not supported") }
func (c *remoteCursor) Delete(k, v []byte) error { panic("not supported") }
func (c *remoteCursor) DeleteCurrent() error { panic("not supported") }
func (c *remoteCursor) Count() (uint64, error) { panic("not supported") }
func (c *remoteCursor) first() ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_FIRST}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) next() ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_NEXT}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) nextDup() ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_NEXT_DUP}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) nextNoDup() ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_NEXT_NO_DUP}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) prev() ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_PREV}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) prevDup() ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_PREV_DUP}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) prevNoDup() ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_PREV_NO_DUP}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) last() ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_LAST}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) setRange(k []byte) ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_SEEK, K: k}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) seekExact(k []byte) ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_SEEK_EXACT, K: k}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) getBothRange(k, v []byte) ([]byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_SEEK_BOTH, K: k, V: v}); err != nil {
return nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return nil, err
}
return pair.V, nil
}
func (c *remoteCursor) seekBothExact(k, v []byte) ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_SEEK_BOTH_EXACT, K: k, V: v}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) firstDup() ([]byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_FIRST_DUP}); err != nil {
return nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return nil, err
}
return pair.V, nil
}
func (c *remoteCursor) lastDup() ([]byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_LAST_DUP}); err != nil {
return nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return nil, err
}
return pair.V, nil
}
func (c *remoteCursor) getCurrent() ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_CURRENT}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) Current() ([]byte, []byte, error) {
return c.getCurrent()
}
// Seek - doesn't start streaming (because much of code does only several .Seek calls without reading sequence of data)
// .Next() - does request streaming (if configured by user)
func (c *remoteCursor) Seek(seek []byte) ([]byte, []byte, error) {
return c.setRange(seek)
}
func (c *remoteCursor) First() ([]byte, []byte, error) {
return c.first()
}
// Next - returns next data element from server, request streaming (if configured by user)
func (c *remoteCursor) Next() ([]byte, []byte, error) {
return c.next()
}
func (c *remoteCursor) Last() ([]byte, []byte, error) {
return c.last()
}
func (tx *remoteTx) closeGrpcStream() {
if tx.stream == nil {
return
}
defer tx.streamCancelFn() // hard cancel stream if graceful wasn't successful
if tx.streamingRequested {
// if streaming is in progress, can't use `CloseSend` - because
// server will not read it right not - it busy with streaming data
// TODO: set flag 'tx.streamingRequested' to false when got terminator from server (nil key or os.EOF)
tx.streamCancelFn()
} else {
// try graceful close stream
err := tx.stream.CloseSend()
if err != nil {
if !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
tx.db.log.Warn("couldn't send msg CloseSend to server", "err", err)
}
} else {
_, err = tx.stream.Recv()
if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
tx.db.log.Warn("received unexpected error from server after CloseSend", "err", err)
}
}
}
tx.stream = nil
tx.streamingRequested = false
}
func (c *remoteCursor) Close() {
if c.stream == nil {
return
}
st := c.stream
c.stream = nil
if err := st.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_CLOSE}); err == nil {
_, _ = st.Recv()
}
}
func (tx *remoteTx) CursorDupSort(bucket string) (kv.CursorDupSort, error) {
c, err := tx.Cursor(bucket)
if err != nil {
return nil, err
}
return &remoteCursorDupSort{remoteCursor: c.(*remoteCursor)}, nil
}
func (c *remoteCursorDupSort) SeekBothExact(key, value []byte) ([]byte, []byte, error) {
return c.seekBothExact(key, value)
}
func (c *remoteCursorDupSort) SeekBothRange(key, value []byte) ([]byte, error) {
return c.getBothRange(key, value)
}
func (c *remoteCursorDupSort) DeleteExact(k1, k2 []byte) error { panic("not supported") }
func (c *remoteCursorDupSort) AppendDup(k []byte, v []byte) error { panic("not supported") }
func (c *remoteCursorDupSort) PutNoDupData(key, value []byte) error { panic("not supported") }
func (c *remoteCursorDupSort) DeleteCurrentDuplicates() error { panic("not supported") }
func (c *remoteCursorDupSort) CountDuplicates() (uint64, error) { panic("not supported") }
func (c *remoteCursorDupSort) FirstDup() ([]byte, error) {
return c.firstDup()
}
func (c *remoteCursorDupSort) NextDup() ([]byte, []byte, error) {
return c.nextDup()
}
func (c *remoteCursorDupSort) NextNoDup() ([]byte, []byte, error) {
return c.nextNoDup()
}
func (c *remoteCursorDupSort) PrevDup() ([]byte, []byte, error) {
return c.prevDup()
}
func (c *remoteCursorDupSort) PrevNoDup() ([]byte, []byte, error) {
return c.prevNoDup()
}
func (c *remoteCursorDupSort) LastDup() ([]byte, error) {
return c.lastDup()
}