greenplumn vmem_tracker 源码

  • 2022-08-18
  • 浏览 (89)

greenplumn vmem_tracker 代码

文件路径:/src/backend/utils/mmgr/vmem_tracker.c

/*-------------------------------------------------------------------------
 *
 * vmem_tracker.c
 *	 Implementation of vmem tracking that provides a mechanism to enforcing
 *	 a limit on the total memory consumption per-segment. The vmem tracker
 *	 also collaborates with red zone handler and runaway cleaner to determine
 *	 sessions that consume excessive vmem and cleans up such sessions by forcing
 *	 them to release their memory.
 *
 * Copyright (c) 2014-Present VMware, Inc. or its affiliates.
 *
 *
 * IDENTIFICATION
 *	    src/backend/utils/mmgr/vmem_tracker.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include <math.h>

#include "cdb/cdbvars.h"
#include "miscadmin.h"
#include "port/atomics.h"
#include "utils/guc.h"
#include "utils/vmem_tracker.h"
#include "utils/resource_manager.h"
#include "utils/session_state.h"

/*
 * Entry points of the other runaway-cleanup components (event version
 * provider, red zone handler, idle tracker and runaway cleaner) that are
 * called from this file. Every declaration is a full prototype, so the
 * compiler rejects a call that passes arguments by mistake.
 */
extern void EventVersion_ShmemInit(void);
extern void RedZoneHandler_ShmemInit(void);
extern void IdleTracker_ShmemInit(void);
extern void RunawayCleaner_Init(void);
extern void IdleTracker_Init(void);
extern void IdleTracker_Shutdown(void);

/* Key under which the segment-wide vmem counter is looked up in shared memory */
#define SHMEM_AVAILABLE_VMEM "available vmem on the segment"

/*
 * Unit conversions between chunks, MB and bytes. The chunk size is read via
 * VmemTracker_GetChunkSizeInBits() on every expansion. Every macro argument
 * is parenthesized, and CHUNKS_TO_BYTES widens the whole argument to int64
 * before shifting so that a compound argument cannot be shifted in 32 bits.
 */
#define CHUNKS_TO_MB(chunks) ((chunks) << (VmemTracker_GetChunkSizeInBits() - BITS_IN_MB))
#define MB_TO_CHUNKS(mb) ((mb) >> (VmemTracker_GetChunkSizeInBits() - BITS_IN_MB))
#define CHUNKS_TO_BYTES(chunks) (((int64) (chunks)) << VmemTracker_GetChunkSizeInBits())
#define BYTES_TO_CHUNKS(bytes) ((bytes) >> VmemTracker_GetChunkSizeInBits())
#define BYTES_TO_MB(bytes) ((bytes) >> BITS_IN_MB)

/* Number of vmem chunks currently reserved by this process */
static int32 trackedVmemChunks = 0;
/*
 * High-water mark of trackedVmemChunks; can be pulled back to the current
 * value with VmemTracker_ResetMaxVmemReserved().
 */
static int32 maxVmemChunksTracked = 0;
/* Number of bytes tracked (i.e., allocated under the control of the vmem tracker) */
static int64 trackedBytes = 0;

/*
 * Startup memory registered through VmemTracker_RegisterStartupMemory(), in
 * bytes (rounded up to whole chunks) and in chunks. Both are reset to 0 by
 * VmemTracker_UnregisterStartupMemory().
 */
static int64 startupBytes = 0;
static int32 startupChunks = 0;

/* Segment vmem quota in chunk units, derived from gp_vmem_protect_limit */
static int32 vmemChunksQuota = 0;
/*
 * Chunk size in bits. By default a chunk is 1MB, but it can be larger
 * depending on the vmem quota.
 */
static int chunkSizeInBits = BITS_IN_MB;
/* Has the vmem tracker been initialized? If not, then we don't track. */
bool vmemTrackerInited = false;

