tidb router 源码

  • 2022-09-19
  • 浏览 (283)

tidb router 代码

文件路径:/util/table-router/router.go

// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package router

import (
	"fmt"
	"regexp"
	"strings"

	"github.com/pingcap/errors"
	selector "github.com/pingcap/tidb/util/table-rule-selector"
)

// TableRule is a rule to route schema/table to target schema/table.
// The pattern format refers to 'pkg/table-rule-selector'.
//
// SchemaPattern and TargetSchema are mandatory (see Valid). A rule whose
// TablePattern is empty is treated by Route as a schema level rule, otherwise
// as a table level rule. TargetTable may be left empty, in which case Route
// keeps the name of the routed table.
//
// TableExtractor, SchemaExtractor and SourceExtractor are optional. When set,
// FetchExtendColumn uses them to derive extra column values from the table,
// schema and source names respectively.
type TableRule struct {
	TableExtractor  *TableExtractor  `json:"extract-table,omitempty" toml:"extract-table,omitempty" yaml:"extract-table,omitempty"`
	SchemaExtractor *SchemaExtractor `json:"extract-schema,omitempty" toml:"extract-schema,omitempty" yaml:"extract-schema,omitempty"`
	SourceExtractor *SourceExtractor `json:"extract-source,omitempty" toml:"extract-source,omitempty" yaml:"extract-source,omitempty"`
	SchemaPattern   string           `json:"schema-pattern" toml:"schema-pattern" yaml:"schema-pattern"`
	TablePattern    string           `json:"table-pattern" toml:"table-pattern" yaml:"table-pattern"`
	TargetSchema    string           `json:"target-schema" toml:"target-schema" yaml:"target-schema"`
	TargetTable     string           `json:"target-table" toml:"target-table" yaml:"target-table"`
}

// TableExtractor extracts a value from the table name into a column.
//
// TableRegexp is the regular expression applied to the table name and
// TargetColumn is the column reported for the extracted value; TargetColumn
// must not be empty. The unexported regexp field holds the compiled form of
// TableRegexp and is filled in by TableRule.Valid.
type TableExtractor struct {
	regexp       *regexp.Regexp
	TargetColumn string `json:"target-column" toml:"target-column" yaml:"target-column"`
	TableRegexp  string `json:"table-regexp" toml:"table-regexp" yaml:"table-regexp"`
}

// SchemaExtractor extracts a value from the schema name into a column.
//
// SchemaRegexp is the regular expression applied to the schema name and
// TargetColumn is the column reported for the extracted value; TargetColumn
// must not be empty. The unexported regexp field holds the compiled form of
// SchemaRegexp and is filled in by TableRule.Valid.
type SchemaExtractor struct {
	regexp       *regexp.Regexp
	TargetColumn string `json:"target-column" toml:"target-column" yaml:"target-column"`
	SchemaRegexp string `json:"schema-regexp" toml:"schema-regexp" yaml:"schema-regexp"`
}

// SourceExtractor extracts a value from the source name into a column.
//
// SourceRegexp is the regular expression applied to the source name and
// TargetColumn is the column reported for the extracted value; TargetColumn
// must not be empty. The unexported regexp field holds the compiled form of
// SourceRegexp and is filled in by TableRule.Valid.
type SourceExtractor struct {
	regexp       *regexp.Regexp
	TargetColumn string `json:"target-column" toml:"target-column" yaml:"target-column"`
	SourceRegexp string `json:"source-regexp" toml:"source-regexp" yaml:"source-regexp"`
}

// Valid checks the validity of the rule and prepares its extractors for use.
//
// SchemaPattern and TargetSchema must be non-empty. For every extractor that
// is set, its regular expression must compile and its target column must be
// non-empty; once both hold, the compiled expression is stored on the
// extractor so extractVal can use it.
//
// It returns nil when the rule is usable, otherwise an error describing the
// first problem found. The error reported for an illegal regular expression
// wraps the underlying compile error.
func (t *TableRule) Valid() error {
	if len(t.SchemaPattern) == 0 {
		return errors.New("schema pattern of table route rule should not be empty")
	}
	if len(t.TargetSchema) == 0 {
		return errors.New("target schema of table route rule should not be empty")
	}

	if ext := t.TableExtractor; ext != nil {
		re, err := compileExtractorRegexp("table", ext.TableRegexp, ext.TargetColumn)
		if err != nil {
			return err
		}
		ext.regexp = re
	}
	if ext := t.SchemaExtractor; ext != nil {
		re, err := compileExtractorRegexp("schema", ext.SchemaRegexp, ext.TargetColumn)
		if err != nil {
			return err
		}
		ext.regexp = re
	}
	if ext := t.SourceExtractor; ext != nil {
		re, err := compileExtractorRegexp("source", ext.SourceRegexp, ext.TargetColumn)
		if err != nil {
			return err
		}
		ext.regexp = re
	}
	return nil
}

