diff --git a/swarm/shed/db.go b/swarm/shed/db.go index e128b8cbc..7377e12d2 100644 --- a/swarm/shed/db.go +++ b/swarm/shed/db.go @@ -23,14 +23,23 @@ package shed import ( + "errors" + "fmt" + "strconv" + "strings" + "time" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/swarm/log" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/opt" ) -// The limit for LevelDB OpenFilesCacheCapacity. -const openFileLimit = 128 +const ( + openFileLimit = 128 // The limit for LevelDB OpenFilesCacheCapacity. + writePauseWarningThrottler = 1 * time.Minute +) // DB provides abstractions over LevelDB in order to // implement complex structures using fields and ordered indexes. @@ -38,11 +47,22 @@ const openFileLimit = 128 // information about naming and types. type DB struct { ldb *leveldb.DB + + compTimeMeter metrics.Meter // Meter for measuring the total time spent in database compaction + compReadMeter metrics.Meter // Meter for measuring the data read during compaction + compWriteMeter metrics.Meter // Meter for measuring the data written during compaction + writeDelayNMeter metrics.Meter // Meter for measuring the write delay number due to database compaction + writeDelayMeter metrics.Meter // Meter for measuring the write delay duration due to database compaction + diskReadMeter metrics.Meter // Meter for measuring the effective amount of data read + diskWriteMeter metrics.Meter // Meter for measuring the effective amount of data written + + quitChan chan chan error // Quit channel to stop the metrics collection before closing the database } // NewDB constructs a new DB and validates the schema // if it exists in database on the given path. -func NewDB(path string) (db *DB, err error) { +// metricsPrefix is used for metrics collection for the given DB. +func NewDB(path string, metricsPrefix string) (db *DB, err error) { ldb, err := leveldb.OpenFile(path, &opt.Options{ OpenFilesCacheCapacity: openFileLimit, }) @@ -66,6 +86,15 @@ func NewDB(path string) (db *DB, err error) { return nil, err } } + + // Configure meters for DB + db.configure(metricsPrefix) + + // Create a quit channel for the periodic metrics collector and run it + db.quitChan = make(chan chan error) + + go db.meter(10 * time.Second) + return db, nil } @@ -126,5 +155,175 @@ func (db *DB) WriteBatch(batch *leveldb.Batch) (err error) { // Close closes LevelDB database. func (db *DB) Close() (err error) { + close(db.quitChan) return db.ldb.Close() } + +// Configure configures the database metrics collectors +func (db *DB) configure(prefix string) { + // Initialize all the metrics collector at the requested prefix + db.compTimeMeter = metrics.NewRegisteredMeter(prefix+"compact/time", nil) + db.compReadMeter = metrics.NewRegisteredMeter(prefix+"compact/input", nil) + db.compWriteMeter = metrics.NewRegisteredMeter(prefix+"compact/output", nil) + db.diskReadMeter = metrics.NewRegisteredMeter(prefix+"disk/read", nil) + db.diskWriteMeter = metrics.NewRegisteredMeter(prefix+"disk/write", nil) + db.writeDelayMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/duration", nil) + db.writeDelayNMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/counter", nil) +} + +func (db *DB) meter(refresh time.Duration) { + // Create the counters to store current and previous compaction values + compactions := make([][]float64, 2) + for i := 0; i < 2; i++ { + compactions[i] = make([]float64, 3) + } + // Create storage for iostats. + var iostats [2]float64 + + // Create storage and warning log tracer for write delay. + var ( + delaystats [2]int64 + lastWritePaused time.Time + ) + + var ( + errc chan error + merr error + ) + + // Iterate ad infinitum and collect the stats + for i := 1; errc == nil && merr == nil; i++ { + // Retrieve the database stats + stats, err := db.ldb.GetProperty("leveldb.stats") + if err != nil { + log.Error("Failed to read database stats", "err", err) + merr = err + continue + } + // Find the compaction table, skip the header + lines := strings.Split(stats, "\n") + for len(lines) > 0 && strings.TrimSpace(lines[0]) != "Compactions" { + lines = lines[1:] + } + if len(lines) <= 3 { + log.Error("Compaction table not found") + merr = errors.New("compaction table not found") + continue + } + lines = lines[3:] + + // Iterate over all the table rows, and accumulate the entries + for j := 0; j < len(compactions[i%2]); j++ { + compactions[i%2][j] = 0 + } + for _, line := range lines { + parts := strings.Split(line, "|") + if len(parts) != 6 { + break + } + for idx, counter := range parts[3:] { + value, err := strconv.ParseFloat(strings.TrimSpace(counter), 64) + if err != nil { + log.Error("Compaction entry parsing failed", "err", err) + merr = err + continue + } + compactions[i%2][idx] += value + } + } + // Update all the requested meters + if db.compTimeMeter != nil { + db.compTimeMeter.Mark(int64((compactions[i%2][0] - compactions[(i-1)%2][0]) * 1000 * 1000 * 1000)) + } + if db.compReadMeter != nil { + db.compReadMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1024 * 1024)) + } + if db.compWriteMeter != nil { + db.compWriteMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024)) + } + + // Retrieve the write delay statistic + writedelay, err := db.ldb.GetProperty("leveldb.writedelay") + if err != nil { + log.Error("Failed to read database write delay statistic", "err", err) + merr = err + continue + } + var ( + delayN int64 + delayDuration string + duration time.Duration + paused bool + ) + if n, err := fmt.Sscanf(writedelay, "DelayN:%d Delay:%s Paused:%t", &delayN, &delayDuration, &paused); n != 3 || err != nil { + log.Error("Write delay statistic not found") + merr = err + continue + } + duration, err = time.ParseDuration(delayDuration) + if err != nil { + log.Error("Failed to parse delay duration", "err", err) + merr = err + continue + } + if db.writeDelayNMeter != nil { + db.writeDelayNMeter.Mark(delayN - delaystats[0]) + } + if db.writeDelayMeter != nil { + db.writeDelayMeter.Mark(duration.Nanoseconds() - delaystats[1]) + } + // If a warning that db is performing compaction has been displayed, any subsequent + // warnings will be withheld for one minute not to overwhelm the user. + if paused && delayN-delaystats[0] == 0 && duration.Nanoseconds()-delaystats[1] == 0 && + time.Now().After(lastWritePaused.Add(writePauseWarningThrottler)) { + log.Warn("Database compacting, degraded performance") + lastWritePaused = time.Now() + } + delaystats[0], delaystats[1] = delayN, duration.Nanoseconds() + + // Retrieve the database iostats. + ioStats, err := db.ldb.GetProperty("leveldb.iostats") + if err != nil { + log.Error("Failed to read database iostats", "err", err) + merr = err + continue + } + var nRead, nWrite float64 + parts := strings.Split(ioStats, " ") + if len(parts) < 2 { + log.Error("Bad syntax of ioStats", "ioStats", ioStats) + merr = fmt.Errorf("bad syntax of ioStats %s", ioStats) + continue + } + if n, err := fmt.Sscanf(parts[0], "Read(MB):%f", &nRead); n != 1 || err != nil { + log.Error("Bad syntax of read entry", "entry", parts[0]) + merr = err + continue + } + if n, err := fmt.Sscanf(parts[1], "Write(MB):%f", &nWrite); n != 1 || err != nil { + log.Error("Bad syntax of write entry", "entry", parts[1]) + merr = err + continue + } + if db.diskReadMeter != nil { + db.diskReadMeter.Mark(int64((nRead - iostats[0]) * 1024 * 1024)) + } + if db.diskWriteMeter != nil { + db.diskWriteMeter.Mark(int64((nWrite - iostats[1]) * 1024 * 1024)) + } + iostats[0], iostats[1] = nRead, nWrite + + // Sleep a bit, then repeat the stats collection + select { + case errc = <-db.quitChan: + // Quit requesting, stop hammering the database + case <-time.After(refresh): + // Timeout, gather a new set of stats + } + } + + if errc == nil { + errc = <-db.quitChan + } + errc <- merr +} diff --git a/swarm/shed/db_test.go b/swarm/shed/db_test.go index 45325beeb..65fdac4a6 100644 --- a/swarm/shed/db_test.go +++ b/swarm/shed/db_test.go @@ -55,7 +55,7 @@ func TestDB_persistence(t *testing.T) { } defer os.RemoveAll(dir) - db, err := NewDB(dir) + db, err := NewDB(dir, "") if err != nil { t.Fatal(err) } @@ -73,7 +73,7 @@ func TestDB_persistence(t *testing.T) { t.Fatal(err) } - db2, err := NewDB(dir) + db2, err := NewDB(dir, "") if err != nil { t.Fatal(err) } @@ -101,7 +101,7 @@ func newTestDB(t *testing.T) (db *DB, cleanupFunc func()) { t.Fatal(err) } cleanupFunc = func() { os.RemoveAll(dir) } - db, err = NewDB(dir) + db, err = NewDB(dir, "") if err != nil { cleanupFunc() t.Fatal(err) diff --git a/swarm/shed/example_store_test.go b/swarm/shed/example_store_test.go index 2ed0be141..908a1e446 100644 --- a/swarm/shed/example_store_test.go +++ b/swarm/shed/example_store_test.go @@ -52,7 +52,7 @@ type Store struct { // and possible conflicts with schema from existing database is checked // automatically. func New(path string) (s *Store, err error) { - db, err := shed.NewDB(path) + db, err := shed.NewDB(path, "") if err != nil { return nil, err }