/*
 * A derived parameter from gp_vmem_limit_per_query in chunk units,
 * considering the current chunk size.
 */
static int32 maxChunksPerQuery = 0;

/*
 * How many chunks are currently waived from limit checking (e.g., for error
 * handling)
 */
static int32 waivedChunks = 0;

/*
 * Consumed vmem on the segment, in chunks. Points at the shared-memory
 * counter obtained in VmemTracker_ShmemInit().
 */
volatile int32 *segmentVmemChunks = NULL;

/* Forward declarations of helpers defined later in this file */
static void ReleaseAllVmemChunks(void);
static int32 VmemTracker_GetMaxChunksPerQuery(void);

/*
 * Sets up the shared memory state of the vmem tracker.
 *
 * The segment-wide chunk counter is attached (or created) under the
 * SHMEM_AVAILABLE_VMEM key. When not running under the postmaster, the chunk
 * size, the segment quota and the per-query limit are derived from the GUCs,
 * the shared memory states of the event version provider, red zone handler
 * and idle tracker are initialized, and the segment counter is zeroed.
 */
void
VmemTracker_ShmemInit()
{
	bool		found = false;

	Assert(!vmemTrackerInited);

	/* Per-process counters always start from a clean slate */
	trackedVmemChunks = 0;
	maxVmemChunksTracked = 0;
	trackedBytes = 0;

	segmentVmemChunks = (int32 *) ShmemInitStruct(SHMEM_AVAILABLE_VMEM,
												  sizeof(int32),
												  &found);
	Assert(found || !IsUnderPostmaster);

	Assert(NULL != segmentVmemChunks);

	/* Everything below is one-time setup that is skipped under the postmaster */
	if (IsUnderPostmaster)
	{
		return;
	}

	/*
	 * Start with 1MB chunks. If vmem is larger than 16GB (i.e., 16K MB),
	 * keep doubling the chunk size until the quota expressed in chunks is
	 * no larger than 16K.
	 */
	chunkSizeInBits = BITS_IN_MB;
	for (vmemChunksQuota = gp_vmem_protect_limit;
		 vmemChunksQuota > (16 * 1024);
		 vmemChunksQuota >>= 1)
	{
		chunkSizeInBits++;
	}

	/*
	 * gp_vmem_limit_per_query is in kB: convert it to MB and scale it down
	 * by the factor by which the chunk size was enlarged, rounding up.
	 */
	maxChunksPerQuery = ceil(gp_vmem_limit_per_query / (1024.0 * (1 << (chunkSizeInBits - BITS_IN_MB))));

	/* Bring up the cooperating sub-systems */
	EventVersion_ShmemInit();
	RedZoneHandler_ShmemInit();
	IdleTracker_ShmemInit();

	*segmentVmemChunks = 0;
}

/*
 * Sets up the per-process state of the vmem tracker, initializes the
 * per-process state of the idle tracker and the runaway cleaner, and finally
 * marks the tracker as initialized.
 */
void
VmemTracker_Init()
{
	/* Preconditions: a session exists and nothing has been tracked yet */
	Assert(NULL != MySessionState);
	Assert(!vmemTrackerInited);
	Assert(trackedVmemChunks == 0);
	Assert(maxVmemChunksTracked == 0);
	Assert(trackedBytes == 0);

	/*
	 * Assertions are compiled out of production builds, so force the
	 * counters to zero regardless.
	 */
	trackedBytes = 0;
	maxVmemChunksTracked = 0;
	trackedVmemChunks = 0;

	/* Sanity checks on the limits and the shared segment counter */
	Assert(0 < vmemChunksQuota || Gp_role != GP_ROLE_EXECUTE);
	Assert(gp_vmem_limit_per_query == 0 || (maxChunksPerQuery != 0 && maxChunksPerQuery < gp_vmem_limit_per_query));
	Assert(NULL != segmentVmemChunks);

	/* Per-process initialization of the cooperating sub-systems */
	IdleTracker_Init();
	RunawayCleaner_Init();

	/* From here on reservations are tracked */
	vmemTrackerInited = true;
}

