tidb parser 源码

  • 2022-09-19
  • 浏览 (302)

tidb parser 代码

文件路径:/br/pkg/lightning/mydump/parser.go

// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mydump

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/br/pkg/lightning/config"
	"github.com/pingcap/tidb/br/pkg/lightning/log"
	"github.com/pingcap/tidb/br/pkg/lightning/metric"
	"github.com/pingcap/tidb/br/pkg/lightning/worker"
	"github.com/pingcap/tidb/parser/mysql"
	"github.com/pingcap/tidb/types"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

// blockParser carries the state shared by parsers that consume a data file
// block by block: the pooled reader, the read buffers, the current offset and
// the most recently parsed row.
type blockParser struct {
	// states for the lexer
	//
	// reader is the source the blocks are read from.
	reader      PooledReader
	// buf is the data still to be lexed; readBlock rebuilds it from the
	// previous remainder followed by the newly read block.
	buf         []byte
	// blockBuf is the fixed-size scratch buffer filled by each readBlock call.
	blockBuf    []byte
	// isLastChunk is set once the reader reported EOF (or a short final read).
	isLastChunk bool

	// The list of column names of the last INSERT statement.
	columns []string

	// rowPool recycles the datum slices handed out for parsed rows.
	rowPool *sync.Pool
	// lastRow is the row produced by the latest successful ReadRow call; its
	// RowID also serves as the running row counter.
	lastRow Row
	// Current file offset.
	pos int64

	// cache
	//
	// remainBuf and appendBuf are reused by readBlock so that the leftover of
	// buf can be copied out before appendBuf (which backs buf) is reset.
	remainBuf *bytes.Buffer
	appendBuf *bytes.Buffer

	// the Logger associated with this parser for reporting failure
	Logger  log.Logger
	// metrics may be nil; readBlock checks for nil before recording to it.
	metrics *metric.Metrics
}

// makeBlockParser builds the shared parser state around reader.
//
// The block buffer is sized as blockBufSize scaled by config.BufferSizeScale,
// and the row pool hands out empty datum slices with an initial capacity of 16.
// metrics may be nil.
func makeBlockParser(
	reader ReadSeekCloser,
	blockBufSize int64,
	ioWorkers *worker.Pool,
	metrics *metric.Metrics,
	logger log.Logger,
) blockParser {
	pooled := MakePooledReader(reader, ioWorkers)
	scratch := make([]byte, blockBufSize*config.BufferSizeScale)
	datumPool := &sync.Pool{
		New: func() interface{} {
			return make([]types.Datum, 0, 16)
		},
	}

	var bp blockParser
	bp.reader = pooled
	bp.blockBuf = scratch
	bp.remainBuf = &bytes.Buffer{}
	bp.appendBuf = &bytes.Buffer{}
	bp.Logger = logger
	bp.rowPool = datumPool
	bp.metrics = metrics
	return bp
}

// ChunkParser is a parser of the data files (the file containing only INSERT
// statements).
type ChunkParser struct {
	blockParser

	// escFlavor controls how backslashes inside single- and double-quoted
	// strings are unescaped.
	escFlavor backslashEscapeFlavor
}

// Chunk represents a portion of the data file.
type Chunk struct {
	// Offset is the file offset at which the chunk starts.
	Offset       int64
	// EndOffset is the file offset just after the last row of the chunk.
	EndOffset    int64
	// PrevRowIDMax is the last row ID assigned before this chunk.
	PrevRowIDMax int64
	// RowIDMax is the row ID of the last row inside this chunk.
	RowIDMax     int64
	// Columns is not populated by ReadChunks.
	// NOTE(review): presumably filled in by callers with the chunk's column
	// names; confirm against the users of this struct.
	Columns      []string
}

// Row is the content of a row.
type Row struct {
	// RowID is the sequence number of the row; ChunkParser.ReadRow increments
	// it by one for every row it starts.
	RowID  int64
	// Row holds the parsed values, one datum per value in the row.
	Row    []types.Datum
	// Length is the total byte length of the tokens consumed while reading
	// this row, as accumulated by ChunkParser.ReadRow.
	Length int
}

