greenplumn resgroup 源码
greenplumn resgroup 代码
文件路径:/src/backend/utils/resgroup/resgroup.c
/*-------------------------------------------------------------------------
*
* resgroup.c
* GPDB resource group management code.
*
*
* TERMS:
*
* - FIXED QUOTA: the minimal memory quota reserved for a slot. This quota
* is promised to be available during the lifecycle of the slot.
*
* - SHARED QUOTA: the preemptive memory quota shared by all the slots
* in a resource group. When a slot wants to use more memory than its
* FIXED QUOTA it can attempt to allocate from this SHARED QUOTA; however,
* this allocation may fail depending on the actual usage.
*
* - MEM POOL: the global memory quota pool shared by all the resource groups.
* Overuse in this pool is strictly forbidden. A resource group must
* acquire from this pool to have enough memory quota for its slots'
* FIXED QUOTA and SHARED QUOTA, and should release overused quota to
* this pool as soon as possible.
*
* - SLOT POOL: the global slot pool shared by all the resource groups.
* A resource group must acquire a free slot in this pool for a new
* transaction to run in it.
*
* Portions Copyright (c) 2006-2010, Greenplum inc.
* Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
*
*
* IDENTIFICATION
* src/backend/utils/resgroup/resgroup.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <math.h>
#include "libpq-fe.h"
#include "access/genam.h"
#include "access/table.h"
#include "tcop/tcopprot.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_resgroup.h"
#include "catalog/pg_resgroupcapability.h"
#include "cdb/cdbgang.h"
#include "cdb/cdbutil.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbdispatchresult.h"
#include "cdb/cdbdisp_query.h"
#include "cdb/memquota.h"
#include "commands/resgroupcmds.h"
#include "common/hashfn.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "port/atomics.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lmgr.h"
#include "storage/lock.h"
#include "storage/pg_shmem.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/resgroup-ops.h"
#include "utils/resgroup.h"
#include "utils/resource_manager.h"
#include "utils/session_state.h"
#include "utils/vmem_tracker.h"
/*
* Slot id value that stands for "no slot".
* NOTE(review): presumably what slotGetId() reports when there is no
* slot -- confirm at the call sites.
*/
#define InvalidSlotId (-1)
/*
* Capacity of the global slot pool; ResGroupShmemSize() reserves room for
* this many ResGroupSlotData entries.
*/
#define RESGROUP_MAX_SLOTS (MaxConnections)
/*
* Hard memory limits in bypass mode, in chunks.
* More chunks are reserved on QD than on QE because planner and orca
* may need more memory to generate and optimize the plan.
*/
#define RESGROUP_BYPASS_MODE_MEMORY_LIMIT_ON_QD 30
#define RESGROUP_BYPASS_MODE_MEMORY_LIMIT_ON_QE 10
/*
* GUC variables.
*
* The initializers below are only the compiled-in starting values of the
* C variables.
* NOTE(review): the user-visible defaults, ranges and descriptions
* presumably live in the GUC tables -- confirm there before relying on
* any value shown here.
*/
int gp_resgroup_memory_policy = RESMANAGER_MEMORY_POLICY_NONE;
bool gp_log_resgroup_memory = false;
/* no initializer: a file-scope object, so it starts out as 0 */
int gp_resgroup_memory_policy_auto_fixed_mem;
bool gp_resgroup_print_operator_memory_limits = false;
bool gp_resgroup_debug_wait_queue = true;
int memory_spill_ratio = 20;
int gp_resource_group_queuing_timeout = 0;
/*
* Data structures
*
* Forward typedefs, so that the struct definitions below can refer to
* each other by pointer regardless of the order they are defined in.
*/
typedef struct ResGroupInfo ResGroupInfo;
typedef struct ResGroupHashEntry ResGroupHashEntry;
typedef struct ResGroupProcData ResGroupProcData;
typedef struct ResGroupSlotData ResGroupSlotData;
typedef struct ResGroupData ResGroupData;
typedef struct ResGroupControl ResGroupControl;
/*
* Resource group info.
*
* This records the group and groupId for a transaction.
* When group->groupId != groupId, it means the group
* has been dropped.
*/
struct ResGroupInfo
{
ResGroupData *group; /* pointer to the group's ResGroupData */
Oid groupId; /* group id recorded together with the pointer */
};
/*
* Entry of the shared "Resource Group Hash Table" created in
* ResGroupControlInit(), which is keyed by an Oid (keysize is sizeof(Oid)).
*/
struct ResGroupHashEntry
{
Oid groupId; /* hash key: the resource group id */
/*
* NOTE(review): presumably the position of the group inside
* ResGroupControl.groups[] -- confirm in groupHashNew()/groupHashFind().
*/
int index;
};
/*
* Per proc resource group information.
*
* Config snapshot and runtime accounting information in current proc.
*
* Keep groupId as the first member: the file-level instance __self is
* set up with a positional initializer that starts with InvalidOid.
*/
struct ResGroupProcData
{
Oid groupId; /* id of the group of current proc; returned by GetMyResGroupId() */
int32 memUsage; /* memory usage of current proc */
/*
* Record current bypass memory limit for each bypass queries.
* For bypass mode, memUsage of current process could accumulate in a session.
* So should limit the memory usage for each query instead of the whole session.
*/
int32 bypassMemoryLimit;
ResGroupData *group; /* group of current proc; NULL is treated as "not assigned" by ResGroupDumpMemoryInfo() */
ResGroupSlotData *slot; /* slot of current proc */
ResGroupCaps caps; /* capabilities recorded for current proc (the "config snapshot" above) */
};
/*
* Per slot resource group information.
*
* Resource group have 'concurrency' number of slots.
* Each transaction acquires a slot on master before running.
* The information shared by QE processes on each segments are stored
* in this structure.
*/
struct ResGroupSlotData
{
Oid groupId; /* id of the group this slot belongs to */
ResGroupData *group; /* pointer to the group */
int32 memQuota; /* memory quota of current slot */
int32 memUsage; /* total memory usage of procs belongs to this slot */
int nProcs; /* number of procs in this slot */
/*
* NOTE(review): presumably the link used by the slot pool free list
* (see ResGroupControl.freeSlot) -- confirm in slotpoolInit().
*/
ResGroupSlotData *next;
ResGroupCaps caps; /* capabilities recorded for this slot */
};
/*
* Resource group operations for memory.
*
* Groups with different memory auditor will have different
* operations.
*
* A member may be NULL (both tables defined in this file leave
* group_mem_on_create NULL); the callers visible in this file test a
* member for NULL before invoking it.
*/
typedef struct ResGroupMemOperations
{
void (*group_mem_on_create) (Oid groupId, ResGroupData *group);
void (*group_mem_on_alter) (Oid groupId, ResGroupData *group); /* invoked from ResGroupAlterOnCommit() */
void (*group_mem_on_drop) (Oid groupId, ResGroupData *group);
void (*group_mem_on_notify) (ResGroupData *group);
void (*group_mem_on_dump) (ResGroupData *group, StringInfo str); /* invoked from groupDumpMemUsage() */
} ResGroupMemOperations;
/*
* Resource group information.
*
* One entry per resource group; the entries live in the trailing
* groups[] array of ResGroupControl. An entry whose groupId is
* InvalidOid is how ResGroupControlInit() marks every entry initially.
*/
struct ResGroupData
{
Oid groupId; /* Id for this group */
/*
* memGap is calculated as:
* (memory limit (before alter) - memory expected (after alter))
*
* It stands for how many memory (in chunks) this group should
* give back to MEM POOL.
*/
int32 memGap;
int32 memExpected; /* expected memory chunks according to current caps */
int32 memQuotaGranted; /* memory chunks for quota part */
int32 memSharedGranted; /* memory chunks for shared part */
volatile int32 memQuotaUsed; /* memory chunks assigned to all the running slots */
/*
* memory usage of this group, should always equal to the
* sum of session memory(session_state->sessionVmem) that
* belongs to this group
*/
volatile int32 memUsage;
volatile int32 memSharedUsage; /* reported as "memory shared usage" by ResGroupDumpMemoryInfo() */
volatile int nRunning; /* number of running trans */
volatile int nRunningBypassed; /* number of running trans in bypass mode */
int totalExecuted; /* total number of executed trans */
int totalQueued; /* total number of queued trans */
int64 totalQueuedTimeMs; /* total queue time, in milliseconds */
PROC_QUEUE waitProcs; /* list of PGPROC objects waiting on this group */
/*
* operation functions for resource group
*/
const ResGroupMemOperations *groupMemOps;
bool lockedForDrop; /* true if resource group is dropped but not committed yet */
ResGroupCaps caps; /* capabilities of this group */
};
/*
* Global control structure of resource groups, allocated in shared memory
* by ResGroupControlInit().
*
* groups[] must stay the last member and keep its declared length of 1:
* both ResGroupShmemSize() and ResGroupControlInit() compute the real size
* as sizeof(ResGroupControl) - sizeof(ResGroupData) plus room for
* MaxResourceGroups entries.
*/
struct ResGroupControl
{
int32 totalChunks; /* total memory chunks on this segment */
/*
* Safe memory threshold:
* if remained global shared memory is less than this threshold,
* then the resource group memory usage is in red zone.
* Note that safeChunksThreshold100 is 100 times bigger than the real safe chunks.
* This is used to avoid rounding problem caused by runaway_detector_activation_percent
*/
pg_atomic_uint32 safeChunksThreshold100;
pg_atomic_uint32 freeChunks; /* memory chunks not allocated to any group,
will be used for the query which group share
memory is not enough*/
int32 chunkSizeInBits; /* starts as BITS_IN_MB, then set by decideTotalChunks() in InitResGroups() */
int segmentsOnMaster; /* filled in on the dispatcher from hostPrimaryCount; 0 is read as "not set yet" */
ResGroupSlotData *slots; /* slot pool shared by all resource groups */
ResGroupSlotData *freeSlot; /* head of the free list */
HTAB *htbl; /* the "Resource Group Hash Table" */
/*
* The hash table for resource groups in shared memory should only be populated
* once, so we add a flag here to implement this requirement.
*/
bool loaded;
int nGroups; /* set to MaxResourceGroups by ResGroupControlInit() */
ResGroupData groups[1]; /* really MaxResourceGroups entries, see above */
};
/*
* Switches for the optional cgroup features.
* gp_resource_group_enable_cgroup_cpuset gates every cpuset operation in
* InitResGroups() and in the CREATE/ALTER/DROP callbacks below.
* NOTE(review): who sets these flags is not visible in this part of the
* file -- presumably the OS-dependent resgroup-ops layer; confirm.
*/
bool gp_resource_group_enable_cgroup_memory = false;
bool gp_resource_group_enable_cgroup_swap = false;
bool gp_resource_group_enable_cgroup_cpuset = false;
/* hooks */
resgroup_assign_hook_type resgroup_assign_hook = NULL;
/* static variables */
/* the control structure in shared memory, attached by ResGroupControlInit() */
static ResGroupControl *pResGroupControl = NULL;
/* backing storage for 'self'; only the first member (groupId) is spelled out */
static ResGroupProcData __self =
{
InvalidOid,
};
/* per-process resource group state of the current proc */
static ResGroupProcData *self = &__self;
/* If we are waiting on a group, this points to the associated group */
static ResGroupData *groupAwaited = NULL;
static TimestampTz groupWaitStart;
static TimestampTz groupWaitEnd;
/* the resource group self is running in bypass mode */
static ResGroupData *bypassedGroup = NULL;
/* a fake slot used in bypass mode */
static ResGroupSlotData bypassedSlot;
/*
* static functions
*
* Forward declarations of the file-local helpers. The section headings
* below only group the declarations by their names; see each definition
* for the actual contract.
*/
/* memory caps, mem pool and quota helpers */
static bool groupApplyMemCaps(ResGroupData *group);
static int32 mempoolReserve(Oid groupId, int32 chunks);
static void mempoolRelease(Oid groupId, int32 chunks);
static void groupRebalanceQuota(ResGroupData *group,
int32 chunks,
const ResGroupCaps *caps);
static void decideTotalChunks(int32 *totalChunks, int32 *chunkSizeInBits);
static int32 groupGetMemExpected(const ResGroupCaps *caps);
static int32 groupGetMemQuotaExpected(const ResGroupCaps *caps);
static int32 groupGetMemSharedExpected(const ResGroupCaps *caps);
static int32 groupGetMemSpillTotal(const ResGroupCaps *caps);
static int32 slotGetMemQuotaExpected(const ResGroupCaps *caps);
static int32 slotGetMemQuotaOnQE(const ResGroupCaps *caps, ResGroupData *group);
static int32 slotGetMemSpill(const ResGroupCaps *caps);
static void wakeupSlots(ResGroupData *group, bool grant);
static void notifyGroupsOnMem(Oid skipGroupId);
static int32 mempoolAutoRelease(ResGroupData *group);
static int32 mempoolAutoReserve(ResGroupData *group, const ResGroupCaps *caps);
/* group hash table and group life cycle */
static ResGroupData *groupHashNew(Oid groupId);
static ResGroupData *groupHashFind(Oid groupId, bool raise);
static ResGroupData *groupHashRemove(Oid groupId);
static void waitOnGroup(ResGroupData *group, bool isMoveQuery);
static ResGroupData *createGroup(Oid groupId, const ResGroupCaps *caps);
static void removeGroup(Oid groupId);
static void AtProcExit_ResGroup(int code, Datum arg);
static void groupWaitCancel(bool isMoveQuery);
/* memory accounting of groups and slots */
static int32 groupReserveMemQuota(ResGroupData *group);
static void groupReleaseMemQuota(ResGroupData *group, ResGroupSlotData *slot);
static int32 groupIncMemUsage(ResGroupData *group,
ResGroupSlotData *slot,
int32 chunks);
static int32 groupDecMemUsage(ResGroupData *group,
ResGroupSlotData *slot,
int32 chunks);
static int32 groupIncSlotMemUsage(ResGroupData *group, ResGroupSlotData *slot);
static void groupDecSlotMemUsage(ResGroupData *group, ResGroupSlotData *slot);
/* slots and the slot pool */
static void initSlot(ResGroupSlotData *slot, ResGroupData *group,
int32 slotMemQuota);
static void selfAttachResGroup(ResGroupData *group, ResGroupSlotData *slot);
static void selfDetachResGroup(ResGroupData *group, ResGroupSlotData *slot);
static bool slotpoolInit(void);
static ResGroupSlotData *slotpoolAllocSlot(void);
static void slotpoolFreeSlot(ResGroupSlotData *slot);
static ResGroupSlotData *groupGetSlot(ResGroupData *group);
static void groupPutSlot(ResGroupData *group, ResGroupSlotData *slot);
/* choosing a group and acquiring/releasing a slot in it */
static Oid decideResGroupId(void);
static void decideResGroup(ResGroupInfo *pGroupInfo);
static bool groupIncBypassedRef(ResGroupInfo *pGroupInfo);
static void groupDecBypassedRef(ResGroupData *group);
static ResGroupSlotData *groupAcquireSlot(ResGroupInfo *pGroupInfo, bool isMoveQuery);
static void groupReleaseSlot(ResGroupData *group, ResGroupSlotData *slot, bool isMoveQuery);
static void addTotalQueueDuration(ResGroupData *group);
static void groupSetMemorySpillRatio(const ResGroupCaps *caps);
static char *groupDumpMemUsage(ResGroupData *group);
/* state of the current proc ('self') */
static void selfValidateResGroupInfo(void);
static bool selfIsAssigned(void);
static void selfSetGroup(ResGroupData *group);
static void selfUnsetGroup(void);
static void selfSetSlot(ResGroupSlotData *slot);
static void selfUnsetSlot(void);
/* wait queue */
static bool procIsWaiting(const PGPROC *proc);
static void procWakeup(PGPROC *proc);
static int slotGetId(const ResGroupSlotData *slot);
static void groupWaitQueueValidate(const ResGroupData *group);
static void groupWaitProcValidate(PGPROC *proc, PROC_QUEUE *head);
static void groupWaitQueuePush(ResGroupData *group, PGPROC *proc);
static PGPROC *groupWaitQueuePop(ResGroupData *group);
static void groupWaitQueueErase(ResGroupData *group, PGPROC *proc);
static bool groupWaitQueueIsEmpty(const ResGroupData *group);
static bool shouldBypassQuery(const char *query_string);
/* DROP RESOURCE GROUP locking */
static void lockResGroupForDrop(ResGroupData *group);
static void unlockResGroupForDrop(ResGroupData *group);
static bool groupIsDropped(ResGroupInfo *pGroupInfo);
/* debug dumps */
static void resgroupDumpGroup(StringInfo str, ResGroupData *group);
static void resgroupDumpWaitQueue(StringInfo str, PROC_QUEUE *queue);
static void resgroupDumpCaps(StringInfo str, ResGroupCap *caps);
static void resgroupDumpSlots(StringInfo str);
static void resgroupDumpFreeSlots(StringInfo str);
/* slot recorded in the session state */
static void sessionSetSlot(ResGroupSlotData *slot);
static void sessionResetSlot(void);
static ResGroupSlotData *sessionGetSlot(void);
/* per memory auditor operations, see ResGroupMemOperations */
static void bindGroupOperation(ResGroupData *group);
static void groupMemOnAlterForVmtracker(Oid groupId, ResGroupData *group);
static void groupMemOnDropForVmtracker(Oid groupId, ResGroupData *group);
static void groupMemOnNotifyForVmtracker(ResGroupData *group);
static void groupMemOnDumpForVmtracker(ResGroupData *group, StringInfo str);
static void groupMemOnAlterForCgroup(Oid groupId, ResGroupData *group);
static void groupMemOnDropForCgroup(Oid groupId, ResGroupData *group);
static void groupMemOnNotifyForCgroup(ResGroupData *group);
static void groupMemOnDumpForCgroup(ResGroupData *group, StringInfo str);
static void groupApplyCgroupMemInc(ResGroupData *group);
static void groupApplyCgroupMemDec(ResGroupData *group);
static void cpusetOperation(char *cpuset1,
const char *cpuset2,
int len,
bool sub);
/* helpers that exist in assert-enabled builds only */
#ifdef USE_ASSERT_CHECKING
static bool selfHasGroup(void);
static bool selfHasSlot(void);
static void slotValidate(const ResGroupSlotData *slot);
static bool slotIsInFreelist(const ResGroupSlotData *slot);
static bool slotIsInUse(const ResGroupSlotData *slot);
static bool groupIsNotDropped(const ResGroupData *group);
static bool groupWaitQueueFind(ResGroupData *group, const PGPROC *proc);
#endif /* USE_ASSERT_CHECKING */
/*
* Operations of memory for resource groups with vmtracker memory auditor.
*
* There is no on-create operation for this auditor, so that member is
* left NULL.
*/
static const ResGroupMemOperations resgroup_memory_operations_vmtracker = {
.group_mem_on_create = NULL,
.group_mem_on_alter = groupMemOnAlterForVmtracker,
.group_mem_on_drop = groupMemOnDropForVmtracker,
.group_mem_on_notify = groupMemOnNotifyForVmtracker,
.group_mem_on_dump = groupMemOnDumpForVmtracker,
};
/*
* Operations of memory for resource groups with cgroup memory auditor.
*
* There is no on-create operation for this auditor, so that member is
* left NULL.
*/
static const ResGroupMemOperations resgroup_memory_operations_cgroup = {
.group_mem_on_create = NULL,
.group_mem_on_alter = groupMemOnAlterForCgroup,
.group_mem_on_drop = groupMemOnDropForCgroup,
.group_mem_on_notify = groupMemOnNotifyForCgroup,
.group_mem_on_dump = groupMemOnDumpForCgroup,
};
/*
 * Report how many bytes of shared memory the resource group structures
 * need.
 *
 * The total is made of the group hash table, the control structure
 * (its header plus MaxResourceGroups trailing ResGroupData entries) and
 * the slot pool, with 10% added on top as a safety margin.
 */
Size
ResGroupShmemSize(void)
{
    Size        hashBytes;
    Size        headerBytes;
    Size        groupBytes;
    Size        slotBytes;
    Size        total;

    /* The hash of groups. */
    hashBytes = hash_estimate_size(MaxResourceGroups, sizeof(ResGroupHashEntry));

    /* The control structure without its one declared trailing entry. */
    headerBytes = sizeof(ResGroupControl) - sizeof(ResGroupData);

    /* The trailing array of groups. */
    groupBytes = mul_size(MaxResourceGroups, sizeof(ResGroupData));

    /* The slot pool. */
    slotBytes = mul_size(RESGROUP_MAX_SLOTS, sizeof(ResGroupSlotData));

    total = add_size(hashBytes, headerBytes);
    total = add_size(total, groupBytes);
    total = add_size(total, slotBytes);

    /* Add a safety margin */
    total = add_size(total, total / 10);

    return total;
}
/*
* Initialize the global ResGroupControl struct of resource groups.
*/
void
ResGroupControlInit(void)
{
int i;
bool found;
HASHCTL info;
int hash_flags;
int size;
size = sizeof(*pResGroupControl) - sizeof(ResGroupData);
size += mul_size(MaxResourceGroups, sizeof(ResGroupData));
pResGroupControl = ShmemInitStruct("global resource group control",
size, &found);
if (found)
return;
if (pResGroupControl == NULL)
goto error_out;
/* Set key and entry sizes of hash table */
MemSet(&info, 0, sizeof(info));
info.keysize = sizeof(Oid);
info.entrysize = sizeof(ResGroupHashEntry);
info.hash = tag_hash;
hash_flags = (HASH_ELEM | HASH_FUNCTION);
LOG_RESGROUP_DEBUG(LOG, "creating hash table for %d resource groups", MaxResourceGroups);
pResGroupControl->htbl = ShmemInitHash("Resource Group Hash Table",
MaxResourceGroups,
MaxResourceGroups,
&info, hash_flags);
if (!pResGroupControl->htbl)
goto error_out;
/*
* No need to acquire LWLock here, since this is expected to be called by
* postmaster only
*/
pResGroupControl->loaded = false;
pResGroupControl->nGroups = MaxResourceGroups;
pResGroupControl->totalChunks = 0;
pg_atomic_init_u32(&pResGroupControl->safeChunksThreshold100, 0);
pg_atomic_init_u32(&pResGroupControl->freeChunks, 0);
pResGroupControl->chunkSizeInBits = BITS_IN_MB;
for (i = 0; i < MaxResourceGroups; i++)
pResGroupControl->groups[i].groupId = InvalidOid;
if (!slotpoolInit())
goto error_out;
return;
error_out:
ereport(FATAL,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("not enough shared memory for resource group control")));
}
/*
 * Allocate a resource group entry from a hash table
 *
 * Creates the in-memory entry for 'groupId' with capabilities 'caps'
 * while holding ResGroupLock exclusively.
 */