/*
 * Disables vmem tracking for the current process, releases all the chunks
 * this process is tracking and shuts down the idle tracker.
 */
void
VmemTracker_Shutdown()
{
	/* Mark the tracker as disabled before giving the chunks back */
	vmemTrackerInited = false;

	ReleaseAllVmemChunks();
	/* The segment-wide counter must never go negative */
	Assert(*segmentVmemChunks >= 0);

	IdleTracker_Shutdown();
}

/*
 * Resets the maximum reserved vmem (the high-water mark) of this process to
 * its currently reserved vmem.
 */
void
VmemTracker_ResetMaxVmemReserved()
{
	maxVmemChunksTracked = trackedVmemChunks;
}

/*
 * Reserves 'numChunksToReserve' chunks for the current process.
 *
 * The request is charged, in this order, to the session counter
 * (MySessionState->sessionVmem), to the resource group (via
 * ResGroupReserveMemory) and to the segment counter (*segmentVmemChunks).
 * If a limit is exceeded by more than the currently waived chunks, every
 * charge made so far is reverted and the matching failure status is
 * returned:
 *   - MemoryFailure_ResourceGroupMemoryExhausted
 *   - MemoryFailure_QueryMemoryExhausted
 *   - MemoryFailure_VmemExhausted
 *
 * The per-query and segment limits are enforced only when
 * Gp_role == GP_ROLE_EXECUTE and CritSectionCount == 0; otherwise the usage
 * is still recorded.
 *
 * On success the per-process counters are updated, an unused waiver is
 * cleared, and MemoryAllocation_Success is returned.
 */
static MemoryAllocationStatus
VmemTracker_ReserveVmemChunks(int32 numChunksToReserve)
{
	Assert(vmemTrackerInited);
	Assert(NULL != MySessionState);

	Assert(0 <= numChunksToReserve);
	/* Charge the session first; 'total' is the session usage after the charge */
	int32 total = pg_atomic_add_fetch_u32((pg_atomic_uint32 *)&MySessionState->sessionVmem, numChunksToReserve);
	Assert(total > (int32) 0);


	/* We don't support vmem usage from non-owner thread */
	Assert(MemoryProtection_IsOwnerThread());

	/* Set when any limit was exceeded but covered by the waiver */
	bool waiverUsed = false;

	if (!ResGroupReserveMemory(numChunksToReserve, waivedChunks, &waiverUsed))
	{
		/* Resource group refused: undo the session level charge */
		pg_atomic_sub_fetch_u32((pg_atomic_uint32 *)&MySessionState->sessionVmem, numChunksToReserve);
		return MemoryFailure_ResourceGroupMemoryExhausted;
	}

	/*
	 * If the query vmem quota is exhausted, rollback the reservation and
	 * return error. For non-QE processes and processes in critical section,
	 * we don't enforce VMEM, but we do track the usage.
	 */
	int32 memLimitPerQuery = VmemTracker_GetMaxChunksPerQuery();

	/* A per-query limit of 0 disables this check */
	if (memLimitPerQuery != 0 && total > memLimitPerQuery &&
			Gp_role == GP_ROLE_EXECUTE && CritSectionCount == 0)
	{
		if (total > memLimitPerQuery + waivedChunks)
		{
			/* Revert the session level reservation */
			pg_atomic_sub_fetch_u32((pg_atomic_uint32 *)&MySessionState->sessionVmem, numChunksToReserve);
			/* Revert resgroup memory reservation */
			ResGroupReleaseMemory(numChunksToReserve);
			return MemoryFailure_QueryMemoryExhausted;
		}
		/* Over the limit, but within the waived amount */
		waiverUsed = true;
	}

	/* Now reserve vmem at segment level */
	int32 new_vmem = pg_atomic_add_fetch_u32((pg_atomic_uint32 *)segmentVmemChunks, numChunksToReserve);
	int32 vmemLimitChunks = VmemTracker_GetVmemLimitChunks();

	/*
	 * If segment vmem is exhausted, rollback query level reservation. For non-QE
	 * processes and processes in critical section, we don't enforce VMEM, but we
	 * do track the usage.
	 */
	if (new_vmem > vmemLimitChunks &&
			Gp_role == GP_ROLE_EXECUTE && CritSectionCount == 0)
	{
		if (new_vmem > vmemLimitChunks + waivedChunks)
		{
			/* Revert query memory reservation */
			pg_atomic_sub_fetch_u32((pg_atomic_uint32 *)&MySessionState->sessionVmem, numChunksToReserve);
			/* Revert vmem reservation */
			pg_atomic_sub_fetch_u32((pg_atomic_uint32 *)segmentVmemChunks, numChunksToReserve);
			/* Revert resgroup memory reservation */
			ResGroupReleaseMemory(numChunksToReserve);

			return MemoryFailure_VmemExhausted;
		}
		/* Over the limit, but within the waived amount */
		waiverUsed = true;
	}

	/* The current process now owns additional vmem in this segment */
	trackedVmemChunks += numChunksToReserve;

	/* Keep the high-water mark up to date */
	maxVmemChunksTracked = Max(maxVmemChunksTracked, trackedVmemChunks);

	if (waivedChunks > 0 && !waiverUsed)
	{
		/*
		 * We have sufficient free memory that we are no longer using the waiver.
		 * Therefore reset the waiver.
		 */
		waivedChunks = 0;
	}

	return MemoryAllocation_Success;
}