// MarshalLogArray implements the zapcore.ArrayMarshaler interface.
//
// Every datum of the row is appended to the encoder in its string form; the
// method never fails.
func (row Row) MarshalLogArray(encoder zapcore.ArrayEncoder) error {
	for i := 0; i < len(row.Row); i++ {
		datum := row.Row[i]
		encoder.AppendString(datum.String())
	}
	return nil
}

// backslashEscapeFlavor selects how backslash sequences inside string
// literals are interpreted when a value is unescaped.
type backslashEscapeFlavor uint8

const (
	// backslashEscapeFlavorNone leaves backslashes untouched.
	backslashEscapeFlavorNone backslashEscapeFlavor = iota
	// backslashEscapeFlavorMySQL interprets MySQL-style sequences such as
	// \n, \t, \0 and \Z, and strips the backslash from any other sequence.
	backslashEscapeFlavorMySQL
	// backslashEscapeFlavorMySQLWithNull is not referenced in this file.
	// NOTE(review): presumably the MySQL flavor plus a special NULL escape
	// used by another parser; confirm at its use sites.
	backslashEscapeFlavorMySQLWithNull
)

// Parser provides some methods to parse a source data file.
type Parser interface {
	// Pos returns the current file offset and the ID of the last row.
	Pos() (pos int64, rowID int64)
	// SetPos moves the parser to the given file offset and sets the ID of
	// the last row.
	SetPos(pos int64, rowID int64) error
	// Close releases the underlying reader.
	Close() error
	// ReadRow parses the next row; on success it is available via LastRow.
	ReadRow() error
	// LastRow returns the row parsed by the last call to ReadRow.
	LastRow() Row
	// RecycleRow hands a row back to the parser so its storage can be reused.
	RecycleRow(row Row)

	// Columns returns the _lower-case_ column names corresponding to values in
	// the LastRow.
	Columns() []string
	// SetColumns set restored column names to parser
	SetColumns([]string)

	// SetLogger replaces the logger the parser reports failures to.
	SetLogger(log.Logger)
}

// NewChunkParser creates a new parser which can read chunks out of a file.
//
// Backslash escapes are interpreted MySQL-style unless sqlMode has
// NO_BACKSLASH_ESCAPES enabled. The logger and the (optional) metrics are
// taken from ctx.
func NewChunkParser(
	ctx context.Context,
	sqlMode mysql.SQLMode,
	reader ReadSeekCloser,
	blockBufSize int64,
	ioWorkers *worker.Pool,
) *ChunkParser {
	var flavor backslashEscapeFlavor
	if sqlMode.HasNoBackslashEscapesMode() {
		flavor = backslashEscapeFlavorNone
	} else {
		flavor = backslashEscapeFlavorMySQL
	}

	// The "found" flag is deliberately ignored: a nil metrics value is
	// tolerated by readBlock, which checks for nil before recording.
	metrics, _ := metric.FromContext(ctx)
	logger := log.FromContext(ctx)

	parser := new(ChunkParser)
	parser.blockParser = makeBlockParser(reader, blockBufSize, ioWorkers, metrics, logger)
	parser.escFlavor = flavor
	return parser
}

// SetPos changes the reported position and row ID.
//
// The underlying reader is seeked to pos (relative to the start of the file).
// An error is returned when the seek fails or lands on a different offset; in
// that case the parser's position and row ID are left unchanged.
func (parser *blockParser) SetPos(pos int64, rowID int64) error {
	actual, err := parser.reader.Seek(pos, io.SeekStart)
	switch {
	case err != nil:
		return errors.Trace(err)
	case actual != pos:
		return errors.Errorf("set pos failed, required position: %d, got: %d", pos, actual)
	}

	parser.pos, parser.lastRow.RowID = pos, rowID
	return nil
}

// Pos returns the current file offset together with the ID of the last row.
func (parser *blockParser) Pos() (pos int64, lastRowID int64) {
	pos = parser.pos
	lastRowID = parser.lastRow.RowID
	return pos, lastRowID
}

