greenplumn logtape 源码

  • 2022-08-18
  • 浏览 (161)

greenplumn logtape 代码

文件路径:/src/backend/utils/sort/logtape.c

/*-------------------------------------------------------------------------
 *
 * logtape.c
 *	  Management of "logical tapes" within temporary files.
 *
 * This module exists to support sorting via multiple merge passes (see
 * tuplesort.c).  Merging is an ideal algorithm for tape devices, but if
 * we implement it on disk by creating a separate file for each "tape",
 * there is an annoying problem: the peak space usage is at least twice
 * the volume of actual data to be sorted.  (This must be so because each
 * datum will appear in both the input and output tapes of the final
 * merge pass.  For seven-tape polyphase merge, which is otherwise a
 * pretty good algorithm, peak usage is more like 4x actual data volume.)
 *
 * We can work around this problem by recognizing that any one tape
 * dataset (with the possible exception of the final output) is written
 * and read exactly once in a perfectly sequential manner.  Therefore,
 * a datum once read will not be required again, and we can recycle its
 * space for use by the new tape dataset(s) being generated.  In this way,
 * the total space usage is essentially just the actual data volume, plus
 * insignificant bookkeeping and start/stop overhead.
 *
 * Few OSes allow arbitrary parts of a file to be released back to the OS,
 * so we have to implement this space-recycling ourselves within a single
 * logical file.  logtape.c exists to perform this bookkeeping and provide
 * the illusion of N independent tape devices to tuplesort.c.  Note that
 * logtape.c itself depends on buffile.c to provide a "logical file" of
 * larger size than the underlying OS may support.
 *
 * For simplicity, we allocate and release space in the underlying file
 * in BLCKSZ-size blocks.  Space allocation boils down to keeping track
 * of which blocks in the underlying file belong to which logical tape,
 * plus any blocks that are free (recycled and not yet reused).
 * The blocks in each logical tape form a chain, with a prev- and next-
 * pointer in each block.
 *
 * The initial write pass is guaranteed to fill the underlying file
 * perfectly sequentially, no matter how data is divided into logical tapes.
 * Once we begin merge passes, the access pattern becomes considerably
 * less predictable --- but the seeking involved should be comparable to
 * what would happen if we kept each logical tape in a separate file,
 * so there's no serious performance penalty paid to obtain the space
 * savings of recycling.  We try to localize the write accesses by always
 * writing to the lowest-numbered free block when we have a choice; it's
 * not clear this helps much, but it can't hurt.  (XXX perhaps a LIFO
 * policy for free blocks would be better?)
 *
 * To further make the I/Os more sequential, we can use a larger buffer
 * when reading, and read multiple blocks from the same tape in one go,
 * whenever the buffer becomes empty.
 *
 * To support the above policy of writing to the lowest free block, the
 * freelist is a min heap.
 *
 * Since all the bookkeeping and buffer memory is allocated with palloc(),
 * and the underlying file(s) are made with OpenTemporaryFile, all resources
 * for a logical tape set are certain to be cleaned up even if processing
 * is aborted by ereport(ERROR).  To avoid confusion, the caller should take
 * care that all calls for a single LogicalTapeSet are made in the same
 * palloc context.
 *
 * To support parallel sort operations involving coordinated callers to
 * tuplesort.c routines across multiple workers, it is necessary to
 * concatenate each worker BufFile/tapeset into one single logical tapeset
 * managed by the leader.  Workers should have produced one final
 * materialized tape (their entire output) when this happens in leader.
 * There will always be the same number of runs as input tapes, and the same
 * number of input tapes as participants (worker Tuplesortstates).
 *
 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/utils/sort/logtape.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "storage/buffile.h"
#include "utils/builtins.h"
#include "utils/logtape.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"

/*
 * A TapeBlockTrailer is stored at the end of each BLCKSZ block.
 *
 * The first block of a tape has prev == -1.  The last block of a tape
 * stores the number of valid bytes on the block, inverted, in 'next'
 * Therefore next < 0 indicates the last block.
 */
typedef struct TapeBlockTrailer
{
	long		prev;			/* previous block on this tape, or -1 on first
								 * block */
	long		next;			/* next block on this tape, or # of valid
								 * bytes on last block (if < 0) */
} TapeBlockTrailer;

#define TapeBlockPayloadSize  (BLCKSZ - sizeof(TapeBlockTrailer))
#define TapeBlockGetTrailer(buf) \
	((TapeBlockTrailer *) ((char *) buf + TapeBlockPayloadSize))

#define TapeBlockIsLast(buf) (TapeBlockGetTrailer(buf)->next < 0)
#define TapeBlockGetNBytes(buf) \
	(TapeBlockIsLast(buf) ? \
	 (- TapeBlockGetTrailer(buf)->next) : TapeBlockPayloadSize)
#define TapeBlockSetNBytes(buf, nbytes) \
	(TapeBlockGetTrailer(buf)->next = -(nbytes))

