tidb backend_mgr 源码

  • 2022-09-19
  • 浏览 (506)

tidb backend_mgr 代码


// Copyright 2022 PingCAP, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//     http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ingest

import (


type backendCtxManager struct {
	generic.SyncMap[int64, *BackendContext]
	memRoot  MemRoot
	diskRoot DiskRoot

func (m *backendCtxManager) init(memRoot MemRoot, diskRoot DiskRoot) {
	m.SyncMap = generic.NewSyncMap[int64, *BackendContext](10)
	m.memRoot = memRoot
	m.diskRoot = diskRoot

// Register creates a new backend and registers it to the backend context.
func (m *backendCtxManager) Register(ctx context.Context, unique bool, jobID int64, _ mysql.SQLMode) (*BackendContext, error) {
	bc, exist := m.Load(jobID)
	if !exist {
		ok := m.memRoot.CheckConsume(StructSizeBackendCtx)
		if !ok {
			return nil, genBackendAllocMemFailedErr(m.memRoot, jobID)
		cfg, err := generateLightningConfig(m.memRoot, jobID, unique)
		if err != nil {
			logutil.BgLogger().Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err))
			return nil, err
		bd, err := createLocalBackend(ctx, cfg, glueLit{})
		if err != nil {
			logutil.BgLogger().Error(LitErrCreateBackendFail, zap.Int64("job ID", jobID), zap.Error(err))
			return nil, err

		bcCtx := newBackendContext(ctx, jobID, &bd, cfg, defaultImportantVariables, m.memRoot, m.diskRoot)
		m.Store(jobID, bcCtx)

		logutil.BgLogger().Info(LitInfoCreateBackend, zap.Int64("job ID", jobID),
			zap.Int64("current memory usage", m.memRoot.CurrentUsage()),
			zap.Int64("max memory quota", m.memRoot.MaxMemoryQuota()),
			zap.Bool("is unique index", unique))
		return bcCtx, nil
	return bc, nil

func createLocalBackend(ctx context.Context, cfg *config.Config, glue glue.Glue) (backend.Backend, error) {
	tls, err := cfg.ToTLS()
	if err != nil {
		logutil.BgLogger().Error(LitErrCreateBackendFail, zap.Error(err))
		return backend.Backend{}, err

	errorMgr := errormanager.New(nil, cfg, log.Logger{Logger: logutil.BgLogger()})
	return local.NewLocalBackend(ctx, tls, cfg, glue, int(LitRLimit), errorMgr)

func newBackendContext(ctx context.Context, jobID int64, be *backend.Backend,
	cfg *config.Config, vars map[string]string, memRoot MemRoot, diskRoot DiskRoot) *BackendContext {
	bc := &BackendContext{
		jobID:    jobID,
		backend:  be,
		ctx:      ctx,
		cfg:      cfg,
		sysVars:  vars,
		diskRoot: diskRoot,
	bc.EngMgr.init(memRoot, diskRoot)
	return bc

// Unregister removes a backend context from the backend context manager.
func (m *backendCtxManager) Unregister(jobID int64) {
	bc, exist := m.Load(jobID)
	if !exist {
	logutil.BgLogger().Info(LitInfoCloseBackend, zap.Int64("job ID", jobID),
		zap.Int64("current memory usage", m.memRoot.CurrentUsage()),
		zap.Int64("max memory quota", m.memRoot.MaxMemoryQuota()))

// TotalDiskUsage returns the total disk usage of all backends.
func (m *backendCtxManager) TotalDiskUsage() uint64 {
	var totalDiskUsed uint64
	for _, key := range m.Keys() {
		bc, exists := m.Load(key)
		if exists {
			_, _, bcDiskUsed, _ := bc.backend.CheckDiskQuota(math.MaxInt64)
			totalDiskUsed += uint64(bcDiskUsed)
	return totalDiskUsed

// UpdateMemoryUsage collects the memory usages from all the backend and updates it to the memRoot.
func (m *backendCtxManager) UpdateMemoryUsage() {
	for _, key := range m.Keys() {
		bc, exists := m.Load(key)
		if exists {
			curSize := bc.backend.TotalMemoryConsume()
			m.memRoot.ConsumeWithTag(encodeBackendTag(bc.jobID), curSize)

// glueLit is used as a placeholder for the local backend initialization.
type glueLit struct{}

// OwnsSQLExecutor Implement interface OwnsSQLExecutor.
func (glueLit) OwnsSQLExecutor() bool {
	return false

// GetSQLExecutor Implement interface GetSQLExecutor.
func (glueLit) GetSQLExecutor() glue.SQLExecutor {
	return nil

// GetDB Implement interface GetDB.
func (glueLit) GetDB() (*sql.DB, error) {
	return nil, nil

// GetParser Implement interface GetParser.
func (glueLit) GetParser() *parser.Parser {
	return nil

// GetTables Implement interface GetTables.
func (glueLit) GetTables(context.Context, string) ([]*model.TableInfo, error) {
	return nil, nil

// GetSession Implement interface GetSession.
func (glueLit) GetSession(context.Context) (checkpoints.Session, error) {
	return nil, nil

// OpenCheckpointsDB Implement interface OpenCheckpointsDB.
func (glueLit) OpenCheckpointsDB(context.Context, *config.Config) (checkpoints.DB, error) {
	return nil, nil

// Record is used to report some information (key, value) to host TiDB, including progress, stage currently.
func (glueLit) Record(string, uint64) {

// encodeBackendTag returns the memory-accounting tag for a job: the job ID
// rendered as a base-10 string.
func encodeBackendTag(jobID int64) string {
	return fmt.Sprintf("%d", jobID)
}


tidb 源码目录


tidb backend 源码

tidb config 源码

tidb disk_root 源码

tidb engine 源码

tidb engine_mgr 源码

tidb env 源码

tidb mem_root 源码

tidb message 源码

0  赞