void
AllocResGroupEntry(Oid groupId, const ResGroupCaps *caps)
{
    /* only read by the Assert below, so tell non-assert builds about it */
    ResGroupData *group PG_USED_FOR_ASSERTS_ONLY;

    LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);

    group = createGroup(groupId, caps);
    Assert(group != NULL);

    LWLockRelease(ResGroupLock);
}
/*
 * Load the resource groups in shared memory. Note this
 * can only be done after enough setup has been done. This uses
 * heap_open etc which in turn requires shared memory to be set up.
 *
 * Every caller registers AtProcExit_ResGroup as a shmem-exit callback.
 * Only the first backend that finds the control structure not loaded yet
 * goes on to:
 *   - decide the total memory chunks and the derived thresholds;
 *   - initialize the OS dependent layer (ResGroupOps_Init);
 *   - scan pg_resgroup and, for each group, create the shared memory
 *     entry and apply its memory limit plus either its cpu rate limit or
 *     its cpuset;
 *   - hand the cpu cores not claimed by any group to the default cpuset
 *     group, when the cpuset feature is enabled.
 */
void
InitResGroups(void)
{
    HeapTuple   tuple;
    SysScanDesc sscan;
    int         numGroups;
    CdbComponentDatabaseInfo *qdinfo;
    ResGroupCaps caps;
    Relation    relResGroup;
    Relation    relResGroupCapability;
    char        cpuset[MaxCpuSetLength] = {0};
    int         defaultCore = -1;
    Bitmapset  *bmsUnused = NULL;

    on_shmem_exit(AtProcExit_ResGroup, 0);

    /*
     * On master and segments, the first backend does the initialization.
     */
    if (pResGroupControl->loaded)
        return;

    if (Gp_role == GP_ROLE_DISPATCH && pResGroupControl->segmentsOnMaster == 0)
    {
        Assert(IS_QUERY_DISPATCHER());
        qdinfo = cdbcomponent_getComponentInfo(MASTER_CONTENT_ID);
        pResGroupControl->segmentsOnMaster = qdinfo->hostPrimaryCount;
        Assert(pResGroupControl->segmentsOnMaster > 0);
    }

    /*
     * The resgroup shared mem initialization must be serialized. Only the first session
     * should do the init.
     * Serialization is done by LW_EXCLUSIVE ResGroupLock. However, we must obtain all DB
     * locks before obtaining LWlock to prevent deadlock.
     */
    relResGroup = table_open(ResGroupRelationId, AccessShareLock);
    relResGroupCapability = table_open(ResGroupCapabilityRelationId, AccessShareLock);
    LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);

    /* somebody else finished the job while we were waiting for the lock */
    if (pResGroupControl->loaded)
        goto exit;

    /* These initialization must be done before createGroup() */
    decideTotalChunks(&pResGroupControl->totalChunks, &pResGroupControl->chunkSizeInBits);
    pg_atomic_write_u32(&pResGroupControl->freeChunks, pResGroupControl->totalChunks);
    pg_atomic_write_u32(&pResGroupControl->safeChunksThreshold100,
                        pResGroupControl->totalChunks * (100 - runaway_detector_activation_percent));
    if (pResGroupControl->totalChunks == 0)
        ereport(PANIC,
                (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
                 errmsg("insufficient memory available"),
                 errhint("Increase gp_resource_group_memory_limit")));

    ResGroupOps_Init();

    if (gp_resource_group_enable_cgroup_cpuset)
    {
        /* Get cpuset from cpuset/gpdb, and transform it into bitset */
        ResGroupOps_GetCpuSet(RESGROUP_ROOT_ID, cpuset, MaxCpuSetLength);
        bmsUnused = CpusetToBitset(cpuset, MaxCpuSetLength);
        /* get the minimum core number, in case of the zero core is not exist */
        defaultCore = bms_next_member(bmsUnused, -1);
        Assert(defaultCore >= 0);
    }

    numGroups = 0;
    sscan = systable_beginscan(relResGroup, InvalidOid, false, NULL, 0, NULL);
    while (HeapTupleIsValid(tuple = systable_getnext(sscan)))
    {
        Oid         groupId = ((Form_pg_resgroup) GETSTRUCT(tuple))->oid;
        /* only read by the Assert below */
        ResGroupData *group PG_USED_FOR_ASSERTS_ONLY;

        GetResGroupCapabilities(relResGroupCapability, groupId, &caps);

        group = createGroup(groupId, &caps);
        Assert(group != NULL);

        ResGroupOps_CreateGroup(groupId);
        ResGroupOps_SetMemoryLimit(groupId, caps.memLimit);

        if (caps.cpuRateLimit != CPU_RATE_LIMIT_DISABLED)
        {
            ResGroupOps_SetCpuRateLimit(groupId, caps.cpuRateLimit);
        }
        else
        {
            /* the group is configured with a cpuset instead of a rate limit */
            Bitmapset  *bmsCurrent = CpusetToBitset(caps.cpuset,
                                                    MaxCpuSetLength);
            Bitmapset  *bmsCommon = bms_intersect(bmsCurrent, bmsUnused);
            /* cores the group asks for that are not (or no longer) free */
            Bitmapset  *bmsMissing = bms_difference(bmsCurrent, bmsCommon);

            /*
             * Do not call EnsureCpusetIsAvailable() here as resource group is
             * not activated yet
             */
            if (!gp_resource_group_enable_cgroup_cpuset)
            {
                ereport(WARNING,
                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                         errmsg("cgroup is not properly configured to use the cpuset feature"),
                         errhint("Extra cgroup configurations are required to enable this feature, "
                                 "please refer to the Greenplum Documentations for details")));
            }

            Assert(caps.cpuRateLimit == CPU_RATE_LIMIT_DISABLED);

            if (bms_is_empty(bmsMissing))
            {
                /*
                 * write cpus to corresponding file
                 * if all the cores are available
                 */
                ResGroupOps_SetCpuSet(groupId, caps.cpuset);
                bmsUnused = bms_del_members(bmsUnused, bmsCurrent);
            }
            else
            {
                char        cpusetMissing[MaxCpuSetLength] = {0};

                /*
                 * if some of the cores are unavailable, just set defaultCore
                 * to this group and send a warning message, so the system
                 * can startup, then DBA can fix it
                 */
                snprintf(cpuset, MaxCpuSetLength, "%d", defaultCore);
                ResGroupOps_SetCpuSet(groupId, cpuset);
                BitsetToCpuset(bmsMissing, cpusetMissing, MaxCpuSetLength);
                ereport(WARNING,
                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                         errmsg("cpu cores %s are unavailable on the system "
                                "in resource group %s",
                                cpusetMissing, GetResGroupNameForId(groupId)),
                         errhint("using core %d for this resource group, "
                                 "please adjust the settings and restart",
                                 defaultCore)));
            }
        }

        numGroups++;
        Assert(numGroups <= MaxResourceGroups);
    }
    systable_endscan(sscan);

    if (gp_resource_group_enable_cgroup_cpuset)
    {
        /*
         * set default cpuset
         */
        if (bms_is_empty(bmsUnused))
        {
            /* no unused core, assign default core to default group */
            snprintf(cpuset, MaxCpuSetLength, "%d", defaultCore);
        }
        else
        {
            /* assign all unused cores to default group */
            BitsetToCpuset(bmsUnused, cpuset, MaxCpuSetLength);
        }

        Assert(cpuset[0]);
        Assert(!CpusetIsEmpty(cpuset));
        ResGroupOps_SetCpuSet(DEFAULT_CPUSET_GROUP_ID, cpuset);
    }

    pResGroupControl->loaded = true;
    LOG_RESGROUP_DEBUG(LOG, "initialized %d resource groups", numGroups);

exit:
    LWLockRelease(ResGroupLock);

    /*
     * release lock here to guarantee we have no lock held when acquiring
     * resource group slot
     */
    table_close(relResGroup, AccessShareLock);
    table_close(relResGroupCapability, AccessShareLock);
}
/*
 * Check resource group status when DROP RESOURCE GROUP
 *
 * Does nothing unless running as the dispatcher. Raises an error when the
 * group still has running transactions (bypassed ones included);
 * otherwise the group is locked for drop, and new transactions will be
 * queued while it stays locked.
 *
 * 'name' is only used for the error message.
 */
void
ResGroupCheckForDrop(Oid groupId, char *name)
{
    ResGroupData *target;
    int         nActive;

    if (Gp_role != GP_ROLE_DISPATCH)
        return;

    LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);

    target = groupHashFind(groupId, true);

    nActive = target->nRunning + target->nRunningBypassed;
    if (nActive > 0)
    {
        /* the message also counts the queries waiting in the queue */
        int         nQuery = nActive + target->waitProcs.size;

        Assert(name != NULL);
        ereport(ERROR,
                (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
                 errmsg("cannot drop resource group \"%s\"", name),
                 errhint(" The resource group is currently managing %d query(ies) and cannot be dropped.\n"
                         "\tTerminate the queries first or try dropping the group later.\n"
                         "\tThe view pg_stat_activity tracks the queries managed by resource groups.", nQuery)));
    }

    lockResGroupForDrop(target);

    LWLockRelease(ResGroupLock);
}
/*
 * Drop resource group call back function
 *
 * Runs when DROP RESOURCE GROUP finishes, either way:
 *   - on the dispatcher, the backends in the wait queue are woken up and
 *     the group is unlocked;
 *   - on commit, the shared memory entry is removed, the cpu cores of the
 *     group go back to the default cpuset group (when the cpuset feature
 *     is enabled) and the OS dependent part of the group is destroyed.
 *
 * Errors raised on the way are demoted to WARNING where possible, so that
 * they do not propagate out of this callback.
 */
void
ResGroupDropFinish(const ResourceGroupCallbackContext *callbackCtx,
                   bool isCommit)
{
    ResGroupData *group;
    volatile int savedInterruptHoldoffCount;

    LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);

    PG_TRY();
    {
        savedInterruptHoldoffCount = InterruptHoldoffCount;

        group = groupHashFind(callbackCtx->groupid, true);

        if (Gp_role == GP_ROLE_DISPATCH)
        {
            wakeupSlots(group, false);
            unlockResGroupForDrop(group);
        }

        if (isCommit)
        {
            /* Only migrate processes out of vmtracker groups */
            bool        migrate =
                (group->caps.memAuditor == RESGROUP_MEMORY_AUDITOR_VMTRACKER);

            removeGroup(callbackCtx->groupid);

            if (!CpusetIsEmpty(group->caps.cpuset) &&
                gp_resource_group_enable_cgroup_cpuset)
            {
                /* reset default group, add cpu cores to it */
                char        defaultCpuset[MaxCpuSetLength];

                ResGroupOps_GetCpuSet(DEFAULT_CPUSET_GROUP_ID,
                                      defaultCpuset, MaxCpuSetLength);
                CpusetUnion(defaultCpuset, group->caps.cpuset, MaxCpuSetLength);
                ResGroupOps_SetCpuSet(DEFAULT_CPUSET_GROUP_ID, defaultCpuset);
            }

            ResGroupOps_DestroyGroup(callbackCtx->groupid, migrate);
        }
    }
    PG_CATCH();
    {
        /*
         * Put the holdoff count back to what it was while the lock was
         * held; the lock itself is released below.
         */
        InterruptHoldoffCount = savedInterruptHoldoffCount;

        if (!elog_demote(WARNING))
        {
            elog(LOG, "unable to demote error");
        }
        else
        {
            EmitErrorReport();
            FlushErrorState();
        }
    }
    PG_END_TRY();

    LWLockRelease(ResGroupLock);
}
/*
 * Remove the resource group entry in shared memory if the transaction is aborted.
 *
 * This function is called in the callback function of CREATE RESOURCE GROUP.
 *
 * Besides the shared memory entry, the OS dependent part of the group is
 * destroyed and, when the cpuset feature is enabled, the cpu cores the
 * group was created with are handed back to the default cpuset group.
 * Errors raised on the way are demoted to WARNING where possible.
 */
void
ResGroupCreateOnAbort(const ResourceGroupCallbackContext *callbackCtx)
{
    volatile int savedInterruptHoldoffCount;

    LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);

    PG_TRY();
    {
        savedInterruptHoldoffCount = InterruptHoldoffCount;

        removeGroup(callbackCtx->groupid);

        /* remove the os dependent part for this resource group */
        ResGroupOps_DestroyGroup(callbackCtx->groupid, true);

        if (!CpusetIsEmpty(callbackCtx->caps.cpuset) &&
            gp_resource_group_enable_cgroup_cpuset)
        {
            /* return cpu cores to default group */
            char        merged[MaxCpuSetLength];

            ResGroupOps_GetCpuSet(DEFAULT_CPUSET_GROUP_ID, merged, MaxCpuSetLength);
            CpusetUnion(merged, callbackCtx->caps.cpuset, MaxCpuSetLength);
            ResGroupOps_SetCpuSet(DEFAULT_CPUSET_GROUP_ID, merged);
        }
    }
    PG_CATCH();
    {
        /*
         * Put the holdoff count back to what it was while the lock was
         * held; the lock itself is released below.
         */
        InterruptHoldoffCount = savedInterruptHoldoffCount;

        if (!elog_demote(WARNING))
        {
            elog(LOG, "unable to demote error");
        }
        else
        {
            EmitErrorReport();
            FlushErrorState();
        }
    }
    PG_END_TRY();

    LWLockRelease(ResGroupLock);
}
/*
 * Apply the new resgroup caps.
 *
 * Called when ALTER RESOURCE GROUP commits. The new capabilities are
 * stored in the shared memory entry of the group, then the limit that was
 * altered is applied:
 *   - cpu rate limit: pushed to the OS dependent layer;
 *   - cpuset: pushed to the OS dependent layer when the feature is on;
 *   - memory spill ratio: nothing more to do;
 *   - anything else: memGap is adjusted (only a memory limit change
 *     contributes to it) and the on-alter memory operation of the group
 *     is run.
 * Finally, if the cpuset of the group changed, the default cpuset group
 * gets the old cores back and loses the new ones.
 *
 * Errors raised on the way are demoted to WARNING where possible.
 */
void
ResGroupAlterOnCommit(const ResourceGroupCallbackContext *callbackCtx)
{
    ResGroupData *group;
    volatile int savedInterruptHoldoffCount;

    LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);

    PG_TRY();
    {
        savedInterruptHoldoffCount = InterruptHoldoffCount;

        group = groupHashFind(callbackCtx->groupid, true);
        group->caps = callbackCtx->caps;

        switch (callbackCtx->limittype)
        {
            case RESGROUP_LIMIT_TYPE_CPU:
                ResGroupOps_SetCpuRateLimit(callbackCtx->groupid,
                                            callbackCtx->caps.cpuRateLimit);
                break;

            case RESGROUP_LIMIT_TYPE_CPUSET:
                if (gp_resource_group_enable_cgroup_cpuset)
                    ResGroupOps_SetCpuSet(callbackCtx->groupid,
                                          callbackCtx->caps.cpuset);
                break;

            case RESGROUP_LIMIT_TYPE_MEMORY_SPILL_RATIO:
                /* the new caps are stored above, nothing else to apply */
                break;

            default:
                {
                    ResGroupCap memLimitGap = 0;

                    Assert(pResGroupControl->totalChunks > 0);

                    /* percentage points given up by the new memory limit */
                    if (callbackCtx->limittype == RESGROUP_LIMIT_TYPE_MEMORY)
                        memLimitGap = callbackCtx->oldCaps.memLimit - callbackCtx->caps.memLimit;

                    group->memGap += pResGroupControl->totalChunks * memLimitGap / 100;

                    Assert(group->groupMemOps != NULL);
                    if (group->groupMemOps->group_mem_on_alter)
                        group->groupMemOps->group_mem_on_alter(callbackCtx->groupid, group);
                    break;
                }
        }

        /* reset default group if cpuset has changed */
        if (strcmp(callbackCtx->oldCaps.cpuset, callbackCtx->caps.cpuset) != 0 &&
            gp_resource_group_enable_cgroup_cpuset)
        {
            char        defaultCpuset[MaxCpuSetLength];

            /* get current default group value */
            ResGroupOps_GetCpuSet(DEFAULT_CPUSET_GROUP_ID,
                                  defaultCpuset,
                                  MaxCpuSetLength);

            /*
             * Add old value to default group
             * sub new value from default group
             */
            CpusetUnion(defaultCpuset,
                        callbackCtx->oldCaps.cpuset,
                        MaxCpuSetLength);
            CpusetDifference(defaultCpuset,
                             callbackCtx->caps.cpuset,
                             MaxCpuSetLength);

            ResGroupOps_SetCpuSet(DEFAULT_CPUSET_GROUP_ID, defaultCpuset);
        }
    }
    PG_CATCH();
    {
        /*
         * Put the holdoff count back to what it was while the lock was
         * held; the lock itself is released below.
         */
        InterruptHoldoffCount = savedInterruptHoldoffCount;

        if (!elog_demote(WARNING))
        {
            elog(LOG, "unable to demote error");
        }
        else
        {
            EmitErrorReport();
            FlushErrorState();
        }
    }
    PG_END_TRY();

    LWLockRelease(ResGroupLock);
}
/*
* Public wrapper around the file-local selfIsAssigned(): returns whatever
* that helper reports for the current proc.
*/
bool
ResGroupIsAssigned(void)
{
return selfIsAssigned();
}
/*
* Get resource group id of my proc.
*
* Returns InvalidOid in any of below cases:
* - resource group is not enabled;
* - resource group is not activated (initialized);
* - my proc is not running inside a transaction;
* - my proc is not assigned a resource group yet;
*
* Otherwise a valid resource group id is returned.
*
* The value is read straight from the per-process state (self->groupId),
* which starts out as InvalidOid; no lock is taken.
*
* This function is not dead code although there is no consumer in the gpdb
* code tree. Some extensions require this to get the internal resource group
* information.
*/
Oid
GetMyResGroupId(void)
{
return self->groupId;
}
/*
* Return the total number of memory chunks on this segment
* (pResGroupControl->totalChunks).
*
* Resource group must be enabled; this is only checked by an assertion.
*/
int32
ResGroupGetVmemLimitChunks(void)
{
Assert(IsResGroupEnabled());
return pResGroupControl->totalChunks;
}
/*
* Return the chunk size expressed as a number of bits
* (pResGroupControl->chunkSizeInBits), i.e. a chunk is
* 2^chunkSizeInBits bytes if the name is to be trusted --
* NOTE(review): confirm the unit in decideTotalChunks().
*
* Resource group must be enabled; this is only checked by an assertion.
*/
int32
ResGroupGetVmemChunkSizeInBits(void)
{
Assert(IsResGroupEnabled());
return pResGroupControl->chunkSizeInBits;
}
/*
 * Convert gp_vmem_limit_per_query into a number of chunks, rounding up.
 *
 * The limit is divided by 1024 scaled by 2^(chunkSizeInBits - BITS_IN_MB).
 * NOTE(review): this reads as a kB -> chunk conversion where a chunk is
 * at least 1 MB -- confirm the unit of gp_vmem_limit_per_query.
 */
int32
ResGroupGetMaxChunksPerQuery(void)
{
    int         extraBits = pResGroupControl->chunkSizeInBits - BITS_IN_MB;
    double      divisor = 1024.0 * (1 << extraBits);

    return ceil(gp_vmem_limit_per_query / divisor);
}
/*
 * Retrieve statistic information of type from resource group
 *
 * The group is looked up under a shared ResGroupLock; an unknown group or
 * an unknown 'type' raises an error. The returned Datum holds an int32
 * for the counters, a palloc'd Interval for the total queue time and a
 * C string for the memory usage dump.
 */
Datum
ResGroupGetStat(Oid groupId, ResGroupStatType type)
{
    ResGroupData *group;
    Datum       result = (Datum) 0;

    Assert(IsResGroupActivated());

    LWLockAcquire(ResGroupLock, LW_SHARED);

    group = groupHashFind(groupId, true);

    switch (type)
    {
        case RES_GROUP_STAT_NRUNNING:
            /* bypassed transactions count as running too */
            result = Int32GetDatum(group->nRunning + group->nRunningBypassed);
            break;

        case RES_GROUP_STAT_NQUEUEING:
            result = Int32GetDatum(group->waitProcs.size);
            break;

        case RES_GROUP_STAT_TOTAL_EXECUTED:
            result = Int32GetDatum(group->totalExecuted);
            break;

        case RES_GROUP_STAT_TOTAL_QUEUED:
            result = Int32GetDatum(group->totalQueued);
            break;

        case RES_GROUP_STAT_TOTAL_QUEUE_TIME:
            {
                /*
                 * Turn milliseconds in totalQueuedTimeMs into an Interval.
                 *
                 * Note: we only use the 'time' field. The user can call
                 * justify_interval() if she wants.
                 */
                Interval   *queued = (Interval *) palloc(sizeof(Interval));

                queued->time = group->totalQueuedTimeMs * 1000;
                queued->day = 0;
                queued->month = 0;
                result = IntervalPGetDatum(queued);
                break;
            }

        case RES_GROUP_STAT_MEM_USAGE:
            result = CStringGetDatum(groupDumpMemUsage(group));
            break;

        default:
            ereport(ERROR,
                    (errcode(ERRCODE_INTERNAL_ERROR),
                     errmsg("invalid stat type %d", type)));
    }

    LWLockRelease(ResGroupLock);

    return result;
}
/*
 * Get the number of primary segments on this host
 *
 * In the execute role the value comes from host_primary_segment_count;
 * in any other role it is the segmentsOnMaster value kept in the control
 * structure.
 */
int
ResGroupGetHostPrimaryCount(void)
{
    if (Gp_role == GP_ROLE_EXECUTE)
        return host_primary_segment_count;

    return pResGroupControl->segmentsOnMaster;
}
/*
 * Build a string describing the memory usage of a group.
 *
 * The text is produced by the group's optional group_mem_on_dump hook; when
 * the hook is absent the freshly initialized (empty) buffer is returned.
 */