/*
 * When multiple tapes are being written to concurrently (as in HashAgg),
 * avoid excessive fragmentation by preallocating block numbers to individual
 * tapes. Each preallocation doubles in size starting at
 * TAPE_WRITE_PREALLOC_MIN blocks up to TAPE_WRITE_PREALLOC_MAX blocks.
 *
 * No filesystem operations are performed for preallocation; only the block
 * numbers are reserved. This may lead to sparse writes, which will cause
 * ltsWriteBlock() to fill in holes with zeros.
 */
#define TAPE_WRITE_PREALLOC_MIN 8
#define TAPE_WRITE_PREALLOC_MAX 128

/*
 * This data structure represents a single "logical tape" within the set
 * of logical tapes stored in the same file.
 *
 * While writing, we hold the current partially-written data block in the
 * buffer.  While reading, we can hold multiple blocks in the buffer.  Note
 * that we don't retain the trailers of a block when it's read into the
 * buffer.  The buffer therefore contains one large contiguous chunk of data
 * from the tape.
 */
typedef struct LogicalTape
{
	bool		writing;		/* T while in write phase */
	bool		frozen;			/* T if blocks should not be freed when read */
	bool		dirty;			/* does buffer need to be written? */

	/*
	 * Block numbers of the first, current, and next block of the tape.
	 *
	 * The "current" block number is only valid when writing, or reading from
	 * a frozen tape.  (When reading from an unfrozen tape, we use a larger
	 * read buffer that holds multiple blocks, so the "current" block is
	 * ambiguous.)
	 *
	 * When concatenation of worker tape BufFiles is performed, an offset to
	 * the first block in the unified BufFile space is applied during reads.
	 */
	long		firstBlockNumber;
	long		curBlockNumber;
	long		nextBlockNumber;
	long		offsetBlockNumber;

	/*
	 * Buffer for current data block(s).
	 */
	char	   *buffer;			/* physical buffer (separately palloc'd) */
	int			buffer_size;	/* allocated size of the buffer */
	int			max_size;		/* highest useful, safe buffer_size */
	int			pos;			/* next read/write position in buffer */
	int			nbytes;			/* total # of valid bytes in buffer */

	/*
	 * Preallocated block numbers are held in an array sorted in descending
	 * order; blocks are consumed from the end of the array (lowest block
	 * numbers first).
	 */
	long	   *prealloc;
	int			nprealloc;		/* number of elements in list */
	int			prealloc_size;	/* number of elements list can hold */
} LogicalTape;

/*
 * This data structure represents a set of related "logical tapes" sharing
 * space in a single underlying file.  (But that "file" may be multiple files
 * if needed to escape OS limits on file size; buffile.c handles that for us.)
 * The number of tapes is fixed at creation.
 */
struct LogicalTapeSet
{
	BufFile    *pfile;			/* underlying file for whole tape set */

	/*
	 * File size tracking.  nBlocksWritten is the size of the underlying file,
	 * in BLCKSZ blocks.  nBlocksAllocated is the number of blocks allocated
	 * by ltsReleaseBlock(), and it is always greater than or equal to
	 * nBlocksWritten.  Blocks between nBlocksAllocated and nBlocksWritten are
	 * blocks that have been allocated for a tape, but have not been written
	 * to the underlying file yet.  nHoleBlocks tracks the total number of
	 * blocks that are in unused holes between worker spaces following BufFile
	 * concatenation.
	 */
	long		nBlocksAllocated;	/* # of blocks allocated */
	long		nBlocksWritten; /* # of blocks used in underlying file */
	long		nHoleBlocks;	/* # of "hole" blocks left */

	/*
	 * We store the numbers of recycled-and-available blocks in freeBlocks[].
	 * When there are no such blocks, we extend the underlying file.
	 *
	 * If forgetFreeSpace is true then any freed blocks are simply forgotten
	 * rather than being remembered in freeBlocks[].  See notes for
	 * LogicalTapeSetForgetFreeSpace().
	 */
	bool		forgetFreeSpace;	/* are we remembering free blocks? */
	long	   *freeBlocks;		/* resizable array holding minheap */
	long		nFreeBlocks;	/* # of currently free blocks */
	Size		freeBlocksLen;	/* current allocated length of freeBlocks[] */

	/* The array of logical tapes. */
	int			nTapes;			/* # of logical tapes in set */
	LogicalTape *tapes;			/* has nTapes nentries */
};

static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
static long ltsGetFreeBlock(LogicalTapeSet *lts);
static long ltsGetPreallocBlock(LogicalTapeSet *lts, LogicalTape *lt);
static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
								 SharedFileSet *fileset);
static void ltsInitTape(LogicalTape *lt);
static void ltsInitReadBuffer(LogicalTapeSet *lts, LogicalTape *lt);

char *
LogicalTapeGetBufFilename(const LogicalTapeSet *lts)
{
	return lts->pfile ? pstrdup(BufFileGetFilename(lts->pfile)) : NULL;
}

/*
 * Write a block-sized buffer to the specified block of the underlying file.
 *
 * No need for an error return convention; we ereport() on any error.
 */
