greenplumn gddfuncs source code

  • 2022-08-18

greenplumn gddfuncs code

File path: /src/backend/utils/gdd/gddfuncs.c
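
This file implements the SQL-callable helper gp_dist_wait_status() used by the Global Deadlock Detector: it reports one lock-wait edge per row from the local lock tables, and on the QD it additionally dispatches the same query to every segment and merges their rows into the result.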

/*-------------------------------------------------------------------------
 *
 * gddfuncs.c
 *	  Global DeadLock Detector - Helper Functions
 *
 *
 * Copyright (c) 2018-Present VMware, Inc. or its affiliates.
 *
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "cdb/cdbdisp_query.h"
#include "cdb/cdbdispatchresult.h"
#include "cdb/cdbgang.h"
#include "cdb/cdbvars.h"
#include "funcapi.h"
#include "libpq-fe.h"
#include "storage/lock.h"
#include "storage/proc.h"
#include "utils/builtins.h"

typedef struct GddWaitStatusCtx GddWaitStatusCtx;

struct GddWaitStatusCtx
{
	LockData	*lockData;		/* lock status collected on this node */
	CdbPgResults cdb_pgresults;	/* results dispatched back from the QEs */

	/* cursors of the waiter/holder nested scan, kept across SRF calls */
	int			waiter;
	int			holder;

	/* cursors of the dispatched-results scan, kept across SRF calls */
	int			seg;
	int			row;
};

static bool isGranted(LockInstanceData *lock);
static bool lockEqual(LockInstanceData *lock1, LockInstanceData *lock2);
static bool lockIsHoldTillEndXact(LockInstanceData *lock);

static bool
isGranted(LockInstanceData *lock)
{
	return lock->holdMask != 0;
}

static bool
lockEqual(LockInstanceData *lock1, LockInstanceData *lock2)
{
	LOCKTAG		*tag1 = &lock1->locktag;
	LOCKTAG		*tag2 = &lock2->locktag;

	return memcmp(tag1, tag2, sizeof(LOCKTAG)) == 0;
}

static bool
lockIsHoldTillEndXact(LockInstanceData *lock)
{
	if (lock->holdTillEndXact)
		return true;

	/* FIXME: other types */
	return false;
}

/*
 * gp_dist_wait_status - produce a view with one row per waiting relation
 */
Datum
gp_dist_wait_status(PG_FUNCTION_ARGS)
{
	FuncCallContext *funcctx;
	HeapTuple	tuple;
	Datum		result;
	Datum		values[10];
	bool		nulls[10];
	char        tnbuf[32];
	GddWaitStatusCtx *ctx;

	MemSet(values, 0, sizeof(values));
	MemSet(nulls, false, sizeof(nulls));

	if (SRF_IS_FIRSTCALL())
	{
		TupleDesc	tupdesc;
		MemoryContext oldcontext;

		/* create a function context for cross-call persistence */
		funcctx = SRF_FIRSTCALL_INIT();

		/*
		 * switch to memory context appropriate for multiple function calls
		 */
		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

		/* build tupdesc for result tuples */
		tupdesc = CreateTemplateTupleDesc(10);
		TupleDescInitEntry(tupdesc, (AttrNumber) 1, "segid",
						   INT4OID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 2, "waiter_dxid",
						   INT8OID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 3, "holder_dxid",
						   INT8OID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 4, "holdTillEndXact",
						   BOOLOID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 5, "waiter_lpid",
						   INT4OID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 6, "holder_lpid",
						   INT4OID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 7, "waiter_lockmode",
						   TEXTOID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 8, "waiter_locktype",
						   TEXTOID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 9, "waiter_sessionid",
						   INT4OID, -1, 0);
		TupleDescInitEntry(tupdesc, (AttrNumber) 10, "holder_sessionid",
						   INT4OID, -1, 0);

		funcctx->tuple_desc = BlessTupleDesc(tupdesc);

		ctx = palloc(sizeof(*ctx));
		funcctx->user_fctx = ctx;

		ctx->cdb_pgresults.pg_results = NULL;
		ctx->cdb_pgresults.numResults = 0;
		ctx->waiter = 0;
		ctx->holder = 0;
		ctx->seg = 0;
		ctx->row = 0;

		/*
		 * We need to collect the wait status from all the segments,
		 * so a dispatch is needed on QD.
		 */

		if (Gp_role == GP_ROLE_DISPATCH)
		{
			int			i;

			CdbDispatchCommand("SELECT * FROM pg_catalog.gp_dist_wait_status()",
							   DF_NONE, &ctx->cdb_pgresults);

			if (ctx->cdb_pgresults.numResults == 0)
				elog(ERROR, "gp_dist_wait_status() didn't get back any data from the segDBs");

			for (i = 0; i < ctx->cdb_pgresults.numResults; i++)
			{
				/*
				 * Any error here should have propagated into errbuf,
				 * so we shouldn't ever see anything other than tuples_ok here.
				 * But, check to be sure.
				 */
				if (PQresultStatus(ctx->cdb_pgresults.pg_results[i]) != PGRES_TUPLES_OK)
				{
					cdbdisp_clearCdbPgResults(&ctx->cdb_pgresults);
					elog(ERROR,"gp_dist_wait_status(): resultStatus not tuples_Ok");
				}

				/*
				 * This query better match the tupledesc we just made above.
				 */
				if (PQnfields(ctx->cdb_pgresults.pg_results[i]) != tupdesc->natts)
					elog(ERROR, "unexpected number of columns returned from pg_lock_status() on segment (%d, expected %d)",
						 PQnfields(ctx->cdb_pgresults.pg_results[i]), tupdesc->natts);
			}
		}

		/*
		 * QD information must be collected after the dispatch.
		 * Thus the wait relations on the QD returned by this function
		 * are guaranteed to be newer than the wait relations on the QEs,
		 * which helps when reducing edges on the QD.
		 *
		 * Rule: when we are about to reduce a dotted edge on the QD, we
		 * have to be careful: if the lock-holding vertex (transaction) of
		 * this QD edge is blocked on any QE, we should not reduce the
		 * edge, at least not yet.
		 *
		 * If any QE's wait relations were newer than the QD's, the above
		 * rule could not be applied.
		 */
		ctx->lockData = GetLockStatusData();

		MemoryContextSwitchTo(oldcontext);
	}

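	/* per-call setup: restore the context saved on the first call */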
	funcctx = SRF_PERCALL_SETUP();
	ctx = funcctx->user_fctx;

	/*
	 * Find out all the waiting relations
	 *
	 * A waiting relation is one where:
	 * (waiter.granted == false &&
	 *  holder.granted == true &&
	 *  waiter.waitLockMode conflicts with holder.holdMask &&
	 *  waiter.pid != holder.pid &&
	 *  waiter.locktype == holder.locktype &&
	 *  waiter.locktags == holder.locktags)
	 *
	 * The actual logic here is to perform a nested loop like this:
	 *
	 *     for (waiter = 0; waiter < nelements; waiter++)
	 *         for (holder = 0; holder < nelements; holder++)
	 *             if (is_a_relation(waiter, holder))
	 *                 record the relation;
	 *
	 * But as we can only return one relation per call, we need to
	 * simulate the loops manually.
	 *
	 * One transaction can wait for more than one transaction on the same
	 * segment. For example, if Tx A, B, and C all open a relation in shared
	 * mode and Tx D tries to open the same relation in exclusive mode, D
	 * will be blocked, producing 3 waiting edges (D-->A, D-->B, D-->C).
	 */
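	/*
	 * Note that ctx->waiter and ctx->holder live in user_fctx, so after
	 * SRF_RETURN_NEXT() the next call re-enters these loops at the saved
	 * (waiter, holder) position instead of restarting from scratch.
	 */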
	while (ctx->waiter < ctx->lockData->nelements)
	{
		int			waiter = ctx->waiter;
		LockInstanceData	   *w_lock = &ctx->lockData->locks[waiter];

		/* A waiter should have granted == false */
		if (isGranted(w_lock))
		{
			ctx->waiter++;
			ctx->holder = 0;
			continue;
		}

		while (ctx->holder < ctx->lockData->nelements)
		{
			TransactionId w_dxid;
			TransactionId h_dxid;
			const char   *locktypename;
			int			holder = ctx->holder++;
			LockInstanceData	   *h_lock = &ctx->lockData->locks[holder];

			if (holder == waiter)
				continue;
			/* A holder should have granted == true */
			if (!isGranted(h_lock))
				continue;
			if (w_lock->pid == h_lock->pid)
				continue;
			/* If the waiter and holder have different lock methods, they cannot conflict */
			if (w_lock->locktag.locktag_lockmethodid != h_lock->locktag.locktag_lockmethodid)
				continue;
			/* If the waiter and holder do not conflict, skip this edge */
			if (!CheckWaitLockModeConflictHoldMask(w_lock->locktag,
					w_lock->waitLockMode, h_lock->holdMask))
				continue;
			if (!lockEqual(w_lock, h_lock))
				continue;

			/* A valid waiting relation is found */

			/* Find out dxid differently on QD and QE */
			w_dxid = w_lock->distribXid;
			h_dxid = h_lock->distribXid;

			values[0] = Int32GetDatum(GpIdentity.segindex);
			values[1] = Int64GetDatum(w_dxid);
			values[2] = Int64GetDatum(h_dxid);
			values[3] = BoolGetDatum(lockIsHoldTillEndXact(w_lock));
			values[4] = Int32GetDatum(w_lock->pid);
			values[5] = Int32GetDatum(h_lock->pid);
			values[6] = CStringGetTextDatum(GetLockmodeName(w_lock->locktag.locktag_lockmethodid, w_lock->waitLockMode));

			if (w_lock->locktag.locktag_type <= LOCKTAG_LAST_TYPE)
				locktypename = LockTagTypeNames[w_lock->locktag.locktag_type];
			else
			{
				snprintf(tnbuf, sizeof(tnbuf), "unknown %d",
						 (int) w_lock->locktag.locktag_type);
				locktypename = tnbuf;
			}
			values[7] = CStringGetTextDatum(locktypename);
			values[8] = Int32GetDatum(w_lock->mppSessionId);
			values[9] = Int32GetDatum(h_lock->mppSessionId);

			tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
			result = HeapTupleGetDatum(tuple);

			SRF_RETURN_NEXT(funcctx, result);
		}

		ctx->waiter++;
		ctx->holder = 0;
	}

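	/*
	 * On the QD, once the local lock data is exhausted, stream out the
	 * rows that the dispatch above collected from the segments.
	 */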
	if (Gp_role == GP_ROLE_DISPATCH)
	{
		while (ctx->seg < ctx->cdb_pgresults.numResults)
		{
			int seg = ctx->seg;
			struct pg_result *pg_result = ctx->cdb_pgresults.pg_results[seg];

			while (ctx->row < PQntuples(pg_result))
			{
				int row = ctx->row++;

				values[0] = Int32GetDatum(atoi(PQgetvalue(pg_result, row, 0)));
				values[1] = Int64GetDatum(atol(PQgetvalue(pg_result, row, 1)));
				values[2] = Int64GetDatum(atol(PQgetvalue(pg_result, row, 2)));
				values[3] = BoolGetDatum(strncmp(PQgetvalue(pg_result, row, 3), "t", 1) == 0);
				values[4] = Int32GetDatum(atoi(PQgetvalue(pg_result, row, 4)));
				values[5] = Int32GetDatum(atoi(PQgetvalue(pg_result, row, 5)));
				values[6] = CStringGetTextDatum(PQgetvalue(pg_result, row, 6));
				values[7] = CStringGetTextDatum(PQgetvalue(pg_result, row, 7));
				values[8] = Int32GetDatum(atoi(PQgetvalue(pg_result, row, 8)));
				values[9] = Int32GetDatum(atoi(PQgetvalue(pg_result, row, 9)));

				tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
				result = HeapTupleGetDatum(tuple);
				SRF_RETURN_NEXT(funcctx, result);
			}

			ctx->seg++;
			ctx->row = 0;
		}
	}

	cdbdisp_clearCdbPgResults(&ctx->cdb_pgresults);

	SRF_RETURN_DONE(funcctx);
}
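
For reference, here is a minimal usage sketch. The first query is the same statement the code above dispatches to the segments; the second is only an illustration using the column names from the tupledesc built in the function (the mixed-case holdTillEndXact column must be quoted in SQL):

-- On the QD: one row per waiting relation, across the whole cluster.
SELECT * FROM pg_catalog.gp_dist_wait_status();

-- Illustrative filter: only wait edges flagged holdTillEndXact.
SELECT segid, waiter_dxid, holder_dxid, waiter_lpid, holder_lpid
FROM pg_catalog.gp_dist_wait_status()
WHERE "holdTillEndXact";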
