greenplumn cdbbufferedread 源码

  • 2022-08-18
  • 浏览 (280)

greenplumn cdbbufferedread 代码

文件路径:/src/backend/cdb/cdbbufferedread.c

/*-------------------------------------------------------------------------
 *
 * cdbbufferedread.c
 *	  Read buffers sequentially from a file efficiently.
 *
 * (See .h file for usage comments)
 *
 * Portions Copyright (c) 2007, greenplum inc
 * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
 *
 *
 * IDENTIFICATION
 *	    src/backend/cdb/cdbbufferedread.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "cdb/cdbbufferedread.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "utils/guc.h"

static void BufferedReadIo(
			   BufferedRead *bufferedRead);
static uint8 *BufferedReadUseBeforeBuffer(
							BufferedRead *bufferedRead,
							int32 maxReadAheadLen,
							int32 *nextBufferLen);


/*
 * Determines the amount of memory to supply for
 * BufferedRead given the desired buffer and
 * large write lengths.
 */
int32
BufferedReadMemoryLen(
					  int32 maxBufferLen,
					  int32 maxLargeReadLen)
{
	Assert(maxBufferLen > 0);
	Assert(maxLargeReadLen >= maxBufferLen);

	/* Adjacent before memory for 1 buffer and large read memory. */
	return (maxBufferLen + maxLargeReadLen);
}

/*
 * Initialize BufferedRead.
 *
 * Use the BufferedReadMemoryLen procedure to
 * determine the amount of memory to supply.
 */
void
BufferedReadInit(
				 BufferedRead *bufferedRead,
				 uint8 *memory,
				 int32 memoryLen,
				 int32 maxBufferLen,
				 int32 maxLargeReadLen,
				 char *relationName)
{
	Assert(bufferedRead != NULL);
	Assert(memory != NULL);
	Assert(maxBufferLen > 0);
	Assert(maxLargeReadLen >= maxBufferLen);
	Assert(memoryLen >= BufferedReadMemoryLen(maxBufferLen, maxLargeReadLen));

	memset(bufferedRead, 0, sizeof(BufferedRead));

	/*
	 * Init level.
	 */
	bufferedRead->relationName = pstrdup(relationName);

	/*
	 * Large-read memory level members.
	 */
	bufferedRead->maxBufferLen = maxBufferLen;
	bufferedRead->maxLargeReadLen = maxLargeReadLen;

	bufferedRead->memory = memory;
	bufferedRead->memoryLen = memoryLen;

	bufferedRead->beforeBufferMemory = memory;
	bufferedRead->largeReadMemory =
		&memory[maxBufferLen];

	bufferedRead->largeReadPosition = 0;
	bufferedRead->largeReadLen = 0;

	/*
	 * Buffer level members.
	 */
	bufferedRead->bufferOffset = 0;
	bufferedRead->bufferLen = 0;

	/*
	 * File level members.
	 */
	bufferedRead->file = -1;
	bufferedRead->fileLen = 0;
	/* start reading from beginning of file */
	bufferedRead->fileOff = 0;

	/*
	 * Temporary limit support for random reading.
	 */
	bufferedRead->haveTemporaryLimitInEffect = false;
	bufferedRead->temporaryLimitFileLen = 0;
}

/*
 * Takes an open file handle for the next file.
 */
void
BufferedReadSetFile(
					BufferedRead *bufferedRead,
					File file,
					char *filePathName,
					int64 fileLen)
{
	Assert(bufferedRead != NULL);

	Assert(bufferedRead->file == -1);
	Assert(bufferedRead->fileLen == 0);

	Assert(bufferedRead->largeReadPosition == 0);
	Assert(bufferedRead->largeReadLen == 0);

	Assert(bufferedRead->bufferOffset == 0);
	Assert(bufferedRead->bufferLen == 0);

	Assert(file >= 0);
	Assert(fileLen >= 0);

	bufferedRead->file = file;
	bufferedRead->filePathName = filePathName;
	bufferedRead->fileLen = fileLen;

	bufferedRead->haveTemporaryLimitInEffect = false;
	bufferedRead->temporaryLimitFileLen = 0;
	bufferedRead->fileOff =0;

	if (fileLen > 0)
	{
		/*
		 * Do the first read.
		 */
		if (fileLen > bufferedRead->maxLargeReadLen)
			bufferedRead->largeReadLen = bufferedRead->maxLargeReadLen;
		else
			bufferedRead->largeReadLen = (int32) fileLen;
		BufferedReadIo(bufferedRead);
	}
}

