tidb router 源码

  • 2022-09-19
  • 浏览 (349)

tidb router 代码

文件路径:/br/pkg/lightning/mydump/router.go

package mydump

import (
	"net/url"
	"regexp"
	"strconv"
	"strings"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/br/pkg/lightning/config"
	"github.com/pingcap/tidb/br/pkg/lightning/log"
	"github.com/pingcap/tidb/util/filter"
	"github.com/pingcap/tidb/util/slice"
	"go.uber.org/zap"
)

// SourceType identifies what kind of content a routed source file holds.
// Its textual form is produced by String and parsed by parseSourceType.
type SourceType int

const (
	// SourceTypeIgnore means this source file is ignored. It is the first
	// iota value (0) and therefore also the zero value of SourceType.
	SourceTypeIgnore SourceType = iota
	// SourceTypeSchemaSchema means this source file is a schema file for the DB.
	SourceTypeSchemaSchema
	// SourceTypeTableSchema means this source file is a schema file for the table.
	SourceTypeTableSchema
	// SourceTypeSQL means this source file is a SQL data file.
	SourceTypeSQL
	// SourceTypeCSV means this source file is a CSV data file.
	SourceTypeCSV
	// SourceTypeParquet means this source file is a parquet data file.
	SourceTypeParquet
	// SourceTypeViewSchema means this source file is a schema file for the view.
	SourceTypeViewSchema
)

// Textual names of the source types. parseSourceType accepts these values
// (case-insensitively, ignoring surrounding whitespace) and SourceType.String
// returns them.
const (
	// SchemaSchema is the source type value for a DB schema file.
	SchemaSchema = "schema-schema"
	// TableSchema is the source type value for a table schema file.
	TableSchema = "table-schema"
	// ViewSchema is the source type value for a view schema file.
	ViewSchema = "view-schema"
	// TypeSQL is the source type value for a SQL data file.
	TypeSQL = "sql"
	// TypeCSV is the source type value for a CSV data file.
	TypeCSV = "csv"
	// TypeParquet is the source type value for a parquet data file.
	TypeParquet = "parquet"
	// TypeIgnore is the source type value for an ignored data file.
	TypeIgnore = "ignore"
)

// Compression specifies the compression type of a source file.
type Compression int

const (
	// CompressionNone means the file is not compressed. It is the first iota
	// value (0) and therefore also the zero value of Compression.
	CompressionNone Compression = iota
	// CompressionGZ is the compression type that uses the GZ algorithm.
	CompressionGZ
	// CompressionLZ4 is the compression type that uses the LZ4 algorithm.
	CompressionLZ4
	// CompressionZStd is the compression type that uses the ZStd algorithm.
	CompressionZStd
	// CompressionXZ is the compression type that uses the XZ algorithm.
	CompressionXZ
)

// parseSourceType converts the textual source type t into a SourceType.
// The comparison is case-insensitive and ignores leading/trailing whitespace.
// For an unrecognized value it returns SourceTypeIgnore together with an
// error that quotes the original, un-normalized input.
func parseSourceType(t string) (SourceType, error) {
	normalized := strings.ToLower(strings.TrimSpace(t))

	switch normalized {
	case TypeIgnore:
		return SourceTypeIgnore, nil
	case SchemaSchema:
		return SourceTypeSchemaSchema, nil
	case TableSchema:
		return SourceTypeTableSchema, nil
	case ViewSchema:
		return SourceTypeViewSchema, nil
	case TypeSQL:
		return SourceTypeSQL, nil
	case TypeCSV:
		return SourceTypeCSV, nil
	case TypeParquet:
		return SourceTypeParquet, nil
	}

	return SourceTypeIgnore, errors.Errorf("unknown source type '%s'", t)
}

// String returns the textual name of the source type. Any value without a
// dedicated name, including SourceTypeIgnore itself, is reported as TypeIgnore.
func (s SourceType) String() string {
	// Start from the fallback and override it for the named types.
	name := TypeIgnore
	switch s {
	case SourceTypeSchemaSchema:
		name = SchemaSchema
	case SourceTypeTableSchema:
		name = TableSchema
	case SourceTypeViewSchema:
		name = ViewSchema
	case SourceTypeSQL:
		name = TypeSQL
	case SourceTypeCSV:
		name = TypeCSV
	case SourceTypeParquet:
		name = TypeParquet
	}
	return name
}