static char *
groupDumpMemUsage(ResGroupData *group)
{
	StringInfoData buf;

	initStringInfo(&buf);

	Assert(group->groupMemOps != NULL);

	if (group->groupMemOps->group_mem_on_dump != NULL)
		group->groupMemOps->group_mem_on_dump(group, &buf);

	return buf.data;
}
/*
 * Dump memory information for current resource group.
 *
 * When this process has a group, the group caps, the group-level counters,
 * the slot counters and the per-process usage are written to the log, with
 * chunk counts converted to MB.  Otherwise only the per-process usage is
 * logged.
 */
void
ResGroupDumpMemoryInfo(void)
{
	ResGroupSlotData *slot = self->slot;
	ResGroupData *group = self->group;

	if (group)
	{
		Assert(selfIsAssigned());

		write_log("Resource group memory information: "
				  "current group id is %u, "
				  "memLimit cap is %d, "
				  "memSharedQuota cap is %d, "
				  "memSpillRatio cap is %d, "
				  "group expected memory limit is %d MB, "
				  "memory quota granted in current group is %d MB, "
				  "shared quota granted in current group is %d MB, "
				  "memory assigned to all running slots is %d MB, "
				  "memory usage in current group is %d MB, "
				  "memory shared usage in current group is %d MB, "
				  "memory quota in current slot is %d MB, "
				  "memory usage in current slot is %d MB, "
				  "memory usage in current proc is %d MB",
				  group->groupId,
				  group->caps.memLimit,
				  group->caps.memSharedQuota,
				  group->caps.memSpillRatio,
				  VmemTracker_ConvertVmemChunksToMB(group->memExpected),
				  VmemTracker_ConvertVmemChunksToMB(group->memQuotaGranted),
				  VmemTracker_ConvertVmemChunksToMB(group->memSharedGranted),
				  VmemTracker_ConvertVmemChunksToMB(group->memQuotaUsed),
				  VmemTracker_ConvertVmemChunksToMB(group->memUsage),
				  VmemTracker_ConvertVmemChunksToMB(group->memSharedUsage),
				  VmemTracker_ConvertVmemChunksToMB(slot->memQuota),
				  VmemTracker_ConvertVmemChunksToMB(slot->memUsage),
				  VmemTracker_ConvertVmemChunksToMB(self->memUsage));
	}
	else
	{
		Assert(!selfIsAssigned());

		write_log("Resource group memory information: "
				  "memory usage in current proc is %d MB",
				  VmemTracker_ConvertVmemChunksToMB(self->memUsage));
	}
}
/*
* Reserve 'memoryChunks' number of chunks for current resource group.
* It will first try to reserve memory from the resource group slot; if the slot
* quota exceeded, it will reserve memory from the shared zone. It fails if the
* shared quota is also exceeded, and no memory is reserved.
*
* 'overuseChunks' number of chunks can be overused for error handling,
* in such a case *waiverUsed is set to true. *waiverUsed is never written
* on any other path, so the caller must initialize it.
*
* Returns true when the chunks were accounted (or when no accounting is
* needed), false when the reservation was rejected and fully reverted.
*/
bool
ResGroupReserveMemory(int32 memoryChunks, int32 overuseChunks, bool *waiverUsed)
{
int32 overuseMem;
ResGroupSlotData *slot = self->slot;
ResGroupData *group = self->group;
/*
* Memories may be allocated before resource group is initialized,
* however,we need to track those memories once resource group is
* enabled, so we use IsResGroupEnabled() instead of
* IsResGroupActivated() here.
*/
if (!IsResGroupEnabled())
return true;
Assert(memoryChunks >= 0);
/*
* Bypass the limit check when we are not in a valid resource group.
* But will update the memory usage of this proc, and it will be added up
* when this proc is assigned to a valid resource group.
*/
self->memUsage += memoryChunks;
if (bypassedGroup)
{
/*
* Do not allow to allocate more than the per proc limit.
*/
if (self->memUsage > self->bypassMemoryLimit)
{
/* undo the per-proc accounting done above before failing */
self->memUsage -= memoryChunks;
return false;
}
/*
* Set group & slot to bypassed ones so we could follow the limitation
* checking logic as normal transactions.
*/
group = bypassedGroup;
slot = &bypassedSlot;
}
else if (!selfIsAssigned())
return true;
Assert(bypassedGroup || slotIsInUse(slot));
Assert(group->memUsage >= 0);
Assert(self->memUsage >= 0);
/* add memoryChunks into group & slot memory usage */
overuseMem = groupIncMemUsage(group, slot, memoryChunks);
/*
* then check whether there is over usage; the check is skipped entirely
* while inside a critical section (CritSectionCount != 0), so the
* reservation always succeeds there
*/
if (CritSectionCount == 0)
{
if (overuseMem > overuseChunks)
{
/* if the over usage is larger than allowed then revert the change */
groupDecMemUsage(group, slot, memoryChunks);
/* also revert in proc */
self->memUsage -= memoryChunks;
Assert(self->memUsage >= 0);
/* only dump the memory state when no overuse allowance was given */
if (overuseChunks == 0)
ResGroupDumpMemoryInfo();
return false;
}
else if (overuseMem > 0)
{
/* the over usage is within the allowed threshold */
*waiverUsed = true;
}
}
return true;
}
/*
* Release 'memoryChunks' chunks previously reserved by this process.
*
* The per-process usage is always decreased (when resource groups are
* enabled); the group and slot usage are decreased only when the process is
* in bypass mode or is assigned to a group.
*/
void
ResGroupReleaseMemory(int32 memoryChunks)
{
ResGroupSlotData *slot = self->slot;
ResGroupData *group = self->group;
if (!IsResGroupEnabled())
return;
Assert(memoryChunks >= 0);
Assert(memoryChunks <= self->memUsage);
self->memUsage -= memoryChunks;
if (bypassedGroup)
{
/*
* Set group & slot to bypassed ones so we could follow the release
* logic as normal transactions.
*/
group = bypassedGroup;
slot = &bypassedSlot;
}
else if (!selfIsAssigned())
/* not in any group: only the per-proc counter is tracked */
return;
Assert(bypassedGroup || slotIsInUse(slot));
groupDecMemUsage(group, slot, memoryChunks);
}
/*
* Compute the memory limit for the current query.
*
* Must be called on the dispatcher (asserted). In bypass mode a fixed cap is
* returned; otherwise the process must be assigned to a slot, and the result
* is 0 when the memory policy is "none", else the slot's memory spill value
* shifted left by the chunk size in bits.
*/
int64
ResourceGroupGetQueryMemoryLimit(void)
{
ResGroupSlotData *slot = self->slot;
int64 memSpill;
Assert(Gp_role == GP_ROLE_DISPATCH);
if (bypassedGroup)
{
int64 bytesInMB = 1 << BITS_IN_MB;
int64 bytesInChunk = (int64) 1 << VmemTracker_GetChunkSizeInBits();
/*
* In bypass mode there is a hard memory limit of
* RESGROUP_BYPASS_MODE_MEMORY_LIMIT_ON_QE chunk,
* we should make sure query_mem + misc mem <= chunk.
*/
/* i.e. the smaller of bytesInMB and half of the bypass chunk limit */
return Min(bytesInMB,
bytesInChunk * RESGROUP_BYPASS_MODE_MEMORY_LIMIT_ON_QE / 2);
}
Assert(selfIsAssigned());
if (IsResManagerMemoryPolicyNone())
return 0;
memSpill = slotGetMemSpill(&slot->caps);
/* memSpill is already converted to chunks */
Assert(memSpill >= 0);
/* scale the chunk count by the chunk size (2 ^ chunkSizeInBits) */
return memSpill << VmemTracker_GetChunkSizeInBits();
}
/*
* removeGroup -- remove resource group from share memory and
* reclaim the group's memory back to MEM POOL.
*
* The caller must hold ResGroupLock in exclusive mode. After the group is
* removed from the hash, its optional group_mem_on_drop hook is run, its
* groupId is invalidated, and the other groups are notified.
*/
static void
removeGroup(Oid groupId)
{
ResGroupData *group;
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
Assert(OidIsValid(groupId));
group = groupHashRemove(groupId);
Assert(group->groupMemOps != NULL);
if (group->groupMemOps->group_mem_on_drop)
group->groupMemOps->group_mem_on_drop(groupId, group);
/* mark the entry as unused before notifying the remaining groups */
group->groupId = InvalidOid;
notifyGroupsOnMem(groupId);
}
/*
* createGroup -- initialize the elements for a resource group.
*
* A new hash entry is created for groupId, its counters are reset, the
* expected memory is computed from caps, chunks are reserved from the MEM
* POOL and split between quota and shared quota, and finally the memory
* operations are bound according to the memory auditor.
*
* Notes:
* ResGroupLock must be held in exclusive mode (asserted below).
*
* Returns the initialized group.
*/
static ResGroupData *
createGroup(Oid groupId, const ResGroupCaps *caps)
{
ResGroupData *group;
int32 chunks;
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
Assert(OidIsValid(groupId));
group = groupHashNew(groupId);
Assert(group != NULL);
group->groupId = groupId;
group->caps = *caps;
group->nRunning = 0;
group->nRunningBypassed = 0;
ProcQueueInit(&group->waitProcs);
group->totalExecuted = 0;
group->totalQueued = 0;
group->memGap = 0;
group->memUsage = 0;
group->memSharedUsage = 0;
group->memQuotaUsed = 0;
/* set for real by bindGroupOperation() at the end of this function */
group->groupMemOps = NULL;
group->totalQueuedTimeMs = 0;
group->lockedForDrop = false;
group->memQuotaGranted = 0;
group->memSharedGranted = 0;
group->memExpected = groupGetMemExpected(caps);
/* the pool may grant fewer chunks than expected */
chunks = mempoolReserve(groupId, group->memExpected);
groupRebalanceQuota(group, chunks, caps);
bindGroupOperation(group);
return group;
}
/*
 * Select the memory operation table of a resource group from the memory
 * auditor configured in its caps.  An unknown auditor raises an ERROR.
 *
 * ResGroupLock must be held in exclusive mode.
 */
static void
bindGroupOperation(ResGroupData *group)
{
	Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));

	switch (group->caps.memAuditor)
	{
		case RESGROUP_MEMORY_AUDITOR_VMTRACKER:
			group->groupMemOps = &resgroup_memory_operations_vmtracker;
			break;

		case RESGROUP_MEMORY_AUDITOR_CGROUP:
			group->groupMemOps = &resgroup_memory_operations_cgroup;
			break;

		default:
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("invalid memory auditor: %d", group->caps.memAuditor)));
	}
}
/*
* Add chunks into group and slot memory usage.
*
* The slot usage is increased first; the part that exceeds the slot's quota
* is charged to the group's shared usage, and whatever does not fit in the
* group's granted shared quota is taken from the global freeChunks counter.
*
* Return the total over used chunks of global share, i.e. how far
* freeChunks went below zero (0 if it did not).
*/
static int32
groupIncMemUsage(ResGroupData *group, ResGroupSlotData *slot, int32 chunks)
{
int32 slotMemUsage; /* the memory current slot has been used */
int32 sharedMemUsage; /* the total shared memory usage,
sum of group share and global share */
int32 globalOveruse = 0; /* the total over used chunks of global share*/
/* Add the chunks to memUsage in slot */
slotMemUsage = pg_atomic_add_fetch_u32((pg_atomic_uint32 *) &slot->memUsage,
chunks);
/* Check whether shared memory should be added */
sharedMemUsage = slotMemUsage - slot->memQuota;
if (sharedMemUsage > 0)
{
/* Decide how many chunks should be counted as shared memory */
int32 deltaSharedMemUsage = Min(sharedMemUsage, chunks);
/* Add these chunks to memSharedUsage in group,
* and record the old value*/
int32 oldSharedUsage = pg_atomic_fetch_add_u32((pg_atomic_uint32 *)
&group->memSharedUsage,
deltaSharedMemUsage);
/* the free space of group share */
int32 oldSharedFree = Max(0, group->memSharedGranted - oldSharedUsage);
/* Calculate the global over used chunks */
int32 deltaGlobalSharedMemUsage = Max(0, deltaSharedMemUsage - oldSharedFree);
/* freeChunks -= deltaGlobalSharedMemUsage and get the new value */
int32 newFreeChunks = pg_atomic_sub_fetch_u32(&pResGroupControl->freeChunks,
deltaGlobalSharedMemUsage);
/* calculate the total over used chunks of global share */
globalOveruse = Max(0, 0 - newFreeChunks);
}
/* Add the chunks to memUsage in group */
pg_atomic_add_fetch_u32((pg_atomic_uint32 *) &group->memUsage,
chunks);
return globalOveruse;
}
/*
* Subtract chunks from group and slot memory usage and, where applicable,
* give chunks back to the global shared memory (freeChunks).
*
* Return the number of global shared chunks released by this call
* (0 when the slot was not using more than its own quota).
*/
static int32
groupDecMemUsage(ResGroupData *group, ResGroupSlotData *slot, int32 chunks)
{
int32 value;
int32 slotMemUsage;
int32 sharedMemUsage;
/* Sub chunks from memUsage in group */
value = pg_atomic_sub_fetch_u32((pg_atomic_uint32 *) &group->memUsage,
chunks);
Assert(value >= 0);
/* Sub chunks from memUsage in slot; slotMemUsage is the value BEFORE the subtraction */
slotMemUsage = pg_atomic_fetch_sub_u32((pg_atomic_uint32 *) &slot->memUsage,
chunks);
Assert(slotMemUsage >= chunks);
/* Check whether shared memory should be subtracted */
sharedMemUsage = slotMemUsage - slot->memQuota;
if (sharedMemUsage > 0)
{
/* Decide how many chunks should be counted as shared memory */
int32 deltaSharedMemUsage = Min(sharedMemUsage, chunks);
/* Sub chunks from memSharedUsage in group, keeping the old value */
int32 oldSharedUsage = pg_atomic_fetch_sub_u32((pg_atomic_uint32 *) &group->memSharedUsage,
deltaSharedMemUsage);
/* record the total global share usage of current group */
int32 grpTotalGlobalUsage = Max(0, oldSharedUsage - group->memSharedGranted);
/* calculate the global share usage of current release */
int32 deltaGlobalSharedMemUsage = Min(grpTotalGlobalUsage, deltaSharedMemUsage);
/* add chunks to global shared memory */
pg_atomic_add_fetch_u32(&pResGroupControl->freeChunks,
deltaGlobalSharedMemUsage);
return deltaGlobalSharedMemUsage;
}
return 0;
}
/*
* Add the whole memory usage of a slot to a group; it is used when moving a
* query to a resource group.
*
* Unlike groupIncMemUsage() the slot's own memUsage is not modified; only
* the group's memSharedUsage / memUsage and the global freeChunks are.
*
* Return the total over used chunks of global share
*/
static int32
groupIncSlotMemUsage(ResGroupData *group, ResGroupSlotData *slot)
{
int32 slotSharedMemUsage; /* the slot shared memory usage */
int32 globalOveruse = 0; /* the total over used chunks of global share*/
/* Check whether shared memory should be added */
slotSharedMemUsage = slot->memUsage - slot->memQuota;
if (slotSharedMemUsage > 0)
{
/* Add these chunks to memSharedUsage in group,
* and record the old value*/
int32 oldSharedUsage = pg_atomic_fetch_add_u32((pg_atomic_uint32 *)
&group->memSharedUsage,
slotSharedMemUsage);
/* the free space of group share */
int32 oldSharedFree = Max(0, group->memSharedGranted - oldSharedUsage);
/* Calculate the global over used chunks */
int32 deltaGlobalSharedMemUsage = Max(0, slotSharedMemUsage - oldSharedFree);
/* freeChunks -= deltaGlobalSharedMemUsage and get the new value */
int32 newFreeChunks = pg_atomic_sub_fetch_u32(&pResGroupControl->freeChunks,
deltaGlobalSharedMemUsage);
/* calculate the total over used chunks of global share */
globalOveruse = Max(0, 0 - newFreeChunks);
}
/* Add the chunks to memUsage in group */
pg_atomic_add_fetch_u32((pg_atomic_uint32 *) &group->memUsage, slot->memUsage);
return globalOveruse;
}
/*
* Deduct the whole memory usage of a slot from a group; it is used when
* moving a query to a resource group.
*
* The slot's own memUsage is not modified; the group's memUsage and
* memSharedUsage are decreased and any global shared chunks the slot was
* holding are returned to freeChunks.
*/
static void
groupDecSlotMemUsage(ResGroupData *group, ResGroupSlotData *slot)
{
int32 value;
int32 slotSharedMemUsage;
/* Sub chunks from memUsage in group */
value = pg_atomic_sub_fetch_u32((pg_atomic_uint32 *) &group->memUsage,
slot->memUsage);
Assert(value >= 0);
/* Check whether shared memory should be subtracted */
slotSharedMemUsage = slot->memUsage - slot->memQuota;
if (slotSharedMemUsage <= 0)
return;
/* Sub chunks from memSharedUsage in group, keeping the old value */
int32 oldSharedUsage = pg_atomic_fetch_sub_u32((pg_atomic_uint32 *) &group->memSharedUsage,
slotSharedMemUsage);
Assert(oldSharedUsage >= slotSharedMemUsage);
/* record the total global share usage of current group */
int32 grpTotalGlobalUsage = Max(0, oldSharedUsage - group->memSharedGranted);
/* calculate the global share usage of current release */
int32 deltaGlobalSharedMemUsage = Min(grpTotalGlobalUsage, slotSharedMemUsage);
/* add chunks to global shared memory */
pg_atomic_add_fetch_u32(&pResGroupControl->freeChunks,
deltaGlobalSharedMemUsage);
}
/*
* Attach a process (QD or QE) to a slot.
*
* Records the group and slot in 'self', charges the memory this process
* already uses to the group and slot, and bumps the slot's process count.
*/
static void
selfAttachResGroup(ResGroupData *group, ResGroupSlotData *slot)
{
selfSetGroup(group);
selfSetSlot(slot);
/* the returned overuse amount is intentionally ignored here */
groupIncMemUsage(group, slot, self->memUsage);
pg_atomic_add_fetch_u32((pg_atomic_uint32*) &slot->nProcs, 1);
}
/*
* Detach a process (QD or QE) from a slot.
*
* Mirrors selfAttachResGroup(): the memory used by this process is removed
* from the group and slot, the slot's process count is decreased, and the
* slot and group recorded in 'self' are cleared.
*/
static void
selfDetachResGroup(ResGroupData *group, ResGroupSlotData *slot)
{
groupDecMemUsage(group, slot, self->memUsage);
pg_atomic_sub_fetch_u32((pg_atomic_uint32*) &slot->nProcs, 1);
selfUnsetSlot();
selfUnsetGroup();
}
/*
* Initialize the members of a slot for use by 'group'.
*
* The slot must not be in use and the group must be valid. The slot gets a
* snapshot copy of the group's caps, the given memory quota, and a zero
* memory usage. ResGroupLock must be held in exclusive mode.
*/
static void
initSlot(ResGroupSlotData *slot, ResGroupData *group, int32 slotMemQuota)
{
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
Assert(!slotIsInUse(slot));
Assert(group->groupId != InvalidOid);
slot->group = group;
slot->groupId = group->groupId;
/* copied by value, so later changes to group->caps do not affect the slot */
slot->caps = group->caps;
slot->memQuota = slotMemQuota;
slot->memUsage = 0;
}
/*
* Alloc and initialize slot pool
*/
static bool
slotpoolInit(void)
{
ResGroupSlotData *slot;
ResGroupSlotData *next;
int numSlots;
int memSize;
int i;
numSlots = RESGROUP_MAX_SLOTS;
memSize = mul_size(numSlots, sizeof(ResGroupSlotData));
pResGroupControl->slots = ShmemAlloc(memSize);
if (!pResGroupControl->slots)
return false;
MemSet(pResGroupControl->slots, 0, memSize);
/* push all the slots into the list */
next = NULL;
for (i = numSlots - 1; i >= 0; i--)
{
slot = &pResGroupControl->slots[i];
slot->group = NULL;
slot->groupId = InvalidOid;
slot->memQuota = -1;
slot->memUsage = 0;
slot->next = next;
next = slot;
}
pResGroupControl->freeSlot = next;
return true;
}
/*
 * Pop the head of the shared free-slot list and return it.
 *
 * ResGroupLock must be held in exclusive mode and the free list must not be
 * empty.
 */
static ResGroupSlotData *
slotpoolAllocSlot(void)
{
	ResGroupSlotData *head;

	Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
	Assert(pResGroupControl->freeSlot != NULL);

	/* unlink the first free slot */
	head = pResGroupControl->freeSlot;
	pResGroupControl->freeSlot = head->next;
	head->next = NULL;

	return head;
}
/*
* Free a slot back to shared slot pool.
*
* The slot must be in use with no process attached. Its fields are reset to
* the same "free" values used by slotpoolInit() and it is pushed onto the
* head of the free list. ResGroupLock must be held in exclusive mode.
*/
static void
slotpoolFreeSlot(ResGroupSlotData *slot)
{
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
Assert(slotIsInUse(slot));
Assert(slot->nProcs == 0);
slot->group = NULL;
slot->groupId = InvalidOid;
slot->memQuota = -1;
slot->memUsage = 0;
/* push onto the head of the free list */
slot->next = pResGroupControl->freeSlot;
pResGroupControl->freeSlot = slot;
}
/*
* Get a slot with memory quota granted.
*
* A slot can be got with this function if there is enough memory quota
* available and the concurrency limit is not reached.
*
* On success the memory quota is marked as used in the group, nRunning is
* increased, the slot is initialized for the group, and a pointer to the
* slot is returned.
*
* On failure NULL is returned and nRunning is unchanged.
* NOTE(review): on the "not enough quota" path groupReserveMemQuota() has
* already called mempoolAutoReserve(), which may have pulled chunks from the
* MEM POOL into the group.
*/
static ResGroupSlotData *
groupGetSlot(ResGroupData *group)
{
ResGroupSlotData *slot;
ResGroupCaps *caps;
int32 slotMemQuota;
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
Assert(Gp_role == GP_ROLE_DISPATCH);
Assert(groupIsNotDropped(group));
caps = &group->caps;
/* First check if the concurrency limit is reached */
if (group->nRunning >= caps->concurrency)
return NULL;
/* a negative result means there is not enough memory quota */
slotMemQuota = groupReserveMemQuota(group);
if (slotMemQuota < 0)
return NULL;
/* Now actually get a free slot */
slot = slotpoolAllocSlot();
Assert(!slotIsInUse(slot));
initSlot(slot, group, slotMemQuota);
group->nRunning++;
return slot;
}
/*
* Put back a slot of the group.
*
* This will release a slot: its memory quota is returned to the group, the
* slot goes back to the free list, nRunning is decreased, and any quota the
* group no longer needs is released to the MEM POOL, in which case the
* other groups are notified. ResGroupLock must be held in exclusive mode.
*/
static void
groupPutSlot(ResGroupData *group, ResGroupSlotData *slot)
{
int32 released;
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
Assert(group->memQuotaUsed >= 0);
Assert(slotIsInUse(slot));
/* Return the memory quota granted to this slot */
groupReleaseMemQuota(group, slot);
/* Return the slot back to free list */
slotpoolFreeSlot(slot);
/* must be updated before mempoolAutoRelease(), which reads nRunning */
group->nRunning--;
/* And finally release the overused memory quota */
released = mempoolAutoRelease(group);
if (released > 0)
notifyGroupsOnMem(group->groupId);
/*
* Once we have waken up other groups then the slot we just released
* might be reused, so we should not touch it anymore since now.
*/
}
/*
 * Reserve the memory quota of one slot inside a group.
 *
 * The group first tries to top up its stocks from the MEM POOL.  If the
 * per-slot quota then fits into the granted quota it is accounted as used
 * and returned; otherwise -1 is returned and memQuotaUsed is left as is.
 */