/*
 * Perform a large read i/o.
 */
static void
BufferedReadIo(
			   BufferedRead *bufferedRead)
{
	int32		largeReadLen;
	uint8	   *largeReadMemory;
	int32		offset;

	largeReadLen = bufferedRead->largeReadLen;
	Assert(bufferedRead->largeReadLen > 0);
	largeReadMemory = bufferedRead->largeReadMemory;

	offset = 0;
	while (largeReadLen > 0)
	{
		int			actualLen = FileRead(bufferedRead->file,
										 (char *) largeReadMemory,
										 largeReadLen,
										 bufferedRead->fileOff,
										 WAIT_EVENT_DATA_FILE_READ);

		if (actualLen == 0)
			ereport(ERROR, (errcode_for_file_access(),
							errmsg("read beyond eof in table \"%s\" file \"%s\", "
								   "read position " INT64_FORMAT " (small offset %d), "
								   "actual read length %d (large read length %d)",
								   bufferedRead->relationName,
								   bufferedRead->filePathName,
								   bufferedRead->largeReadPosition,
								   offset,
								   actualLen,
								   bufferedRead->largeReadLen)));
		else if (actualLen < 0)
			ereport(ERROR, (errcode_for_file_access(),
							errmsg("unable to read table \"%s\" file \"%s\", "
								   "read position " INT64_FORMAT " (small offset %d), "
								   "actual read length %d (large read length %d): %m",
								   bufferedRead->relationName,
								   bufferedRead->filePathName,
								   bufferedRead->largeReadPosition,
								   offset,
								   actualLen,
								   bufferedRead->largeReadLen)));
		bufferedRead->fileOff += actualLen;

		elogif(Debug_appendonly_print_read_block, LOG,
			   "Append-Only storage read: table \"%s\", segment file \"%s\", read position " INT64_FORMAT " (small offset %d), "
			   "actual read length %d (equals large read length %d is %s)",
			   bufferedRead->relationName,
			   bufferedRead->filePathName,
			   bufferedRead->largeReadPosition,
			   offset,
			   actualLen,
			   bufferedRead->largeReadLen,
			   (actualLen == bufferedRead->largeReadLen ? "true" : "false"));

		largeReadLen -= actualLen;
		largeReadMemory += actualLen;
		offset += actualLen;
	}

	if (VacuumCostActive)
		VacuumCostBalance += VacuumCostPageMiss;
}

static uint8 *
BufferedReadUseBeforeBuffer(
							BufferedRead *bufferedRead,
							int32 maxReadAheadLen,
							int32 *nextBufferLen)
{
	int64		inEffectFileLen;
	int64		nextPosition;
	int64		remainingFileLen;
	int32		nextReadLen;
	int32		beforeLen;
	int32		beforeOffset;
	int32		extraLen;

	Assert(bufferedRead->largeReadLen - bufferedRead->bufferOffset > 0);

	if (bufferedRead->haveTemporaryLimitInEffect)
		inEffectFileLen = bufferedRead->temporaryLimitFileLen;
	else
		inEffectFileLen = bufferedRead->fileLen;

	/*
	 * Requesting more data than in the current read.
	 */
	nextPosition = bufferedRead->largeReadPosition + bufferedRead->largeReadLen;

	if (nextPosition == inEffectFileLen)
	{
		/*
		 * No more file to read.
		 */
		bufferedRead->bufferLen = bufferedRead->largeReadLen -
			bufferedRead->bufferOffset;
		Assert(bufferedRead->bufferLen > 0);

		*nextBufferLen = bufferedRead->bufferLen;
		return &bufferedRead->largeReadMemory[bufferedRead->bufferOffset];
	}

	remainingFileLen = inEffectFileLen -
		nextPosition;
	if (remainingFileLen > bufferedRead->maxLargeReadLen)
		nextReadLen = bufferedRead->maxLargeReadLen;
	else
		nextReadLen = (int32) remainingFileLen;

	Assert(nextReadLen >= 0);

	beforeLen = bufferedRead->maxLargeReadLen - bufferedRead->bufferOffset;
	beforeOffset = bufferedRead->maxBufferLen - beforeLen;

	/*
	 * Copy data from the current large-read buffer into the before memory.
	 */
	memcpy(&bufferedRead->beforeBufferMemory[beforeOffset],
		   &bufferedRead->largeReadMemory[bufferedRead->bufferOffset],
		   beforeLen);

	/*
	 * Do the next read.
	 */
	bufferedRead->largeReadPosition = nextPosition;
	bufferedRead->largeReadLen = nextReadLen;

	/*
	 * MPP-17061: nextReadLen should never be 0, since the code will return
	 * above if inEffectFileLen is equal to nextPosition.
	 */
	if (nextReadLen == 0)
	{
		ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
						errmsg("Unexpected internal error,"
							   " largeReadLen is set to 0 before calling BufferedReadIo."
							   " remainingFileLen is " INT64_FORMAT
							   " inEffectFileLen is " INT64_FORMAT
							   " nextPosition is " INT64_FORMAT,
							   remainingFileLen, inEffectFileLen, nextPosition)));
	}

	BufferedReadIo(bufferedRead);

	extraLen = maxReadAheadLen - beforeLen;
	Assert(extraLen > 0);
	if (extraLen > nextReadLen)
		extraLen = (int32) nextReadLen;

	/*
	 * Return a buffer using a negative offset that starts in the before
	 * memory and goes into the large-read memory.
	 */
	bufferedRead->bufferOffset = -beforeLen;
	bufferedRead->bufferLen = beforeLen + extraLen;
	Assert(bufferedRead->bufferLen > 0);

	*nextBufferLen = bufferedRead->bufferLen;

	return &bufferedRead->beforeBufferMemory[beforeOffset];
}

