sub-transaction to not call fsync, sub-transactions to not call runtime.LockThread() (#945)

This commit is contained in:
Alex Sharov 2020-08-21 13:31:43 +07:00 committed by GitHub
parent f46a72bdd9
commit ebbaa55672
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -215,8 +215,12 @@ func (db *LmdbKV) Begin(ctx context.Context, parent Tx, writable bool) (Tx, erro
if db.env == nil {
return nil, fmt.Errorf("db closed")
}
runtime.LockOSThread()
db.wg.Add(1)
isSubTx := parent != nil
if !isSubTx {
runtime.LockOSThread()
db.wg.Add(1)
}
flags := uint(0)
if !writable {
flags |= lmdb.Readonly
@ -227,18 +231,22 @@ func (db *LmdbKV) Begin(ctx context.Context, parent Tx, writable bool) (Tx, erro
}
tx, err := db.env.BeginTxn(parentTx, flags)
if err != nil {
runtime.UnlockOSThread() // unlock only in case of error. normal flow is "defer .Rollback()"
if !isSubTx {
runtime.UnlockOSThread() // unlock only in case of error. normal flow is "defer .Rollback()"
}
return nil, err
}
tx.RawRead = true
return &lmdbTx{
db: db,
ctx: ctx,
tx: tx,
db: db,
ctx: ctx,
tx: tx,
isSubTx: isSubTx,
}, nil
}
type lmdbTx struct {
isSubTx bool
tx *lmdb.Txn
ctx context.Context
db *LmdbKV
@ -402,8 +410,10 @@ func (tx *lmdbTx) Commit(ctx context.Context) error {
}
defer func() {
tx.tx = nil
tx.db.wg.Done()
runtime.UnlockOSThread()
if !tx.isSubTx {
tx.db.wg.Done()
runtime.UnlockOSThread()
}
}()
tx.closeCursors()
@ -416,13 +426,15 @@ func (tx *lmdbTx) Commit(ctx context.Context) error {
log.Info("Batch", "commit", commitTook)
}
fsyncTimer := time.Now()
if err := tx.db.env.Sync(true); err != nil {
log.Warn("fsync after commit failed: \n", err)
}
fsyncTook := time.Since(fsyncTimer)
if fsyncTook > 1*time.Second {
log.Info("Batch", "fsync", fsyncTook)
if !tx.isSubTx { // call fsync only after main transaction commit
fsyncTimer := time.Now()
if err := tx.db.env.Sync(true); err != nil {
log.Warn("fsync after commit failed: \n", err)
}
fsyncTook := time.Since(fsyncTimer)
if fsyncTook > 1*time.Second {
log.Info("Batch", "fsync", fsyncTook)
}
}
return nil
}
@ -434,13 +446,12 @@ func (tx *lmdbTx) Rollback() {
if tx.tx == nil {
return
}
if tx.tx == nil {
return
}
defer func() {
tx.tx = nil
tx.db.wg.Done()
runtime.UnlockOSThread()
if !tx.isSubTx {
tx.db.wg.Done()
runtime.UnlockOSThread()
}
}()
tx.closeCursors()
tx.tx.Abort()