// Close closes the underlying reader and reports its error, if any.
func (parser *blockParser) Close() error {
	err := parser.reader.Close()
	return err
}

// Columns returns the column names currently recorded on the parser. The
// returned slice is the parser's own, not a copy.
func (parser *blockParser) Columns() []string {
	names := parser.columns
	return names
}

// SetColumns replaces the column names recorded on the parser. The slice is
// stored as given, without copying.
func (parser *blockParser) SetColumns(columns []string) {
	parser.columns = columns[:len(columns):cap(columns)]
}

// logSyntaxError reports a syntax error at the current position to the
// parser's logger, attaching at most the first 256 bytes of the unparsed
// buffer as context.
func (parser *blockParser) logSyntaxError() {
	const maxContentLen = 256

	snippet := parser.buf
	if len(snippet) > maxContentLen {
		snippet = snippet[:maxContentLen]
	}

	fields := []zap.Field{
		zap.Int64("pos", parser.pos),
		zap.ByteString("content", snippet),
	}
	parser.Logger.Error("syntax error", fields...)
}

// SetLogger replaces the logger used by the parser for reporting failures.
func (parser *blockParser) SetLogger(logger log.Logger) {
	parser.Logger = logger
}

// token identifies the kind of lexical element recognized in a data file.
type token byte

// The token kinds. Their numeric values index tokenDescriptions, so the order
// of this list must stay in sync with that table.
const (
	tokNil token = iota
	tokRowBegin
	tokRowEnd
	tokValues
	tokNull
	tokTrue
	tokFalse
	tokHexString
	tokBinString
	tokInteger
	tokSingleQuoted
	tokDoubleQuoted
	tokBackQuoted
	tokUnquoted
)

// tokenDescriptions maps every known token to its human-readable name.
var tokenDescriptions = [...]string{
	tokNil:          "<Nil>",
	tokRowBegin:     "RowBegin",
	tokRowEnd:       "RowEnd",
	tokValues:       "Values",
	tokNull:         "Null",
	tokTrue:         "True",
	tokFalse:        "False",
	tokHexString:    "HexString",
	tokBinString:    "BinString",
	tokInteger:      "Integer",
	tokSingleQuoted: "SingleQuoted",
	tokDoubleQuoted: "DoubleQuoted",
	tokBackQuoted:   "BackQuoted",
	tokUnquoted:     "Unquoted",
}

// String implements the fmt.Stringer interface
//
// Mainly used for debugging a token. Tokens without a registered description
// are rendered as "<Unknown(N)>" where N is the numeric value.
func (tok token) String() string {
	// A token is a byte, so its index can never be negative; only the upper
	// bound needs checking.
	if idx := int(tok); idx < len(tokenDescriptions) && tokenDescriptions[idx] != "" {
		return tokenDescriptions[idx]
	}
	return fmt.Sprintf("<Unknown(%d)>", int(tok))
}

// readBlock reads the next block from the reader and appends it to the
// not-yet-consumed data in parser.buf.
//
// Reaching the end of the input (io.EOF or io.ErrUnexpectedEOF) is not an
// error: it marks the parser as being on its last chunk and whatever bytes
// were read are still appended. Any other read error is returned traced and
// leaves the buffers untouched. The elapsed time of a successful call is
// recorded when metrics are available.
func (parser *blockParser) readBlock() error {
	begin := time.Now()

	n, err := parser.reader.ReadFull(parser.blockBuf)
	hitEnd := err == io.EOF || err == io.ErrUnexpectedEOF
	if err != nil && !hitEnd {
		return errors.Trace(err)
	}
	if hitEnd {
		parser.isLastChunk = true
	}

	// parser.buf aliases the bytes of appendBuf, so the leftover has to be
	// parked in remainBuf first; resetting and rewriting appendBuf directly
	// would make source and destination overlap.
	parser.remainBuf.Reset()
	parser.remainBuf.Write(parser.buf)
	parser.appendBuf.Reset()
	parser.appendBuf.Write(parser.remainBuf.Bytes())
	parser.appendBuf.Write(parser.blockBuf[:n])
	parser.buf = parser.appendBuf.Bytes()

	if parser.metrics != nil {
		elapsed := time.Since(begin).Seconds()
		parser.metrics.ChunkParserReadBlockSecondsHistogram.Observe(elapsed)
	}
	return nil
}