static void
ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
{
	/*
	 * BufFile does not support "holes", so if we're about to write a block
	 * that's past the current end of file, fill the space between the current
	 * end of file and the target block with zeros.
	 *
	 * This should happen rarely, otherwise you are not writing very
	 * sequentially.  In current use, this only happens when the sort ends
	 * writing a run, and switches to another tape.  The last block of the
	 * previous tape isn't flushed to disk until the end of the sort, so you
	 * get one-block hole, where the last block of the previous tape will
	 * later go.
	 *
	 * Note that BufFile concatenation can leave "holes" in BufFile between
	 * worker-owned block ranges.  These are tracked for reporting purposes
	 * only.  We never read from nor write to these hole blocks, and so they
	 * are not considered here.
	 */
	while (blocknum > lts->nBlocksWritten)
	{
		PGAlignedBlock zerobuf;

		MemSet(zerobuf.data, 0, sizeof(zerobuf));

		ltsWriteBlock(lts, lts->nBlocksWritten, zerobuf.data);
	}

	/* Write the requested block */
	if (BufFileSeekBlock(lts->pfile, blocknum) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not seek to block %ld of temporary file",
						blocknum)));
	BufFileWrite(lts->pfile, buffer, BLCKSZ);

	/* Update nBlocksWritten, if we extended the file */
	if (blocknum == lts->nBlocksWritten)
		lts->nBlocksWritten++;
}

/*
 * Read a block-sized buffer from the specified block of the underlying file.
 *
 * No need for an error return convention; we ereport() on any error.   This
 * module should never attempt to read a block it doesn't know is there.
 */
static void
ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
{
	size_t		nread;

	if (BufFileSeekBlock(lts->pfile, blocknum) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not seek to block %ld of temporary file",
						blocknum)));
	nread = BufFileRead(lts->pfile, buffer, BLCKSZ);
	if (nread != BLCKSZ)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not read block %ld of temporary file: read only %zu of %zu bytes",
						blocknum, nread, (size_t) BLCKSZ)));
}

/*
 * Read as many blocks as we can into the per-tape buffer.
 *
 * Returns true if anything was read, 'false' on EOF.
 */
static bool
ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
{
	lt->pos = 0;
	lt->nbytes = 0;

	do
	{
		char	   *thisbuf = lt->buffer + lt->nbytes;
		long		datablocknum = lt->nextBlockNumber;

		/* Fetch next block number */
		if (datablocknum == -1L)
			break;				/* EOF */
		/* Apply worker offset, needed for leader tapesets */
		datablocknum += lt->offsetBlockNumber;

		/* Read the block */
		ltsReadBlock(lts, datablocknum, (void *) thisbuf);
		if (!lt->frozen)
			ltsReleaseBlock(lts, datablocknum);
		lt->curBlockNumber = lt->nextBlockNumber;

		lt->nbytes += TapeBlockGetNBytes(thisbuf);
		if (TapeBlockIsLast(thisbuf))
		{
			lt->nextBlockNumber = -1L;
			/* EOF */
			break;
		}
		else
			lt->nextBlockNumber = TapeBlockGetTrailer(thisbuf)->next;

		/* Advance to next block, if we have buffer space left */
	} while (lt->buffer_size - lt->nbytes > BLCKSZ);

	return (lt->nbytes > 0);
}

static inline void
swap_nodes(long *heap, unsigned long a, unsigned long b)
{
	unsigned long swap;

	swap = heap[a];
	heap[a] = heap[b];
	heap[b] = swap;
}

static inline unsigned long
left_offset(unsigned long i)
{
	return 2 * i + 1;
}

static inline unsigned long
right_offset(unsigned i)
{
	return 2 * i + 2;
}

static inline unsigned long
parent_offset(unsigned long i)
{
	return (i - 1) / 2;
}

/*
 * Select the lowest currently unused block by taking the first element from
 * the freelist min heap.
 */
static long
ltsGetFreeBlock(LogicalTapeSet *lts)
{
	long	   *heap = lts->freeBlocks;
	long		blocknum;
	int			heapsize;
	unsigned long pos;

	/* freelist empty; allocate a new block */
	if (lts->nFreeBlocks == 0)
		return lts->nBlocksAllocated++;

	if (lts->nFreeBlocks == 1)
	{
		lts->nFreeBlocks--;
		return lts->freeBlocks[0];
	}

	/* take top of minheap */
	blocknum = heap[0];

	/* replace with end of minheap array */
	heap[0] = heap[--lts->nFreeBlocks];

	/* sift down */
	pos = 0;
	heapsize = lts->nFreeBlocks;
	while (true)
	{
		unsigned long left = left_offset(pos);
		unsigned long right = right_offset(pos);
		unsigned long min_child;

		if (left < heapsize && right < heapsize)
			min_child = (heap[left] < heap[right]) ? left : right;
		else if (left < heapsize)
			min_child = left;
		else if (right < heapsize)
			min_child = right;
		else
			break;

		if (heap[min_child] >= heap[pos])
			break;

		swap_nodes(heap, min_child, pos);
		pos = min_child;
	}

	return blocknum;
}

/*
 * Return the lowest free block number from the tape's preallocation list.
 * Refill the preallocation list if necessary.
 */