// parseCompressionType converts the textual compression name t into a
// Compression value. The comparison is case-insensitive and ignores
// leading/trailing whitespace; an empty name means CompressionNone. For an
// unrecognized name it returns CompressionNone together with an error that
// quotes the original input.
func parseCompressionType(t string) (Compression, error) {
	name := strings.ToLower(strings.TrimSpace(t))
	if name == "" {
		return CompressionNone, nil
	}

	switch name {
	case "gz":
		return CompressionGZ, nil
	case "lz4":
		return CompressionLZ4, nil
	case "zstd":
		return CompressionZStd, nil
	case "xz":
		return CompressionXZ, nil
	}

	return CompressionNone, errors.Errorf("invalid compression type '%s'", t)
}

// expandVariablePattern matches the placeholder forms that may appear in a
// field template: the escaped dollar `$$`, a bare reference `$name`, and a
// braced reference `${name}`, where name is a non-empty run of Unicode
// letters, decimal digits and underscores.
var expandVariablePattern = regexp.MustCompile(`\$(?:\$|[\pL\p{Nd}_]+|\{[\pL\p{Nd}_]+\})`)

// defaultFileRouteRules lists the built-in file route rules. Every pattern is
// case-insensitive (`(?i)`), and every rule except the ignore rule sets
// Unescape. In all of them the schema capture `([^/.]+)` cannot contain '/'
// or '.'.
// NOTE(review): presumably these are tried in order with the first match
// winning, as chainRouters does — confirm at the call site.
var defaultFileRouteRules = []*config.FileRouteRule{
	// ignore files whose name ends with '-schema-trigger.sql' or '-schema-post.sql'
	{Pattern: `(?i).*(-schema-trigger|-schema-post)\.sql$`, Type: "ignore"},
	// db schema create file pattern, matches files like '{schema}-schema-create.sql'
	{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)-schema-create\.sql$`, Schema: "$1", Table: "", Type: SchemaSchema, Unescape: true},
	// table schema create file pattern, matches files like '{schema}.{table}-schema.sql'
	{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)-schema\.sql$`, Schema: "$1", Table: "$2", Type: TableSchema, Unescape: true},
	// view schema create file pattern, matches files like '{schema}.{table}-schema-view.sql'
	{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)-schema-view\.sql$`, Schema: "$1", Table: "$2", Type: ViewSchema, Unescape: true},
	// data file pattern, matches files like '{schema}.{table}.0001.{sql|csv|parquet}';
	// the numeric part is optional and becomes the key, the extension becomes the type
	{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)(?:\.([0-9]+))?\.(sql|csv|parquet)$`, Schema: "$1", Table: "$2", Type: "$4", Key: "$3", Unescape: true},
}

// FileRouter applies a route rule to a file path in order to determine the
// target schema/table and the kind of the file.
type FileRouter interface {
	// Route applies the rule to path. It returns a nil result (with a nil
	// error) if path doesn't match the route rule, and an error if path
	// matches the rule but the value captured for a field is invalid.
	Route(path string) (*RouteResult, error)
}

// chainRouters combines several `FileRouter`s into a single router that
// consults them in slice order.
type chainRouters []FileRouter

// Route asks each router in turn to route path. The first router that
// reports an error or produces a non-nil result decides the outcome; if no
// router matches, both the result and the error are nil.
func (c chainRouters) Route(path string) (*RouteResult, error) {
	for i := range c {
		matched, err := c[i].Route(path)
		switch {
		case err != nil:
			return nil, err
		case matched != nil:
			return matched, nil
		}
	}
	return nil, nil
}

// NewFileRouter builds a FileRouter from the given route rules. Each rule is
// parsed into a regex router, and the routers are chained in the order of
// cfg. The first rule that fails to parse aborts construction and its error
// is returned.
func NewFileRouter(cfg []*config.FileRouteRule, logger log.Logger) (FileRouter, error) {
	var parser regexRouterParser
	routers := make(chainRouters, 0, len(cfg))
	for i := range cfg {
		router, err := parser.Parse(cfg[i], logger)
		if err != nil {
			return nil, err
		}
		routers = append(routers, router)
	}
	return routers, nil
}

// RegexRouter is a `FileRouter` implementation that applies a specific regex
// pattern to a file path. If the pattern matches, each extractor expands its
// template against the match and sets the resulting value on its target
// field in the `RouteResult`.
//
// pattern is the compiled regular expression matched against paths;
// extractors are run in the order they were registered.
type RegexRouter struct {
	pattern    *regexp.Regexp
	extractors []patExpander
}

// Route matches path against the router's pattern. When the pattern does not
// match, it returns a nil result and a nil error. Otherwise every extractor
// is run, in order, against the match to fill a fresh RouteResult; the first
// extractor error aborts routing and is returned.
func (r *RegexRouter) Route(path string) (*RouteResult, error) {
	loc := r.pattern.FindStringSubmatchIndex(path)
	if len(loc) == 0 {
		// No match: this router has nothing to say about the path.
		return nil, nil
	}

	routed := new(RouteResult)
	for i := range r.extractors {
		if err := r.extractors[i].Expand(r.pattern, path, loc, routed); err != nil {
			return nil, err
		}
	}
	return routed, nil
}

