tidb txn_driver 源码

  • 2022-09-19
  • 浏览 (267)

tidb txn_driver 代码

文件路径:/store/driver/txn/txn_driver.go

// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package txn

import (
	"context"
	"sync/atomic"

	"github.com/opentracing/opentracing-go"
	"github.com/pingcap/errors"
	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/parser/model"
	"github.com/pingcap/tidb/sessionctx/binloginfo"
	derr "github.com/pingcap/tidb/store/driver/error"
	"github.com/pingcap/tidb/store/driver/options"
	"github.com/pingcap/tidb/tablecodec"
	"github.com/pingcap/tidb/util/logutil"
	tikverr "github.com/tikv/client-go/v2/error"
	tikvstore "github.com/tikv/client-go/v2/kv"
	"github.com/tikv/client-go/v2/tikv"
	"github.com/tikv/client-go/v2/tikvrpc"
	"github.com/tikv/client-go/v2/tikvrpc/interceptor"
	"github.com/tikv/client-go/v2/txnkv"
	"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
	"go.uber.org/zap"
)

// tikvTxn wraps a client-go *tikv.KVTxn (embedded, so its methods are promoted)
// together with the extra per-transaction state this driver keeps.
//
// NOTE: NewTiKVTxn builds this struct with a positional literal, so the field
// order below must stay in sync with it.
type tikvTxn struct {
	*tikv.KVTxn
	// idxNameCache stores table infos keyed by an int64 id; it is written by
	// CacheTableInfo and read by GetTableInfo.
	// snapshotInterceptor is assigned by SetOption(kv.SnapInterceptor, ...) and
	// handed to every snapshot wrapper created by GetSnapshot.
	idxNameCache        map[int64]*model.TableInfo
	snapshotInterceptor kv.SnapshotInterceptor
	// columnMapsCache is a cache used for the mutation checker
	columnMapsCache interface{}
}

// NewTiKVTxn wraps a client-go KVTxn into a kv.Transaction.
//
// Before wrapping, it installs TiDBKVFilter on txn and applies the current
// values of kv.TxnEntrySizeLimit and kv.TxnTotalSizeLimit (read atomically)
// to the transaction's union store. The returned wrapper starts with an empty
// table-info cache and no snapshot interceptor or column-maps cache.
func NewTiKVTxn(txn *tikv.KVTxn) kv.Transaction {
	txn.SetKVFilter(TiDBKVFilter{})

	maxEntrySize := atomic.LoadUint64(&kv.TxnEntrySizeLimit)
	maxTotalSize := atomic.LoadUint64(&kv.TxnTotalSizeLimit)
	txn.GetUnionStore().SetEntrySizeLimit(maxEntrySize, maxTotalSize)

	return &tikvTxn{
		KVTxn:        txn,
		idxNameCache: make(map[int64]*model.TableInfo),
	}
}

// GetTableInfo returns the table info previously stored under id by
// CacheTableInfo, or nil when nothing is cached for that id.
func (txn *tikvTxn) GetTableInfo(id int64) *model.TableInfo {
	if cached, found := txn.idxNameCache[id]; found {
		return cached
	}
	return nil
}

// SetDiskFullOpt forwards the given disk-full option to the wrapped KVTxn.
func (txn *tikvTxn) SetDiskFullOpt(level kvrpcpb.DiskFullOpt) {
	inner := txn.KVTxn
	inner.SetDiskFullOpt(level)
}

// CacheTableInfo stores info under id in the transaction's table-info cache,
// replacing any entry already present for that id.
func (txn *tikvTxn) CacheTableInfo(id int64, info *model.TableInfo) {
	cache := txn.idxNameCache
	cache[id] = info
}

// LockKeys converts keysInput with toTiKVKeys, asks the wrapped KVTxn to lock
// them, and returns the resulting error after passing it through
// extractKeyErr.
func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput ...kv.Key) error {
	lockErr := txn.KVTxn.LockKeys(ctx, lockCtx, toTiKVKeys(keysInput)...)
	return txn.extractKeyErr(lockErr)
}

// Commit commits the wrapped KVTxn and returns its error after passing it
// through extractKeyErr.
func (txn *tikvTxn) Commit(ctx context.Context) error {
	commitErr := txn.KVTxn.Commit(ctx)
	return txn.extractKeyErr(commitErr)
}