static long
ltsGetPreallocBlock(LogicalTapeSet *lts, LogicalTape *lt)
{
	/* sorted in descending order, so return the last element */
	if (lt->nprealloc > 0)
		return lt->prealloc[--lt->nprealloc];

	if (lt->prealloc == NULL)
	{
		lt->prealloc_size = TAPE_WRITE_PREALLOC_MIN;
		lt->prealloc = (long *) palloc(sizeof(long) * lt->prealloc_size);
	}
	else if (lt->prealloc_size < TAPE_WRITE_PREALLOC_MAX)
	{
		/* when the preallocation list runs out, double the size */
		lt->prealloc_size *= 2;
		if (lt->prealloc_size > TAPE_WRITE_PREALLOC_MAX)
			lt->prealloc_size = TAPE_WRITE_PREALLOC_MAX;
		lt->prealloc = (long *) repalloc(lt->prealloc,
										 sizeof(long) * lt->prealloc_size);
	}

	/* refill preallocation list */
	lt->nprealloc = lt->prealloc_size;
	for (int i = lt->nprealloc; i > 0; i--)
	{
		lt->prealloc[i - 1] = ltsGetFreeBlock(lts);

		/* verify descending order */
		Assert(i == lt->nprealloc || lt->prealloc[i - 1] > lt->prealloc[i]);
	}

	return lt->prealloc[--lt->nprealloc];
}

/*
 * Return a block# to the freelist.
 */
static void
ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
{
	long	   *heap;
	unsigned long pos;

	/*
	 * Do nothing if we're no longer interested in remembering free space.
	 */
	if (lts->forgetFreeSpace)
		return;

	/*
	 * Enlarge freeBlocks array if full.
	 */
	if (lts->nFreeBlocks >= lts->freeBlocksLen)
	{
		/*
		 * If the freelist becomes very large, just return and leak this free
		 * block.
		 */
		if (lts->freeBlocksLen * 2 > MaxAllocSize)
			return;

		lts->freeBlocksLen *= 2;
		lts->freeBlocks = (long *) repalloc(lts->freeBlocks,
											lts->freeBlocksLen * sizeof(long));
	}

	heap = lts->freeBlocks;
	pos = lts->nFreeBlocks;

	/* place entry at end of minheap array */
	heap[pos] = blocknum;
	lts->nFreeBlocks++;

	/* sift up */
	while (pos != 0)
	{
		unsigned long parent = parent_offset(pos);

		if (heap[parent] < heap[pos])
			break;

		swap_nodes(heap, parent, pos);
		pos = parent;
	}
}

/*
 * Claim ownership of a set of logical tapes from existing shared BufFiles.
 *
 * Caller should be leader process.  Though tapes are marked as frozen in
 * workers, they are not frozen when opened within leader, since unfrozen tapes
 * use a larger read buffer. (Frozen tapes have smaller read buffer, optimized
 * for random access.)
 */
static void
ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
					 SharedFileSet *fileset)
{
	LogicalTape *lt = NULL;
	long		tapeblocks = 0L;
	long		nphysicalblocks = 0L;
	int			i;

	/* Should have at least one worker tape, plus leader's tape */
	Assert(lts->nTapes >= 2);

	/*
	 * Build concatenated view of all BufFiles, remembering the block number
	 * where each source file begins.  No changes are needed for leader/last
	 * tape.
	 */
	for (i = 0; i < lts->nTapes - 1; i++)
	{
		char		filename[MAXPGPATH];
		BufFile    *file;
		int64		filesize;

		lt = &lts->tapes[i];

		pg_itoa(i, filename);
		file = BufFileOpenShared(fileset, filename);
		filesize = BufFileSize(file);

		/*
		 * Stash first BufFile, and concatenate subsequent BufFiles to that.
		 * Store block offset into each tape as we go.
		 */
		lt->firstBlockNumber = shared[i].firstblocknumber;
		if (i == 0)
		{
			lts->pfile = file;
			lt->offsetBlockNumber = 0L;
		}
		else
		{
			lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
		}
		/* Don't allocate more for read buffer than could possibly help */
		lt->max_size = Min(MaxAllocSize, filesize);
		tapeblocks = filesize / BLCKSZ;
		nphysicalblocks += tapeblocks;
	}

	/*
	 * Set # of allocated blocks, as well as # blocks written.  Use extent of
	 * new BufFile space (from 0 to end of last worker's tape space) for this.
	 * Allocated/written blocks should include space used by holes left
	 * between concatenated BufFiles.
	 */
	lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
	lts->nBlocksWritten = lts->nBlocksAllocated;

	/*
	 * Compute number of hole blocks so that we can later work backwards, and
	 * instrument number of physical blocks.  We don't simply use physical
	 * blocks directly for instrumentation because this would break if we ever
	 * subsequently wrote to the leader tape.
	 *
	 * Working backwards like this keeps our options open.  If shared BufFiles
	 * ever support being written to post-export, logtape.c can automatically
	 * take advantage of that.  We'd then support writing to the leader tape
	 * while recycling space from worker tapes, because the leader tape has a
	 * zero offset (write routines won't need to have extra logic to apply an
	 * offset).
	 *
	 * The only thing that currently prevents writing to the leader tape from
	 * working is the fact that BufFiles opened using BufFileOpenShared() are
	 * read-only by definition, but that could be changed if it seemed
	 * worthwhile.  For now, writing to the leader tape will raise a "Bad file
	 * descriptor" error, so tuplesort must avoid writing to the leader tape
	 * altogether.
	 */
	lts->nHoleBlocks = lts->nBlocksAllocated - nphysicalblocks;
}