/*
 * Gives 'reduction' chunks back: they are subtracted from the segment
 * counter, the session counter, the resource group (ResGroupReleaseMemory)
 * and finally from this process's own trackedVmemChunks.
 */
static void
VmemTracker_ReleaseVmemChunks(int reduction)
{
	Assert(0 <= reduction);
	/* We don't support vmem usage from non-owner thread */
	Assert(MemoryProtection_IsOwnerThread());

	/* Segment level */
	pg_atomic_sub_fetch_u32((pg_atomic_uint32 *) segmentVmemChunks, reduction);

	Assert(*segmentVmemChunks >= 0);
	Assert(NULL != MySessionState);
	/* Session level */
	pg_atomic_sub_fetch_u32((pg_atomic_uint32 *)&MySessionState->sessionVmem, reduction);
	/* Resource group level */
	ResGroupReleaseMemory(reduction);
	Assert(0 <= MySessionState->sessionVmem);
	/* Process level */
	trackedVmemChunks -= reduction;
}

/*
 * Releases all vmem reserved by this process: every tracked chunk is given
 * back and the tracked byte count is reset to zero.
 */
static void
ReleaseAllVmemChunks()
{
	VmemTracker_ReleaseVmemChunks(trackedVmemChunks);
	Assert(0 == trackedVmemChunks);
	trackedBytes = 0;
}

/*
 * Returns how many chunks are still available on the segment, i.e. the
 * segment vmem limit minus the chunks already consumed, clamped at 0 when
 * consumption has reached or passed the limit.
 */
static int32
VmemTracker_GetNonNegativeAvailableVmemChunks()
{
	int32 consumed = *segmentVmemChunks;
	int32 limit = VmemTracker_GetVmemLimitChunks();

	return (limit > consumed) ? (limit - consumed) : 0;
}

/*
 * Returns how many chunks the current session may still reserve before it
 * reaches the per-query limit, clamped at 0 when the session has reached or
 * passed the limit.
 */
static int32
VmemTracker_GetNonNegativeAvailableQueryChunks()
{
	int32 sessionChunks = MySessionState->sessionVmem;
	int32 queryLimit = VmemTracker_GetMaxChunksPerQuery();

	return (queryLimit > sessionChunks) ? (queryLimit - sessionChunks) : 0;
}

