erigon-pulse/erigon-lib/kv/remotedb/kv_remote.go
canepat 9568567eda
Add RPC daemon using Silkworm (#8486)
This introduces _experimental_ RPC daemon run by embedded Silkworm
library. Same notes as in PR #8353 apply here plus the following ones:

- activated if `http` command-line option is enabled and `silkworm.path`
option is present, nothing more is required (i.e. currently, both block
execution and RPC daemon run by Silkworm when specifying
`silkworm.path`, just to keep things as simple as possible)
- only Execution API endpoints are implemented by Silkworm RPCDaemon,
whilst Engine API endpoints are still served by Erigon RPCDaemon
- some features are still missing, in particular:
  - state change notification handling
- custom JSON RPC settings (i.e. Erigon RPC settings are not passed to
Silkworm yet)
2023-10-18 06:37:16 +07:00

738 lines
22 KiB
Go

/*
Copyright 2021 Erigon contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remotedb
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"runtime"
"unsafe"
"github.com/ledgerwatch/erigon-lib/kv/iter"
"github.com/ledgerwatch/erigon-lib/kv/order"
"github.com/ledgerwatch/log/v3"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/kv"
)
// generate the messages and services
type remoteOpts struct {
remoteKV remote.KVClient
log log.Logger
bucketsCfg kv.TableCfg
DialAddress string
version gointerfaces.Version
}
var _ kv.TemporalTx = (*tx)(nil)
type DB struct {
remoteKV remote.KVClient
log log.Logger
buckets kv.TableCfg
roTxsLimiter *semaphore.Weighted
opts remoteOpts
}
type tx struct {
stream remote.KV_TxClient
ctx context.Context
streamCancelFn context.CancelFunc
db *DB
statelessCursors map[string]kv.Cursor
cursors []*remoteCursor
streams []kv.Closer
viewID, id uint64
streamingRequested bool
}
type remoteCursor struct {
ctx context.Context
stream remote.KV_TxClient
tx *tx
bucketName string
bucketCfg kv.TableCfgItem
id uint32
}
type remoteCursorDupSort struct {
*remoteCursor
}
func (opts remoteOpts) ReadOnly() remoteOpts {
return opts
}
func (opts remoteOpts) WithBucketsConfig(c kv.TableCfg) remoteOpts {
opts.bucketsCfg = c
return opts
}
func (opts remoteOpts) Open() (*DB, error) {
targetSemCount := int64(runtime.GOMAXPROCS(-1)) - 1
if targetSemCount <= 1 {
targetSemCount = 2
}
db := &DB{
opts: opts,
remoteKV: opts.remoteKV,
log: log.New("remote_db", opts.DialAddress),
buckets: kv.TableCfg{},
roTxsLimiter: semaphore.NewWeighted(targetSemCount), // 1 less than max to allow unlocking
}
customBuckets := opts.bucketsCfg
for name, cfg := range customBuckets { // copy map to avoid changing global variable
db.buckets[name] = cfg
}
return db, nil
}
func (opts remoteOpts) MustOpen() kv.RwDB {
db, err := opts.Open()
if err != nil {
panic(err)
}
return db
}
// NewRemote defines new remove KV connection (without actually opening it)
// version parameters represent the version the KV client is expecting,
// compatibility check will be performed when the KV connection opens
func NewRemote(v gointerfaces.Version, logger log.Logger, remoteKV remote.KVClient) remoteOpts {
return remoteOpts{bucketsCfg: kv.ChaindataTablesCfg, version: v, log: logger, remoteKV: remoteKV}
}
func (db *DB) PageSize() uint64 { panic("not implemented") }
func (db *DB) ReadOnly() bool { return true }
func (db *DB) AllTables() kv.TableCfg { return db.buckets }
func (db *DB) EnsureVersionCompatibility() bool {
versionReply, err := db.remoteKV.Version(context.Background(), &emptypb.Empty{}, grpc.WaitForReady(true))
if err != nil {
db.log.Error("getting Version", "error", err)
return false
}
if !gointerfaces.EnsureVersion(db.opts.version, versionReply) {
db.log.Error("incompatible interface versions", "client", db.opts.version.String(),
"server", fmt.Sprintf("%d.%d.%d", versionReply.Major, versionReply.Minor, versionReply.Patch))
return false
}
db.log.Info("interfaces compatible", "client", db.opts.version.String(),
"server", fmt.Sprintf("%d.%d.%d", versionReply.Major, versionReply.Minor, versionReply.Patch))
return true
}
func (db *DB) Close() {}
func (db *DB) CHandle() unsafe.Pointer {
panic("CHandle not implemented")
}
func (db *DB) BeginRo(ctx context.Context) (txn kv.Tx, err error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
if semErr := db.roTxsLimiter.Acquire(ctx, 1); semErr != nil {
return nil, semErr
}
defer func() {
// ensure we release the semaphore on error
if txn == nil {
db.roTxsLimiter.Release(1)
}
}()
streamCtx, streamCancelFn := context.WithCancel(ctx) // We create child context for the stream so we can cancel it to prevent leak
stream, err := db.remoteKV.Tx(streamCtx)
if err != nil {
streamCancelFn()
return nil, err
}
msg, err := stream.Recv()
if err != nil {
streamCancelFn()
return nil, err
}
return &tx{ctx: ctx, db: db, stream: stream, streamCancelFn: streamCancelFn, viewID: msg.ViewId, id: msg.TxId}, nil
}
func (db *DB) BeginTemporalRo(ctx context.Context) (kv.TemporalTx, error) {
t, err := db.BeginRo(ctx)
if err != nil {
return nil, err
}
return t.(kv.TemporalTx), nil
}
func (db *DB) BeginRw(ctx context.Context) (kv.RwTx, error) {
return nil, fmt.Errorf("remote db provider doesn't support .BeginRw method")
}
func (db *DB) BeginRwNosync(ctx context.Context) (kv.RwTx, error) {
return nil, fmt.Errorf("remote db provider doesn't support .BeginRw method")
}
func (db *DB) BeginTemporalRw(ctx context.Context) (kv.RwTx, error) {
return nil, fmt.Errorf("remote db provider doesn't support .BeginTemporalRw method")
}
func (db *DB) BeginTemporalRwNosync(ctx context.Context) (kv.RwTx, error) {
return nil, fmt.Errorf("remote db provider doesn't support .BeginTemporalRwNosync method")
}
func (db *DB) View(ctx context.Context, f func(tx kv.Tx) error) (err error) {
tx, err := db.BeginRo(ctx)
if err != nil {
return err
}
defer tx.Rollback()
return f(tx)
}
func (db *DB) ViewTemporal(ctx context.Context, f func(tx kv.TemporalTx) error) (err error) {
tx, err := db.BeginTemporalRo(ctx)
if err != nil {
return err
}
defer tx.Rollback()
return f(tx)
}
func (db *DB) Update(ctx context.Context, f func(tx kv.RwTx) error) (err error) {
return fmt.Errorf("remote db provider doesn't support .Update method")
}
func (db *DB) UpdateNosync(ctx context.Context, f func(tx kv.RwTx) error) (err error) {
return fmt.Errorf("remote db provider doesn't support .UpdateNosync method")
}
func (tx *tx) ViewID() uint64 { return tx.viewID }
func (tx *tx) CollectMetrics() {}
func (tx *tx) IncrementSequence(bucket string, amount uint64) (uint64, error) {
panic("not implemented yet")
}
func (tx *tx) ReadSequence(bucket string) (uint64, error) {
panic("not implemented yet")
}
func (tx *tx) Append(bucket string, k, v []byte) error { panic("no write methods") }
func (tx *tx) AppendDup(bucket string, k, v []byte) error { panic("no write methods") }
func (tx *tx) Commit() error {
panic("remote db is read-only")
}
func (tx *tx) Rollback() {
// don't close opened cursors - just close stream, server will cleanup everything well
tx.closeGrpcStream()
tx.db.roTxsLimiter.Release(1)
for _, c := range tx.streams {
c.Close()
}
}
func (tx *tx) DBSize() (uint64, error) { panic("not implemented") }
func (tx *tx) statelessCursor(bucket string) (kv.Cursor, error) {
if tx.statelessCursors == nil {
tx.statelessCursors = make(map[string]kv.Cursor)
}
c, ok := tx.statelessCursors[bucket]
if !ok {
var err error
c, err = tx.Cursor(bucket)
if err != nil {
return nil, err
}
tx.statelessCursors[bucket] = c
}
return c, nil
}
func (tx *tx) BucketSize(name string) (uint64, error) { panic("not implemented") }
func (tx *tx) ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error {
it, err := tx.Range(bucket, fromPrefix, nil)
if err != nil {
return err
}
for it.HasNext() {
k, v, err := it.Next()
if err != nil {
return err
}
if err := walker(k, v); err != nil {
return err
}
}
return nil
}
func (tx *tx) ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error {
it, err := tx.Prefix(bucket, prefix)
if err != nil {
return err
}
for it.HasNext() {
k, v, err := it.Next()
if err != nil {
return err
}
if err := walker(k, v); err != nil {
return err
}
}
return nil
}
// TODO: this must be deprecated
func (tx *tx) ForAmount(bucket string, fromPrefix []byte, amount uint32, walker func(k, v []byte) error) error {
if amount == 0 {
return nil
}
c, err := tx.Cursor(bucket)
if err != nil {
return err
}
defer c.Close()
for k, v, err := c.Seek(fromPrefix); k != nil && amount > 0; k, v, err = c.Next() {
if err != nil {
return err
}
if err := walker(k, v); err != nil {
return err
}
amount--
}
return nil
}
func (tx *tx) GetOne(bucket string, k []byte) (val []byte, err error) {
c, err := tx.statelessCursor(bucket)
if err != nil {
return nil, err
}
_, val, err = c.SeekExact(k)
return val, err
}
func (tx *tx) Has(bucket string, k []byte) (bool, error) {
c, err := tx.statelessCursor(bucket)
if err != nil {
return false, err
}
kk, _, err := c.Seek(k)
if err != nil {
return false, err
}
return bytes.Equal(k, kk), nil
}
func (c *remoteCursor) SeekExact(k []byte) (key, val []byte, err error) {
return c.seekExact(k)
}
func (c *remoteCursor) Prev() ([]byte, []byte, error) {
return c.prev()
}
func (tx *tx) Cursor(bucket string) (kv.Cursor, error) {
b := tx.db.buckets[bucket]
c := &remoteCursor{tx: tx, ctx: tx.ctx, bucketName: bucket, bucketCfg: b, stream: tx.stream}
tx.cursors = append(tx.cursors, c)
if err := c.stream.Send(&remote.Cursor{Op: remote.Op_OPEN, BucketName: c.bucketName}); err != nil {
return nil, err
}
msg, err := c.stream.Recv()
if err != nil {
return nil, err
}
c.id = msg.CursorId
return c, nil
}
func (tx *tx) ListBuckets() ([]string, error) {
return nil, fmt.Errorf("function ListBuckets is not implemented for remoteTx")
}
// func (c *remoteCursor) Put(k []byte, v []byte) error { panic("not supported") }
// func (c *remoteCursor) PutNoOverwrite(k []byte, v []byte) error { panic("not supported") }
// func (c *remoteCursor) Append(k []byte, v []byte) error { panic("not supported") }
// func (c *remoteCursor) Delete(k []byte) error { panic("not supported") }
// func (c *remoteCursor) DeleteCurrent() error { panic("not supported") }
func (c *remoteCursor) Count() (uint64, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_COUNT}); err != nil {
return 0, err
}
pair, err := c.stream.Recv()
if err != nil {
return 0, err
}
return binary.BigEndian.Uint64(pair.V), nil
}
func (c *remoteCursor) first() ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_FIRST}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) next() ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_NEXT}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) nextDup() ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_NEXT_DUP}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) nextNoDup() ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_NEXT_NO_DUP}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) prev() ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_PREV}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) prevDup() ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_PREV_DUP}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) prevNoDup() ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_PREV_NO_DUP}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) last() ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_LAST}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) setRange(k []byte) ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_SEEK, K: k}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) seekExact(k []byte) ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_SEEK_EXACT, K: k}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) getBothRange(k, v []byte) ([]byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_SEEK_BOTH, K: k, V: v}); err != nil {
return nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return nil, err
}
return pair.V, nil
}
func (c *remoteCursor) seekBothExact(k, v []byte) ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_SEEK_BOTH_EXACT, K: k, V: v}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) firstDup() ([]byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_FIRST_DUP}); err != nil {
return nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return nil, err
}
return pair.V, nil
}
func (c *remoteCursor) lastDup() ([]byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_LAST_DUP}); err != nil {
return nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return nil, err
}
return pair.V, nil
}
func (c *remoteCursor) getCurrent() ([]byte, []byte, error) {
if err := c.stream.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_CURRENT}); err != nil {
return []byte{}, nil, err
}
pair, err := c.stream.Recv()
if err != nil {
return []byte{}, nil, err
}
return pair.K, pair.V, nil
}
func (c *remoteCursor) Current() ([]byte, []byte, error) {
return c.getCurrent()
}
// Seek - doesn't start streaming (because much of code does only several .Seek calls without reading sequence of data)
// .Next() - does request streaming (if configured by user)
func (c *remoteCursor) Seek(seek []byte) ([]byte, []byte, error) {
return c.setRange(seek)
}
func (c *remoteCursor) First() ([]byte, []byte, error) {
return c.first()
}
// Next - returns next data element from server, request streaming (if configured by user)
func (c *remoteCursor) Next() ([]byte, []byte, error) {
return c.next()
}
func (c *remoteCursor) Last() ([]byte, []byte, error) {
return c.last()
}
func (tx *tx) closeGrpcStream() {
if tx.stream == nil {
return
}
defer tx.streamCancelFn() // hard cancel stream if graceful wasn't successful
if tx.streamingRequested {
// if streaming is in progress, can't use `CloseSend` - because
// server will not read it right not - it busy with streaming data
// TODO: set flag 'tx.streamingRequested' to false when got terminator from server (nil key or os.EOF)
tx.streamCancelFn()
} else {
// try graceful close stream
err := tx.stream.CloseSend()
if err != nil {
doLog := !grpcutil.IsEndOfStream(err)
if doLog {
log.Warn("couldn't send msg CloseSend to server", "err", err)
}
} else {
_, err = tx.stream.Recv()
if err != nil {
doLog := !grpcutil.IsEndOfStream(err)
if doLog {
log.Warn("received unexpected error from server after CloseSend", "err", err)
}
}
}
}
tx.stream = nil
tx.streamingRequested = false
}
func (c *remoteCursor) Close() {
if c.stream == nil {
return
}
st := c.stream
c.stream = nil
if err := st.Send(&remote.Cursor{Cursor: c.id, Op: remote.Op_CLOSE}); err == nil {
_, _ = st.Recv()
}
}
func (tx *tx) CursorDupSort(bucket string) (kv.CursorDupSort, error) {
b := tx.db.buckets[bucket]
c := &remoteCursor{tx: tx, ctx: tx.ctx, bucketName: bucket, bucketCfg: b, stream: tx.stream}
tx.cursors = append(tx.cursors, c)
if err := c.stream.Send(&remote.Cursor{Op: remote.Op_OPEN_DUP_SORT, BucketName: c.bucketName}); err != nil {
return nil, err
}
msg, err := c.stream.Recv()
if err != nil {
return nil, err
}
c.id = msg.CursorId
return &remoteCursorDupSort{remoteCursor: c}, nil
}
func (c *remoteCursorDupSort) SeekBothExact(k, v []byte) ([]byte, []byte, error) {
return c.seekBothExact(k, v)
}
func (c *remoteCursorDupSort) SeekBothRange(k, v []byte) ([]byte, error) {
return c.getBothRange(k, v)
}
func (c *remoteCursorDupSort) DeleteExact(k1, k2 []byte) error { panic("not supported") }
func (c *remoteCursorDupSort) AppendDup(k []byte, v []byte) error { panic("not supported") }
func (c *remoteCursorDupSort) PutNoDupData(k, v []byte) error { panic("not supported") }
func (c *remoteCursorDupSort) DeleteCurrentDuplicates() error { panic("not supported") }
func (c *remoteCursorDupSort) CountDuplicates() (uint64, error) { panic("not supported") }
func (c *remoteCursorDupSort) FirstDup() ([]byte, error) { return c.firstDup() }
func (c *remoteCursorDupSort) NextDup() ([]byte, []byte, error) { return c.nextDup() }
func (c *remoteCursorDupSort) NextNoDup() ([]byte, []byte, error) { return c.nextNoDup() }
func (c *remoteCursorDupSort) PrevDup() ([]byte, []byte, error) { return c.prevDup() }
func (c *remoteCursorDupSort) PrevNoDup() ([]byte, []byte, error) { return c.prevNoDup() }
func (c *remoteCursorDupSort) LastDup() ([]byte, error) { return c.lastDup() }
// Temporal Methods
func (tx *tx) DomainGetAsOf(name kv.Domain, k, k2 []byte, ts uint64) (v []byte, ok bool, err error) {
reply, err := tx.db.remoteKV.DomainGet(tx.ctx, &remote.DomainGetReq{TxId: tx.id, Table: string(name), K: k, K2: k2, Ts: ts})
if err != nil {
return nil, false, err
}
return reply.V, reply.Ok, nil
}
func (tx *tx) DomainGet(name kv.Domain, k, k2 []byte) (v []byte, ok bool, err error) {
reply, err := tx.db.remoteKV.DomainGet(tx.ctx, &remote.DomainGetReq{TxId: tx.id, Table: string(name), K: k, K2: k2, Latest: true})
if err != nil {
return nil, false, err
}
return reply.V, reply.Ok, nil
}
func (tx *tx) DomainRange(name kv.Domain, fromKey, toKey []byte, ts uint64, asc order.By, limit int) (it iter.KV, err error) {
return iter.PaginateKV(func(pageToken string) (keys, vals [][]byte, nextPageToken string, err error) {
reply, err := tx.db.remoteKV.DomainRange(tx.ctx, &remote.DomainRangeReq{TxId: tx.id, Table: string(name), FromKey: fromKey, ToKey: toKey, Ts: ts, OrderAscend: bool(asc), Limit: int64(limit)})
if err != nil {
return nil, nil, "", err
}
return reply.Keys, reply.Values, reply.NextPageToken, nil
}), nil
}
func (tx *tx) HistoryGet(name kv.History, k []byte, ts uint64) (v []byte, ok bool, err error) {
reply, err := tx.db.remoteKV.HistoryGet(tx.ctx, &remote.HistoryGetReq{TxId: tx.id, Table: string(name), K: k, Ts: ts})
if err != nil {
return nil, false, err
}
return reply.V, reply.Ok, nil
}
func (tx *tx) HistoryRange(name kv.History, fromTs, toTs int, asc order.By, limit int) (it iter.KV, err error) {
return iter.PaginateKV(func(pageToken string) (keys, vals [][]byte, nextPageToken string, err error) {
reply, err := tx.db.remoteKV.HistoryRange(tx.ctx, &remote.HistoryRangeReq{TxId: tx.id, Table: string(name), FromTs: int64(fromTs), ToTs: int64(toTs), OrderAscend: bool(asc), Limit: int64(limit)})
if err != nil {
return nil, nil, "", err
}
return reply.Keys, reply.Values, reply.NextPageToken, nil
}), nil
}
func (tx *tx) IndexRange(name kv.InvertedIdx, k []byte, fromTs, toTs int, asc order.By, limit int) (timestamps iter.U64, err error) {
return iter.PaginateU64(func(pageToken string) (arr []uint64, nextPageToken string, err error) {
req := &remote.IndexRangeReq{TxId: tx.id, Table: string(name), K: k, FromTs: int64(fromTs), ToTs: int64(toTs), OrderAscend: bool(asc), Limit: int64(limit)}
reply, err := tx.db.remoteKV.IndexRange(tx.ctx, req)
if err != nil {
return nil, "", err
}
return reply.Timestamps, reply.NextPageToken, nil
}), nil
}
func (tx *tx) Prefix(table string, prefix []byte) (iter.KV, error) {
nextPrefix, ok := kv.NextSubtree(prefix)
if !ok {
return tx.Range(table, prefix, nil)
}
return tx.Range(table, prefix, nextPrefix)
}
func (tx *tx) rangeOrderLimit(table string, fromPrefix, toPrefix []byte, asc order.By, limit int) (iter.KV, error) {
return iter.PaginateKV(func(pageToken string) (keys [][]byte, values [][]byte, nextPageToken string, err error) {
req := &remote.RangeReq{TxId: tx.id, Table: table, FromPrefix: fromPrefix, ToPrefix: toPrefix, OrderAscend: bool(asc), Limit: int64(limit)}
reply, err := tx.db.remoteKV.Range(tx.ctx, req)
if err != nil {
return nil, nil, "", err
}
return reply.Keys, reply.Values, reply.NextPageToken, nil
}), nil
}
func (tx *tx) Range(table string, fromPrefix, toPrefix []byte) (iter.KV, error) {
return tx.rangeOrderLimit(table, fromPrefix, toPrefix, order.Asc, -1)
}
func (tx *tx) RangeAscend(table string, fromPrefix, toPrefix []byte, limit int) (iter.KV, error) {
return tx.rangeOrderLimit(table, fromPrefix, toPrefix, order.Asc, limit)
}
func (tx *tx) RangeDescend(table string, fromPrefix, toPrefix []byte, limit int) (iter.KV, error) {
return tx.rangeOrderLimit(table, fromPrefix, toPrefix, order.Desc, limit)
}
func (tx *tx) RangeDupSort(table string, key []byte, fromPrefix, toPrefix []byte, asc order.By, limit int) (iter.KV, error) {
panic("not implemented yet")
}
func (tx *tx) CHandle() unsafe.Pointer {
panic("CHandle not implemented")
}