/*
 * Initialize per-tape struct.  Note we allocate the I/O buffer lazily.
 */
static void
ltsInitTape(LogicalTape *lt)
{
	lt->writing = true;
	lt->frozen = false;
	lt->dirty = false;
	lt->firstBlockNumber = -1L;
	lt->curBlockNumber = -1L;
	lt->nextBlockNumber = -1L;
	lt->offsetBlockNumber = 0L;
	lt->buffer = NULL;
	lt->buffer_size = 0;
	/* palloc() larger than MaxAllocSize would fail */
	lt->max_size = MaxAllocSize;
	lt->pos = 0;
	lt->nbytes = 0;
	lt->prealloc = NULL;
	lt->nprealloc = 0;
	lt->prealloc_size = 0;
}

/*
 * Lazily allocate and initialize the read buffer. This avoids waste when many
 * tapes are open at once, but not all are active between rewinding and
 * reading.
 */
static void
ltsInitReadBuffer(LogicalTapeSet *lts, LogicalTape *lt)
{
	Assert(lt->buffer_size > 0);
	lt->buffer = palloc(lt->buffer_size);

	/* Read the first block, or reset if tape is empty */
	lt->nextBlockNumber = lt->firstBlockNumber;
	lt->pos = 0;
	lt->nbytes = 0;
	ltsReadFillBuffer(lts, lt);
}

/*
 * Create a set of logical tapes in a temporary underlying file.
 *
 * Each tape is initialized in write state.  Serial callers pass ntapes,
 * NULL argument for shared, and -1 for worker.  Parallel worker callers
 * pass ntapes, a shared file handle, NULL shared argument,  and their own
 * worker number.  Leader callers, which claim shared worker tapes here,
 * must supply non-sentinel values for all arguments except worker number,
 * which should be -1.
 *
 * Leader caller is passing back an array of metadata each worker captured
 * when LogicalTapeFreeze() was called for their final result tapes.  Passed
 * tapes array is actually sized ntapes - 1, because it includes only
 * worker tapes, whereas leader requires its own leader tape.  Note that we
 * rely on the assumption that reclaimed worker tapes will only be read
 * from once by leader, and never written to again (tapes are initialized
 * for writing, but that's only to be consistent).  Leader may not write to
 * its own tape purely due to a restriction in the shared buffile
 * infrastructure that may be lifted in the future.
 */
LogicalTapeSet *
LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset,
					 int worker)
{
	LogicalTapeSet *lts;
	int			i;

	/*
	 * Create top-level struct including per-tape LogicalTape structs.
	 */
	Assert(ntapes > 0);
	lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet));
	lts->nBlocksAllocated = 0L;
	lts->nBlocksWritten = 0L;
	lts->nHoleBlocks = 0L;
	lts->forgetFreeSpace = false;
	lts->freeBlocksLen = 32;	/* reasonable initial guess */
	lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
	lts->nFreeBlocks = 0;
	lts->nTapes = ntapes;
	lts->tapes = (LogicalTape *) palloc(ntapes * sizeof(LogicalTape));

	for (i = 0; i < ntapes; i++)
		ltsInitTape(&lts->tapes[i]);

	/*
	 * Create temp BufFile storage as required.
	 *
	 * Leader concatenates worker tapes, which requires special adjustment to
	 * final tapeset data.  Things are simpler for the worker case and the
	 * serial case, though.  They are generally very similar -- workers use a
	 * shared fileset, whereas serial sorts use a conventional serial BufFile.
	 */
	if (shared)
		ltsConcatWorkerTapes(lts, shared, fileset);
	else if (fileset)
	{
		char		filename[MAXPGPATH];
		workfile_set *work_set;

		pg_itoa(worker, filename);
		work_set = workfile_mgr_create_set("LogicalTape", filename, false /* hold pin */);
		lts->pfile = BufFileCreateShared(fileset, filename, work_set);
	}
	else
	{
		lts->pfile = BufFileCreateTemp("LogicalTape", false);
	}

	return lts;
}

/*
 * Close a logical tape set and release all resources.
 */
void
LogicalTapeSetClose(LogicalTapeSet *lts)
{
	LogicalTape *lt;
	int			i;

	BufFileClose(lts->pfile);
	for (i = 0; i < lts->nTapes; i++)
	{
		lt = &lts->tapes[i];
		if (lt->buffer)
			pfree(lt->buffer);
	}
	pfree(lts->tapes);
	pfree(lts->freeBlocks);
	pfree(lts);
}

/*
 * Mark a logical tape set as not needing management of free space anymore.
 *
 * This should be called if the caller does not intend to write any more data
 * into the tape set, but is reading from un-frozen tapes.  Since no more
 * writes are planned, remembering free blocks is no longer useful.  Setting
 * this flag lets us avoid wasting time and space in ltsReleaseBlock(), which
 * is not designed to handle large numbers of free blocks.
 */