static int32
groupReserveMemQuota(ResGroupData *group)
{
	ResGroupCaps *groupCaps = &group->caps;
	int32		perSlotQuota;

	Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
	Assert(Gp_role == GP_ROLE_DISPATCH);
	Assert(pResGroupControl->segmentsOnMaster > 0);

	mempoolAutoReserve(group, groupCaps);

	/* the quota every slot of this group is expected to get */
	perSlotQuota = slotGetMemQuotaExpected(groupCaps);

	Assert(perSlotQuota >= 0);
	Assert(group->memQuotaUsed >= 0);
	Assert(group->memQuotaUsed <= group->memQuotaGranted);

	if (group->memQuotaUsed + perSlotQuota <= group->memQuotaGranted)
	{
		group->memQuotaUsed += perSlotQuota;
		return perSlotQuota;
	}

	/* not enough memory quota available, give up */
	return -1;
}
/*
* Release a slot's memory quota to group.
*
* Only the group's memQuotaUsed counter is adjusted; the slot itself is not
* modified. ResGroupLock must be held in exclusive mode.
*/
static void
groupReleaseMemQuota(ResGroupData *group, ResGroupSlotData *slot)
{
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
group->memQuotaUsed -= slot->memQuota;
Assert(group->memQuotaUsed >= 0);
}
/*
 * Pick a resource group id for the current transaction.
 *
 * The optional resgroup_assign_hook gets the first say; when it is not
 * installed or yields InvalidOid, the group of the current user's role is
 * used instead.
 */
static Oid
decideResGroupId(void)
{
	if (resgroup_assign_hook)
	{
		Oid			hookedId = resgroup_assign_hook();

		if (hookedId != InvalidOid)
			return hookedId;
	}

	return GetResGroupIdForRole(GetUserId());
}
/*
* Decide the proper resource group for current role and store both the
* group pointer and its id into *pGroupInfo.
*
* When the decided group id is not found in the group hash, the admin group
* (for superusers) or the default group (otherwise) is used instead.
*
* NOTE(review): the original comment states that an exception is thrown if
* the current role is invalid; that would have to come from the lookup
* helpers called here — confirm.
*/
static void
decideResGroup(ResGroupInfo *pGroupInfo)
{
ResGroupData *group;
Oid groupId;
Assert(pResGroupControl != NULL);
Assert(pResGroupControl->segmentsOnMaster > 0);
Assert(Gp_role == GP_ROLE_DISPATCH);
/* always find out the up-to-date resgroup id */
groupId = decideResGroupId();
LWLockAcquire(ResGroupLock, LW_SHARED);
group = groupHashFind(groupId, false);
if (!group)
{
/* fall back to one of the built-in groups */
groupId = superuser() ? ADMINRESGROUP_OID : DEFAULTRESGROUP_OID;
group = groupHashFind(groupId, false);
}
Assert(group != NULL);
LWLockRelease(ResGroupLock);
pGroupInfo->group = group;
pGroupInfo->groupId = groupId;
}
/*
 * Take a bypassed reference on the group described by pGroupInfo.
 *
 * The reference count (nRunningBypassed) is only increased when the group
 * has neither been dropped nor been locked for drop; the check and the
 * increment both happen under the exclusive ResGroupLock.
 *
 * Return true if the reference was taken, false otherwise.
 */
static bool
groupIncBypassedRef(ResGroupInfo *pGroupInfo)
{
	ResGroupData *grp = pGroupInfo->group;
	bool		taken;

	LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);

	/* lockedForDrop is only inspected when the group still exists */
	taken = !groupIsDropped(pGroupInfo) && !grp->lockedForDrop;

	if (taken)
		pg_atomic_add_fetch_u32((pg_atomic_uint32 *) &grp->nRunningBypassed, 1);

	LWLockRelease(ResGroupLock);

	return taken;
}
/*
* Decrease the bypassed ref count (nRunningBypassed) of the group.
*
* The decrement is atomic; unlike groupIncBypassedRef() this function does
* not acquire ResGroupLock itself.
*/
static void
groupDecBypassedRef(ResGroupData *group)
{
pg_atomic_sub_fetch_u32((pg_atomic_uint32 *) &group->nRunningBypassed, 1);
}
/*
* Acquire a resource group slot
*
* Call this function at the start of the transaction (or, with isMoveQuery
* set, while moving a query to another group).
*
* If a slot is available it is returned right away. Otherwise the current
* process is pushed onto the group's wait queue and blocks in waitOnGroup();
* the process that wakes it up hands over the slot through MyProc->resSlot.
*
* Returns the slot, or NULL if the group has been dropped or if the process
* was woken up without being granted a slot.
*/
static ResGroupSlotData *
groupAcquireSlot(ResGroupInfo *pGroupInfo, bool isMoveQuery)
{
ResGroupSlotData *slot;
ResGroupData *group;
Assert(!selfIsAssigned() || isMoveQuery);
group = pGroupInfo->group;
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
/* Has the group been dropped? */
if (groupIsDropped(pGroupInfo))
{
LWLockRelease(ResGroupLock);
return NULL;
}
/* acquire a slot */
if (!group->lockedForDrop)
{
/* try to get a slot directly */
slot = groupGetSlot(group);
if (slot != NULL)
{
/* got one, lucky */
group->totalExecuted++;
LWLockRelease(ResGroupLock);
pgstat_report_resgroup(group->groupId);
return slot;
}
}
/*
* Add into group wait queue (if not waiting yet).
*/
Assert(!proc_exit_inprogress);
groupWaitQueuePush(group, MyProc);
/* waits caused by a pending drop are not counted as queued */
if (!group->lockedForDrop)
group->totalQueued++;
LWLockRelease(ResGroupLock);
/*
* wait on the queue
* slot will be assigned by the proc wakes me up
* if i am waken up by DROP RESOURCE GROUP statement, the
* resSlot will be NULL.
*/
waitOnGroup(group, isMoveQuery);
if (MyProc->resSlot == NULL)
return NULL;
/*
* The waking process has granted us a valid slot.
* Update the statistic information of the resource group.
*/
slot = (ResGroupSlotData *) MyProc->resSlot;
/* take ownership of the slot and clear the hand-over field */
MyProc->resSlot = NULL;
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
addTotalQueueDuration(group);
group->totalExecuted++;
LWLockRelease(ResGroupLock);
pgstat_report_resgroup(group->groupId);
return slot;
}
/*
* Apply the memory related caps of a group.
*
* The group's expected memory is recomputed from its current caps, quota
* that is no longer needed is released to the MEM POOL, and then the group
* immediately tries to reserve whatever it is still missing.
* ResGroupLock must be held in exclusive mode.
*
* Return TRUE if more chunks were released to the MEM POOL than were
* reserved back from it.
*/
static bool
groupApplyMemCaps(ResGroupData *group)
{
int32 reserved;
int32 released;
const ResGroupCaps *caps = &group->caps;
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
group->memExpected = groupGetMemExpected(caps);
released = mempoolAutoRelease(group);
Assert(released >= 0);
/*
* suppose rg1 has memory_limit=10, memory_shared_quota=40,
* and session1 is running in rg1.
*
* now we alter rg1 memory_limit to 40 in another session,
* apparently both memory quota and shared quota are expected to increase,
* however as our design is to let them increase on new queries,
* then for session1 it won't see memory shared quota being increased
* until new queries being executed in rg1.
*
* so we should try to acquire the new quota immediately.
*/
reserved = mempoolAutoReserve(group, caps);
Assert(reserved >= 0);
return released > reserved;
}
/*
* Get quota from MEM POOL.
*
* chunks is the expected amount to get.
*
* The pool's freeChunks counter is decreased with a compare-and-exchange
* loop, and safeChunksThreshold100 is decreased accordingly. An ERROR is
* raised if the threshold would drop below zero.
*
* return the actual got chunks, might be smaller than expectation
* (0 when the pool has no free chunks).
*/
static int32
mempoolReserve(Oid groupId, int32 chunks)
{
int32 oldFreeChunks;
int32 newFreeChunks;
int32 reserved = 0;
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
/* Compare And Save to avoid concurrency problem without using lock */
while (true)
{
oldFreeChunks = pg_atomic_read_u32(&pResGroupControl->freeChunks);
/* freeChunks may be negative (overused); then nothing can be reserved */
reserved = Min(Max(0, oldFreeChunks), chunks);
newFreeChunks = oldFreeChunks - reserved;
if (reserved == 0)
break;
/* retry from the top if freeChunks changed since it was read */
if (pg_atomic_compare_exchange_u32(&pResGroupControl->freeChunks,
(uint32 *) &oldFreeChunks,
(uint32) newFreeChunks))
break;
}
/* also update the safeChunksThreshold which is used in runaway detector */
if (reserved != 0)
{
uint32 safeChunksThreshold100;
int safeChunksDelta100;
safeChunksThreshold100 = (uint32) pg_atomic_read_u32(&pResGroupControl->safeChunksThreshold100);
/* the "100" suffix: values are kept multiplied by 100 (percent scale) */
safeChunksDelta100 = reserved * (100 - runaway_detector_activation_percent);
if (safeChunksThreshold100 < safeChunksDelta100)
elog(ERROR, "safeChunksThreshold: %u should be positive after mempool reserved: %d",
safeChunksThreshold100, safeChunksDelta100);
pg_atomic_sub_fetch_u32(&pResGroupControl->safeChunksThreshold100, safeChunksDelta100);
}
LOG_RESGROUP_DEBUG(LOG, "allocate %u out of %u chunks to group %d",
reserved, oldFreeChunks, groupId);
Assert(newFreeChunks <= pResGroupControl->totalChunks);
return reserved;
}
/*
* Return chunks to MEM POOL.
*
* Both freeChunks and safeChunksThreshold100 are increased atomically; this
* is the inverse of the bookkeeping done in mempoolReserve().
* ResGroupLock must be held in exclusive mode.
*/
static void
mempoolRelease(Oid groupId, int32 chunks)
{
int32 newFreeChunks;
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
Assert(chunks >= 0);
newFreeChunks = pg_atomic_add_fetch_u32(&pResGroupControl->freeChunks,
chunks);
/* also update the safeChunksThreshold which is used in runaway detector */
pg_atomic_add_fetch_u32(&pResGroupControl->safeChunksThreshold100,
chunks * (100 - runaway_detector_activation_percent));
/* newFreeChunks - chunks is the pool size before this release */
LOG_RESGROUP_DEBUG(LOG, "free %u to pool(%u) chunks from group %d",
chunks, newFreeChunks - chunks, groupId);
Assert(newFreeChunks <= pResGroupControl->totalChunks);
}
/*
 * Distribute chunks obtained from the MEM POOL inside a group.
 *
 * The non-shared quota is filled first, up to the amount expected from
 * caps; every remaining chunk is added to the shared quota.  When the
 * group already holds more non-shared quota than expected, all the chunks
 * go to the shared quota.
 */
static void
groupRebalanceQuota(ResGroupData *group, int32 chunks, const ResGroupCaps *caps)
{
	int32		quotaTarget = groupGetMemQuotaExpected(caps);
	int32		quotaShortfall;

	Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));

	quotaShortfall = quotaTarget - group->memQuotaGranted;
	if (quotaShortfall >= 0)
	{
		/* never hand out more than we actually received */
		int32		toQuota = Min(chunks, quotaShortfall);

		group->memQuotaGranted += toQuota;
		chunks -= toQuota;
	}

	/* the rest belongs to the shared part */
	group->memSharedGranted += chunks;
}
/*
 * Calculate the total memory chunks of the segment.
 *
 * The per-segment memory is the total memory reported by
 * ResGroupOps_GetTotalMemory(), scaled by gp_resource_group_memory_limit
 * and divided by the number of primary segments on this host.
 *
 * Outputs:
 *   *totalChunks      the number of chunks, at most 16K
 *   *chunkSizeInBits  log2 of the chunk size, starting at BITS_IN_MB
 */
static void
decideTotalChunks(int32 *totalChunks, int32 *chunkSizeInBits)
{
	int32		nsegments;
	int32		tmptotalChunks;
	int32		tmpchunkSizeInBits;

	/* same role-dependent count that is exposed to other modules */
	nsegments = ResGroupGetHostPrimaryCount();
	Assert(nsegments > 0);

	tmptotalChunks = ResGroupOps_GetTotalMemory() * gp_resource_group_memory_limit / nsegments;

	/*
	 * If vmem is larger than 16GB (i.e., 16K MB), we make the chunks bigger
	 * so that the vmem limit in chunks unit is not larger than 16K.
	 */
	tmpchunkSizeInBits = BITS_IN_MB;
	while (tmptotalChunks > (16 * 1024))
	{
		/* double the chunk size, halve the chunk count */
		tmpchunkSizeInBits++;
		tmptotalChunks >>= 1;
	}

	*totalChunks = tmptotalChunks;
	*chunkSizeInBits = tmpchunkSizeInBits;
}
/*
* Get total expected memory quota of a group in chunks
*/
static int32
groupGetMemExpected(const ResGroupCaps *caps)
{
Assert(pResGroupControl->totalChunks > 0);
return pResGroupControl->totalChunks * caps->memLimit / 100;
}
/*
 * Non-shared memory quota a group is expected to own, in chunks.
 *
 * With a positive concurrency this is the per-slot quota multiplied by the
 * concurrency; otherwise it is the non-shared percentage of the group's
 * expected memory.
 */
static int32
groupGetMemQuotaExpected(const ResGroupCaps *caps)
{
	if (caps->concurrency <= 0)
		return groupGetMemExpected(caps) *
			(100 - caps->memSharedQuota) / 100;

	return slotGetMemQuotaExpected(caps) * caps->concurrency;
}
/*
* Get per-group expected memory shared quota in chunks
*/
static int32
groupGetMemSharedExpected(const ResGroupCaps *caps)
{
return groupGetMemExpected(caps) - groupGetMemQuotaExpected(caps);
}
/*
 * Memory spill amount of a whole group, in chunks.
 *
 * In fallback mode the value is derived from statement_mem; otherwise it is
 * memory_spill_ratio percent of the group's expected memory.
 */
static int32
groupGetMemSpillTotal(const ResGroupCaps *caps)
{
	/* memSpill is in fallback mode, return statement_mem instead */
	if (memory_spill_ratio == RESGROUP_FALLBACK_MEMORY_SPILL_RATIO)
		return VmemTracker_ConvertVmemMBToChunks(statement_mem >> 10);

	/* memSpill is in percentage mode */
	return groupGetMemExpected(caps) * memory_spill_ratio / 100;
}
/*
* Get per-slot expected memory quota in chunks
*/
static int32
slotGetMemQuotaExpected(const ResGroupCaps *caps)
{
Assert(caps->concurrency != 0);
return groupGetMemExpected(caps) *
(100 - caps->memSharedQuota) / 100 /
caps->concurrency;
}
/*
 * Memory quota a single slot is expected to get on QE, in chunks.
 *
 * On QE the runtime status must also be considered as it might have
 * different caps with QD: the result is capped by the group's unused
 * granted quota divided by the number of free slots, or by the concurrency
 * when no slot is free.
 */
static int32
slotGetMemQuotaOnQE(const ResGroupCaps *caps, ResGroupData *group)
{
	int			freeSlots = caps->concurrency - group->nRunning;
	int			divisor = (freeSlots > 0) ? freeSlots : caps->concurrency;
	int32		expectedQuota = slotGetMemQuotaExpected(caps);
	int32		availableQuota;

	availableQuota = (group->memQuotaGranted - group->memQuotaUsed) / divisor;

	return Min(expectedQuota, availableQuota);
}
/*
 * Memory spill amount of a single slot, in chunks.
 */
static int32
slotGetMemSpill(const ResGroupCaps *caps)
{
	/*
	 * memSpill is in fallback mode, it is an absolute value, no need to
	 * divide by concurrency.
	 */
	if (memory_spill_ratio == RESGROUP_FALLBACK_MEMORY_SPILL_RATIO)
		return groupGetMemSpillTotal(caps);

	/* memSpill is in percentage mode: share the group total among slots */
	Assert(caps->concurrency != 0);
	return groupGetMemSpillTotal(caps) / caps->concurrency;
}
/*
* Attempt to wake up pending procs in the group's wait queue.
*
* - grant indicates whether to grant each woken proc a slot.
*
* When grant is true we'll give up once no slot can be get,
* e.g. due to lack of free slot or enough memory quota.
*
* When grant is false all the pending procs will be woken up, each with a
* NULL slot.
*
* ResGroupLock must be held in exclusive mode.
*/
static void
wakeupSlots(ResGroupData *group, bool grant)
{
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
while (!groupWaitQueueIsEmpty(group))
{
PGPROC *waitProc;
ResGroupSlotData *slot = NULL;
if (grant)
{
/* try to get a slot for that proc */
slot = groupGetSlot(group);
if (slot == NULL)
/* if can't get one then give up */
break;
}
/* wake up one process in the wait queue */
waitProc = groupWaitQueuePop(group);
/* hand the slot (or NULL) over to the woken process */
waitProc->resSlot = slot;
procWakeup(waitProc);
}
}
/*
* Notify the other groups after a group returned chunks to MEM POOL.
*
* Every valid group except skipGroupId gets its optional
* group_mem_on_notify hook called; the scan stops early once the pool's
* freeChunks counter reads zero.
*
* According to the original notes the hooks (implemented elsewhere) are
* meant to:
* 1. For groups with vmtracker memory auditor, wake up the
* transactions waiting on them for memory quota.
* 2. For groups with cgroup memory auditor, increase their
* memory limit if needed.
*/
static void
notifyGroupsOnMem(Oid skipGroupId)
{
int i;
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
for (i = 0; i < MaxResourceGroups; i++)
{
ResGroupData *group = &pResGroupControl->groups[i];
/* unused entry */
if (group->groupId == InvalidOid)
continue;
if (group->groupId == skipGroupId)
continue;
Assert(group->groupMemOps != NULL);
if (group->groupMemOps->group_mem_on_notify)
group->groupMemOps->group_mem_on_notify(group);
/* nothing left in the pool for the remaining groups */
if (!pg_atomic_read_u32(&pResGroupControl->freeChunks))
break;
}
}
/*
* Release overused memory quota to MEM POOL.
*
* Both overused shared and non-shared memory quota will be released.
*
* If there was enough non-shared memory quota for free slots,
* then after this call there will still be enough non-shared memory quota.
*
* If this function is called after a slot is released, make sure that
* group->nRunning is updated before this function.
*
* Return the total released quota in chunks, can be 0.
*
* XXX: Some examples.
*
* Suppose concurrency is 10, running is 4,
* memory limit is 0.5, memory shared is 0.4
*
* assume currentSharedUsage is 0
*
* currentSharedStocks is 0.5*0.4 = 0.2
* memQuotaGranted is 0.5*0.6 = 0.3
* memStocksInuse is 0.5*0.4/10*6 = 0.12
* memStocksFree is 0.3 - 0.12 = 0.18
*
* * memLimit: 0.5 -> 0.4
* for memQuotaGranted we could free 0.18 - 0.4*0.6/10*6 = 0.18-0.144 = 0.036
* new memQuotaGranted is 0.3-0.036 = 0.264
* new memStocksFree is 0.18-0.036 = 0.144
* for memShared we could free currentSharedStocks - Max(currentSharedUsage, 0.4*0.4)=0.04
* new currentSharedStocks is 0.2-0.04 = 0.16
*
* * concurrency: 10 -> 20
* for memQuotaGranted we could free 0.144 - 0.4*0.6/20*16 = 0.144 - 0.24*0.8 = -0.048
* for memShared we could free currentSharedStocks - Max(currentSharedUsage, 0.4*0.4)=0.00
*
* * memShared: 0.4 -> 0.2
* for memQuotaGranted we could free 0.144 - 0.4*0.8/20*16 = 0.144 - 0.256 = -0.122
* for memShared we could free currentSharedUsage - Max(currentSharedUsage, 0.4*0.2)=0.08
* new currentSharedStocks is 0.16-0.08 = 0.08
*
* * memShared: 0.2 -> 0.6
* for memQuotaGranted we could free 0.144 - 0.4*0.4/20*16 = 0.144 - 0.128 = 0.016
* new memQuotaGranted is 0.264 - 0.016 = 0.248
* new memStocksFree is 0.144 - 0.016 = 0.128
* for memShared we could free currentSharedUsage - Max(currentSharedUsage, 0.4*0.6) = -0.18
*
* * memLimit: 0.4 -> 0.2
* for memQuotaGranted we could free 0.128 - 0.2*0.4/20*16 = 0.128 - 0.064 = 0.064
* new memQuotaGranted is 0.248-0.064 = 0.184
* new memStocksFree is 0.128 - 0.064 = 0.064
* for memShared we could free currentSharedStocks - Max(currentSharedUsage, 0.2*0.6) = -0.04
*/
static int32
mempoolAutoRelease(ResGroupData *group)
{
	ResGroupCaps *caps = &group->caps;
	int32		releasedQuota = 0;
	int32		releasedShared = 0;
	int32		idleSlots;
	int32		quotaFloor;
	int32		sharedFloor;
	int32		surplus;

	Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));

	/* slots of this group that are currently not running anything */
	idleSlots = caps->concurrency - group->nRunning;

	/*
	 * Non-shared quota that has to stay with the group: what is already in
	 * use, plus the expected per-slot quota for every idle slot.
	 */
	quotaFloor = group->memQuotaUsed;
	if (idleSlots > 0)
		quotaFloor += slotGetMemQuotaExpected(caps) * idleSlots;

	/* anything granted above that floor goes back to the MEM POOL */
	surplus = group->memQuotaGranted - quotaFloor;
	if (surplus > 0)
	{
		mempoolRelease(group->groupId, surplus);
		group->memQuotaGranted -= surplus;
		releasedQuota = surplus;
	}

	/*
	 * Shared quota that has to stay: the larger of the current shared usage
	 * and the expected shared quota for the group's caps.
	 */
	sharedFloor = Max(group->memSharedUsage,
					  groupGetMemSharedExpected(caps));

	surplus = group->memSharedGranted - sharedFloor;
	if (surplus > 0)
	{
		mempoolRelease(group->groupId, surplus);
		group->memSharedGranted -= surplus;
		releasedShared = surplus;
	}

	/* total chunks handed back; zero when nothing was overused */
	return releasedQuota + releasedShared;
}
/*
 * Top up the group's memory stocks (quota + shared quota) from the MEM POOL
 * up to group->memExpected.  How much is actually obtained is decided by
 * mempoolReserve().
 *
 * Whatever was obtained is handed to groupRebalanceQuota() together with
 * 'caps'.
 *
 * Return the number of chunks reserved, 0 if the group needed nothing.
 */