// unescapeRegexp matches a backslash followed by any single character; the
// (?s) flag lets that character be a newline as well.
var unescapeRegexp = regexp.MustCompile(`(?s)\\.`)

// unescape decodes the body of a quoted string literal.
//
// When delim is non-empty, every doubled delimiter in input is collapsed into
// a single one. Afterwards, unless escFlavor is backslashEscapeFlavorNone,
// backslash sequences are decoded: \0, \b, \n, \r, \t and \Z map to their
// control characters and any other escaped character stands for itself. A
// lone trailing backslash is kept as is.
func unescape(
	input string,
	delim string,
	escFlavor backslashEscapeFlavor,
) string {
	if delim != "" {
		if doubled := delim + delim; strings.Contains(input, doubled) {
			input = strings.ReplaceAll(input, doubled, delim)
		}
	}

	// Nothing more to do when escapes are disabled or there is no backslash.
	if escFlavor == backslashEscapeFlavorNone || !strings.Contains(input, `\`) {
		return input
	}

	return unescapeRegexp.ReplaceAllStringFunc(input, func(seq string) string {
		// seq is always the backslash plus exactly one (possibly multi-byte)
		// character, so seq[1:] is the escaped character itself.
		escaped := seq[1:]
		switch escaped {
		case "0":
			return "\x00"
		case "b":
			return "\b"
		case "n":
			return "\n"
		case "r":
			return "\r"
		case "t":
			return "\t"
		case "Z":
			return "\x1a"
		}
		return escaped
	})
}

// unescapeString strips the surrounding quotes from a quoted token and
// decodes its body.
//
// Single- and double-quoted input is unescaped with the parser's backslash
// flavor; back-quoted input only has doubled back-quotes collapsed. Input
// shorter than two bytes, or not starting with a quote character, is returned
// unchanged. Only the first byte is inspected; the last byte is dropped
// without checking that it is the matching quote.
func (parser *ChunkParser) unescapeString(input string) string {
	if len(input) < 2 {
		return input
	}

	body := input[1 : len(input)-1]
	switch quote := input[0]; quote {
	case '`':
		return unescape(body, "`", backslashEscapeFlavorNone)
	case '\'', '"':
		return unescape(body, input[:1], parser.escFlavor)
	default:
		return input
	}
}

