KV: tx.Range() with stream api (#803)

Alex Sharov 2022-12-25 17:40:04 +07:00 committed by GitHub
parent 1371990bed
commit 0aeae327e1
5 changed files with 191 additions and 32 deletions


@@ -31,12 +31,14 @@ import (
// RwTx - Read-Write Database Transaction
// k - key
// v - value
// Cursor - low-level mdbx-tied api to walk over Table
// Stream - high-level simplified api for iteration over Table, InvertedIndex, History, Domain, ...
// Methods Naming:
// Get: exact match of criteria
// Range: [from, to)
// Each: [from, INF)
// Prefix: Has(k, prefix)
// Range: [from, to). Range(from, nil) means [from, EndOfTable). Range(nil, to) means [StartOfTable, to).
// Each: Range(from, nil)
// Prefix: `Range(Table, prefix, dbutils.NextSubtree(prefix))`
// Amount: [from, INF) AND maximum N records
const ReadersLimit = 32000 // MDBX_READERS_LIMIT=32767
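These conventions are what the new Range method (added to the Tx interface below) implements. As a hedged illustration, an editor's sketch of a prefix scan expressed through Range: the table name is hypothetical, nextSubtree is a local stand-in for dbutils.NextSubtree, and the import path is assumed to be the erigon-lib module.

```go
// Editor's sketch, not part of the commit.
package example

import "github.com/ledgerwatch/erigon-lib/kv"

// nextSubtree is a local stand-in for dbutils.NextSubtree: the smallest key
// lexicographically greater than every key starting with prefix.
func nextSubtree(prefix []byte) []byte {
	out := append([]byte{}, prefix...)
	for i := len(out) - 1; i >= 0; i-- {
		if out[i] != 0xFF {
			out[i]++
			return out[:i+1]
		}
	}
	return nil // prefix was all 0xFF: Range(prefix, nil) scans to the end of the table
}

// scanPrefix expresses "Prefix" through Range, per the convention above.
func scanPrefix(tx kv.Tx, table string, prefix []byte, walk func(k, v []byte) error) error {
	it, err := tx.Range(table, prefix, nextSubtree(prefix))
	if err != nil {
		return err
	}
	for it.HasNext() {
		k, v, err := it.Next()
		if err != nil {
			return err
		}
		if err := walk(k, v); err != nil {
			return err
		}
	}
	return nil
}
```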
@@ -105,7 +107,7 @@ func (l Label) String() string {
type Has interface {
// Has indicates whether a key exists in the database.
Has(bucket string, key []byte) (bool, error)
Has(table string, key []byte) (bool, error)
}
type GetPut interface {
Getter
@@ -115,16 +117,16 @@ type Getter interface {
Has
// GetOne references a readonly section of memory that must not be accessed after txn has terminated
GetOne(bucket string, key []byte) (val []byte, err error)
GetOne(table string, key []byte) (val []byte, err error)
// ForEach iterates over entries with keys greater or equal to fromPrefix.
// walker is called for each eligible entry.
// If walker returns an error:
// - implementations of local db - stop
// - implementations of remote db - do not handle this error and may finish (send all entries to client) before the error happens.
ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error
ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error
ForAmount(bucket string, prefix []byte, amount uint32, walker func(k, v []byte) error) error
ForEach(table string, fromPrefix []byte, walker func(k, v []byte) error) error
ForPrefix(table string, prefix []byte, walker func(k, v []byte) error) error
ForAmount(table string, prefix []byte, amount uint32, walker func(k, v []byte) error) error
}
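A small editor's sketch (not part of the commit, hypothetical names, erigon-lib import path assumed) of the walker-error rule documented above: on a local db, returning a sentinel error from the walker stops iteration.

```go
// Editor's sketch, not part of the commit.
package example

import (
	"errors"

	"github.com/ledgerwatch/erigon-lib/kv"
)

var errStop = errors.New("stop iteration")

// firstN visits at most n entries of a table via ForEach.
func firstN(tx kv.Tx, table string, n int) (count int, err error) {
	err = tx.ForEach(table, nil, func(k, v []byte) error {
		if count == n {
			return errStop // local (mdbx) implementations stop here
		}
		count++
		return nil
	})
	if errors.Is(err, errStop) {
		err = nil
	}
	return count, err
}
```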
// Putter wraps the database write operations.
@@ -211,9 +213,9 @@ type StatelessReadTx interface {
// Can be called for a read transaction to retrieve the current sequence value, and the increment must be zero.
// Sequence changes become visible outside the current write transaction after it is committed, and discarded on abort.
// Starts from 0.
ReadSequence(bucket string) (uint64, error)
ReadSequence(table string) (uint64, error)
BucketSize(bucket string) (uint64, error)
BucketSize(table string) (uint64, error)
}
type StatelessWriteTx interface {
@@ -239,9 +241,9 @@ type StatelessWriteTx interface {
}
// use id
*/
IncrementSequence(bucket string, amount uint64) (uint64, error)
Append(bucket string, k, v []byte) error
AppendDup(bucket string, k, v []byte) error
IncrementSequence(table string, amount uint64) (uint64, error)
Append(table string, k, v []byte) error
AppendDup(table string, k, v []byte) error
}
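A short, hedged sketch of the sequence API (editor's example, not part of the commit; it assumes IncrementSequence returns the value as it was before the increment, which this diff does not show).

```go
// Editor's sketch, not part of the commit. Assumption: IncrementSequence
// returns the sequence value before the increment.
package example

import "github.com/ledgerwatch/erigon-lib/kv"

// allocIDs reserves n consecutive ids from the table's sequence: [first, first+n).
func allocIDs(tx kv.RwTx, table string, n uint64) (uint64, error) {
	return tx.IncrementSequence(table, n)
}

// currentID reads the sequence without changing it (allowed in a read-only tx).
func currentID(tx kv.StatelessReadTx, table string) (uint64, error) {
	return tx.ReadSequence(table)
}
```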
type StatelessRwTx interface {
@@ -267,12 +269,14 @@ type Tx interface {
//
// Cursor also provides a grain of magic - it can use a declarative configuration - and automatically break
// long keys into DupSort key/values. See docs for `bucket.go:TableCfgItem`
Cursor(bucket string) (Cursor, error)
CursorDupSort(bucket string) (CursorDupSort, error) // CursorDupSort - can be used if bucket has mdbx.DupSort flag
Cursor(table string) (Cursor, error)
CursorDupSort(table string) (CursorDupSort, error) // CursorDupSort - can be used if bucket has mdbx.DupSort flag
ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error
ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error
ForAmount(bucket string, prefix []byte, amount uint32, walker func(k, v []byte) error) error
ForEach(table string, fromPrefix []byte, walker func(k, v []byte) error) error
ForPrefix(table string, prefix []byte, walker func(k, v []byte) error) error
ForAmount(table string, prefix []byte, amount uint32, walker func(k, v []byte) error) error
Range(table string, fromPrefix, toPrefix []byte) (Pairs, error)
DBSize() (uint64, error)
}
@@ -288,8 +292,8 @@ type RwTx interface {
StatelessWriteTx
BucketMigrator
RwCursor(bucket string) (RwCursor, error)
RwCursorDupSort(bucket string) (RwCursorDupSort, error)
RwCursor(table string) (RwCursor, error)
RwCursorDupSort(table string) (RwCursorDupSort, error)
// CollectMetrics - collects all DB-related and Tx-related metrics
// this method exists only in RwTx to avoid concurrency issues
@@ -396,13 +400,11 @@ type TemporalRwDB interface {
}
// Stream - Iterator-like interface designed for grpc server-side streaming: 1 client request -> many responses from server
// Grpc Server sends a batch (array) of values. Client can process them one-by-one via the .Next() method, or use the more performant .NextBatch()
// Iter is very limited - the client has no way to terminate it (but the client can cancel the whole read transaction)
// Tx maps 1-1 to a "grpc-stream". During 1 Tx, many `Iter` and `Cursor` instances can be created.
//
// No `Close` method: all streams produced by TemporalTx will be closed inside `tx.Rollback()` (by casting to `kv.Closer`)
//
// K, V are valid only until next .Next() call
// - K, V are valid only until next .Next() call
// - No `Close` method: all streams produced by TemporalTx will be closed inside `tx.Rollback()` (by casting to `kv.Closer`)
// - automatically checks cancellation of `ctx` passed to `db.Begin(ctx)`, can skip this
// check in loops on stream. Stream has very limited API - user has no way to
// terminate it - but user can specify more strict conditions when creating stream (then server knows better when to stop)
type Stream[K, V any] interface {
Next() (K, V, error)
HasNext() bool
@@ -412,6 +414,7 @@ type UnaryStream[V any] interface {
NextBatch() ([]V, error)
HasNext() bool
}
type Pairs Stream[[]byte, []byte]
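Pairs is the concrete stream type returned by the new tx.Range. A hedged sketch of a consumer that follows the rules above (editor's example, not part of the commit; erigon-lib import path assumed):

```go
// Editor's sketch, not part of the commit.
package example

import "github.com/ledgerwatch/erigon-lib/kv"

// collectKeys drains a kv.Pairs stream. There is no Close(): tx.Rollback()
// closes every stream opened on the tx. k is only valid until the next
// .Next() call, so it is copied before being retained.
func collectKeys(it kv.Pairs) ([][]byte, error) {
	var keys [][]byte
	for it.HasNext() {
		k, _, err := it.Next()
		if err != nil {
			return nil, err // also surfaces cancellation of the ctx passed to db.Begin
		}
		keys = append(keys, append([]byte{}, k...))
	}
	return keys, nil
}
```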
type ArrStream[V any] struct {
arr []V


@@ -476,16 +476,21 @@ func (db *MdbxKV) BeginRo(ctx context.Context) (txn kv.Tx, err error) {
return nil, fmt.Errorf("%w, label: %s, trace: %s", err, db.opts.label.String(), stack2.Trace().String())
}
return &MdbxTx{
ctx: ctx,
db: db,
tx: tx,
readOnly: true,
}, nil
}
func (db *MdbxKV) BeginRw(_ context.Context) (kv.RwTx, error) { return db.beginRw(0) }
func (db *MdbxKV) BeginRwAsync(_ context.Context) (kv.RwTx, error) { return db.beginRw(mdbx.TxNoSync) }
func (db *MdbxKV) BeginRw(ctx context.Context) (kv.RwTx, error) {
return db.beginRw(ctx, 0)
}
func (db *MdbxKV) BeginRwAsync(ctx context.Context) (kv.RwTx, error) {
return db.beginRw(ctx, mdbx.TxNoSync)
}
func (db *MdbxKV) beginRw(flags uint) (txn kv.RwTx, err error) {
func (db *MdbxKV) beginRw(ctx context.Context, flags uint) (txn kv.RwTx, err error) {
if db.closed.Load() {
return nil, fmt.Errorf("db closed")
}
@@ -502,8 +507,9 @@ func (db *MdbxKV) beginRw(flags uint) (txn kv.RwTx, err error) {
return nil, fmt.Errorf("%w, lable: %s, trace: %s", err, db.opts.label.String(), stack2.Trace().String())
}
return &MdbxTx{
db: db,
tx: tx,
db: db,
tx: tx,
ctx: ctx,
}, nil
}
@@ -511,9 +517,11 @@ type MdbxTx struct {
tx *mdbx.Txn
db *MdbxKV
cursors map[uint64]*mdbx.Cursor
streams []kv.Closer
statelessCursors map[string]kv.Cursor
readOnly bool
cursorID uint64
ctx context.Context
}
type MdbxCursor struct {
@@ -579,6 +587,58 @@ func (tx *MdbxTx) ForPrefix(bucket string, prefix []byte, walker func(k, v []byt
}
return nil
}
func (tx *MdbxTx) Range(table string, fromPrefix, toPrefix []byte) (kv.Pairs, error) {
if toPrefix != nil && bytes.Compare(fromPrefix, toPrefix) >= 0 {
return nil, fmt.Errorf("tx.Range: %x must be lexicographicaly before %x", fromPrefix, toPrefix)
}
s, err := tx.newStreamCursor(table)
if err != nil {
return nil, err
}
s.toPrefix = toPrefix
s.nextK, s.nextV, s.nextErr = s.c.Seek(fromPrefix)
return s, nil
}
func (tx *MdbxTx) newStreamCursor(table string) (*cursor2stream, error) {
c, err := tx.Cursor(table)
if err != nil {
return nil, err
}
s := &cursor2stream{c: c, ctx: tx.ctx}
tx.streams = append(tx.streams, s)
return s, nil
}
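// cursor2stream adapts a kv.Cursor to the kv.Stream interface. It keeps one entry
// pre-read (nextK/nextV/nextErr) so HasNext can answer without advancing the cursor;
// toPrefix, when non-nil, is the exclusive upper bound of the scan.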
type cursor2stream struct {
c kv.Cursor
nextK, nextV []byte
nextErr error
toPrefix []byte
ctx context.Context
}
func (s *cursor2stream) Close() { s.c.Close() }
func (s *cursor2stream) HasNext() bool {
if s.toPrefix == nil {
return s.nextK != nil
}
if s.nextK == nil {
return false
}
return bytes.Compare(s.nextK, s.toPrefix) < 0
}
func (s *cursor2stream) Next() ([]byte, []byte, error) {
k, v, err := s.nextK, s.nextV, s.nextErr
select {
case <-s.ctx.Done():
return nil, nil, s.ctx.Err()
default:
}
s.nextK, s.nextV, s.nextErr = s.c.Next()
return k, v, err
}
func (tx *MdbxTx) ForAmount(bucket string, fromPrefix []byte, amount uint32, walker func(k, v []byte) error) error {
if amount == 0 {
return nil
@@ -917,6 +977,11 @@ func (tx *MdbxTx) closeCursors() {
}
}
tx.cursors = nil
for _, c := range tx.streams {
if c != nil {
c.Close()
}
}
tx.statelessCursors = nil
}


@@ -88,6 +88,40 @@ func TestSeekBothRange(t *testing.T) {
require.Equal(t, "value3.3", string(v))
}
func TestRange(t *testing.T) {
_, tx, _ := BaseCase(t)
// [from, to)
it, err := tx.Range("Table", []byte("key1"), []byte("key3"))
require.NoError(t, err)
require.True(t, it.HasNext())
k, v, err := it.Next()
require.NoError(t, err)
require.Equal(t, "key1", string(k))
require.Equal(t, "value1.1", string(v))
require.True(t, it.HasNext())
k, v, err = it.Next()
require.NoError(t, err)
require.Equal(t, "key1", string(k))
require.Equal(t, "value1.3", string(v))
require.False(t, it.HasNext())
require.False(t, it.HasNext())
// [from, nil) means [from, INF)
it, err = tx.Range("Table", []byte("key1"), nil)
require.NoError(t, err)
cnt := 0
for it.HasNext() {
_, _, err := it.Next()
require.NoError(t, err)
cnt++
}
require.Equal(t, 4, cnt)
}
func TestLastDup(t *testing.T) {
db, tx, _ := BaseCase(t)


@@ -205,6 +205,10 @@ func (m *MemoryMutation) ForEach(bucket string, fromPrefix []byte, walker func(k
return nil
}
func (m *MemoryMutation) Range(table string, fromPrefix, toPrefix []byte) (kv.Pairs, error) {
panic("please implement me")
}
func (m *MemoryMutation) ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error {
c, err := m.Cursor(bucket)
if err != nil {


@@ -45,6 +45,7 @@ type remoteTx struct {
db *RemoteKV
statelessCursors map[string]kv.Cursor
cursors []*remoteCursor
streams []kv.Closer
viewID, id uint64
streamingRequested bool
}
@@ -203,6 +204,9 @@ func (tx *remoteTx) Rollback() {
// don't close opened cursors - just close stream, server will cleanup everything well
tx.closeGrpcStream()
tx.db.roTxsLimiter.Release(1)
for _, c := range tx.streams {
c.Close()
}
}
func (tx *remoteTx) DBSize() (uint64, error) { panic("not implemented") }
@@ -287,6 +291,55 @@ func (tx *remoteTx) ForAmount(bucket string, fromPrefix []byte, amount uint32, w
return nil
}
// TODO: implement by server-side stream
func (tx *remoteTx) Range(table string, fromPrefix, toPrefix []byte) (kv.Pairs, error) {
s, err := tx.newStreamCursor(table)
if err != nil {
return nil, err
}
s.toPrefix = toPrefix
s.nextK, s.nextV, s.nextErr = s.c.Seek(fromPrefix)
return s, nil
}
func (tx *remoteTx) newStreamCursor(table string) (*cursor2stream, error) {
c, err := tx.Cursor(table)
if err != nil {
return nil, err
}
s := &cursor2stream{c: c, ctx: tx.ctx}
tx.streams = append(tx.streams, s)
return s, nil
}
type cursor2stream struct {
c kv.Cursor
nextK, nextV []byte
nextErr error
toPrefix []byte
ctx context.Context
}
func (s *cursor2stream) Close() { s.c.Close() }
func (s *cursor2stream) HasNext() bool {
if s.toPrefix == nil {
return s.nextK != nil
}
if s.nextK == nil {
return false
}
return bytes.Compare(s.nextK, s.toPrefix) < 0
}
func (s *cursor2stream) Next() ([]byte, []byte, error) {
k, v, err := s.nextK, s.nextV, s.nextErr
select {
case <-s.ctx.Done():
return nil, nil, s.ctx.Err()
default:
}
s.nextK, s.nextV, s.nextErr = s.c.Next()
return k, v, err
}
func (tx *remoteTx) GetOne(bucket string, k []byte) (val []byte, err error) {
c, err := tx.statelessCursor(bucket)
if err != nil {