// Copyright (c) 2012, Suryandaru Triandana // All rights reserved. // // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. package leveldb import ( "encoding/binary" "fmt" "github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/memdb" "github.com/syndtr/goleveldb/leveldb/storage" ) // ErrBatchCorrupted records reason of batch corruption. type ErrBatchCorrupted struct { Reason string } func (e *ErrBatchCorrupted) Error() string { return fmt.Sprintf("leveldb: batch corrupted: %s", e.Reason) } func newErrBatchCorrupted(reason string) error { return errors.NewErrCorrupted(storage.FileDesc{}, &ErrBatchCorrupted{reason}) } const ( batchHdrLen = 8 + 4 batchGrowRec = 3000 ) // BatchReplay wraps basic batch operations. type BatchReplay interface { Put(key, value []byte) Delete(key []byte) } // Batch is a write batch. type Batch struct { data []byte rLen, bLen int seq uint64 sync bool } func (b *Batch) grow(n int) { off := len(b.data) if off == 0 { off = batchHdrLen if b.data != nil { b.data = b.data[:off] } } if cap(b.data)-off < n { if b.data == nil { b.data = make([]byte, off, off+n) } else { odata := b.data div := 1 if b.rLen > batchGrowRec { div = b.rLen / batchGrowRec } b.data = make([]byte, off, off+n+(off-batchHdrLen)/div) copy(b.data, odata) } } } func (b *Batch) appendRec(kt keyType, key, value []byte) { n := 1 + binary.MaxVarintLen32 + len(key) if kt == keyTypeVal { n += binary.MaxVarintLen32 + len(value) } b.grow(n) off := len(b.data) data := b.data[:off+n] data[off] = byte(kt) off++ off += binary.PutUvarint(data[off:], uint64(len(key))) copy(data[off:], key) off += len(key) if kt == keyTypeVal { off += binary.PutUvarint(data[off:], uint64(len(value))) copy(data[off:], value) off += len(value) } b.data = data[:off] b.rLen++ // Include 8-byte ikey header b.bLen += len(key) + len(value) + 8 } // Put appends 'put operation' of the given key/value pair to the batch. // It is safe to modify the contents of the argument after Put returns. func (b *Batch) Put(key, value []byte) { b.appendRec(keyTypeVal, key, value) } // Delete appends 'delete operation' of the given key to the batch. // It is safe to modify the contents of the argument after Delete returns. func (b *Batch) Delete(key []byte) { b.appendRec(keyTypeDel, key, nil) } // Dump dumps batch contents. The returned slice can be loaded into the // batch using Load method. // The returned slice is not its own copy, so the contents should not be // modified. func (b *Batch) Dump() []byte { return b.encode() } // Load loads given slice into the batch. Previous contents of the batch // will be discarded. // The given slice will not be copied and will be used as batch buffer, so // it is not safe to modify the contents of the slice. func (b *Batch) Load(data []byte) error { return b.decode(0, data) } // Replay replays batch contents. func (b *Batch) Replay(r BatchReplay) error { return b.decodeRec(func(i int, kt keyType, key, value []byte) error { switch kt { case keyTypeVal: r.Put(key, value) case keyTypeDel: r.Delete(key) } return nil }) } // Len returns number of records in the batch. func (b *Batch) Len() int { return b.rLen } // Reset resets the batch. func (b *Batch) Reset() { b.data = b.data[:0] b.seq = 0 b.rLen = 0 b.bLen = 0 b.sync = false } func (b *Batch) init(sync bool) { b.sync = sync } func (b *Batch) append(p *Batch) { if p.rLen > 0 { b.grow(len(p.data) - batchHdrLen) b.data = append(b.data, p.data[batchHdrLen:]...) b.rLen += p.rLen } if p.sync { b.sync = true } } // size returns sums of key/value pair length plus 8-bytes ikey. func (b *Batch) size() int { return b.bLen } func (b *Batch) encode() []byte { b.grow(0) binary.LittleEndian.PutUint64(b.data, b.seq) binary.LittleEndian.PutUint32(b.data[8:], uint32(b.rLen)) return b.data } func (b *Batch) decode(prevSeq uint64, data []byte) error { if len(data) < batchHdrLen { return newErrBatchCorrupted("too short") } b.seq = binary.LittleEndian.Uint64(data) if b.seq < prevSeq { return newErrBatchCorrupted("invalid sequence number") } b.rLen = int(binary.LittleEndian.Uint32(data[8:])) if b.rLen < 0 { return newErrBatchCorrupted("invalid records length") } // No need to be precise at this point, it won't be used anyway b.bLen = len(data) - batchHdrLen b.data = data return nil } func (b *Batch) decodeRec(f func(i int, kt keyType, key, value []byte) error) error { off := batchHdrLen for i := 0; i < b.rLen; i++ { if off >= len(b.data) { return newErrBatchCorrupted("invalid records length") } kt := keyType(b.data[off]) if kt > keyTypeVal { panic(kt) return newErrBatchCorrupted("bad record: invalid type") } off++ x, n := binary.Uvarint(b.data[off:]) off += n if n <= 0 || off+int(x) > len(b.data) { return newErrBatchCorrupted("bad record: invalid key length") } key := b.data[off : off+int(x)] off += int(x) var value []byte if kt == keyTypeVal { x, n := binary.Uvarint(b.data[off:]) off += n if n <= 0 || off+int(x) > len(b.data) { return newErrBatchCorrupted("bad record: invalid value length") } value = b.data[off : off+int(x)] off += int(x) } if err := f(i, kt, key, value); err != nil { return err } } return nil } func (b *Batch) memReplay(to *memdb.DB) error { var ikScratch []byte return b.decodeRec(func(i int, kt keyType, key, value []byte) error { ikScratch = makeInternalKey(ikScratch, key, b.seq+uint64(i), kt) return to.Put(ikScratch, value) }) } func (b *Batch) memDecodeAndReplay(prevSeq uint64, data []byte, to *memdb.DB) error { if err := b.decode(prevSeq, data); err != nil { return err } return b.memReplay(to) } func (b *Batch) revertMemReplay(to *memdb.DB) error { var ikScratch []byte return b.decodeRec(func(i int, kt keyType, key, value []byte) error { ikScratch := makeInternalKey(ikScratch, key, b.seq+uint64(i), kt) return to.Delete(ikScratch) }) }