/*
 * Converts a chunk count to MB using the current chunk size
 * (see CHUNKS_TO_MB).
 */
int32
VmemTracker_ConvertVmemChunksToMB(int chunks)
{
	return CHUNKS_TO_MB(chunks);
}

/*
 * Converts MB to a chunk count using the current chunk size. The conversion
 * is a right shift (see MB_TO_CHUNKS), so a partial chunk is dropped.
 */
int32
VmemTracker_ConvertVmemMBToChunks(int mb)
{
	return MB_TO_CHUNKS(mb);
}

/*
 * Converts a chunk count to bytes using the current chunk size
 * (see CHUNKS_TO_BYTES); the result is 64 bits wide.
 */
int64
VmemTracker_ConvertVmemChunksToBytes(int chunks)
{
	return CHUNKS_TO_BYTES(chunks);
}

/*
 * Converts bytes to a chunk count using the current chunk size. The
 * conversion is a right shift (see BYTES_TO_CHUNKS), so a partial chunk is
 * dropped, and the 64-bit result is narrowed to int32 on return.
 */
int32
VmemTracker_ConvertVmemBytesToChunks(int64 bytes)
{
	return BYTES_TO_CHUNKS(bytes);
}

/*
 * Returns the maximum vmem reserved by the current process (its high-water
 * mark, maxVmemChunksTracked) in "chunks" unit.
 */
int64
VmemTracker_GetMaxReservedVmemChunks(void)
{
	/* The high-water mark can never be below the current non-startup usage */
	Assert(maxVmemChunksTracked >= trackedVmemChunks - startupChunks);
	return maxVmemChunksTracked;
}

/*
 * Returns the maximum vmem reserved by the current process (its high-water
 * mark, maxVmemChunksTracked) in "MB" unit.
 *
 * NOTE(review): CHUNKS_TO_MB shifts the int32 counter before the value is
 * widened to the int64 return type.
 */
int64
VmemTracker_GetMaxReservedVmemMB(void)
{
	/* The high-water mark can never be below the current non-startup usage */
	Assert(maxVmemChunksTracked >= trackedVmemChunks - startupChunks);
	return CHUNKS_TO_MB(maxVmemChunksTracked);
}

/*
 * Returns the maximum vmem reserved by the current process (its high-water
 * mark, maxVmemChunksTracked) in "bytes" unit.
 */
int64
VmemTracker_GetMaxReservedVmemBytes(void)
{
	/* The high-water mark can never be below the current non-startup usage */
	Assert(maxVmemChunksTracked >= trackedVmemChunks - startupChunks);
	return CHUNKS_TO_BYTES(maxVmemChunksTracked);
}

/*
 * Returns the vmem limit in "bytes" unit, i.e. the limit reported by
 * VmemTracker_GetVmemLimitChunks() converted with the current chunk size.
 */
int64
VmemTracker_GetVmemLimitBytes(void)
{
	return CHUNKS_TO_BYTES(VmemTracker_GetVmemLimitChunks());
}

/*
 * Returns the vmem limit in "chunks" unit: the resource group's limit when
 * resource groups are enabled, otherwise the quota computed by this file.
 */
int32
VmemTracker_GetVmemLimitChunks(void)
{
	/*
	 * TODO:
	 * For a backend that has the vmem tracker initialized and resource
	 * group enabled, the vmem limit is not expected to be used until the
	 * resource group is activated; otherwise the reported limit might be
	 * inconsistent.
	 */
	if (IsResGroupEnabled())
	{
		return ResGroupGetVmemLimitChunks();
	}

	return vmemChunksQuota;
}

/*
 * Returns the chunk size in bits: the resource group's chunk size when
 * resource groups are enabled, otherwise the size computed by this file.
 */