// compileExtractorRegexp validates one extractor of the given kind ("table",
// "schema" or "source") and returns its compiled regular expression.
func compileExtractorRegexp(kind, pattern, targetColumn string) (*regexp.Regexp, error) {
	re, err := regexp.Compile(pattern)
	if err != nil {
		// Keep the compile error in the chain so the reason the pattern was
		// rejected is visible to the user and to errors.Is/errors.As.
		return nil, fmt.Errorf("%s extractor %s regexp illegal %s: %w", kind, kind, pattern, err)
	}
	if len(targetColumn) == 0 {
		return nil, errors.New(kind + " extractor target column cannot be empty")
	}
	return re, nil
}

// ToLower converts the schema and table patterns of the rule to lower case
// in place. The target schema and target table are left untouched.
func (t *TableRule) ToLower() {
	t.SchemaPattern, t.TablePattern = strings.ToLower(t.SchemaPattern), strings.ToLower(t.TablePattern)
}

// Table routes schema/table to target schema/table by given route rules.
//
// It embeds selector.Selector, whose Insert, Remove and Match methods are
// used to store and look up the rules. When caseSensitive is false, AddRule,
// UpdateRule and RemoveRule lower-case the rule patterns and Route lower-cases
// the names it looks up.
type Table struct {
	selector.Selector

	caseSensitive bool
}

// NewTableRouter builds a table router backed by a trie selector and loads
// the given rules into it through AddRule, in order.
//
// caseSensitive controls whether names and patterns are compared as given or
// lower-cased first. The first rule that cannot be added aborts construction:
// a nil router and the annotated error are returned.
func NewTableRouter(caseSensitive bool, rules []*TableRule) (*Table, error) {
	router := new(Table)
	router.Selector = selector.NewTrieSelector()
	router.caseSensitive = caseSensitive

	for i := range rules {
		err := router.AddRule(rules[i])
		if err != nil {
			return nil, errors.Annotatef(err, "initial rule %+v in table router", rules[i])
		}
	}

	return router, nil
}

// AddRule validates rule and inserts it into the table router using the
// selector.Insert mode.
//
// For a case-insensitive router the rule's schema and table patterns are
// lower-cased in place before insertion. An error is returned when the rule
// is not valid or when the selector rejects the insertion.
func (r *Table) AddRule(rule *TableRule) error {
	if err := rule.Valid(); err != nil {
		return errors.Trace(err)
	}

	if !r.caseSensitive {
		rule.ToLower()
	}

	if err := r.Insert(rule.SchemaPattern, rule.TablePattern, rule, selector.Insert); err != nil {
		return errors.Annotatef(err, "add rule %+v into table router", rule)
	}
	return nil
}

// UpdateRule validates rule and stores it in the table router using the
// selector.Replace mode.
//
// For a case-insensitive router the rule's schema and table patterns are
// lower-cased in place first. An error is returned when the rule is not valid
// or when the selector rejects the update.
func (r *Table) UpdateRule(rule *TableRule) error {
	if err := rule.Valid(); err != nil {
		return errors.Trace(err)
	}

	if !r.caseSensitive {
		rule.ToLower()
	}

	if err := r.Insert(rule.SchemaPattern, rule.TablePattern, rule, selector.Replace); err != nil {
		return errors.Annotatef(err, "update rule %+v into table router", rule)
	}
	return nil
}

// RemoveRule removes the rule registered under rule's schema and table
// patterns from the table router.
//
// For a case-insensitive router the patterns are lower-cased in place before
// the lookup. The rule is not validated. An error is returned when the
// selector fails to remove it.
func (r *Table) RemoveRule(rule *TableRule) error {
	if !r.caseSensitive {
		rule.ToLower()
	}

	if err := r.Remove(rule.SchemaPattern, rule.TablePattern); err != nil {
		return errors.Annotatef(err, "remove rule %+v from table router", rule)
	}
	return nil
}

