greenplumn backoff source code

  • 2022-08-18

greenplumn backoff code

File path: /src/backend/postmaster/backoff.c

/*-------------------------------------------------------------------------
 *
 * backoff.c
 *	  Query Prioritization
 *
 * This file contains functions that implement the Query Prioritization
 * feature. Query prioritization is implemented by employing a
 * 'backing off' technique where each backend sleeps to let some other
 * backend use the CPU. A sweeper process identifies backends that are
 * making active progress and determines what the relative CPU usage
 * should be.
 *
 * BackoffBackendTick() - a CHECK_FOR_INTERRUPTS() call in a backend
 *						  leads to a backend 'tick'. If enough 'ticks'
 *						  elapse, then the backend considers a
 *						  backoff.
 * BackoffSweeper()		- workhorse for the sweeper process
 *
 * Portions Copyright (c) 2009-2010, Greenplum inc.
 * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
 *
 *
 * IDENTIFICATION
 *	    src/backend/postmaster/backoff.c
 *
 *-------------------------------------------------------------------------
 */
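/*
 * A rough sketch of how the two halves cooperate (simplified, for
 * orientation only):
 *
 *   backend:  CHECK_FOR_INTERRUPTS() -> BackoffBackendTickExpired()
 *             -> BackoffBackend(): compare this process's measured CPU
 *                ratio against its targetUsage and adjust its sleep time
 *                multiplicatively.
 *   sweeper:  BackoffSweeperLoop() -> BackoffSweeper(): mark backends
 *             active/inactive and recompute each backend's targetUsage
 *             from its statement weight.
 *
 * For example, two active single-backend statements with weights 1000 and
 * 200 sharing one CPU end up with target usages of about 1000/1200 = 0.83
 * and 200/1200 = 0.17.
 */
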
#include "postgres.h"

#ifndef HAVE_GETRUSAGE
#include "rusagestub.h"
#else
#include <sys/time.h>
#include <sys/resource.h>
#endif
#include <sys/time.h>
#include <signal.h>
#include <math.h>

#include "storage/ipc.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbdisp_query.h"
#include "cdb/cdbdispatchresult.h"
#include "libpq-fe.h"

#include "libpq/pqsignal.h"
#include "tcop/tcopprot.h"
#include "postmaster/bgworker.h"
#include "storage/pmsignal.h"	/* PostmasterIsAlive */
#include "storage/proc.h"
#include "catalog/pg_resourcetype.h"
#include "utils/builtins.h"
#include "utils/resource_manager.h"
#include "funcapi.h"
#include "access/xact.h"
#include "port/atomics.h"
#include "postmaster/backoff.h"
#include "pg_trace.h"
#include "pgstat.h"

extern bool gp_debug_resqueue_priority;

/* Enable for more debug info to be logged */
/* #define BACKOFF_DEBUG */

/**
 * Difference of two timevals in microsecs
 */
#define TIMEVAL_DIFF_USEC(b, a) ((double) (b.tv_sec - a.tv_sec) * 1000000.0 + (b.tv_usec - a.tv_usec))
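/*
 * For example, with b = {2, 500000} and a = {1, 250000} the macro yields
 * 1.0 * 1000000.0 + 250000 = 1250000 microseconds.
 */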

/* In ms */
#define MIN_SLEEP_THRESHOLD  5000

/* In ms */
#define DEFAULT_SLEEP_TIME 100.0

/**
 * A statement id consists of a session id and command count.
 */
typedef struct StatementId
{
	int			sessionId;
	int			commandCount;
}	StatementId;

/* Invalid statement id */
static const struct StatementId InvalidStatementId = {0, 0};

/**
 * This is information that only the current backend ever needs to see.
 */
typedef struct BackoffBackendLocalEntry
{
	int			processId;		/* Process Id of backend */
	struct rusage startUsage;	/* Usage when current statement began. To
								 * account for caching of backends. */
	struct rusage lastUsage;	/* Usage statistics when backend process
								 * performed local backoff action */
	double		lastSleepTime;	/* Last sleep time when local backing-off
								 * action was performed */
	bool		inTick;			/* Is backend currently performing tick? - to
								 * prevent nested calls */
	bool		groupingTimeExpired;	/* Should backend try to find better
										 * leader? */
}	BackoffBackendLocalEntry;

/**
 * There is a backend entry for every backend with a valid backendid on the master and segments.
 */
typedef struct BackoffBackendSharedEntry
{
	struct StatementId statementId;		/* A statement Id. Can be invalid. */
	int			groupLeaderIndex;		/* Who is my leader? */
	int			groupSize;		/* How many in my group ? */
	int			numFollowers;	/* How many followers do I have? */

	/* These fields are written by backend and read by sweeper process */
	struct timeval lastCheckTime;		/* Last time the backend process
										 * performed local back-off action.
										 * Used to determine inactive
										 * backends. */

	/* These fields are written to by sweeper and read by backend */
	bool		backoff;		/* If set to false, then no backoff to be
								 * performed by this backend */
	double		targetUsage;	/* Current target CPU usage as calculated by
								 * sweeper */
	bool		earlyBackoffExit;		/* Sweeper asking backend to stop
										 * backing off */

	/* These fields are written to and read by sweeper */
	bool		isActive;		/* Sweeper marking backend as active based on
								 * lastCheckTime */
	int			numFollowersActive;		/* If backend is a leader, this
										 * represents number of followers that
										 * are active */

	/* These fields are written by backend during init and by manual adjustment */
	int			weight;			/* Weight of the statement that this backend
								 * belongs to */

}	BackoffBackendSharedEntry;

/**
 * Local entry for backoff.
 */
static BackoffBackendLocalEntry myLocalEntry;

int backoffTickCounter = 0;

/**
 * This is the global state of the backoff mechanism. It is a singleton structure - one
 * per postmaster. It exists on master and segments. All backends with a valid backendid
 * and entry in the ProcArray have access to this information.
 */
typedef struct BackoffState
{
	BackoffBackendSharedEntry *backendEntries;	/* Indexed by backend ids */
	int			numEntries;

	bool		sweeperInProgress;		/* Is the sweeper process working? */
	int			lastTotalStatementWeight;		/* To keep track of total
												 * weight */
}	BackoffState;

/**
 * Pointer to singleton struct used by the backoff mechanism.
 */
BackoffState *backoffSingleton = NULL;

/* Statement-id related */

static inline void init(StatementId * s, int sessionId, int commandCount);
static inline void setInvalid(StatementId * s);
static inline bool isValid(const StatementId * s);
static inline bool equalStatementId(const StatementId * s1, const StatementId * s2);

/* Main accessor methods for backoff entries */
static inline const BackoffBackendSharedEntry *getBackoffEntryRO(int index);
static inline BackoffBackendSharedEntry *getBackoffEntryRW(int index);

/* Backend uses these */
static inline BackoffBackendLocalEntry *myBackoffLocalEntry(void);
static inline BackoffBackendSharedEntry *myBackoffSharedEntry(void);
static inline void SwitchGroupLeader(int newLeaderIndex);
static inline bool groupingTimeExpired(void);
static inline void findBetterGroupLeader(void);
static inline bool isGroupLeader(int index);
static inline void BackoffBackend(void);

/* Init and exit routines */
static void BackoffStateAtExit(int code, Datum arg);

/* Routines to access global state */
static inline double numProcsPerSegment(void);

/* Sweeper related routines */
static void BackoffSweeper(void);
static void BackoffSweeperLoop(void);
static volatile bool isSweeperProcess = false;

/* Resource queue related routines */
static int	BackoffPriorityValueToInt(const char *priorityVal);
extern List *GetResqueueCapabilityEntry(Oid queueid);

static int BackoffDefaultWeight(void);
static int BackoffSuperuserStatementWeight(void);

/*
 * Helper method that verifies setting of default priority guc.
 */
bool gpvars_check_gp_resqueue_priority_default_value(char **newval,
													void **extra,
													GucSource source);

/**
 * Primitives on statement id.
 */

static inline void
init(StatementId * s, int sessionId, int commandCount)
{
	Assert(s);
	s->sessionId = sessionId;
	s->commandCount = commandCount;
	return;
}

/**
 * Sets a statement id to be invalid.
 */
static inline void
setInvalid(StatementId * s)
{
	init(s, InvalidStatementId.sessionId, InvalidStatementId.commandCount);
}

/**
 * Are two statement ids equal?
 */
static inline bool
equalStatementId(const StatementId * s1, const StatementId * s2)
{
	Assert(s1);
	Assert(s2);
	return ((s1->sessionId == s2->sessionId)
			&& (s1->commandCount == s2->commandCount));
}

/**
 * Is a StatementId valid?
 */
static inline bool
isValid(const StatementId * s)
{
	return !equalStatementId(s, &InvalidStatementId);
}

/**
 * Access to the local entry for this backend.
 */

static inline BackoffBackendLocalEntry *
myBackoffLocalEntry()
{
	return &myLocalEntry;
}

/**
 * Access to the shared entry for this backend.
 */
static inline BackoffBackendSharedEntry *
myBackoffSharedEntry()
{
	return getBackoffEntryRW(MyBackendId);
}

/**
 * A backend is a group leader if it is its own leader.
 */
static inline bool
isGroupLeader(int index)
{
	return (getBackoffEntryRO(index)->groupLeaderIndex == index);
}

/**
 * This method is used by a backend to switch its group leader. It is unique
 * in that it modifies the numFollowers field of both the current group leader's
 * entry and the new leader's entry. The increments and decrements are done using
 * atomic operations (else we may have race conditions across processes). However,
 * this code is not thread safe, and we never call it in multi-threaded situations.
 */
static inline void
SwitchGroupLeader(int newLeaderIndex)
{
	BackoffBackendSharedEntry *myEntry = myBackoffSharedEntry();
	BackoffBackendSharedEntry *oldLeaderEntry = NULL;
	BackoffBackendSharedEntry *newLeaderEntry = NULL;

	if (backoffSingleton->sweeperInProgress == true)
		return;

	Assert(newLeaderIndex < myEntry->groupLeaderIndex);
	Assert(newLeaderIndex >= 0 && newLeaderIndex < backoffSingleton->numEntries);

	oldLeaderEntry = &backoffSingleton->backendEntries[myEntry->groupLeaderIndex];
	newLeaderEntry = &backoffSingleton->backendEntries[newLeaderIndex];

	pg_atomic_sub_fetch_u32((pg_atomic_uint32 *) &oldLeaderEntry->numFollowers, 1);
	pg_atomic_add_fetch_u32((pg_atomic_uint32 *) &newLeaderEntry->numFollowers, 1);
	myEntry->groupLeaderIndex = newLeaderIndex;
}

/*
 * Should this backend stop finding a better leader? If the backend has spent enough time working
 * on the current statement (measured in elapsedTimeForStatement), it marks grouping time expired.
 */
static inline bool
groupingTimeExpired()
{
	BackoffBackendLocalEntry *le = myBackoffLocalEntry();

	if (le->groupingTimeExpired)
	{
		return true;
	}
	else
	{
		double		elapsedTimeForStatement =
		TIMEVAL_DIFF_USEC(le->lastUsage.ru_utime, le->startUsage.ru_utime)
		+ TIMEVAL_DIFF_USEC(le->lastUsage.ru_stime, le->startUsage.ru_stime);

		if (elapsedTimeForStatement > gp_resqueue_priority_grouping_timeout * 1000.0)
		{
			le->groupingTimeExpired = true;
			return true;
		}
		else
		{
			return false;
		}
	}
}

/**
 * Executed by a backend to find a better group leader (i.e. one with a lower index), if possible.
 * This is the only method that can write to groupLeaderIndex.
 */
static inline void
findBetterGroupLeader()
{
	int			leadersLeaderIndex = -1;
	BackoffBackendSharedEntry *myEntry = myBackoffSharedEntry();
	const BackoffBackendSharedEntry *leaderEntry = getBackoffEntryRO(myEntry->groupLeaderIndex);

	Assert(myEntry);
	leadersLeaderIndex = leaderEntry->groupLeaderIndex;

	if (backoffSingleton->sweeperInProgress == true)
		return;

	/* If my leader has a different leader, then jump pointer */
	if (myEntry->groupLeaderIndex != leadersLeaderIndex)
	{
		SwitchGroupLeader(leadersLeaderIndex);
	}
	else
	{
		int			i = 0;

		for (i = 0; i < myEntry->groupLeaderIndex; i++)
		{
			const BackoffBackendSharedEntry *other = getBackoffEntryRO(i);

			if (equalStatementId(&other->statementId, &myEntry->statementId))
			{
				/* Found a better leader! */
				break;
			}
		}
		if (i < myEntry->groupLeaderIndex)
		{
			SwitchGroupLeader(i);
		}
	}
	return;
}
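
/*
 * For example, suppose backends 3, 5 and 9 run the same statement, each
 * initially its own leader. Backend 5 scans indices below its leader's,
 * finds 3 with an equal statement id, and switches to it. If backend 9 had
 * meanwhile picked 5 as its leader, it later notices that 5's leader is 3
 * and jumps the pointer directly. Groups thus converge on the lowest
 * backend index, much like path compression in a union-find structure.
 */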


/**
 * Read only access to a backend entry.
 */
static inline const BackoffBackendSharedEntry *
getBackoffEntryRO(int index)
{
	return (const BackoffBackendSharedEntry *) getBackoffEntryRW(index);
}

/**
 * Gives write access to a backend entry.
 */
static inline BackoffBackendSharedEntry *
getBackoffEntryRW(int index)
{
	Assert(Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE || isSweeperProcess);
	Assert(index >= 0 && index < backoffSingleton->numEntries);
	return &backoffSingleton->backendEntries[index];
}


/**
 * This method is called by the backend when it begins working on a new statement.
 * This initializes the backend entry corresponding to this backend.
 * After initialization, the backend entry immediately finds its group leader, which
 * is the first backend entry that has the same statement id as itself.
 */
void
BackoffBackendEntryInit(int sessionid, int commandcount, Oid queueId)
{
	BackoffBackendSharedEntry *mySharedEntry = NULL;
	BackoffBackendLocalEntry *myLocalEntry = NULL;

	Assert(sessionid > -1);
	Assert(commandcount > -1);
	Assert(Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE);
	Assert(!isSweeperProcess);

	/* Shared information */
	mySharedEntry = myBackoffSharedEntry();

	mySharedEntry->targetUsage = 1.0 / numProcsPerSegment();	/* Initially, do not
																 * perform any backoffs */
	mySharedEntry->isActive = false;
	mySharedEntry->backoff = true;
	mySharedEntry->earlyBackoffExit = false;

	if (gettimeofday(&mySharedEntry->lastCheckTime, NULL) < 0)
	{
		elog(ERROR, "Unable to execute gettimeofday(). Please disable query prioritization.");
	}

	mySharedEntry->groupLeaderIndex = MyBackendId;
	mySharedEntry->weight = ResourceQueueGetPriorityWeight(queueId);
	mySharedEntry->groupSize = 0;
	mySharedEntry->numFollowers = 1;

	/* this should happen last or the sweeper may pick up a non-complete entry */
	init(&mySharedEntry->statementId, sessionid, commandcount);
	Assert(isValid(&mySharedEntry->statementId));

	/* Local information */
	myLocalEntry = myBackoffLocalEntry();

	myLocalEntry->processId = MyProcPid;
	myLocalEntry->lastSleepTime = DEFAULT_SLEEP_TIME;
	myLocalEntry->groupingTimeExpired = false;
	if (getrusage(RUSAGE_SELF, &myLocalEntry->lastUsage) < 0)
	{
		elog(ERROR, "Unable to execute getrusage(). Please disable query prioritization.");
	}
	memcpy(&myLocalEntry->startUsage, &myLocalEntry->lastUsage, sizeof(myLocalEntry->lastUsage));
	myLocalEntry->inTick = false;

	/* Try to find a better leader for my group */
	findBetterGroupLeader();

	return;
}

/**
 * Accessing the number of procs per segment.
 */
static inline double
numProcsPerSegment()
{
	Assert(gp_enable_resqueue_priority);
	Assert(backoffSingleton);
	Assert(gp_resqueue_priority_cpucores_per_segment > 0);
	return gp_resqueue_priority_cpucores_per_segment;
}


/**
 * This method is called once in a while by a backend to determine if it needs
 * to backoff per its current usage and target usage.
 */
static inline void
BackoffBackend()
{
	BackoffBackendLocalEntry *le = NULL;
	BackoffBackendSharedEntry *se = NULL;

	/* Try to achieve target usage! */
	struct timeval currentTime;
	struct rusage currentUsage;
	double		thisProcessTime = 0.0;
	double		totalTime = 0.0;
	double		cpuRatio = 0.0;
	double		changeFactor = 1.0;

	le = myBackoffLocalEntry();
	Assert(le);
	se = myBackoffSharedEntry();
	Assert(se);
	Assert(se->weight > 0);

	/* Provide tracing information */
	TRACE_POSTGRESQL_BACKOFF_LOCALCHECK(MyBackendId);

	if (gettimeofday(&currentTime, NULL) < 0)
	{
		elog(ERROR, "Unable to execute gettimeofday(). Please disable query prioritization.");
	}

	if (getrusage(RUSAGE_SELF, &currentUsage) < 0)
	{
		elog(ERROR, "Unable to execute getrusage(). Please disable query prioritization.");
	}

	/* If backoff can be performed by this process */
	if (se->backoff)
	{
		/*
		 * How much did the cpu work on behalf of this process - incl user and
		 * sys time
		 */
		thisProcessTime = TIMEVAL_DIFF_USEC(currentUsage.ru_utime, le->lastUsage.ru_utime)
			+ TIMEVAL_DIFF_USEC(currentUsage.ru_stime, le->lastUsage.ru_stime);

		/*
		 * Absolute cpu time since the last check. This accounts for multiple
		 * procs per segment
		 */
		totalTime = TIMEVAL_DIFF_USEC(currentTime, se->lastCheckTime);

		cpuRatio = thisProcessTime / totalTime;

		cpuRatio = Min(cpuRatio, 1.0);

		changeFactor = cpuRatio / se->targetUsage;

		le->lastSleepTime *= changeFactor;

		if (le->lastSleepTime < DEFAULT_SLEEP_TIME)
			le->lastSleepTime = DEFAULT_SLEEP_TIME;

		if (gp_debug_resqueue_priority)
		{
			elog(LOG, "thissession = %d, thisProcTime = %f, totalTime = %f, targetusage = %f, cpuRatio = %f, change factor = %f, sleeptime = %f",
				 se->statementId.sessionId, thisProcessTime, totalTime, se->targetUsage, cpuRatio, changeFactor, (double) le->lastSleepTime);
		}

		memcpy(&le->lastUsage, &currentUsage, sizeof(currentUsage));
		memcpy(&se->lastCheckTime, &currentTime, sizeof(currentTime));

		if (le->lastSleepTime > MIN_SLEEP_THRESHOLD)
		{
			/*
			 * Sleeping happens in chunks so that the backend may exit early
			 * from its sleep if the sweeper requests it to.
			 */
			int			j = 0;
			long		sleepInterval = ((long) gp_resqueue_priority_sweeper_interval) * 1000L;
			int			numIterations = (int) (le->lastSleepTime / sleepInterval);
			double		leftOver = (double) ((long) le->lastSleepTime % sleepInterval);

			for (j = 0; j < numIterations; j++)
			{
				/* Sleep a chunk */
				pg_usleep(sleepInterval);
				/* Check for early backoff exit */
				if (se->earlyBackoffExit)
				{
					le->lastSleepTime = DEFAULT_SLEEP_TIME;		/* Minimize sleep time
																 * since we may need to
																 * recompute from
																 * scratch */
					break;
				}
			}
			if (j == numIterations)
				pg_usleep(leftOver);
		}
	}
	else
	{
		/*
		 * Even if this backend did not backoff, it should record current
		 * usage and current time so that subsequent calculations are
		 * accurate.
		 */
		memcpy(&le->lastUsage, &currentUsage, sizeof(currentUsage));
		memcpy(&se->lastCheckTime, &currentTime, sizeof(currentTime));
	}

	/* Consider finding a better leader for better grouping */
	if (!groupingTimeExpired())
	{
		findBetterGroupLeader();
	}
}
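
/*
 * A worked example of the feedback rule above: with targetUsage = 0.2 and a
 * measured cpuRatio = 0.8 since the last check, changeFactor = 0.8 / 0.2 =
 * 4.0, so lastSleepTime quadruples. Once it exceeds MIN_SLEEP_THRESHOLD the
 * backend actually sleeps, which lowers the cpuRatio observed on later
 * ticks until the ratio settles near the target.
 */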

/*
 * CHECK_FOR_INTERRUPTS() increments a counter, 'backoffTickCounter', on
 * every call, which we use as a loose measure of progress. Whenever the
 * counter reaches 'gp_resqueue_priority_local_interval', CHECK_FOR_INTERRUPTS()
 * calls this function, to perform a backoff action (see BackoffBackend()).
 */
void
BackoffBackendTickExpired(void)
{
	BackoffBackendLocalEntry *le;
	BackoffBackendSharedEntry *se;
	StatementId currentStatementId = {gp_session_id, gp_command_count};

	backoffTickCounter = 0;

	if (!(Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE)
		|| !IsResQueueEnabled()
		|| !gp_enable_resqueue_priority
		|| !IsUnderPostmaster
		|| (MyBackendId == InvalidBackendId)
		|| proc_exit_inprogress
		|| ProcDiePending		/* Proc is dying */
		|| QueryCancelPending	/* Statement cancellation */
		|| QueryFinishPending	/* Statement finish requested */
		|| InterruptHoldoffCount != 0	/* We're holding off on handling
										 * interrupts */
		|| CritSectionCount != 0	/* In critical section */
		)
	{
		/* Do nothing under these circumstances */
		return;
	}

	if (!backoffSingleton)
	{
		/* Not initialized yet. Do nothing */
		return;
	}

	Assert(backoffSingleton);

	le = myBackoffLocalEntry();
	se = myBackoffSharedEntry();

	if (!equalStatementId(&se->statementId, &currentStatementId))
	{
		/* This backend's entry has not yet been initialized. Do nothing yet. */
		return;
	}

	if (le->inTick)
	{
		/* No nested calls allowed. This may happen during elog calls :( */
		return;
	}

	le->inTick = true;

	/* Perform backoff. */
	BackoffBackend();
	se->earlyBackoffExit = false;

	le->inTick = false;
}
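
/*
 * A minimal sketch of the call site described above (the real wiring lives
 * in the CHECK_FOR_INTERRUPTS() macro elsewhere in the tree; treat this as
 * an assumption, not verbatim source):
 *
 *     backoffTickCounter++;
 *     if (backoffTickCounter >= gp_resqueue_priority_local_interval)
 *         BackoffBackendTickExpired();   (which resets the counter to 0)
 */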

/**
 * BackoffSweeper() looks at all the backend structures to determine if any
 * backends are not making progress. This is done by inspecting the
 * lastCheckTime field.  It also calculates the total weight of all 'active' backends to
 * re-calculate the target CPU usage per backend process. If it finds that a
 * backend is trying to request more CPU resources than the maximum CPU that it
 * can get (such a backend is called a 'pegger'), it assigns maxCPU to it.
 *
 * For example:
 * Let Qi be the ith query statement, Ri be the target CPU usage for Qi,
 * Wi be the statement weight for Qi, W be the total statements weight.
 * For simplicity, let's assume every statement only has 1 backend per segment.
 *
 * Let there be 4 active queries with weights {1,100,10,1000} with K=3 CPUs
 * available per segment to share. The maximum CPU that a backend can get is
 * maxCPU = 1.0. The total active statements weight is
 * W (activeWeight) = 1 + 100 + 10 + 1000 = 1111.
 * The following algorithm determines that Q4 is a pegger, because
 * K * W4 / W > maxCPU, which is 3000/1111 > 1.0, so we assign R4 = 1.0.
 * Now K becomes 2.0, W becomes 111.
 * It restarts from the beginning and determines that Q2 is now a pegger as
 * well, because K * W2 / W > maxCPU, which is 200/111 > 1.0, we assign
 * R2 = 1.0. Now there is only 1 CPU left and no peggers left. We continue
 * to distribute the left 1 CPU to other backends according to their weight,
 * so we assign the target CPU ratio of R1=1/11 and R3=10/11. The final
 * target CPU assignments are {0.09,1.0,0.91,1.0}.
 *
 * If there are multiple backends within a segment running for the query Qi,
 * the target CPU ratio Ri for query Qi is divided equally among all the
 * active backends belonging to the query.
 */
void
BackoffSweeper()
{
	int			i = 0;

	/* The overall weight of active statements */
	volatile double activeWeight = 0.0;
	int			numActiveBackends = 0;
	int			numActiveStatements = 0;

	/* The overall weight of active and inactive statements */
	int			totalStatementWeight = 0;
	int			numValidBackends = 0;
	int			numStatements = 0;

	struct timeval currentTime;

	if (gettimeofday(&currentTime, NULL) < 0)
	{
		elog(ERROR, "Unable to execute gettimeofday(). Please disable query prioritization.");
	}

	Assert(backoffSingleton->sweeperInProgress == false);

	backoffSingleton->sweeperInProgress = true;

	TRACE_POSTGRESQL_BACKOFF_GLOBALCHECK();

	/* Reset status for all the backend entries */
	for (i = 0; i < backoffSingleton->numEntries; i++)
	{
		BackoffBackendSharedEntry *se = getBackoffEntryRW(i);

		se->isActive = false;
		se->numFollowersActive = 0;
		se->backoff = true;
	}

	/*
	 * Mark backends that are active. Count of active group members is
	 * maintained at their group leader.
	 */
	for (i = 0; i < backoffSingleton->numEntries; i++)
	{
		BackoffBackendSharedEntry *se = getBackoffEntryRW(i);

		if (isValid(&se->statementId))
		{
			Assert(se->weight > 0);
			if (TIMEVAL_DIFF_USEC(currentTime, se->lastCheckTime)
				< gp_resqueue_priority_inactivity_timeout * 1000.0)
			{
				/*
				 * This is an active backend. Need to maintain count at group
				 * leader
				 */
				BackoffBackendSharedEntry *gl = getBackoffEntryRW(se->groupLeaderIndex);

				if (gl->numFollowersActive == 0)
				{
					activeWeight += se->weight;
					numActiveStatements++;
				}
				gl->numFollowersActive++;
				numActiveBackends++;
				se->isActive = true;
			}
			if (isGroupLeader(i))
			{
				totalStatementWeight += se->weight;
				numStatements++;
			}
			numValidBackends++;
		}
	}

	/* Sanity checks */
	Assert(numActiveBackends <= numValidBackends);
	Assert(numValidBackends >= numStatements);

	/**
	 * Under certain conditions, we want to avoid backoff. Cases are:
	 * 1. A statement just entered or exited
	 * 2. A statement's weight changed due to user intervention via gp_adjust_priority()
	 * 3. There is no active backend
	 * 4. There is exactly one statement
	 * 5. Total number of valid backends <= number of procs per segment
	 * Case 1 and 2 are approximated by checking if total statement weight changed since last sweeper loop.
	 */
	if (backoffSingleton->lastTotalStatementWeight != totalStatementWeight
		|| numActiveBackends == 0
		|| numStatements == 1
		|| numValidBackends <= numProcsPerSegment())
	{
		/* Write to targets */
		for (i = 0; i < backoffSingleton->numEntries; i++)
		{
			BackoffBackendSharedEntry *se = getBackoffEntryRW(i);

			se->backoff = false;
			se->earlyBackoffExit = true;
			se->targetUsage = 1.0;
		}
	}
	else
	{
		/**
		 * There are multiple statements with active backends.
		 *
		 * Let 'found' be true if we find a backend is trying to
		 * request more CPU resources than the maximum CPU that it can
		 * get. No matter how high the priority of a query process, it
		 * can utilize at most a single CPU at a time.
		 */
		bool		found = true;
		int			numIterations = 0;
		double		CPUAvailable = numProcsPerSegment();
		double		maxCPU = Min(1.0, numProcsPerSegment());	/* Maximum CPU that a
																 * backend can get */

		Assert(maxCPU > 0.0);
		Assert(activeWeight > 0.0);

		if (gp_debug_resqueue_priority)
		{
			elog(LOG, "before allocation: active backends = %d, active weight = %f, cpu available = %f", numActiveBackends, activeWeight, CPUAvailable);
		}

		while (found)
		{
			found = false;

			/**
			 * We try to find one or more backends that deserve maxCPU.
			 */
			for (i = 0; i < backoffSingleton->numEntries; i++)
			{
				BackoffBackendSharedEntry *se = getBackoffEntryRW(i);

				if (se->isActive
					&& se->backoff)
				{
					double		targetCPU = 0.0;
					const BackoffBackendSharedEntry *gl = getBackoffEntryRO(se->groupLeaderIndex);

					if (gl->numFollowersActive <= 0)
					{
						/*
						 * There is a race condition here:
						 * Backends A and B belong to the same statement, and backend A
						 * remains inactive longer than gp_resqueue_priority_inactivity_timeout.
						 *
						 * Timestamp1: backend A's leader is A, backend B's leader is B.
						 *
						 * Timestamp2: backend A's numFollowersActive remains zero due to the
						 * timeout.
						 *
						 * Timestamp3: the sweeper sets leader B's numFollowersActive to 1.
						 *
						 * Timestamp4: backend B changes its leader to A.
						 *
						 * For performance reasons, a backend may change its backoff group
						 * leader without checking whether the new leader is active, so it can
						 * end up following an inactive leader whose numFollowersActive is
						 * zero. Since the backoff sweeper is not an exact control mechanism,
						 * we simply skip this sweep when that happens.
						 */
						backoffSingleton->sweeperInProgress = false;
						elog(LOG, "numFollowersActive underflow!");
						return;
					}

					Assert(se->weight > 0.0);
					targetCPU = (CPUAvailable) * (se->weight) / activeWeight / gl->numFollowersActive;

					/**
					 * Some statements may be weighed so heavily that they are allocated the maximum cpu ratio.
					 */
					if (targetCPU >= maxCPU)
					{
						Assert(numProcsPerSegment() >= 1.0);	/* This can only happen
																 * when there is more
																 * than one proc */
						se->targetUsage = maxCPU;
						se->backoff = false;
						activeWeight -= (se->weight / gl->numFollowersActive);

						CPUAvailable -= maxCPU;
						found = true;
					}

					if (activeWeight <= 0.0)
					{
						/*
						 * There is a race condition here:
					 * Backends A, B and C belong to the same statement, each with a
					 * weight of 100000.
					 *
					 * Timestamp1: backend A's leader is A; backend B's leader is B;
					 * backend C's leader is also B.
					 *
					 * Timestamp2: the sweeper computes activeWeight as 200000.
					 *
					 * Timestamp3: backend B changes its leader to A.
					 *
					 * Timestamp4: the sweeper looks for backends that deserve maxCPU;
					 * if backends A, B and C all deserve it, then activeWeight =
					 * 200000 - 100000/1 - 100000/1 - 100000/2, which is less than zero.
					 *
					 * We can stop sweeping in such a race condition because the current
					 * backoff mechanism does not require exact control.
						 */
						backoffSingleton->sweeperInProgress = false;
						elog(LOG, "activeWeight underflow!");
						return;
					}
				}
			}
			numIterations++;
			AssertImply(found, (numIterations <= floor(numProcsPerSegment())));
			Assert(numIterations <= ceil(numProcsPerSegment()));
		}

		if (gp_debug_resqueue_priority)
		{
			elog(LOG, "after heavy backends: active backends = %d, active weight = %f, cpu available = %f", numActiveBackends, activeWeight, CPUAvailable);
		}

		/**
		 * Distribute whatever is the CPU available among the rest.
		 */
		for (i = 0; i < backoffSingleton->numEntries; i++)
		{
			BackoffBackendSharedEntry *se = getBackoffEntryRW(i);

			if (se->isActive
				&& se->backoff)
			{
				const BackoffBackendSharedEntry *gl = getBackoffEntryRO(se->groupLeaderIndex);

				Assert(se->weight > 0.0);
				se->targetUsage = (CPUAvailable) * (se->weight) / activeWeight / gl->numFollowersActive;
			}
		}
	}


	backoffSingleton->lastTotalStatementWeight = totalStatementWeight;
	backoffSingleton->sweeperInProgress = false;

	if (gp_debug_resqueue_priority)
	{
		StringInfoData str;

		initStringInfo(&str);
		appendStringInfo(&str, "num active statements: %d ", numActiveStatements);
		appendStringInfo(&str, "num active backends: %d ", numActiveBackends);
		appendStringInfo(&str, "targetusages: ");
		for (i = 0; i < MaxBackends; i++)
		{
			const BackoffBackendSharedEntry *se = getBackoffEntryRO(i);

			if (se->isActive)
				appendStringInfo(&str, "(%d,%f)", i, se->targetUsage);
		}
		elog(LOG, "%s", (const char *) str.data);
		pfree(str.data);
	}

}

/**
 * Initialize global state of backoff scheduler. This is called during creation
 * of shared memory and semaphores.
 */
void
BackoffStateInit()
{
	bool		found = false;

	/* Create or attach to the shared array */
	backoffSingleton = (BackoffState *) ShmemInitStruct("Backoff Global State", sizeof(BackoffState), &found);

	if (!found)
	{
		bool		ret = false;

		/*
		 * We're the first - initialize.
		 */
		MemSet(backoffSingleton, 0, sizeof(BackoffState));
		backoffSingleton->numEntries = MaxBackends;
		backoffSingleton->backendEntries = (BackoffBackendSharedEntry *) ShmemInitStruct("Backoff Backend Entries", mul_size(sizeof(BackoffBackendSharedEntry), backoffSingleton->numEntries), &ret);
		backoffSingleton->sweeperInProgress = false;
		Assert(!ret);
	}

	on_shmem_exit(BackoffStateAtExit, 0);
}

/**
 * This backend is done working on a statement.
 */
void
BackoffBackendEntryExit()
{
	if (MyBackendId >= 0
		&& (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE))
	{
		BackoffBackendSharedEntry *se = myBackoffSharedEntry();

		Assert(se);
		setInvalid(&se->statementId);
	}
	return;
}

/**
 * Invalidate the statement id corresponding to this backend so that it may
 * be eliminated from consideration by the sweeper early.
 */
static void
BackoffStateAtExit(int code, Datum arg)
{
	BackoffBackendEntryExit();
}


/**
 * An interface to re-weigh an existing session on the master and all backends.
 * Input:
 *	session id - which session is the statement in?
 *	command count - what is the command count of the statement?
 *	priority value - text, what should be the new priority of this statement.
 * Output:
 *	number of backends whose weights were changed by this call.
 */
Datum
gp_adjust_priority_value(PG_FUNCTION_ARGS)
{
	int32		session_id = PG_GETARG_INT32(0);
	int32		command_count = PG_GETARG_INT32(1);
	Datum		dVal = PG_GETARG_DATUM(2);
	char	   *priorityVal;
	int			wt;

	priorityVal = TextDatumGetCString(dVal);

	wt = BackoffPriorityValueToInt(priorityVal);

	Assert(wt > 0);

	pfree(priorityVal);

	return DirectFunctionCall3(gp_adjust_priority_int, Int32GetDatum(session_id),
							Int32GetDatum(command_count), Int32GetDatum(wt));

}

/**
 * An interface to re-weigh an existing session on the master and all backends.
 * Input:
 *	session id - which session is the statement in?
 *	command count - what is the command count of the statement?
 *	weight - int, what should be the new priority of this statement.
 * Output:
 *	number of backends whose weights were changed by this call.
 */
Datum
gp_adjust_priority_int(PG_FUNCTION_ARGS)
{
	int32		session_id = PG_GETARG_INT32(0);
	int32		command_count = PG_GETARG_INT32(1);
	int32		wt = PG_GETARG_INT32(2);
	int			numfound = 0;
	StatementId sid;

	if (!gp_enable_resqueue_priority)
		elog(ERROR, "Query prioritization is disabled.");

	if (!superuser())
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 (errmsg("only superuser can re-prioritize a query after it has begun execution"))));

	if (Gp_role == GP_ROLE_UTILITY)
		elog(ERROR, "Query prioritization does not work in utility mode.");

	if (wt <= 0)
		elog(ERROR, "Weight of statement must be greater than 0.");

	init(&sid, session_id, command_count);

	if (Gp_role == GP_ROLE_DISPATCH)
	{
		int			i = 0;
		CdbPgResults cdb_pgresults = {NULL, 0};
		char		cmd[255];

		/*
		 * Make sure the session exists before dispatching
		 */
		for (i = 0; i < backoffSingleton->numEntries; i++)
		{
			BackoffBackendSharedEntry *se = getBackoffEntryRW(i);

			if (equalStatementId(&se->statementId, &sid))
			{
				if (gp_debug_resqueue_priority)
				{
					elog(LOG, "changing weight of (%d:%d) from %d to %d", se->statementId.sessionId, se->statementId.commandCount, se->weight, wt);
				}

				se->weight = wt;
				numfound++;
			}
		}

		if (numfound == 0)
			elog(ERROR, "Did not find any backend entries for session %d, command count %d.", session_id, command_count);

		/*
		 * Ok, it exists, dispatch the command to the segDBs.
		 */
		sprintf(cmd, "select gp_adjust_priority(%d,%d,%d)", session_id, command_count, wt);

		CdbDispatchCommand(cmd, DF_WITH_SNAPSHOT, &cdb_pgresults);

		for (i = 0; i < cdb_pgresults.numResults; i++)
		{
			struct pg_result *pgresult = cdb_pgresults.pg_results[i];

			if (PQresultStatus(pgresult) != PGRES_TUPLES_OK)
			{
				cdbdisp_clearCdbPgResults(&cdb_pgresults);
				elog(ERROR, "gp_adjust_priority: resultStatus not tuples_Ok");
			}
			else
			{

				int			j;

				for (j = 0; j < PQntuples(pgresult); j++)
				{
					int			retvalue = 0;

					retvalue = atoi(PQgetvalue(pgresult, j, 0));
					numfound += retvalue;
				}
			}
		}

		cdbdisp_clearCdbPgResults(&cdb_pgresults);

	}
	else	/* Gp_role == EXECUTE */
	{
		/*
		 * Find number of backends working on behalf of this session and
		 * distribute the weight evenly.
		 */
		int			i = 0;

		Assert(Gp_role == GP_ROLE_EXECUTE);
		for (i = 0; i < backoffSingleton->numEntries; i++)
		{
			BackoffBackendSharedEntry *se = getBackoffEntryRW(i);

			if (equalStatementId(&se->statementId, &sid))
			{
				if (gp_debug_resqueue_priority)
				{
					elog(LOG, "changing weight of (%d:%d) from %d to %d", se->statementId.sessionId, se->statementId.commandCount, se->weight, wt);
				}
				se->weight = wt;
				numfound++;
			}
		}

		if (gp_debug_resqueue_priority && numfound == 0)
		{
			elog(LOG, "did not find any matching backends on segments");
		}
	}

	PG_RETURN_INT32(numfound);
}
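
/*
 * Usage sketch, assuming both C entry points are exposed under the SQL name
 * gp_adjust_priority (the integer form is what the dispatched command above
 * uses):
 *
 *   -- raise command 5 of session 1234 to HIGH by name
 *   SELECT gp_adjust_priority(1234, 5, 'HIGH');
 *   -- or set an explicit integer weight
 *   SELECT gp_adjust_priority(1234, 5, 1000);
 */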

bool
BackoffSweeperStartRule(Datum main_arg)
{
	if (IsResQueueEnabled())
		return true;

	return false;
}

/**
 * This method is called after fork of the sweeper process. It sets up signal
 * handlers and does initialization that is required by a postgres backend.
 */
void
BackoffSweeperMain(Datum main_arg)
{
	isSweeperProcess = true;

	/* We're now ready to receive signals */
	BackgroundWorkerUnblockSignals();

	/* main loop */
	BackoffSweeperLoop();

	/* One iteration done, go away */
	proc_exit(0);
}

/**
 * Main loop of the sweeper process. It wakes up once in a while, marks backends as active
 * or not and re-calculates CPU usage among active backends.
 */
void
BackoffSweeperLoop(void)
{
	for (;;)
	{
		int		rc;

		if (gp_enable_resqueue_priority)
			BackoffSweeper();

		Assert(gp_resqueue_priority_sweeper_interval > 0.0);

		/* Sleep a while. */
		rc = WaitLatch(&MyProc->procLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
					   gp_resqueue_priority_sweeper_interval,
					   WAIT_EVENT_BACKOFF_MAIN);
		ResetLatch(&MyProc->procLatch);

		/* emergency bailout if postmaster has died */
		if (rc & WL_POSTMASTER_DEATH)
			proc_exit(1);
	}							/* end server loop */

	return;
}

/**
 * Set returning function to inspect current state of query prioritization.
 * Input:
 *	none
 * Output:
 *	Set of (session_id, command_count, priority, weight) for all backends (on the current segment).
 *	This function is used by jetpack views gp_statement_priorities.
 */
Datum
gp_list_backend_priorities(PG_FUNCTION_ARGS)
{
	typedef struct Context
	{
		int			currentIndex;
	} Context;

	FuncCallContext *funcctx = NULL;
	Context    *context = NULL;

	if (SRF_IS_FIRSTCALL())
	{
		TupleDesc	tupdesc;
		MemoryContext oldcontext;

		/* create a function context for cross-call persistence */
		funcctx = SRF_FIRSTCALL_INIT();

		/*
		 * switch to memory context appropriate for multiple function calls
		 */
		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

		/* build tupdesc for result tuples */
		/* this had better match gp_distributed_xacts view in system_views.sql */
		tupdesc = CreateTemplateTupleDesc(4);
		TupleDescInitEntry(tupdesc, (AttrNumber) 1, "session_id",
						   INT4OID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 2, "command_count",
						   INT4OID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 3, "priority",
						   TEXTOID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 4, "weight",
						   INT4OID, -1, 0);


		funcctx->tuple_desc = BlessTupleDesc(tupdesc);

		/*
		 * Collect all the locking information that we will format and send
		 * out as a result set.
		 */
		context = (Context *) palloc(sizeof(Context));
		funcctx->user_fctx = (void *) context;
		context->currentIndex = 0;
		MemoryContextSwitchTo(oldcontext);
	}

	funcctx = SRF_PERCALL_SETUP();
	Assert(funcctx);
	context = (Context *) funcctx->user_fctx;
	Assert(context);

	if (!IsResQueueEnabled() || !gp_enable_resqueue_priority)
		SRF_RETURN_DONE(funcctx);

	while (context->currentIndex < backoffSingleton->numEntries)
	{
		Datum		values[4];
		bool		nulls[4];
		HeapTuple	tuple = NULL;
		Datum		result;
		char	   *priorityVal = NULL;

		const BackoffBackendSharedEntry *se = NULL;

		se = getBackoffEntryRO(context->currentIndex);

		Assert(se);

		if (!isValid(&se->statementId))
		{
			context->currentIndex++;
			continue;
		}

		/*
		 * Form tuple with appropriate data.
		 */
		MemSet(values, 0, sizeof(values));
		MemSet(nulls, false, sizeof(nulls));

		values[0] = Int32GetDatum((int32) se->statementId.sessionId);
		values[1] = Int32GetDatum((int32) se->statementId.commandCount);

		priorityVal = BackoffPriorityIntToValue(se->weight);

		Assert(priorityVal);

		values[2] = CStringGetTextDatum(priorityVal);
		Assert(se->weight > 0);
		values[3] = Int32GetDatum((int32) se->weight);
		tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
		Assert(tuple);
		result = HeapTupleGetDatum(tuple);

		context->currentIndex++;

		SRF_RETURN_NEXT(funcctx, result);
	}

	SRF_RETURN_DONE(funcctx);
}
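
/*
 * Usage sketch: one row is returned per backend with a valid statement id
 * on the current segment. Depending on how the function is declared in the
 * catalog, a column definition list may be needed, e.g.:
 *
 *   SELECT * FROM gp_list_backend_priorities()
 *     AS t(session_id int, command_count int, priority text, weight int);
 *
 * A hypothetical row: (1234, 5, 'HIGH', 1000).
 */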

/**
 * What is the weight assigned to superuser issued queries?
 */
static int
BackoffSuperuserStatementWeight(void)
{
	int			wt = -1;

	Assert(superuser());
	wt = BackoffPriorityValueToInt("MAX");
	Assert(wt > 0);
	return wt;
}

/**
 * Integer value for default weight.
 */
static int
BackoffDefaultWeight(void)
{
	int			wt = BackoffPriorityValueToInt(gp_resqueue_priority_default_value);

	Assert(wt > 0);
	return wt;
}

/**
 * Get weight associated with queue. See queue.c.
 *
 * Attention is paid in order to avoid catalog lookups when not allowed.  The
 * superuser() function performs catalog lookups in certain cases. Also the
 * GetResqueueCapabilityEntry will always do a catalog lookup. In such cases
 * use the default weight.
 */
int
ResourceQueueGetPriorityWeight(Oid queueId)
{
	List	   *capabilitiesList = NULL;
	List	   *entry = NULL;
	ListCell   *le = NULL;
	int			weight = BackoffDefaultWeight();

	if (!IsTransactionState())
		return weight;

	if (superuser())
		return BackoffSuperuserStatementWeight();

	if (queueId == InvalidOid)
		return weight;

	capabilitiesList = GetResqueueCapabilityEntry(queueId);		/* This is a list of
																 * lists */

	if (!capabilitiesList)
		return weight;

	foreach(le, capabilitiesList)
	{
		Value	   *key = NULL;

		entry = (List *) lfirst(le);
		Assert(entry);
		key = (Value *) linitial(entry);
		Assert(key->type == T_Integer); /* This is resource type id */
		if (intVal(key) == PG_RESRCTYPE_PRIORITY)
		{
			Value	   *val = lsecond(entry);

			Assert(val->type == T_String);
			weight = BackoffPriorityValueToInt(strVal(val));
		}
	}
	list_free(capabilitiesList);
	return weight;
}

typedef struct PriorityMapping
{
	const char *priorityVal;
	int			weight;
} PriorityMapping;

const struct PriorityMapping priority_map[] = {
	{"MAX", 1000000},
	{"HIGH", 1000},
	{"MEDIUM", 500},
	{"LOW", 200},
	{"MIN", 100},
	/* End of list marker */
	{NULL, 0}
};
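
/*
 * For example, BackoffPriorityValueToInt("medium") returns 500 (the lookup
 * below is case-insensitive), an unknown string raises an error, and
 * BackoffPriorityIntToValue(500) returns "MEDIUM" while a weight missing
 * from the table maps to "NON-STANDARD".
 */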


/**
 * Resource queues are associated with priority values which are stored
 * as text. This method maps them to integer weights that are used for
 * CPU target-usage computations by the sweeper. Keep this method in sync
 * with its dual BackoffPriorityIntToValue().
 */
static int
BackoffPriorityValueToInt(const char *priorityVal)
{
	const PriorityMapping *p = priority_map;

	Assert(p);
	while (p->priorityVal != NULL && (pg_strcasecmp(priorityVal, p->priorityVal) != 0))
	{
		p++;
		Assert((char *) p < (const char *) priority_map + sizeof(priority_map));
	}

	if (p->priorityVal == NULL)
	{
		/* No match found, throw an error */
		elog(ERROR, "Invalid priority value.");
	}

	Assert(p->weight > 0);

	return p->weight;
}

/**
 * Dual of the method BackoffPriorityValueToInt(). Given a weight, this
 * method maps it to a text value corresponding to this weight. Caller is
 * responsible for deallocating the return pointer.
 */
char *
BackoffPriorityIntToValue(int weight)
{
	const PriorityMapping *p = priority_map;

	Assert(p);

	while (p->priorityVal != NULL && (p->weight != weight))
	{
		p = p + 1;
		Assert((char *) p < (const char *) priority_map + sizeof(priority_map));
	}

	if (p->priorityVal != NULL)
	{
		Assert(p->weight == weight);
		return pstrdup(p->priorityVal);
	}

	return pstrdup("NON-STANDARD");
}

/*
 * Helper method that verifies setting of default priority guc.
 */
bool
gpvars_check_gp_resqueue_priority_default_value(char **newval, void **extra,
												GucSource source)
{
	int			wt;

	wt = BackoffPriorityValueToInt(*newval); /* This will throw an error if a bad value is specified */

	if (wt > 0)
		return true;

	GUC_check_errmsg("invalid value for gp_resqueue_priority_default_value: \"%s\"", *newval);
	return false;
}