int32
VmemTracker_GetChunkSizeInBits(void)
{
	/*
	 * TODO:
	 * For a backend that has the vmem tracker initialized and resource
	 * group enabled, the chunk size is not expected to be used until the
	 * resource group is activated; otherwise the reported chunk size might
	 * be inconsistent.
	 */
	if (IsResGroupEnabled())
	{
		return ResGroupGetVmemChunkSizeInBits();
	}

	return chunkSizeInBits;
}

/*
 * Returns the per-query vmem limit in "chunks" unit: the resource group's
 * value when resource groups are enabled, otherwise the value this file
 * derived from gp_vmem_limit_per_query.
 */
static int32
VmemTracker_GetMaxChunksPerQuery(void)
{
	if (IsResGroupEnabled())
	{
		return ResGroupGetMaxChunksPerQuery();
	}

	return maxChunksPerQuery;
}

/*
 * Returns the vmem currently reserved by the current process in "chunks"
 * unit.
 */
int32
VmemTracker_GetReservedVmemChunks(void)
{
	return trackedVmemChunks;
}

/*
 * Returns the vmem currently reserved by the current process in "bytes"
 * unit. The value is the reserved chunk count converted to bytes, so it is
 * always a whole number of chunks.
 */
int64
VmemTracker_GetReservedVmemBytes(void)
{
	return CHUNKS_TO_BYTES(trackedVmemChunks);
}

/*
 * Returns the VMEM still available on the segment in "bytes" unit; never
 * negative.
 */
int64
VmemTracker_GetAvailableVmemBytes()
{
	return CHUNKS_TO_BYTES(VmemTracker_GetNonNegativeAvailableVmemChunks());
}

/*
 * Returns the VMEM still available on the segment in "MB" unit; never
 * negative.
 */
int32
VmemTracker_GetAvailableVmemMB()
{
	return CHUNKS_TO_MB(VmemTracker_GetNonNegativeAvailableVmemChunks());
}

/*
 * Returns the VMEM the current session may still reserve under the
 * per-query limit, in "MB" unit; never negative.
 */
int32
VmemTracker_GetAvailableQueryVmemMB()
{
	return CHUNKS_TO_MB(VmemTracker_GetNonNegativeAvailableQueryChunks());
}

/*
 * Reserve newlyRequestedBytes bytes from the vmem system.
 *
 * For performance reason, this method only reserves in chunk units and if the new
 * request can be met from previous chunk reservation, it does not try to reserve a new
 * chunk.
 *
 * Returns MemoryAllocation_Success when the tracker is not activated, when
 * the request fits in the chunks already reserved, or when the additional
 * chunks were reserved. Otherwise returns the failure status reported by
 * VmemTracker_ReserveVmemChunks() and leaves trackedBytes unchanged.
 */