void
LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
{
	lts->forgetFreeSpace = true;
}

/*
 * Write to a logical tape.
 *
 * There are no error returns; we ereport() on failure.
 */
void
LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
				 void *ptr, size_t size)
{
	LogicalTape *lt;
	size_t		nthistime;

	Assert(tapenum >= 0 && tapenum < lts->nTapes);
	lt = &lts->tapes[tapenum];
	Assert(lt->writing);
	Assert(lt->offsetBlockNumber == 0L);

	/* Allocate data buffer and first block on first write */
	if (lt->buffer == NULL)
	{
		lt->buffer = (char *) palloc(BLCKSZ);
		lt->buffer_size = BLCKSZ;
	}
	if (lt->curBlockNumber == -1)
	{
		Assert(lt->firstBlockNumber == -1);
		Assert(lt->pos == 0);

		lt->curBlockNumber = ltsGetPreallocBlock(lts, lt);
		lt->firstBlockNumber = lt->curBlockNumber;

		TapeBlockGetTrailer(lt->buffer)->prev = -1L;
	}

	Assert(lt->buffer_size == BLCKSZ);
	while (size > 0)
	{
		if (lt->pos >= (int) TapeBlockPayloadSize)
		{
			/* Buffer full, dump it out */
			long		nextBlockNumber;

			if (!lt->dirty)
			{
				/* Hmm, went directly from reading to writing? */
				elog(ERROR, "invalid logtape state: should be dirty");
			}

			/*
			 * First allocate the next block, so that we can store it in the
			 * 'next' pointer of this block.
			 */
			nextBlockNumber = ltsGetPreallocBlock(lts, lt);

			/* set the next-pointer and dump the current block. */
			TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber;
			ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);

			/* initialize the prev-pointer of the next block */
			TapeBlockGetTrailer(lt->buffer)->prev = lt->curBlockNumber;
			lt->curBlockNumber = nextBlockNumber;
			lt->pos = 0;
			lt->nbytes = 0;
		}

		nthistime = TapeBlockPayloadSize - lt->pos;
		if (nthistime > size)
			nthistime = size;
		Assert(nthistime > 0);

		memcpy(lt->buffer + lt->pos, ptr, nthistime);

		lt->dirty = true;
		lt->pos += nthistime;
		if (lt->nbytes < lt->pos)
			lt->nbytes = lt->pos;
		ptr = (void *) ((char *) ptr + nthistime);
		size -= nthistime;
	}
}

/*
 * Rewind logical tape and switch from writing to reading.
 *
 * The tape must currently be in writing state, or "frozen" in read state.
 *
 * 'buffer_size' specifies how much memory to use for the read buffer.
 * Regardless of the argument, the actual amount of memory used is between
 * BLCKSZ and MaxAllocSize, and is a multiple of BLCKSZ.  The given value is
 * rounded down and truncated to fit those constraints, if necessary.  If the
 * tape is frozen, the 'buffer_size' argument is ignored, and a small BLCKSZ
 * byte buffer is used.
 */
void
LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
{
	LogicalTape *lt;

	Assert(tapenum >= 0 && tapenum < lts->nTapes);
	lt = &lts->tapes[tapenum];

	/*
	 * Round and cap buffer_size if needed.
	 */
	if (lt->frozen)
		buffer_size = BLCKSZ;
	else
	{
		/* need at least one block */
		if (buffer_size < BLCKSZ)
			buffer_size = BLCKSZ;

		/* palloc() larger than max_size is unlikely to be helpful */
		if (buffer_size > lt->max_size)
			buffer_size = lt->max_size;

		/* round down to BLCKSZ boundary */
		buffer_size -= buffer_size % BLCKSZ;
	}

	if (lt->writing)
	{
		/*
		 * Completion of a write phase.  Flush last partial data block, and
		 * rewind for normal (destructive) read.
		 */
		if (lt->dirty)
		{
			/*
			 * As long as we've filled the buffer at least once, its contents
			 * are entirely defined from valgrind's point of view, even though
			 * contents beyond the current end point may be stale.  But it's
			 * possible - at least in the case of a parallel sort - to sort
			 * such small amount of data that we do not fill the buffer even
			 * once.  Tell valgrind that its contents are defined, so it
			 * doesn't bleat.
			 */
			VALGRIND_MAKE_MEM_DEFINED(lt->buffer + lt->nbytes,
									  lt->buffer_size - lt->nbytes);

			TapeBlockSetNBytes(lt->buffer, lt->nbytes);
			ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
		}
		lt->writing = false;
	}
	else
	{
		/*
		 * This is only OK if tape is frozen; we rewind for (another) read
		 * pass.
		 */
		Assert(lt->frozen);
	}

	if (lt->buffer)
		pfree(lt->buffer);

	/* the buffer is lazily allocated, but set the size here */
	lt->buffer = NULL;
	lt->buffer_size = buffer_size;

	/* free the preallocation list, and return unused block numbers */
	if (lt->prealloc != NULL)
	{
		for (int i = lt->nprealloc; i > 0; i--)
			ltsReleaseBlock(lts, lt->prealloc[i - 1]);
		pfree(lt->prealloc);
		lt->prealloc = NULL;
		lt->nprealloc = 0;
		lt->prealloc_size = 0;
	}
}