/*
 * Set a temporary read range in the current open segment file.
 *
 * The beginFileOffset must be to the beginning of an Append-Only Storage block.
 *
 * The afterFileOffset serves as the temporary EOF.  It will cause reads to return
 * false (no more blocks) when reached.  It must be at the end of an Append-Only Storage
 * block.
 *
 * When a read returns false (no more blocks), the temporary read range is forgotten.
 */
void
BufferedReadSetTemporaryRange(
							  BufferedRead *bufferedRead,
							  int64 beginFileOffset,
							  int64 afterFileOffset)
{
	bool		newReadNeeded;
	int64		largeReadAfterPos;

	Assert(bufferedRead != NULL);
	Assert(bufferedRead->file >= 0);

	/*
	 * Forget any current read buffer length (but not the offset!).
	 */
	bufferedRead->bufferLen = 0;

	newReadNeeded = false;
	largeReadAfterPos = bufferedRead->largeReadPosition +
		bufferedRead->largeReadLen;
	if (bufferedRead->bufferOffset < 0)
	{
		int64		virtualLargeReadBeginPos;

		virtualLargeReadBeginPos = bufferedRead->largeReadPosition +
			bufferedRead->bufferOffset;

		if (beginFileOffset >= virtualLargeReadBeginPos &&
			beginFileOffset < largeReadAfterPos)
		{
			/*
			 * Our temporary read begin position is within our before buffer
			 * and current read.
			 */
			bufferedRead->bufferOffset =
				(int32) (beginFileOffset -
						 bufferedRead->largeReadPosition);
		}
		else
			newReadNeeded = true;
	}
	else
	{
		if (beginFileOffset >= bufferedRead->largeReadPosition &&
			beginFileOffset < largeReadAfterPos)
		{
			/*
			 * Our temporary read begin position is within our current read.
			 */
			bufferedRead->bufferOffset =
				(int32) (beginFileOffset -
						 bufferedRead->largeReadPosition);
		}
		else
		{
			newReadNeeded = true;
		}
	}

	if (newReadNeeded)
	{
		int64		remainingFileLen;

		/*
		 * Seek to the requested beginning position. MPP-17061: allow seeking
		 * backward (negative offset) in file, this could happen during index
		 * scan, if we do lookup for a block directory entry at the end of the
		 * segment file, followed by a lookup for a block directory entry at
		 * the beginning of file.
		 */
		bufferedRead->fileOff = beginFileOffset;
		bufferedRead->bufferOffset = 0;

		remainingFileLen = afterFileOffset - beginFileOffset;
		if (remainingFileLen > bufferedRead->maxLargeReadLen)
			bufferedRead->largeReadLen = bufferedRead->maxLargeReadLen;
		else
			bufferedRead->largeReadLen = (int32) remainingFileLen;

		bufferedRead->largeReadPosition = beginFileOffset;

		if (bufferedRead->largeReadLen > 0)
			BufferedReadIo(bufferedRead);
	}

	bufferedRead->haveTemporaryLimitInEffect = true;
	bufferedRead->temporaryLimitFileLen = afterFileOffset;

}

