2019-05-27 13:51:49 +00:00
package core
import (
"bytes"
"encoding/binary"
2020-02-03 12:02:26 +00:00
"fmt"
2019-10-30 17:33:01 +00:00
"sync"
"time"
2020-03-30 16:01:24 +00:00
"github.com/ledgerwatch/turbo-geth/common/changeset"
"github.com/ledgerwatch/turbo-geth/common/debug"
2019-05-27 13:51:49 +00:00
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
)
// DeleteLimit is the limit batchDelete passes to LimitIterator when it deletes
// the collected keys in batches.
const DeleteLimit = 70000
// BlockChainer is the view of the chain the pruner needs: access to the
// current head block.
type BlockChainer interface {
// CurrentBlock returns the current head block. The pruning loop skips a tick
// when the returned block, or its number, is nil.
CurrentBlock ( ) * types . Block
}
func NewBasicPruner ( database ethdb . Database , chainer BlockChainer , config * CacheConfig ) ( * BasicPruner , error ) {
if config . BlocksToPrune == 0 || config . PruneTimeout . Seconds ( ) < 1 {
2020-02-03 12:02:26 +00:00
return nil , fmt . Errorf ( "incorrect config BlocksToPrune - %v, PruneTimeout - %v" , config . BlocksToPrune , config . PruneTimeout . Seconds ( ) )
2019-05-27 13:51:49 +00:00
}
return & BasicPruner {
wg : new ( sync . WaitGroup ) ,
db : database ,
chain : chainer ,
config : config ,
LastPrunedBlockNum : 0 ,
stop : make ( chan struct { } , 1 ) ,
} , nil
}
// BasicPruner periodically prunes history in a background goroutine
// (pruningLoop) and remembers how far pruning has progressed.
type BasicPruner struct {
// wg tracks the pruningLoop goroutine so that Stop can wait for it.
wg * sync . WaitGroup
// stop carries the signal that makes pruningLoop exit.
stop chan struct { }
// db is the database that is pruned and that stores the progress marker.
db ethdb . Database
// chain supplies the current head block.
chain BlockChainer
// LastPrunedBlockNum is the upper bound of the last range handed to Prune;
// it is loaded in Start and written back by pruningLoop.
LastPrunedBlockNum uint64
// config supplies PruneTimeout, BlocksBeforePruning and BlocksToPrune.
config * CacheConfig
}
func ( p * BasicPruner ) Start ( ) error {
2019-10-30 17:33:01 +00:00
db := p . db
2019-05-27 13:51:49 +00:00
p . LastPrunedBlockNum = p . ReadLastPrunedBlockNum ( )
p . wg . Add ( 1 )
go p . pruningLoop ( db )
log . Info ( "Pruner started" )
return nil
}
2020-02-12 13:52:59 +00:00
2019-10-30 17:33:01 +00:00
func ( p * BasicPruner ) pruningLoop ( db ethdb . Database ) {
2019-05-27 13:51:49 +00:00
prunerRun := time . NewTicker ( p . config . PruneTimeout )
saveLastPrunedBlockNum := time . NewTicker ( time . Minute * 5 )
defer prunerRun . Stop ( )
defer saveLastPrunedBlockNum . Stop ( )
defer p . wg . Done ( )
for {
select {
case <- p . stop :
p . WriteLastPrunedBlockNum ( p . LastPrunedBlockNum )
log . Info ( "Pruning stopped" )
return
case <- saveLastPrunedBlockNum . C :
log . Info ( "Save last pruned block num" , "num" , p . LastPrunedBlockNum )
p . WriteLastPrunedBlockNum ( p . LastPrunedBlockNum )
case <- prunerRun . C :
cb := p . chain . CurrentBlock ( )
if cb == nil || cb . Number ( ) == nil {
continue
}
from , to , ok := calculateNumOfPrunedBlocks ( cb . Number ( ) . Uint64 ( ) , p . LastPrunedBlockNum , p . config . BlocksBeforePruning , p . config . BlocksToPrune )
if ! ok {
continue
}
log . Debug ( "Pruning history" , "from" , from , "to" , to )
err := Prune ( db , from , to )
if err != nil {
log . Error ( "Pruning error" , "err" , err )
return
}
2020-02-12 13:52:59 +00:00
err = PruneStorageOfSelfDestructedAccounts ( db )
if err != nil {
log . Error ( "PruneStorageOfSelfDestructedAccounts error" , "err" , err )
return
}
2019-05-27 13:51:49 +00:00
p . LastPrunedBlockNum = to
}
}
}
2019-11-07 15:51:06 +00:00
// calculateNumOfPrunedBlocks works out the next block range to prune.
//
// It returns (from, to, ok). from is always lastPrunedBlock. When ok is true,
// to is lastPrunedBlock plus the number of prunable blocks, capped at
// blocksBatch. The prunable blocks are those between lastPrunedBlock and
// currentBlock, minus the most recent blocksBeforePruning blocks, which are
// kept. When nothing is prunable, ok is false and to equals lastPrunedBlock.
func calculateNumOfPrunedBlocks(currentBlock, lastPrunedBlock uint64, blocksBeforePruning uint64, blocksBatch uint64) (uint64, uint64, bool) {
	// Compare before subtracting: the operands are unsigned and would wrap
	// around, see https://github.com/ledgerwatch/turbo-geth/issues/115
	if currentBlock <= lastPrunedBlock || currentBlock-lastPrunedBlock <= blocksBeforePruning {
		return lastPrunedBlock, lastPrunedBlock, false
	}

	prunable := currentBlock - lastPrunedBlock - blocksBeforePruning
	if prunable > blocksBatch {
		prunable = blocksBatch
	}
	return lastPrunedBlock, lastPrunedBlock + prunable, true
}
2020-01-16 21:21:40 +00:00
2019-05-27 13:51:49 +00:00
// Stop sends the stop signal to the pruning loop and blocks until the loop
// goroutine has finished.
func (p *BasicPruner) Stop() {
	var signal struct{}
	p.stop <- signal

	p.wg.Wait()
	log.Info("Pruning stopped")
}
// ReadLastPrunedBlockNum loads the progress marker written by
// WriteLastPrunedBlockNum: an 8-byte little-endian block number.
//
// It returns 0 when no usable value is stored. The lookup error is ignored, so
// a failed or empty read is treated as "nothing pruned yet". A stored value
// shorter than 8 bytes is reported and also treated as 0, because decoding it
// would panic.
func (p *BasicPruner) ReadLastPrunedBlockNum() uint64 {
	const encodedLen = 8

	data, _ := p.db.Get(dbutils.LastPrunedBlockKey, dbutils.LastPrunedBlockKey)
	if len(data) < encodedLen {
		if len(data) != 0 {
			log.Warn("Malformed last pruned block num, starting from 0", "len", len(data))
		}
		return 0
	}
	return binary.LittleEndian.Uint64(data)
}
// WriteLastPrunedBlockNum stores num as the progress marker of the pruner,
// encoded as 8 little-endian bytes under dbutils.LastPrunedBlockKey. A failed
// write is reported at critical log level.
func (p *BasicPruner) WriteLastPrunedBlockNum(num uint64) {
	var encoded [8]byte
	binary.LittleEndian.PutUint64(encoded[:], num)

	err := p.db.Put(dbutils.LastPrunedBlockKey, dbutils.LastPrunedBlockKey, encoded[:])
	if err != nil {
		log.Crit("Failed to store last pruned block's num", "err", err)
	}
}
2020-02-12 13:52:59 +00:00
func PruneStorageOfSelfDestructedAccounts ( db ethdb . Database ) error {
keysToRemove := newKeysToRemove ( )
if err := db . Walk ( dbutils . IntermediateTrieHashBucket , [ ] byte { } , 0 , func ( k , v [ ] byte ) ( b bool , e error ) {
if len ( v ) > 0 && len ( k ) != common . HashLength { // marker of self-destructed account is - empty value
return true , nil
}
if err := db . Walk ( dbutils . StorageBucket , k , common . HashLength * 8 , func ( k , _ [ ] byte ) ( b bool , e error ) {
2020-03-30 16:01:24 +00:00
keysToRemove . StorageKeys = append ( keysToRemove . StorageKeys , common . CopyBytes ( k ) )
2020-02-12 13:52:59 +00:00
return true , nil
} ) ; err != nil {
return false , err
}
if err := db . Walk ( dbutils . IntermediateTrieHashBucket , k , common . HashLength * 8 , func ( k , _ [ ] byte ) ( b bool , e error ) {
2020-03-30 16:01:24 +00:00
keysToRemove . IntermediateTrieHashKeys = append ( keysToRemove . IntermediateTrieHashKeys , common . CopyBytes ( k ) )
2020-02-12 13:52:59 +00:00
return true , nil
} ) ; err != nil {
return false , err
}
return true , nil
} ) ; err != nil {
return err
}
//return batchDelete(db, keysToRemove)
log . Debug ( "PruneStorageOfSelfDestructedAccounts can remove rows amount" , "storage_bucket" , len ( keysToRemove . StorageKeys ) , "intermediate_bucket" , len ( keysToRemove . IntermediateTrieHashKeys ) )
return nil
}
2019-10-30 17:33:01 +00:00
func Prune ( db ethdb . Database , blockNumFrom uint64 , blockNumTo uint64 ) error {
2019-05-27 13:51:49 +00:00
keysToRemove := newKeysToRemove ( )
2020-01-16 21:21:40 +00:00
err := db . Walk ( dbutils . AccountChangeSetBucket , [ ] byte { } , 0 , func ( key , v [ ] byte ) ( b bool , e error ) {
2019-05-27 13:51:49 +00:00
timestamp , _ := dbutils . DecodeTimestamp ( key )
if timestamp < blockNumFrom {
return true , nil
}
if timestamp > blockNumTo {
return false , nil
}
2020-04-11 13:28:15 +00:00
keysToRemove . AccountChangeSet = append ( keysToRemove . AccountChangeSet , common . CopyBytes ( key ) )
2019-05-27 13:51:49 +00:00
2020-01-22 10:25:07 +00:00
innerErr := changeset . Walk ( v , func ( cKey , _ [ ] byte ) error {
2019-05-27 13:51:49 +00:00
compKey , _ := dbutils . CompositeKeySuffix ( cKey , timestamp )
2020-01-16 21:21:40 +00:00
keysToRemove . AccountHistoryKeys = append ( keysToRemove . AccountHistoryKeys , compKey )
2019-05-27 13:51:49 +00:00
return nil
} )
2020-01-16 21:21:40 +00:00
if innerErr != nil {
return false , innerErr
}
return true , nil
} )
if err != nil {
return err
}
err = db . Walk ( dbutils . StorageChangeSetBucket , [ ] byte { } , 0 , func ( key , v [ ] byte ) ( b bool , e error ) {
timestamp , _ := dbutils . DecodeTimestamp ( key )
if timestamp < blockNumFrom {
return true , nil
}
if timestamp > blockNumTo {
return false , nil
}
2020-04-11 13:28:15 +00:00
keysToRemove . StorageChangeSet = append ( keysToRemove . StorageChangeSet , common . CopyBytes ( key ) )
2020-01-22 10:25:07 +00:00
var innerErr error
if debug . IsThinHistory ( ) {
innerErr = changeset . StorageChangeSetBytes ( v ) . Walk ( func ( cKey , _ [ ] byte ) error {
//todo implement pruning for thin history
return nil
} )
} else {
innerErr = changeset . Walk ( v , func ( cKey , _ [ ] byte ) error {
compKey , _ := dbutils . CompositeKeySuffix ( cKey , timestamp )
2020-04-11 13:28:15 +00:00
keysToRemove . StorageHistoryKeys = append ( keysToRemove . StorageHistoryKeys , common . CopyBytes ( compKey ) )
2020-01-22 10:25:07 +00:00
return nil
} )
}
2020-01-16 21:21:40 +00:00
if innerErr != nil {
return false , innerErr
2019-05-27 13:51:49 +00:00
}
return true , nil
} )
if err != nil {
return err
}
2019-10-30 17:33:01 +00:00
err = batchDelete ( db , keysToRemove )
2019-05-27 13:51:49 +00:00
if err != nil {
return err
}
return nil
}
2019-10-30 17:33:01 +00:00
func batchDelete ( db ethdb . Database , keys * keysToRemove ) error {
2020-01-16 21:21:40 +00:00
log . Debug ( "Removing: " , "accounts" , len ( keys . AccountHistoryKeys ) , "storage" , len ( keys . StorageHistoryKeys ) , "suffix" , len ( keys . AccountChangeSet ) )
2019-05-27 13:51:49 +00:00
iterator := LimitIterator ( keys , DeleteLimit )
for iterator . HasMore ( ) {
iterator . ResetLimit ( )
2019-10-30 17:33:01 +00:00
batch := db . NewBatch ( )
for {
key , bucketKey , ok := iterator . GetNext ( )
if ! ok {
break
2019-05-27 13:51:49 +00:00
}
2019-10-30 17:33:01 +00:00
err := batch . Delete ( bucketKey , key )
if err != nil {
log . Warn ( "Unable to remove" , "bucket" , bucketKey , "addr" , common . Bytes2Hex ( key ) , "err" , err )
continue
}
}
_ , err := batch . Commit ( )
2019-05-27 13:51:49 +00:00
if err != nil {
return err
}
}
return nil
}
func newKeysToRemove ( ) * keysToRemove {
return & keysToRemove {
2020-02-12 13:52:59 +00:00
AccountHistoryKeys : make ( Keys , 0 ) ,
StorageHistoryKeys : make ( Keys , 0 ) ,
2020-02-24 18:47:26 +00:00
AccountChangeSet : make ( Keys , 0 ) ,
StorageChangeSet : make ( Keys , 0 ) ,
2020-02-12 13:52:59 +00:00
StorageKeys : make ( Keys , 0 ) ,
IntermediateTrieHashKeys : make ( Keys , 0 ) ,
2019-05-27 13:51:49 +00:00
}
}
2020-02-12 13:52:59 +00:00
// Keys is a list of raw database keys.
type Keys [ ] [ ] byte
// Batch pairs a bucket with the keys collected for that bucket.
type Batch struct {
// bucket is the name of the bucket the keys belong to.
bucket [ ] byte
// keys are the keys collected for bucket.
keys Keys
}
2019-05-27 13:51:49 +00:00
type keysToRemove struct {
2020-02-12 13:52:59 +00:00
AccountHistoryKeys Keys
StorageHistoryKeys Keys
2020-02-24 18:47:26 +00:00
AccountChangeSet Keys
StorageChangeSet Keys
2020-02-12 13:52:59 +00:00
StorageKeys Keys
IntermediateTrieHashKeys Keys
2019-05-27 13:51:49 +00:00
}
func LimitIterator ( k * keysToRemove , limit int ) * limitIterator {
2020-02-12 13:52:59 +00:00
i := & limitIterator {
2019-05-27 13:51:49 +00:00
k : k ,
limit : limit ,
}
2020-02-24 18:47:26 +00:00
2020-02-12 13:52:59 +00:00
i . batches = [ ] Batch {
{ bucket : dbutils . AccountsHistoryBucket , keys : i . k . AccountHistoryKeys } ,
{ bucket : dbutils . StorageHistoryBucket , keys : i . k . StorageHistoryKeys } ,
{ bucket : dbutils . StorageBucket , keys : i . k . StorageKeys } ,
2020-02-24 18:47:26 +00:00
{ bucket : dbutils . AccountChangeSetBucket , keys : i . k . AccountChangeSet } ,
{ bucket : dbutils . StorageChangeSetBucket , keys : i . k . StorageChangeSet } ,
2020-02-12 13:52:59 +00:00
{ bucket : dbutils . IntermediateTrieHashBucket , keys : i . k . IntermediateTrieHashKeys } ,
}
return i
2019-05-27 13:51:49 +00:00
}
type limitIterator struct {
k * keysToRemove
counter uint64
currentBucket [ ] byte
currentNum int
limit int
2020-02-12 13:52:59 +00:00
batches [ ] Batch
2019-05-27 13:51:49 +00:00
}
func ( i * limitIterator ) GetNext ( ) ( [ ] byte , [ ] byte , bool ) {
if i . limit <= i . currentNum {
return nil , nil , false
}
i . updateBucket ( )
if ! i . HasMore ( ) {
return nil , nil , false
}
defer func ( ) {
i . currentNum ++
i . counter ++
} ( )
2020-02-12 13:52:59 +00:00
for batchIndex , batch := range i . batches {
if batchIndex == len ( i . batches ) - 1 {
break
}
if bytes . Equal ( i . currentBucket , batch . bucket ) {
return batch . keys [ i . currentNum ] , batch . bucket , true
}
2019-05-27 13:51:49 +00:00
}
return nil , nil , false
}
func ( i * limitIterator ) ResetLimit ( ) {
i . counter = 0
}
func ( i * limitIterator ) HasMore ( ) bool {
2020-02-12 13:52:59 +00:00
lastBatch := i . batches [ len ( i . batches ) - 1 ]
if bytes . Equal ( i . currentBucket , lastBatch . bucket ) && len ( lastBatch . keys ) == i . currentNum {
2019-05-27 13:51:49 +00:00
return false
}
2020-04-11 13:28:15 +00:00
2019-05-27 13:51:49 +00:00
return true
}
func ( i * limitIterator ) updateBucket ( ) {
if i . currentBucket == nil {
2020-02-12 13:52:59 +00:00
i . currentBucket = i . batches [ 0 ] . bucket
2019-05-27 13:51:49 +00:00
}
2020-02-12 13:52:59 +00:00
for batchIndex , batch := range i . batches {
if batchIndex == len ( i . batches ) - 1 {
break
}
2020-01-16 21:21:40 +00:00
2020-02-12 13:52:59 +00:00
if bytes . Equal ( i . currentBucket , batch . bucket ) && len ( batch . keys ) == i . currentNum {
i . currentBucket = i . batches [ batchIndex + 1 ] . bucket
i . currentNum = 0
}
2019-05-27 13:51:49 +00:00
}
}