/*
 * Rewind logical tape and switch from reading to writing.
 *
 * NOTE: we assume the caller has read the tape to the end; otherwise
 * untouched data will not have been freed. We could add more code to free
 * any unread blocks, but in current usage of this module it'd be useless
 * code.
 */
void
LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
{
	LogicalTape *lt;

	Assert(tapenum >= 0 && tapenum < lts->nTapes);
	lt = &lts->tapes[tapenum];

	Assert(!lt->writing && !lt->frozen);
	lt->writing = true;
	lt->dirty = false;
	lt->firstBlockNumber = -1L;
	lt->curBlockNumber = -1L;
	lt->pos = 0;
	lt->nbytes = 0;
	if (lt->buffer)
		pfree(lt->buffer);
	lt->buffer = NULL;
	lt->buffer_size = 0;
}

/*
 * Read from a logical tape.
 *
 * Early EOF is indicated by return value less than #bytes requested.
 */
size_t
LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
				void *ptr, size_t size)
{
	LogicalTape *lt;
	size_t		nread = 0;
	size_t		nthistime;

	Assert(tapenum >= 0 && tapenum < lts->nTapes);
	lt = &lts->tapes[tapenum];
	Assert(!lt->writing);

	if (lt->buffer == NULL)
		ltsInitReadBuffer(lts, lt);

	while (size > 0)
	{
		if (lt->pos >= lt->nbytes)
		{
			/* Try to load more data into buffer. */
			if (!ltsReadFillBuffer(lts, lt))
				break;			/* EOF */
		}

		nthistime = lt->nbytes - lt->pos;
		if (nthistime > size)
			nthistime = size;
		Assert(nthistime > 0);

		memcpy(ptr, lt->buffer + lt->pos, nthistime);

		lt->pos += nthistime;
		ptr = (void *) ((char *) ptr + nthistime);
		size -= nthistime;
		nread += nthistime;
	}

	return nread;
}

/*
 * "Freeze" the contents of a tape so that it can be read multiple times
 * and/or read backwards.  Once a tape is frozen, its contents will not
 * be released until the LogicalTapeSet is destroyed.  This is expected
 * to be used only for the final output pass of a merge.
 *
 * This *must* be called just at the end of a write pass, before the
 * tape is rewound (after rewind is too late!).  It performs a rewind
 * and switch to read mode "for free".  An immediately following rewind-
 * for-read call is OK but not necessary.
 *
 * share output argument is set with details of storage used for tape after
 * freezing, which may be passed to LogicalTapeSetCreate within leader
 * process later.  This metadata is only of interest to worker callers
 * freezing their final output for leader (single materialized tape).
 * Serial sorts should set share to NULL.
 */
void
LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
{
	LogicalTape *lt;

	Assert(tapenum >= 0 && tapenum < lts->nTapes);
	lt = &lts->tapes[tapenum];
	Assert(lt->writing);
	Assert(lt->offsetBlockNumber == 0L);

	/*
	 * Completion of a write phase.  Flush last partial data block, and rewind
	 * for nondestructive read.
	 */
	if (lt->dirty)
	{
		/*
		 * As long as we've filled the buffer at least once, its contents are
		 * entirely defined from valgrind's point of view, even though
		 * contents beyond the current end point may be stale.  But it's
		 * possible - at least in the case of a parallel sort - to sort such
		 * small amount of data that we do not fill the buffer even once. Tell
		 * valgrind that its contents are defined, so it doesn't bleat.
		 */
		VALGRIND_MAKE_MEM_DEFINED(lt->buffer + lt->nbytes,
								  lt->buffer_size - lt->nbytes);

		TapeBlockSetNBytes(lt->buffer, lt->nbytes);
		ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
		lt->writing = false;
	}
	lt->writing = false;
	lt->frozen = true;

	/*
	 * The seek and backspace functions assume a single block read buffer.
	 * That's OK with current usage.  A larger buffer is helpful to make the
	 * read pattern of the backing file look more sequential to the OS, when
	 * we're reading from multiple tapes.  But at the end of a sort, when a
	 * tape is frozen, we only read from a single tape anyway.
	 */
	if (!lt->buffer || lt->buffer_size != BLCKSZ)
	{
		if (lt->buffer)
			pfree(lt->buffer);
		lt->buffer = palloc(BLCKSZ);
		lt->buffer_size = BLCKSZ;
	}

	/* Read the first block, or reset if tape is empty */
	lt->curBlockNumber = lt->firstBlockNumber;
	lt->pos = 0;
	lt->nbytes = 0;

	if (lt->firstBlockNumber == -1L)
		lt->nextBlockNumber = -1L;
	ltsReadBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
	if (TapeBlockIsLast(lt->buffer))
		lt->nextBlockNumber = -1L;
	else
		lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
	lt->nbytes = TapeBlockGetNBytes(lt->buffer);

	/* Handle extra steps when caller is to share its tapeset */
	if (share)
	{
		BufFileExportShared(lts->pfile);
		share->firstblocknumber = lt->firstBlockNumber;
	}
}

