gitroast/vendor/github.com/pingcap/go-themis/themis_lock_manager.go

148 lines
3.7 KiB
Go

package themis
import (
"bytes"
"encoding/binary"
"math"
"strings"
"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/go-hbase"
)
var _ LockManager = (*themisLockManager)(nil)
type themisLockManager struct {
rpc *themisRPC
hbaseClient hbase.HBaseClient
}
func newThemisLockManager(rpc *themisRPC, hbaseCli hbase.HBaseClient) LockManager {
return &themisLockManager{
rpc: rpc,
hbaseClient: hbaseCli,
}
}
func getDataColFromMetaCol(lockOrWriteCol hbase.Column) hbase.Column {
// get data column from lock column
// key is like => L:family#qual, #p:family#qual
parts := strings.Split(string(lockOrWriteCol.Qual), "#")
if len(parts) != 2 {
return lockOrWriteCol
}
c := hbase.Column{
Family: []byte(parts[0]),
Qual: []byte(parts[1]),
}
return c
}
func getLocksFromResults(tbl []byte, lockKvs []*hbase.Kv, client *themisRPC) ([]Lock, error) {
var locks []Lock
for _, kv := range lockKvs {
col := &hbase.ColumnCoordinate{
Table: tbl,
Row: kv.Row,
Column: hbase.Column{
Family: kv.Family,
Qual: kv.Qual,
},
}
if !isLockColumn(col.Column) {
return nil, errors.New("invalid lock")
}
l, err := parseLockFromBytes(kv.Value)
if err != nil {
return nil, errors.Trace(err)
}
cc := &hbase.ColumnCoordinate{
Table: tbl,
Row: kv.Row,
Column: getDataColFromMetaCol(col.Column),
}
l.SetCoordinate(cc)
client.checkAndSetLockIsExpired(l)
locks = append(locks, l)
}
return locks, nil
}
func (m *themisLockManager) IsLockExists(cc *hbase.ColumnCoordinate, startTs, endTs uint64) (bool, error) {
get := hbase.NewGet(cc.Row)
get.AddTimeRange(startTs, endTs+1)
get.AddStringColumn(string(LockFamilyName), string(cc.Family)+"#"+string(cc.Qual))
// check if lock exists
rs, err := m.hbaseClient.Get(string(cc.Table), get)
if err != nil {
return false, errors.Trace(err)
}
// primary lock has been released
if rs == nil {
return false, nil
}
return true, nil
}
func (m *themisLockManager) GetCommitTimestamp(cc *hbase.ColumnCoordinate, prewriteTs uint64) (uint64, error) {
g := hbase.NewGet(cc.Row)
// add put write column
qual := string(cc.Family) + "#" + string(cc.Qual)
g.AddStringColumn("#p", qual)
// add del write column
g.AddStringColumn("#d", qual)
// time range => [ours startTs, +Inf)
g.AddTimeRange(prewriteTs, math.MaxInt64)
g.SetMaxVersion(math.MaxInt32)
r, err := m.hbaseClient.Get(string(cc.Table), g)
if err != nil {
return 0, errors.Trace(err)
}
// may delete by other client
if r == nil {
return 0, nil
}
for _, kv := range r.SortedColumns {
for commitTs, val := range kv.Values {
var ts uint64
binary.Read(bytes.NewBuffer(val), binary.BigEndian, &ts)
if ts == prewriteTs {
// get this commit's commitTs
return commitTs, nil
}
}
}
// no such transction
return 0, nil
}
func (m *themisLockManager) CleanLock(cc *hbase.ColumnCoordinate, prewriteTs uint64) (uint64, Lock, error) {
l, err := m.rpc.getLockAndErase(cc, prewriteTs)
if err != nil {
return 0, nil, errors.Trace(err)
}
pl, _ := l.(*themisPrimaryLock)
// if primary lock is nil, means someothers have already committed
if pl == nil {
commitTs, err := m.GetCommitTimestamp(cc, prewriteTs)
if err != nil {
return 0, nil, errors.Trace(err)
}
return commitTs, nil, nil
}
return 0, pl, nil
}
func (m *themisLockManager) EraseLockAndData(cc *hbase.ColumnCoordinate, prewriteTs uint64) error {
log.Debugf("erase row=%q txn=%d", cc.Row, prewriteTs)
d := hbase.NewDelete(cc.Row)
d.AddColumnWithTimestamp(LockFamilyName, []byte(string(cc.Family)+"#"+string(cc.Qual)), prewriteTs)
d.AddColumnWithTimestamp(cc.Family, cc.Qual, prewriteTs)
ok, err := m.hbaseClient.Delete(string(cc.Table), d)
if !ok {
log.Error(err)
}
return errors.Trace(err)
}