2022-05-24 17:59:57 +00:00
/*
Copyright 2022 Erigon contributors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
"bytes"
2022-05-29 18:57:09 +00:00
"container/heap"
2022-05-24 17:59:57 +00:00
"context"
"encoding/binary"
"fmt"
"io/fs"
"os"
"path/filepath"
"regexp"
"strconv"
2022-09-26 14:59:24 +00:00
"sync/atomic"
2022-07-02 18:38:34 +00:00
"time"
2022-05-24 17:59:57 +00:00
"github.com/RoaringBitmap/roaring/roaring64"
"github.com/google/btree"
2022-09-29 05:14:45 +00:00
"github.com/ledgerwatch/erigon-lib/common/dir"
2022-09-26 14:59:24 +00:00
"github.com/ledgerwatch/log/v3"
2022-05-29 18:57:09 +00:00
"github.com/ledgerwatch/erigon-lib/common"
2022-05-24 17:59:57 +00:00
"github.com/ledgerwatch/erigon-lib/compress"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/recsplit"
"github.com/ledgerwatch/erigon-lib/recsplit/eliasfano32"
)
2022-05-29 18:57:09 +00:00
// historyValCountKey holds the literal key "ValCount".
// NOTE(review): not referenced in this part of the file; presumably it keys a
// history value counter in a settings table — confirm at the call sites.
var historyValCountKey = []byte("ValCount")
2022-05-24 17:59:57 +00:00
// filesItem corresponding to a pair of files (.dat and .idx)
type filesItem struct {
startTxNum uint64
endTxNum uint64
decompressor * compress . Decompressor
index * recsplit . Index
}
2022-07-18 09:05:04 +00:00
func filesItemLess ( i , j * filesItem ) bool {
if i . endTxNum == j . endTxNum {
return i . startTxNum > j . startTxNum
}
return i . endTxNum < j . endTxNum
}
2022-06-18 21:54:36 +00:00
// DomainStats collects counters and sizes reported by a Domain.
//
// Field order is kept as is: the uint64 counters may be updated with
// sync/atomic (HistoryQueries is), which needs 64-bit alignment.
type DomainStats struct {
	MergesCount    uint64
	HistoryQueries uint64 // lookups that fell through from the DB to the static files
	EfSearchTime   time.Duration
	DataSize       uint64 // total size of the .kv files seen by the last MakeContext
	IndexSize      uint64 // total size of the .kvi files seen by the last MakeContext
	FilesCount     uint64 // number of files (.kv + .kvi) seen by the last MakeContext
}

// Accumulate adds every counter and size of other into ds.
// MergesCount is included so merge counts are not lost when combining stats.
func (ds *DomainStats) Accumulate(other DomainStats) {
	ds.MergesCount += other.MergesCount
	ds.HistoryQueries += other.HistoryQueries
	ds.EfSearchTime += other.EfSearchTime
	ds.IndexSize += other.IndexSize
	ds.DataSize += other.DataSize
	ds.FilesCount += other.FilesCount
}
2022-05-24 17:59:57 +00:00
// Domain is a part of the state (examples are Accounts, Storage, Code)
// Domain should not have any go routines or locks
type Domain struct {
2022-07-28 07:47:13 +00:00
* History
2022-08-18 08:02:24 +00:00
keysTable string // key -> invertedStep , invertedStep = ^(txNum / aggregationStep), Needs to be table with DupSort
valsTable string // key + invertedStep -> values
2022-09-26 14:59:24 +00:00
files * btree . BTreeG [ * filesItem ] // Static files pertaining to this domain, items are of type `filesItem`
prefixLen int // Number of bytes in the keys that can be used for prefix iteration
stats DomainStats
mergesCount uint64
defaultDc * DomainContext
2022-05-24 17:59:57 +00:00
}
func NewDomain (
dir string ,
aggregationStep uint64 ,
filenameBase string ,
keysTable string ,
valsTable string ,
2022-07-28 07:47:13 +00:00
indexKeysTable string ,
2022-05-24 17:59:57 +00:00
historyValsTable string ,
2022-05-29 18:57:09 +00:00
settingsTable string ,
2022-05-24 17:59:57 +00:00
indexTable string ,
2022-05-29 18:57:09 +00:00
prefixLen int ,
2022-06-17 11:39:49 +00:00
compressVals bool ,
2022-05-24 17:59:57 +00:00
) ( * Domain , error ) {
d := & Domain {
2022-07-28 07:47:13 +00:00
keysTable : keysTable ,
valsTable : valsTable ,
prefixLen : prefixLen ,
2022-09-18 10:38:43 +00:00
files : btree . NewG [ * filesItem ] ( 32 , filesItemLess ) ,
2022-05-24 17:59:57 +00:00
}
2022-09-18 10:38:43 +00:00
var err error
2022-07-28 07:47:13 +00:00
if d . History , err = NewHistory ( dir , aggregationStep , filenameBase , indexKeysTable , indexTable , historyValsTable , settingsTable , compressVals ) ; err != nil {
return nil , err
2022-05-24 17:59:57 +00:00
}
2022-09-18 10:38:43 +00:00
files , err := os . ReadDir ( dir )
if err != nil {
return nil , err
}
2022-05-24 17:59:57 +00:00
d . scanStateFiles ( files )
2022-07-28 07:47:13 +00:00
if err = d . openFiles ( ) ; err != nil {
return nil , err
2022-05-24 17:59:57 +00:00
}
2022-07-23 08:06:52 +00:00
d . defaultDc = d . MakeContext ( )
2022-05-24 17:59:57 +00:00
return d , nil
}
2022-06-18 21:54:36 +00:00
// GetAndResetStats returns the statistics gathered so far and clears them.
func (d *Domain) GetAndResetStats() DomainStats {
	snapshot := d.stats
	d.stats = DomainStats{}
	return snapshot
}
2022-05-24 17:59:57 +00:00
func ( d * Domain ) scanStateFiles ( files [ ] fs . DirEntry ) {
2022-09-13 09:01:41 +00:00
re := regexp . MustCompile ( "^" + d . filenameBase + ".([0-9]+)-([0-9]+).(kv|kvi)$" )
2022-05-24 17:59:57 +00:00
var err error
for _ , f := range files {
2022-09-18 10:38:43 +00:00
if ! f . Type ( ) . IsRegular ( ) {
continue
}
2022-05-24 17:59:57 +00:00
name := f . Name ( )
subs := re . FindStringSubmatch ( name )
2022-07-28 07:47:13 +00:00
if len ( subs ) != 4 {
2022-05-24 17:59:57 +00:00
if len ( subs ) != 0 {
2022-09-26 14:59:24 +00:00
log . Warn ( "File ignored by domain scan, more than 4 submatches" , "name" , name , "submatches" , len ( subs ) )
2022-05-24 17:59:57 +00:00
}
continue
}
var startTxNum , endTxNum uint64
2022-07-28 07:47:13 +00:00
if startTxNum , err = strconv . ParseUint ( subs [ 1 ] , 10 , 64 ) ; err != nil {
2022-05-24 17:59:57 +00:00
log . Warn ( "File ignored by domain scan, parsing startTxNum" , "error" , err , "name" , name )
continue
}
2022-07-28 07:47:13 +00:00
if endTxNum , err = strconv . ParseUint ( subs [ 2 ] , 10 , 64 ) ; err != nil {
2022-05-24 17:59:57 +00:00
log . Warn ( "File ignored by domain scan, parsing endTxNum" , "error" , err , "name" , name )
continue
}
if startTxNum > endTxNum {
log . Warn ( "File ignored by domain scan, startTxNum > endTxNum" , "name" , name )
continue
}
2022-06-12 09:14:18 +00:00
var item = & filesItem { startTxNum : startTxNum * d . aggregationStep , endTxNum : endTxNum * d . aggregationStep }
2022-05-24 17:59:57 +00:00
var foundI * filesItem
2022-07-28 07:47:13 +00:00
d . files . AscendGreaterOrEqual ( & filesItem { startTxNum : endTxNum * d . aggregationStep , endTxNum : endTxNum * d . aggregationStep } , func ( it * filesItem ) bool {
2022-05-24 17:59:57 +00:00
if it . endTxNum == endTxNum {
foundI = it
}
return false
} )
if foundI == nil || foundI . startTxNum > startTxNum {
2022-07-28 07:47:13 +00:00
//log.Info("Load state file", "name", name, "startTxNum", startTxNum*d.aggregationStep, "endTxNum", endTxNum*d.aggregationStep)
d . files . ReplaceOrInsert ( item )
2022-05-24 17:59:57 +00:00
}
}
}
2022-07-28 07:47:13 +00:00
func ( d * Domain ) openFiles ( ) error {
2022-05-24 17:59:57 +00:00
var err error
var totalKeys uint64
2022-09-26 14:59:24 +00:00
invalidFileItems := make ( [ ] * filesItem , 0 )
2022-07-28 07:47:13 +00:00
d . files . Ascend ( func ( item * filesItem ) bool {
2022-09-29 05:14:45 +00:00
if item . decompressor != nil {
item . decompressor . Close ( )
}
fromStep , toStep := item . startTxNum / d . aggregationStep , item . endTxNum / d . aggregationStep
datPath := filepath . Join ( d . dir , fmt . Sprintf ( "%s.%d-%d.kv" , d . filenameBase , fromStep , toStep ) )
if ! dir . FileExist ( datPath ) {
2022-09-26 14:59:24 +00:00
invalidFileItems = append ( invalidFileItems , item )
return true
}
2022-06-02 20:40:58 +00:00
if item . decompressor , err = compress . NewDecompressor ( datPath ) ; err != nil {
2022-05-24 17:59:57 +00:00
return false
}
2022-09-26 14:59:24 +00:00
2022-09-29 05:14:45 +00:00
if item . index == nil {
idxPath := filepath . Join ( d . dir , fmt . Sprintf ( "%s.%d-%d.kvi" , d . filenameBase , fromStep , toStep ) )
if dir . FileExist ( idxPath ) {
if item . index , err = recsplit . OpenIndex ( idxPath ) ; err != nil {
log . Debug ( "InvertedIndex.openFiles: %w, %s" , err , idxPath )
return false
}
totalKeys += item . index . KeyCount ( )
}
2022-05-24 17:59:57 +00:00
}
return true
} )
if err != nil {
return err
}
2022-09-26 14:59:24 +00:00
for _ , item := range invalidFileItems {
d . files . Delete ( item )
}
2022-05-24 17:59:57 +00:00
return nil
}
2022-07-28 07:47:13 +00:00
func ( d * Domain ) closeFiles ( ) {
d . files . Ascend ( func ( item * filesItem ) bool {
2022-05-24 17:59:57 +00:00
if item . decompressor != nil {
item . decompressor . Close ( )
}
if item . index != nil {
item . index . Close ( )
}
return true
} )
}
func ( d * Domain ) Close ( ) {
// Closing state files only after background aggregation goroutine is finished
2022-07-28 07:47:13 +00:00
d . History . Close ( )
d . closeFiles ( )
2022-05-24 17:59:57 +00:00
}
2022-09-26 14:59:24 +00:00
func ( dc * DomainContext ) get ( key [ ] byte , fromTxNum uint64 , roTx kv . Tx ) ( [ ] byte , bool , error ) {
2022-05-24 17:59:57 +00:00
var invertedStep [ 8 ] byte
2022-09-26 14:59:24 +00:00
binary . BigEndian . PutUint64 ( invertedStep [ : ] , ^ ( fromTxNum / dc . d . aggregationStep ) )
2022-07-23 08:06:52 +00:00
keyCursor , err := roTx . CursorDupSort ( dc . d . keysTable )
2022-05-24 17:59:57 +00:00
if err != nil {
return nil , false , err
}
defer keyCursor . Close ( )
foundInvStep , err := keyCursor . SeekBothRange ( key , invertedStep [ : ] )
if err != nil {
return nil , false , err
}
2022-09-26 14:59:24 +00:00
if len ( foundInvStep ) == 0 {
atomic . AddUint64 ( & dc . d . stats . HistoryQueries , 1 )
v , found := dc . readFromFiles ( key , fromTxNum )
2022-05-24 17:59:57 +00:00
return v , found , nil
}
keySuffix := make ( [ ] byte , len ( key ) + 8 )
copy ( keySuffix , key )
copy ( keySuffix [ len ( key ) : ] , foundInvStep )
2022-07-23 08:06:52 +00:00
v , err := roTx . GetOne ( dc . d . valsTable , keySuffix )
2022-05-24 17:59:57 +00:00
if err != nil {
return nil , false , err
}
return v , true , nil
}
2022-07-28 07:47:13 +00:00
func ( dc * DomainContext ) Get ( key1 , key2 [ ] byte , roTx kv . Tx ) ( [ ] byte , error ) {
key := make ( [ ] byte , len ( key1 ) + len ( key2 ) )
copy ( key , key1 )
copy ( key [ len ( key1 ) : ] , key2 )
2022-09-26 14:59:24 +00:00
v , _ , err := dc . get ( key , dc . d . txNum , roTx )
2022-05-24 17:59:57 +00:00
return v , err
}
// update records in keysTable that key was modified in the current aggregation
// step (stored as the inverted step). The original argument is not used here.
func (d *Domain) update(key, original []byte) error {
	var invStep [8]byte
	binary.BigEndian.PutUint64(invStep[:], ^(d.txNum / d.aggregationStep))
	return d.tx.Put(d.keysTable, key, invStep[:])
}
2022-07-28 07:47:13 +00:00
func ( d * Domain ) Put ( key1 , key2 , val [ ] byte ) error {
key := make ( [ ] byte , len ( key1 ) + len ( key2 ) )
copy ( key , key1 )
copy ( key [ len ( key1 ) : ] , key2 )
2022-09-26 14:59:24 +00:00
original , _ , err := d . defaultDc . get ( key , d . txNum , d . tx )
2022-05-24 17:59:57 +00:00
if err != nil {
return err
}
2022-07-02 18:38:34 +00:00
if bytes . Equal ( original , val ) {
return nil
}
2022-05-24 17:59:57 +00:00
// This call to update needs to happen before d.tx.Put() later, because otherwise the content of `original`` slice is invalidated
2022-07-28 07:47:13 +00:00
if err = d . History . AddPrevValue ( key1 , key2 , original ) ; err != nil {
return err
}
2022-05-24 17:59:57 +00:00
if err = d . update ( key , original ) ; err != nil {
return err
}
invertedStep := ^ ( d . txNum / d . aggregationStep )
keySuffix := make ( [ ] byte , len ( key ) + 8 )
copy ( keySuffix , key )
binary . BigEndian . PutUint64 ( keySuffix [ len ( key ) : ] , invertedStep )
if err = d . tx . Put ( d . valsTable , keySuffix , val ) ; err != nil {
return err
}
return nil
}
2022-07-28 07:47:13 +00:00
func ( d * Domain ) Delete ( key1 , key2 [ ] byte ) error {
key := make ( [ ] byte , len ( key1 ) + len ( key2 ) )
copy ( key , key1 )
copy ( key [ len ( key1 ) : ] , key2 )
2022-09-26 14:59:24 +00:00
original , found , err := d . defaultDc . get ( key , d . txNum , d . tx )
2022-05-24 17:59:57 +00:00
if err != nil {
return err
}
2022-07-02 18:38:34 +00:00
if ! found {
return nil
}
2022-05-24 17:59:57 +00:00
// This call to update needs to happen before d.tx.Delete() later, because otherwise the content of `original`` slice is invalidated
2022-07-28 07:47:13 +00:00
if err = d . History . AddPrevValue ( key1 , key2 , original ) ; err != nil {
return err
}
2022-05-24 17:59:57 +00:00
if err = d . update ( key , original ) ; err != nil {
return err
}
invertedStep := ^ ( d . txNum / d . aggregationStep )
keySuffix := make ( [ ] byte , len ( key ) + 8 )
copy ( keySuffix , key )
binary . BigEndian . PutUint64 ( keySuffix [ len ( key ) : ] , invertedStep )
2022-07-26 05:47:08 +00:00
if err = d . tx . Delete ( d . valsTable , keySuffix ) ; err != nil {
2022-05-24 17:59:57 +00:00
return err
}
return nil
}
2022-05-29 18:57:09 +00:00
// CursorType tells which kind of source a CursorItem reads from.
type CursorType uint8

const (
	FILE_CURSOR CursorType = 0 // item reads from a static file (CursorItem.dg)
	DB_CURSOR   CursorType = 1 // item reads from the database (CursorItem.c)
)
// CursorItem is the item in the priority queue used to do merge interation
// over storage of a given account
type CursorItem struct {
2022-06-17 18:24:56 +00:00
t CursorType // Whether this item represents state file or DB record, or tree
2022-07-23 08:06:52 +00:00
reverse bool
2022-06-17 18:24:56 +00:00
endTxNum uint64
key , val [ ] byte
2022-07-23 08:06:52 +00:00
dg , dg2 * compress . Getter
2022-06-17 18:24:56 +00:00
c kv . CursorDupSort
2022-05-29 18:57:09 +00:00
}
type CursorHeap [ ] * CursorItem
func ( ch CursorHeap ) Len ( ) int {
return len ( ch )
}
func ( ch CursorHeap ) Less ( i , j int ) bool {
cmp := bytes . Compare ( ch [ i ] . key , ch [ j ] . key )
if cmp == 0 {
// when keys match, the items with later blocks are preferred
2022-07-23 08:06:52 +00:00
if ch [ i ] . reverse {
return ch [ i ] . endTxNum > ch [ j ] . endTxNum
}
return ch [ i ] . endTxNum < ch [ j ] . endTxNum
2022-05-29 18:57:09 +00:00
}
return cmp < 0
}
func ( ch * CursorHeap ) Swap ( i , j int ) {
( * ch ) [ i ] , ( * ch ) [ j ] = ( * ch ) [ j ] , ( * ch ) [ i ]
}
func ( ch * CursorHeap ) Push ( x interface { } ) {
* ch = append ( * ch , x . ( * CursorItem ) )
}
func ( ch * CursorHeap ) Pop ( ) interface { } {
old := * ch
n := len ( old )
x := old [ n - 1 ]
* ch = old [ 0 : n - 1 ]
return x
}
2022-07-23 08:06:52 +00:00
// filesItem corresponding to a pair of files (.dat and .idx)
type ctxItem struct {
startTxNum uint64
endTxNum uint64
getter * compress . Getter
reader * recsplit . IndexReader
}
2022-09-26 02:42:44 +00:00
func ctxItemLess ( i , j ctxItem ) bool {
2022-07-23 08:06:52 +00:00
if i . endTxNum == j . endTxNum {
return i . startTxNum > j . startTxNum
}
return i . endTxNum < j . endTxNum
}
// DomainContext allows accesing the same domain from multiple go-routines
type DomainContext struct {
d * Domain
2022-09-26 02:42:44 +00:00
files * btree . BTreeG [ ctxItem ]
2022-07-28 07:47:13 +00:00
hc * HistoryContext
2022-07-23 08:06:52 +00:00
}
func ( d * Domain ) MakeContext ( ) * DomainContext {
dc := & DomainContext { d : d }
2022-07-28 07:47:13 +00:00
dc . hc = d . History . MakeContext ( )
2022-09-26 02:42:44 +00:00
bt := btree . NewG [ ctxItem ] ( 32 , ctxItemLess )
2022-07-28 07:47:13 +00:00
dc . files = bt
2022-09-26 14:59:24 +00:00
var datsz , idxsz , files uint64
2022-07-28 07:47:13 +00:00
d . files . Ascend ( func ( item * filesItem ) bool {
2022-09-29 05:14:45 +00:00
if item . index == nil {
return false
}
2022-09-26 14:59:24 +00:00
getter := item . decompressor . MakeGetter ( )
datsz += uint64 ( getter . Size ( ) )
idxsz += uint64 ( item . index . Size ( ) )
files += 2
2022-09-26 02:42:44 +00:00
bt . ReplaceOrInsert ( ctxItem {
2022-07-28 07:47:13 +00:00
startTxNum : item . startTxNum ,
endTxNum : item . endTxNum ,
2022-09-26 14:59:24 +00:00
getter : getter ,
2022-07-28 07:47:13 +00:00
reader : recsplit . NewIndexReader ( item . index ) ,
2022-07-23 08:06:52 +00:00
} )
2022-07-28 07:47:13 +00:00
return true
} )
2022-09-26 14:59:24 +00:00
d . stats . DataSize , d . stats . IndexSize , d . stats . FilesCount = datsz , idxsz , files
2022-07-23 08:06:52 +00:00
return dc
}
2022-05-31 17:42:04 +00:00
// IteratePrefix iterates over key-value pairs of the domain that start with given prefix
// The length of the prefix has to match the `prefixLen` parameter used to create the domain
// Such iteration is not intended to be used in public API, therefore it uses read-write transaction
// inside the domain. Another version of this for public API use needs to be created, that uses
// roTx instead and supports ending the iterations before it reaches the end.
2022-07-23 08:06:52 +00:00
func ( dc * DomainContext ) IteratePrefix ( prefix [ ] byte , it func ( k , v [ ] byte ) ) error {
if len ( prefix ) != dc . d . prefixLen {
return fmt . Errorf ( "wrong prefix length, this %s domain supports prefixLen %d, given [%x]" , dc . d . filenameBase , dc . d . prefixLen , prefix )
2022-05-29 18:57:09 +00:00
}
var cp CursorHeap
heap . Init ( & cp )
2022-07-02 18:38:34 +00:00
var k , v [ ] byte
var err error
2022-07-23 08:06:52 +00:00
keysCursor , err := dc . d . tx . CursorDupSort ( dc . d . keysTable )
2022-05-24 17:59:57 +00:00
if err != nil {
return err
}
defer keysCursor . Close ( )
2022-05-29 18:57:09 +00:00
if k , v , err = keysCursor . Seek ( prefix ) ; err != nil {
return err
}
if bytes . HasPrefix ( k , prefix ) {
2022-05-24 17:59:57 +00:00
keySuffix := make ( [ ] byte , len ( k ) + 8 )
copy ( keySuffix , k )
copy ( keySuffix [ len ( k ) : ] , v )
2022-05-29 18:57:09 +00:00
step := ^ binary . BigEndian . Uint64 ( v )
2022-07-23 08:06:52 +00:00
txNum := step * dc . d . aggregationStep
if v , err = dc . d . tx . GetOne ( dc . d . valsTable , keySuffix ) ; err != nil {
2022-05-24 17:59:57 +00:00
return err
}
2022-07-23 08:06:52 +00:00
heap . Push ( & cp , & CursorItem { t : DB_CURSOR , key : common . Copy ( k ) , val : common . Copy ( v ) , c : keysCursor , endTxNum : txNum , reverse : true } )
2022-05-24 17:59:57 +00:00
}
2022-09-26 02:42:44 +00:00
dc . files . Ascend ( func ( item ctxItem ) bool {
2022-07-23 08:06:52 +00:00
if item . reader . Empty ( ) {
2022-05-29 18:57:09 +00:00
return true
}
2022-07-23 08:06:52 +00:00
offset := item . reader . Lookup ( prefix )
2022-07-02 18:38:34 +00:00
// Creating dedicated getter because the one in the item may be used to delete storage, for example
2022-07-23 08:06:52 +00:00
g := item . getter
2022-05-29 18:57:09 +00:00
g . Reset ( offset )
if g . HasNext ( ) {
if keyMatch , _ := g . Match ( prefix ) ; ! keyMatch {
return true
}
g . Skip ( )
}
if g . HasNext ( ) {
key , _ := g . Next ( nil )
if bytes . HasPrefix ( key , prefix ) {
val , _ := g . Next ( nil )
2022-07-23 08:06:52 +00:00
heap . Push ( & cp , & CursorItem { t : FILE_CURSOR , key : key , val : val , dg : g , endTxNum : item . endTxNum , reverse : true } )
2022-05-29 18:57:09 +00:00
}
}
return true
} )
for cp . Len ( ) > 0 {
lastKey := common . Copy ( cp [ 0 ] . key )
lastVal := common . Copy ( cp [ 0 ] . val )
// Advance all the items that have this key (including the top)
for cp . Len ( ) > 0 && bytes . Equal ( cp [ 0 ] . key , lastKey ) {
ci1 := cp [ 0 ]
switch ci1 . t {
case FILE_CURSOR :
if ci1 . dg . HasNext ( ) {
ci1 . key , _ = ci1 . dg . Next ( ci1 . key [ : 0 ] )
if bytes . HasPrefix ( ci1 . key , prefix ) {
ci1 . val , _ = ci1 . dg . Next ( ci1 . val [ : 0 ] )
heap . Fix ( & cp , 0 )
} else {
heap . Pop ( & cp )
}
} else {
heap . Pop ( & cp )
}
case DB_CURSOR :
k , v , err = ci1 . c . NextNoDup ( )
if err != nil {
return err
}
if k != nil && bytes . HasPrefix ( k , prefix ) {
ci1 . key = common . Copy ( k )
keySuffix := make ( [ ] byte , len ( k ) + 8 )
copy ( keySuffix , k )
copy ( keySuffix [ len ( k ) : ] , v )
2022-07-23 08:06:52 +00:00
if v , err = dc . d . tx . GetOne ( dc . d . valsTable , keySuffix ) ; err != nil {
2022-05-29 18:57:09 +00:00
return err
}
ci1 . val = common . Copy ( v )
heap . Fix ( & cp , 0 )
} else {
heap . Pop ( & cp )
}
}
}
if len ( lastVal ) > 0 {
it ( lastKey , lastVal )
}
2022-05-24 17:59:57 +00:00
}
return nil
}
// Collation is the set of compressors created after aggregation
type Collation struct {
valuesPath string
valuesComp * compress . Compressor
valuesCount int
historyPath string
historyComp * compress . Compressor
historyCount int
indexBitmaps map [ string ] * roaring64 . Bitmap
}
2022-06-02 20:40:58 +00:00
func ( c Collation ) Close ( ) {
if c . valuesComp != nil {
c . valuesComp . Close ( )
}
if c . historyComp != nil {
c . historyComp . Close ( )
}
}
2022-05-24 17:59:57 +00:00
// collate gathers domain changes over the specified step, using read-only transaction,
// and returns compressors, elias fano, and bitmaps
// [txFrom; txTo)
2022-07-23 08:06:52 +00:00
func ( d * Domain ) collate ( step , txFrom , txTo uint64 , roTx kv . Tx ) ( Collation , error ) {
2022-07-28 07:47:13 +00:00
hCollation , err := d . History . collate ( step , txFrom , txTo , roTx )
if err != nil {
return Collation { } , err
}
var valuesComp * compress . Compressor
2022-05-24 17:59:57 +00:00
closeComp := true
defer func ( ) {
if closeComp {
2022-07-28 07:47:13 +00:00
hCollation . Close ( )
2022-05-24 17:59:57 +00:00
if valuesComp != nil {
valuesComp . Close ( )
}
}
} ( )
2022-07-28 07:47:13 +00:00
valuesPath := filepath . Join ( d . dir , fmt . Sprintf ( "%s.%d-%d.kv" , d . filenameBase , step , step + 1 ) )
2022-05-24 17:59:57 +00:00
if valuesComp , err = compress . NewCompressor ( context . Background ( ) , "collate values" , valuesPath , d . dir , compress . MinPatternScore , 1 , log . LvlDebug ) ; err != nil {
return Collation { } , fmt . Errorf ( "create %s values compressor: %w" , d . filenameBase , err )
}
keysCursor , err := roTx . CursorDupSort ( d . keysTable )
if err != nil {
return Collation { } , fmt . Errorf ( "create %s keys cursor: %w" , d . filenameBase , err )
}
defer keysCursor . Close ( )
2022-05-29 18:57:09 +00:00
var prefix [ ] byte // Track prefix to insert it before entries
2022-05-24 17:59:57 +00:00
var k , v [ ] byte
valuesCount := 0
for k , _ , err = keysCursor . First ( ) ; err == nil && k != nil ; k , _ , err = keysCursor . NextNoDup ( ) {
if v , err = keysCursor . LastDup ( ) ; err != nil {
return Collation { } , fmt . Errorf ( "find last %s key for aggregation step k=[%x]: %w" , d . filenameBase , k , err )
}
s := ^ binary . BigEndian . Uint64 ( v )
if s == step {
keySuffix := make ( [ ] byte , len ( k ) + 8 )
copy ( keySuffix , k )
copy ( keySuffix [ len ( k ) : ] , v )
v , err := roTx . GetOne ( d . valsTable , keySuffix )
if err != nil {
return Collation { } , fmt . Errorf ( "find last %s value for aggregation step k=[%x]: %w" , d . filenameBase , k , err )
}
2022-05-29 18:57:09 +00:00
if d . prefixLen > 0 && ( prefix == nil || ! bytes . HasPrefix ( k , prefix ) ) {
prefix = append ( prefix [ : 0 ] , k [ : d . prefixLen ] ... )
if err = valuesComp . AddUncompressedWord ( prefix ) ; err != nil {
return Collation { } , fmt . Errorf ( "add %s values prefix [%x]: %w" , d . filenameBase , prefix , err )
}
if err = valuesComp . AddUncompressedWord ( nil ) ; err != nil {
return Collation { } , fmt . Errorf ( "add %s values prefix val [%x]: %w" , d . filenameBase , prefix , err )
}
valuesCount ++
}
2022-05-24 17:59:57 +00:00
if err = valuesComp . AddUncompressedWord ( k ) ; err != nil {
return Collation { } , fmt . Errorf ( "add %s values key [%x]: %w" , d . filenameBase , k , err )
}
valuesCount ++ // Only counting keys, not values
if err = valuesComp . AddUncompressedWord ( v ) ; err != nil {
return Collation { } , fmt . Errorf ( "add %s values val [%x]=>[%x]: %w" , d . filenameBase , k , v , err )
}
}
}
if err != nil {
return Collation { } , fmt . Errorf ( "iterate over %s keys cursor: %w" , d . filenameBase , err )
}
closeComp = false
return Collation {
valuesPath : valuesPath ,
valuesComp : valuesComp ,
valuesCount : valuesCount ,
2022-07-28 07:47:13 +00:00
historyPath : hCollation . historyPath ,
historyComp : hCollation . historyComp ,
historyCount : hCollation . historyCount ,
indexBitmaps : hCollation . indexBitmaps ,
2022-05-24 17:59:57 +00:00
} , nil
}
type StaticFiles struct {
valuesDecomp * compress . Decompressor
valuesIdx * recsplit . Index
historyDecomp * compress . Decompressor
historyIdx * recsplit . Index
efHistoryDecomp * compress . Decompressor
efHistoryIdx * recsplit . Index
}
func ( sf StaticFiles ) Close ( ) {
2022-06-05 21:32:34 +00:00
if sf . valuesDecomp != nil {
sf . valuesDecomp . Close ( )
}
if sf . valuesIdx != nil {
sf . valuesIdx . Close ( )
}
if sf . historyDecomp != nil {
sf . historyDecomp . Close ( )
}
if sf . historyIdx != nil {
sf . historyIdx . Close ( )
}
if sf . efHistoryDecomp != nil {
sf . efHistoryDecomp . Close ( )
}
if sf . efHistoryIdx != nil {
sf . efHistoryIdx . Close ( )
}
2022-05-24 17:59:57 +00:00
}
// buildFiles performs potentially resource intensive operations of creating
// static files and their indices
func ( d * Domain ) buildFiles ( step uint64 , collation Collation ) ( StaticFiles , error ) {
2022-07-28 07:47:13 +00:00
hStaticFiles , err := d . History . buildFiles ( step , HistoryCollation {
historyPath : collation . historyPath ,
historyComp : collation . historyComp ,
historyCount : collation . historyCount ,
indexBitmaps : collation . indexBitmaps ,
} )
if err != nil {
return StaticFiles { } , err
}
2022-05-24 17:59:57 +00:00
valuesComp := collation . valuesComp
2022-07-28 07:47:13 +00:00
var valuesDecomp * compress . Decompressor
var valuesIdx * recsplit . Index
2022-05-24 17:59:57 +00:00
closeComp := true
defer func ( ) {
if closeComp {
2022-07-28 07:47:13 +00:00
hStaticFiles . Close ( )
2022-05-24 17:59:57 +00:00
if valuesComp != nil {
valuesComp . Close ( )
}
if valuesDecomp != nil {
valuesDecomp . Close ( )
}
if valuesIdx != nil {
valuesIdx . Close ( )
}
}
} ( )
2022-07-28 07:47:13 +00:00
valuesIdxPath := filepath . Join ( d . dir , fmt . Sprintf ( "%s.%d-%d.kvi" , d . filenameBase , step , step + 1 ) )
2022-05-24 17:59:57 +00:00
if err = valuesComp . Compress ( ) ; err != nil {
return StaticFiles { } , fmt . Errorf ( "compress %s values: %w" , d . filenameBase , err )
}
valuesComp . Close ( )
valuesComp = nil
if valuesDecomp , err = compress . NewDecompressor ( collation . valuesPath ) ; err != nil {
return StaticFiles { } , fmt . Errorf ( "open %s values decompressor: %w" , d . filenameBase , err )
}
2022-05-29 18:57:09 +00:00
if valuesIdx , err = buildIndex ( valuesDecomp , valuesIdxPath , d . dir , collation . valuesCount , false /* values */ ) ; err != nil {
2022-05-24 17:59:57 +00:00
return StaticFiles { } , fmt . Errorf ( "build %s values idx: %w" , d . filenameBase , err )
}
closeComp = false
return StaticFiles {
valuesDecomp : valuesDecomp ,
valuesIdx : valuesIdx ,
2022-07-28 07:47:13 +00:00
historyDecomp : hStaticFiles . historyDecomp ,
historyIdx : hStaticFiles . historyIdx ,
efHistoryDecomp : hStaticFiles . efHistoryDecomp ,
efHistoryIdx : hStaticFiles . efHistoryIdx ,
2022-05-24 17:59:57 +00:00
} , nil
}
2022-09-29 05:14:45 +00:00
// missedIdxFiles returns, in d.files order, the items whose .kvi index file
// does not exist on disk.
func (d *Domain) missedIdxFiles() (l []*filesItem) {
	d.files.Ascend(func(item *filesItem) bool {
		name := fmt.Sprintf("%s.%d-%d.kvi", d.filenameBase, item.startTxNum/d.aggregationStep, item.endTxNum/d.aggregationStep)
		if !dir.FileExist(filepath.Join(d.dir, name)) {
			l = append(l, item)
		}
		return true
	})
	return l
}
// BuildMissedIndices - produce .efi/.vi/.kvi from .ef/.v/.kv
//
// The history indices are delegated to History.BuildMissedIndices. Building
// the domain's own missing .kvi files is not implemented yet: they are only
// listed. Finally the domain files are reopened with openFiles.
func (d *Domain) BuildMissedIndices() (err error) {
	if err = d.History.BuildMissedIndices(); err != nil {
		return err
	}
	_ = d.missedIdxFiles() // TODO: build a .kvi for every returned item
	return d.openFiles()
}
2022-05-29 18:57:09 +00:00
func buildIndex ( d * compress . Decompressor , idxPath , dir string , count int , values bool ) ( * recsplit . Index , error ) {
2022-05-24 17:59:57 +00:00
var rs * recsplit . RecSplit
var err error
if rs , err = recsplit . NewRecSplit ( recsplit . RecSplitArgs {
KeyCount : count ,
Enums : false ,
BucketSize : 2000 ,
LeafSize : 8 ,
TmpDir : dir ,
2022-09-26 02:42:44 +00:00
IndexFile : idxPath ,
2022-05-24 17:59:57 +00:00
} ) ; err != nil {
return nil , fmt . Errorf ( "create recsplit: %w" , err )
}
defer rs . Close ( )
2022-09-29 05:14:45 +00:00
defer d . EnableReadAhead ( ) . DisableReadAhead ( )
2022-05-24 17:59:57 +00:00
word := make ( [ ] byte , 0 , 256 )
2022-05-29 18:57:09 +00:00
var keyPos , valPos uint64
2022-05-24 17:59:57 +00:00
g := d . MakeGetter ( )
for {
g . Reset ( 0 )
for g . HasNext ( ) {
2022-05-29 18:57:09 +00:00
word , valPos = g . Next ( word [ : 0 ] )
if values {
if err = rs . AddKey ( word , valPos ) ; err != nil {
return nil , fmt . Errorf ( "add idx key [%x]: %w" , word , err )
}
} else {
if err = rs . AddKey ( word , keyPos ) ; err != nil {
return nil , fmt . Errorf ( "add idx key [%x]: %w" , word , err )
}
2022-05-24 17:59:57 +00:00
}
// Skip value
2022-05-29 18:57:09 +00:00
keyPos = g . Skip ( )
2022-05-24 17:59:57 +00:00
}
if err = rs . Build ( ) ; err != nil {
if rs . Collision ( ) {
log . Info ( "Building recsplit. Collision happened. It's ok. Restarting..." )
rs . ResetNextSalt ( )
} else {
return nil , fmt . Errorf ( "build idx: %w" , err )
}
} else {
break
}
}
var idx * recsplit . Index
if idx , err = recsplit . OpenIndex ( idxPath ) ; err != nil {
return nil , fmt . Errorf ( "open idx: %w" , err )
}
return idx , nil
}
func ( d * Domain ) integrateFiles ( sf StaticFiles , txNumFrom , txNumTo uint64 ) {
2022-07-28 07:47:13 +00:00
d . History . integrateFiles ( HistoryFiles {
historyDecomp : sf . historyDecomp ,
historyIdx : sf . historyIdx ,
efHistoryDecomp : sf . efHistoryDecomp ,
efHistoryIdx : sf . efHistoryIdx ,
} , txNumFrom , txNumTo )
d . files . ReplaceOrInsert ( & filesItem {
2022-05-24 17:59:57 +00:00
startTxNum : txNumFrom ,
endTxNum : txNumTo ,
decompressor : sf . valuesDecomp ,
index : sf . valuesIdx ,
} )
}
// [txFrom; txTo)
func ( d * Domain ) prune ( step uint64 , txFrom , txTo uint64 ) error {
// It is important to clean up tables in a specific order
// First keysTable, because it is the first one access in the `get` function, i.e. if the record is deleted from there, other tables will not be accessed
keysCursor , err := d . tx . RwCursorDupSort ( d . keysTable )
if err != nil {
return fmt . Errorf ( "%s keys cursor: %w" , d . filenameBase , err )
}
defer keysCursor . Close ( )
var k , v [ ] byte
2022-09-26 14:59:24 +00:00
keyMaxSteps := make ( map [ string ] uint64 )
for k , v , err = keysCursor . First ( ) ; err == nil && k != nil ; k , v , err = keysCursor . Next ( ) {
s := ^ binary . BigEndian . Uint64 ( v )
if maxS , seen := keyMaxSteps [ string ( k ) ] ; ! seen || s > maxS {
keyMaxSteps [ string ( k ) ] = s
}
}
if err != nil {
return fmt . Errorf ( "iterate of %s keys: %w" , d . filenameBase , err )
}
2022-05-24 17:59:57 +00:00
for k , v , err = keysCursor . First ( ) ; err == nil && k != nil ; k , v , err = keysCursor . Next ( ) {
s := ^ binary . BigEndian . Uint64 ( v )
if s == step {
2022-09-26 14:59:24 +00:00
if maxS := keyMaxSteps [ string ( k ) ] ; maxS <= step {
continue
}
2022-05-24 17:59:57 +00:00
if err = keysCursor . DeleteCurrent ( ) ; err != nil {
return fmt . Errorf ( "clean up %s for [%x]=>[%x]: %w" , d . filenameBase , k , v , err )
}
2022-09-26 14:59:24 +00:00
//fmt.Printf("domain prune key %x [s%d]\n", string(k), s)
2022-05-24 17:59:57 +00:00
}
}
if err != nil {
return fmt . Errorf ( "iterate of %s keys: %w" , d . filenameBase , err )
}
var valsCursor kv . RwCursor
if valsCursor , err = d . tx . RwCursor ( d . valsTable ) ; err != nil {
return fmt . Errorf ( "%s vals cursor: %w" , d . filenameBase , err )
}
defer valsCursor . Close ( )
for k , _ , err = valsCursor . First ( ) ; err == nil && k != nil ; k , _ , err = valsCursor . Next ( ) {
s := ^ binary . BigEndian . Uint64 ( k [ len ( k ) - 8 : ] )
if s == step {
2022-09-26 14:59:24 +00:00
if maxS := keyMaxSteps [ string ( k [ : len ( k ) - 8 ] ) ] ; maxS <= step {
continue
}
2022-05-24 17:59:57 +00:00
if err = valsCursor . DeleteCurrent ( ) ; err != nil {
return fmt . Errorf ( "clean up %s for [%x]: %w" , d . filenameBase , k , err )
}
2022-09-26 14:59:24 +00:00
//fmt.Printf("domain prune value for %x (invs %x) [s%d]\n", string(k),k[len(k)-8):], s)
2022-05-24 17:59:57 +00:00
}
}
if err != nil {
return fmt . Errorf ( "iterate over %s vals: %w" , d . filenameBase , err )
}
2022-09-26 14:59:24 +00:00
2022-10-02 03:03:49 +00:00
if err = d . History . prune ( txFrom , txTo ) ; err != nil {
2022-09-26 14:59:24 +00:00
return fmt . Errorf ( "prune history at step %d [%d, %d): %w" , step , txFrom , txTo , err )
2022-07-26 05:47:08 +00:00
}
2022-05-24 17:59:57 +00:00
return nil
}
2022-09-26 14:59:24 +00:00
func ( dc * DomainContext ) readFromFiles ( filekey [ ] byte , fromTxNum uint64 ) ( [ ] byte , bool ) {
2022-05-24 17:59:57 +00:00
var val [ ] byte
var found bool
2022-09-26 14:59:24 +00:00
2022-09-26 02:42:44 +00:00
dc . files . Descend ( func ( item ctxItem ) bool {
2022-09-26 14:59:24 +00:00
if item . endTxNum < fromTxNum {
return false
}
2022-07-23 08:06:52 +00:00
if item . reader . Empty ( ) {
2022-05-24 17:59:57 +00:00
return true
}
2022-07-23 08:06:52 +00:00
offset := item . reader . Lookup ( filekey )
2022-05-24 17:59:57 +00:00
g := item . getter
g . Reset ( offset )
if g . HasNext ( ) {
if keyMatch , _ := g . Match ( filekey ) ; keyMatch {
val , _ = g . Next ( nil )
found = true
return false
}
}
return true
} )
return val , found
}
2022-06-09 13:48:16 +00:00
// historyBeforeTxNum searches history for a value of specified key before txNum
2022-05-24 17:59:57 +00:00
// second return value is true if the value is found in the history (even if it is nil)
2022-07-23 08:06:52 +00:00
func ( dc * DomainContext ) historyBeforeTxNum ( key [ ] byte , txNum uint64 , roTx kv . Tx ) ( [ ] byte , bool , error ) {
var search ctxItem
2022-06-09 13:48:16 +00:00
search . startTxNum = txNum
search . endTxNum = txNum
2022-05-24 17:59:57 +00:00
var foundTxNum uint64
var foundEndTxNum uint64
var foundStartTxNum uint64
var found bool
2022-05-31 17:42:04 +00:00
var anyItem bool // Whether any filesItem has been looked at in the loop below
2022-09-26 02:42:44 +00:00
var topState ctxItem
dc . files . AscendGreaterOrEqual ( search , func ( i ctxItem ) bool {
2022-07-18 09:05:04 +00:00
topState = i
2022-06-20 07:39:29 +00:00
return false
} )
2022-09-26 02:42:44 +00:00
dc . hc . indexFiles . AscendGreaterOrEqual ( search , func ( item ctxItem ) bool {
2022-05-31 17:42:04 +00:00
anyItem = true
2022-07-23 08:06:52 +00:00
offset := item . reader . Lookup ( key )
2022-05-24 17:59:57 +00:00
g := item . getter
g . Reset ( offset )
2022-06-17 11:39:49 +00:00
if k , _ := g . NextUncompressed ( ) ; bytes . Equal ( k , key ) {
2022-05-24 17:59:57 +00:00
eliasVal , _ := g . NextUncompressed ( )
ef , _ := eliasfano32 . ReadEliasFano ( eliasVal )
2022-07-02 18:38:34 +00:00
//start := time.Now()
n , ok := ef . Search ( txNum )
//d.stats.EfSearchTime += time.Since(start)
if ok {
2022-05-24 17:59:57 +00:00
foundTxNum = n
foundEndTxNum = item . endTxNum
foundStartTxNum = item . startTxNum
found = true
return false
2022-06-20 07:39:29 +00:00
} else if item . endTxNum > txNum && item . endTxNum >= topState . endTxNum {
return false
2022-05-24 17:59:57 +00:00
}
}
return true
} )
if ! found {
2022-05-31 17:42:04 +00:00
if anyItem {
// If there were no changes but there were history files, the value can be obtained from value files
var val [ ] byte
2022-09-26 02:42:44 +00:00
dc . files . DescendLessOrEqual ( topState , func ( item ctxItem ) bool {
2022-07-23 08:06:52 +00:00
if item . reader . Empty ( ) {
2022-05-31 17:42:04 +00:00
return true
}
2022-07-23 08:06:52 +00:00
offset := item . reader . Lookup ( key )
2022-05-31 17:42:04 +00:00
g := item . getter
g . Reset ( offset )
if g . HasNext ( ) {
2022-06-17 11:39:49 +00:00
if k , _ := g . NextUncompressed ( ) ; bytes . Equal ( k , key ) {
2022-07-23 08:06:52 +00:00
if dc . d . compressVals {
2022-06-17 11:39:49 +00:00
val , _ = g . Next ( nil )
} else {
val , _ = g . NextUncompressed ( )
}
2022-05-31 17:42:04 +00:00
return false
}
}
return true
} )
2022-06-09 13:48:16 +00:00
return val , true , nil
2022-05-31 17:42:04 +00:00
}
2022-05-29 18:57:09 +00:00
// Value not found in history files, look in the recent history
2022-05-31 17:42:04 +00:00
if roTx == nil {
return nil , false , fmt . Errorf ( "roTx is nil" )
}
2022-07-23 08:06:52 +00:00
indexCursor , err := roTx . CursorDupSort ( dc . d . indexTable )
2022-05-29 18:57:09 +00:00
if err != nil {
return nil , false , err
}
defer indexCursor . Close ( )
var txKey [ 8 ] byte
2022-06-09 13:48:16 +00:00
binary . BigEndian . PutUint64 ( txKey [ : ] , txNum )
2022-05-29 18:57:09 +00:00
var foundTxNumVal [ ] byte
if foundTxNumVal , err = indexCursor . SeekBothRange ( key , txKey [ : ] ) ; err != nil {
return nil , false , err
}
if foundTxNumVal != nil {
var historyKeysCursor kv . CursorDupSort
2022-07-28 07:47:13 +00:00
if historyKeysCursor , err = roTx . CursorDupSort ( dc . d . indexKeysTable ) ; err != nil {
2022-05-29 18:57:09 +00:00
return nil , false , err
}
defer historyKeysCursor . Close ( )
var vn [ ] byte
if vn , err = historyKeysCursor . SeekBothRange ( foundTxNumVal , key ) ; err != nil {
return nil , false , err
}
valNum := binary . BigEndian . Uint64 ( vn [ len ( vn ) - 8 : ] )
if valNum == 0 {
// This is special valNum == 0, which is empty value
return nil , true , nil
}
var v [ ] byte
2022-07-23 08:06:52 +00:00
if v , err = roTx . GetOne ( dc . d . historyValsTable , vn [ len ( vn ) - 8 : ] ) ; err != nil {
2022-05-29 18:57:09 +00:00
return nil , false , err
}
return v , true , nil
}
2022-05-24 17:59:57 +00:00
return nil , false , nil
}
2022-06-17 11:39:49 +00:00
var txKey [ 8 ] byte
binary . BigEndian . PutUint64 ( txKey [ : ] , foundTxNum )
2022-09-26 02:42:44 +00:00
var historyItem ctxItem
2022-05-24 17:59:57 +00:00
search . startTxNum = foundStartTxNum
search . endTxNum = foundEndTxNum
2022-09-26 02:42:44 +00:00
historyItem , ok := dc . hc . historyFiles . Get ( search )
if ! ok {
2022-07-23 08:06:52 +00:00
return nil , false , fmt . Errorf ( "no %s file found for [%x]" , dc . d . filenameBase , key )
2022-05-24 17:59:57 +00:00
}
2022-07-23 08:06:52 +00:00
offset := historyItem . reader . Lookup2 ( txKey [ : ] , key )
2022-05-24 17:59:57 +00:00
g := historyItem . getter
g . Reset ( offset )
2022-07-23 08:06:52 +00:00
if dc . d . compressVals {
2022-06-17 11:39:49 +00:00
v , _ := g . Next ( nil )
return v , true , nil
}
v , _ := g . NextUncompressed ( )
2022-05-24 17:59:57 +00:00
return v , true , nil
}
2022-06-09 13:48:16 +00:00
// GetBeforeTxNum does not always require usage of roTx. If it is possible to determine
2022-05-31 17:42:04 +00:00
// historical value based only on static files, roTx will not be used.
2022-07-23 08:06:52 +00:00
func ( dc * DomainContext ) GetBeforeTxNum ( key [ ] byte , txNum uint64 , roTx kv . Tx ) ( [ ] byte , error ) {
v , hOk , err := dc . historyBeforeTxNum ( key , txNum , roTx )
2022-05-24 17:59:57 +00:00
if err != nil {
return nil , err
}
if hOk {
return v , nil
}
2022-09-26 14:59:24 +00:00
if v , _ , err = dc . get ( key , txNum - 1 , roTx ) ; err != nil {
2022-05-24 17:59:57 +00:00
return nil , err
}
return v , nil
}