erigon-pulse/ethdb/kv_remote.go

599 lines
17 KiB
Go
Raw Normal View History

package ethdb
import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"time"
"unsafe"
"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
More updates to downloader, new p2psentry protocol (#1559) * Initial commit * Add sentry gRPC interface * p2psentry directory * Update README.md * Update README.md * Update README.md * Add go package * Correct syntax * add external downloader interface (#2) * Add txpool (#3) * Add private API (#4) * Invert control.proto, add PeerMinBlock, Separare incoming Tx message into a separate stream (#5) Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local> * Separate upload messages into its own stream (#6) Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local> * Only send changed accounts to listeners (#7) * Txpool interface doc (#9) * More additions * More additions * Fix locking * Intermediate * Fix separation of phases * Intermediate * Fix test * More transformations * New simplified way of downloading headers * Fix hard-coded header sync * Fixed syncing near the tip of the chain * Add architecture diagram source and picture (#10) * More fixes * rename tip to link * Use preverified hashes instead of preverified headers * Fix preverified hashes generation * more parametrisation * Continue parametrisation * Fix grpc data limit, interruption of headers stage * Add ropsten preverified hashes * Typed hashes (#11) * Typed hashes * Fix PeerId * 64-bit tx nonce * Disable penalties * Add goerli settings, bootstrap nodes * Try to fix goerly sync * Remove interfaces * Add proper golang packages, max_block into p2p sentry Status * Prepare for proto overhaul * Squashed 'interfaces/' content from commit ce36053c2 git-subtree-dir: interfaces git-subtree-split: ce36053c24db2f56e48ac752808de60afa1dfb4b * Change EtherReply to address * Adaptations to new types * Switch to new types * Fixes * Fix formatting * Fix lint * Lint fixes, reverse order in types * Fix lint * Fix lint * Fix lint * Fix test * Not supporting eth/66 yet * Fix shutdown * Fix lint * Fix lint * Fix lint * return stopped check Co-authored-by: Artem Vorotnikov <artem@vorotnikov.me> Co-authored-by: b00ris 
<b00ris@mail.ru> Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local> Co-authored-by: lightclient <14004106+lightclient@users.noreply.github.com> Co-authored-by: canepat <16927169+canepat@users.noreply.github.com>
2021-03-19 21:24:49 +00:00
"github.com/ledgerwatch/turbo-geth/gointerfaces/remote"
"github.com/ledgerwatch/turbo-geth/log"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/test/bufconn"
Integration tests 1 (#1793) * Initial commit * Add sentry gRPC interface * p2psentry directory * Update README.md * Update README.md * Update README.md * Add go package * Correct syntax * add external downloader interface (#2) * Add txpool (#3) * Add private API (#4) * Invert control.proto, add PeerMinBlock, Separare incoming Tx message into a separate stream (#5) Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local> * Separate upload messages into its own stream (#6) Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local> * Only send changed accounts to listeners (#7) * Txpool interface doc (#9) * Add architecture diagram source and picture (#10) * Typed hashes (#11) * Typed hashes * Fix PeerId * 64-bit tx nonce * Add proper golang packages, max_block into p2p sentry Status (#12) * Add proper golang packages, max_block into p2p sentry Status * Change EtherReply to address Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local> * Add Rust infrastructure (#13) * DB stats methods removed by https://github.com/ledgerwatch/turbo-geth/pull/1665 * more p2p methods (#15) * add mining methods (#16) * First draft of Consensus gRPC interface (#14) * Update Rust build * Fix interfaces in architecture diagram (#17) * Fix KV interface provider * Fix Consensus interface provider * drop java attributes (#18) * tx pool remove unused import (#19) * ethbackend: add protocol version and client version (#20) * Add missing ethbackend I/F (#21) * Add interface versioning mechanism (#23) Add versioning in KV interface Co-authored-by: Artem Vorotnikov <artem@vorotnikov.me> * spec of tx pool method (#24) * spec of tx pool method (#25) * Update version.proto * Refactor interface versioning * Refactor interface versioning * Testing interface * Remove tree * Fix * Build testing protos * Fix * Fix * Update to the newer interfaces * Add ProtocolVersion and ClientVersion stubs * Hook up ProtocolVersion and ClientVersion * Remove service * Add compatibility checks for RPC 
daemon * Fix typos * Properly update DB schema version * Fix test * Add test for KV compatibility| * Info messages about compability for RPC daemon * DB schema version to be one key * Update release intructions Co-authored-by: Artem Vorotnikov <artem@vorotnikov.me> Co-authored-by: b00ris <b00ris@mail.ru> Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local> Co-authored-by: lightclient <14004106+lightclient@users.noreply.github.com> Co-authored-by: canepat <16927169+canepat@users.noreply.github.com> Co-authored-by: Alex Sharov <AskAlexSharov@gmail.com> Co-authored-by: canepat <tullio.canepa@gmail.com> Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro.local>
2021-04-24 15:46:29 +00:00
"google.golang.org/protobuf/types/known/emptypb"
)
// generate the messages and services
//go:generate protoc --proto_path=../interfaces --go_out=. --go-grpc_out=. "remote/kv.proto" -I=. -I=./../build/include/google
//go:generate protoc --proto_path=../interfaces --go_out=. --go-grpc_out=. "remote/db.proto" -I=. -I=./../build/include/google
//go:generate protoc --proto_path=../interfaces --go_out=. --go-grpc_out=. "remote/ethbackend.proto" -I=. -I=./../build/include/google
type remoteOpts struct {
Integration tests 1 (#1793) * Initial commit * Add sentry gRPC interface * p2psentry directory * Update README.md * Update README.md * Update README.md * Add go package * Correct syntax * add external downloader interface (#2) * Add txpool (#3) * Add private API (#4) * Invert control.proto, add PeerMinBlock, Separare incoming Tx message into a separate stream (#5) Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local> * Separate upload messages into its own stream (#6) Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local> * Only send changed accounts to listeners (#7) * Txpool interface doc (#9) * Add architecture diagram source and picture (#10) * Typed hashes (#11) * Typed hashes * Fix PeerId * 64-bit tx nonce * Add proper golang packages, max_block into p2p sentry Status (#12) * Add proper golang packages, max_block into p2p sentry Status * Change EtherReply to address Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local> * Add Rust infrastructure (#13) * DB stats methods removed by https://github.com/ledgerwatch/turbo-geth/pull/1665 * more p2p methods (#15) * add mining methods (#16) * First draft of Consensus gRPC interface (#14) * Update Rust build * Fix interfaces in architecture diagram (#17) * Fix KV interface provider * Fix Consensus interface provider * drop java attributes (#18) * tx pool remove unused import (#19) * ethbackend: add protocol version and client version (#20) * Add missing ethbackend I/F (#21) * Add interface versioning mechanism (#23) Add versioning in KV interface Co-authored-by: Artem Vorotnikov <artem@vorotnikov.me> * spec of tx pool method (#24) * spec of tx pool method (#25) * Update version.proto * Refactor interface versioning * Refactor interface versioning * Testing interface * Remove tree * Fix * Build testing protos * Fix * Fix * Update to the newer interfaces * Add ProtocolVersion and ClientVersion stubs * Hook up ProtocolVersion and ClientVersion * Remove service * Add compatibility checks for RPC 
daemon * Fix typos * Properly update DB schema version * Fix test * Add test for KV compatibility| * Info messages about compability for RPC daemon * DB schema version to be one key * Update release intructions Co-authored-by: Artem Vorotnikov <artem@vorotnikov.me> Co-authored-by: b00ris <b00ris@mail.ru> Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local> Co-authored-by: lightclient <14004106+lightclient@users.noreply.github.com> Co-authored-by: canepat <16927169+canepat@users.noreply.github.com> Co-authored-by: Alex Sharov <AskAlexSharov@gmail.com> Co-authored-by: canepat <tullio.canepa@gmail.com> Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro.local>
2021-04-24 15:46:29 +00:00
DialAddress string
inMemConn *bufconn.Listener // for tests
bucketsCfg BucketConfigsFunc
versionMajor, versionMinor, versionPatch uint32 // KV interface version of the client - to perform compatibility check when opening
}
// RemoteKV is a database handle whose data lives on a remote node and is
// reached through a gRPC connection. Instances are created by remoteOpts.Open.
type RemoteKV struct {
// opts is the option set this handle was opened with.
opts remoteOpts
// remoteKV is the generated KV client built on top of conn.
remoteKV remote.KVClient
// conn is the underlying gRPC connection; Close sets it to nil.
conn *grpc.ClientConn
// log is the logger created in Open with the "remote_db" context key.
log log.Logger
// buckets is this handle's own copy of the bucket configuration.
buckets dbutils.BucketsCfg
}
// remoteTx is a read-only transaction carried over a single bidirectional
// gRPC stream (see RemoteKV.BeginRo).
type remoteTx struct {
// ctx is the context the caller passed to BeginRo.
ctx context.Context
// db is the handle that started this transaction.
db *RemoteKV
// cursors lists the cursors opened through Cursor; Rollback closes them all.
cursors []*remoteCursor
// stream is the gRPC stream shared by every cursor of this transaction.
stream remote.KV_TxClient
// streamCancelFn cancels the child context the stream was created with.
streamCancelFn context.CancelFunc
// NOTE(review): streamingRequested is not referenced in the visible part of
// this file - confirm whether it is still used.
streamingRequested bool
// statelessCursors caches one cursor per bucket for GetOne and Has.
statelessCursors map[string]Cursor
}
type remoteCursor struct {
2021-04-02 09:15:41 +00:00
id uint32
ctx context.Context
stream remote.KV_TxClient
tx *remoteTx
bucketName string
bucketCfg dbutils.BucketConfigItem
}
// remoteCursorDupSort embeds *remoteCursor, so it exposes all of the plain
// cursor's methods.
type remoteCursorDupSort struct {
*remoteCursor
}
// ReadOnly returns opts unchanged; it sets nothing on the option value.
func (opts remoteOpts) ReadOnly() remoteOpts { return opts }
// Path returns a copy of opts whose DialAddress is set to path.
func (opts remoteOpts) Path(path string) remoteOpts {
	updated := opts
	updated.DialAddress = path
	return updated
}
// WithBucketsConfig returns a copy of opts that uses f to produce the bucket
// configuration when the database is opened.
func (opts remoteOpts) WithBucketsConfig(f BucketConfigsFunc) remoteOpts {
	updated := opts
	updated.bucketsCfg = f
	return updated
}
// InMem returns a copy of opts that dials through the given in-memory
// listener instead of the network.
func (opts remoteOpts) InMem(listener *bufconn.Listener) remoteOpts {
	updated := opts
	updated.inMemConn = listener
	return updated
}
func (opts remoteOpts) Open(certFile, keyFile, caCert string, cancelFn context.CancelFunc) (*RemoteKV, error) {
var dialOpts []grpc.DialOption
dialOpts = []grpc.DialOption{
grpc.WithConnectParams(grpc.ConnectParams{Backoff: backoff.DefaultConfig, MinConnectTimeout: 10 * time.Minute}),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(5 * datasize.MB))),
grpc.WithKeepaliveParams(keepalive.ClientParameters{}),
}
if certFile == "" {
dialOpts = append(dialOpts, grpc.WithInsecure())
} else {
var creds credentials.TransportCredentials
var err error
if caCert == "" {
creds, err = credentials.NewClientTLSFromFile(certFile, "")
if err != nil {
return nil, err
}
} else {
// load peer cert/key, ca cert
peerCert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
log.Error("load peer cert/key error:%v", err)
return nil, err
}
caCert, err := ioutil.ReadFile(caCert)
if err != nil {
log.Error("read ca cert file error:%v", err)
return nil, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
creds = credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{peerCert},
ClientCAs: caCertPool,
ClientAuth: tls.RequireAndVerifyClientCert,
//nolint:gosec
InsecureSkipVerify: true, // This is to make it work when Common Name does not match - remove when procedure is updated for common name
})
}
dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
}
if opts.inMemConn != nil {
dialOpts = append(dialOpts, grpc.WithContextDialer(func(ctx context.Context, url string) (net.Conn, error) {
return opts.inMemConn.Dial()
}))
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, opts.DialAddress, dialOpts...)
if err != nil {
return nil, err
}
Integration tests 1 (#1793) * Initial commit * Add sentry gRPC interface * p2psentry directory * Update README.md * Update README.md * Update README.md * Add go package * Correct syntax * add external downloader interface (#2) * Add txpool (#3) * Add private API (#4) * Invert control.proto, add PeerMinBlock, Separare incoming Tx message into a separate stream (#5) Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local> * Separate upload messages into its own stream (#6) Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local> * Only send changed accounts to listeners (#7) * Txpool interface doc (#9) * Add architecture diagram source and picture (#10) * Typed hashes (#11) * Typed hashes * Fix PeerId * 64-bit tx nonce * Add proper golang packages, max_block into p2p sentry Status (#12) * Add proper golang packages, max_block into p2p sentry Status * Change EtherReply to address Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local> * Add Rust infrastructure (#13) * DB stats methods removed by https://github.com/ledgerwatch/turbo-geth/pull/1665 * more p2p methods (#15) * add mining methods (#16) * First draft of Consensus gRPC interface (#14) * Update Rust build * Fix interfaces in architecture diagram (#17) * Fix KV interface provider * Fix Consensus interface provider * drop java attributes (#18) * tx pool remove unused import (#19) * ethbackend: add protocol version and client version (#20) * Add missing ethbackend I/F (#21) * Add interface versioning mechanism (#23) Add versioning in KV interface Co-authored-by: Artem Vorotnikov <artem@vorotnikov.me> * spec of tx pool method (#24) * spec of tx pool method (#25) * Update version.proto * Refactor interface versioning * Refactor interface versioning * Testing interface * Remove tree * Fix * Build testing protos * Fix * Fix * Update to the newer interfaces * Add ProtocolVersion and ClientVersion stubs * Hook up ProtocolVersion and ClientVersion * Remove service * Add compatibility checks for RPC 
daemon * Fix typos * Properly update DB schema version * Fix test * Add test for KV compatibility| * Info messages about compability for RPC daemon * DB schema version to be one key * Update release intructions Co-authored-by: Artem Vorotnikov <artem@vorotnikov.me> Co-authored-by: b00ris <b00ris@mail.ru> Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local> Co-authored-by: lightclient <14004106+lightclient@users.noreply.github.com> Co-authored-by: canepat <16927169+canepat@users.noreply.github.com> Co-authored-by: Alex Sharov <AskAlexSharov@gmail.com> Co-authored-by: canepat <tullio.canepa@gmail.com> Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro.local>
2021-04-24 15:46:29 +00:00
kvClient := remote.NewKVClient(conn)
// Perform compatibility check
go func() {
versionReply, err := kvClient.Version(context.Background(), &emptypb.Empty{}, grpc.WaitForReady(true))
if err != nil {
log.Error("getting Version info from remove KV", "error", err)
cancelFn()
return
}
var compatible bool
if versionReply.Major != opts.versionMajor {
compatible = false
} else if versionReply.Minor != opts.versionMinor {
compatible = false
} else {
compatible = true
}
if !compatible {
log.Error("incompatible KV interface versions", "client", fmt.Sprintf("%d.%d.%d", opts.versionMajor, opts.versionMinor, opts.versionPatch),
"server", fmt.Sprintf("%d.%d.%d", versionReply.Major, versionReply.Minor, versionReply.Patch))
cancelFn()
return
}
log.Info("KV interfaces compatible", "client", fmt.Sprintf("%d.%d.%d", opts.versionMajor, opts.versionMinor, opts.versionPatch),
"server", fmt.Sprintf("%d.%d.%d", versionReply.Major, versionReply.Minor, versionReply.Patch))
}()
db := &RemoteKV{
opts: opts,
conn: conn,
remoteKV: kvClient,
log: log.New("remote_db", opts.DialAddress),
buckets: dbutils.BucketsCfg{},
}
customBuckets := opts.bucketsCfg(dbutils.BucketsConfigs)
for name, cfg := range customBuckets { // copy map to avoid changing global variable
db.buckets[name] = cfg
}
return db, nil
}
2021-03-30 09:53:54 +00:00
// MustOpen opens the remote database without TLS (empty cert, key and CA
// paths) and with a no-op cancel function. It panics if Open returns an error.
func (opts remoteOpts) MustOpen() RwKV {
	kv, openErr := opts.Open("", "", "", func() {})
	if openErr != nil {
		panic(openErr)
	}
	return kv
}
Integration tests 1 (#1793) * Initial commit * Add sentry gRPC interface * p2psentry directory * Update README.md * Update README.md * Update README.md * Add go package * Correct syntax * add external downloader interface (#2) * Add txpool (#3) * Add private API (#4) * Invert control.proto, add PeerMinBlock, Separare incoming Tx message into a separate stream (#5) Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local> * Separate upload messages into its own stream (#6) Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local> * Only send changed accounts to listeners (#7) * Txpool interface doc (#9) * Add architecture diagram source and picture (#10) * Typed hashes (#11) * Typed hashes * Fix PeerId * 64-bit tx nonce * Add proper golang packages, max_block into p2p sentry Status (#12) * Add proper golang packages, max_block into p2p sentry Status * Change EtherReply to address Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local> * Add Rust infrastructure (#13) * DB stats methods removed by https://github.com/ledgerwatch/turbo-geth/pull/1665 * more p2p methods (#15) * add mining methods (#16) * First draft of Consensus gRPC interface (#14) * Update Rust build * Fix interfaces in architecture diagram (#17) * Fix KV interface provider * Fix Consensus interface provider * drop java attributes (#18) * tx pool remove unused import (#19) * ethbackend: add protocol version and client version (#20) * Add missing ethbackend I/F (#21) * Add interface versioning mechanism (#23) Add versioning in KV interface Co-authored-by: Artem Vorotnikov <artem@vorotnikov.me> * spec of tx pool method (#24) * spec of tx pool method (#25) * Update version.proto * Refactor interface versioning * Refactor interface versioning * Testing interface * Remove tree * Fix * Build testing protos * Fix * Fix * Update to the newer interfaces * Add ProtocolVersion and ClientVersion stubs * Hook up ProtocolVersion and ClientVersion * Remove service * Add compatibility checks for RPC 
daemon * Fix typos * Properly update DB schema version * Fix test * Add test for KV compatibility| * Info messages about compability for RPC daemon * DB schema version to be one key * Update release intructions Co-authored-by: Artem Vorotnikov <artem@vorotnikov.me> Co-authored-by: b00ris <b00ris@mail.ru> Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local> Co-authored-by: lightclient <14004106+lightclient@users.noreply.github.com> Co-authored-by: canepat <16927169+canepat@users.noreply.github.com> Co-authored-by: Alex Sharov <AskAlexSharov@gmail.com> Co-authored-by: canepat <tullio.canepa@gmail.com> Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro.local>
2021-04-24 15:46:29 +00:00
// NewRemote defines a new remote KV connection (without actually opening it).
// The version parameters are the KV interface version this client expects;
// the compatibility check is performed when the connection is opened.
// The returned options use DefaultBucketConfigs unless overridden.
func NewRemote(versionMajor, versionMinor, versionPatch uint32) remoteOpts {
	return remoteOpts{
		bucketsCfg:   DefaultBucketConfigs,
		versionMajor: versionMajor,
		versionMinor: versionMinor,
		versionPatch: versionPatch,
	}
}
// AllBuckets returns the bucket configuration map held by this handle (the
// map itself, not a copy).
func (db *RemoteKV) AllBuckets() dbutils.BucketsCfg { return db.buckets }
// GrpcConn returns the underlying gRPC connection; it is nil after Close.
func (db *RemoteKV) GrpcConn() *grpc.ClientConn { return db.conn }
// Close shuts down the gRPC connection, logs the outcome and clears the
// connection field, so a second call does nothing.
// All transactions must be closed before closing the database.
func (db *RemoteKV) Close() {
	if db.conn == nil {
		return
	}
	switch closeErr := db.conn.Close(); {
	case closeErr != nil:
		db.log.Warn("failed to close remote DB", "err", closeErr)
	default:
		db.log.Info("remote database closed")
	}
	db.conn = nil
}
// CollectMetrics does nothing for the remote database.
func (db *RemoteKV) CollectMetrics() {}
2021-04-03 06:26:00 +00:00
// BeginRo starts a read-only transaction by opening a Tx stream on the remote
// KV service. The stream runs on a child of ctx so that it can be cancelled
// independently; that child context is cancelled right away if the stream
// cannot be opened.
func (db *RemoteKV) BeginRo(ctx context.Context) (Tx, error) {
	// Child context for the stream, so it can be cancelled to prevent a leak.
	streamCtx, cancelStream := context.WithCancel(ctx)
	txStream, err := db.remoteKV.Tx(streamCtx)
	if err != nil {
		cancelStream()
		return nil, err
	}
	roTx := &remoteTx{
		ctx:            ctx,
		db:             db,
		stream:         txStream,
		streamCancelFn: cancelStream,
	}
	return roTx, nil
}
// BeginRw always returns an error: the remote provider has no read-write
// transactions.
func (db *RemoteKV) BeginRw(ctx context.Context) (RwTx, error) {
	unsupported := fmt.Errorf("remote db provider doesn't support .BeginRw method")
	return nil, unsupported
}
func (db *RemoteKV) View(ctx context.Context, f func(tx Tx) error) (err error) {
2021-04-03 06:26:00 +00:00
tx, err := db.BeginRo(ctx)
if err != nil {
return err
}
defer tx.Rollback()
return f(tx)
}
// Update always returns an error without calling f: the remote provider is
// read-only.
func (db *RemoteKV) Update(ctx context.Context, f func(tx RwTx) error) (err error) {
	err = fmt.Errorf("remote db provider doesn't support .Update method")
	return err
}
// Comparator is not implemented for remote transactions; calling it panics.
func (tx *remoteTx) Comparator(bucket string) dbutils.CmpFunc { panic("not implemented yet") }
2021-04-27 08:32:41 +00:00
// CollectMetrics does nothing for remote transactions.
func (tx *remoteTx) CollectMetrics() {}
// CHandle is not implemented for remote transactions; calling it panics.
func (tx *remoteTx) CHandle() unsafe.Pointer { panic("not implemented yet") }
// IncrementSequence is not implemented for remote transactions; calling it
// panics.
func (tx *remoteTx) IncrementSequence(bucket string, amount uint64) (uint64, error) { panic("not implemented yet") }
// ReadSequence is not implemented for remote transactions; calling it panics.
func (tx *remoteTx) ReadSequence(bucket string) (uint64, error) { panic("not implemented yet") }
2021-04-03 06:26:00 +00:00
// Commit always panics: remote transactions are read-only and must be ended
// with Rollback.
func (tx *remoteTx) Commit() error { panic("remote db is read-only") }
// Rollback ends the transaction: every cursor registered on it is closed, in
// the order they were opened, and then the gRPC stream is closed.
func (tx *remoteTx) Rollback() {
	open := tx.cursors
	for i := 0; i < len(open); i++ {
		open[i].Close()
	}
	tx.closeGrpcStream()
}
// statelessCursor returns the cursor cached for bucket, opening and caching a
// new one on first use. The cache map is created lazily. A cursor that fails
// to open is not cached.
func (tx *remoteTx) statelessCursor(bucket string) (Cursor, error) {
	if tx.statelessCursors == nil {
		tx.statelessCursors = make(map[string]Cursor)
	}
	if cached, found := tx.statelessCursors[bucket]; found {
		return cached, nil
	}

	opened, err := tx.Cursor(bucket)
	if err != nil {
		return nil, err
	}
	tx.statelessCursors[bucket] = opened
	return opened, nil
}
func (tx *remoteTx) GetOne(bucket string, key []byte) (val []byte, err error) {
c, err := tx.statelessCursor(bucket)
if err != nil {
return nil, err
}
ChangeSets dupsort (#1342) * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * squash * squash * fix * fix * fix * fix * fix * fix * fix * fix * fix * fix * fix * history_early_stop * history_early_stop * vmConfig with ReadOnly false * auto_increment * auto_increment * rebase master Co-authored-by: Alexey Akhunov <akhounov@gmail.com>
2020-11-16 12:08:28 +00:00
_, val, err = c.SeekExact(key)
return val, err
}
2021-04-03 12:04:58 +00:00
// Has reports whether bucket contains key. It seeks with the transaction's
// cached cursor and compares the key the cursor landed on with the requested
// one.
func (tx *remoteTx) Has(bucket string, key []byte) (bool, error) {
	cur, err := tx.statelessCursor(bucket)
	if err != nil {
		return false, err
	}
	landed, _, seekErr := cur.Seek(key)
	if seekErr != nil {
		return false, seekErr
	}
	return bytes.Equal(key, landed), nil
}
ChangeSets dupsort (#1342) * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * squash * squash * fix * fix * fix * fix * fix * fix * fix * fix * fix * fix * fix * history_early_stop * history_early_stop * vmConfig with ReadOnly false * auto_increment * auto_increment * rebase master Co-authored-by: Alexey Akhunov <akhounov@gmail.com>
2020-11-16 12:08:28 +00:00
// SeekExact is the exported form of seekExact: it forwards key and returns
// the result unchanged.
func (c *remoteCursor) SeekExact(key []byte) (k, val []byte, err error) { return c.seekExact(key) }
// Prev is the exported form of prev and returns its result unchanged.
func (c *remoteCursor) Prev() ([]byte, []byte, error) { return c.prev() }
func (tx *remoteTx) Cursor(bucket string) (Cursor, error) {
b := tx.db.buckets[bucket]
c := &remoteCursor{tx: tx, ctx: tx.ctx, bucketName: bucket, bucketCfg: b, stream: tx.stream}
tx.cursors = append(tx.cursors, c)
if err := c.stream.Send(&remote.Cursor{Op: remote.Op_OPEN, BucketName: c.bucketName}); err != nil {
2021-04-02 09:15:41 +00:00
return nil, err
}
msg, err := c.stream.Recv()
if err != nil {
2021-04-02 09:15:41 +00:00
return nil, err
}
c.id = msg.CursorID
2021-04-02 09:15:41 +00:00
return c, nil
}
// Put is a write operation and is not supported by remote cursors; it panics.
func (c *remoteCursor) Put(key []byte, value []byte) error { panic("not supported") }
// PutNoOverwrite is a write operation and is not supported by remote cursors; it panics.
func (c *remoteCursor) PutNoOverwrite(key []byte, value []byte) error { panic("not supported") }
// Append is a write operation and is not supported by remote cursors; it panics.
func (c *remoteCursor) Append(key []byte, value []byte) error { panic("not supported") }
// Delete is a write operation and is not supported by remote cursors; it panics.
func (c *remoteCursor) Delete(k, v []byte) error { panic("not supported") }
// DeleteCurrent is a write operation and is not supported by remote cursors; it panics.
func (c *remoteCursor) DeleteCurrent() error { panic("not supported") }
// Count is not supported by remote cursors; it panics.
func (c *remoteCursor) Count() (uint64, error) { panic("not supported") }
func (c *remoteCursor) first() ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_FIRST}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) next() ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_NEXT}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) nextDup() ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_NEXT_DUP}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) nextNoDup() ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_NEXT_NO_DUP}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) prev() ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_PREV}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) prevDup() ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_PREV_DUP}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) prevNoDup() ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_PREV_NO_DUP}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) last() ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_LAST}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) setRange(k []byte) ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_SEEK, K: k}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
ChangeSets dupsort (#1342) * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * squash * squash * fix * fix * fix * fix * fix * fix * fix * fix * fix * fix * fix * history_early_stop * history_early_stop * vmConfig with ReadOnly false * auto_increment * auto_increment * rebase master Co-authored-by: Alexey Akhunov <akhounov@gmail.com>
2020-11-16 12:08:28 +00:00
func (c *remoteCursor) seekExact(k []byte) ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_SEEK_EXACT, K: k}); err != nil {
ChangeSets dupsort (#1342) * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * squash * squash * fix * fix * fix * fix * fix * fix * fix * fix * fix * fix * fix * history_early_stop * history_early_stop * vmConfig with ReadOnly false * auto_increment * auto_increment * rebase master Co-authored-by: Alexey Akhunov <akhounov@gmail.com>
2020-11-16 12:08:28 +00:00
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
ChangeSets dupsort (#1342) * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * squash * squash * fix * fix * fix * fix * fix * fix * fix * fix * fix * fix * fix * history_early_stop * history_early_stop * vmConfig with ReadOnly false * auto_increment * auto_increment * rebase master Co-authored-by: Alexey Akhunov <akhounov@gmail.com>
2020-11-16 12:08:28 +00:00
return []byte{}, nil, err
}
ChangeSets dupsort (#1342) * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * change_set_dup * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * working version * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * aa * squash * squash * fix * fix * fix * fix * fix * fix * fix * fix * fix * fix * fix * history_early_stop * history_early_stop * vmConfig with ReadOnly false * auto_increment * auto_increment * rebase master Co-authored-by: Alexey Akhunov <akhounov@gmail.com>
2020-11-16 12:08:28 +00:00
return pair.K, pair.V, nil
}
// getBothRange sends a SEEK_BOTH request carrying k and v on the cursor's
// stream and returns the V field of the reply. Any send or receive error is
// returned unchanged together with a nil value.
func (c *remoteCursor) getBothRange(k, v []byte) ([]byte, error) {
	req := &remote.Cursor{Cursor: c.id, Op: remote.Op_SEEK_BOTH, K: k, V: v}
	if sendErr := c.stream.Send(req); sendErr != nil {
		return nil, sendErr
	}

	reply, recvErr := c.stream.Recv()
	if recvErr != nil {
		return nil, recvErr
	}
	return reply.V, nil
}
// seekBothExact sends a SEEK_BOTH_EXACT request carrying k and v on the
// cursor's stream and returns the K and V fields of the reply. On a send or
// receive error it returns an empty (non-nil) key, a nil value and the error.
func (c *remoteCursor) seekBothExact(k, v []byte) ([]byte, []byte, error) {
	req := &remote.Cursor{Cursor: c.id, Op: remote.Op_SEEK_BOTH_EXACT, K: k, V: v}
	if sendErr := c.stream.Send(req); sendErr != nil {
		return []byte{}, nil, sendErr
	}

	reply, recvErr := c.stream.Recv()
	if recvErr != nil {
		return []byte{}, nil, recvErr
	}
	return reply.K, reply.V, nil
}
// firstDup sends a FIRST_DUP request for this cursor and returns the V field
// of the reply, or a nil value and the error if sending or receiving fails.
func (c *remoteCursor) firstDup() ([]byte, error) {
	req := &remote.Cursor{Cursor: c.id, Op: remote.Op_FIRST_DUP}
	if sendErr := c.stream.Send(req); sendErr != nil {
		return nil, sendErr
	}

	reply, recvErr := c.stream.Recv()
	if recvErr != nil {
		return nil, recvErr
	}
	return reply.V, nil
}
// lastDup sends a LAST_DUP request for this cursor and returns the V field of
// the reply, or a nil value and the error if sending or receiving fails.
func (c *remoteCursor) lastDup() ([]byte, error) {
	req := &remote.Cursor{Cursor: c.id, Op: remote.Op_LAST_DUP}
	if sendErr := c.stream.Send(req); sendErr != nil {
		return nil, sendErr
	}

	reply, recvErr := c.stream.Recv()
	if recvErr != nil {
		return nil, recvErr
	}
	return reply.V, nil
}
// getCurrent sends a CURRENT request for this cursor and returns the K and V
// fields of the reply. On a send or receive error it returns an empty
// (non-nil) key, a nil value and the error.
func (c *remoteCursor) getCurrent() ([]byte, []byte, error) {
	req := &remote.Cursor{Cursor: c.id, Op: remote.Op_CURRENT}
	if sendErr := c.stream.Send(req); sendErr != nil {
		return []byte{}, nil, sendErr
	}

	reply, recvErr := c.stream.Recv()
	if recvErr != nil {
		return []byte{}, nil, recvErr
	}
	return reply.K, reply.V, nil
}
// Current is the exported entry point for getCurrent; it forwards the call
// and returns its results unchanged.
func (c *remoteCursor) Current() ([]byte, []byte, error) {
	key, value, err := c.getCurrent()
	return key, value, err
}
// Seek forwards to setRange with the given seek key and returns its results
// unchanged.
//
// Seek does not start streaming, because much of the calling code performs
// only a few Seek calls without reading a sequence of data afterwards.
// Next is the call that requests streaming (if configured by the user).
func (c *remoteCursor) Seek(seek []byte) ([]byte, []byte, error) {
	key, value, err := c.setRange(seek)
	return key, value, err
}
// First is the exported entry point for first; it forwards the call and
// returns its results unchanged.
func (c *remoteCursor) First() ([]byte, []byte, error) {
	key, value, err := c.first()
	return key, value, err
}
// Next returns the next data element from the server by forwarding to next.
// This is the call that requests streaming (if configured by the user).
func (c *remoteCursor) Next() ([]byte, []byte, error) {
	key, value, err := c.next()
	return key, value, err
}
// Last is the exported entry point for last; it forwards the call and
// returns its results unchanged.
func (c *remoteCursor) Last() ([]byte, []byte, error) {
	key, value, err := c.last()
	return key, value, err
}
// closeGrpcStream shuts down the transaction's gRPC stream, if there is one,
// and clears tx.stream and tx.streamingRequested afterwards.
//
// When streaming was requested the stream is cancelled outright; otherwise a
// graceful CloseSend followed by one Recv is attempted. Errors other than
// io.EOF and context.Canceled are logged as warnings, never returned.
func (tx *remoteTx) closeGrpcStream() {
	if tx.stream == nil {
		return
	}
	// Hard-cancel the stream on the way out in case the graceful path below
	// was not successful.
	defer tx.streamCancelFn()

	// io.EOF and context.Canceled are the normal ways for the stream to end
	// and are not worth a warning.
	expected := func(err error) bool {
		return errors.Is(err, io.EOF) || errors.Is(err, context.Canceled)
	}

	if tx.streamingRequested {
		// While streaming is in progress CloseSend can't be used: the server
		// will not read it right now - it is busy streaming data.
		// TODO: set flag 'tx.streamingRequested' to false when got terminator from server (nil key or io.EOF)
		tx.streamCancelFn()
	} else if err := tx.stream.CloseSend(); err != nil {
		// Graceful close could not even be sent.
		if !expected(err) {
			log.Warn("couldn't send msg CloseSend to server", "err", err)
		}
	} else if _, err = tx.stream.Recv(); err != nil && !expected(err) {
		log.Warn("received unexpected error from server after CloseSend", "err", err)
	}

	tx.stream = nil
	tx.streamingRequested = false
}
func (c *remoteCursor) Close() {
2021-04-02 09:15:41 +00:00
if c.stream == nil {
return
}
st := c.stream
c.stream = nil
if err := st.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_CLOSE}); err == nil {
_, _ = st.Recv()
}
}
// CursorDupSort opens a cursor on bucket via tx.Cursor and wraps it in a
// remoteCursorDupSort. An error from tx.Cursor is returned as-is with a nil
// cursor.
func (tx *remoteTx) CursorDupSort(bucket string) (CursorDupSort, error) {
	base, err := tx.Cursor(bucket)
	if err != nil {
		return nil, err
	}
	// Unchecked assertion: this panics if tx.Cursor returns anything other
	// than a *remoteCursor.
	rc := base.(*remoteCursor)
	return &remoteCursorDupSort{remoteCursor: rc}, nil
}
// SeekBothExact is the exported entry point for seekBothExact; it forwards
// key and value and returns the results unchanged.
func (c *remoteCursorDupSort) SeekBothExact(key, value []byte) ([]byte, []byte, error) {
	k, v, err := c.seekBothExact(key, value)
	return k, v, err
}
// SeekBothRange is the exported entry point for getBothRange; it forwards
// key and value and returns the results unchanged.
func (c *remoteCursorDupSort) SeekBothRange(key, value []byte) ([]byte, error) {
	v, err := c.getBothRange(key, value)
	return v, err
}
// DeleteExact is not implemented by the remote cursor; calling it panics.
func (c *remoteCursorDupSort) DeleteExact(k1, k2 []byte) error {
	panic("not supported")
}
// AppendDup is not implemented by the remote cursor; calling it panics.
func (c *remoteCursorDupSort) AppendDup(k []byte, v []byte) error {
	panic("not supported")
}
// PutNoDupData is not implemented by the remote cursor; calling it panics.
func (c *remoteCursorDupSort) PutNoDupData(key, value []byte) error {
	panic("not supported")
}
// DeleteCurrentDuplicates is not implemented by the remote cursor; calling it
// panics.
func (c *remoteCursorDupSort) DeleteCurrentDuplicates() error {
	panic("not supported")
}
// CountDuplicates is not implemented by the remote cursor; calling it panics.
func (c *remoteCursorDupSort) CountDuplicates() (uint64, error) {
	panic("not supported")
}
// FirstDup is the exported entry point for firstDup; it forwards the call and
// returns the results unchanged.
func (c *remoteCursorDupSort) FirstDup() ([]byte, error) {
	v, err := c.firstDup()
	return v, err
}
// NextDup is the exported entry point for nextDup; it forwards the call and
// returns the results unchanged.
func (c *remoteCursorDupSort) NextDup() ([]byte, []byte, error) {
	k, v, err := c.nextDup()
	return k, v, err
}
// NextNoDup is the exported entry point for nextNoDup; it forwards the call
// and returns the results unchanged.
func (c *remoteCursorDupSort) NextNoDup() ([]byte, []byte, error) {
	k, v, err := c.nextNoDup()
	return k, v, err
}
// PrevDup is the exported entry point for prevDup; it forwards the call and
// returns the results unchanged.
func (c *remoteCursorDupSort) PrevDup() ([]byte, []byte, error) {
	k, v, err := c.prevDup()
	return k, v, err
}
// PrevNoDup is the exported entry point for prevNoDup; it forwards the call
// and returns the results unchanged.
func (c *remoteCursorDupSort) PrevNoDup() ([]byte, []byte, error) {
	k, v, err := c.prevNoDup()
	return k, v, err
}
// LastDup is the exported entry point for lastDup; it forwards the call and
// returns the results unchanged.
func (c *remoteCursorDupSort) LastDup() ([]byte, error) {
	v, err := c.lastDup()
	return v, err
}