// regexRouterParser is a stateless helper whose methods turn a file route
// rule from the config into a RegexRouter.
type regexRouterParser struct{}

// Parse compiles the file route rule r into a RegexRouter.
//
// Exactly one of r.Path and r.Pattern must be set, otherwise an error is
// returned. A Path is treated as a literal string: it is quoted with
// regexp.QuoteMeta, and every '$' in the field templates is escaped to '$$'
// so that the templates are taken verbatim. The quoted path is not anchored,
// so it matches any file path that contains it. A Pattern is compiled as a
// regular expression, and the field templates may reference its capture
// groups (`$1`, `${name}`, ...).
//
// The rule r is only read, never modified: all derived values live in local
// variables, so parsing the same rule repeatedly always gives the same result.
//
// The "type" template is always required. If the rule's type is literally
// "ignore", no other field is parsed. The comparison uses the same
// normalization as parseSourceType (case-insensitive, whitespace-trimmed).
// Otherwise "schema" is required, "table" is required unless the type is
// literally "schema-schema", and "key" and "compression" are parsed only when
// non-empty. Any compression other than none is rejected when a file is
// routed.
//
// Errors come from an invalid regular expression, an empty required
// template, or a template that references a capture group the pattern does
// not have. logger is used to warn when unescaping a schema/table name fails;
// in that case the raw captured value is kept.
func (p regexRouterParser) Parse(r *config.FileRouteRule, logger log.Logger) (*RegexRouter, error) {
	if r.Path == "" && r.Pattern == "" {
		return nil, errors.New("`path` and `pattern` must not be both empty in [[mydumper.files]]")
	}
	if r.Path != "" && r.Pattern != "" {
		return nil, errors.New("can't set both `path` and `pattern` field in [[mydumper.files]]")
	}

	// Work on local copies so the caller's rule is left untouched. Writing the
	// derived pattern and escaped templates back into r would make a second
	// Parse of the same rule fail the "both set" check above and would
	// double-escape every '$'.
	patternExpr := r.Pattern
	typeTmpl, schemaTmpl, tableTmpl := r.Type, r.Schema, r.Table
	keyTmpl, compressionTmpl := r.Key, r.Compression
	unescapeNames := r.Unescape
	if r.Path != "" {
		// convert constant string as a regexp pattern
		patternExpr = regexp.QuoteMeta(r.Path)
		// escape all '$' by '$$' in match templates
		quoteTmplFn := func(t string) string { return strings.ReplaceAll(t, "$", "$$") }
		typeTmpl = quoteTmplFn(typeTmpl)
		schemaTmpl = quoteTmplFn(schemaTmpl)
		tableTmpl = quoteTmplFn(tableTmpl)
		keyTmpl = quoteTmplFn(keyTmpl)
		compressionTmpl = quoteTmplFn(compressionTmpl)
	}

	pattern, err := regexp.Compile(patternExpr)
	if err != nil {
		return nil, errors.Trace(err)
	}
	rule := &RegexRouter{pattern: pattern}

	err = p.parseFieldExtractor(rule, "type", typeTmpl, func(result *RouteResult, value string) error {
		ty, err := parseSourceType(value)
		if err != nil {
			return err
		}
		result.Type = ty
		return nil
	})
	if err != nil {
		return nil, err
	}

	// Normalize the literal type the same way parseSourceType does, so that
	// e.g. "Ignore" or " schema-schema " take the same shortcuts as their
	// canonical spellings. A template such as "$4" never equals either name.
	literalType := strings.ToLower(strings.TrimSpace(typeTmpl))

	// ignore pattern needn't parse other fields
	if literalType == TypeIgnore {
		return rule, nil
	}

	// setValue stores value into target, URL-path-unescaping it first when
	// requested; a value that fails to unescape is stored as-is.
	setValue := func(target *string, value string, unescape bool) {
		if unescape {
			val, err := url.PathUnescape(value)
			if err != nil {
				logger.Warn("unescape string failed, will be ignored", zap.String("value", value),
					zap.Error(err))
			} else {
				value = val
			}
		}
		*target = value
	}

	err = p.parseFieldExtractor(rule, "schema", schemaTmpl, func(result *RouteResult, value string) error {
		setValue(&result.Schema, value, unescapeNames)
		return nil
	})
	if err != nil {
		return nil, err
	}

	// special case: when the pattern is for db schema, should not parse table name
	if literalType != SchemaSchema {
		err = p.parseFieldExtractor(rule, "table", tableTmpl, func(result *RouteResult, value string) error {
			setValue(&result.Name, value, unescapeNames)
			return nil
		})
		if err != nil {
			return nil, err
		}
	}

	if len(keyTmpl) > 0 {
		err = p.parseFieldExtractor(rule, "key", keyTmpl, func(result *RouteResult, value string) error {
			result.Key = value
			return nil
		})
		if err != nil {
			return nil, err
		}
	}

	if len(compressionTmpl) > 0 {
		err = p.parseFieldExtractor(rule, "compression", compressionTmpl, func(result *RouteResult, value string) error {
			// TODO: should support restore compressed source files
			compression, err := parseCompressionType(value)
			if err != nil {
				return err
			}
			if compression != CompressionNone {
				return errors.New("Currently we don't support restore compressed source file yet")
			}
			result.Compression = compression
			return nil
		})
		if err != nil {
			return nil, err
		}
	}

	return rule, nil
}