// GetMemDBCheckpoint returns a checkpoint taken from the wrapped KVTxn's
// memory buffer.
func (txn *tikvTxn) GetMemDBCheckpoint() *tikv.MemDBCheckpoint {
	return txn.KVTxn.GetMemBuffer().Checkpoint()
}

// RollbackMemDBToCheckpoint reverts the wrapped KVTxn's memory buffer to the
// given checkpoint.
func (txn *tikvTxn) RollbackMemDBToCheckpoint(savepoint *tikv.MemDBCheckpoint) {
	txn.KVTxn.GetMemBuffer().RevertToCheckpoint(savepoint)
}

// GetSnapshot returns the kv.Snapshot bound to this transaction. Each call
// builds a fresh tikvSnapshot wrapper around the KVTxn's snapshot, carrying
// the transaction's current snapshot interceptor.
func (txn *tikvTxn) GetSnapshot() kv.Snapshot {
	inner := txn.KVTxn.GetSnapshot()
	return &tikvSnapshot{inner, txn.snapshotInterceptor}
}

// Iter creates an Iterator positioned on the first entry that k <= entry's key.
// If such entry is not found, it returns an invalid Iterator with no error.
// It yields only keys that < upperBound. If upperBound is nil, it means the upperBound is unbounded.
// The Iterator must be Closed after use.
//
// The result merges an iterator over the transaction's memory buffer with an
// iterator over its snapshot via NewUnionIter. On any failure every iterator
// opened so far is closed and a literal nil Iterator is returned, so callers
// never receive a non-nil interface wrapping a nil value alongside an error.
func (txn *tikvTxn) Iter(k kv.Key, upperBound kv.Key) (iter kv.Iterator, err error) {
	dirtyIter, err := txn.GetMemBuffer().Iter(k, upperBound)
	if err != nil {
		return nil, err
	}

	snapIter, err := txn.GetSnapshot().Iter(k, upperBound)
	if err != nil {
		dirtyIter.Close()
		return nil, err
	}

	unionIter, err := NewUnionIter(dirtyIter, snapIter, false)
	if err != nil {
		dirtyIter.Close()
		snapIter.Close()
		// Return an untyped nil rather than whatever NewUnionIter produced.
		return nil, err
	}

	return unionIter, nil
}

// IterReverse creates a reversed Iterator positioned on the first entry which key is less than k.
// The returned iterator will iterate from greater key to smaller key.
// If k is nil, the returned iterator will be positioned at the last key.
// TODO: Add lower bound limit
//
// The result merges a reverse iterator over the transaction's memory buffer
// with one over its snapshot via NewUnionIter. On any failure every iterator
// opened so far is closed and a literal nil Iterator is returned, so callers
// never receive a non-nil interface wrapping a nil value alongside an error.
func (txn *tikvTxn) IterReverse(k kv.Key) (iter kv.Iterator, err error) {
	dirtyIter, err := txn.GetMemBuffer().IterReverse(k)
	if err != nil {
		return nil, err
	}

	snapIter, err := txn.GetSnapshot().IterReverse(k)
	if err != nil {
		dirtyIter.Close()
		return nil, err
	}

	unionIter, err := NewUnionIter(dirtyIter, snapIter, true)
	if err != nil {
		dirtyIter.Close()
		snapIter.Close()
		// Return an untyped nil rather than whatever NewUnionIter produced.
		return nil, err
	}

	return unionIter, nil
}

// BatchGet gets kv from the memory buffer of statement and transaction, and the kv storage.
// Do not use len(value) == 0 or value == nil to represent non-exist.
// If a key doesn't exist, there shouldn't be any corresponding entry in the result map.
//
// When ctx carries an opentracing span with a tracer, the lookup runs inside a
// child span named "tikvTxn.BatchGet" that is finished on return.
func (txn *tikvTxn) BatchGet(ctx context.Context, keys []kv.Key) (map[string][]byte, error) {
	parent := opentracing.SpanFromContext(ctx)
	if parent != nil && parent.Tracer() != nil {
		child := parent.Tracer().StartSpan("tikvTxn.BatchGet", opentracing.ChildOf(parent.Context()))
		defer child.Finish()
		ctx = opentracing.ContextWithSpan(ctx, child)
	}

	getter := NewBufferBatchGetter(txn.GetMemBuffer(), nil, txn.GetSnapshot())
	return getter.BatchGet(ctx, keys)
}