MemoryAllocationStatus
VmemTracker_ReserveVmem(int64 newlyRequestedBytes)
{
	/* Nothing is tracked until the tracker is activated */
	if (!VmemTrackerIsActivated())
	{
		Assert(0 == trackedVmemChunks - startupChunks);
		return MemoryAllocation_Success;
	}

	Assert(gp_mp_inited);
	Assert(newlyRequestedBytes >= 0);

	trackedBytes += newlyRequestedBytes;

	/* Number of whole chunks needed to cover the new byte total */
	int32 newszChunk = trackedBytes >> VmemTracker_GetChunkSizeInBits();

	MemoryAllocationStatus status = MemoryAllocation_Success;

	if(newszChunk > trackedVmemChunks)
	{
		/*
		 * Undo trackedBytes, as the runaway detection and interrupt
		 * processing below may not return
		 */
		trackedBytes -= newlyRequestedBytes;

		/*
		 * Detect a runaway session. Moreover, if the current session is deemed
		 * as runaway, start cleanup.
		 *
		 * Caution: this method may not return as it has the potential to call
		 * elog(ERROR, ...).
		 */
		RedZoneHandler_DetectRunawaySession();

		/*
		 * Before reserving further VMEM, check if the current session has a pending
		 * query cancellation or other pending interrupts. This ensures more responsive
		 * interrupt processing, including query cancellation requests without depending
		 * on CHECK_FOR_INTERRUPTS(). In a sense, this is a lightweight CHECK_FOR_INTERRUPTS
		 * as we don't execute BackoffBackendTick() and some runaway detection code.
		 */
		if (vmem_process_interrupt && InterruptPending)
		{
			/* ProcessInterrupts should check for InterruptHoldoffCount and CritSectionCount */
			ProcessInterrupts(__FILE__, __LINE__);
		}

		/*
		 * Redo, as the calls above returned and we are now going ahead
		 * with reserving this vmem
		 */
		trackedBytes += newlyRequestedBytes;

		/*
		 * Before attempting to reserve vmem, we check if there was any OOM
		 * situation, and report our consumption if there was any. This accurately
		 * tells us our share of fault in an OOM situation.
		 */
		ReportOOMConsumption();

		/* Only the chunks beyond those already reserved are requested */
		int32 needChunk = newszChunk - trackedVmemChunks;
		status = VmemTracker_ReserveVmemChunks(needChunk);
	}

	/* Failed to reserve vmem chunks. Revert changes to trackedBytes */
	if (MemoryAllocation_Success != status)
	{
		trackedBytes -= newlyRequestedBytes;
	}

	return status;
}

/*
 * Gives toBeFreedRequested bytes back to the vmem system.
 *
 * Bytes are accumulated and chunks are only returned once the tracked byte
 * total drops below the chunks currently reserved. The call is a no-op when
 * the tracker is not initialized, or when resource groups are enabled but
 * not yet activated.
 */
void
VmemTracker_ReleaseVmem(int64 toBeFreedRequested)
{
	int64		releasableBytes;
	int64		bytesToFree;
	int			chunksAfterFree;

	if (!vmemTrackerInited ||
		(IsResGroupEnabled() && !IsResGroupActivated()))
	{
		Assert(0 == trackedVmemChunks - startupChunks);
		return;
	}

	Assert(gp_mp_inited);

	/*
	 * GPDB may ask to free more VMEM than it reserved, apparently because of
	 * a bug somewhere that releases vmem for allocations made before the
	 * vmem system was initialized. Never free more than what is tracked on
	 * top of the startup memory.
	 */
	releasableBytes = trackedBytes - startupBytes;
	bytesToFree = Min(releasableBytes, toBeFreedRequested);
	if (0 == bytesToFree)
	{
		Assert(0 == trackedVmemChunks - startupChunks);
		return;
	}

	trackedBytes -= bytesToFree;

	/* Return whole chunks that are no longer needed for the new byte total */
	chunksAfterFree = trackedBytes >> VmemTracker_GetChunkSizeInBits();
	if (chunksAfterFree < trackedVmemChunks)
	{
		VmemTracker_ReleaseVmemChunks(trackedVmemChunks - chunksAfterFree);
	}
}

/*
 * Register the startup memory to vmem tracker.
 *
 * 'bytes' is rounded up to a whole number of chunks and added unconditionally
 * to the process, session (when MySessionState is set) and segment counters.
 * The limits are then checked with a zero-chunk reservation.
 *
 * Returns the status of that zero-chunk reservation: MemoryAllocation_Success
 * when the usage is within the limits, otherwise the failure status reported
 * by VmemTracker_ReserveVmemChunks(). The startup memory stays registered in
 * either case.
 */