// ReadRow reads a row from the datafile.
//
// On success the parsed row can be retrieved with LastRow() and nil is
// returned. When the lexer reports io.EOF between rows, that error is
// returned (traced); an io.EOF in the middle of a statement or row is turned
// into a "premature EOF" syntax error. Unexpected tokens and malformed
// hex/bit literals are reported as errors as well.
func (parser *ChunkParser) ReadRow() error {
	// This parser will recognize contents like:
	//
	// 		`tableName` (...) VALUES (...) (...) (...)
	//
	// Keywords like INSERT, INTO and separators like ',' and ';' are treated
	// like comments and ignored. Therefore, this parser will accept some
	// nonsense input. The advantage is the parser becomes extremely simple,
	// suitable for us where we just want to quickly and accurately split the
	// file apart, not to validate the content.

	type state byte

	const (
		// the state after "INSERT INTO" before the column names or "VALUES"
		stateTableName state = iota

		// the state while reading the column names
		stateColumns

		// the state after reading "VALUES"
		stateValues

		// the state while reading row values
		stateRow
	)

	// Dry-run sample of the state machine, first row:
	//
	//              Input         Token             State
	//              ~~~~~         ~~~~~             ~~~~~
	//
	//                                              stateValues
	//              INSERT
	//              INTO
	//              `tableName`   tokBackQuoted
	//                                              stateTableName (reset columns)
	//              (             tokRowBegin
	//                                              stateColumns
	//              `a`           tokBackQuoted
	//                                              stateColumns (append column)
	//              ,
	//              `b`           tokBackQuoted
	//                                              stateColumns (append column)
	//              )             tokRowEnd
	//                                              stateValues
	//              VALUES
	//                                              stateValues (no-op)
	//              (             tokRowBegin
	//                                              stateRow (reset row)
	//              1             tokInteger
	//                                              stateRow (append value)
	//              ,
	//              2             tokInteger
	//                                              stateRow (append value)
	//              )             tokRowEnd
	//                                              return
	//
	//
	// Second row:
	//
	//              Input         Token             State
	//              ~~~~~         ~~~~~             ~~~~~
	//
	//                                              stateValues
	//              ,
	//              (             tokRowBegin
	//                                              stateRow (reset row)
	//              3             tokInteger
	//                                              stateRow (append value)
	//              )             tokRowEnd
	//                                              return
	//
	// Third row:
	//
	//              Input         Token             State
	//              ~~~~~         ~~~~~             ~~~~~
	//
	//              ;
	//              INSERT
	//              INTO
	//              `database`    tokBackQuoted
	//                                              stateTableName (reset columns)
	//              .
	//              `tableName`   tokBackQuoted
	//                                              stateTableName (no-op)
	//              VALUES
	//                                              stateValues
	//              (             tokRowBegin
	//                                              stateRow (reset row)
	//              4             tokInteger
	//                                              stateRow (append value)
	//              )             tokRowEnd
	//                                              return

	// row aliases parser.lastRow, so everything stored below is what
	// LastRow() returns afterwards. Every call starts "between rows".
	row := &parser.lastRow
	st := stateValues
	row.Length = 0

	for {
		tok, content, err := parser.lex()
		if err != nil {
			// EOF is only legitimate between rows; anywhere else the input
			// was cut off in the middle of a statement.
			if err == io.EOF && st != stateValues {
				return errors.Errorf("syntax error: premature EOF at offset %d", parser.pos)
			}
			return errors.Trace(err)
		}
		// Length accumulates the size of every token consumed by this call.
		row.Length += len(content)
		switch st {
		case stateTableName:
			switch tok {
			case tokRowBegin:
				st = stateColumns
			case tokValues:
				st = stateValues
			case tokUnquoted, tokDoubleQuoted, tokBackQuoted:
				// further parts of the (possibly qualified) table name: skip
			default:
				return errors.Errorf(
					"syntax error: unexpected %s (%s) at offset %d, expecting %s",
					tok, content, parser.pos, "table name",
				)
			}
		case stateColumns:
			switch tok {
			case tokRowEnd:
				st = stateValues
			case tokUnquoted, tokDoubleQuoted, tokBackQuoted:
				// column names are stored unquoted and lower-cased
				columnName := strings.ToLower(parser.unescapeString(string(content)))
				parser.columns = append(parser.columns, columnName)
			default:
				return errors.Errorf(
					"syntax error: unexpected %s (%s) at offset %d, expecting %s",
					tok, content, parser.pos, "column list",
				)
			}
		case stateValues:
			switch tok {
			case tokRowBegin:
				// a new row starts: advance the row ID and take a datum
				// slice from the pool to collect its values
				row.RowID++
				row.Row = parser.acquireDatumSlice()
				st = stateRow
			case tokUnquoted, tokDoubleQuoted, tokBackQuoted:
				// a name here means a new INSERT statement begins, so the
				// previous column list no longer applies
				parser.columns = nil
				st = stateTableName
			case tokValues:
				// the VALUES keyword itself carries no information
			default:
				return errors.Errorf(
					"syntax error: unexpected %s (%s) at offset %d, expecting %s",
					tok, content, parser.pos, "start of row",
				)
			}
		case stateRow:
			var value types.Datum
			switch tok {
			case tokRowEnd:
				// the row is complete
				return nil
			case tokNull:
				value.SetNull()
			case tokTrue:
				value.SetInt64(1)
			case tokFalse:
				value.SetInt64(0)
			case tokInteger:
				// Negative literals are parsed as signed, everything else as
				// unsigned. On success, `break` leaves this inner switch so
				// the value gets appended below.
				c := string(content)
				if strings.HasPrefix(c, "-") {
					i, err := strconv.ParseInt(c, 10, 64)
					if err == nil {
						value.SetInt64(i)
						break
					}
				} else {
					u, err := strconv.ParseUint(c, 10, 64)
					if err == nil {
						value.SetUint64(u)
						break
					}
				}
				// if the integer is too long, fallback to treating it as a
				// string (all types that treats integer specially like BIT
				// can't handle integers more than 64 bits anyway)
				fallthrough
			case tokUnquoted, tokSingleQuoted, tokDoubleQuoted:
				value.SetString(parser.unescapeString(string(content)), "utf8mb4_bin")
			case tokHexString:
				hexLit, err := types.ParseHexStr(string(content))
				if err != nil {
					return errors.Trace(err)
				}
				value.SetBinaryLiteral(hexLit)
			case tokBinString:
				binLit, err := types.ParseBitStr(string(content))
				if err != nil {
					return errors.Trace(err)
				}
				value.SetBinaryLiteral(binLit)
			default:
				return errors.Errorf(
					"syntax error: unexpected %s (%s) at offset %d, expecting %s",
					tok, content, parser.pos, "data literal",
				)
			}
			// reached for every token that produced a value
			row.Row = append(row.Row, value)
		}
	}
}