static int32
mempoolAutoReserve(ResGroupData *group, const ResGroupCaps *caps)
{
	int32		granted = group->memSharedGranted + group->memQuotaGranted;
	int32		shortfall = group->memExpected - granted;
	int32		reserved;

	/* already holding at least the expected amount */
	if (shortfall <= 0)
		return 0;

	reserved = mempoolReserve(group->groupId, shortfall);
	groupRebalanceQuota(group, reserved, caps);

	return reserved;
}
/*
 * Update the total queued time of this group.
 *
 * The time added is the span between groupWaitStart (recorded when the wait
 * begins) and groupWaitEnd (recorded when the wait is over), so both must
 * have been set before calling this.
 *
 * Does nothing when 'group' is NULL.  ResGroupLock must be held exclusively.
 *
 * NOTE(review): the value added is a raw timestamp difference; confirm its
 * unit matches what readers of totalQueuedTimeMs expect.
 */
static void
addTotalQueueDuration(ResGroupData *group)
{
	Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
	if (group == NULL)
		return;

	/* end minus start; subtracting the end from itself would always add 0 */
	group->totalQueuedTimeMs += (groupWaitEnd - groupWaitStart);
}
/*
 * Give a slot back to its resource group.
 *
 * Meant to be called at the end of the transaction, with ResGroupLock held
 * exclusively.  'isMoveQuery' is only consulted by the assertion: outside of
 * a move query the caller must no longer be assigned to a group.
 */
static void
groupReleaseSlot(ResGroupData *group, ResGroupSlotData *slot, bool isMoveQuery)
{
	Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
	Assert(!selfIsAssigned() || isMoveQuery);

	groupPutSlot(group, slot);

	/* Pending queries are only woken up on the query dispatcher. */
	if (!IS_QUERY_DISPATCHER())
		return;

	/*
	 * The slot is back in the group.  The number of queued queries to wake
	 * up can be zero, one or several, depending on how the group's
	 * configuration changed while we were running.
	 */
	wakeupSlots(group, true);
}
/*
 * Serialize the resource group information that needs to be dispatched to
 * the segments.
 *
 * Layout appended to 'str', every integer in network byte order:
 *   groupId, concurrency, cpuRateLimit, memLimit, memSharedQuota,
 *   memSpillRatio, memAuditor, cpuset length, cpuset bytes (no terminator),
 *   bypassedSlot.groupId.
 *
 * When self is not assigned to a group, cleared caps are sent instead of
 * self->caps; self->groupId is sent in either case.
 */
void
SerializeResGroupInfo(StringInfo str)
{
	ResGroupCaps blankCaps;
	ResGroupCaps *caps = &blankCaps;
	uint32		header[7];
	unsigned int cpusetLen;
	int32		netval;
	int			i;

	if (selfIsAssigned())
		caps = &self->caps;
	else
		ClearResGroupCaps(&blankCaps);

	/* fixed-size leading fields, in wire order */
	header[0] = self->groupId;
	header[1] = caps->concurrency;
	header[2] = caps->cpuRateLimit;
	header[3] = caps->memLimit;
	header[4] = caps->memSharedQuota;
	header[5] = caps->memSpillRatio;
	header[6] = caps->memAuditor;

	for (i = 0; i < (int) lengthof(header); i++)
	{
		netval = htonl(header[i]);
		appendBinaryStringInfo(str, (char *) &netval, sizeof(int32));
	}

	/* length-prefixed cpuset string */
	cpusetLen = strlen(caps->cpuset);
	netval = htonl(cpusetLen);
	appendBinaryStringInfo(str, (char *) &netval, sizeof(int32));
	appendBinaryStringInfo(str, caps->cpuset, cpusetLen);

	/* group id used in bypass mode goes last */
	netval = htonl(bypassedSlot.groupId);
	appendBinaryStringInfo(str, (char *) &netval, sizeof(netval));
}
/*
 * Deserialize the resource group information dispatched by QD.
 *
 * Reads, in this order and in network byte order: groupId, concurrency,
 * cpuRateLimit, memLimit, memSharedQuota, memSpillRatio, memAuditor,
 * the cpuset length, the cpuset bytes and finally bypassedSlot.groupId.
 *
 * capsOut is cleared first and then filled; *groupId receives the group id;
 * bypassedSlot.groupId is updated as a side effect.  An ERROR is raised when
 * the encoded cpuset does not fit into capsOut->cpuset.
 */
void
DeserializeResGroupInfo(struct ResGroupCaps *capsOut,
						Oid *groupId,
						const char *buf,
						int len)
{
	const char *cursor = buf;
	uint32		header[7];
	unsigned int cpusetLen;
	int32		netval;
	int			i;

	Assert(len > 0);

	ClearResGroupCaps(capsOut);

	/* fixed-size leading fields, in wire order */
	for (i = 0; i < (int) lengthof(header); i++)
	{
		memcpy(&netval, cursor, sizeof(int32));
		cursor += sizeof(int32);
		header[i] = ntohl(netval);
	}

	*groupId = header[0];
	capsOut->concurrency = header[1];
	capsOut->cpuRateLimit = header[2];
	capsOut->memLimit = header[3];
	capsOut->memSharedQuota = header[4];
	capsOut->memSpillRatio = header[5];
	capsOut->memAuditor = header[6];

	/* length-prefixed cpuset; leave room for the terminator */
	memcpy(&netval, cursor, sizeof(int32));
	cursor += sizeof(int32);
	cpusetLen = ntohl(netval);
	if (cpusetLen >= sizeof(capsOut->cpuset))
		elog(ERROR, "malformed serialized resource group info");

	memcpy(capsOut->cpuset, cursor, cpusetLen);
	cursor += cpusetLen;
	capsOut->cpuset[cpusetLen] = '\0';

	/* trailing group id used in bypass mode */
	memcpy(&netval, cursor, sizeof(int32));
	cursor += sizeof(int32);
	bypassedSlot.groupId = ntohl(netval);

	/* the whole buffer must have been consumed */
	Assert(len == cursor - buf);
}
/*
* Check whether resource group should be assigned on master.
*/
bool
ShouldAssignResGroupOnMaster(void)
{
/*
* Bypass resource group when it's waiting for a resource group slot. e.g.
* MyProc was interrupted by SIGTERM while waiting for resource group slot.
* Some callbacks - RemoveTempRelationsCallback for example - open new
* transactions on proc exit. It can cause a double add of MyProc to the
* waiting queue (and its corruption).
*
* Also bypass resource group when it's exiting.
*/
return IsResGroupActivated() &&
IsNormalProcessingMode() &&
Gp_role == GP_ROLE_DISPATCH &&
!proc_exit_inprogress &&
!procIsWaiting(MyProc);
}
/*
* Check whether resource group should be un-assigned.
* This will be called on both master and segments.
*/
bool
ShouldUnassignResGroup(void)
{
return IsResGroupActivated() &&
IsNormalProcessingMode() &&
(Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE);
}
/*
 * On master, QD is assigned to a resource group at the beginning of a transaction.
 * It will first acquire a slot from the resource group, and then, it will get the
 * current capability snapshot, update the memory usage information, and add to
 * the corresponding cgroup.
 *
 * Two paths exist:
 *
 * - Bypass path: taken when shouldBypassQuery() accepts debug_query_string.
 * No slot is acquired and self stays unassigned; the group is recorded in
 * bypassedGroup and the fake bypassedSlot is used for memory accounting.
 *
 * - Normal path: a slot is acquired, stored in the session, and self is
 * attached to the group. If anything in this path throws, UnassignResGroup()
 * is called before the error is re-thrown.
 */
void
AssignResGroupOnMaster(void)
{
ResGroupSlotData *slot;
ResGroupInfo groupInfo;
Assert(Gp_role == GP_ROLE_DISPATCH);
/*
 * if query should be bypassed, do not assign a
 * resource group, leave self unassigned
 */
if (shouldBypassQuery(debug_query_string))
{
/*
 * Although we decide to bypass this query we should load the
 * memory_spill_ratio setting from the resgroup, otherwise a
 * `SHOW memory_spill_ratio` command will output the default value 20
 * if it's the first query in the connection (make sure tab completion
 * is not triggered otherwise it will run some implicit query before
 * you execute the SHOW command).
 *
 * Also need to increase a bypassed ref count to prevent the group
 * being dropped concurrently.
 */
/* Re-decide the group until the bypassed ref count could be taken */
do {
decideResGroup(&groupInfo);
} while (!groupIncBypassedRef(&groupInfo));
/* Record which resgroup we are running in */
bypassedGroup = groupInfo.group;
/* Update pg_stat_activity statistics */
bypassedGroup->totalExecuted++;
pgstat_report_resgroup(bypassedGroup->groupId);
/* Initialize the fake slot */
bypassedSlot.group = groupInfo.group;
bypassedSlot.groupId = groupInfo.groupId;
bypassedSlot.memQuota = 0;
bypassedSlot.memUsage = 0;
/* Attach self memory usage to resgroup */
groupIncMemUsage(bypassedGroup, &bypassedSlot, self->memUsage);
/* Record the bypass memory limit of current query */
self->bypassMemoryLimit = self->memUsage + RESGROUP_BYPASS_MODE_MEMORY_LIMIT_ON_QD;
/* Add into cgroup */
ResGroupOps_AssignGroup(bypassedGroup->groupId,
&bypassedGroup->caps,
MyProcPid);
groupSetMemorySpillRatio(&bypassedGroup->caps);
return;
}
PG_TRY();
{
/* Re-decide the group whenever groupAcquireSlot() hands back no slot */
do {
decideResGroup(&groupInfo);
/* Acquire slot */
slot = groupAcquireSlot(&groupInfo, false);
} while (slot == NULL);
/* Set resource group slot for current session */
sessionSetSlot(slot);
/* Add proc memory accounting info into group and slot */
selfAttachResGroup(groupInfo.group, slot);
/* Init self */
self->caps = slot->caps;
/* Don't error out before this line in this function */
SIMPLE_FAULT_INJECTOR("resgroup_assigned_on_master");
/* Add into cgroup */
ResGroupOps_AssignGroup(self->groupId, &(self->caps), MyProcPid);
/* Set spill guc */
groupSetMemorySpillRatio(&slot->caps);
}
PG_CATCH();
{
/* Undo the assignment before propagating the error */
UnassignResGroup(false);
PG_RE_THROW();
}
PG_END_TRY();
}
/*
 * Detach from a resource group at the end of the transaction.
 *
 * In bypass mode (bypassedGroup is set) only the bypass bookkeeping is
 * undone: the bypassed ref count (on QD only), the memory usage attached to
 * the fake slot, and the fake slot itself.
 *
 * Otherwise, if self is assigned, self is detached from its group and slot
 * under ResGroupLock. The slot is released when no proc references it any
 * more, or unconditionally when 'releaseSlot' is true; in the latter case
 * the memory still accounted on the slot is dropped and slot->nProcs is
 * forced to 0 first.
 */
void
UnassignResGroup(bool releaseSlot)
{
/* Captured up front; only used on the non-bypass path below */
ResGroupData *group = self->group;
ResGroupSlotData *slot = self->slot;
if (bypassedGroup)
{
/* bypass mode ref count is only maintained on qd */
if (Gp_role == GP_ROLE_DISPATCH)
groupDecBypassedRef(bypassedGroup);
/* Detach self memory usage from resgroup */
groupDecMemUsage(bypassedGroup, &bypassedSlot, self->memUsage);
/* Reset the fake slot */
bypassedSlot.group = NULL;
bypassedSlot.groupId = InvalidOid;
bypassedGroup = NULL;
/* Update pg_stat_activity statistics */
pgstat_report_resgroup(InvalidOid);
return;
}
if (Gp_role == GP_ROLE_EXECUTE && IS_QUERY_DISPATCHER())
SIMPLE_FAULT_INJECTOR("unassign_resgroup_start_entrydb");
/* Nothing to detach from */
if (!selfIsAssigned())
return;
/* Cleanup self */
if (self->memUsage > 10)
LOG_RESGROUP_DEBUG(LOG, "idle proc memory usage: %d", self->memUsage);
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
/* Sub proc memory accounting info from group and slot */
selfDetachResGroup(group, slot);
/* Release the slot if no reference. */
if (slot->nProcs == 0 || releaseSlot)
{
if (releaseSlot)
{
/* release the memory left in the slot if there's entryDB */
groupDecSlotMemUsage(group, slot);
slot->nProcs = 0;
}
groupReleaseSlot(group, slot, false);
/*
 * Reset resource group slot for current session. Note MySessionState
 * could be reset as NULL in shmem_exit() before.
 */
sessionResetSlot();
}
LWLockRelease(ResGroupLock);
if (Gp_role == GP_ROLE_DISPATCH)
SIMPLE_FAULT_INJECTOR("unassign_resgroup_end_qd");
/* Update pg_stat_activity statistics */
pgstat_report_resgroup(InvalidOid);
}
/*
 * QEs are not assigned/unassigned to a resource group on segments for each
 * transaction, instead, they switch resource group when a new resource group
 * id or slot id is dispatched.
 *
 * 'buf'/'len' is the serialized resource group information, decoded with
 * DeserializeResGroupInfo(); decoding also updates bypassedSlot.groupId.
 *
 * Outcomes, checked in this order:
 * - a bypass group id was dispatched: enter bypass mode (or stay in it);
 * - no group id was dispatched: unassign from the current group;
 * - self already has a group: nothing to do, the dispatched values are only
 * cross-checked by assertions;
 * - otherwise: attach to the dispatched group, taking a slot from the slot
 * pool if the session does not have one yet.
 */
void
SwitchResGroupOnSegment(const char *buf, int len)
{
Oid newGroupId;
ResGroupCaps caps;
ResGroupData *group;
ResGroupSlotData *slot;
DeserializeResGroupInfo(&caps, &newGroupId, buf, len);
/*
 * QD will dispatch the resgroup id via bypassedSlot.groupId
 * in bypass mode.
 */
if (bypassedSlot.groupId != InvalidOid)
{
/* Are we already running in bypass mode? */
if (bypassedGroup != NULL)
{
Assert(bypassedGroup->groupId == bypassedSlot.groupId);
return;
}
/* Find out the resgroup by id */
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
bypassedGroup = groupHashFind(bypassedSlot.groupId, true);
LWLockRelease(ResGroupLock);
Assert(bypassedGroup != NULL);
/* Initialize the fake slot */
bypassedSlot.memQuota = 0;
bypassedSlot.memUsage = 0;
/* Attach self memory usage to resgroup */
groupIncMemUsage(bypassedGroup, &bypassedSlot, self->memUsage);
/* Record the bypass memory limit of current query */
self->bypassMemoryLimit = self->memUsage + RESGROUP_BYPASS_MODE_MEMORY_LIMIT_ON_QE;
return;
}
/* No group dispatched: detach from whatever group we are in */
if (newGroupId == InvalidOid)
{
UnassignResGroup(false);
return;
}
if (self->groupId != InvalidOid)
{
/* it's not the first dispatch in the same transaction */
Assert(self->groupId == newGroupId);
Assert(self->caps.concurrency == caps.concurrency);
Assert(self->caps.cpuRateLimit == caps.cpuRateLimit);
Assert(self->caps.memLimit == caps.memLimit);
Assert(self->caps.memSharedQuota == caps.memSharedQuota);
Assert(self->caps.memSpillRatio == caps.memSpillRatio);
Assert(self->caps.memAuditor == caps.memAuditor);
Assert(!strcmp(self->caps.cpuset, caps.cpuset));
return;
}
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
/* raises an ERROR (raise = true) when the group is not in shared memory */
group = groupHashFind(newGroupId, true);
Assert(group != NULL);
/* Init self */
Assert(host_primary_segment_count > 0);
Assert(caps.concurrency > 0);
self->caps = caps;
/* Init slot */
slot = sessionGetSlot();
if (slot != NULL)
{
/* The session already owns a slot; it must belong to this group */
Assert(slotIsInUse(slot));
Assert(slot->groupId == newGroupId);
}
else
{
/* This is the first QE of this session, allocate a slot from slot pool */
slot = slotpoolAllocSlot();
Assert(!slotIsInUse(slot));
sessionSetSlot(slot);
/* Reserve memory for the group before sizing the slot's quota */
mempoolAutoReserve(group, &caps);
initSlot(slot, group,
slotGetMemQuotaOnQE(&caps, group));
group->memQuotaUsed += slot->memQuota;
Assert(group->memQuotaUsed <= group->memQuotaGranted);
group->nRunning++;
}
selfAttachResGroup(group, slot);
LWLockRelease(ResGroupLock);
/* finally we can say we are in a valid resgroup */
Assert(selfIsAssigned());
/* Add into cgroup */
ResGroupOps_AssignGroup(self->groupId, &(self->caps), MyProcPid);
}
/*
 * Wait on the queue of resource group.
 *
 * Sleeps on MyProc's latch until MyProc is no longer in a wait queue.  While
 * waiting, the ps display gets a " queuing" suffix (when
 * update_process_title is on) and the group id is reported through the
 * backend entry.
 *
 * When gp_resource_group_queuing_timeout is positive the total wait is
 * bounded by it and an ERROR is raised once it is exceeded.
 *
 * 'isMoveQuery' tells that the wait happens on behalf of a move query, in
 * which case self may still be assigned to a group.  It is forwarded to
 * groupWaitCancel() on the error path so that the same relaxed assertion
 * applies there too.
 *
 * Must be called without holding ResGroupLock.
 */