/*
 * Return the position of the next read in bytes.
 */
int64
BufferedReadNextBufferPosition(
							   BufferedRead *bufferedRead)
{
	Assert(bufferedRead != NULL);
	Assert(bufferedRead->file >= 0);

	/*
	 * Note that the bufferOffset can be negative when we are using the before
	 * buffer, but that is ok.  We should get an accurate next position.
	 */
	return bufferedRead->largeReadPosition +
		bufferedRead->bufferOffset +
		bufferedRead->bufferLen;
}

/*
 * Get the next buffer space for reading with a specified max read-ahead
 * amount.
 *
 * Returns NULL when the current file has been completely read.
 */
uint8 *
BufferedReadGetNextBuffer(
						  BufferedRead *bufferedRead,
						  int32 maxReadAheadLen,
						  int32 *nextBufferLen)
{
	int64		inEffectFileLen;

	Assert(bufferedRead != NULL);
	Assert(bufferedRead->file >= 0);
	Assert(maxReadAheadLen > 0);
	Assert(nextBufferLen != NULL);
	if (maxReadAheadLen > bufferedRead->maxBufferLen)
	{
		Assert(maxReadAheadLen <= bufferedRead->maxBufferLen);

		elog(ERROR, "Read ahead length %d is greater than maximum buffer length %d",
			 maxReadAheadLen, bufferedRead->maxBufferLen);
	}

	if (bufferedRead->haveTemporaryLimitInEffect)
		inEffectFileLen = bufferedRead->temporaryLimitFileLen;
	else
		inEffectFileLen = bufferedRead->fileLen;

	/*
	 * Finish previous buffer.
	 */
	bufferedRead->bufferOffset += bufferedRead->bufferLen;
	bufferedRead->bufferLen = 0;

	if (bufferedRead->largeReadPosition == inEffectFileLen)
	{
		/*
		 * At end of file.
		 */
		*nextBufferLen = 0;
		bufferedRead->haveTemporaryLimitInEffect = false;
		return NULL;
	}

	/*
	 * Any more left in current read?
	 */
	if (bufferedRead->bufferOffset == bufferedRead->largeReadLen)
	{
		int64		remainingFileLen;

		/*
		 * Used exactly all of it.  Attempt read more.
		 */
		bufferedRead->largeReadPosition += bufferedRead->largeReadLen;
		bufferedRead->bufferOffset = 0;

		remainingFileLen = inEffectFileLen -
			bufferedRead->largeReadPosition;
		if (remainingFileLen > bufferedRead->maxLargeReadLen)
			bufferedRead->largeReadLen = bufferedRead->maxLargeReadLen;
		else
			bufferedRead->largeReadLen = (int32) remainingFileLen;
		if (bufferedRead->largeReadLen == 0)
		{
			/*
			 * At end of file.
			 */
			*nextBufferLen = 0;
			bufferedRead->haveTemporaryLimitInEffect = false;
			return NULL;
		}

		BufferedReadIo(bufferedRead);

		if (maxReadAheadLen > bufferedRead->largeReadLen)
			bufferedRead->bufferLen = bufferedRead->largeReadLen;
		else
			bufferedRead->bufferLen = maxReadAheadLen;

		Assert(bufferedRead->bufferOffset == 0);
		Assert(bufferedRead->bufferLen > 0);

		*nextBufferLen = bufferedRead->bufferLen;
		return bufferedRead->largeReadMemory;
	}

	if (bufferedRead->bufferOffset + maxReadAheadLen > bufferedRead->largeReadLen)
	{
		/*
		 * Odd boundary.  Use before memory to carry us over.
		 */
		Assert(bufferedRead->largeReadLen - bufferedRead->bufferOffset > 0);

		return BufferedReadUseBeforeBuffer(
										   bufferedRead,
										   maxReadAheadLen,
										   nextBufferLen);
	}

	/*
	 * We can satisify request from current read.
	 */
	bufferedRead->bufferLen = maxReadAheadLen;
	Assert(bufferedRead->bufferLen > 0);

	*nextBufferLen = bufferedRead->bufferLen;
	return &bufferedRead->largeReadMemory[bufferedRead->bufferOffset];
}