MemoryAllocationStatus
VmemTracker_RegisterStartupMemory(int64 bytes)
{
	int64		chunkSizeInBytes;

	Assert(vmemTrackerInited);

	/*
	 * Shift a 64-bit one rather than a long: long may be only 32 bits wide,
	 * and shifting it by 32 bits or more would be undefined behavior.
	 */
	chunkSizeInBytes = ((int64) 1) << VmemTracker_GetChunkSizeInBits();

	/* Align to chunks */
	startupChunks = BYTES_TO_CHUNKS(bytes + chunkSizeInBytes - 1);
	startupBytes = CHUNKS_TO_BYTES(startupChunks);

	/*
	 * Step 1, add the startup memory forcefully as these memory were already
	 * in-use before the call to this function.
	 *
	 * Do not add the startup memory to resource group as resource group only
	 * tracks memory allocated via vmem tracker.
	 */

	trackedBytes += startupBytes;
	trackedVmemChunks += startupChunks;

	if (MySessionState)
		pg_atomic_add_fetch_u32((pg_atomic_uint32 *) &MySessionState->sessionVmem,
								startupChunks);

	pg_atomic_add_fetch_u32((pg_atomic_uint32 *) segmentVmemChunks,
							startupChunks);

	/*
	 * Step 2, check if an OOM error should be raised by allocating 0 chunk.
	 */

	return VmemTracker_ReserveVmemChunks(0);
}

/*
 * Unregister the startup memory from vmem tracker: the chunks and bytes
 * recorded by VmemTracker_RegisterStartupMemory() are subtracted from the
 * segment, session (when MySessionState is set) and process counters, and
 * the recorded startup amounts are reset to zero.
 */
void
VmemTracker_UnregisterStartupMemory(void)
{
	Assert(vmemTrackerInited);

	/* Segment level */
	pg_atomic_sub_fetch_u32((pg_atomic_uint32 *) segmentVmemChunks,
							startupChunks);

	/* Session level */
	if (MySessionState)
		pg_atomic_sub_fetch_u32((pg_atomic_uint32 *) &MySessionState->sessionVmem,
								startupChunks);

	/* Process level */
	trackedBytes -= startupBytes;
	trackedVmemChunks -= startupChunks;

	/* Nothing is registered any more */
	startupBytes = 0;
	startupChunks = 0;
}

/*
 * Asks for additional VMEM beyond the per-session or segment vmem limit, to
 * be used for OOM error handling.
 *
 * waiver_bytes is converted to chunks and rounded up, so a request for a
 * single byte waives one whole chunk, which can be a multiple of 1 MB.
 *
 * The waiver only ever grows here: the call has no effect when a waiver at
 * least as large as the new one is already in place.
 */
void
VmemTracker_RequestWaiver(int64 waiver_bytes)
{
	int			requested;

	Assert(gp_mp_inited);

	requested = BYTES_TO_CHUNKS(waiver_bytes);

	/* Round up when the request does not fall on a chunk boundary */
	if (CHUNKS_TO_BYTES(requested) < waiver_bytes)
	{
		requested++;

		Assert(waiver_bytes < CHUNKS_TO_BYTES(requested));
	}

	if (requested > waivedChunks)
	{
		waivedChunks = requested;
	}
}

/*
 * Resets the requested waiver to zero, so that no chunks are waived from
 * limit checking any more.
 */
void
VmemTracker_ResetWaiver(void)
{
	waivedChunks = 0;
}

/*
 * Tells whether the vmem tracker is active, i.e. whether reservations are
 * currently being tracked by this process.
 */
bool
VmemTrackerIsActivated(void)
{
	/* Without resource groups, being initialized is all that is needed */
	if (!IsResGroupEnabled())
	{
		return vmemTrackerInited;
	}

	/*
	 * With resource groups enabled, the segment vmem limit and the chunk
	 * size are undetermined until the resource group is activated, so the
	 * tracker stays inactive until then.
	 */
	return vmemTrackerInited && IsResGroupActivated();
}

相关信息

greenplumn 源码目录

相关文章

greenplumn aset 源码

greenplumn dsa 源码

greenplumn event_version 源码

greenplumn freepage 源码

greenplumn generation 源码

greenplumn idle_tracker 源码

greenplumn mcxt 源码

greenplumn memdebug 源码

greenplumn memprot 源码

greenplumn mpool 源码

0  赞