// Delete removes k through the wrapped KVTxn and returns the resulting error
// converted by derr.ToTiDBErr.
func (txn *tikvTxn) Delete(k kv.Key) error {
	return derr.ToTiDBErr(txn.KVTxn.Delete(k))
}

// Get looks k up in the transaction's memory buffer first and, only when the
// buffer reports kv.ErrNotExist, falls back to the snapshot.
// A successful lookup that yields an empty value is reported as
// kv.ErrNotExist; any other error is returned together with the value as is.
func (txn *tikvTxn) Get(ctx context.Context, k kv.Key) ([]byte, error) {
	value, getErr := txn.GetMemBuffer().Get(ctx, k)
	if kv.ErrNotExist.Equal(getErr) {
		value, getErr = txn.GetSnapshot().Get(ctx, k)
	}

	if getErr != nil {
		return value, getErr
	}
	if len(value) == 0 {
		return nil, kv.ErrNotExist
	}
	return value, nil
}

// Set writes v under k through the wrapped KVTxn and returns the resulting
// error converted by derr.ToTiDBErr.
func (txn *tikvTxn) Set(k kv.Key, v []byte) error {
	return derr.ToTiDBErr(txn.KVTxn.Set(k, v))
}

// GetMemBuffer returns the wrapped KVTxn's memory buffer adapted to
// kv.MemBuffer by newMemBuffer. A new adapter is built on every call.
func (txn *tikvTxn) GetMemBuffer() kv.MemBuffer {
	inner := txn.KVTxn.GetMemBuffer()
	return newMemBuffer(inner)
}