// Route routes schema/table to its target schema/table.
//
// Matching rules are split into schema level rules (empty TablePattern) and
// table level rules; table level rules take priority when a table name is
// given. Routing one schema/table to several targets is not supported, so
// more than one applicable rule of the chosen level is an error. When no rule
// applies, or the chosen rule leaves a target empty, the corresponding input
// name is returned unchanged.
func (r *Table) Route(schema, table string) (targetSchema string, targetTable string, err error) {
	lookupSchema, lookupTable := schema, table
	if !r.caseSensitive {
		lookupSchema = strings.ToLower(schema)
		lookupTable = strings.ToLower(table)
	}

	matched := r.Match(lookupSchema, lookupTable)
	schemaLevel := make([]*TableRule, 0, len(matched))
	tableLevel := make([]*TableRule, 0, len(matched))
	for _, candidate := range matched {
		rule, isRule := candidate.(*TableRule)
		if !isRule {
			return "", "", errors.NotValidf("table route rule %+v", candidate)
		}

		if len(rule.TablePattern) != 0 {
			tableLevel = append(tableLevel, rule)
			continue
		}
		schemaLevel = append(schemaLevel, rule)
	}

	switch {
	case len(table) != 0 && len(tableLevel) != 0:
		// A table name was given and table level rules exist: they win.
		if len(tableLevel) > 1 {
			return "", "", errors.NotSupportedf("`%s`.`%s` matches %d table route rules which is more than one.\nThe first two rules are %+v, %+v.\nIt's", schema, table, len(tableLevel), tableLevel[0], tableLevel[1])
		}
		targetSchema, targetTable = tableLevel[0].TargetSchema, tableLevel[0].TargetTable
	case len(schemaLevel) > 1:
		return "", "", errors.NotSupportedf("`%s`.`%s` matches %d schema route rules which is more than one.\nThe first two rules are %+v, %+v.\nIt's", schema, table, len(schemaLevel), schemaLevel[0], schemaLevel[1])
	case len(schemaLevel) == 1:
		targetSchema, targetTable = schemaLevel[0].TargetSchema, schemaLevel[0].TargetTable
	}

	// Anything the chosen rule did not specify keeps its original name.
	if len(targetSchema) == 0 {
		targetSchema = schema
	}
	if len(targetTable) == 0 {
		targetTable = table
	}

	return targetSchema, targetTable, nil
}

// extractVal applies the compiled regexp of the given extractor to s and
// returns all captured sub-matches concatenated in order.
//
// ext is expected to be a *TableExtractor, *SchemaExtractor or
// *SourceExtractor. An empty string is returned when ext has any other type,
// when the extractor is nil or its regexp has not been compiled (the rule
// never went through Valid), when s does not match, or when the expression
// has no capture groups.
func (*TableRule) extractVal(s string, ext interface{}) string {
	var params []string
	switch e := ext.(type) {
	case *TableExtractor:
		if e != nil && e.regexp != nil {
			params = e.regexp.FindStringSubmatch(s)
		}
	case *SchemaExtractor:
		if e != nil && e.regexp != nil {
			params = e.regexp.FindStringSubmatch(s)
		}
	case *SourceExtractor:
		if e != nil && e.regexp != nil {
			params = e.regexp.FindStringSubmatch(s)
		}
	}

	// params[0] is the whole match; only the capture groups form the value.
	if len(params) < 2 {
		return ""
	}
	return strings.Join(params[1:], "")
}

// FetchExtendColumn finds the rule matching schema/table and returns the
// extended column names together with the values extracted for them.
//
// Table level rules have priority over schema level rules (rules with an
// empty TablePattern); within a level the first matching rule is used. The
// two returned slices are parallel: for each extractor configured on the
// chosen rule, in the order table, schema, source, one column name and the
// value extracted from table, schema or source respectively. Both slices are
// empty when no rule matches or when a matched entry is not a *TableRule.
func (r *Table) FetchExtendColumn(schema, table, source string) ([]string, []string) {
	var cols []string
	var vals []string

	// Rule patterns are stored lower-cased when the router is case-insensitive
	// (see AddRule), so the lookup keys must be lowered the same way Route
	// does. The extractors still read the names exactly as given.
	lookupSchema, lookupTable := schema, table
	if !r.caseSensitive {
		lookupSchema, lookupTable = strings.ToLower(schema), strings.ToLower(table)
	}

	// Remember only the first rule of each level; nothing else is used.
	var schemaRule, tableRule *TableRule
	for _, matched := range r.Match(lookupSchema, lookupTable) {
		rule, ok := matched.(*TableRule)
		if !ok {
			return cols, vals
		}
		switch {
		case len(rule.TablePattern) == 0:
			if schemaRule == nil {
				schemaRule = rule
			}
		case tableRule == nil:
			tableRule = rule
		}
	}

	// Table level rules have highest priority.
	rule := tableRule
	if rule == nil {
		rule = schemaRule
	}
	if rule == nil {
		return cols, vals
	}

	if rule.TableExtractor != nil {
		cols = append(cols, rule.TableExtractor.TargetColumn)
		vals = append(vals, rule.extractVal(table, rule.TableExtractor))
	}

	if rule.SchemaExtractor != nil {
		cols = append(cols, rule.SchemaExtractor.TargetColumn)
		vals = append(vals, rule.extractVal(schema, rule.SchemaExtractor))
	}

	if rule.SourceExtractor != nil {
		cols = append(cols, rule.SourceExtractor.TargetColumn)
		vals = append(vals, rule.extractVal(source, rule.SourceExtractor))
	}
	return cols, vals
}

相关信息

tidb 源码目录

相关文章

tidb bind_cache 源码

tidb bind_record 源码

tidb handle 源码

tidb session_handle 源码

tidb stat 源码

tidb backup 源码

tidb cmd 源码

tidb debug 源码

tidb main 源码

tidb restore 源码

0  赞