static void
waitOnGroup(ResGroupData *group, bool isMoveQuery)
{
	int64		timeout = -1;
	int64		curTime;
	const char *old_status;
	char	   *new_status = NULL;
	int			len;
	PGPROC	   *proc = MyProc;
	const char *queueStr = " queuing";

	Assert(!LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
	Assert(!selfIsAssigned() || isMoveQuery);

	/* set ps status to waiting */
	if (update_process_title)
	{
		old_status = get_real_act_ps_display(&len);
		new_status = (char *) palloc(len + strlen(queueStr) + 1);
		memcpy(new_status, old_status, len);
		strcpy(new_status + len, queueStr);
		set_ps_display(new_status, false);
		/* truncate off " queuing" */
		new_status[len] = '\0';
	}

	/*
	 * The low bits of 'wait_event_info' argument to WaitLatch are
	 * not enough to store a full Oid, so we set groupId out-of-band,
	 * via the backend entry.
	 */
	pgstat_report_resgroup(group->groupId);

	/*
	 * Mark that we are waiting on resource group
	 *
	 * This is used for interrupt cleanup, similar to lockAwaited in ProcSleep
	 */
	groupAwaited = group;
	groupWaitStart = GetCurrentTimestamp();

	/*
	 * Make sure we have released all locks before going to sleep, to eliminate
	 * deadlock situations
	 */
	PG_TRY();
	{
		for (;;)
		{
			ResetLatch(&proc->procLatch);

			CHECK_FOR_INTERRUPTS();

			/* done once we have been taken off the wait queue */
			if (!procIsWaiting(proc))
				break;

			if (gp_resource_group_queuing_timeout > 0)
			{
				/* remaining wait budget, recomputed on every wakeup */
				curTime = GetCurrentTimestamp();
				timeout = gp_resource_group_queuing_timeout - (curTime - groupWaitStart) / 1000;
				if (timeout < 0)
					ereport(ERROR,
							(errcode(ERRCODE_QUERY_CANCELED),
							 errmsg("canceling statement due to resource group waiting timeout")));

				WaitLatch(&proc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
						  (long) timeout, PG_WAIT_RESOURCE_GROUP);
			}
			else
			{
				WaitLatch(&proc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
						  PG_WAIT_RESOURCE_GROUP);
			}
		}
	}
	PG_CATCH();
	{
		/* reset ps status */
		if (update_process_title)
		{
			set_ps_display(new_status, false);
			pfree(new_status);
		}

		/*
		 * Forward isMoveQuery: during a move query self is still assigned,
		 * and groupWaitCancel() only tolerates that when told so.
		 */
		groupWaitCancel(isMoveQuery);
		PG_RE_THROW();
	}
	PG_END_TRY();

	groupAwaited = NULL;

	/* reset ps status */
	if (update_process_title)
	{
		set_ps_display(new_status, false);
		pfree(new_status);
	}
}
/*
 * groupHashNew -- hand out an unused group object for the caller to
 * initialize, and register it in the hash table under 'groupId'.
 *
 * The first entry of pResGroupControl->groups whose groupId is InvalidOid is
 * picked.
 *
 * Notes
 *	The resource group lightweight lock (ResGroupLock) *must* be held
 *	exclusively for this operation.
 */
static ResGroupData *
groupHashNew(Oid groupId)
{
	ResGroupHashEntry *newEntry;
	bool		exists;
	int			freeIdx = 0;

	Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
	Assert(groupId != InvalidOid);

	/* scan for the first unused group struct */
	while (freeIdx < pResGroupControl->nGroups &&
		   pResGroupControl->groups[freeIdx].groupId != InvalidOid)
		freeIdx++;

	Assert(freeIdx < pResGroupControl->nGroups);

	newEntry = (ResGroupHashEntry *)
		hash_search(pResGroupControl->htbl, (void *) &groupId, HASH_ENTER, &exists);

	/* caller should test that the group does not exist already */
	Assert(!exists);

	newEntry->index = freeIdx;

	return &pResGroupControl->groups[freeIdx];
}
/*
 * groupHashFind -- look up the group object for a given oid.
 *
 * A missing group is always reported: at ERROR level when 'raise' is true
 * (so the function does not return), at LOG level otherwise, in which case
 * NULL is returned.
 *
 * Notes
 *	The resource group lightweight lock (ResGroupLock) *must* be held for
 *	this operation.
 */
static ResGroupData *
groupHashFind(Oid groupId, bool raise)
{
	ResGroupHashEntry *hit;
	bool		exists;

	Assert(LWLockHeldByMe(ResGroupLock));

	hit = (ResGroupHashEntry *)
		hash_search(pResGroupControl->htbl, (void *) &groupId, HASH_FIND, &exists);

	if (exists)
	{
		Assert(hit->index < pResGroupControl->nGroups);
		return &pResGroupControl->groups[hit->index];
	}

	ereport(raise ? ERROR : LOG,
			(errcode(ERRCODE_DATA_CORRUPTED),
			 errmsg("cannot find resource group with Oid %d in shared memory",
					groupId)));

	return NULL;
}
/*
 * groupHashRemove -- drop the hash table entry for a given oid and return
 * the group object it referred to.
 *
 * An ERROR is raised when no entry exists for 'groupId'.
 *
 * Notes
 *	The resource group lightweight lock (ResGroupLock) *must* be held
 *	exclusively for this operation.
 */
static ResGroupData *
groupHashRemove(Oid groupId)
{
	ResGroupHashEntry *removed;
	bool		exists;

	Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));

	removed = (ResGroupHashEntry *) hash_search(pResGroupControl->htbl,
												(void *) &groupId,
												HASH_REMOVE,
												&exists);
	if (!exists)
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("cannot find resource group with Oid %d in shared memory to remove",
						groupId)));

	/* the entry only carried the index into the groups array */
	return &pResGroupControl->groups[removed->index];
}
/*
 * Process-exit hook: cancel any resource group wait still in progress.
 *
 * Covers exiting without having been granted a slot, e.g. after SIGTERM.
 * Neither argument is used.
 */
static void
AtProcExit_ResGroup(int code, Datum arg)
{
	(void) code;
	(void) arg;

	groupWaitCancel(false);
}
/*
 * Handle the interrupt cases when waiting on the queue
 *
 * The proc may wait on the queue for a slot, or wait for the
 * DROP transaction to finish. In the first case, at the same time
 * we get interrupted (SIGINT or SIGTERM), we could have been
 * granted a slot or not. In the second case, there's no running
 * transaction in the group. If the DROP transaction is finished
 * (commit or abort) at the same time as we get interrupted,
 * MyProc should have been removed from the wait queue, and the
 * ResGroupData entry may have been removed if the DROP is committed.
 *
 * Does nothing when no wait is in progress (groupAwaited is NULL).
 * Otherwise records groupWaitEnd, cleans up under ResGroupLock and
 * clears groupAwaited.
 *
 * 'isMoveQuery' tells that the interrupted wait belonged to a move query,
 * in which case self may still be assigned to a group; it is forwarded to
 * groupReleaseSlot() so that the same relaxed assertion applies there.
 */
static void
groupWaitCancel(bool isMoveQuery)
{
	ResGroupData *group;
	ResGroupSlotData *slot;

	/* Nothing to do if we weren't waiting on a group */
	if (groupAwaited == NULL)
		return;

	pgstat_report_wait_end();

	groupWaitEnd = GetCurrentTimestamp();

	Assert(!selfIsAssigned() || isMoveQuery);

	group = groupAwaited;

	/* We are sure to be interrupted in the for loop of waitOnGroup now */
	LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);

	AssertImply(procIsWaiting(MyProc),
				groupWaitQueueFind(group, MyProc));

	if (procIsWaiting(MyProc))
	{
		/*
		 * Still waiting on the queue when get interrupted, remove
		 * myself from the queue
		 */
		Assert(!groupWaitQueueIsEmpty(group));

		groupWaitQueueErase(group, MyProc);

		addTotalQueueDuration(group);
	}
	else if (MyProc->resSlot != NULL)
	{
		/* Woken up by a slot holder */
		Assert(!procIsWaiting(MyProc));

		/* First complete the slot's transfer from MyProc to self */
		slot = MyProc->resSlot;
		MyProc->resSlot = NULL;

		/*
		 * Similar as groupReleaseSlot(), how many pending queries to
		 * wake up depends on how many slots we can get.
		 *
		 * Pass isMoveQuery through: during a move query self is still
		 * assigned, and groupReleaseSlot() only tolerates that when told so.
		 */
		groupReleaseSlot(group, slot, isMoveQuery);

		/*
		 * Reset resource group slot for current session. Note MySessionState
		 * could be reset as NULL in shmem_exit() before.
		 */
		sessionResetSlot();

		group->totalExecuted++;

		addTotalQueueDuration(group);
	}
	else
	{
		/*
		 * The transaction of DROP RESOURCE GROUP is finished,
		 * groupAcquireSlot will do the retry.
		 *
		 * The resource group pointed by self->group may have
		 * already been removed by here.
		 */
		Assert(!procIsWaiting(MyProc));
	}

	LWLockRelease(ResGroupLock);

	groupAwaited = NULL;
}
/*
 * Bring the memory_spill_ratio GUC in line with the value in 'caps'.
 *
 * The GUC is only touched when the two values differ.
 */
static void
groupSetMemorySpillRatio(const ResGroupCaps *caps)
{
	char		ratioText[64];

	if (caps->memSpillRatio != memory_spill_ratio)
	{
		snprintf(ratioText, sizeof(ratioText), "%d", caps->memSpillRatio);

		set_config_option("memory_spill_ratio", ratioText, PGC_USERSET, PGC_S_RESGROUP,
						  GUC_ACTION_SET, true, 0, false);
	}
}
/*
 * Report memory figures derived from self->caps.
 *
 * memLimit    - result of groupGetMemExpected()
 * slotQuota   - result of slotGetMemQuotaExpected(), or -1 when the
 *               concurrency in the caps is 0
 * sharedQuota - result of groupGetMemSharedExpected()
 */
void
ResGroupGetMemInfo(int *memLimit, int *slotQuota, int *sharedQuota)
{
	const ResGroupCaps *myCaps = &self->caps;

	*memLimit = groupGetMemExpected(myCaps);

	if (myCaps->concurrency)
		*slotQuota = slotGetMemQuotaExpected(myCaps);
	else
		*slotQuota = -1;

	*sharedQuota = groupGetMemSharedExpected(myCaps);
}
/*
 * Sanity-check the resgroup information kept in self.
 *
 * Asserts that the memory usage is not negative and that a valid groupId
 * always comes with a group pointer.
 */
static void
selfValidateResGroupInfo(void)
{
	/* memory accounting can never go below zero */
	Assert(self->memUsage >= 0);

	/* groupId and group pointer must agree */
	AssertImply(self->groupId != InvalidOid,
				self->group != NULL);
}
/*
 * Tell whether self is assigned to a resource group.
 *
 * The answer is based on self->groupId.  In addition the function asserts
 * that group and slot are in a consistent state, i.e. both set or both
 * unset, so calling it in the middle of an assign/unassign/switch sequence
 * can trip an assertion; use with caution.
 *
 * A true result does not mean the assign/switch process is completely done,
 * for example the memory accounting information might not be updated yet.
 *
 * Whether the assigned resgroup is still valid or was dropped is not
 * checked here.
 */
static bool
selfIsAssigned(void)
{
	bool		assigned;

	selfValidateResGroupInfo();

	/* group and slot must be set or unset together */
	AssertImply(self->group == NULL,
				self->slot == NULL);
	AssertImply(self->group != NULL,
				self->slot != NULL);

	assigned = (self->groupId != InvalidOid);

	return assigned;
}
#ifdef USE_ASSERT_CHECKING
/*
* Check whether self has been set a slot.
*
* We don't check whether a resgroup is set or not.
*/
static bool
selfHasSlot(void)
{
return self->slot != NULL;
}
/*
 * Tell whether self currently has a resgroup set.
 *
 * The groupId and the group pointer are asserted to be consistent.
 *
 * Neither the validity of the resgroup (it may have been dropped) nor the
 * presence of a slot is checked.
 */
static bool
selfHasGroup(void)
{
	bool		hasGroup;

	AssertImply(self->groupId != InvalidOid,
				self->group != NULL);

	hasGroup = (self->groupId != InvalidOid);

	return hasGroup;
}
#endif /* USE_ASSERT_CHECKING */
/*
 * Record 'group' in self: both the pointer and its groupId.
 *
 * The group must not be dropped.
 *
 * Restrictions, meant to make the caller be explicit about its intent:
 * - self must not already be assigned to a resgroup.
 */
static void
selfSetGroup(ResGroupData *group)
{
	Assert(!selfIsAssigned());
	Assert(groupIsNotDropped(group));

	/* keep pointer and id in sync */
	self->group = group;
	self->groupId = group->groupId;
}
/*
 * Clear the resgroup recorded in self: both the groupId and the pointer.
 *
 * Restrictions, meant to make the caller be explicit about its intent:
 * - self must have a resgroup set;
 * - self must not have a slot set any more.
 */
static void
selfUnsetGroup(void)
{
	Assert(selfHasGroup());
	Assert(!selfHasSlot());

	/* keep pointer and id in sync */
	self->groupId = InvalidOid;
	self->group = NULL;
}
/*
 * Record 'slot' as the slot of self.
 *
 * Restrictions, meant to make the caller be explicit about its intent:
 * - self must have a resgroup set;
 * - self must not have a slot set yet;
 * - the slot must be in use.
 */
static void
selfSetSlot(ResGroupSlotData *slot)
{
	Assert(selfHasGroup());
	Assert(!selfHasSlot());
	Assert(slotIsInUse(slot));

	self->slot = slot;
}
/*
 * Clear the slot recorded in self.
 *
 * Restrictions, meant to make the caller be explicit about its intent:
 * - self must have a resgroup set;
 * - self must have a slot set.
 */
static void
selfUnsetSlot(void)
{
	Assert(selfHasGroup());
	Assert(selfHasSlot());

	self->slot = NULL;
}
/*
* Check whether proc is in some resgroup's wait queue.
*
* The LWLock is not required.
*
* This function does not check whether proc is in a specific resgroup's
* wait queue. To make this check use groupWaitQueueFind().
*/
static bool
procIsWaiting(const PGPROC *proc)
{
/*------
* The typical asm instructions fow below C operation can be like this:
* ( gcc 4.8.5-11, x86_64-redhat-linux, -O0 )
*
* mov -0x8(%rbp),%rax ; load proc
* mov 0x8(%rax),%rax ; load proc->links.next
* cmp $0,%rax ; compare with NULL
* setne %al ; store the result
*
* The operation is atomic, so a lock is not required here.
*------
*/
return proc->links.next != NULL;
}
/*
 * Wake up a proc by setting its latch.
 *
 * The proc must already have been taken off the wait queue.
 */
static void
procWakeup(PGPROC *proc)
{
	Assert(!procIsWaiting(proc));

	SetLatch(&proc->procLatch);
}
#ifdef USE_ASSERT_CHECKING
/*
 * Assert that a slot's attributes are consistent.
 *
 * A slot with a valid groupId must not be on the free list, and on a QE it
 * must be the session's slot.  A slot without a groupId must carry no procs,
 * a negative memQuota and no memory usage.
 */
static void
slotValidate(const ResGroupSlotData *slot)
{
	Assert(slot != NULL);

	if (slot->groupId != InvalidOid)
	{
		/* slot belongs to a group */
		Assert(!slotIsInFreelist(slot));
		AssertImply(Gp_role == GP_ROLE_EXECUTE, slot == sessionGetSlot());
		return;
	}

	/* slot is freed or idle */
	Assert(slot->nProcs == 0);
	Assert(slot->memQuota < 0);
	Assert(slot->memUsage == 0);
}
/*
 * Tell whether a slot is in use, i.e. carries a valid groupId.
 *
 * The slot is validated first.
 */
static bool
slotIsInUse(const ResGroupSlotData *slot)
{
	bool		inUse;

	slotValidate(slot);

	inUse = (slot->groupId != InvalidOid);

	return inUse;
}
/*
 * Tell whether 'slot' is linked into the free slot list that starts at
 * pResGroupControl->freeSlot.  The list is walked from head to tail.
 */
static bool
slotIsInFreelist(const ResGroupSlotData *slot)
{
	ResGroupSlotData *cursor = pResGroupControl->freeSlot;

	while (cursor != NULL)
	{
		if (cursor == slot)
			return true;

		cursor = cursor->next;
	}

	return false;
}
#endif /* USE_ASSERT_CHECKING */
/*
 * Translate a slot pointer into its id, which is the slot's position in
 * pResGroupControl->slots.
 *
 * Return InvalidSlotId if slot is NULL.
 */
static int
slotGetId(const ResGroupSlotData *slot)
{
	int			id;

	if (slot == NULL)
		return InvalidSlotId;

	id = (int) (slot - pResGroupControl->slots);

	Assert(id >= 0);
	Assert(id < RESGROUP_MAX_SLOTS);

	return id;
}
/*
 * Mark a group as locked for drop.
 *
 * A no-op when the group is already locked.  Otherwise the caller must be
 * the dispatcher holding ResGroupLock exclusively, and the group must have
 * no running and no bypassed queries.
 */
static void
lockResGroupForDrop(ResGroupData *group)
{
	if (!group->lockedForDrop)
	{
		Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
		Assert(Gp_role == GP_ROLE_DISPATCH);
		Assert(group->nRunning == 0);
		Assert(group->nRunningBypassed == 0);

		group->lockedForDrop = true;
	}
}
/*
 * Clear the locked-for-drop mark of a group.
 *
 * A no-op when the group is not locked.  Otherwise the caller must be the
 * dispatcher holding ResGroupLock exclusively, and the group must have no
 * running and no bypassed queries.
 */
static void
unlockResGroupForDrop(ResGroupData *group)
{
	if (group->lockedForDrop)
	{
		Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
		Assert(Gp_role == GP_ROLE_DISPATCH);
		Assert(group->nRunning == 0);
		Assert(group->nRunningBypassed == 0);

		group->lockedForDrop = false;
	}
}
#ifdef USE_ASSERT_CHECKING
/*
* Check whether a resgroup is dropped.
*
* A dropped resgroup has groupId == InvalidOid,
* however there is also the case that the resgroup is first dropped
* then the shm struct is reused by another newly created resgroup,
* in such a case the groupId is not InvalidOid but the original
* resgroup does is dropped.
*
* So this function is not always reliable, use with caution.
*/
static bool
groupIsNotDropped(const ResGroupData *group)
{
return group
&& group->groupId != InvalidOid;
}
#endif /* USE_ASSERT_CHECKING */
/*
* Validate the consistency of the resgroup wait queue.
*/
static void
groupWaitQueueValidate(const ResGroupData *group)
{
const PROC_QUEUE *waitQueue;
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
waitQueue = &group->waitProcs;
if (gp_resgroup_debug_wait_queue)
{
if (waitQueue->size == 0)
{
if (waitQueue->links.next != &waitQueue->links ||
waitQueue->links.prev != &waitQueue->links)
elog(PANIC, "resource group wait queue is corrupted");
}
else
{
PGPROC *nextProc = (PGPROC *)waitQueue->links.next;
PGPROC *prevProc = (PGPROC *)waitQueue->links.prev;
if (!nextProc->mppIsWriter ||
!prevProc->mppIsWriter ||
nextProc->links.prev != &waitQueue->links ||
prevProc->links.next != &waitQueue->links)
elog(PANIC, "resource group wait queue is corrupted");
}
return;
}
AssertImply(waitQueue->size == 0,
waitQueue->links.next == &waitQueue->links &&
waitQueue->links.prev == &waitQueue->links);
}
/*
 * Check the links of one proc in a wait queue whose head is 'head'.
 *
 * Only active when gp_resgroup_debug_wait_queue is enabled.  The proc and
 * its neighbours (unless a neighbour is the queue head) must be writers, and
 * both neighbours must link back to the proc; otherwise a PANIC is raised.
 */
static void
groupWaitProcValidate(PGPROC *proc, PROC_QUEUE *head)
{
	PGPROC	   *after;
	PGPROC	   *before;

	Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));

	if (!gp_resgroup_debug_wait_queue)
		return;

	after = (PGPROC *) proc->links.next;
	before = (PGPROC *) proc->links.prev;

	if (!proc->mppIsWriter ||
		((PROC_QUEUE *) after != head && !after->mppIsWriter) ||
		((PROC_QUEUE *) before != head && !before->mppIsWriter) ||
		after->links.prev != &proc->links ||
		before->links.next != &proc->links)
		elog(PANIC, "resource group wait queue is corrupted");
}
/*
 * Append a proc to the resgroup wait queue.
 *
 * The proc must not be waiting already and must not carry a slot.
 */
static void
groupWaitQueuePush(ResGroupData *group, PGPROC *proc)
{
	PROC_QUEUE *queue = &group->waitProcs;

	Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
	Assert(!procIsWaiting(proc));
	Assert(proc->resSlot == NULL);

	groupWaitQueueValidate(group);

	/* inserting before the head links puts the proc at the tail */
	SHMQueueInsertBefore(&queue->links, &proc->links);

	groupWaitProcValidate(proc, queue);

	queue->size++;

	Assert(groupWaitQueueFind(group, proc));
}
/*
 * Detach the proc at the front of the resgroup wait queue and return it.
 *
 * The queue must not be empty.
 */
static PGPROC *
groupWaitQueuePop(ResGroupData *group)
{
	PROC_QUEUE *queue;
	PGPROC	   *front;

	Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
	Assert(!groupWaitQueueIsEmpty(group));

	groupWaitQueueValidate(group);

	queue = &group->waitProcs;

	/* the head's next link is the first waiter */
	front = (PGPROC *) queue->links.next;

	groupWaitProcValidate(front, queue);
	Assert(groupWaitQueueFind(group, front));
	Assert(front->resSlot == NULL);

	SHMQueueDelete(&front->links);
	queue->size--;

	return front;
}
/*
 * Take a given proc out of the resgroup wait queue, wherever it is.
 *
 * The proc must be in this group's queue and must not carry a slot.
 */
static void
groupWaitQueueErase(ResGroupData *group, PGPROC *proc)
{
	PROC_QUEUE *queue;

	Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
	Assert(!groupWaitQueueIsEmpty(group));
	Assert(groupWaitQueueFind(group, proc));
	Assert(proc->resSlot == NULL);

	groupWaitQueueValidate(group);

	queue = &group->waitProcs;

	groupWaitProcValidate(proc, queue);

	SHMQueueDelete(&proc->links);
	queue->size--;
}
/*
 * Tell whether the resgroup wait queue holds no procs.
 *
 * The queue is validated first; the answer is based on its size counter.
 */
static bool
groupWaitQueueIsEmpty(const ResGroupData *group)
{
	Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));

	groupWaitQueueValidate(group);

	return group->waitProcs.size == 0;
}
#ifdef USE_ASSERT_CHECKING
/*
 * Search group's wait queue for proc.
 *
 * Return true if found or false if not found.
 *
 * The whole queue is walked, so this is expensive and should only be used
 * in debugging logic; in most cases procIsWaiting() shall be used.
 */
static bool
groupWaitQueueFind(ResGroupData *group, const PGPROC *proc)
{
	SHM_QUEUE  *queueHead;
	PGPROC	   *cursor;
	Size		linkOffset = offsetof(PGPROC, links);

	Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));

	groupWaitQueueValidate(group);

	queueHead = &group->waitProcs.links;

	cursor = (PGPROC *) SHMQueueNext(queueHead, queueHead, linkOffset);
	while (cursor)
	{
		if (cursor == proc)
		{
			Assert(procIsWaiting(proc));
			return true;
		}

		cursor = (PGPROC *) SHMQueueNext(queueHead, &cursor->links, linkOffset);
	}

	return false;
}
#endif/* USE_ASSERT_CHECKING */
/*
 * Parse the query and check if this query should
 * bypass the management of resource group.
 *
 * Currently, only SET/RESET/SHOW command can be bypassed
 *
 * Returns true when gp_resource_group_bypass is set, or when every statement
 * in 'query_string' is a VariableSetStmt or VariableShowStmt.  Returns false
 * for a NULL query string, for a query that parses to nothing, and as soon
 * as any other kind of statement is seen.
 */