/*
 * Grow the available length of the current buffer.
 *
 * NOTE: The buffer address returned can be different, even for previously
 * examined buffer data.  In other words, don't keep buffer pointers in
 * the buffer region.  Use offsets to re-establish pointers after this call.
 *
 * If the current file has been completely read, bufferLen will remain
 * the current value.
 */
uint8 *
BufferedReadGrowBuffer(
					   BufferedRead *bufferedRead,
					   int32 newMaxReadAheadLen,
					   int32 *growBufferLen)
{
	int32		newNextOffset;

	Assert(bufferedRead != NULL);
	Assert(bufferedRead->file >= 0);
	Assert(newMaxReadAheadLen > bufferedRead->bufferLen);
	Assert(newMaxReadAheadLen <= bufferedRead->maxBufferLen);
	Assert(growBufferLen != NULL);

	newNextOffset = bufferedRead->bufferOffset + newMaxReadAheadLen;
	if (newNextOffset > bufferedRead->largeReadLen)
	{
		/*
		 * Odd boundary.  Use before memory to carry us over.
		 */
		Assert(bufferedRead->largeReadLen - bufferedRead->bufferOffset > 0);

		return BufferedReadUseBeforeBuffer(
										   bufferedRead,
										   newMaxReadAheadLen,
										   growBufferLen);
	}

	/*
	 * There is maxReadAheadLen more left in current large-read memory.
	 */
	bufferedRead->bufferLen = newMaxReadAheadLen;
	Assert(bufferedRead->bufferLen > 0);

	*growBufferLen = bufferedRead->bufferLen;
	return &bufferedRead->largeReadMemory[bufferedRead->bufferOffset];
}

/*
 * Return the address of the current read buffer.
 */
uint8 *
BufferedReadGetCurrentBuffer(
							 BufferedRead *bufferedRead)
{
	Assert(bufferedRead != NULL);
	Assert(bufferedRead->file >= 0);

	return &bufferedRead->largeReadMemory[bufferedRead->bufferOffset];
}

/*
 * Return the current buffer's start position.
 */
int64
BufferedReadCurrentPosition(
							BufferedRead *bufferedRead)
{
	Assert(bufferedRead != NULL);
	Assert(bufferedRead->file >= 0);

	return bufferedRead->largeReadPosition + bufferedRead->bufferOffset;
}

/*
 * Flushes the current file for append.  Caller is responsible for closing
 * the file afterwards.
 */
void
BufferedReadCompleteFile(
						 BufferedRead *bufferedRead)
{
	Assert(bufferedRead != NULL);
	Assert(bufferedRead->file >= 0);

	bufferedRead->file = -1;
	bufferedRead->filePathName = NULL;
	bufferedRead->fileLen = 0;

	bufferedRead->bufferOffset = 0;
	bufferedRead->bufferLen = 0;

	bufferedRead->largeReadPosition = 0;
	bufferedRead->largeReadLen = 0;
}


/*
 * Finish with reading all together.
 */
void
BufferedReadFinish(BufferedRead *bufferedRead)
{
	Assert(bufferedRead != NULL);

	/* Assert(bufferedRead->file == -1); */
	Assert(bufferedRead->fileLen == 0);

	Assert(bufferedRead->bufferOffset == 0);
	Assert(bufferedRead->bufferLen == 0);

	if (bufferedRead->memory)
	{
		pfree(bufferedRead->memory);
		bufferedRead->memory = NULL;
		bufferedRead->memoryLen = 0;
	}

	if (bufferedRead->relationName != NULL)
	{
		pfree(bufferedRead->relationName);
		bufferedRead->relationName = NULL;
	}
}

相关信息

greenplumn 源码目录

相关文章

greenplumn cdbappendonlystorageformat 源码

greenplumn cdbappendonlystorageread 源码

greenplumn cdbappendonlystoragewrite 源码

greenplumn cdbappendonlyxlog 源码

greenplumn cdbbufferedappend 源码

greenplumn cdbcat 源码

greenplumn cdbcopy 源码

greenplumn cdbdistributedsnapshot 源码

greenplumn cdbdistributedxacts 源码

greenplumn cdbdistributedxid 源码

0  赞