diff --git a/kv/kv_interface.go b/kv/kv_interface.go index df1e7cfd8..43f042c65 100644 --- a/kv/kv_interface.go +++ b/kv/kv_interface.go @@ -31,12 +31,14 @@ import ( // RwTx - Read-Write Database Transaction // k - key // v - value +// Cursor - low-level mdbx-tide api to walk over Table +// Stream - high-level simplified api for iteration over Table, InvertedIndex, History, Domain, ... //Methods Naming: // Get: exact match of criterias -// Range: [from, to) -// Each: [from, INF) -// Prefix: Has(k, prefix) +// Range: [from, to). Range(from, nil) means [from, EndOfTable). Range(nil, to) means [StartOfTable, to). +// Each: Range(from, nil) +// Prefix: `Range(Table, prefix, dbutils.NextSubtree(prefix))` // Amount: [from, INF) AND maximum N records const ReadersLimit = 32000 // MDBX_READERS_LIMIT=32767 @@ -105,7 +107,7 @@ func (l Label) String() string { type Has interface { // Has indicates whether a key exists in the database. - Has(bucket string, key []byte) (bool, error) + Has(table string, key []byte) (bool, error) } type GetPut interface { Getter @@ -115,16 +117,16 @@ type Getter interface { Has // GetOne references a readonly section of memory that must not be accessed after txn has terminated - GetOne(bucket string, key []byte) (val []byte, err error) + GetOne(table string, key []byte) (val []byte, err error) // ForEach iterates over entries with keys greater or equal to fromPrefix. // walker is called for each eligible entry. // If walker returns an error: // - implementations of local db - stop // - implementations of remote db - do not handle this error and may finish (send all entries to client) before error happen. - ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error - ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error - ForAmount(bucket string, prefix []byte, amount uint32, walker func(k, v []byte) error) error + ForEach(table string, fromPrefix []byte, walker func(k, v []byte) error) error + ForPrefix(table string, prefix []byte, walker func(k, v []byte) error) error + ForAmount(table string, prefix []byte, amount uint32, walker func(k, v []byte) error) error } // Putter wraps the database write operations. @@ -211,9 +213,9 @@ type StatelessReadTx interface { // Can be called for a read transaction to retrieve the current sequence value, and the increment must be zero. // Sequence changes become visible outside the current write transaction after it is committed, and discarded on abort. // Starts from 0. - ReadSequence(bucket string) (uint64, error) + ReadSequence(table string) (uint64, error) - BucketSize(bucket string) (uint64, error) + BucketSize(table string) (uint64, error) } type StatelessWriteTx interface { @@ -239,9 +241,9 @@ type StatelessWriteTx interface { } // use id */ - IncrementSequence(bucket string, amount uint64) (uint64, error) - Append(bucket string, k, v []byte) error - AppendDup(bucket string, k, v []byte) error + IncrementSequence(table string, amount uint64) (uint64, error) + Append(table string, k, v []byte) error + AppendDup(table string, k, v []byte) error } type StatelessRwTx interface { @@ -267,12 +269,14 @@ type Tx interface { // // Cursor, also provides a grain of magic - it can use a declarative configuration - and automatically break // long keys into DupSort key/values. See docs for `bucket.go:TableCfgItem` - Cursor(bucket string) (Cursor, error) - CursorDupSort(bucket string) (CursorDupSort, error) // CursorDupSort - can be used if bucket has mdbx.DupSort flag + Cursor(table string) (Cursor, error) + CursorDupSort(table string) (CursorDupSort, error) // CursorDupSort - can be used if bucket has mdbx.DupSort flag - ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error - ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error - ForAmount(bucket string, prefix []byte, amount uint32, walker func(k, v []byte) error) error + ForEach(table string, fromPrefix []byte, walker func(k, v []byte) error) error + ForPrefix(table string, prefix []byte, walker func(k, v []byte) error) error + ForAmount(table string, prefix []byte, amount uint32, walker func(k, v []byte) error) error + + Range(table string, fromPrefix, toPrefix []byte) (Pairs, error) DBSize() (uint64, error) } @@ -288,8 +292,8 @@ type RwTx interface { StatelessWriteTx BucketMigrator - RwCursor(bucket string) (RwCursor, error) - RwCursorDupSort(bucket string) (RwCursorDupSort, error) + RwCursor(table string) (RwCursor, error) + RwCursorDupSort(table string) (RwCursorDupSort, error) // CollectMetrics - does collect all DB-related and Tx-related metrics // this method exists only in RwTx to avoid concurrency @@ -396,13 +400,11 @@ type TemporalRwDB interface { } // Stream - Iterator-like interface designed for grpc server-side streaming: 1 client request -> much responses from server -// Grpc Server send batch(array) of values. Client can process one-by-one by .Next() method, or use more performant .NextBatch() -// Iter is very limited - client has no way to terminate it (but client can cancel whole read transaction) -// Tx does 1-1 match to "grpc-stream". During 1 TX - can be created many `Iter`, `Cursor`. -// -// No `Close` method: all streams produced by TemporalTx will be closed inside `tx.Rollback()` (by casting to `kv.Closer`) -// -// K, V are valid only until next .Next() call +// - K, V are valid only until next .Next() call +// - No `Close` method: all streams produced by TemporalTx will be closed inside `tx.Rollback()` (by casting to `kv.Closer`) +// - automatically checks cancelation of `ctx` passed to `db.Begin(ctx)`, can skip this +// check in loops on stream. Stream has very limited API - user has no way to +// terminate it - but user can specify more strict conditions when creating stream (then server knows better when to stop) type Stream[K, V any] interface { Next() (K, V, error) HasNext() bool @@ -412,6 +414,7 @@ type UnaryStream[V any] interface { NextBatch() ([]V, error) HasNext() bool } +type Pairs Stream[[]byte, []byte] type ArrStream[V any] struct { arr []V diff --git a/kv/mdbx/kv_mdbx.go b/kv/mdbx/kv_mdbx.go index 8d1dca662..d3d627c5b 100644 --- a/kv/mdbx/kv_mdbx.go +++ b/kv/mdbx/kv_mdbx.go @@ -476,16 +476,21 @@ func (db *MdbxKV) BeginRo(ctx context.Context) (txn kv.Tx, err error) { return nil, fmt.Errorf("%w, label: %s, trace: %s", err, db.opts.label.String(), stack2.Trace().String()) } return &MdbxTx{ + ctx: ctx, db: db, tx: tx, readOnly: true, }, nil } -func (db *MdbxKV) BeginRw(_ context.Context) (kv.RwTx, error) { return db.beginRw(0) } -func (db *MdbxKV) BeginRwAsync(_ context.Context) (kv.RwTx, error) { return db.beginRw(mdbx.TxNoSync) } +func (db *MdbxKV) BeginRw(ctx context.Context) (kv.RwTx, error) { + return db.beginRw(ctx, 0) +} +func (db *MdbxKV) BeginRwAsync(ctx context.Context) (kv.RwTx, error) { + return db.beginRw(ctx, mdbx.TxNoSync) +} -func (db *MdbxKV) beginRw(flags uint) (txn kv.RwTx, err error) { +func (db *MdbxKV) beginRw(ctx context.Context, flags uint) (txn kv.RwTx, err error) { if db.closed.Load() { return nil, fmt.Errorf("db closed") } @@ -502,8 +507,9 @@ func (db *MdbxKV) beginRw(flags uint) (txn kv.RwTx, err error) { return nil, fmt.Errorf("%w, lable: %s, trace: %s", err, db.opts.label.String(), stack2.Trace().String()) } return &MdbxTx{ - db: db, - tx: tx, + db: db, + tx: tx, + ctx: ctx, }, nil } @@ -511,9 +517,11 @@ type MdbxTx struct { tx *mdbx.Txn db *MdbxKV cursors map[uint64]*mdbx.Cursor + streams []kv.Closer statelessCursors map[string]kv.Cursor readOnly bool cursorID uint64 + ctx context.Context } type MdbxCursor struct { @@ -579,6 +587,58 @@ func (tx *MdbxTx) ForPrefix(bucket string, prefix []byte, walker func(k, v []byt } return nil } + +func (tx *MdbxTx) Range(table string, fromPrefix, toPrefix []byte) (kv.Pairs, error) { + if toPrefix != nil && bytes.Compare(fromPrefix, toPrefix) >= 0 { + return nil, fmt.Errorf("tx.Range: %x must be lexicographicaly before %x", fromPrefix, toPrefix) + } + s, err := tx.newStreamCursor(table) + if err != nil { + return nil, err + } + s.toPrefix = toPrefix + s.nextK, s.nextV, s.nextErr = s.c.Seek(fromPrefix) + return s, nil +} +func (tx *MdbxTx) newStreamCursor(table string) (*cursor2stream, error) { + c, err := tx.Cursor(table) + if err != nil { + return nil, err + } + s := &cursor2stream{c: c, ctx: tx.ctx} + tx.streams = append(tx.streams, s) + return s, nil +} + +type cursor2stream struct { + c kv.Cursor + nextK, nextV []byte + nextErr error + toPrefix []byte + ctx context.Context +} + +func (s *cursor2stream) Close() { s.c.Close() } +func (s *cursor2stream) HasNext() bool { + if s.toPrefix == nil { + return s.nextK != nil + } + if s.nextK == nil { + return false + } + return bytes.Compare(s.nextK, s.toPrefix) < 0 +} +func (s *cursor2stream) Next() ([]byte, []byte, error) { + k, v, err := s.nextK, s.nextV, s.nextErr + select { + case <-s.ctx.Done(): + return nil, nil, s.ctx.Err() + default: + } + s.nextK, s.nextV, s.nextErr = s.c.Next() + return k, v, err +} + func (tx *MdbxTx) ForAmount(bucket string, fromPrefix []byte, amount uint32, walker func(k, v []byte) error) error { if amount == 0 { return nil @@ -917,6 +977,11 @@ func (tx *MdbxTx) closeCursors() { } } tx.cursors = nil + for _, c := range tx.streams { + if c != nil { + c.Close() + } + } tx.statelessCursors = nil } diff --git a/kv/mdbx/kv_mdbx_test.go b/kv/mdbx/kv_mdbx_test.go index 131d06085..9ca2132ac 100644 --- a/kv/mdbx/kv_mdbx_test.go +++ b/kv/mdbx/kv_mdbx_test.go @@ -88,6 +88,40 @@ func TestSeekBothRange(t *testing.T) { require.Equal(t, "value3.3", string(v)) } +func TestRange(t *testing.T) { + _, tx, _ := BaseCase(t) + + //[from, to) + it, err := tx.Range("Table", []byte("key1"), []byte("key3")) + require.NoError(t, err) + require.True(t, it.HasNext()) + k, v, err := it.Next() + require.NoError(t, err) + require.Equal(t, "key1", string(k)) + require.Equal(t, "value1.1", string(v)) + + require.True(t, it.HasNext()) + k, v, err = it.Next() + require.NoError(t, err) + require.Equal(t, "key1", string(k)) + require.Equal(t, "value1.3", string(v)) + + require.False(t, it.HasNext()) + require.False(t, it.HasNext()) + + // [from, nil) means [from, INF) + it, err = tx.Range("Table", []byte("key1"), nil) + require.NoError(t, err) + cnt := 0 + for it.HasNext() { + _, _, err := it.Next() + require.NoError(t, err) + cnt++ + } + require.Equal(t, 4, cnt) + +} + func TestLastDup(t *testing.T) { db, tx, _ := BaseCase(t) diff --git a/kv/memdb/memory_mutation.go b/kv/memdb/memory_mutation.go index 3a74b13df..aac6c232e 100644 --- a/kv/memdb/memory_mutation.go +++ b/kv/memdb/memory_mutation.go @@ -205,6 +205,10 @@ func (m *MemoryMutation) ForEach(bucket string, fromPrefix []byte, walker func(k return nil } +func (m *MemoryMutation) Range(table string, fromPrefix, toPrefix []byte) (kv.Pairs, error) { + panic("please implement me") +} + func (m *MemoryMutation) ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error { c, err := m.Cursor(bucket) if err != nil { diff --git a/kv/remotedb/kv_remote.go b/kv/remotedb/kv_remote.go index a6aa07ba8..859a26430 100644 --- a/kv/remotedb/kv_remote.go +++ b/kv/remotedb/kv_remote.go @@ -45,6 +45,7 @@ type remoteTx struct { db *RemoteKV statelessCursors map[string]kv.Cursor cursors []*remoteCursor + streams []kv.Closer viewID, id uint64 streamingRequested bool } @@ -203,6 +204,9 @@ func (tx *remoteTx) Rollback() { // don't close opened cursors - just close stream, server will cleanup everything well tx.closeGrpcStream() tx.db.roTxsLimiter.Release(1) + for _, c := range tx.streams { + c.Close() + } } func (tx *remoteTx) DBSize() (uint64, error) { panic("not implemented") } @@ -287,6 +291,55 @@ func (tx *remoteTx) ForAmount(bucket string, fromPrefix []byte, amount uint32, w return nil } +// TODO: implement by server-side stream +func (tx *remoteTx) Range(table string, fromPrefix, toPrefix []byte) (kv.Pairs, error) { + s, err := tx.newStreamCursor(table) + if err != nil { + return nil, err + } + s.toPrefix = toPrefix + s.nextK, s.nextV, s.nextErr = s.c.Seek(fromPrefix) + return s, nil +} +func (tx *remoteTx) newStreamCursor(table string) (*cursor2stream, error) { + c, err := tx.Cursor(table) + if err != nil { + return nil, err + } + s := &cursor2stream{c: c, ctx: tx.ctx} + tx.streams = append(tx.streams, s) + return s, nil +} + +type cursor2stream struct { + c kv.Cursor + nextK, nextV []byte + nextErr error + toPrefix []byte + ctx context.Context +} + +func (s *cursor2stream) Close() { s.c.Close() } +func (s *cursor2stream) HasNext() bool { + if s.toPrefix == nil { + return s.nextK != nil + } + if s.nextK == nil { + return false + } + return bytes.Compare(s.nextK, s.toPrefix) < 0 +} +func (s *cursor2stream) Next() ([]byte, []byte, error) { + k, v, err := s.nextK, s.nextV, s.nextErr + select { + case <-s.ctx.Done(): + return nil, nil, s.ctx.Err() + default: + } + s.nextK, s.nextV, s.nextErr = s.c.Next() + return k, v, err +} + func (tx *remoteTx) GetOne(bucket string, k []byte) (val []byte, err error) { c, err := tx.statelessCursor(bucket) if err != nil {