static bool
shouldBypassQuery(const char *query_string)
{
	MemoryContext oldcontext = NULL;
	MemoryContext tmpcontext = NULL;
	List	   *parsetree_list;
	ListCell   *parsetree_item;
	Node	   *parsetree;
	bool		bypass;

	if (gp_resource_group_bypass)
		return true;

	if (!query_string)
		return false;

	/*
	 * Switch to appropriate context for constructing parsetrees.
	 *
	 * It is possible that MessageContext is NULL, for example in a bgworker:
	 *
	 *     debug_query_string = "select 1";
	 *     StartTransactionCommand();
	 *
	 * This is not the recommended order of setting debug_query_string, but we
	 * should not put a constraint on the order by resource group anyway.
	 */
	if (MessageContext)
		oldcontext = MemoryContextSwitchTo(MessageContext);
	else
	{
		/* Create a temp memory context to prevent memory leaks */
		tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
										   "resgroup temporary context",
										   ALLOCSET_DEFAULT_MINSIZE,
										   ALLOCSET_DEFAULT_INITSIZE,
										   ALLOCSET_DEFAULT_MAXSIZE);
		oldcontext = MemoryContextSwitchTo(tmpcontext);
	}

	parsetree_list = pg_parse_query(query_string);

	MemoryContextSwitchTo(oldcontext);

	if (parsetree_list == NULL)
	{
		/* the temp context must not outlive this call on any path */
		if (tmpcontext)
			MemoryContextDelete(tmpcontext);
		return false;
	}

	/* Only bypass SET/RESET/SHOW command for now */
	bypass = true;
	foreach(parsetree_item, parsetree_list)
	{
		parsetree = (Node *) lfirst(parsetree_item);

		/* look through the RawStmt wrapper at the actual statement */
		if (nodeTag(parsetree) == T_RawStmt)
			parsetree = ((RawStmt *) parsetree)->stmt;

		if (nodeTag(parsetree) != T_VariableSetStmt &&
			nodeTag(parsetree) != T_VariableShowStmt)
		{
			bypass = false;
			break;
		}
	}

	list_free_deep(parsetree_list);

	if (tmpcontext)
		MemoryContextDelete(tmpcontext);

	return bypass;
}
/*
 * Tell whether the resource group described by pGroupInfo has been dropped.
 *
 * It counts as dropped when the groupId stored in the group struct no longer
 * matches the groupId recorded in pGroupInfo.
 */
static bool
groupIsDropped(ResGroupInfo *pGroupInfo)
{
	const ResGroupData *current;

	Assert(pGroupInfo != NULL);
	Assert(pGroupInfo->group != NULL);

	current = pGroupInfo->group;

	return current->groupId != pGroupInfo->groupId;
}
/*
 * Debug helper functions
 */

/*
 * Append a JSON-formatted dump of the resource group shared state to str.
 *
 * The dump contains the segment index, the scalar fields of
 * pResGroupControl, every group in pResGroupControl->groups, all slots and
 * the head of the free slot list.  Nothing is appended when resource groups
 * are not enabled.
 */
void
ResGroupDumpInfo(StringInfo str)
{
	int			i;

	if (!IsResGroupEnabled())
		return;

	appendStringInfo(str, "{\"segid\":%d,", GpIdentity.segindex);

	/* dump fields in pResGroupControl. */
	appendStringInfo(str, "\"segmentsOnMaster\":%d,", pResGroupControl->segmentsOnMaster);
	appendStringInfo(str, "\"loaded\":%s,", pResGroupControl->loaded ? "true" : "false");
	appendStringInfo(str, "\"totalChunks\":%d,", pResGroupControl->totalChunks);
	/* pg_atomic_read_u32() yields an unsigned value, so print it with %u */
	appendStringInfo(str, "\"freeChunks\":%u,", pg_atomic_read_u32(&pResGroupControl->freeChunks));
	appendStringInfo(str, "\"chunkSizeInBits\":%d,", pResGroupControl->chunkSizeInBits);

	/* dump each group */
	appendStringInfo(str, "\"groups\":[");
	for (i = 0; i < pResGroupControl->nGroups; i++)
	{
		resgroupDumpGroup(str, &pResGroupControl->groups[i]);
		/* comma between elements, none after the last one */
		if (i < pResGroupControl->nGroups - 1)
			appendStringInfo(str, ",");
	}
	appendStringInfo(str, "],");

	/* dump slots */
	resgroupDumpSlots(str);

	appendStringInfo(str, ",");

	/* dump freeslot links */
	resgroupDumpFreeSlots(str);

	appendStringInfo(str, "}");
}
/*
 * Append one group as a JSON object: its counters and memory figures,
 * followed by its wait queue and its capabilities.
 */
static void
resgroupDumpGroup(StringInfo str, ResGroupData *group)
{
	/* scalar fields, emitted in one formatted write */
	appendStringInfo(str,
					 "{"
					 "\"group_id\":%u,"
					 "\"nRunning\":%d,"
					 "\"nRunningBypassed\":%d,"
					 "\"locked_for_drop\":%d,"
					 "\"memExpected\":%d,"
					 "\"memQuotaGranted\":%d,"
					 "\"memSharedGranted\":%d,"
					 "\"memQuotaUsed\":%d,"
					 "\"memUsage\":%d,"
					 "\"memSharedUsage\":%d,",
					 group->groupId,
					 group->nRunning,
					 group->nRunningBypassed,
					 group->lockedForDrop,
					 group->memExpected,
					 group->memQuotaGranted,
					 group->memSharedGranted,
					 group->memQuotaUsed,
					 group->memUsage,
					 group->memSharedUsage);

	/* nested sections; the wait queue dump emits its own trailing comma */
	resgroupDumpWaitQueue(str, &group->waitProcs);
	resgroupDumpCaps(str, (ResGroupCap *) (&group->caps));

	appendStringInfo(str, "}");
}
/*
 * Append the wait queue of a group as a JSON member named "wait_queue",
 * including a trailing comma.
 *
 * Emits the queue size followed by one object per queued PGPROC (pid,
 * waiting flag and slot id).  An empty content array is emitted when the
 * queue yields no first element or that element is not a valid shared
 * memory address.
 */
static void
resgroupDumpWaitQueue(StringInfo str, PROC_QUEUE *queue)
{
	PGPROC	   *proc;

	appendStringInfo(str, "\"wait_queue\":{");
	appendStringInfo(str, "\"wait_queue_size\":%d,", queue->size);
	appendStringInfo(str, "\"wait_queue_content\":[");

	proc = (PGPROC *) SHMQueueNext(&queue->links,
								   &queue->links,
								   offsetof(PGPROC, links));

	/*
	 * Check for NULL explicitly before forming &proc->links, instead of
	 * relying on the address validity test to reject a member address
	 * derived from a NULL pointer.
	 */
	if (proc == NULL || !ShmemAddrIsValid(&proc->links))
	{
		appendStringInfo(str, "]},");
		return;
	}

	while (proc)
	{
		appendStringInfo(str, "{");
		appendStringInfo(str, "\"pid\":%d,", proc->pid);
		appendStringInfo(str, "\"resWaiting\":%s,",
						 procIsWaiting(proc) ? "true" : "false");
		appendStringInfo(str, "\"resSlot\":%d", slotGetId(proc->resSlot));
		appendStringInfo(str, "}");

		proc = (PGPROC *) SHMQueueNext(&queue->links,
									   &proc->links,
									   offsetof(PGPROC, links));
		/* separate elements; no comma after the last one */
		if (proc)
			appendStringInfo(str, ",");
	}
	appendStringInfo(str, "]},");
}
/*
 * Append the capability values as a JSON array named "caps".
 *
 * caps is indexed by limit type; entries 1 .. RESGROUP_LIMIT_TYPE_COUNT - 1
 * are emitted as {"<type>":<value>} objects (index 0 is not dumped).
 */
static void
resgroupDumpCaps(StringInfo str, ResGroupCap *caps)
{
	int			limitType;

	appendStringInfo(str, "\"caps\":[");
	for (limitType = 1; limitType < RESGROUP_LIMIT_TYPE_COUNT; limitType++)
	{
		/* every element but the first is preceded by a separator */
		if (limitType > 1)
			appendStringInfo(str, ",");
		appendStringInfo(str, "{\"%d\":%d}", limitType, caps[limitType]);
	}
	appendStringInfo(str, "]");
}
/*
 * Append all RESGROUP_MAX_SLOTS slots of pResGroupControl as a JSON array
 * named "slots"; each element carries the slot's id, group id, memory
 * figures, process count, id of the next slot and its capabilities.
 */
static void
resgroupDumpSlots(StringInfo str)
{
	int			slotId;

	appendStringInfo(str, "\"slots\":[");

	for (slotId = 0; slotId < RESGROUP_MAX_SLOTS; slotId++)
	{
		ResGroupSlotData *slot = &(pResGroupControl->slots[slotId]);

		/* every element but the first is preceded by a separator */
		if (slotId > 0)
			appendStringInfo(str, ",");

		appendStringInfo(str,
						 "{"
						 "\"slotId\":%d,"
						 "\"groupId\":%u,"
						 "\"memQuota\":%d,"
						 "\"memUsage\":%d,"
						 "\"nProcs\":%d,"
						 "\"next\":%d,",
						 slotId,
						 slot->groupId,
						 slot->memQuota,
						 slot->memUsage,
						 slot->nProcs,
						 slotGetId(slot->next));
		resgroupDumpCaps(str, (ResGroupCap *) (&slot->caps));
		appendStringInfo(str, "}");
	}

	appendStringInfo(str, "]");
}
/*
 * Append the free slot list as a JSON member named "free_slot_list";
 * only the id of the list head (pResGroupControl->freeSlot) is reported.
 */
static void
resgroupDumpFreeSlots(StringInfo str)
{
	ResGroupSlotData *firstFree = pResGroupControl->freeSlot;

	appendStringInfo(str,
					 "\"free_slot_list\":{"
					 "\"head\":%d"
					 "}",
					 slotGetId(firstFree));
}
/*
 * Record slot as the resource group slot of the current session.
 *
 * The session must not already have a slot.
 */
static void
sessionSetSlot(ResGroupSlotData *slot)
{
	Assert(slot != NULL);
	Assert(MySessionState->resGroupSlot == NULL);

	/*
	 * The runaway detector walks the session array and inspects each
	 * resGroupSlot while holding SessionStateLock in shared mode, so the
	 * update has to happen under the exclusive lock.
	 */
	LWLockAcquire(SessionStateLock, LW_EXCLUSIVE);
	MySessionState->resGroupSlot = (void *) slot;
	LWLockRelease(SessionStateLock);
}
/*
 * Clear the resource group slot of the current session.
 *
 * Does nothing when there is no session state.
 */
static void
sessionResetSlot(void)
{
	if (MySessionState == NULL)
		return;

	/*
	 * The runaway detector walks the session array and inspects each
	 * resGroupSlot while holding SessionStateLock in shared mode, so the
	 * update has to happen under the exclusive lock.
	 */
	LWLockAcquire(SessionStateLock, LW_EXCLUSIVE);
	MySessionState->resGroupSlot = NULL;
	LWLockRelease(SessionStateLock);
}
/*
 * Return the resource group slot of the current session, or NULL when
 * there is no session state.
 */
static ResGroupSlotData *
sessionGetSlot(void)
{
	return (MySessionState != NULL)
		? (ResGroupSlotData *) MySessionState->resGroupSlot
		: NULL;
}
/*
 * Memory-limit ALTER handler for resource groups that use the vmtracker
 * memory auditor.
 *
 * Re-applies the group's memory caps, wakes up the group's waiters and,
 * when groupApplyMemCaps() asks for it, notifies the other groups.
 * The caller must hold ResGroupLock exclusively.
 */
static void
groupMemOnAlterForVmtracker(Oid groupId, ResGroupData *group)
{
	bool		notifyOthers;

	Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));

	notifyOthers = groupApplyMemCaps(group);

	/* waiters of this group are woken before other groups are notified */
	wakeupSlots(group, true);

	if (!notifyOthers)
		return;

	notifyGroupsOnMem(groupId);
}
/*
 * DROP handler for resource groups that use the vmtracker memory auditor:
 * hand all granted memory (fixed quota plus shared quota) back to the
 * MEM POOL and zero the group's granted counters.
 *
 * The caller must hold ResGroupLock exclusively.
 */
static void
groupMemOnDropForVmtracker(Oid groupId, ResGroupData *group)
{
	int32		grantedTotal;

	Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));

	grantedTotal = group->memQuotaGranted + group->memSharedGranted;
	mempoolRelease(groupId, grantedTotal);

	group->memQuotaGranted = 0;
	group->memSharedGranted = 0;
}
/*
 * Notification handler for resource groups that use the vmtracker memory
 * auditor, invoked when memory in MEM POOL is increased.
 *
 * Waiters are woken only on the dispatcher, only for a group that is not
 * locked for drop, has a non-empty wait queue and currently holds less
 * memory than it expects.  The caller must hold ResGroupLock exclusively.
 */
static void
groupMemOnNotifyForVmtracker(ResGroupData *group)
{
	int32		shortfall;

	Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));

	if (Gp_role != GP_ROLE_DISPATCH ||
		group->lockedForDrop ||
		groupWaitQueueIsEmpty(group))
		return;

	/* how much the group is still missing compared with what it expects */
	shortfall = group->memExpected - group->memQuotaGranted - group->memSharedGranted;

	if (shortfall > 0)
		wakeupSlots(group, true);
}
/*
 * Memory statistics dump for resource groups that use the vmtracker
 * memory auditor.
 *
 * Appends one JSON object to str; every figure is converted from vmem
 * chunks to MB with VmemTracker_ConvertVmemChunksToMB().
 */
static void
groupMemOnDumpForVmtracker(ResGroupData *group, StringInfo str)
{
	int			usedMB;
	int			availableMB;
	int			quotaUsedMB;
	int			quotaAvailableMB;
	int			quotaGrantedMB;
	int			quotaProposedMB;
	int			sharedUsedMB;
	int			sharedAvailableMB;
	int			sharedGrantedMB;
	int			sharedProposedMB;

	/* overall usage against everything granted to the group */
	usedMB = VmemTracker_ConvertVmemChunksToMB(group->memUsage);
	availableMB = VmemTracker_ConvertVmemChunksToMB(
		group->memQuotaGranted + group->memSharedGranted - group->memUsage);

	/* fixed quota figures */
	quotaUsedMB = VmemTracker_ConvertVmemChunksToMB(group->memQuotaUsed);
	quotaAvailableMB = VmemTracker_ConvertVmemChunksToMB(
		group->memQuotaGranted - group->memQuotaUsed);
	quotaGrantedMB = VmemTracker_ConvertVmemChunksToMB(group->memQuotaGranted);
	quotaProposedMB = VmemTracker_ConvertVmemChunksToMB(
		groupGetMemQuotaExpected(&group->caps));

	/* shared quota figures */
	sharedUsedMB = VmemTracker_ConvertVmemChunksToMB(group->memSharedUsage);
	sharedAvailableMB = VmemTracker_ConvertVmemChunksToMB(
		group->memSharedGranted - group->memSharedUsage);
	sharedGrantedMB = VmemTracker_ConvertVmemChunksToMB(group->memSharedGranted);
	sharedProposedMB = VmemTracker_ConvertVmemChunksToMB(
		groupGetMemSharedExpected(&group->caps));

	appendStringInfo(str,
					 "{"
					 "\"used\":%d, "
					 "\"available\":%d, "
					 "\"quota_used\":%d, "
					 "\"quota_available\":%d, "
					 "\"quota_granted\":%d, "
					 "\"quota_proposed\":%d, "
					 "\"shared_used\":%d, "
					 "\"shared_available\":%d, "
					 "\"shared_granted\":%d, "
					 "\"shared_proposed\":%d"
					 "}",
					 usedMB,
					 availableMB,
					 quotaUsedMB,
					 quotaAvailableMB,
					 quotaGrantedMB,
					 quotaProposedMB,
					 sharedUsedMB,
					 sharedAvailableMB,
					 sharedGrantedMB,
					 sharedProposedMB);
}
/*
 * Memory-limit ALTER handler for resource groups that use the cgroup
 * memory auditor.
 *
 * group->memGap drives the action:
 *   - positive: the group should give that much memory back to MEM POOL;
 *   - negative: the group should retrieve that much memory from MEM POOL;
 *   - zero:     the group already holds exactly what it expects.
 *
 * The caller must hold ResGroupLock exclusively.
 */
static void
groupMemOnAlterForCgroup(Oid groupId, ResGroupData *group)
{
	Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));

	if (group->memGap > 0)
		groupApplyCgroupMemDec(group);
	else if (group->memGap < 0)
		groupApplyCgroupMemInc(group);
	/* memGap == 0: nothing to adjust */
}
/*
 * Raise the cgroup memory limit of a resource group.
 *
 * Tries to reserve the missing amount (-memGap) from the MEM POOL, raises
 * the cgroup limit by whatever was actually reserved and shrinks the gap
 * accordingly.  This may not take effect immediately: when nothing can be
 * reserved the limit is left as it is.
 *
 * The caller must hold ResGroupLock exclusively and memGap must be negative.
 */
static void
groupApplyCgroupMemInc(ResGroupData *group)
{
	int32		reserved;
	int32		currentLimit;
	int			lockFd;

	Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
	Assert(group->memGap < 0);

	reserved = mempoolReserve(group->groupId, -group->memGap);
	if (reserved <= 0)
		return;

	/* read-modify-write of the limit, done while the group is locked */
	lockFd = ResGroupOps_LockGroup(group->groupId, RESGROUP_COMP_TYPE_MEMORY, true);
	currentLimit = ResGroupOps_GetMemoryLimit(group->groupId);
	ResGroupOps_SetMemoryLimitByValue(group->groupId, currentLimit + reserved);
	ResGroupOps_UnLockGroup(group->groupId, lockFd);

	group->memGap += reserved;
}
/*
 * Lower the cgroup memory limit of a resource group.
 *
 * Reduces the cgroup limit by memGap, returns that amount to the MEM POOL,
 * notifies the other groups and finally subtracts the amount from memGap.
 * This will take effect immediately for now.
 *
 * The caller must hold ResGroupLock exclusively and memGap must be positive.
 */
static void
groupApplyCgroupMemDec(ResGroupData *group)
{
	int32		currentLimit;
	int32		surplus;
	int			lockFd;

	Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
	Assert(group->memGap > 0);

	/* read-modify-write of the limit, done while the group is locked */
	lockFd = ResGroupOps_LockGroup(group->groupId, RESGROUP_COMP_TYPE_MEMORY, true);
	currentLimit = ResGroupOps_GetMemoryLimit(group->groupId);
	Assert(currentLimit > group->memGap);

	surplus = group->memGap;
	ResGroupOps_SetMemoryLimitByValue(group->groupId, currentLimit - surplus);
	ResGroupOps_UnLockGroup(group->groupId, lockFd);

	/* give the memory back first, then let the other groups know */
	mempoolRelease(group->groupId, surplus);
	notifyGroupsOnMem(group->groupId);

	group->memGap -= surplus;
}
/*
 * DROP handler for resource groups that use the cgroup memory auditor:
 * return the group's memory to the MEM POOL.
 *
 * The released amount is the expected memory of the group's caps plus the
 * current memGap.  The caller must hold ResGroupLock exclusively.
 */
static void
groupMemOnDropForCgroup(Oid groupId, ResGroupData *group)
{
	int32		expected;

	Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));

	expected = groupGetMemExpected(&group->caps);

	mempoolRelease(groupId, expected + group->memGap);
}
/*
 * Notification handler for resource groups that use the cgroup memory
 * auditor, invoked when memory in MEM POOL is increased.
 *
 * Only a group with a negative memGap tries to raise its cgroup limit.
 * The caller must hold ResGroupLock exclusively.
 */
static void
groupMemOnNotifyForCgroup(ResGroupData *group)
{
	Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));

	if (group->memGap >= 0)
		return;

	groupApplyCgroupMemInc(group);
}
/*
 * Memory statistics dump for resource groups that use the cgroup memory
 * auditor.
 *
 * Appends a JSON object with the cgroup memory usage and the cgroup memory
 * limit, each divided by ResGroupGetHostPrimaryCount() and converted to MB.
 */
static void
groupMemOnDumpForCgroup(ResGroupData *group, StringInfo str)
{
	int			usedMB;
	int			limitMB;

	usedMB = VmemTracker_ConvertVmemChunksToMB(
		ResGroupOps_GetMemoryUsage(group->groupId) / ResGroupGetHostPrimaryCount());
	limitMB = VmemTracker_ConvertVmemChunksToMB(
		ResGroupOps_GetMemoryLimit(group->groupId) / ResGroupGetHostPrimaryCount());

	appendStringInfo(str,
					 "{"
					 "\"used\":%d, "
					 "\"limit_granted\":%d"
					 "}",
					 usedMB,
					 limitMB);
}
/*
 * Parse cpuset to bitset
 * If cpuset is "1,3-5", Bitmapset 1,3,4,5 are set.
 *
 * cpuset: text to parse; at most len characters are examined, and parsing
 *         stops early at a NUL or a newline.
 * len:    upper bound on the number of characters to read.
 *
 * Returns the resulting Bitmapset.  NULL is returned when cpuset is NULL,
 * len is not positive, or the text is malformed: an unexpected character,
 * a '-' that does not follow a number, a range whose start exceeds its
 * end, a dangling '-', a string without any number, or a number too large
 * to be handled.
 */
Bitmapset *
CpusetToBitset(const char *cpuset, int len)
{
	/*
	 * Largest core number accepted.  It is one below the 32-bit signed
	 * maximum so that neither the digit accumulation nor the "++i" of the
	 * range loops below can overflow.
	 */
	enum
	{
		MaxCoreNumber = 0x7FFFFFFE
	};
	enum Status
	{
		Initial,				/* nothing consumed yet */
		Begin,					/* right after a completed item */
		Number,					/* reading a single number / range start */
		Interval,				/* just consumed the '-' of a range */
		Number2					/* reading the range end */
	};
	int			pos = 0,
				num1 = 0,
				num2 = 0;
	enum Status s = Initial;
	Bitmapset  *bms = NULL;

	if (cpuset == NULL || len <= 0)
		return bms;

	while (pos < len && cpuset[pos])
	{
		char		c = cpuset[pos++];

		if (c == ',')
		{
			if (s == Initial || s == Begin)
			{
				/* empty item: ignore the extra separator */
				continue;
			}
			else if (s == Interval)
			{
				/* "N-," has no range end */
				goto error_logic;
			}
			else if (s == Number)
			{
				bms = bms_union(bms, bms_make_singleton(num1));
				num1 = 0;
				s = Begin;
			}
			else if (s == Number2)
			{
				if (num1 > num2)
				{
					goto error_logic;
				}
				for (int i = num1; i <= num2; ++i)
				{
					bms = bms_union(bms, bms_make_singleton(i));
				}
				num1 = num2 = 0;
				s = Begin;
			}
		}
		else if (c == '-')
		{
			/* a range separator is only valid directly after a number */
			if (s != Number)
			{
				goto error_logic;
			}
			s = Interval;
		}
		else if (isdigit((unsigned char) c))
		{
			/*
			 * The cast keeps isdigit() defined for bytes that are negative
			 * when char is signed.
			 */
			int			digit = c - '0';
			int		   *target;

			if (s == Initial || s == Begin)
			{
				s = Number;
			}
			else if (s == Interval)
			{
				s = Number2;
			}

			/* at this point s is either Number or Number2 */
			target = (s == Number) ? &num1 : &num2;

			/* reject the input rather than overflow the accumulator */
			if (*target > (MaxCoreNumber - digit) / 10)
			{
				goto error_logic;
			}
			*target = *target * 10 + digit;
		}
		else if (c == '\n')
		{
			break;
		}
		else
		{
			goto error_logic;
		}
	}

	/* flush the item that was still being read when the input ended */
	if (s == Number)
	{
		bms = bms_union(bms, bms_make_singleton(num1));
	}
	else if (s == Number2)
	{
		if (num1 > num2)
		{
			goto error_logic;
		}
		for (int i = num1; i <= num2; ++i)
		{
			bms = bms_union(bms, bms_make_singleton(i));
		}
	}
	else if (s == Initial || s == Interval)
	{
		goto error_logic;
	}
	return bms;

error_logic:
	return NULL;
}
/*
 * Report whether cpuset holds the "empty" marker, i.e. whether it compares
 * equal to DefaultCpuset.
 */
