greenplumn cdbappendonlystorageread source code

  • 2022-08-18

greenplumn cdbappendonlystorageread code

File path: /src/backend/cdb/cdbappendonlystorageread.c

/*-------------------------------------------------------------------------
 *
 * cdbappendonlystorageread.c
 *
 * Portions Copyright (c) 2007-2009, Greenplum inc
 * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
 *
 *
 * IDENTIFICATION
 *	    src/backend/cdb/cdbappendonlystorageread.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#ifndef WIN32
#include <sys/fcntl.h>
#else
#include <io.h>
#endif
#include <sys/file.h>
#include <unistd.h>

#include "catalog/pg_compression.h"
#include "cdb/cdbappendonlystorage.h"
#include "cdb/cdbappendonlystoragelayer.h"
#include "cdb/cdbappendonlystorageformat.h"
#include "cdb/cdbappendonlystorageread.h"
#include "storage/gp_compress.h"
#include "utils/guc.h"


/*----------------------------------------------------------------
 * Initialization
 *----------------------------------------------------------------
 */

/*
 * Initialize AppendOnlyStorageRead.
 *
 * The AppendOnlyStorageRead data structure is initialized once for a read
 * "session" and can be used to read Append-Only Storage Blocks from 1 or
 * more segment files.
 *
 * The current file to read from is opened with the
 * AppendOnlyStorageRead_OpenFile routine.
 *
 * storageRead		- data structure to initialize
 * memoryContext	- memory context to use for buffers and other memory
 *					  needs. When NULL, the current memory context is used.
 * maxBufferLen		- maximum Append-Only Storage Block length including all
 *					  storage headers.
 * relationName		- name of the relation to use in system logging and
 *					  error messages.
 * title			- A phrase that better describes the purpose of this open.
 *					  The caller manages the storage for this.
 * storageAttributes - Append-Only Storage Attributes from relation creation.
 */
void
AppendOnlyStorageRead_Init(AppendOnlyStorageRead *storageRead,
						   MemoryContext memoryContext,
						   int32 maxBufferLen,
						   char *relationName,
						   char *title,
						   AppendOnlyStorageAttributes *storageAttributes)
{
	uint8	   *memory;
	int32		memoryLen;
	MemoryContext oldMemoryContext;

	Assert(storageRead != NULL);

	/* UNDONE: Range check maxBufferLen */

	Assert(relationName != NULL);
	Assert(storageAttributes != NULL);

	/* UNDONE: Range check fields in storageAttributes */

	MemSet(storageRead, 0, sizeof(AppendOnlyStorageRead));

	storageRead->maxBufferLen = maxBufferLen;

	if (memoryContext == NULL)
		storageRead->memoryContext = CurrentMemoryContext;
	else
		storageRead->memoryContext = memoryContext;

	oldMemoryContext = MemoryContextSwitchTo(storageRead->memoryContext);

	memcpy(&storageRead->storageAttributes,
		   storageAttributes,
		   sizeof(AppendOnlyStorageAttributes));

	storageRead->relationName = pstrdup(relationName);
	storageRead->title = title;

	storageRead->minimumHeaderLen =
		AppendOnlyStorageFormat_RegularHeaderLenNeeded(
													   storageRead->storageAttributes.checksum);

	/*
	 * Initialize BufferedRead.
	 */
	storageRead->largeReadLen = 2 * storageRead->maxBufferLen;

	memoryLen = BufferedReadMemoryLen(storageRead->maxBufferLen,
									  storageRead->largeReadLen);

	Assert(CurrentMemoryContext == storageRead->memoryContext);
	memory = (uint8 *) palloc(memoryLen);

	BufferedReadInit(&storageRead->bufferedRead,
					 memory,
					 memoryLen,
					 storageRead->maxBufferLen,
					 storageRead->largeReadLen,
					 relationName);

	elogif(Debug_appendonly_print_scan || Debug_appendonly_print_read_block, LOG,
		   "Append-Only Storage Read initialize for table '%s' "
		   "(compression = %s, compression level %d, maximum buffer length %d, large read length %d)",
		   storageRead->relationName,
		   (storageRead->storageAttributes.compress ? "true" : "false"),
		   storageRead->storageAttributes.compressLevel,
		   storageRead->maxBufferLen,
		   storageRead->largeReadLen);

	storageRead->file = -1;
	storageRead->formatVersion = -1;

	MemoryContextSwitchTo(oldMemoryContext);

	storageRead->isActive = true;
}
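
/*
 * Illustrative sketch (not part of the original file): one plausible way a
 * caller could set up a read session with AppendOnlyStorageRead_Init.  The
 * guard macro, variable names, attribute values and the 32 KB buffer length
 * are assumptions made for this example only.
 */
#ifdef AO_READ_USAGE_EXAMPLES
static void
example_init_read_session(AppendOnlyStorageRead *storageRead,
						  MemoryContext scanMemoryContext)
{
	AppendOnlyStorageAttributes attrs;

	MemSet(&attrs, 0, sizeof(attrs));
	attrs.compress = false;		/* no bulk compression in this example */
	attrs.compressLevel = 0;
	attrs.checksum = true;		/* verify header and block checksums */
	attrs.safeFSWriteSize = 0;	/* no zero padding to page boundaries */

	AppendOnlyStorageRead_Init(storageRead,
							   scanMemoryContext,
							   32 * 1024,	/* maximum block length */
							   "example_ao_table",
							   "Example scan",
							   &attrs);
}
#endif							/* AO_READ_USAGE_EXAMPLES */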

/*
 * Return (read-only) pointer to relation name.
 */
char *
AppendOnlyStorageRead_RelationName(AppendOnlyStorageRead *storageRead)
{
	Assert(storageRead != NULL);
	Assert(storageRead->isActive);

	return storageRead->relationName;
}

/*
 * Return (read-only) pointer to the current segment file name.
 */
char *
AppendOnlyStorageRead_SegmentFileName(AppendOnlyStorageRead *storageRead)
{
	Assert(storageRead != NULL);
	Assert(storageRead->isActive);

	return storageRead->segmentFileName;
}

/*
 * Finish using the AppendOnlyStorageRead session created with ~Init.
 */
void
AppendOnlyStorageRead_FinishSession(AppendOnlyStorageRead *storageRead)
{
	MemoryContext oldMemoryContext;

	if (!storageRead->isActive)
		return;

	oldMemoryContext = MemoryContextSwitchTo(storageRead->memoryContext);

	/*
	 * UNDONE: This expects the MemoryContext to be what was used for the
	 * 'memory' in ~Init
	 */
	BufferedReadFinish(&storageRead->bufferedRead);

	if (storageRead->relationName != NULL)
	{
		pfree(storageRead->relationName);
		storageRead->relationName = NULL;
	}

	if (storageRead->segmentFileName != NULL)
	{
		pfree(storageRead->segmentFileName);
		storageRead->segmentFileName = NULL;
	}

	if (storageRead->compression_functions != NULL)
	{
		callCompressionDestructor(storageRead->compression_functions[COMPRESSION_DESTRUCTOR], storageRead->compressionState);
		if (storageRead->compressionState != NULL)
		{
			pfree(storageRead->compressionState);
			storageRead->compressionState = NULL;
		}
		pfree(storageRead->compression_functions);
		storageRead->compression_functions = NULL;
	}

	/* Deallocation is done.  Go back to caller memory-context. */
	MemoryContextSwitchTo(oldMemoryContext);

	storageRead->isActive = false;

}

/*----------------------------------------------------------------
 * Open and Close
 *----------------------------------------------------------------
 */

/*
 * Open the next segment file to read, without doing error processing.
 *
 * This routine is responsible for seeking to the proper location given
 * the logical EOF.
 *
 * filePathName - name of the segment file to open.
 */
static File
AppendOnlyStorageRead_DoOpenFile(AppendOnlyStorageRead *storageRead,
								 char *filePathName)
{
	int			fileFlags = O_RDONLY | PG_BINARY;
	File		file;

	Assert(storageRead != NULL);
	Assert(storageRead->isActive);
	Assert(filePathName != NULL);

	elogif(Debug_appendonly_print_read_block, LOG,
		   "Append-Only storage read: opening table '%s', segment file '%s', fileFlags 0x%x",
		   storageRead->relationName,
		   filePathName,
		   fileFlags);

	/*
	 * Open the file for read.
	 */
	file = PathNameOpenFile(filePathName, fileFlags);

	return file;
}

/*
 * Finish the open by positioning the next read and saving information.
 *
 * file			- The open file.
 * filePathName - name of the segment file to open.
 * version		- AO table format version the file is in.
 * logicalEof	- snapshot version of the EOF value to use as the read end
 *				  of the segment file.
 */
static void
AppendOnlyStorageRead_FinishOpenFile(AppendOnlyStorageRead *storageRead,
									 File file,
									 char *filePathName,
									 int version,
									 int64 logicalEof)
{
	MemoryContext oldMemoryContext;

	AORelationVersion_CheckValid(version);

	storageRead->file = file;
	storageRead->formatVersion = version;

	/*
	 * When reading multiple segment files, we throw away the old segment file
	 * name strings.
	 */
	oldMemoryContext = MemoryContextSwitchTo(storageRead->memoryContext);

	if (storageRead->segmentFileName != NULL)
		pfree(storageRead->segmentFileName);

	storageRead->segmentFileName = pstrdup(filePathName);

	/* Allocation is done.  Go back to caller memory-context. */
	MemoryContextSwitchTo(oldMemoryContext);

	storageRead->logicalEof = logicalEof;

	BufferedReadSetFile(&storageRead->bufferedRead,
						storageRead->file,
						storageRead->segmentFileName,
						logicalEof);
}

/*
 * Open the next segment file to read.
 *
 * This routine is responsible for seeking to the proper
 * read location given the logical EOF.
 *
 * filePathName - name of the segment file to open.
 * version		- AO table format version the file is in.
 * logicalEof	- snapshot version of the EOF value to use as the read end
 *				  of the segment file.
 */
void
AppendOnlyStorageRead_OpenFile(AppendOnlyStorageRead *storageRead,
							   char *filePathName,
							   int version,
							   int64 logicalEof)
{
	File		file;

	Assert(storageRead != NULL);
	Assert(storageRead->isActive);
	Assert(filePathName != NULL);

	/*
	 * The EOF must be greater than 0, otherwise we risk transactionally
	 * created segment files from disappearing if a concurrent write
	 * transaction aborts.
	 */
	if (logicalEof == 0)
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("append-only storage read segment file '%s' EOF must be > 0 for relation '%s'",
						filePathName,
						storageRead->relationName)));

	file = AppendOnlyStorageRead_DoOpenFile(storageRead,
											filePathName);
	if (file < 0)
	{
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("append-Only storage read could not open segment file '%s' for relation '%s'",
						filePathName,
						storageRead->relationName)));
	}

	AppendOnlyStorageRead_FinishOpenFile(storageRead,
										 file,
										 filePathName,
										 version,
										 logicalEof);
}

/*
 * Try opening the next segment file to read.
 *
 * This routine is responsible for seeking to the proper read location given
 * the logical EOF.
 *
 * filePathName - name of the segment file to open
 * version		- AO table format version the file is in.
 * logicalEof	- snapshot version of the EOF value to use as the read end of
 *				  the segment file.
 */
bool
AppendOnlyStorageRead_TryOpenFile(AppendOnlyStorageRead *storageRead,
								  char *filePathName,
								  int version,
								  int64 logicalEof)
{
	File		file;

	Assert(storageRead != NULL);
	Assert(storageRead->isActive);
	Assert(filePathName != NULL);
	/* UNDONE: Range check logicalEof */

	file = AppendOnlyStorageRead_DoOpenFile(storageRead,
											filePathName);
	if (file < 0)
		return false;

	AppendOnlyStorageRead_FinishOpenFile(storageRead,
										 file,
										 filePathName,
										 version,
										 logicalEof);

	return true;
}

/*
 * Set a temporary read range in the current open segment file.
 *
 * The beginFileOffset must be to the beginning of an Append-Only Storage block.
 *
 * The afterFileOffset serves as the temporary EOF.  It will cause
 * ~_GetBlockInfo to return false (no more blocks) when reached.  It must be
 * at the end of an Append-Only Storage block.
 *
 * When ~_GetBlockInfo returns false (no more blocks), the temporary read
 * range is forgotten.
 */
void
AppendOnlyStorageRead_SetTemporaryRange(AppendOnlyStorageRead *storageRead,
										int64 beginFileOffset,
										int64 afterFileOffset)
{
	Assert(storageRead->isActive);
	Assert(storageRead->file != -1);
	Assert(storageRead->formatVersion != -1);
	Assert(beginFileOffset >= 0);
	Assert(beginFileOffset <= storageRead->logicalEof);
	Assert(afterFileOffset >= 0);
	Assert(afterFileOffset <= storageRead->logicalEof);

	BufferedReadSetTemporaryRange(&storageRead->bufferedRead,
								  beginFileOffset,
								  afterFileOffset);
}

/*
 * Close the current segment file.
 *
 * No error if the current file is already closed.
 */
void
AppendOnlyStorageRead_CloseFile(AppendOnlyStorageRead *storageRead)
{
	if (!storageRead->isActive)
		return;

	if (storageRead->file == -1)
		return;

	FileClose(storageRead->file);

	storageRead->file = -1;
	storageRead->formatVersion = -1;

	storageRead->logicalEof = INT64CONST(0);

	if (storageRead->bufferedRead.file >= 0)
		BufferedReadCompleteFile(&storageRead->bufferedRead);
}
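
/*
 * Illustrative sketch (not part of the original file): the open/close
 * lifecycle around one segment file of a read session.  The guard macro and
 * parameter names are assumptions; real callers obtain the path, format
 * version and logical EOF from the segment's catalog entry.
 */
#ifdef AO_READ_USAGE_EXAMPLES
static void
example_read_one_segfile(AppendOnlyStorageRead *storageRead,
						 char *segFilePathName,
						 int formatVersion,
						 int64 logicalEof)
{
	/* Errors out if the segment file cannot be opened. */
	AppendOnlyStorageRead_OpenFile(storageRead,
								   segFilePathName,
								   formatVersion,
								   logicalEof);

	/*
	 * ... loop over the blocks with AppendOnlyStorageRead_GetBlockInfo here
	 * (see the sketch after that routine) ...
	 */

	AppendOnlyStorageRead_CloseFile(storageRead);

	/* Once no more segment files will be read, release the session. */
	AppendOnlyStorageRead_FinishSession(storageRead);
}
#endif							/* AO_READ_USAGE_EXAMPLES */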


/*----------------------------------------------------------------
 * Reading Content
 *----------------------------------------------------------------
 */

/*
 * This section covers reading potentially long content that can be up to
 * 1 GB long and/or may have been bulk-compressed.
 *
 * The AppendOnlyStorageRead_GetBlockInfo routine is used to peek at the
 * next Append-Only Storage Block and tell the caller how to handle it.
 *
 * If the block is small and not compressed, then it may be looked at
 * directly in the read buffer.
 *
 * Otherwise, the caller must provide other buffer space to either
 * reconstruct large content and/or to decompress content into.
 */

/*
 * Skip zero padding to next page boundary, if necessary.
 *
 * This function is called when the file system block we are scanning has
 * no more valid data but instead is padded with zeros from the current
 * position to the end of the block.  The function skips the remainder of
 * the current page when skipLen is -1, or when fewer than skipLen bytes
 * remain before the next page boundary.
 */
static void
AppendOnlyStorageRead_DoSkipPadding(AppendOnlyStorageRead *storageRead,
									int32 skipLen)
{
	int64		nextReadPosition;
	int64		nextBoundaryPosition;
	int32		safeWriteRemainder;
	bool		doSkip;
	uint8	   *buffer;
	int32		availableLen;
	int32		safewrite = storageRead->storageAttributes.safeFSWriteSize;

	/* early exit if no pad used */
	if (safewrite == 0)
		return;

	nextReadPosition =
		BufferedReadNextBufferPosition(&storageRead->bufferedRead);
	nextBoundaryPosition =
		((nextReadPosition + safewrite - 1) / safewrite) * safewrite;
	safeWriteRemainder = (int32) (nextBoundaryPosition - nextReadPosition);
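	/*
	 * Example with assumed values: if safeFSWriteSize is 65536 and
	 * nextReadPosition is 70000, nextBoundaryPosition rounds up to 131072,
	 * so safeWriteRemainder is 61072 bytes of potential zero padding.
	 */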

	if (safeWriteRemainder <= 0)
		doSkip = false;
	else if (skipLen == -1)
	{
		/*
		 * Skip to end of page.
		 */
		doSkip = true;
	}
	else
		doSkip = (safeWriteRemainder < skipLen);

	if (doSkip)
	{
		/*
		 * Read through the remainder.
		 */
		buffer = BufferedReadGetNextBuffer(&storageRead->bufferedRead,
										   safeWriteRemainder,
										   &availableLen);

		/*
		 * Since our file EOF should always be a multiple of the file-system
		 * page, we do not expect a short read here.
		 */
		if (buffer == NULL)
			availableLen = 0;
		if (buffer == NULL || safeWriteRemainder != availableLen)
		{
			ereport(ERROR,
					(errcode(ERRCODE_INTERNAL_ERROR),
					 errmsg("unexpected end of file"),
					 errdetail("Expected to read %d bytes after position " INT64_FORMAT " but found %d bytes (bufferCount  " INT64_FORMAT ").",
							safeWriteRemainder,
							nextReadPosition,
							availableLen,
							storageRead->bufferCount)));
		}

		/*
		 * UNDONE: For verification purposes, we should verify the remainder
		 * is all zeroes.
		 */

		elogif(Debug_appendonly_print_scan, LOG,
			   "Append-only scan skipping zero padded remainder for table '%s' (nextReadPosition = " INT64_FORMAT ", safeWriteRemainder = %d)",
			   storageRead->relationName,
			   nextReadPosition,
			   safeWriteRemainder);
	}
}

/*
 * Position the read buffer at the header of the next Append-Only Storage
 * Block, skipping over any zero padding up to the next page boundary.
 *
 * Returns false when the end of the segment file has been reached.
 * Otherwise returns true and sets the block header's offset in the file, a
 * pointer to the (minimum-length) header, and the maximum possible length
 * of the block.
 */
static bool
AppendOnlyStorageRead_PositionToNextBlock(AppendOnlyStorageRead *storageRead,
										  int64 *headerOffsetInFile,
										  uint8 **header,
										  int32 *blockLimitLen)
{
	int32		availableLen;
	int			i;
	int64		fileRemainderLen;

	Assert(storageRead != NULL);
	Assert(header != NULL);

	/*
	 * Peek ahead just enough so we can see the Append-Only storage header.
	 *
	 * However, we need to honor the file-system page boundaries here since we
	 * do not let the length information cross the boundary.
	 */
	AppendOnlyStorageRead_DoSkipPadding(storageRead, storageRead->minimumHeaderLen);

	*headerOffsetInFile =
		BufferedReadNextBufferPosition(&storageRead->bufferedRead);

	*header = BufferedReadGetNextBuffer(&storageRead->bufferedRead,
										storageRead->minimumHeaderLen,
										&availableLen);

	if (*header == NULL)
	{
		/* done reading the file */
		return false;
	}

	storageRead->bufferCount++;

	if (availableLen != storageRead->minimumHeaderLen)
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("expected %d bytes and got %d bytes in table %s (segment file '%s', header offset in file = " INT64_FORMAT ", bufferCount " INT64_FORMAT ")",
						storageRead->minimumHeaderLen,
						availableLen,
						storageRead->relationName,
						storageRead->segmentFileName,
						*headerOffsetInFile,
						storageRead->bufferCount)));

	/*
	 * First check for zero padded page remainder.
	 */
	i = 0;
	while (true)
	{
		if ((*header)[i] != 0)
			break;

		i++;
		if (i >= storageRead->minimumHeaderLen)
		{
			/*
			 * Skip over zero padding caused when the append command left a
			 * partially full page.
			 */
			AppendOnlyStorageRead_DoSkipPadding(storageRead,
												-1 /* means till end of page */ );

			/*
			 * Now try to get the peek data from the new page.
			 */
			*headerOffsetInFile = BufferedReadNextBufferPosition(&storageRead->bufferedRead);

			*header = BufferedReadGetNextBuffer(&storageRead->bufferedRead,
												storageRead->minimumHeaderLen,
												&availableLen);

			if (*header == NULL)
			{
				/* done reading the file */
				return false;
			}

			if (availableLen != storageRead->minimumHeaderLen)
				ereport(ERROR,
						(errcode(ERRCODE_INTERNAL_ERROR),
						 errmsg("expected %d bytes and found %d bytes in table %s (segment file '%s', header offset in file = " INT64_FORMAT ", bufferCount " INT64_FORMAT ")",
								storageRead->minimumHeaderLen,
								availableLen,
								storageRead->relationName,
								storageRead->segmentFileName,
								*headerOffsetInFile,
								storageRead->bufferCount)));
			break;
		}
	}

	/*
	 * Determine the maximum boundary of the block. UNDONE: When we have a
	 * block directory, we will tighten the limit down.
	 */
	fileRemainderLen = storageRead->bufferedRead.fileLen -
		*headerOffsetInFile;
	if (storageRead->maxBufferLen > fileRemainderLen)
		*blockLimitLen = (int32) fileRemainderLen;
	else
		*blockLimitLen = storageRead->maxBufferLen;

	return true;
}


char *
AppendOnlyStorageRead_ContextStr(AppendOnlyStorageRead *storageRead)
{
	int64		headerOffsetInFile;

	headerOffsetInFile =
		BufferedReadCurrentPosition(&storageRead->bufferedRead);

	return psprintf("%s. Append-Only segment file '%s', block header offset in file = " INT64_FORMAT ", bufferCount " INT64_FORMAT,
					storageRead->title,
					storageRead->segmentFileName,
					headerOffsetInFile,
					storageRead->bufferCount);
}

/*
 * errcontext_appendonly_read_storage_block
 *
 * Add an errcontext() line showing the table, segment file, offset in file,
 * block count of the storage block being read.
 */
int
errcontext_appendonly_read_storage_block(AppendOnlyStorageRead *storageRead)
{
	char	   *str;

	str = AppendOnlyStorageRead_ContextStr(storageRead);
	errcontext("%s", str);
	pfree(str);

	return 0;
}

char *
AppendOnlyStorageRead_StorageContentHeaderStr(AppendOnlyStorageRead *storageRead)
{
	uint8	   *header;

	header = BufferedReadGetCurrentBuffer(&storageRead->bufferedRead);

	return AppendOnlyStorageFormat_BlockHeaderStr(header,
												  storageRead->storageAttributes.checksum,
												  storageRead->formatVersion);
}

/*
 * errdetail_appendonly_read_storage_content_header
 *
 * Add an errdetail() line showing the Append-Only Storage header being read.
 */
int
errdetail_appendonly_read_storage_content_header(AppendOnlyStorageRead *storageRead)
{
	char	   *str;

	str = AppendOnlyStorageRead_StorageContentHeaderStr(storageRead);

	errdetail("%s", str);

	pfree(str);

	return 0;
}

static void
AppendOnlyStorageRead_LogBlockHeader(AppendOnlyStorageRead *storageRead,
									 uint8 *header)
{
	char	   *contextStr;
	char	   *blockHeaderStr;

	contextStr = AppendOnlyStorageRead_ContextStr(storageRead);

	blockHeaderStr =
		AppendOnlyStorageFormat_SmallContentHeaderStr(header,
													  storageRead->storageAttributes.checksum,
													  storageRead->formatVersion);
	ereport(LOG,
			(errmsg("%s. %s",
					contextStr,
					blockHeaderStr)));

	pfree(contextStr);
	pfree(blockHeaderStr);
}

/*
 * Get information on the next Append-Only Storage Block.
 *
 * Return true if another block was found.  Otherwise, we have reached the
 * end of the current segment file.
 */
bool
AppendOnlyStorageRead_ReadNextBlock(AppendOnlyStorageRead *storageRead)
{
	uint8	   *header;
	AOHeaderCheckError checkError;
	int32		blockLimitLen = 0;	/* Silence compiler warning. */
	pg_crc32	storedChecksum;
	pg_crc32	computedChecksum;

	/*
	 * Reset current* variables.
	 */

	/* For efficiency, zero out everything at once; the individual field assignments below are left commented out. */
	memset(&storageRead->current, 0, sizeof(AppendOnlyStorageReadCurrent));

/*	storageRead->current.headerOffsetInFile = 0; */
	storageRead->current.headerKind = AoHeaderKind_None;
/*	storageRead->current.actualHeaderLen = 0; */
/*	storageRead->current.contentLen = 0; */
/*	storageRead->current.overallBlockLen = 0; */
/*	storageRead->current.contentOffset = 0; */
/*	storageRead->current.executorBlockKind = 0; */
/*	storageRead->current.hasFirstRowNum = false; */
	storageRead->current.firstRowNum = INT64CONST(-1);
/*	storageRead->current.rowCount = 0; */
/*	storageRead->current.isLarge = false; */
/*	storageRead->current.isCompressed = false; */
/*	storageRead->current.compressedLen = 0; */

	elogif(Debug_appendonly_print_datumstream, LOG,
		   "before AppendOnlyStorageRead_PositionToNextBlock, storageRead->current.headerOffsetInFile is" INT64_FORMAT "storageRead->current.overallBlockLen is %d",
		   storageRead->current.headerOffsetInFile, storageRead->current.overallBlockLen);

	if (!AppendOnlyStorageRead_PositionToNextBlock(storageRead,
												   &storageRead->current.headerOffsetInFile,
												   &header,
												   &blockLimitLen))
	{
		/* Done reading the file */
		return false;
	}

	elogif(Debug_appendonly_print_datumstream, LOG,
		   "after AppendOnlyStorageRead_PositionToNextBlock, storageRead->current.headerOffsetInFile is" INT64_FORMAT "storageRead->current.overallBlockLen is %d",
		   storageRead->current.headerOffsetInFile, storageRead->current.overallBlockLen);

	/*----------
	 * Proceed very carefully:
	 * [ 1. Verify header checksum ]
	 *	 2. Examine (basic) header.
	 *	 3. Examine specific header.
	 * [ 4. Verify the block checksum ]
	 *----------
	 */
	if (storageRead->storageAttributes.checksum &&
		gp_appendonly_verify_block_checksums)
	{
		if (!AppendOnlyStorageFormat_VerifyHeaderChecksum(header,
														  &storedChecksum,
														  &computedChecksum))
			ereport(ERROR,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("header checksum does not match, expected 0x%08X and found 0x%08X",
							storedChecksum, computedChecksum),
					 errdetail_appendonly_read_storage_content_header(storageRead),
					 errcontext_appendonly_read_storage_block(storageRead)));
	}

	/*
	 * Check the (basic) header information.
	 */
	checkError = AppendOnlyStorageFormat_GetHeaderInfo(header,
													   storageRead->storageAttributes.checksum,
													   &storageRead->current.headerKind,
													   &storageRead->current.actualHeaderLen);
	if (checkError != AOHeaderCheckOk)
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("bad append-only storage header, header check error %d, detail '%s'",
						(int) checkError,
						AppendOnlyStorageFormat_GetHeaderCheckErrorStr()),
				 errdetail_appendonly_read_storage_content_header(storageRead),
				 errcontext_appendonly_read_storage_block(storageRead)));

	/*
	 * Get more header since AppendOnlyStorageRead_PositionToNextBlock only
	 * gets minimum.
	 */
	if (storageRead->minimumHeaderLen < storageRead->current.actualHeaderLen)
	{
		int32		availableLen;

		header = BufferedReadGrowBuffer(&storageRead->bufferedRead,
										storageRead->current.actualHeaderLen,
										&availableLen);

		if (header == NULL ||
			availableLen != storageRead->current.actualHeaderLen)
			ereport(ERROR,
					(errcode(ERRCODE_INTERNAL_ERROR),
					 errmsg("expected %d bytes and found %d bytes in table %s (segment file '%s', header offset in file = " INT64_FORMAT ", bufferCount " INT64_FORMAT ")",
							storageRead->current.actualHeaderLen,
							availableLen,
							storageRead->relationName,
							storageRead->segmentFileName,
							storageRead->current.headerOffsetInFile,
							storageRead->bufferCount)));
	}

	/*
	 * Based on the kind of header, we either have small or large content.
	 */
	switch (storageRead->current.headerKind)
	{
		case AoHeaderKind_SmallContent:

			/*
			 * Check the SmallContent header information.
			 */
			checkError = AppendOnlyStorageFormat_GetSmallContentHeaderInfo
				(header,
				 storageRead->current.actualHeaderLen,
				 storageRead->storageAttributes.checksum,
				 blockLimitLen,
				 &storageRead->current.overallBlockLen,
				 &storageRead->current.contentOffset,
				 &storageRead->current.uncompressedLen,
				 &storageRead->current.executorBlockKind,
				 &storageRead->current.hasFirstRowNum,
				 storageRead->formatVersion,
				 &storageRead->current.firstRowNum,
				 &storageRead->current.rowCount,
				 &storageRead->current.isCompressed,
				 &storageRead->current.compressedLen
				);
			if (checkError != AOHeaderCheckOk)
				ereport(ERROR,
						(errcode(ERRCODE_INTERNAL_ERROR),
						 errmsg("bad append-only storage header of type small content, header check error %d, detail '%s'",
								(int) checkError,
								AppendOnlyStorageFormat_GetHeaderCheckErrorStr()),
						 errdetail_appendonly_read_storage_content_header(storageRead),
						 errcontext_appendonly_read_storage_block(storageRead)));
			break;

		case AoHeaderKind_LargeContent:

			/*
			 * Check the LargeContent metadata header information.
			 */
			checkError = AppendOnlyStorageFormat_GetLargeContentHeaderInfo
				(header,
				 storageRead->current.actualHeaderLen,
				 storageRead->storageAttributes.checksum,
				 &storageRead->current.uncompressedLen,
				 &storageRead->current.executorBlockKind,
				 &storageRead->current.hasFirstRowNum,
				 &storageRead->current.firstRowNum,
				 &storageRead->current.rowCount);
			if (checkError != AOHeaderCheckOk)
				ereport(ERROR,
						(errcode(ERRCODE_INTERNAL_ERROR),
						 errmsg("bad append-only storage header of type large content, header check error %d, detail '%s'",
								(int) checkError,
								AppendOnlyStorageFormat_GetHeaderCheckErrorStr()),
						 errdetail_appendonly_read_storage_content_header(storageRead),
						 errcontext_appendonly_read_storage_block(storageRead)));
			storageRead->current.isLarge = true;
			break;

		case AoHeaderKind_NonBulkDenseContent:

			/*
			 * Check the NonBulkDense header information.
			 */
			checkError =
				AppendOnlyStorageFormat_GetNonBulkDenseContentHeaderInfo
				(header,
				 storageRead->current.actualHeaderLen,
				 storageRead->storageAttributes.checksum,
				 blockLimitLen,
				 &storageRead->current.overallBlockLen,
				 &storageRead->current.contentOffset,
				 &storageRead->current.uncompressedLen,
				 &storageRead->current.executorBlockKind,
				 &storageRead->current.hasFirstRowNum,
				 storageRead->formatVersion,
				 &storageRead->current.firstRowNum,
				 &storageRead->current.rowCount
				);
			if (checkError != AOHeaderCheckOk)
				ereport(ERROR,
						(errcode(ERRCODE_INTERNAL_ERROR),
						 errmsg("bad append-only storage header of type non-bulk dense content, header check error %d, detail '%s'",
								(int) checkError,
								AppendOnlyStorageFormat_GetHeaderCheckErrorStr()),
						 errdetail_appendonly_read_storage_content_header(storageRead),
						 errcontext_appendonly_read_storage_block(storageRead)));
			break;

		case AoHeaderKind_BulkDenseContent:

			/*
			 * Check the BulkDenseContent header information.
			 */
			checkError =
				AppendOnlyStorageFormat_GetBulkDenseContentHeaderInfo
				(header,
				 storageRead->current.actualHeaderLen,
				 storageRead->storageAttributes.checksum,
				 blockLimitLen,
				 &storageRead->current.overallBlockLen,
				 &storageRead->current.contentOffset,
				 &storageRead->current.uncompressedLen,
				 &storageRead->current.executorBlockKind,
				 &storageRead->current.hasFirstRowNum,
				 storageRead->formatVersion,
				 &storageRead->current.firstRowNum,
				 &storageRead->current.rowCount,
				 &storageRead->current.isCompressed,
				 &storageRead->current.compressedLen
				);
			if (checkError != AOHeaderCheckOk)
				ereport(ERROR,
						(errcode(ERRCODE_INTERNAL_ERROR),
						 errmsg("bad append-only storage header of type bulk dense content, header check error %d, detail '%s'",
								(int) checkError,
								AppendOnlyStorageFormat_GetHeaderCheckErrorStr()),
						 errdetail_appendonly_read_storage_content_header(storageRead),
						 errcontext_appendonly_read_storage_block(storageRead)));
			break;

		default:
			elog(ERROR, "unexpected append-only header kind %d",
				 storageRead->current.headerKind);
			break;
	}

	if (Debug_appendonly_print_storage_headers)
	{
		AppendOnlyStorageRead_LogBlockHeader(storageRead, header);
	}

	if (storageRead->current.hasFirstRowNum)
	{
		/* UNDONE: Grow buffer and read the value into firstRowNum. */
	}

	if (storageRead->current.headerKind == AoHeaderKind_LargeContent)
	{
		/* UNDONE: Finish the read for the information only header. */
	}

	return true;
}

/*
 * Get information on the next Append-Only Storage Block.
 *
 * Return true if another block was found.  Otherwise, we have reached the
 * end of the current segment file.
 *
 * OUTPUTS:
 *
 * contentLen	- total byte length of the content.
 * executorBlockKind - executor supplied value stored in the Append-Only
 *				  Storage Block header
 * firstRowNum	- When the first row number for this block was explicitly set,
 *				  that value is returned here.  Otherwise, INT64CONST(-1) is
 *				  returned.
 * rowCount		- number of rows in the content
 * isLarge		- When true, the content was longer than the maxBufferLen
 *				  (i.e. blocksize) minus Append-Only Storage Block header and
 *				  had to be stored in more than one storage block.
 * isCompressed - When true, the content is compressed and cannot be looked at
 *				  directly in the buffer.
 */
bool
AppendOnlyStorageRead_GetBlockInfo(AppendOnlyStorageRead *storageRead,
								   int32 *contentLen,
								   int *executorBlockKind,
								   int64 *firstRowNum,
								   int *rowCount,
								   bool *isLarge,
								   bool *isCompressed)
{
	bool		isNext;

	Assert(storageRead != NULL);
	Assert(storageRead->isActive);

	isNext = AppendOnlyStorageRead_ReadNextBlock(storageRead);

	/*
	 * The current* variables have good values even when there is no next
	 * block.
	 */
	*contentLen = storageRead->current.uncompressedLen;
	*executorBlockKind = storageRead->current.executorBlockKind;
	*firstRowNum = storageRead->current.firstRowNum;
	*rowCount = storageRead->current.rowCount;
	*isLarge = storageRead->current.isLarge;
	*isCompressed = storageRead->current.isCompressed;

	return isNext;
}
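
/*
 * Illustrative sketch (not part of the original file): a typical block loop
 * over an open segment file.  Small, uncompressed content is examined
 * directly in the read buffer; other blocks are skipped here (see the sketch
 * after AppendOnlyStorageRead_Content for the copy-out path).  The guard
 * macro and the process_small_block callback are assumptions for this
 * example only.
 */
#ifdef AO_READ_USAGE_EXAMPLES
static void
example_scan_blocks(AppendOnlyStorageRead *storageRead,
					void (*process_small_block) (uint8 *content, int32 contentLen))
{
	int32		contentLen;
	int			executorBlockKind;
	int64		firstRowNum;
	int			rowCount;
	bool		isLarge;
	bool		isCompressed;

	while (AppendOnlyStorageRead_GetBlockInfo(storageRead,
											  &contentLen,
											  &executorBlockKind,
											  &firstRowNum,
											  &rowCount,
											  &isLarge,
											  &isCompressed))
	{
		if (!isLarge && !isCompressed)
		{
			/* Small, uncompressed content can be used in place. */
			uint8	   *content = AppendOnlyStorageRead_GetBuffer(storageRead);

			process_small_block(content, contentLen);
		}
		else
		{
			/* Not needed by this caller; skip without decompressing. */
			AppendOnlyStorageRead_SkipCurrentBlock(storageRead);
		}
	}
}
#endif							/* AO_READ_USAGE_EXAMPLES */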

/*
 * Return the file offset of the current Append-Only Storage Block.
 */
int64
AppendOnlyStorageRead_CurrentHeaderOffsetInFile(AppendOnlyStorageRead *storageRead)
{
	Assert(storageRead != NULL);
	Assert(storageRead->isActive);

	return storageRead->current.headerOffsetInFile;
}

/*
 * Return the compressed length of the content of the current Append-Only
 * Storage Block.
 */
int64
AppendOnlyStorageRead_CurrentCompressedLen(AppendOnlyStorageRead *storageRead)
{
	Assert(storageRead != NULL);
	Assert(storageRead->isActive);

	return storageRead->current.compressedLen;
}

/*
 * Return the overall block length of the current Append-Only Storage Block.
 */
int64
AppendOnlyStorageRead_OverallBlockLen(AppendOnlyStorageRead *storageRead)
{
	Assert(storageRead != NULL);
	Assert(storageRead->isActive);

	return storageRead->current.overallBlockLen;
}

/*
 * Internal routine to grow the BufferedRead buffer to be the whole current
 * block and to get header and content pointers of current block.
 *
 * Since we are growing the BufferedRead buffer to the whole block, old
 * pointers to the header must be abandoned.
 *
 * Header to current block was read and verified by
 * AppendOnlyStorageRead_ReadNextBlock.
 */
static void
AppendOnlyStorageRead_InternalGetBuffer(AppendOnlyStorageRead *storageRead,
										uint8 **header, uint8 **content)
{
	int32		availableLen;
	pg_crc32	storedChecksum;
	pg_crc32	computedChecksum;

	/*
	 * Verify next block is type Block.
	 */
	Assert(storageRead->current.headerKind == AoHeaderKind_SmallContent ||
		   storageRead->current.headerKind == AoHeaderKind_NonBulkDenseContent ||
		   storageRead->current.headerKind == AoHeaderKind_BulkDenseContent);

	/*
	 * Grow the buffer to the full block length to avoid any unnecessary
	 * copying by BufferedRead.
	 *
	 * Since the BufferedRead module may have to copy information around, we
	 * do not save any pointers to the prior buffer call.  This is why
	 * AppendOnlyStorageFormat_GetHeaderInfo passes back the offset to the
	 * data, not a pointer.
	 */
	*header = BufferedReadGrowBuffer(&storageRead->bufferedRead,
									 storageRead->current.overallBlockLen,
									 &availableLen);

	if (storageRead->current.overallBlockLen != availableLen)
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("wrong buffer length, expected %d byte length buffer and got %d",
						storageRead->current.overallBlockLen,
						availableLen),
				 errdetail_appendonly_read_storage_content_header(storageRead),
				 errcontext_appendonly_read_storage_block(storageRead)));

	if (storageRead->storageAttributes.checksum &&
		gp_appendonly_verify_block_checksums)
	{
		/*
		 * Now that the header has been verified, verify the block checksum in
		 * the header with the checksum of the data portion.
		 */
		if (!AppendOnlyStorageFormat_VerifyBlockChecksum(*header,
														 storageRead->current.overallBlockLen,
														 &storedChecksum,
														 &computedChecksum))
			ereport(ERROR,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("block checksum does not match, expected 0x%08X and found 0x%08X",
							storedChecksum,
							computedChecksum),
					 errdetail_appendonly_read_storage_content_header(storageRead),
					 errcontext_appendonly_read_storage_block(storageRead)));
	}

	*content = &((*header)[storageRead->current.contentOffset]);
}

/*
 * Get a pointer to the *small* non-compressed content.
 *
 * This interface provides a pointer directly into the read buffer for
 * efficient data use.
 */
uint8 *
AppendOnlyStorageRead_GetBuffer(AppendOnlyStorageRead *storageRead)
{
	uint8	   *header;
	uint8	   *content;

	Assert(storageRead != NULL);
	Assert(storageRead->isActive);

	/*
	 * Verify next block is a "small" non-compressed block.
	 */
	Assert(storageRead->current.headerKind == AoHeaderKind_SmallContent ||
		   storageRead->current.headerKind == AoHeaderKind_NonBulkDenseContent ||
		   storageRead->current.headerKind == AoHeaderKind_BulkDenseContent);
	Assert(!storageRead->current.isLarge);
	Assert(!storageRead->current.isCompressed);

	/*
	 * Fetch pointers to content.
	 */
	AppendOnlyStorageRead_InternalGetBuffer(storageRead,
											&header,
											&content);

	return content;
}

/*
 * Copy the large and/or decompressed content out.
 *
 * The contentOutLen parameter value must match the contentLen from the
 * AppendOnlyStorageRead_GetBlockInfo call.
 *
 * Note this routine will work for small non-compressed content, too.
 *
 * contentOut	- memory to receive the contiguous content.
 * contentOutLen - byte length of the contentOut buffer.
 */
void
AppendOnlyStorageRead_Content(AppendOnlyStorageRead *storageRead,
							  uint8 *contentOut,
							  int32 contentOutLen)
{
	Assert(storageRead != NULL);
	Assert(storageRead->isActive);
	Assert(contentOutLen == storageRead->current.uncompressedLen);

	if (storageRead->current.isLarge)
	{
		int64		largeContentPosition;	/* Position of the large content
											 * metadata block. */
		int32		largeContentLen;	/* Total length of the large content. */
		int32		remainingLargeContentLen;	/* The remaining number of
												 * bytes to read for the large
												 * content. */
		uint8	   *contentNext;	/* Pointer inside the contentOut buffer to
									 * put the next byte. */
		int32		regularBlockReadCount;	/* Number of regular blocks read
											 * after the metadata block. */
		int32		regularContentLen;	/* Length of the current regular
										 * block's content. */

		/*
		 * Large content.
		 *
		 * We have the LargeContent "metadata" AO block with the total length
		 * (already read) followed by N SmallContent blocks with the fragments
		 * of the large content.
		 */


		/*
		 * Save any values needed from the current* members since they will be
		 * modified as we read the regular blocks.
		 */
		largeContentPosition = storageRead->current.headerOffsetInFile;
		largeContentLen = storageRead->current.uncompressedLen;

		/*
		 * Loop to read regular blocks.
		 */
		contentNext = contentOut;
		remainingLargeContentLen = largeContentLen;
		regularBlockReadCount = 0;
		while (true)
		{
			/*
			 * Read next regular block.
			 */
			regularBlockReadCount++;
			if (!AppendOnlyStorageRead_ReadNextBlock(storageRead))
			{
				/*
				 * Unexpected end of file.
				 */
				ereport(ERROR,
						(errcode(ERRCODE_INTERNAL_ERROR),
						 errmsg("unexpected end of file trying to read block %d of large content in segment file '%s' of table '%s'",
								regularBlockReadCount,
								storageRead->segmentFileName,
								storageRead->relationName),
						 errdetail("Large content metadata block is at position " INT64_FORMAT " Large content length %d",
								   largeContentPosition,
								   largeContentLen)));
			}
			if (storageRead->current.headerKind != AoHeaderKind_SmallContent)
			{
				/*
				 * Unexpected headerKind.
				 */
				ereport(ERROR,
						(errcode(ERRCODE_INTERNAL_ERROR),
						 errmsg("expected header kind 'Block' for block %d of large content in segment file '%s' of table '%s'",
								regularBlockReadCount,
								storageRead->segmentFileName,
								storageRead->relationName),
						 errdetail("Large content metadata block is at position " INT64_FORMAT " Large content length %d",
								   largeContentPosition,
								   largeContentLen)));
			}
			Assert(!storageRead->current.isLarge);

			regularContentLen = storageRead->current.uncompressedLen;
			remainingLargeContentLen -= regularContentLen;
			if (remainingLargeContentLen < 0)
			{
				/*
				 * Too much data found???
				 */
				ereport(ERROR,
						(errcode(ERRCODE_INTERNAL_ERROR),
						 errmsg("too much data found after reading %d blocks for large content in segment file '%s' of table '%s'",
								regularBlockReadCount,
								storageRead->segmentFileName,
								storageRead->relationName),
						 errdetail("Large content metadata block is at position " INT64_FORMAT " Large content length %d; extra data length %d",
								largeContentPosition,
								largeContentLen,
								-remainingLargeContentLen)));
			}

			/*
			 * We can safely recurse one level here.
			 */
			AppendOnlyStorageRead_Content(storageRead,
										  contentNext,
										  regularContentLen);

			if (remainingLargeContentLen == 0)
				break;

			/*
			 * Advance our pointer inside the contentOut buffer to put the
			 * next bytes.
			 */
			contentNext += regularContentLen;
		}
	}
	else
	{
		uint8	   *header;
		uint8	   *content;

		/*
		 * "Small" content in one regular block.
		 */

		/*
		 * Fetch pointers to content.
		 */
		AppendOnlyStorageRead_InternalGetBuffer(storageRead,
												&header,
												&content);

		if (!storageRead->current.isCompressed)
		{
			/*
			 * Not compressed.
			 */
			memcpy(contentOut,
				   content,
				   storageRead->current.uncompressedLen);

			if (Debug_appendonly_print_scan)
				elog(LOG,
					 "Append-only Storage Read non-compressed block for table '%s' "
					 "(length = %d, segment file '%s', header offset in file = "
					 INT64_FORMAT ", block count " INT64_FORMAT ")",
					 storageRead->relationName,
					 storageRead->current.uncompressedLen,
					 storageRead->segmentFileName,
					 storageRead->current.headerOffsetInFile,
					 storageRead->bufferCount);
		}
		else
		{
			/*
			 * Compressed.
			 */
			PGFunction	decompressor;
			PGFunction *cfns = storageRead->compression_functions;

			if (cfns == NULL)
				ereport(ERROR,
						(errcode(ERRCODE_INTERNAL_ERROR),
						 errmsg("decompression information missing")));

			decompressor = cfns[COMPRESSION_DECOMPRESS];

			gp_decompress(content,    /* Compressed data in block. */
						  storageRead->current.compressedLen,
						  contentOut,
						  storageRead->current.uncompressedLen,
						  decompressor,
						  storageRead->compressionState,
						  storageRead->bufferCount);

			if (Debug_appendonly_print_scan)
				elog(LOG,
					 "Append-only Storage Read decompressed block for table '%s' "
					 "(compressed length %d, uncompressed length = %d, segment file '%s', "
					 "header offset in file = " INT64_FORMAT ", block count " INT64_FORMAT ")",
					 storageRead->relationName,
					 AppendOnlyStorageFormat_GetCompressedLen(header),
					 storageRead->current.uncompressedLen,
					 storageRead->segmentFileName,
					 storageRead->current.headerOffsetInFile,
					 storageRead->bufferCount);
		}
	}
}
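
/*
 * Illustrative sketch (not part of the original file): copying out content
 * that is large and/or compressed.  The output buffer is sized from the
 * contentLen reported by AppendOnlyStorageRead_GetBlockInfo; the guard macro
 * and names are assumptions for this example only.
 */
#ifdef AO_READ_USAGE_EXAMPLES
static uint8 *
example_copy_out_content(AppendOnlyStorageRead *storageRead,
						 int32 contentLen)
{
	/* contentLen is the reassembled, uncompressed length of the content. */
	uint8	   *contentOut = (uint8 *) palloc(contentLen);

	AppendOnlyStorageRead_Content(storageRead, contentOut, contentLen);

	return contentOut;
}
#endif							/* AO_READ_USAGE_EXAMPLES */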

/*
 * Skip the current block found with ~_GetBlockInfo.
 *
 * Do not decompress the block contents.
 *
 * Call this routine instead of calling ~_GetBuffer or ~_Contents that look at
 * contents. Useful when the desired row(s) are not within the row range of
 * the current block.
 */
void
AppendOnlyStorageRead_SkipCurrentBlock(AppendOnlyStorageRead *storageRead)
{
	Assert(storageRead != NULL);
	Assert(storageRead->isActive);

	if (storageRead->current.isLarge)
	{
		int64		largeContentPosition;	/* Position of the large content
											 * metadata block. */
		int32		largeContentLen;	/* Total length of the large content. */
		int32		remainingLargeContentLen;	/* Remaining number of bytes
												 * to read for the large
												 * content. */
		int32		regularBlockReadCount;	/* Number of regular blocks read
											 * after the metadata block. */
		int32		regularContentLen;	/* Length of the current regular
										 * block's content. */
		int32		availableLen;

		/*
		 * Large content.
		 *
		 * We have the LargeContent "metadata" AO block with the total length
		 * (already read) followed by N SmallContent blocks with the fragments
		 * of the large content.
		 */


		/*
		 * Save any values needed from the current* members since they will be
		 * modified as we read the regular blocks.
		 */
		largeContentPosition = storageRead->current.headerOffsetInFile;
		largeContentLen = storageRead->current.uncompressedLen;

		/*
		 * Loop to read regular blocks.
		 */
		remainingLargeContentLen = largeContentLen;
		regularBlockReadCount = 0;
		while (true)
		{
			/*
			 * Read next regular block.
			 */
			regularBlockReadCount++;
			if (!AppendOnlyStorageRead_ReadNextBlock(storageRead))
			{
				/*
				 * Unexpected end of file.
				 */
				ereport(ERROR,
						(errcode(ERRCODE_INTERNAL_ERROR),
						 errmsg("unexpected end of file trying to read block %d of large content in segment file '%s' of table '%s'",
								regularBlockReadCount,
								storageRead->segmentFileName,
								storageRead->relationName),
						 errdetail("Large content metadata block is at position " INT64_FORMAT " Large content length %d",
								   largeContentPosition, largeContentLen)));
			}
			if (storageRead->current.headerKind != AoHeaderKind_SmallContent)
			{
				/*
				 * Unexpected headerKind.
				 */
				ereport(ERROR,
						(errcode(ERRCODE_INTERNAL_ERROR),
						 errmsg("expected header kind 'Block' for block %d of large content in segment file '%s' of table '%s'",
								regularBlockReadCount,
								storageRead->segmentFileName,
								storageRead->relationName),
						 errdetail("Large content metadata block is at position " INT64_FORMAT " Large content length %d",
								   largeContentPosition, largeContentLen)));
			}
			Assert(!storageRead->current.isLarge);

			BufferedReadGrowBuffer(&storageRead->bufferedRead,
								   storageRead->current.overallBlockLen,
								   &availableLen);

			if (storageRead->current.overallBlockLen != availableLen)
				ereport(ERROR,
						(errcode(ERRCODE_INTERNAL_ERROR),
						 errmsg("wrong buffer length, expected %d byte length buffer and got %d",
								storageRead->current.overallBlockLen,
								availableLen),
						 errdetail_appendonly_read_storage_content_header(storageRead),
						 errcontext_appendonly_read_storage_block(storageRead)));

			regularContentLen = storageRead->current.uncompressedLen;
			remainingLargeContentLen -= regularContentLen;
			if (remainingLargeContentLen < 0)
			{
				/*
				 * Too much data found???
				 */
				ereport(ERROR,
						(errcode(ERRCODE_INTERNAL_ERROR),
						 errmsg("too much data found after reading %d blocks for large content in segment file '%s' of table '%s'",
								regularBlockReadCount,
								storageRead->segmentFileName,
								storageRead->relationName),
						 errdetail("Large content metadata block is at position " INT64_FORMAT " Large content length %d; extra data length %d",
								   largeContentPosition,
								   largeContentLen,
								   -remainingLargeContentLen)));
			}

			/*
			 * Since we are skipping, we do not use the compressed or
			 * uncompressed content.
			 */

			if (remainingLargeContentLen == 0)
				break;
		}
	}
	else
	{
		uint8	   *header;
		uint8	   *content;

		/*
		 * "Small" content in one regular block.
		 */

		/*
		 * Fetch pointers to content.
		 *
		 * Since we are skipping, we do not look at the content.
		 */
		AppendOnlyStorageRead_InternalGetBuffer(storageRead,
												&header,
												&content);
	}
}

Related information

greenplumn source code directory

Related articles

greenplumn cdbappendonlystorageformat source code

greenplumn cdbappendonlystoragewrite source code

greenplumn cdbappendonlyxlog source code

greenplumn cdbbufferedappend source code

greenplumn cdbbufferedread source code

greenplumn cdbcat source code

greenplumn cdbcopy source code

greenplumn cdbdistributedsnapshot source code

greenplumn cdbdistributedxacts source code

greenplumn cdbdistributedxid source code