// SetOption applies the option identified by opt to the transaction.
//
// val must hold exactly the concrete type that the matching case asserts;
// the assertions are unchecked, so a mismatched type panics. Options whose
// case calls txn.KVTxn.GetSnapshot() are applied to the KVTxn's snapshot,
// kv.SnapInterceptor and kv.TableToColumnMaps are stored on the wrapper itself,
// and the rest are forwarded to the KVTxn. An opt with no matching case is
// silently ignored (there is no default branch).
func (txn *tikvTxn) SetOption(opt int, val interface{}) {
	switch opt {
	case kv.BinlogInfo:
		txn.SetBinlogExecutor(&binlogExecutor{
			txn:     txn.KVTxn,
			binInfo: val.(*binloginfo.BinlogInfo), // val cannot be other type.
		})
	case kv.SchemaChecker:
		txn.SetSchemaLeaseChecker(val.(tikv.SchemaLeaseChecker))
	case kv.IsolationLevel:
		// Translate the TiDB isolation level before handing it to the snapshot.
		level := getTiKVIsolationLevel(val.(kv.IsoLevel))
		txn.KVTxn.GetSnapshot().SetIsolationLevel(level)
	case kv.Priority:
		txn.KVTxn.SetPriority(getTiKVPriority(val.(int)))
	case kv.NotFillCache:
		txn.KVTxn.GetSnapshot().SetNotFillCache(val.(bool))
	case kv.Pessimistic:
		txn.SetPessimistic(val.(bool))
	case kv.SnapshotTS:
		txn.KVTxn.GetSnapshot().SetSnapshotTS(val.(uint64))
	case kv.ReplicaRead:
		// Translate the TiDB replica-read type before handing it to the snapshot.
		t := options.GetTiKVReplicaReadType(val.(kv.ReplicaReadType))
		txn.KVTxn.GetSnapshot().SetReplicaRead(t)
	case kv.TaskID:
		txn.KVTxn.GetSnapshot().SetTaskID(val.(uint64))
	case kv.InfoSchema:
		txn.SetSchemaVer(val.(tikv.SchemaVer))
	case kv.CollectRuntimeStats:
		// A nil val is accepted here and clears the snapshot's runtime stats;
		// asserting on a nil interface would panic, hence the explicit branch.
		if val == nil {
			txn.KVTxn.GetSnapshot().SetRuntimeStats(nil)
		} else {
			txn.KVTxn.GetSnapshot().SetRuntimeStats(val.(*txnsnapshot.SnapshotRuntimeStats))
		}
	case kv.SchemaAmender:
		txn.SetSchemaAmender(val.(tikv.SchemaAmender))
	case kv.SampleStep:
		txn.KVTxn.GetSnapshot().SetSampleStep(val.(uint32))
	case kv.CommitHook:
		txn.SetCommitCallback(val.(func(string, error)))
	case kv.EnableAsyncCommit:
		txn.SetEnableAsyncCommit(val.(bool))
	case kv.Enable1PC:
		txn.SetEnable1PC(val.(bool))
	case kv.GuaranteeLinearizability:
		// The flag is passed negated to SetCausalConsistency.
		txn.SetCausalConsistency(!val.(bool))
	case kv.TxnScope:
		txn.SetScope(val.(string))
	case kv.IsStalenessReadOnly:
		txn.KVTxn.GetSnapshot().SetIsStalenessReadOnly(val.(bool))
	case kv.MatchStoreLabels:
		txn.KVTxn.GetSnapshot().SetMatchStoreLabels(val.([]*metapb.StoreLabel))
	case kv.ResourceGroupTag:
		txn.KVTxn.SetResourceGroupTag(val.([]byte))
	case kv.ResourceGroupTagger:
		txn.KVTxn.SetResourceGroupTagger(val.(tikvrpc.ResourceGroupTagger))
	case kv.KVFilter:
		txn.KVTxn.SetKVFilter(val.(tikv.KVFilter))
	case kv.SnapInterceptor:
		// Kept on the wrapper; GetSnapshot attaches it to each snapshot it returns.
		txn.snapshotInterceptor = val.(kv.SnapshotInterceptor)
	case kv.CommitTSUpperBoundCheck:
		txn.KVTxn.SetCommitTSUpperBoundCheck(val.(func(commitTS uint64) bool))
	case kv.RPCInterceptor:
		txn.KVTxn.SetRPCInterceptor(val.(interceptor.RPCInterceptor))
	case kv.AssertionLevel:
		txn.KVTxn.SetAssertionLevel(val.(kvrpcpb.AssertionLevel))
	case kv.TableToColumnMaps:
		// Stored untyped, without any assertion; GetOption returns it unchanged.
		txn.columnMapsCache = val
	case kv.RequestSourceInternal:
		txn.KVTxn.SetRequestSourceInternal(val.(bool))
	case kv.RequestSourceType:
		txn.KVTxn.SetRequestSourceType(val.(string))
	case kv.ReplicaReadAdjuster:
		txn.KVTxn.GetSnapshot().SetReplicaReadAdjuster(val.(txnkv.ReplicaReadAdjuster))
	}
}

// GetOption returns the current value of the option identified by opt.
// Only kv.GuaranteeLinearizability, kv.TxnScope, kv.TableToColumnMaps and
// kv.RequestSourceType are readable; every other option yields nil.
func (txn *tikvTxn) GetOption(opt int) interface{} {
	switch opt {
	case kv.TxnScope:
		return txn.KVTxn.GetScope()
	case kv.GuaranteeLinearizability:
		// Reported as the negation of the KVTxn's causal-consistency flag.
		return !txn.KVTxn.IsCasualConsistency()
	case kv.RequestSourceType:
		return txn.RequestSourceType
	case kv.TableToColumnMaps:
		return txn.columnMapsCache
	}
	return nil
}

// SetVars sets variables to the transaction.
// vars is applied only when it holds a *tikv.Variables; a value of any other
// type is ignored.
func (txn *tikvTxn) SetVars(vars interface{}) {
	tikvVars, isVars := vars.(*tikv.Variables)
	if !isVars {
		return
	}
	txn.KVTxn.SetVars(tikvVars)
}

// GetVars returns the variables held by the wrapped KVTxn, boxed as an
// interface{}.
func (txn *tikvTxn) GetVars() interface{} {
	current := txn.KVTxn.GetVars()
	return current
}

// extractKeyErr converts err for the caller. When the root cause of err is a
// *tikverr.ErrKeyExist, the conflicting key is handed to extractKeyExistsErr;
// every other error goes through the package-level extractKeyErr.
func (txn *tikvTxn) extractKeyErr(err error) error {
	keyExist, isKeyExist := errors.Cause(err).(*tikverr.ErrKeyExist)
	if !isKeyExist {
		return extractKeyErr(err)
	}
	return txn.extractKeyExistsErr(keyExist.GetKey())
}

