// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package localstore

import (
	"net/url"
	"path/filepath"
	"runtime/debug"
	"sync"
	"time"

	"github.com/juju/errors"
	"github.com/ngaut/log"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/store/localstore/engine"
	"github.com/pingcap/tidb/util/segmentmap"
	"github.com/twinj/uuid"
)

var (
	_ kv.Storage = (*dbStore)(nil)
)

type op int

const (
	opSeek = iota + 1
	opCommit
)

const (
	maxSeekWorkers = 3

	lowerWaterMark = 10 // seconds
)

// command wraps one operation and its arguments; the scheduler goroutine
// replies on the done channel when the operation finishes.
type command struct {
	op    op
	txn   *dbTxn
	args  interface{}
	reply interface{}
	done  chan error
}

type seekReply struct {
	key   []byte
	value []byte
}

type commitReply struct {
	err error
}

type seekArgs struct {
	key []byte
}

type commitArgs struct{}

// Seek searches for the first key in the engine which is >= key in byte order,
// and returns (nil, nil, ErrNotFound) if no such key is found.
func (s *dbStore) Seek(key []byte) ([]byte, []byte, error) {
	c := &command{
		op:   opSeek,
		args: &seekArgs{key: key},
		done: make(chan error, 1),
	}

	s.commandCh <- c
	err := <-c.done
	if err != nil {
		return nil, nil, errors.Trace(err)
	}

	reply := c.reply.(*seekReply)
	return reply.key, reply.value, nil
}

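// A minimal usage sketch for Seek: it returns the first key-value pair at or
// after the given key in byte order. The key literal below is illustrative
// only:
//
//	k, v, err := store.Seek([]byte("row_0001"))
//	if err != nil {
//		// No key >= "row_0001" exists, or the seek failed.
//		return err
//	}
//	fmt.Printf("first key at or after seek point: %q -> %q\n", k, v)
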
// CommitTxn writes the transaction's changed data to the engine in one batch.
func (s *dbStore) CommitTxn(txn *dbTxn) error {
	if len(txn.lockedKeys) == 0 {
		return nil
	}
	c := &command{
		op:   opCommit,
		txn:  txn,
		args: &commitArgs{},
		done: make(chan error, 1),
	}

	s.commandCh <- c
	err := <-c.done
	return errors.Trace(err)
}

// seekWorker batches seek commands: it blocks for the first command, then
// drains whatever else is already queued, so one MultiSeek call can serve
// the whole batch.
func (s *dbStore) seekWorker(wg *sync.WaitGroup, seekCh chan *command) {
	defer wg.Done()
	for {
		var pending []*command
		select {
		case cmd, ok := <-seekCh:
			if !ok {
				return
			}
			pending = append(pending, cmd)
		L:
			for {
				select {
				case cmd, ok := <-seekCh:
					if !ok {
						break L
					}
					pending = append(pending, cmd)
				default:
					break L
				}
			}
		}

		s.doSeek(pending)
	}
}

// scheduler is the single goroutine that owns the store's mutable state: it
// dispatches seeks to the worker pool, applies commits serially, and evicts
// stale entries from recentUpdates once a second, one segment per tick.
func (s *dbStore) scheduler() {
	closed := false
	seekCh := make(chan *command, 1000)
	wgSeekWorkers := &sync.WaitGroup{}
	wgSeekWorkers.Add(maxSeekWorkers)
	for i := 0; i < maxSeekWorkers; i++ {
		go s.seekWorker(wgSeekWorkers, seekCh)
	}

	segmentIndex := int64(0)

	tick := time.NewTicker(time.Second)
	defer tick.Stop()

	for {
		select {
		case cmd := <-s.commandCh:
			if closed {
				cmd.done <- ErrDBClosed
				continue
			}
			switch cmd.op {
			case opSeek:
				seekCh <- cmd
			case opCommit:
				s.doCommit(cmd)
			}
		case <-s.closeCh:
			closed = true
			// Notify the seek workers to exit.
			close(seekCh)
			wgSeekWorkers.Wait()
			s.wg.Done()
		case <-tick.C:
			segmentIndex = segmentIndex % s.recentUpdates.SegmentCount()
			s.cleanRecentUpdates(segmentIndex)
			segmentIndex++
		}
	}
}

// cleanRecentUpdates drops versions older than lowerWaterMark seconds from
// one segment of the recentUpdates map.
func (s *dbStore) cleanRecentUpdates(segmentIndex int64) {
	m, err := s.recentUpdates.GetSegment(segmentIndex)
	if err != nil {
		log.Error(err)
		return
	}

	now := time.Now().Unix()
	for k, v := range m {
		dis := now - version2Second(v.(kv.Version))
		if dis > lowerWaterMark {
			delete(m, k)
		}
	}
}

// tryLock locks the keys written by txn. It fails if any key is already
// locked by another transaction, or if a newer version of a key has been
// committed since txn started.
func (s *dbStore) tryLock(txn *dbTxn) (err error) {
	// Check for conflicts.
	for k := range txn.lockedKeys {
		if _, ok := s.keysLocked[k]; ok {
			return errors.Trace(kv.ErrLockConflict)
		}

		lastVer, ok := s.recentUpdates.Get([]byte(k))
		if !ok {
			continue
		}
		// If there is a newer version of this key, return an error.
		if lastVer.(kv.Version).Cmp(kv.Version{Ver: txn.tid}) > 0 {
			return errors.Trace(kv.ErrConditionNotMatch)
		}
	}

	// Record the locks.
	for k := range txn.lockedKeys {
		s.keysLocked[k] = txn.tid
	}

	return nil
}

// doCommit applies a commit command: it locks the written keys, assigns the
// commit version, and writes the MVCC-encoded key-value pairs in one batch.
func (s *dbStore) doCommit(cmd *command) {
	txn := cmd.txn
	curVer, err := globalVersionProvider.CurrentVersion()
	if err != nil {
		log.Fatal(err)
	}
	err = s.tryLock(txn)
	if err != nil {
		cmd.done <- errors.Trace(err)
		return
	}
	// Update the commit version.
	txn.version = curVer
	b := s.db.NewBatch()
	txn.us.WalkBuffer(func(k kv.Key, value []byte) error {
		mvccKey := MvccEncodeVersionKey(k, curVer)
		if len(value) == 0 { // An empty value marks a deletion.
			b.Put(mvccKey, nil)
			s.compactor.OnDelete(k)
		} else {
			b.Put(mvccKey, value)
			s.compactor.OnSet(k)
		}
		return nil
	})
	err = s.writeBatch(b)
	s.unLockKeys(txn)
	cmd.done <- errors.Trace(err)
}

// doSeek serves a batch of seek commands with a single MultiSeek call and
// replies to each command individually.
func (s *dbStore) doSeek(seekCmds []*command) {
	keys := make([][]byte, 0, len(seekCmds))
	for _, cmd := range seekCmds {
		keys = append(keys, cmd.args.(*seekArgs).key)
	}

	results := s.db.MultiSeek(keys)

	for i, cmd := range seekCmds {
		reply := &seekReply{}
		var err error
		reply.key, reply.value, err = results[i].Key, results[i].Value, results[i].Err
		cmd.reply = reply
		cmd.done <- errors.Trace(err)
	}
}

// NewBatch returns a new write batch of the underlying engine.
func (s *dbStore) NewBatch() engine.Batch {
	return s.db.NewBatch()
}

type dbStore struct {
	db engine.DB

	txns       map[uint64]*dbTxn
	keysLocked map[string]uint64
	// TODO: clean up recentUpdates.
	recentUpdates *segmentmap.SegmentMap
	uuid          string
	path          string
	compactor     *localstoreCompactor
	wg            *sync.WaitGroup

	commandCh chan *command
	closeCh   chan struct{}

	mu     sync.Mutex
	closed bool
}

type storeCache struct {
	mu    sync.Mutex
	cache map[string]*dbStore
}

var (
	globalVersionProvider kv.VersionProvider
	mc                    storeCache

	// ErrDBClosed is the error meaning db is closed and we can't use it anymore.
	ErrDBClosed = errors.New("db is closed")
)

func init() {
	mc.cache = make(map[string]*dbStore)
	globalVersionProvider = &LocalVersionProvider{}
}

// Driver implements the kv.Driver interface.
type Driver struct {
	// engine.Driver is the engine driver for different local db engines.
	engine.Driver
}

// IsLocalStore checks whether a storage is local or not.
func IsLocalStore(s kv.Storage) bool {
	_, ok := s.(*dbStore)
	return ok
}

// Open opens or creates a storage with a specific format for a local engine Driver.
// The path should be in URL format, as described in the tidb package.
func (d Driver) Open(path string) (kv.Storage, error) {
	mc.mu.Lock()
	defer mc.mu.Unlock()

	u, err := url.Parse(path)
	if err != nil {
		return nil, errors.Trace(err)
	}

	engineSchema := filepath.Join(u.Host, u.Path)
	if store, ok := mc.cache[engineSchema]; ok {
		// TODO: check that the cached store uses the same engine as this Driver.
		log.Info("[kv] cache store", engineSchema)
		return store, nil
	}

	db, err := d.Driver.Open(engineSchema)
	if err != nil {
		return nil, errors.Trace(err)
	}

	log.Info("[kv] New store", engineSchema)
	s := &dbStore{
		txns:       make(map[uint64]*dbTxn),
		keysLocked: make(map[string]uint64),
		uuid:       uuid.NewV4().String(),
		path:       engineSchema,
		db:         db,
		compactor:  newLocalCompactor(localCompactDefaultPolicy, db),
		commandCh:  make(chan *command, 1000),
		closed:     false,
		closeCh:    make(chan struct{}),
		wg:         &sync.WaitGroup{},
	}
	s.recentUpdates, err = segmentmap.NewSegmentMap(100)
	if err != nil {
		return nil, errors.Trace(err)
	}
	mc.cache[engineSchema] = s
	s.compactor.Start()
	s.wg.Add(1)
	go s.scheduler()
	return s, nil
}

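// A minimal usage sketch for Open. The "goleveldb" scheme and driver below
// are assumptions for illustration; the real scheme depends on which
// engine.Driver implementation is plugged into Driver:
//
//	driver := Driver{Driver: goleveldb.Driver{}}
//	store, err := driver.Open("goleveldb:///tmp/tidb-test")
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer store.Close()
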
// UUID returns the unique identifier of this store.
func (s *dbStore) UUID() string {
	return s.uuid
}

// GetSnapshot returns a snapshot at the given version, capped at the store's
// current version.
func (s *dbStore) GetSnapshot(ver kv.Version) (kv.Snapshot, error) {
	s.mu.Lock()
	if s.closed {
		s.mu.Unlock()
		return nil, ErrDBClosed
	}
	s.mu.Unlock()

	currentVer, err := globalVersionProvider.CurrentVersion()
	if err != nil {
		return nil, errors.Trace(err)
	}

	if ver.Cmp(currentVer) > 0 {
		ver = currentVer
	}

	return &dbSnapshot{
		store:   s,
		version: ver,
	}, nil
}

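// A snapshot-read sketch: take a snapshot at the store's current version and
// read one key from it. The key is illustrative; dbSnapshot implements
// kv.Snapshot:
//
//	ver, err := store.CurrentVersion()
//	if err != nil {
//		return err
//	}
//	snap, err := store.GetSnapshot(ver)
//	if err != nil {
//		return err
//	}
//	val, err := snap.Get(kv.Key("meta_key"))
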
// CurrentVersion returns the latest version from the global version provider.
func (s *dbStore) CurrentVersion() (kv.Version, error) {
	return globalVersionProvider.CurrentVersion()
}

// Begin starts a transaction at the current version.
func (s *dbStore) Begin() (kv.Transaction, error) {
	s.mu.Lock()
	if s.closed {
		s.mu.Unlock()
		return nil, ErrDBClosed
	}
	s.mu.Unlock()

	beginVer, err := globalVersionProvider.CurrentVersion()
	if err != nil {
		return nil, errors.Trace(err)
	}

	return newTxn(s, beginVer), nil
}

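// A transaction sketch: Begin pins the start version, writes go through the
// transaction's buffer, and Commit eventually reaches CommitTxn above. Keys
// and values are illustrative:
//
//	txn, err := store.Begin()
//	if err != nil {
//		return err
//	}
//	if err := txn.Set(kv.Key("k1"), []byte("v1")); err != nil {
//		txn.Rollback()
//		return err
//	}
//	return txn.Commit()
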
// Close stops the compactor and the scheduler, removes the store from the
// cache, and closes the underlying engine.
func (s *dbStore) Close() error {
	s.mu.Lock()
	if s.closed {
		s.mu.Unlock()
		return ErrDBClosed
	}

	s.closed = true
	s.mu.Unlock()

	mc.mu.Lock()
	defer mc.mu.Unlock()
	s.compactor.Stop()
	s.closeCh <- struct{}{}
	s.wg.Wait()
	delete(mc.cache, s.path)
	return s.db.Close()
}

func (s *dbStore) writeBatch(b engine.Batch) error {
	if b.Len() == 0 {
		return nil
	}

	if s.closed {
		return errors.Trace(ErrDBClosed)
	}

	err := s.db.Commit(b)
	if err != nil {
		log.Error(err)
		return errors.Trace(err)
	}

	return nil
}

func (s *dbStore) newBatch() engine.Batch {
	return s.db.NewBatch()
}

func (s *dbStore) unLockKeys(txn *dbTxn) error {
	for k := range txn.lockedKeys {
		if tid, ok := s.keysLocked[k]; !ok || tid != txn.tid {
			debug.PrintStack()
			log.Fatalf("should never happen: %v, %v", tid, txn.tid)
		}

		delete(s.keysLocked, k)
		s.recentUpdates.Set([]byte(k), txn.version, true)
	}

	return nil
}