// parseFieldExtractor validates the template fieldPattern for the named field
// and, if it is valid, registers an extractor on rule that expands the
// template against a match and passes the expanded value to applyFn.
//
// It returns an error when fieldPattern is empty or when it references a
// capture group that rule.pattern does not define.
func (p regexRouterParser) parseFieldExtractor(
	rule *RegexRouter,
	field,
	fieldPattern string,
	applyFn func(result *RouteResult, value string) error,
) error {
	// every field handled here needs a non-empty template
	if fieldPattern == "" {
		return errors.Errorf("field '%s' match pattern can't be empty", field)
	}

	// make sure every capture referenced by the template exists in the pattern
	err := p.checkSubPatterns(rule.pattern, fieldPattern)
	if err != nil {
		return errors.Trace(err)
	}

	expander := patExpander{template: fieldPattern, applyFn: applyFn}
	rule.extractors = append(rule.extractors, expander)
	return nil
}

// checkSubPatterns verifies that every capture reference in the template t
// (`$name` or `${name}`; `$$` is an escaped dollar and is skipped) refers to
// a capture group of pat. A purely numeric name must not exceed the number of
// capture groups in pat; any other name must be one of pat's named groups.
// The first offending reference is reported as an error.
func (regexRouterParser) checkSubPatterns(pat *regexp.Regexp, t string) error {
	for _, ref := range expandVariablePattern.FindAllString(t, -1) {
		if ref == "$$" {
			continue
		}

		// Strip the leading '$', and the surrounding braces if present.
		name := ref[1:]
		if strings.HasPrefix(ref, "${") {
			name = ref[2 : len(ref)-1]
		}

		idx, convErr := strconv.Atoi(name)
		if convErr == nil {
			if idx > pat.NumSubexp() {
				return errors.Errorf("sub pattern capture '%s' out of range", ref)
			}
			continue
		}

		// FIXME: we should use re.SubexpIndex here, but not supported in go1.13 yet
		groupNames := pat.SubexpNames()
		known := slice.AnyOf(groupNames, func(i int) bool {
			return groupNames[i] == name
		})
		if !known {
			return errors.Errorf("invalid named capture '%s'", ref)
		}
	}

	return nil
}

// patExpander extracts a string by expanding a template against a regexp
// match and hands it to a callback.
//
// template is the expansion template (it may reference capture groups of the
// pattern); applyFn receives the expanded value and stores it into the
// RouteResult, returning an error if the value is invalid.
type patExpander struct {
	template string
	applyFn  func(result *RouteResult, value string) error
}

// Expand expands the expander's template using the submatches of pattern in
// path (matchIndex holds the submatch index pairs) and passes the resulting
// string to applyFn together with result. It returns whatever applyFn returns.
func (p *patExpander) Expand(pattern *regexp.Regexp, path string, matchIndex []int, result *RouteResult) error {
	expanded := pattern.ExpandString(nil, p.template, path, matchIndex)
	return p.applyFn(result, string(expanded))
}

// RouteResult contains the information extracted for a routed file.
//
// The embedded filter.Table supplies the Schema and Name fields that the
// "schema" and "table" extractors fill in. Key holds the expanded "key"
// template, Compression the parsed compression type, and Type the parsed
// source type. Fields whose extractor is not configured keep their zero
// value.
type RouteResult struct {
	filter.Table
	Key         string
	Compression Compression
	Type        SourceType
}

相关信息

tidb 源码目录

相关文章

tidb bytes 源码

tidb charset_convertor 源码

tidb csv_parser 源码

tidb loader 源码

tidb parquet_parser 源码

tidb parser 源码

tidb parser_generated 源码

tidb reader 源码

tidb region 源码

0  赞