bool
CpusetIsEmpty(const char *cpuset)
{
	int			diff = strcmp(cpuset, DefaultCpuset);

	return diff == 0;
}
/*
 * Mark cpuset as empty by copying DefaultCpuset (the default value -1,
 * per the original note) into it; cpusetSize is the size of the
 * destination buffer handed to StrNCpy.
 */
void
SetCpusetEmpty(char *cpuset, int cpusetSize)
{
	StrNCpy(cpuset, DefaultCpuset, cpusetSize);
}
/*
 * Render a non-empty bitset as cpuset text.
 *
 * Consecutive members are collapsed into "first-last" ranges and items are
 * separated by commas, e.g. {1,3,4,5} becomes "1,3-5".  When the next item
 * would not fit into cpusetSize bytes, the function returns with only the
 * items written so far.
 *
 * This function does not check the cpu cores are available or not.
 */
void
BitsetToCpuset(const Bitmapset *bms,
			   char *cpuset,
			   int cpusetSize)
{
	char		item[32] = {0};
	int			written = 0;
	int			runStart = -1;
	int			runEnd = -1;
	int			member;

	Assert(!bms_is_empty(bms));

	cpuset[0] = '\0';

	for (member = bms_next_member(bms, -1);
		 member >= 0;
		 member = bms_next_member(bms, member))
	{
		/* the very first member opens the first run */
		if (runEnd == -1)
		{
			runStart = runEnd = member;
			continue;
		}

		/* still consecutive: just extend the current run */
		if (member == runEnd + 1)
		{
			runEnd = member;
			continue;
		}

		/* gap found: emit the finished run followed by a separator */
		if (runStart == runEnd)
			snprintf(item, sizeof(item), "%d,", runStart);
		else
			snprintf(item, sizeof(item), "%d-%d,", runStart, runEnd);

		if (written + strlen(item) >= cpusetSize)
		{
			Assert(cpuset[0]);
			return;
		}
		strcpy(cpuset + written, item);
		written += strlen(item);

		runStart = runEnd = member;
	}

	if (runStart == -1)
	{
		/* bms is non-empty, so it should never reach here */
		pg_unreachable();
	}
	else
	{
		/* emit the last run, without a trailing separator */
		if (runStart == runEnd)
			snprintf(item, sizeof(item), "%d", runStart);
		else
			snprintf(item, sizeof(item), "%d-%d", runStart, runEnd);

		if (written + strlen(item) >= cpusetSize)
		{
			Assert(cpuset[0]);
			return;
		}
		strcpy(cpuset + written, item);
		written += strlen(item);
	}
}
/*
 * calculate the result of cpuset1 plus/minus cpuset2 and save in place
 * if sub is true, the operation is minus
 * if sub is false, the operation is plus
 *
 * cpuset1: first operand; overwritten with the result.
 * cpuset2: second operand.
 * len:     size used both as the parse limit for the operands and as the
 *          size of the cpuset1 buffer that receives the result.
 *
 * When the result is an empty set, cpuset1 is set to the lowest core
 * number found in the cpuset of RESGROUP_ROOT_ID instead.
 */
void
cpusetOperation(char *cpuset1, const char *cpuset2,
				int len, bool sub)
{
	char		cpuset[MaxCpuSetLength] = {0};
	int			defaultCore = -1;
	Bitmapset  *bms1 = CpusetToBitset(cpuset1, len);
	Bitmapset  *bms2 = CpusetToBitset(cpuset2, len);

	if (sub)
	{
		bms1 = bms_del_members(bms1, bms2);
	}
	else
	{
		bms1 = bms_add_members(bms1, bms2);
	}

	if (!bms_is_empty(bms1))
	{
		BitsetToCpuset(bms1, cpuset1, len);
	}
	else
	{
		Bitmapset  *bmsDefault;

		/* Get cpuset from cpuset/gpdb, and transform it into bitset */
		ResGroupOps_GetCpuSet(RESGROUP_ROOT_ID, cpuset, MaxCpuSetLength);
		bmsDefault = CpusetToBitset(cpuset, MaxCpuSetLength);

		/* get the minimum core number, in case of the zero core is not exist */
		defaultCore = bms_next_member(bmsDefault, -1);
		Assert(defaultCore >= 0);

		/*
		 * Bound the write by len, the size of the caller's cpuset1 buffer
		 * (the same size BitsetToCpuset is given above), not by the size
		 * of the local scratch buffer.
		 */
		snprintf(cpuset1, len, "%d", defaultCore);
	}
}
/*
* union cpuset2 to cpuset1
*/
void
CpusetUnion(char *cpuset1, const char *cpuset2, int len)
{
cpusetOperation(cpuset1, cpuset2, len, false);
}
/*
* subtract cpuset2 from cpuset1
*/
void
CpusetDifference(char *cpuset1, const char *cpuset2, int len)
{
cpusetOperation(cpuset1, cpuset2, len, true);
}
/*
 * Verify that the cpuset feature can be used.
 *
 * Two conditions are required: resource groups must be activated and
 * gp_resource_group_enable_cgroup_cpuset must be set.  A failed condition
 * is reported through ereport() at the given elevel, and false is returned
 * if that report returns control.  Returns true when both conditions hold.
 */
bool
EnsureCpusetIsAvailable(int elevel)
{
	bool		activated = IsResGroupActivated();

	if (!activated)
	{
		ereport(elevel,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("resource group must be enabled to use cpuset feature")));

		return false;
	}

	if (gp_resource_group_enable_cgroup_cpuset)
		return true;

	ereport(elevel,
			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
			 errmsg("cgroup is not properly configured to use the cpuset feature"),
			 errhint("Extra cgroup configurations are required to enable this feature, "
					 "please refer to the Greenplum Documentations for details")));

	return false;
}
/*
 * Tell whether the memory usage of the current resource group is in the
 * RedZone.
 *
 * The answer is "no" as soon as one of the following holds:
 *   - the global free chunks (times 100) reach safeChunksThreshold100;
 *   - this process has no slot;
 *   - the slot has not used up its memory quota;
 *   - the group has not used up its granted shared memory.
 */
bool
IsGroupInRedZone(void)
{
	ResGroupSlotData *slot = self->slot;
	ResGroupData *group = self->group;
	uint32		globalFreeChunks;
	uint32		threshold100;

	/*
	 * This function is called frequently, so the test that most often
	 * ends the evaluation goes first: global shared memory not in redzone.
	 */
	globalFreeChunks = (uint32) pg_atomic_read_u32(&pResGroupControl->freeChunks);
	threshold100 = (uint32) pg_atomic_read_u32(&pResGroupControl->safeChunksThreshold100);
	if (globalFreeChunks * 100 >= threshold100)
		return false;

	AssertImply(slot != NULL, group != NULL);
	if (slot == NULL)
		return false;

	/*
	 * In the RedZone only when both the slot quota and the group's shared
	 * memory are exhausted.
	 */
	return slot->memQuota <= slot->memUsage &&
		group->memSharedGranted <= group->memSharedUsage;
}
/*
 * Append memory information about the current resource group to str.
 * This is the output of resource group runaway.
 *
 * Without an assigned group only the memory usage of the current process
 * is reported; otherwise the group id, group usage, group shared quota,
 * slot quota and the global free/safe-threshold figures are reported.
 */
void
ResGroupGetMemoryRunawayInfo(StringInfo str)
{
	ResGroupSlotData *slot = self->slot;
	ResGroupData *group = self->group;
	uint32		globalFreeChunks = 0;
	uint32		threshold100 = 0;

	if (group == NULL)
	{
		Assert(!selfIsAssigned());

		appendStringInfo(str,
						 "Resource group memory information: "
						 "memory usage in current proc is %d MB",
						 VmemTracker_ConvertVmemChunksToMB(self->memUsage));
		return;
	}

	Assert(selfIsAssigned());

	globalFreeChunks = (uint32) pg_atomic_read_u32(&pResGroupControl->freeChunks);
	threshold100 = (uint32) pg_atomic_read_u32(&pResGroupControl->safeChunksThreshold100);

	appendStringInfo(str,
					 "current group id is %u, "
					 "group memory usage %d MB, "
					 "group shared memory quota is %d MB, "
					 "slot memory quota is %d MB, "
					 "global freechunks memory is %u MB, "
					 "global safe memory threshold is %u MB",
					 group->groupId,
					 VmemTracker_ConvertVmemChunksToMB(group->memUsage),
					 VmemTracker_ConvertVmemChunksToMB(group->memSharedGranted),
					 VmemTracker_ConvertVmemChunksToMB(slot->memQuota),
					 VmemTracker_ConvertVmemChunksToMB(globalFreeChunks),
					 VmemTracker_ConvertVmemChunksToMB(threshold100 / 100));
}
/*
 * Return the id of the resource group whose slot is attached to the given
 * session, or InvalidOid when the session has no slot.
 */
Oid
SessionGetResGroupId(SessionState *session)
{
	ResGroupSlotData *slot = (ResGroupSlotData *) session->resGroupSlot;

	return (slot != NULL) ? slot->groupId : InvalidOid;
}
/*
 * Return the global shared memory usage of the group a session runs in,
 * computed as the group's shared usage minus its granted shared memory.
 * Returns 0 when the session has no resource group slot.
 */
int32
SessionGetResGroupGlobalShareMemUsage(SessionState *session)
{
	ResGroupSlotData *slot = (ResGroupSlotData *) session->resGroupSlot;
	ResGroupData *group;

	/* the session does not have a group slot */
	if (slot == NULL)
		return 0;

	/* no lock is taken here: an estimated result is good enough */
	group = slot->group;

	return group->memSharedUsage - group->memSharedGranted;
}
/*
 * Move the current process to another resource group.
 *
 * Nothing is done when the process is not assigned to a group.  Otherwise
 * the work depends on Gp_role:
 *
 * - GP_ROLE_DISPATCH: the target slot is taken from MyProc->movetoResSlot,
 *   the current group is unassigned, and the process is attached to the
 *   target slot and its group.  If attaching fails, the process is
 *   unassigned again and the error is re-thrown.
 *
 * - GP_ROLE_EXECUTE: the target group id is taken from
 *   MyProc->movetoGroupId.  The session's slot is re-pointed from its old
 *   group to the target group, with the memory accounting moved along,
 *   and the process is then bound to the target group and slot.
 */
void
HandleMoveResourceGroup(void)
{
ResGroupSlotData *slot;
ResGroupData *group;
ResGroupData *oldGroup;
/* not assigned to any group (the transaction has finished): nothing to move */
if (!selfIsAssigned())
return;
if (Gp_role == GP_ROLE_DISPATCH)
{
/* fetch the target slot and clear the pending move request */
slot = (ResGroupSlotData *)MyProc->movetoResSlot;
group = slot->group;
MyProc->movetoResSlot = NULL;
/* unassign the old resource group and release the old slot */
UnassignResGroup(true);
PG_TRY();
{
sessionSetSlot(slot);
/* Add proc memory accounting info into group and slot */
selfAttachResGroup(group, slot);
/* Init self */
self->caps = slot->caps;
/* Add into cgroup */
ResGroupOps_AssignGroup(self->groupId, &(self->caps), MyProcPid);
}
PG_CATCH();
{
/* attaching failed: unassign again before propagating the error */
UnassignResGroup(false);
PG_RE_THROW();
}
PG_END_TRY();
pgstat_report_resgroup(self->groupId);
}
else if (Gp_role == GP_ROLE_EXECUTE)
{
/* fetch the target group id and clear the pending move request */
Oid groupId = MyProc->movetoGroupId;
MyProc->movetoGroupId = InvalidOid;
slot = sessionGetSlot();
Assert(slot != NULL);
/* detach this process from its current slot and group */
selfUnsetSlot();
selfUnsetGroup();
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
group = groupHashFind(groupId, true);
oldGroup = slot->group;
Assert(group != NULL);
Assert(oldGroup != NULL);
/*
 * Move the slot memory to the new group.  Only do it once, even if
 * there is more than one slice: once the slot points at the new
 * group, the check below is false.
 */
if (slot->groupId != groupId)
{
/* deduct the slot memory from the old group */
groupDecSlotMemUsage(oldGroup, slot);
oldGroup->memQuotaUsed -= slot->memQuota;
oldGroup->nRunning--;
/* reset the slot but don't touch the 'memUsage' */
slot->groupId = groupId;
slot->group = group;
slot->caps = group->caps;
slot->memQuota = slotGetMemQuotaOnQE(&group->caps, group);
/* add the slot memory to the new group */
mempoolAutoReserve(group, &group->caps);
groupIncSlotMemUsage(group, slot);
group->memQuotaUsed += slot->memQuota;
group->nRunning++;
Assert(group->memQuotaUsed <= group->memQuotaGranted);
}
/* add the memory of entryDB to slot and group */
if (IS_QUERY_DISPATCHER())
selfAttachResGroup(group, slot);
LWLockRelease(ResGroupLock);
/* bind this process to the new group and slot */
selfSetGroup(group);
selfSetSlot(slot);
self->caps = group->caps;
/* finally we can say we are in a valid resgroup */
Assert(selfIsAssigned());
/* Add into cgroup */
ResGroupOps_AssignGroup(self->groupId, &(self->caps), MyProcPid);
}
}
/*
 * Return true when availMem is strictly greater than memUsed.
 */
static bool
hasEnoughMemory(int32 memUsed, int32 availMem)
{
	return availMem > memUsed;
}
/*
 * Check if there are enough memory to move the query to the destination group
 *
 * sessionId: session whose query is to be moved.
 * groupId:   destination resource group.
 *
 * The check is done locally first and then on the segments by dispatching
 * pg_resgroup_check_move_query().  Raises an ERROR when the session has
 * ended, when the destination group lacks memory on the master or on a
 * segment, or when a segment does not return a tuple result.
 */
static void
moveQueryCheck(int sessionId, Oid groupId)
{
	char	   *cmd;
	CdbPgResults cdb_pgresults = {NULL, 0};
	int32		sessionMem = ResGroupGetSessionMemUsage(sessionId);
	int32		availMem = ResGroupGetGroupAvailableMem(groupId);

	if (sessionMem < 0)
		elog(ERROR, "the process to move has ended");

	/* groupId is an Oid (unsigned), hence %u */
	if (!hasEnoughMemory(sessionMem, availMem))
		elog(ERROR, "group %u doesn't have enough memory on master, expect:%d, available:%d", groupId, sessionMem, availMem);

	cmd = psprintf("SELECT session_mem, available_mem from pg_resgroup_check_move_query(%d, %u)", sessionId, groupId);

	CdbDispatchCommand(cmd, DF_WITH_SNAPSHOT, &cdb_pgresults);

	for (int i = 0; i < cdb_pgresults.numResults; i++)
	{
		int			i_session_mem;
		int			i_available_mem;
		struct pg_result *pgresult = cdb_pgresults.pg_results[i];

		if (PQresultStatus(pgresult) != PGRES_TUPLES_OK)
		{
			/*
			 * Build the message text before clearing the results: once
			 * they are cleared, pgresult must not be read any more.
			 */
			char	   *detail = psprintf("%s %s",
										  PQresStatus(PQresultStatus(pgresult)),
										  PQresultErrorMessage(pgresult));

			cdbdisp_clearCdbPgResults(&cdb_pgresults);
			elog(ERROR, "pg_resgroup_check_move_query: resultStatus not tuples_Ok: %s",
				 detail);
		}

		Assert(PQntuples(pgresult) == 1);
		i_session_mem = PQfnumber(pgresult, "session_mem");
		i_available_mem = PQfnumber(pgresult, "available_mem");
		Assert(!PQgetisnull(pgresult, 0, i_session_mem));
		Assert(!PQgetisnull(pgresult, 0, i_available_mem));
		sessionMem = pg_atoi(PQgetvalue(pgresult, 0, i_session_mem), sizeof(int32), 0);
		availMem = pg_atoi(PQgetvalue(pgresult, 0, i_available_mem), sizeof(int32), 0);

		/* a non-positive session memory on this segment needs no check */
		if (sessionMem <= 0)
			continue;

		if (!hasEnoughMemory(sessionMem, availMem))
		{
			/*
			 * Both figures are already copied into locals, so the
			 * dispatched results can be cleared before raising the error
			 * instead of being left behind.
			 */
			cdbdisp_clearCdbPgResults(&cdb_pgresults);
			elog(ERROR, "group %u doesn't have enough memory on segment, expect:%d, available:%d", groupId, sessionMem, availMem);
		}
	}

	cdbdisp_clearCdbPgResults(&cdb_pgresults);
}
/*
 * Move the query running in session sessionId into the resource group
 * identified by groupId / groupName.  Must run on the dispatcher.
 *
 * A slot is acquired in the destination group first.  Then the memory
 * check is performed, the target session is signalled, and
 * pg_resgroup_move_query() is dispatched.  If any of these steps raises
 * an error, the acquired slot is released before the error is re-thrown.
 *
 * Raises an ERROR when the group does not exist or no slot can be
 * acquired in it.
 */
void
ResGroupMoveQuery(int sessionId, Oid groupId, const char *groupName)
{
	ResGroupInfo groupInfo;
	ResGroupData *group;
	ResGroupSlotData *slot;
	char	   *cmd;

	Assert(pResGroupControl != NULL);
	Assert(pResGroupControl->segmentsOnMaster > 0);
	Assert(Gp_role == GP_ROLE_DISPATCH);

	LWLockAcquire(ResGroupLock, LW_SHARED);
	group = groupHashFind(groupId, false);
	if (!group)
	{
		LWLockRelease(ResGroupLock);
		/* groupId is an Oid (unsigned), hence %u */
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 (errmsg("invalid resource group id: %u", groupId))));
	}
	LWLockRelease(ResGroupLock);

	groupInfo.group = group;
	groupInfo.groupId = groupId;
	slot = groupAcquireSlot(&groupInfo, true);
	if (slot == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
				 (errmsg("cannot get slot in resource group %u", groupId))));

	PG_TRY();
	{
		moveQueryCheck(sessionId, groupId);

		ResGroupSignalMoveQuery(sessionId, slot, groupId);

		cmd = psprintf("SELECT pg_resgroup_move_query(%d, %s)",
					   sessionId,
					   quote_literal_cstr(groupName));
		CdbDispatchCommand(cmd, 0, NULL);
	}
	PG_CATCH();
	{
		/* the move failed: give the acquired slot back to the group */
		LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
		groupReleaseSlot(group, slot, true);
		LWLockRelease(ResGroupLock);
		PG_RE_THROW();
	}
	PG_END_TRY();
}
/*
 * Look up the resource group id of the session with the given session id.
 *
 * Returns InvalidOid when no used session entry matches or the matching
 * session has no resource group slot.
 */
Oid
ResGroupGetGroupIdBySessionId(int sessionId)
{
	Oid			result = InvalidOid;
	SessionState *session;

	LWLockAcquire(SessionStateLock, LW_SHARED);

	for (session = AllSessionStateEntries->usedList;
		 session != NULL;
		 session = session->next)
	{
		ResGroupSlotData *slot;

		if (session->sessionId != sessionId)
			continue;

		/* found the session; stop at the first match */
		slot = (ResGroupSlotData *) session->resGroupSlot;
		if (slot != NULL)
			result = slot->groupId;
		break;
	}

	LWLockRelease(SessionStateLock);

	return result;
}
/*
 * Return the memory usage of a session on this segment.
 *
 * The value is the memUsage of the session's resource group slot, 0 when
 * the session has no slot, and -1 when no used session entry has the
 * given session id.
 */
int32
ResGroupGetSessionMemUsage(int sessionId)
{
	int32		result = -1;
	SessionState *session;

	LWLockAcquire(SessionStateLock, LW_SHARED);

	for (session = AllSessionStateEntries->usedList;
		 session != NULL;
		 session = session->next)
	{
		ResGroupSlotData *slot;

		if (session->sessionId != sessionId)
			continue;

		/* found the session; stop at the first match */
		slot = (ResGroupSlotData *) session->resGroupSlot;
		if (slot == NULL)
			result = 0;
		else
			result = slot->memUsage;
		break;
	}

	LWLockRelease(SessionStateLock);

	return result;
}
/*
 * Return the memory available in one resource group.
 *
 * For a group whose memory limit is RESGROUP_UNLIMITED_MEMORY_LIMIT this
 * is the number of free chunks in pResGroupControl; otherwise it is the
 * expected slot quota plus the unused part of the group's granted shared
 * memory.
 */
int32
ResGroupGetGroupAvailableMem(Oid groupId)
{
	ResGroupData *group;
	int			available;

	LWLockAcquire(ResGroupLock, LW_SHARED);

	group = groupHashFind(groupId, true);
	Assert(group != NULL);

	if (group->caps.memLimit != RESGROUP_UNLIMITED_MEMORY_LIMIT)
		available = slotGetMemQuotaExpected(&group->caps) +
			group->memSharedGranted - group->memSharedUsage;
	else
		available = (uint32) pg_atomic_read_u32(&pResGroupControl->freeChunks);

	LWLockRelease(ResGroupLock);

	return available;
}
相关信息
相关文章
greenplumn resgroup-ops-dummy 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
7、 openharmony
-
8、 golang
-
10、 Vue中input框自动聚焦