// LastRow returns a copy of the row parsed by the last call to ReadRow().
// The copy shares the underlying datum slice with the parser's own record.
func (parser *blockParser) LastRow() Row {
	latest := parser.lastRow
	return latest
}

// RecycleRow returns the datum slice of row to the allocation pool so a later
// row can reuse its storage. The slice is truncated to zero length first; the
// caller must not use row.Row afterwards.
func (parser *blockParser) RecycleRow(row Row) {
	emptied := row.Row[:0]
	// Whether pooling a pointer (instead of the slice itself) would be
	// faster still needs benchmarking.
	//nolint:staticcheck
	parser.rowPool.Put(emptied)
}

// acquireDatumSlice takes an empty []types.Datum from the row pool, falling
// back to a fresh empty slice should the pool yield a value of another type.
func (parser *blockParser) acquireDatumSlice() []types.Datum {
	if datums, ok := parser.rowPool.Get().([]types.Datum); ok {
		return datums
	}
	return []types.Datum{}
}

// ReadChunks parses the entire file and splits it into continuous chunks of
// size >= minSize.
//
// Chunks always end on a row boundary. The final chunk holds whatever rows
// remain at the end of the file and may therefore be smaller than minSize.
// Reading stops at io.EOF; any other error aborts the scan and is returned
// with no chunks.
func ReadChunks(parser Parser, minSize int64) ([]Chunk, error) {
	var chunks []Chunk

	startPos, startRowID := parser.Pos()
	cur := Chunk{
		Offset:       startPos,
		EndOffset:    startPos,
		PrevRowIDMax: startRowID,
		RowIDMax:     startRowID,
	}

	for {
		err := parser.ReadRow()
		cause := errors.Cause(err)

		if cause == io.EOF {
			// flush the trailing, possibly undersized, chunk
			if cur.EndOffset > cur.Offset {
				chunks = append(chunks, cur)
			}
			return chunks, nil
		}
		if cause != nil {
			return nil, errors.Trace(err)
		}

		cur.EndOffset, cur.RowIDMax = parser.Pos()
		if cur.EndOffset-cur.Offset < minSize {
			continue
		}

		// the chunk is big enough: emit it and start the next one right
		// after the row just read
		chunks = append(chunks, cur)
		cur.Offset = cur.EndOffset
		cur.PrevRowIDMax = cur.RowIDMax
	}
}

相关信息

tidb 源码目录

相关文章

tidb bytes 源码

tidb charset_convertor 源码

tidb csv_parser 源码

tidb loader 源码

tidb parquet_parser 源码

tidb parser_generated 源码

tidb reader 源码

tidb region 源码

tidb router 源码

0  赞