/*
 * Add additional tapes to this tape set. Not intended to be used when any
 * tapes are frozen.
 */
void
LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional)
{
	int			i;
	int			nTapesOrig = lts->nTapes;

	lts->nTapes += nAdditional;

	lts->tapes = (LogicalTape *) repalloc(lts->tapes,
										  lts->nTapes * sizeof(LogicalTape));

	for (i = nTapesOrig; i < lts->nTapes; i++)
		ltsInitTape(&lts->tapes[i]);
}

/*
 * Backspace the tape a given number of bytes.  (We also support a more
 * general seek interface, see below.)
 *
 * *Only* a frozen-for-read tape can be backed up; we don't support
 * random access during write, and an unfrozen read tape may have
 * already discarded the desired data!
 *
 * Returns the number of bytes backed up.  It can be less than the
 * requested amount, if there isn't that much data before the current
 * position.  The tape is positioned to the beginning of the tape in
 * that case.
 */
size_t
LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
{
	LogicalTape *lt;
	size_t		seekpos = 0;

	Assert(tapenum >= 0 && tapenum < lts->nTapes);
	lt = &lts->tapes[tapenum];
	Assert(lt->frozen);
	Assert(lt->buffer_size == BLCKSZ);

	if (lt->buffer == NULL)
		ltsInitReadBuffer(lts, lt);

	/*
	 * Easy case for seek within current block.
	 */
	if (size <= (size_t) lt->pos)
	{
		lt->pos -= (int) size;
		return size;
	}

	/*
	 * Not-so-easy case, have to walk back the chain of blocks.  This
	 * implementation would be pretty inefficient for long seeks, but we
	 * really aren't doing that (a seek over one tuple is typical).
	 */
	seekpos = (size_t) lt->pos; /* part within this block */
	while (size > seekpos)
	{
		long		prev = TapeBlockGetTrailer(lt->buffer)->prev;

		if (prev == -1L)
		{
			/* Tried to back up beyond the beginning of tape. */
			if (lt->curBlockNumber != lt->firstBlockNumber)
				elog(ERROR, "unexpected end of tape");
			lt->pos = 0;
			return seekpos;
		}

		ltsReadBlock(lts, prev, (void *) lt->buffer);

		if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber)
			elog(ERROR, "broken tape, next of block %ld is %ld, expected %ld",
				 prev,
				 TapeBlockGetTrailer(lt->buffer)->next,
				 lt->curBlockNumber);

		lt->nbytes = TapeBlockPayloadSize;
		lt->curBlockNumber = prev;
		lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;

		seekpos += TapeBlockPayloadSize;
	}

	/*
	 * 'seekpos' can now be greater than 'size', because it points to the
	 * beginning the target block.  The difference is the position within the
	 * page.
	 */
	lt->pos = seekpos - size;
	return size;
}

/*
 * Seek to an arbitrary position in a logical tape.
 *
 * *Only* a frozen-for-read tape can be seeked.
 *
 * Must be called with a block/offset previously returned by
 * LogicalTapeTell().
 */
void
LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
				long blocknum, int offset)
{
	LogicalTape *lt;

	Assert(tapenum >= 0 && tapenum < lts->nTapes);
	lt = &lts->tapes[tapenum];
	Assert(lt->frozen);
	Assert(offset >= 0 && offset <= TapeBlockPayloadSize);
	Assert(lt->buffer_size == BLCKSZ);

	if (lt->buffer == NULL)
		ltsInitReadBuffer(lts, lt);

	if (blocknum != lt->curBlockNumber)
	{
		ltsReadBlock(lts, blocknum, (void *) lt->buffer);
		lt->curBlockNumber = blocknum;
		lt->nbytes = TapeBlockPayloadSize;
		lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
	}

	if (offset > lt->nbytes)
		elog(ERROR, "invalid tape seek position");
	lt->pos = offset;
}

/*
 * Obtain current position in a form suitable for a later LogicalTapeSeek.
 *
 * NOTE: it'd be OK to do this during write phase with intention of using
 * the position for a seek after freezing.  Not clear if anyone needs that.
 */
void
LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
				long *blocknum, int *offset)
{
	LogicalTape *lt;

	Assert(tapenum >= 0 && tapenum < lts->nTapes);
	lt = &lts->tapes[tapenum];

	if (lt->buffer == NULL)
		ltsInitReadBuffer(lts, lt);

	Assert(lt->offsetBlockNumber == 0L);

	/* With a larger buffer, 'pos' wouldn't be the same as offset within page */
	Assert(lt->buffer_size == BLCKSZ);

	*blocknum = lt->curBlockNumber;
	*offset = lt->pos;
}

/*
 * Obtain total disk space currently used by a LogicalTapeSet, in blocks.
 */
long
LogicalTapeSetBlocks(LogicalTapeSet *lts)
{
	return lts->nBlocksAllocated - lts->nHoleBlocks;
}

相关信息

greenplumn 源码目录

相关文章

greenplumn sharedtuplestore 源码

greenplumn sortsupport 源码

greenplumn tuplesort 源码

greenplumn tuplestore 源码

0  赞