// extractKeyExistsErr builds the key-exists error for key.
//
// It decodes the key head, looks the table up in the transaction's table-info
// cache, and picks from the memory buffer's value history for the key a value
// with non-zero length. If any of those steps fails, a key-exists error tagged
// "UNKNOWN" is produced via genKeyExistsError. Otherwise the error is built
// from the handle for record keys, or from the masked index ID for index keys.
func (txn *tikvTxn) extractKeyExistsErr(key kv.Key) error {
	tableID, rawIndexID, isRecord, decodeErr := tablecodec.DecodeKeyHead(key)
	if decodeErr != nil {
		return genKeyExistsError("UNKNOWN", key.String(), decodeErr)
	}

	tblInfo := txn.GetTableInfo(tableID)
	if tblInfo == nil {
		return genKeyExistsError("UNKNOWN", key.String(), errors.New("cannot find table info"))
	}

	nonEmpty := func(candidate []byte) bool { return len(candidate) != 0 }
	value, historyErr := txn.KVTxn.GetUnionStore().GetMemBuffer().SelectValueHistory(key, nonEmpty)
	if historyErr != nil {
		return genKeyExistsError("UNKNOWN", key.String(), historyErr)
	}

	if !isRecord {
		indexID := tablecodec.IndexIDMask & rawIndexID
		return extractKeyExistsErrFromIndex(key, value, tblInfo, indexID)
	}
	return extractKeyExistsErrFromHandle(key, value, tblInfo)
}

// SetAssertion sets an assertion for the key operation.
// If the key already carries assertion flags in the memory buffer, nothing is
// changed. A flag-lookup error other than "not found" is returned as is; a
// "not found" result is treated like a key without assertion flags.
func (txn *tikvTxn) SetAssertion(key []byte, assertion ...kv.FlagsOp) error {
	current, lookupErr := txn.GetUnionStore().GetMemBuffer().GetFlags(key)
	switch {
	case lookupErr == nil:
		if current.HasAssertionFlags() {
			return nil
		}
	case !tikverr.IsErrNotFound(lookupErr):
		return lookupErr
	}

	txn.UpdateMemBufferFlags(key, assertion...)
	return nil
}

// UpdateMemBufferFlags converts flags with getTiKVFlagsOps and applies them to
// key in the union store's memory buffer.
func (txn *tikvTxn) UpdateMemBufferFlags(key []byte, flags ...kv.FlagsOp) {
	memBuf := txn.GetUnionStore().GetMemBuffer()
	memBuf.UpdateFlags(key, getTiKVFlagsOps(flags)...)
}

// TiDBKVFilter is the filter specific to TiDB to filter out KV pairs that needn't be committed.
// It carries no state; NewTiKVTxn installs a zero value of it on every
// transaction it wraps.
type TiDBKVFilter struct{}

// IsUnnecessaryKeyValue defines which kinds of KV pairs from TiDB needn't be committed.
// A pair is unnecessary when tablecodec.IsUntouchedIndexKValue reports true
// for it. If such a pair also carries the PresumeKeyNotExists flag, that is
// treated as an unexpected state: it is logged with a stack trace and
// returned as an error with a false result.
func (f TiDBKVFilter) IsUnnecessaryKeyValue(key, value []byte, flags tikvstore.KeyFlags) (bool, error) {
	untouched := tablecodec.IsUntouchedIndexKValue(key, value)
	if !untouched || !flags.HasPresumeKeyNotExists() {
		return untouched, nil
	}

	logutil.BgLogger().Error("unexpected path the untouched key value with PresumeKeyNotExists flag",
		zap.Stringer("key", kv.Key(key)), zap.Stringer("value", kv.Key(value)),
		zap.Uint16("flags", uint16(flags)), zap.Stack("stack"))
	return false, errors.Errorf(
		"unexpected path the untouched key=%s value=%s contains PresumeKeyNotExists flag keyFlags=%v",
		kv.Key(key).String(), kv.Key(value).String(), flags)
}

相关信息

tidb 源码目录

相关文章

tidb batch_getter 源码

tidb binlog 源码

tidb error 源码

tidb scanner 源码

tidb snapshot 源码

tidb union_iter 源码

tidb unionstore_driver 源码

0  赞