greenplumn execUtils 源码

  • 2022-08-18
  • 浏览 (222)

greenplumn execUtils 代码

文件路径:/src/backend/executor/execUtils.c

/*-------------------------------------------------------------------------
 *
 * execUtils.c
 *	  miscellaneous executor utility routines
 *
 * Portions Copyright (c) 2005-2008, Greenplum inc
 * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/executor/execUtils.c
 *
 *-------------------------------------------------------------------------
 */
/*
 * INTERFACE ROUTINES
 *		CreateExecutorState		Create/delete executor working state
 *		FreeExecutorState
 *		CreateExprContext
 *		CreateStandaloneExprContext
 *		FreeExprContext
 *		ReScanExprContext
 *
 *		ExecAssignExprContext	Common code for plan node init routines.
 *		etc
 *
 *		ExecOpenScanRelation	Common code for scan node init routines.
 *
 *		ExecInitRangeTable		Set up executor's range-table-related data.
 *
 *		ExecGetRangeTableRelation		Fetch Relation for a rangetable entry.
 *
 *		executor_errposition	Report syntactic position of an error.
 *
 *		RegisterExprContextCallback    Register function shutdown callback
 *		UnregisterExprContextCallback  Deregister function shutdown callback
 *
 *		GetAttributeByName		Runtime extraction of columns from tuples.
 *		GetAttributeByNum
 *
 *	 NOTES
 *		This file has traditionally been the place to stick misc.
 *		executor support stuff that doesn't really go anyplace else.
 */

#include "postgres.h"
#include "pgstat.h"

#include "access/genam.h"
#include "access/heapam.h"
#include "access/parallel.h"
#include "access/relscan.h"
#include "access/table.h"
#include "access/tableam.h"
#include "access/transam.h"
#include "catalog/index.h"
#include "executor/execdebug.h"
#include "executor/execUtils.h"
#include "executor/executor.h"
#include "jit/jit.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/nodeFuncs.h"
#include "parser/parsetree.h"
#include "partitioning/partdesc.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/typcache.h"

#include "nodes/primnodes.h"
#include "nodes/execnodes.h"

#include "cdb/cdbutil.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbdisp_query.h"
#include "cdb/cdbdispatchresult.h"
#include "cdb/ml_ipc.h"
#include "cdb/cdbmotion.h"
#include "cdb/cdbsreh.h"
#include "cdb/memquota.h"
#include "executor/instrument.h"
#include "executor/spi.h"
#include "utils/elog.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "storage/ipc.h"
#include "cdb/cdbllize.h"
#include "utils/guc.h"
#include "utils/workfile_mgr.h"
#include "utils/metrics_utils.h"

#include "cdb/memquota.h"

static bool tlist_matches_tupdesc(PlanState *ps, List *tlist, Index varno, TupleDesc tupdesc);
static void ShutdownExprContext(ExprContext *econtext, bool isCommit);
static List *flatten_logic_exprs(Node *node);


/* ----------------------------------------------------------------
 *				 Executor state and memory management functions
 * ----------------------------------------------------------------
 */

/* ----------------
 *		CreateExecutorState
 *
 *		Create and initialize an EState node, which is the root of
 *		working storage for an entire Executor invocation.
 *
 * Principally, this creates the per-query memory context that will be
 * used to hold all working data that lives till the end of the query.
 * Note that the per-query context will become a child of the caller's
 * CurrentMemoryContext.
 * ----------------
 */
EState *
CreateExecutorState(void)
{
	EState	   *estate;
	MemoryContext qcontext;
	MemoryContext oldcontext;

	/*
	 * Create the per-query context for this Executor run.
	 */
	qcontext = AllocSetContextCreate(CurrentMemoryContext,
									 "ExecutorState",
									 ALLOCSET_DEFAULT_SIZES);
	MemoryContextDeclareAccountingRoot(qcontext);

	/*
	 * Make the EState node within the per-query context.  This way, we don't
	 * need a separate pfree() operation for it at shutdown.
	 */
	oldcontext = MemoryContextSwitchTo(qcontext);

	estate = makeNode(EState);

	/*
	 * Initialize all fields of the Executor State structure
	 */
	estate->es_direction = ForwardScanDirection;
	estate->es_snapshot = InvalidSnapshot;	/* caller must initialize this */
	estate->es_crosscheck_snapshot = InvalidSnapshot;	/* no crosscheck */
	estate->es_range_table = NIL;
	estate->es_range_table_array = NULL;
	estate->es_range_table_size = 0;
	estate->es_relations = NULL;
	estate->es_rowmarks = NULL;
	estate->es_plannedstmt = NULL;

	estate->es_junkFilter = NULL;

	estate->es_output_cid = (CommandId) 0;

	estate->es_result_relations = NULL;
	estate->es_num_result_relations = 0;
	estate->es_result_relation_info = NULL;

	estate->es_root_result_relations = NULL;
	estate->es_num_root_result_relations = 0;

	estate->es_tuple_routing_result_relations = NIL;

	estate->es_trig_target_relations = NIL;

	estate->es_param_list_info = NULL;
	estate->es_param_exec_vals = NULL;

	estate->es_queryEnv = NULL;

	estate->es_query_cxt = qcontext;

	estate->es_tupleTable = NIL;

	estate->es_processed = 0;

	estate->es_top_eflags = 0;
	estate->es_instrument = 0;
	estate->es_finished = false;

	estate->es_exprcontexts = NIL;

	estate->es_subplanstates = NIL;

	estate->es_auxmodifytables = NIL;

	estate->es_per_tuple_exprcontext = NULL;

	estate->es_sourceText = NULL;

	estate->es_use_parallel_mode = false;

	estate->es_jit_flags = 0;
	estate->es_jit = NULL;

	estate->es_sliceTable = NULL;
	estate->interconnect_context = NULL;
	estate->motionlayer_context = NULL;
	estate->es_interconnect_is_setup = false;
	estate->active_recv_id = -1;
	estate->es_got_eos = false;
	estate->cancelUnfinished = false;

	estate->dispatcherState = NULL;

	estate->currentSliceId = 0;
	estate->eliminateAliens = false;

	/*
	 * Return the executor state structure
	 */
	MemoryContextSwitchTo(oldcontext);

	return estate;
}

/* ----------------
 *		FreeExecutorState
 *
 *		Release an EState along with all remaining working storage.
 *
 * Note: this is not responsible for releasing non-memory resources, such as
 * open relations or buffer pins.  But it will shut down any still-active
 * ExprContexts within the EState and deallocate associated JITed expressions.
 * That is sufficient cleanup for situations where the EState has only been
 * used for expression evaluation, and not to run a complete Plan.
 *
 * This can be called in any memory context ... so long as it's not one
 * of the ones to be freed.
 *
 * In Greenplum, this also clears the PartitionState, even though that's a
 * non-memory resource, as that can be allocated for expression evaluation even
 * when there is no Plan.
 * ----------------
 */
void
FreeExecutorState(EState *estate)
{
	/*
	 * Shut down and free any remaining ExprContexts.  We do this explicitly
	 * to ensure that any remaining shutdown callbacks get called (since they
	 * might need to release resources that aren't simply memory within the
	 * per-query memory context).
	 */
	while (estate->es_exprcontexts)
	{
		/*
		 * XXX: seems there ought to be a faster way to implement this than
		 * repeated list_delete(), no?
		 */
		FreeExprContext((ExprContext *) linitial(estate->es_exprcontexts),
						true);
		/* FreeExprContext removed the list link for us */
	}

	estate->dispatcherState = NULL;

	/* release JIT context, if allocated */
	if (estate->es_jit)
	{
		jit_release_context(estate->es_jit);
		estate->es_jit = NULL;
	}

	/* release partition directory, if allocated */
	if (estate->es_partition_directory)
	{
		DestroyPartitionDirectory(estate->es_partition_directory);
		estate->es_partition_directory = NULL;
	}

	/*
	 * Free the per-query memory context, thereby releasing all working
	 * memory, including the EState node itself.
	 */
	MemoryContextDelete(estate->es_query_cxt);
}

/*
 * Internal implementation for CreateExprContext() and CreateWorkExprContext()
 * that allows control over the AllocSet parameters.
 */
static ExprContext *
CreateExprContextInternal(EState *estate, Size minContextSize,
						  Size initBlockSize, Size maxBlockSize)
{
	ExprContext *econtext;
	MemoryContext oldcontext;

	/* Create the ExprContext node within the per-query memory context */
	oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);

	econtext = makeNode(ExprContext);

	/* Initialize fields of ExprContext */
	econtext->ecxt_scantuple = NULL;
	econtext->ecxt_innertuple = NULL;
	econtext->ecxt_outertuple = NULL;

	econtext->ecxt_per_query_memory = estate->es_query_cxt;

	/*
	 * Create working memory for expression evaluation in this context.
	 */
	econtext->ecxt_per_tuple_memory =
		AllocSetContextCreate(estate->es_query_cxt,
							  "ExprContext",
							  minContextSize,
							  initBlockSize,
							  maxBlockSize);

	econtext->ecxt_param_exec_vals = estate->es_param_exec_vals;
	econtext->ecxt_param_list_info = estate->es_param_list_info;

	econtext->ecxt_aggvalues = NULL;
	econtext->ecxt_aggnulls = NULL;

	econtext->caseValue_datum = (Datum) 0;
	econtext->caseValue_isNull = true;

	econtext->domainValue_datum = (Datum) 0;
	econtext->domainValue_isNull = true;

	econtext->ecxt_estate = estate;

	econtext->ecxt_callbacks = NULL;

	/*
	 * Link the ExprContext into the EState to ensure it is shut down when the
	 * EState is freed.  Because we use lcons(), shutdowns will occur in
	 * reverse order of creation, which may not be essential but can't hurt.
	 */
	estate->es_exprcontexts = lcons(econtext, estate->es_exprcontexts);

	MemoryContextSwitchTo(oldcontext);

	return econtext;
}

/* ----------------
 *		CreateExprContext
 *
 *		Create a context for expression evaluation within an EState.
 *
 * An executor run may require multiple ExprContexts (we usually make one
 * for each Plan node, and a separate one for per-output-tuple processing
 * such as constraint checking).  Each ExprContext has its own "per-tuple"
 * memory context.
 *
 * Note we make no assumption about the caller's memory context.
 * ----------------
 */
ExprContext *
CreateExprContext(EState *estate)
{
	return CreateExprContextInternal(estate, ALLOCSET_DEFAULT_SIZES);
}


/* ----------------
 *		CreateWorkExprContext
 *
 * Like CreateExprContext, but specifies the AllocSet sizes to be reasonable
 * in proportion to work_mem. If the maximum block allocation size is too
 * large, it's easy to skip right past work_mem with a single allocation.
 * ----------------
 */
ExprContext *
CreateWorkExprContext(EState *estate)
{
	Size minContextSize = ALLOCSET_DEFAULT_MINSIZE;
	Size initBlockSize = ALLOCSET_DEFAULT_INITSIZE;
	Size maxBlockSize = ALLOCSET_DEFAULT_MAXSIZE;

	/* choose the maxBlockSize to be no larger than 1/16 of work_mem */
	while (16 * maxBlockSize > work_mem * 1024L)
		maxBlockSize >>= 1;

	if (maxBlockSize < ALLOCSET_DEFAULT_INITSIZE)
		maxBlockSize = ALLOCSET_DEFAULT_INITSIZE;

	return CreateExprContextInternal(estate, minContextSize,
									 initBlockSize, maxBlockSize);
}

/* ----------------
 *		CreateStandaloneExprContext
 *
 *		Create a context for standalone expression evaluation.
 *
 * An ExprContext made this way can be used for evaluation of expressions
 * that contain no Params, subplans, or Var references (it might work to
 * put tuple references into the scantuple field, but it seems unwise).
 *
 * The ExprContext struct is allocated in the caller's current memory
 * context, which also becomes its "per query" context.
 *
 * It is caller's responsibility to free the ExprContext when done,
 * or at least ensure that any shutdown callbacks have been called
 * (ReScanExprContext() is suitable).  Otherwise, non-memory resources
 * might be leaked.
 * ----------------
 */
ExprContext *
CreateStandaloneExprContext(void)
{
	ExprContext *econtext;

	/* Create the ExprContext node within the caller's memory context */
	econtext = makeNode(ExprContext);

	/* Initialize fields of ExprContext */
	econtext->ecxt_scantuple = NULL;
	econtext->ecxt_innertuple = NULL;
	econtext->ecxt_outertuple = NULL;

	econtext->ecxt_per_query_memory = CurrentMemoryContext;

	/*
	 * Create working memory for expression evaluation in this context.
	 */
	econtext->ecxt_per_tuple_memory =
		AllocSetContextCreate(CurrentMemoryContext,
							  "ExprContext",
							  ALLOCSET_DEFAULT_SIZES);

	econtext->ecxt_param_exec_vals = NULL;
	econtext->ecxt_param_list_info = NULL;

	econtext->ecxt_aggvalues = NULL;
	econtext->ecxt_aggnulls = NULL;

	econtext->caseValue_datum = (Datum) 0;
	econtext->caseValue_isNull = true;

	econtext->domainValue_datum = (Datum) 0;
	econtext->domainValue_isNull = true;

	econtext->ecxt_estate = NULL;

	econtext->ecxt_callbacks = NULL;

	return econtext;
}

/* ----------------
 *		FreeExprContext
 *
 *		Free an expression context, including calling any remaining
 *		shutdown callbacks.
 *
 * Since we free the temporary context used for expression evaluation,
 * any previously computed pass-by-reference expression result will go away!
 *
 * If isCommit is false, we are being called in error cleanup, and should
 * not call callbacks but only release memory.  (It might be better to call
 * the callbacks and pass the isCommit flag to them, but that would require
 * more invasive code changes than currently seems justified.)
 *
 * Note we make no assumption about the caller's memory context.
 * ----------------
 */
void
FreeExprContext(ExprContext *econtext, bool isCommit)
{
	EState	   *estate;

	/* Call any registered callbacks */
	ShutdownExprContext(econtext, isCommit);
	/* And clean up the memory used */
	MemoryContextDelete(econtext->ecxt_per_tuple_memory);
	/* Unlink self from owning EState, if any */
	estate = econtext->ecxt_estate;
	if (estate)
		estate->es_exprcontexts = list_delete_ptr(estate->es_exprcontexts,
												  econtext);
	/* And delete the ExprContext node */
	pfree(econtext);
}

/*
 * ReScanExprContext
 *
 *		Reset an expression context in preparation for a rescan of its
 *		plan node.  This requires calling any registered shutdown callbacks,
 *		since any partially complete set-returning-functions must be canceled.
 *
 * Note we make no assumption about the caller's memory context.
 */
void
ReScanExprContext(ExprContext *econtext)
{
	/* Call any registered callbacks */
	ShutdownExprContext(econtext, true);
	/* And clean up the memory used */
	MemoryContextReset(econtext->ecxt_per_tuple_memory);
}

/*
 * Build a per-output-tuple ExprContext for an EState.
 *
 * This is normally invoked via GetPerTupleExprContext() macro,
 * not directly.
 */
ExprContext *
MakePerTupleExprContext(EState *estate)
{
	if (estate->es_per_tuple_exprcontext == NULL)
		estate->es_per_tuple_exprcontext = CreateExprContext(estate);

	return estate->es_per_tuple_exprcontext;
}


/* ----------------------------------------------------------------
 *				 miscellaneous node-init support functions
 *
 * Note: all of these are expected to be called with CurrentMemoryContext
 * equal to the per-query memory context.
 * ----------------------------------------------------------------
 */

/* ----------------
 *		ExecAssignExprContext
 *
 *		This initializes the ps_ExprContext field.  It is only necessary
 *		to do this for nodes which use ExecQual or ExecProject
 *		because those routines require an econtext. Other nodes that
 *		don't have to evaluate expressions don't need to do this.
 * ----------------
 */
void
ExecAssignExprContext(EState *estate, PlanState *planstate)
{
	planstate->ps_ExprContext = CreateExprContext(estate);
}

/* ----------------
 *		ExecGetResultType
 * ----------------
 */
TupleDesc
ExecGetResultType(PlanState *planstate)
{
	return planstate->ps_ResultTupleDesc;
}

/*
 * ExecGetResultSlotOps - information about node's type of result slot
 */
const TupleTableSlotOps *
ExecGetResultSlotOps(PlanState *planstate, bool *isfixed)
{
	if (planstate->resultopsset && planstate->resultops)
	{
		if (isfixed)
			*isfixed = planstate->resultopsfixed;
		return planstate->resultops;
	}

	if (isfixed)
	{
		if (planstate->resultopsset)
			*isfixed = planstate->resultopsfixed;
		else if (planstate->ps_ResultTupleSlot)
			*isfixed = TTS_FIXED(planstate->ps_ResultTupleSlot);
		else
			*isfixed = false;
	}

	if (!planstate->ps_ResultTupleSlot)
		return &TTSOpsVirtual;

	return planstate->ps_ResultTupleSlot->tts_ops;
}


/* ----------------
 *		ExecAssignProjectionInfo
 *
 * forms the projection information from the node's targetlist
 *
 * Notes for inputDesc are same as for ExecBuildProjectionInfo: supply it
 * for a relation-scan node, can pass NULL for upper-level nodes
 * ----------------
 */
void
ExecAssignProjectionInfo(PlanState *planstate,
						 TupleDesc inputDesc)
{
	planstate->ps_ProjInfo =
		ExecBuildProjectionInfo(planstate->plan->targetlist,
								planstate->ps_ExprContext,
								planstate->ps_ResultTupleSlot,
								planstate,
								inputDesc);
}


/* ----------------
 *		ExecConditionalAssignProjectionInfo
 *
 * as ExecAssignProjectionInfo, but store NULL rather than building projection
 * info if no projection is required
 * ----------------
 */
void
ExecConditionalAssignProjectionInfo(PlanState *planstate, TupleDesc inputDesc,
									Index varno)
{
	if (tlist_matches_tupdesc(planstate,
							  planstate->plan->targetlist,
							  varno,
							  inputDesc))
	{
		planstate->ps_ProjInfo = NULL;
		planstate->resultopsset = planstate->scanopsset;
		planstate->resultopsfixed = planstate->scanopsfixed;
		planstate->resultops = planstate->scanops;
	}
	else
	{
		if (!planstate->ps_ResultTupleSlot)
		{
			ExecInitResultSlot(planstate, &TTSOpsVirtual);
			planstate->resultops = &TTSOpsVirtual;
			planstate->resultopsfixed = true;
			planstate->resultopsset = true;
		}
		ExecAssignProjectionInfo(planstate, inputDesc);
	}
}

static bool
tlist_matches_tupdesc(PlanState *ps, List *tlist, Index varno, TupleDesc tupdesc)
{
	int			numattrs = tupdesc->natts;
	int			attrno;
	ListCell   *tlist_item = list_head(tlist);

	/* Check the tlist attributes */
	for (attrno = 1; attrno <= numattrs; attrno++)
	{
		Form_pg_attribute att_tup = TupleDescAttr(tupdesc, attrno - 1);
		Var		   *var;

		if (tlist_item == NULL)
			return false;		/* tlist too short */
		var = (Var *) ((TargetEntry *) lfirst(tlist_item))->expr;
		if (!var || !IsA(var, Var))
			return false;		/* tlist item not a Var */
		/* if these Asserts fail, planner messed up */
		Assert(var->varno == varno);
		Assert(var->varlevelsup == 0);
		if (var->varattno != attrno)
			return false;		/* out of order */
		if (att_tup->attisdropped)
			return false;		/* table contains dropped columns */
		if (att_tup->atthasmissing)
			return false;		/* table contains cols with missing values */

		/*
		 * Note: usually the Var's type should match the tupdesc exactly, but
		 * in situations involving unions of columns that have different
		 * typmods, the Var may have come from above the union and hence have
		 * typmod -1.  This is a legitimate situation since the Var still
		 * describes the column, just not as exactly as the tupdesc does. We
		 * could change the planner to prevent it, but it'd then insert
		 * projection steps just to convert from specific typmod to typmod -1,
		 * which is pretty silly.
		 */
		if (var->vartype != att_tup->atttypid ||
			(var->vartypmod != att_tup->atttypmod &&
			 var->vartypmod != -1))
			return false;		/* type mismatch */

		tlist_item = lnext(tlist_item);
	}

	if (tlist_item)
		return false;			/* tlist too long */

	return true;
}

/* ----------------
 *		ExecFreeExprContext
 *
 * A plan node's ExprContext should be freed explicitly during executor
 * shutdown because there may be shutdown callbacks to call.  (Other resources
 * made by the above routines, such as projection info, don't need to be freed
 * explicitly because they're just memory in the per-query memory context.)
 *
 * However ... there is no particular need to do it during ExecEndNode,
 * because FreeExecutorState will free any remaining ExprContexts within
 * the EState.  Letting FreeExecutorState do it allows the ExprContexts to
 * be freed in reverse order of creation, rather than order of creation as
 * will happen if we delete them here, which saves O(N^2) work in the list
 * cleanup inside FreeExprContext.
 * ----------------
 */
void
ExecFreeExprContext(PlanState *planstate)
{
	/*
	 * Per above discussion, don't actually delete the ExprContext. We do
	 * unlink it from the plan node, though.
	 */
	planstate->ps_ExprContext = NULL;
}


/* ----------------------------------------------------------------
 *				  Scan node support
 * ----------------------------------------------------------------
 */

/* ----------------
 *		ExecAssignScanType
 * ----------------
 */
void
ExecAssignScanType(ScanState *scanstate, TupleDesc tupDesc)
{
	TupleTableSlot *slot = scanstate->ss_ScanTupleSlot;

	ExecSetSlotDescriptor(slot, tupDesc);
}

/* ----------------
 *		ExecCreateScanSlotFromOuterPlan
 * ----------------
 */
void
ExecCreateScanSlotFromOuterPlan(EState *estate,
								ScanState *scanstate,
								const TupleTableSlotOps *tts_ops)
{
	PlanState  *outerPlan;
	TupleDesc	tupDesc;

	outerPlan = outerPlanState(scanstate);
	tupDesc = ExecGetResultType(outerPlan);

	ExecInitScanTupleSlot(estate, scanstate, tupDesc, tts_ops);
}

/* ----------------------------------------------------------------
 *		ExecRelationIsTargetRelation
 *
 *		Detect whether a relation (identified by rangetable index)
 *		is one of the target relations of the query.
 *
 * Note: This is currently no longer used in core.  We keep it around
 * because FDWs may wish to use it to determine if their foreign table
 * is a target relation.
 * ----------------------------------------------------------------
 */
bool
ExecRelationIsTargetRelation(EState *estate, Index scanrelid)
{
	ResultRelInfo *resultRelInfos;
	int			i;

	resultRelInfos = estate->es_result_relations;
	for (i = 0; i < estate->es_num_result_relations; i++)
	{
		if (resultRelInfos[i].ri_RangeTableIndex == scanrelid)
			return true;
	}
	return false;
}

/* ----------------------------------------------------------------
 *		ExecOpenScanRelation
 *
 *		Open the heap relation to be scanned by a base-level scan plan node.
 *		This should be called during the node's ExecInit routine.
 * ----------------------------------------------------------------
 */
Relation
ExecOpenScanRelation(EState *estate, Index scanrelid, int eflags)
{
	Relation	rel;

	/* Open the relation. */
	rel = ExecGetRangeTableRelation(estate, scanrelid);

	/*
	 * Complain if we're attempting a scan of an unscannable relation, except
	 * when the query won't actually be run.  This is a slightly klugy place
	 * to do this, perhaps, but there is no better place.
	 */
	if ((eflags & (EXEC_FLAG_EXPLAIN_ONLY | EXEC_FLAG_WITH_NO_DATA)) == 0 &&
		!RelationIsScannable(rel))
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("materialized view \"%s\" has not been populated",
						RelationGetRelationName(rel)),
				 errhint("Use the REFRESH MATERIALIZED VIEW command.")));

	return rel;
}

/*
 * ExecInitRangeTable
 *		Set up executor's range-table-related data
 *
 * We build an array from the range table list to allow faster lookup by RTI.
 * (The es_range_table field is now somewhat redundant, but we keep it to
 * avoid breaking external code unnecessarily.)
 * This is also a convenient place to set up the parallel es_relations array.
 */
void
ExecInitRangeTable(EState *estate, List *rangeTable)
{
	Index		rti;
	ListCell   *lc;

	/* Remember the range table List as-is */
	estate->es_range_table = rangeTable;

	/* Set up the equivalent array representation */
	estate->es_range_table_size = list_length(rangeTable);
	estate->es_range_table_array = (RangeTblEntry **)
		palloc(estate->es_range_table_size * sizeof(RangeTblEntry *));
	rti = 0;
	foreach(lc, rangeTable)
	{
		estate->es_range_table_array[rti++] = lfirst_node(RangeTblEntry, lc);
	}

	/*
	 * Allocate an array to store an open Relation corresponding to each
	 * rangetable entry, and initialize entries to NULL.  Relations are opened
	 * and stored here as needed.
	 */
	estate->es_relations = (Relation *)
		palloc0(estate->es_range_table_size * sizeof(Relation));

	/*
	 * es_rowmarks is also parallel to the es_range_table_array, but it's
	 * allocated only if needed.
	 */
	estate->es_rowmarks = NULL;
}

/*
 * ExecGetRangeTableRelation
 *		Open the Relation for a range table entry, if not already done
 *
 * The Relations will be closed again in ExecEndPlan().
 */
Relation
ExecGetRangeTableRelation(EState *estate, Index rti)
{
	Relation	rel;

	Assert(rti > 0 && rti <= estate->es_range_table_size);


	rel = estate->es_relations[rti - 1];
	if (rel == NULL)
	{
		/* First time through, so open the relation */
		RangeTblEntry *rte = exec_rt_fetch(rti, estate);

		Assert(rte->rtekind == RTE_RELATION);

		/* GPDB: a QE process is not holding the locks yet, same as a parallel worker. */
		if (!IsParallelWorker() && Gp_role != GP_ROLE_EXECUTE)
		{
			/*
			 * In a normal query, we should already have the appropriate lock,
			 * but verify that through an Assert.  Since there's already an
			 * Assert inside table_open that insists on holding some lock, it
			 * seems sufficient to check this only when rellockmode is higher
			 * than the minimum.
			 */
			rel = table_open(rte->relid, NoLock);
			Assert(rte->rellockmode == AccessShareLock ||
				   CheckRelationLockedByMe(rel, rte->rellockmode, false));
		}
		else
		{
			/*
			 * If we are a parallel worker, we need to obtain our own local
			 * lock on the relation.  This ensures sane behavior in case the
			 * parent process exits before we do.
			 */
			rel = table_open(rte->relid, rte->rellockmode);
		}

		estate->es_relations[rti - 1] = rel;
	}

	return rel;
}

/*
 * UpdateChangedParamSet
 *		Add changed parameters to a plan node's chgParam set
 */
void
UpdateChangedParamSet(PlanState *node, Bitmapset *newchg)
{
	Bitmapset  *parmset;

	/*
	 * The plan node only depends on params listed in its allParam set. Don't
	 * include anything else into its chgParam set.
	 */
	parmset = bms_intersect(node->plan->allParam, newchg);

	/*
	 * Keep node->chgParam == NULL if there's not actually any members; this
	 * allows the simplest possible tests in executor node files.
	 */
	if (!bms_is_empty(parmset))
		node->chgParam = bms_join(node->chgParam, parmset);
	else
		bms_free(parmset);
}

/*
 * executor_errposition
 *		Report an execution-time cursor position, if possible.
 *
 * This is expected to be used within an ereport() call.  The return value
 * is a dummy (always 0, in fact).
 *
 * The locations stored in parsetrees are byte offsets into the source string.
 * We have to convert them to 1-based character indexes for reporting to
 * clients.  (We do things this way to avoid unnecessary overhead in the
 * normal non-error case: computing character indexes would be much more
 * expensive than storing token offsets.)
 */
void
executor_errposition(EState *estate, int location)
{
	int			pos;

	/* No-op if location was not provided */
	if (location < 0)
		return;
	/* Can't do anything if source text is not available */
	if (estate == NULL || estate->es_sourceText == NULL)
		return;
	/* Convert offset to character number */
	pos = pg_mbstrlen_with_len(estate->es_sourceText, location) + 1;
	/* And pass it to the ereport mechanism */
	errposition(pos);
}

/*
 * Register a shutdown callback in an ExprContext.
 *
 * Shutdown callbacks will be called (in reverse order of registration)
 * when the ExprContext is deleted or rescanned.  This provides a hook
 * for functions called in the context to do any cleanup needed --- it's
 * particularly useful for functions returning sets.  Note that the
 * callback will *not* be called in the event that execution is aborted
 * by an error.
 */
void
RegisterExprContextCallback(ExprContext *econtext,
							ExprContextCallbackFunction function,
							Datum arg)
{
	ExprContext_CB *ecxt_callback;

	/* Save the info in appropriate memory context */
	ecxt_callback = (ExprContext_CB *)
		MemoryContextAlloc(econtext->ecxt_per_query_memory,
						   sizeof(ExprContext_CB));

	ecxt_callback->function = function;
	ecxt_callback->arg = arg;

	/* link to front of list for appropriate execution order */
	ecxt_callback->next = econtext->ecxt_callbacks;
	econtext->ecxt_callbacks = ecxt_callback;
}

/*
 * Deregister a shutdown callback in an ExprContext.
 *
 * Any list entries matching the function and arg will be removed.
 * This can be used if it's no longer necessary to call the callback.
 */
void
UnregisterExprContextCallback(ExprContext *econtext,
							  ExprContextCallbackFunction function,
							  Datum arg)
{
	ExprContext_CB **prev_callback;
	ExprContext_CB *ecxt_callback;

	prev_callback = &econtext->ecxt_callbacks;

	while ((ecxt_callback = *prev_callback) != NULL)
	{
		if (ecxt_callback->function == function && ecxt_callback->arg == arg)
		{
			*prev_callback = ecxt_callback->next;
			pfree(ecxt_callback);
		}
		else
			prev_callback = &ecxt_callback->next;
	}
}

/*
 * Call all the shutdown callbacks registered in an ExprContext.
 *
 * The callback list is emptied (important in case this is only a rescan
 * reset, and not deletion of the ExprContext).
 *
 * If isCommit is false, just clean the callback list but don't call 'em.
 * (See comment for FreeExprContext.)
 */
static void
ShutdownExprContext(ExprContext *econtext, bool isCommit)
{
	ExprContext_CB *ecxt_callback;
	MemoryContext oldcontext;

	/* Fast path in normal case where there's nothing to do. */
	if (econtext->ecxt_callbacks == NULL)
		return;

	/*
	 * Call the callbacks in econtext's per-tuple context.  This ensures that
	 * any memory they might leak will get cleaned up.
	 */
	oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);

	/*
	 * Call each callback function in reverse registration order.
	 */
	while ((ecxt_callback = econtext->ecxt_callbacks) != NULL)
	{
		econtext->ecxt_callbacks = ecxt_callback->next;
		if (isCommit)
			ecxt_callback->function(ecxt_callback->arg);
		pfree(ecxt_callback);
	}

	MemoryContextSwitchTo(oldcontext);
}

/*
 * flatten_logic_exprs
 * This function is only used by ExecPrefetchJoinQual.
 * ExecPrefetchJoinQual need to prefetch subplan in join
 * qual that contains motion to materialize it to avoid
 * motion deadlock. This function is going to flatten
 * the bool exprs to avoid shortcut of bool logic.
 * An example is:
 * (a and b or c) or (d or e and f or g) and (h and i or j)
 * will be transformed to
 * (a, b, c, d, e, f, g, h, i, j).
 */
static List *
flatten_logic_exprs(Node *node)
{
	if (node == NULL)
		return NIL;

	if (IsA(node, BoolExpr))
	{
		BoolExpr *be = (BoolExpr *) node;
		return flatten_logic_exprs((Node *) (be->args));
	}

	if (IsA(node, List))
	{
		List     *es = (List *) node;
		List     *result = NIL;
		ListCell *lc = NULL;

		foreach(lc, es)
		{
			Node *n = (Node *) lfirst(lc);
			result = list_concat(result,
								 flatten_logic_exprs(n));
		}

		return result;
	}

	return list_make1(node);
}

/*
 * fake_outer_params
 *   helper function to fake the nestloop's nestParams
 *   so that prefetch inner or prefetch joinqual will
 *   not encounter NULL pointer reference issue. It is
 *   only invoked in ExecNestLoop and ExecPrefetchJoinQual
 *   when the join is a nestloop join.
 */
void
fake_outer_params(JoinState *node)
{
	ExprContext    *econtext = node->ps.ps_ExprContext;
	PlanState      *inner = innerPlanState(node);
	TupleTableSlot *outerTupleSlot = econtext->ecxt_outertuple;
	NestLoop       *nl = (NestLoop *) (node->ps.plan);
	ListCell       *lc = NULL;

	/* only nestloop contains nestParams */
	Assert(IsA(node->ps.plan, NestLoop));

	/* econtext->ecxt_outertuple must have been set fakely. */
	Assert(outerTupleSlot != NULL);
	/*
	 * fetch the values of any outer Vars that must be passed to the
	 * inner scan, and store them in the appropriate PARAM_EXEC slots.
	 */
	foreach(lc, nl->nestParams)
	{
		NestLoopParam *nlp = (NestLoopParam *) lfirst(lc);
		int			paramno = nlp->paramno;
		ParamExecData *prm;

		prm = &(econtext->ecxt_param_exec_vals[paramno]);
		/* Param value should be an OUTER_VAR var */
		Assert(IsA(nlp->paramval, Var));
		Assert(nlp->paramval->varno == OUTER_VAR);
		Assert(nlp->paramval->varattno > 0);
		prm->value = slot_getattr(outerTupleSlot,
								  nlp->paramval->varattno,
								  &(prm->isnull));
		/* Flag parameter value as changed */
		inner->chgParam = bms_add_member(inner->chgParam,
										 paramno);
	}
}

/*
 * Prefetch JoinQual or NonJoinQual to prevent motion hazard.
 *
 * A motion hazard is a deadlock between motions, a classic motion hazard in a
 * join executor is formed by its inner and outer motions, it can be prevented
 * by prefetching the inner plan, refer to motion_sanity_check() for details.
 *
 * A similar motion hazard can be formed by the outer motion and the join qual
 * motion(or non join qual motion).  A join executor fetches a outer tuple,
 * filters it with the qual, then repeat the process on all the outer tuples.
 * When there are motions in both outer plan and the join qual then below state
 * is possible:
 *
 * 0. processes A and B belong to the join slice, process C belongs to the
 *    outer slice, process D belongs to the JoinQual(NonJoinQual) slice;
 * 1. A has read the first outer tuple and is fetching tuples from D;
 * 2. D is waiting for ACK from B;
 * 3. B is fetching the first outer tuple from C;
 * 4. C is waiting for ACK from A;
 *
 * So a deadlock is formed A->D->B->C->A.  We can prevent it also by
 * prefetching the join qual or non join qual
 *
 * An example is demonstrated and explained in test case
 * src/test/regress/sql/deadlock2.sql.
 *
 * Return true if the JoinQual or NonJoinQual is prefetched.
 */
void
ExecPrefetchQual(JoinState *node, bool isJoinQual)
{
	EState	   *estate = node->ps.state;
	ExprContext *econtext = node->ps.ps_ExprContext;
	PlanState  *inner = innerPlanState(node);
	PlanState  *outer = outerPlanState(node);
	TupleTableSlot *innertuple = econtext->ecxt_innertuple;

	ListCell   *lc = NULL;
	List       *quals = NIL;

	ExprState  *qual;

	if (isJoinQual)
		qual = node->joinqual;
	else
		qual = node->ps.qual;

	Assert(qual);

	/* Outer tuples should not be fetched before us */
	Assert(econtext->ecxt_outertuple == NULL);

	/* Build fake inner & outer tuples */
	econtext->ecxt_innertuple = ExecInitNullTupleSlot(estate,
													  ExecGetResultType(inner),
													  &TTSOpsVirtual);
	econtext->ecxt_outertuple = ExecInitNullTupleSlot(estate,
													  ExecGetResultType(outer),
													  &TTSOpsVirtual);

	if (IsA(node->ps.plan, NestLoop))
	{
		NestLoop *nl = (NestLoop *) (node->ps.plan);
		if (nl->nestParams)
			fake_outer_params(node);
	}

	quals = flatten_logic_exprs((Node *) qual);

	/* Fetch subplan with the fake inner & outer tuples */
	foreach(lc, quals)
	{
		/*
		 * Force every qual is prefech because
		 * our target is to materialize motion node.
		 */
		ExprState  *clause = (ExprState *) lfirst(lc);
		(void) ExecQual(clause, econtext);
	}

	/* Restore previous state */
	econtext->ecxt_innertuple = innertuple;
	econtext->ecxt_outertuple = NULL;
}

/* ----------------------------------------------------------------
 *		CDB Slice Table utilities
 * ----------------------------------------------------------------
 */


static void
FillSliceGangInfo(ExecSlice *slice, PlanSlice *ps)
{
	int numsegments = ps->numsegments;
	DirectDispatchInfo *dd = &ps->directDispatch;

	switch (slice->gangType)
	{
		case GANGTYPE_UNALLOCATED:
			/*
			 * It's either the root slice or an InitPlan slice that runs in
			 * the QD process, or really unused slice.
			 */
			slice->planNumSegments = 1;
			break;
		case GANGTYPE_PRIMARY_WRITER:
		case GANGTYPE_PRIMARY_READER:
			slice->planNumSegments = numsegments;
			if (dd->isDirectDispatch)
			{
				slice->segments = list_copy(dd->contentIds);
			}
			else
			{
				int i;
				slice->segments = NIL;
				for (i = 0; i < numsegments; i++)
					slice->segments = lappend_int(slice->segments, i % getgpsegmentCount());
			}
			break;
		case GANGTYPE_ENTRYDB_READER:
			slice->planNumSegments = 1;
			slice->segments = list_make1_int(-1);
			break;
		case GANGTYPE_SINGLETON_READER:
			slice->planNumSegments = 1;
			slice->segments = list_make1_int(ps->segindex);
			break;
		default:
			elog(ERROR, "unexpected gang type");
	}
}

/*
 * Create the executor slice table.
 *
 * The planner constructed a slice table, in plannedstmt->slices. Turn that
 * into an "executor slice table", with slightly more information. The gangs
 * to execute the slices will be set up later.
 */
SliceTable *
InitSliceTable(EState *estate, PlannedStmt *plannedstmt)
{
	SliceTable *table;
	int			i;
	int			numSlices;
	MemoryContext oldcontext;

	numSlices = plannedstmt->numSlices;
	Assert(numSlices > 0);

	if (gp_max_slices > 0 && numSlices > gp_max_slices)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("at most %d slices are allowed in a query, current number: %d",
						gp_max_slices, numSlices),
				 errhint("rewrite your query or adjust GUC gp_max_slices")));

	oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);

	table = makeNode(SliceTable);
	table->instrument_options = INSTRUMENT_NONE;
	table->hasMotions = false;

	/*
	 * Initialize the executor slice table.
	 *
	 * We have most of the information in the planner slice table. In addition to that,
	 * we set up the parent-child relationships.
	 */
	table->slices = palloc0(sizeof(ExecSlice) * numSlices);
	for (i = 0; i < numSlices; i++)
	{
		ExecSlice  *currExecSlice = &table->slices[i];
		PlanSlice  *currPlanSlice = &plannedstmt->slices[i];
		int			parentIndex;
		int			rootIndex;

		currExecSlice->sliceIndex = i;
		currExecSlice->planNumSegments = currPlanSlice->numsegments;
		currExecSlice->segments = NIL;
		currExecSlice->primaryGang = NULL;
		currExecSlice->primaryProcesses = NIL;

		parentIndex = currPlanSlice->parentIndex;
		if (parentIndex < -1 || parentIndex >= numSlices)
			elog(ERROR, "invalid parent slice index %d", currPlanSlice->parentIndex);

		if (parentIndex >= 0)
		{
			ExecSlice *parentExecSlice = &table->slices[parentIndex];
			int			counter;

			/* Sending slice is a child of recv slice */
			parentExecSlice->children = lappend_int(parentExecSlice->children, currPlanSlice->sliceIndex);

			/* Find the root slice */
			rootIndex = i;
			counter = 0;
			while (plannedstmt->slices[rootIndex].parentIndex >= 0)
			{
				rootIndex = plannedstmt->slices[rootIndex].parentIndex;

				if (counter++ > numSlices)
					elog(ERROR, "circular parent-child relationship in slice table");
			}
			table->hasMotions = true;
		}
		else
			rootIndex = i;

		/* find root of this slice. All the parents should be initialized already */

		currExecSlice->parentIndex = parentIndex;
		currExecSlice->rootIndex = rootIndex;
		currExecSlice->gangType = currPlanSlice->gangType;

		FillSliceGangInfo(currExecSlice, currPlanSlice);
	}
	table->numSlices = numSlices;

	/*
	 * For CTAS although the data is distributed on part of the
	 * segments, the catalog changes must be dispatched to all the
	 * segments, so a full gang is required.
	 */
	if ((plannedstmt->intoClause != NULL || plannedstmt->copyIntoClause != NULL || plannedstmt->refreshClause))
	{
		if (table->slices[0].gangType == GANGTYPE_PRIMARY_WRITER)
		{
			int			numsegments = getgpsegmentCount();

			table->slices[0].segments = NIL;
			for (i = 0; i < numsegments; i++)
				table->slices[0].segments = lappend_int(table->slices[0].segments, i);
		}
	}

	MemoryContextSwitchTo(oldcontext);

	return table;
}

/*
 * A forgiving slice table indexer that returns the indexed Slice* or NULL
 */
ExecSlice *
getCurrentSlice(EState *estate, int sliceIndex)
{
	SliceTable *sliceTable = estate->es_sliceTable;

    if (sliceTable &&
		sliceIndex >= 0 &&
		sliceIndex < sliceTable->numSlices)
		return &sliceTable->slices[sliceIndex];

    return NULL;
}

/* Should the slice run on the QD?
 *
 * N.B. Not the same as !sliceRunsOnQE(slice), when slice is NULL.
 */
bool
sliceRunsOnQD(ExecSlice *slice)
{
	return (slice != NULL && slice->gangType == GANGTYPE_UNALLOCATED);
}


/* Should the slice run on a QE?
 *
 * N.B. Not the same as !sliceRunsOnQD(slice), when slice is NULL.
 */
bool
sliceRunsOnQE(ExecSlice *slice)
{
	return (slice != NULL && slice->gangType != GANGTYPE_UNALLOCATED);
}

/* Forward declarations */
static bool AssignWriterGangFirst(CdbDispatcherState *ds, SliceTable *sliceTable, int sliceIndex);
static void InventorySliceTree(CdbDispatcherState *ds, SliceTable *sliceTable, int sliceIndex);

/*
 * Function AssignGangs runs on the QD and finishes construction of the
 * global slice table for a plan by assigning gangs allocated by the
 * executor factory to the slices of the slice table.
 *
 * On entry, the executor slice table (at queryDesc->estate->es_sliceTable)
 * has been initialized and has correct (by InitSliceTable function)
 *
 * Gang assignment involves taking an inventory of the requirements of
 * each slice tree in the slice table, asking the executor factory to
 * allocate a minimal set of gangs that can satisfy any of the slice trees,
 * and associating the allocated gangs with slices in the slice table.
 *
 * On successful exit, the CDBProcess lists (primaryProcesses, mirrorProcesses)
 * and the Gang pointers (primaryGang, mirrorGang) are set correctly in each
 * slice in the slice table.
 */
void
AssignGangs(CdbDispatcherState *ds, QueryDesc *queryDesc)
{
	SliceTable	*sliceTable;
	EState		*estate;
	int			rootIdx;

	estate = queryDesc->estate;
	sliceTable = estate->es_sliceTable;
	rootIdx = RootSliceIndex(queryDesc->estate);

	/* cleanup processMap because initPlan and main Plan share the same slice table */
	for (int i = 0; i < sliceTable->numSlices; i++)
		sliceTable->slices[i].processesMap = NULL;

	AssignWriterGangFirst(ds, sliceTable, rootIdx);
	InventorySliceTree(ds, sliceTable, rootIdx);
}

/*
 * AssignWriterGangFirst() - Try to assign writer gang first.
 *
 * For the gang allocation, our current implementation required the first
 * allocated gang must be the writer gang.
 * This has several reasons:
 * - For lock holding, Because of our MPP structure, we assign a LockHolder
 *   for each segment when executing a query. lockHolder is the gang member that
 *   should hold and manage locks for this transaction. On the QEs, it should
 *   normally be the Writer gang member. More details please refer to
 *   lockHolderProcPtr in lock.c.
 * - For SharedSnapshot among session's gang processes on a particular segment.
 *   During initPostgres(), reader QE will try to lookup the shared slot written
 *   by writer QE. More details please reger to sharedsnapshot.c.
 *
 * Normally, the writer slice will be assign writer gang first when iterate the
 * slice table. But this is not true for writable CTE (with only one writer gang).
 * For below statement:
 *
 * WITH updated AS (update tbl set rank = 6 where id = 5 returning rank)
 * select * from tbl where rank in (select rank from updated);
 *                                           QUERY PLAN
 * ----------------------------------------------------------------------------------------------
 *  Gather Motion 3:1  (slice1; segments: 3)
 *    ->  Seq Scan on tbl
 *          Filter: (hashed SubPlan 1)
 *          SubPlan 1
 *            ->  Broadcast Motion 1:3  (slice2; segments: 1)
 *                  ->  Update on tbl
 *                        ->  Seq Scan on tbl
 *                              Filter: (id = 5)
 *  Slice 0: Dispatcher; root 0; parent -1; gang size 0
 *  Slice 1: Reader; root 0; parent 0; gang size 3
 *  Slice 2: Primary Writer; root 0; parent 1; gang size 1
 *
 * If we sill assign writer gang to Slice 1 here, the writer process will execute
 * on reader gang. So, find the writer slice and assign writer gang first.
 */
static bool
AssignWriterGangFirst(CdbDispatcherState *ds, SliceTable *sliceTable, int sliceIndex)
{
	ExecSlice	   *slice = &sliceTable->slices[sliceIndex];

	if (slice->gangType == GANGTYPE_PRIMARY_WRITER)
	{
		Assert(slice->primaryGang == NULL);
		Assert(slice->segments != NIL);
		slice->primaryGang = AllocateGang(ds, slice->gangType, slice->segments);
		setupCdbProcessList(slice);
		return true;
	}
	else
	{
		ListCell *cell;
		foreach(cell, slice->children)
		{
			int			childIndex = lfirst_int(cell);
			if (AssignWriterGangFirst(ds, sliceTable, childIndex))
				return true;
		}
	}
	return false;
}

/*
 * Helper for AssignGangs takes a simple inventory of the gangs required
 * by a slice tree.  Recursive.  Closely coupled with AssignGangs.	Not
 * generally useful.
 */
static void
InventorySliceTree(CdbDispatcherState *ds, SliceTable *sliceTable, int sliceIndex)
{
	ExecSlice *slice = &sliceTable->slices[sliceIndex];
	ListCell *cell;

	if (slice->gangType == GANGTYPE_UNALLOCATED)
	{
		slice->primaryGang = NULL;
		slice->primaryProcesses = getCdbProcessesForQD(true);
	}
	else if (!slice->primaryGang)
	{
		Assert(slice->segments != NIL);
		slice->primaryGang = AllocateGang(ds, slice->gangType, slice->segments);
		setupCdbProcessList(slice);
	}

	foreach(cell, slice->children)
	{
		int			childIndex = lfirst_int(cell);

		InventorySliceTree(ds, sliceTable, childIndex);
	}
}

/*
 * Choose the execution identity (who does this executor serve?).
 * There are types:
 *
 * 1. No-Op (ignore) -- this occurs when the specified direction is
 *	 NoMovementScanDirection or when Gp_role is GP_ROLE_DISPATCH
 *	 and the current slice belongs to a QE.
 *
 * 2. Executor serves a Root Slice -- this occurs when Gp_role is
 *   GP_ROLE_UTILITY or the current slice is a root.  It corresponds
 *   to the "normal" path through the executor in that we enter the plan
 *   at the top and count on the motion nodes at the fringe of the top
 *   slice to return without ever calling nodes below them.
 *
 * 3. Executor serves a Non-Root Slice on a QE -- this occurs when
 *   Gp_role is GP_ROLE_EXECUTE and the current slice is not a root
 *   slice. It corresponds to a QE running a slice with a motion node on
 *	 top.  The call, thus, returns no tuples (since they all go out
 *	 on the interconnect to the receiver version of the motion node),
 *	 but it does execute the indicated slice down to any fringe
 *	 motion nodes (as in case 2).
 */
GpExecIdentity
getGpExecIdentity(QueryDesc *queryDesc,
				  ScanDirection direction,
				  EState	   *estate)
{
	ExecSlice *currentSlice;

	currentSlice = getCurrentSlice(estate, LocallyExecutingSliceIndex(estate));
	if (currentSlice)
    {
        if (Gp_role == GP_ROLE_EXECUTE ||
            sliceRunsOnQD(currentSlice))
            currentSliceId = currentSlice->sliceIndex;
    }

	/* select the strategy */
	if (direction == NoMovementScanDirection)
	{
		return GP_IGNORE;
	}
	else if (Gp_role == GP_ROLE_DISPATCH && sliceRunsOnQE(currentSlice))
	{
		return GP_IGNORE;
	}
	else if (Gp_role == GP_ROLE_EXECUTE && LocallyExecutingSliceIndex(estate) != RootSliceIndex(estate))
	{
		return GP_NON_ROOT_ON_QE;
	}
	else
	{
		return GP_ROOT_SLICE;
	}
}

/*
 * End the gp-specific part of the executor.
 *
 * In here we collect the dispatch results if there are any, tear
 * down the interconnect if it is set-up.
 */
void mppExecutorFinishup(QueryDesc *queryDesc)
{
	EState	   *estate;
	ExecSlice  *currentSlice;
	int			primaryWriterSliceIndex;

	/* caller must have switched into per-query memory context already */
	estate = queryDesc->estate;

	currentSlice = getCurrentSlice(estate, LocallyExecutingSliceIndex(estate));
	primaryWriterSliceIndex = PrimaryWriterSliceIndex(estate);

	/* Teardown the Interconnect */
	if (estate->es_interconnect_is_setup)
	{
		TeardownInterconnect(estate->interconnect_context, false);
		estate->interconnect_context = NULL;
		estate->es_interconnect_is_setup = false;
	}

	/*
	 * If QD, wait for QEs to finish and check their results.
	 */
	if (estate->dispatcherState && estate->dispatcherState->primaryResults)
	{
		CdbDispatchResults *pr = NULL;
		CdbDispatcherState *ds = estate->dispatcherState;
		DispatchWaitMode waitMode = DISPATCH_WAIT_NONE;
		ErrorData *qeError = NULL;

		/*
		 * If we are finishing a query before all the tuples of the query
		 * plan were fetched we must call ExecSquelchNode before checking
		 * the dispatch results in order to tell the nodes below we no longer
		 * need any more tuples.
		 */
		if (!estate->es_got_eos)
		{
			ExecSquelchNode(queryDesc->planstate);
		}

		/*
		 * Wait for completion of all QEs.  We send a "graceful" query
		 * finish, not cancel signal.  Since the query has succeeded,
		 * don't confuse QEs by sending erroneous message.
		 */
		if (estate->cancelUnfinished)
			waitMode = DISPATCH_WAIT_FINISH;

		cdbdisp_checkDispatchResult(ds, waitMode);

		pr = cdbdisp_getDispatchResults(ds, &qeError);

		if (qeError)
		{
			estate->dispatcherState = NULL;
			FlushErrorState();
			ReThrowError(qeError);
		}

		/* collect pgstat from QEs for current transaction level */
		pgstat_combine_from_qe(pr, primaryWriterSliceIndex);

		/* get num of rows processed from writer QEs. */
		estate->es_processed +=
			cdbdisp_sumCmdTuples(pr, primaryWriterSliceIndex);

		/* sum up rejected rows if any (single row error handling only) */
		cdbdisp_sumRejectedRows(pr);

		/*
		 * Check and free the results of all gangs. If any QE had an
		 * error, report it and exit to our error handler via PG_THROW.
		 * NB: This call doesn't wait, because we already waited above.
		 */
		estate->dispatcherState = NULL;
		cdbdisp_destroyDispatcherState(ds);
	}
}

/*
 * Cleanup the gp-specific parts of the query executor.
 *
 * Will normally be called after an error from within a CATCH block.
 */
void mppExecutorCleanup(QueryDesc *queryDesc)
{
	CdbDispatcherState *ds;
	EState	   *estate;

	/* caller must have switched into per-query memory context already */
	estate = queryDesc->estate;
	ds = estate->dispatcherState;

	/* GPDB hook for collecting query info */
	if (query_info_collect_hook && QueryCancelCleanup)
		(*query_info_collect_hook)(METRICS_QUERY_CANCELING, queryDesc);
	/* Clean up the interconnect. */
	if (estate->es_interconnect_is_setup)
	{
		TeardownInterconnect(estate->interconnect_context, true);
		estate->es_interconnect_is_setup = false;
	}

	/*
	 * Request any commands still executing on qExecs to stop.
	 * Wait for them to finish and clean up the dispatching structures.
	 * Replace current error info with QE error info if more interesting.
	 */
	if (ds)
	{
		estate->dispatcherState = NULL;
		CdbDispatchHandleError(ds);
	}

	/* GPDB hook for collecting query info */
	if (query_info_collect_hook)
		(*query_info_collect_hook)(QueryCancelCleanup ? METRICS_QUERY_CANCELED : METRICS_QUERY_ERROR, queryDesc);
	
	ReportOOMConsumption();

	/**
	 * Since there was an error, clean up the function scan stack.
	 */
	if (!IsResManagerMemoryPolicyNone())
	{
		SPI_InitMemoryReservation();
	}
}

/**
 * This method is used to determine how much memory a specific operator
 * is supposed to use (in KB). 
 */
uint64 PlanStateOperatorMemKB(const PlanState *ps)
{
	Assert(ps);
	Assert(ps->plan);
	uint64 result = 0;
	if (ps->plan->operatorMemKB == 0)
	{
		/**
		 * There are some statements that do not go through the resource queue and these
		 * plans dont get decorated with the operatorMemKB. Someday, we should fix resource queues.
		 */
		result = work_mem;
	}
	else
	{
		result = ps->plan->operatorMemKB;
	}
	
	return result;
}

/**
 * Methods to find motionstate object within a planstate tree given a motion id (which is the same as slice index)
 */
typedef struct MotionStateFinderContext
{
	int motionId; /* Input */
	MotionState *motionState; /* Output */
} MotionStateFinderContext;

/**
 * Walker method that finds motion state node within a planstate tree.
 */
static CdbVisitOpt
MotionStateFinderWalker(PlanState *node,
				  void *context)
{
	Assert(context);
	MotionStateFinderContext *ctx = (MotionStateFinderContext *) context;

	if (IsA(node, MotionState))
	{
		MotionState *ms = (MotionState *) node;
		Motion *m = (Motion *) ms->ps.plan;
		if (m->motionID == ctx->motionId)
		{
			Assert(ctx->motionState == NULL);
			ctx->motionState = ms;
			return CdbVisit_Skip;	/* don't visit subtree */
		}
	}

	/* Continue walking */
	return CdbVisit_Walk;
}

/**
 * Given a slice index, find the motionstate that corresponds to this slice index. This will iterate over the planstate tree
 * to get the right node.
 */
MotionState *
getMotionState(struct PlanState *ps, int sliceIndex)
{
	Assert(ps);
	Assert(sliceIndex > -1);

	MotionStateFinderContext ctx;
	ctx.motionId = sliceIndex;
	ctx.motionState = NULL;
	planstate_walk_node(ps, MotionStateFinderWalker, &ctx);

	if (ctx.motionState == NULL)
		elog(ERROR, "could not find MotionState for slice %d in executor tree", sliceIndex);

	return ctx.motionState;
}

typedef struct MotionFinderContext
{
	plan_tree_base_prefix base; /* Required prefix for plan_tree_walker/mutator */
	int motionId; /* Input */
	Motion *motion; /* Output */
} MotionFinderContext;

/*
 * Walker to find a motion node that matches a particular motionID
 */
static bool
MotionFinderWalker(Plan *node,
				  void *context)
{
	Assert(context);
	MotionFinderContext *ctx = (MotionFinderContext *) context;


	if (node == NULL)
		return false;

	if (IsA(node, Motion))
	{
		Motion *m = (Motion *) node;
		if (m->motionID == ctx->motionId)
		{
			ctx->motion = m;
			return true;	/* found our node; no more visit */
		}
	}

	/* Continue walking */
	return plan_tree_walker((Node*)node, MotionFinderWalker, ctx, true);
}

/*
 * Given the Plan and a Slice index, find the motion node that is the root of the slice's subtree.
 */
Motion *findSenderMotion(PlannedStmt *plannedstmt, int sliceIndex)
{
	Assert(sliceIndex > -1);

	Plan *planTree = plannedstmt->planTree;
	MotionFinderContext ctx;
	ctx.base.node = (Node*)plannedstmt;
	ctx.motionId = sliceIndex;
	ctx.motion = NULL;
	MotionFinderWalker(planTree, &ctx);
	return ctx.motion;
}

typedef struct SubPlanFinderContext
{
	plan_tree_base_prefix base; /* Required prefix for plan_tree_walker/mutator */
	Bitmapset *bms_subplans; /* Bitmapset for relevant subplans in current slice */
} SubPlanFinderContext;

/*
 * Walker to find all the subplans in a PlanTree between 'node' and the next motion node
 */
static bool
SubPlanFinderWalker(Node *node, void *context)
{
	SubPlanFinderContext *ctx = (SubPlanFinderContext *) context;

	if (node == NULL)
		return false;

	/* don't recurse into other slices */
	if (IsA(node, Motion))
		return false;

	if (IsA(node, SubPlan))
	{
		SubPlan *subplan = (SubPlan *) node;
		int		plan_id = subplan->plan_id;

		if (!bms_is_member(plan_id, ctx->bms_subplans))
			ctx->bms_subplans = bms_add_member(ctx->bms_subplans, plan_id);
		else
			return false;
	}

	/* Continue walking */
	return plan_tree_walker(node, SubPlanFinderWalker, ctx, true);
}

/*
 * Given a plan and a root motion node find all the subplans
 * between 'root' and the next motion node in the tree
 */
Bitmapset *
getLocallyExecutableSubplans(PlannedStmt *plannedstmt, Plan *root_plan)
{
	SubPlanFinderContext ctx;

	ctx.base.node = (Node *) plannedstmt;
	ctx.bms_subplans = NULL;

	/*
	 * Note that we begin with plan_tree_walker(root_plan), not
	 * SubPlanFinderWalker(root_plan). SubPlanFinderWalker() will stop
	 * at a Motion, but a slice typically has a Motion at the top. We want
	 * to recurse into the children of the top Motion, as well as any
	 * initPlans, targetlist, and other fields on the Motion itself. They
	 * are all considered part of the sending slice.
	 */
	(void) plan_tree_walker((Node *) root_plan, SubPlanFinderWalker, &ctx, true);

	return ctx.bms_subplans;
}

/*
 * Copy PARAM_EXEC parameter values that were received from the QD into
 * our EState.
 */
void
InstallDispatchedExecParams(QueryDispatchDesc *ddesc, EState *estate)
{
	Assert(Gp_role == GP_ROLE_EXECUTE);

	if (ddesc->paramInfo == NULL)
		return;

	for (int i = 0; i < ddesc->paramInfo->nExecParams; i++)
	{
		SerializedParamExecData *sprm = &ddesc->paramInfo->execParams[i];
		ParamExecData *prmExec = &estate->es_param_exec_vals[i];

		if (!sprm->isvalid)
			continue;	/* not dispatched */

		prmExec->execPlan = NULL;
		prmExec->value = sprm->value;
		prmExec->isnull = sprm->isnull;
	}
}

/**
 * Provide index of locally executing slice
 */
int LocallyExecutingSliceIndex(EState *estate)
{
	Assert(estate);
	return (!estate->es_sliceTable ? 0 : estate->es_sliceTable->localSlice);
}

/**
 * Provide index of slice being executed on the primary writer gang
 */
int
PrimaryWriterSliceIndex(EState *estate)
{
	if (!estate->es_sliceTable)
		return 0;

	for (int i = 0; i < estate->es_sliceTable->numSlices; i++)
	{
		ExecSlice  *slice = &estate->es_sliceTable->slices[i];

		if (slice->gangType == GANGTYPE_PRIMARY_WRITER)
			return slice->sliceIndex;
	}

	return 0;
}

/**
 * Provide root slice of locally executing slice.
 */
int
RootSliceIndex(EState *estate)
{
	int			result = 0;

	if (estate->es_sliceTable)
	{
		ExecSlice *localSlice = &estate->es_sliceTable->slices[LocallyExecutingSliceIndex(estate)];

		result = localSlice->rootIndex;

		Assert(result >= 0 && estate->es_sliceTable->numSlices);
	}

	return result;
}


#ifdef USE_ASSERT_CHECKING
/**
 * Assert that slicetable is valid. Must be called after ExecInitMotion, which sets up the slice table
 */
void AssertSliceTableIsValid(SliceTable *st)
{
	if (!st)
		return;

	for (int i = 0; i < st->numSlices; i++)
	{
		ExecSlice *s = &st->slices[i];

		/* The n-th slice entry has sliceIndex of n */
		Assert(s->sliceIndex == i && "slice index incorrect");

		/*
		 * FIXME: Sometimes the planner produces a plan with unused SubPlans, which
		 * might contain Motion nodes. We remove unused SubPlans as part cdbllize(), but
		 * there is a scenario with Append nodes where they still occur.
		 * adjust_appendrel_attrs() makes copies of any SubPlans it encounters, which
		 * happens early in the planning, leaving any SubPlans in target list of the
		 * Append node to point to the original plan_id. The scan in cdbllize() doesn't
		 * eliminate such SubPlans. But set_plan_references() will replace any SubPlans
		 * in the Append's targetlist with references to the outputs of the child nodes,
		 * leaving the original SubPlan unused.
		 *
		 * For now, just tolerate unused slices.
		 */
		if (s->rootIndex == -1 && s->parentIndex == -1 && s->gangType == GANGTYPE_UNALLOCATED)
			continue;

		/* Parent slice index */
		if (s->sliceIndex == s->rootIndex)
		{
			/* Current slice is a root slice. It will have parent index -1.*/
			Assert(s->parentIndex == -1 && "expecting parent index of -1");
		}
		else
		{
			/* All other slices must have a valid parent index */
			Assert(s->parentIndex >= 0 && s->parentIndex < st->numSlices && "slice's parent index out of range");
		}

		/* Current slice's children must consider it the parent */
		ListCell *lc1 = NULL;
		foreach (lc1, s->children)
		{
			int childIndex = lfirst_int(lc1);
			Assert(childIndex >= 0 && childIndex < st->numSlices && "invalid child slice");
			ExecSlice *sc = &st->slices[childIndex];
			Assert(sc->parentIndex == s->sliceIndex && "slice's child does not consider it the parent");
		}

		/* Current slice must be in its parent's children list */
		if (s->parentIndex >= 0)
		{
			ExecSlice *sp = &st->slices[s->parentIndex];

			bool found = false;
			foreach (lc1, sp->children)
			{
				int childIndex = lfirst_int(lc1);
				Assert(childIndex >= 0 && childIndex < st->numSlices && "invalid child slice");
				ExecSlice *sc = &st->slices[childIndex];

				if (sc->sliceIndex == s->sliceIndex)
				{
					found = true;
					break;
				}
			}

			Assert(found && "slice's parent does not consider it a child");
		}
	}
}
#endif

/*
 *		GetAttributeByName
 *		GetAttributeByNum
 *
 *		These functions return the value of the requested attribute
 *		out of the given tuple Datum.
 *		C functions which take a tuple as an argument are expected
 *		to use these.  Ex: overpaid(EMP) might call GetAttributeByNum().
 *		Note: these are actually rather slow because they do a typcache
 *		lookup on each call.
 */
Datum
GetAttributeByName(HeapTupleHeader tuple, const char *attname, bool *isNull)
{
	AttrNumber	attrno;
	Datum		result;
	Oid			tupType;
	int32		tupTypmod;
	TupleDesc	tupDesc;
	HeapTupleData tmptup;
	int			i;

	if (attname == NULL)
		elog(ERROR, "invalid attribute name");

	if (isNull == NULL)
		elog(ERROR, "a NULL isNull pointer was passed");

	if (tuple == NULL)
	{
		/* Kinda bogus but compatible with old behavior... */
		*isNull = true;
		return (Datum) 0;
	}

	tupType = HeapTupleHeaderGetTypeId(tuple);
	tupTypmod = HeapTupleHeaderGetTypMod(tuple);
	tupDesc = lookup_rowtype_tupdesc(tupType, tupTypmod);

	attrno = InvalidAttrNumber;
	for (i = 0; i < tupDesc->natts; i++)
	{
		Form_pg_attribute att = TupleDescAttr(tupDesc, i);

		if (namestrcmp(&(att->attname), attname) == 0)
		{
			attrno = att->attnum;
			break;
		}
	}

	if (attrno == InvalidAttrNumber)
		elog(ERROR, "attribute \"%s\" does not exist", attname);

	/*
	 * heap_getattr needs a HeapTuple not a bare HeapTupleHeader.  We set all
	 * the fields in the struct just in case user tries to inspect system
	 * columns.
	 */
	tmptup.t_len = HeapTupleHeaderGetDatumLength(tuple);
	ItemPointerSetInvalid(&(tmptup.t_self));
	tmptup.t_tableOid = InvalidOid;
	tmptup.t_data = tuple;

	result = heap_getattr(&tmptup,
						  attrno,
						  tupDesc,
						  isNull);

	ReleaseTupleDesc(tupDesc);

	return result;
}

Datum
GetAttributeByNum(HeapTupleHeader tuple,
				  AttrNumber attrno,
				  bool *isNull)
{
	Datum		result;
	Oid			tupType;
	int32		tupTypmod;
	TupleDesc	tupDesc;
	HeapTupleData tmptup;

	if (!AttributeNumberIsValid(attrno))
		elog(ERROR, "invalid attribute number %d", attrno);

	if (isNull == NULL)
		elog(ERROR, "a NULL isNull pointer was passed");

	if (tuple == NULL)
	{
		/* Kinda bogus but compatible with old behavior... */
		*isNull = true;
		return (Datum) 0;
	}

	tupType = HeapTupleHeaderGetTypeId(tuple);
	tupTypmod = HeapTupleHeaderGetTypMod(tuple);
	tupDesc = lookup_rowtype_tupdesc(tupType, tupTypmod);

	/*
	 * heap_getattr needs a HeapTuple not a bare HeapTupleHeader.  We set all
	 * the fields in the struct just in case user tries to inspect system
	 * columns.
	 */
	tmptup.t_len = HeapTupleHeaderGetDatumLength(tuple);
	ItemPointerSetInvalid(&(tmptup.t_self));
	tmptup.t_tableOid = InvalidOid;
	tmptup.t_data = tuple;

	result = heap_getattr(&tmptup,
						  attrno,
						  tupDesc,
						  isNull);

	ReleaseTupleDesc(tupDesc);

	return result;
}

/*
 * Number of items in a tlist (including any resjunk items!)
 */
int
ExecTargetListLength(List *targetlist)
{
	/* This used to be more complex, but fjoins are dead */
	return list_length(targetlist);
}

/*
 * Number of items in a tlist, not including any resjunk items
 */
int
ExecCleanTargetListLength(List *targetlist)
{
	int			len = 0;
	ListCell   *tl;

	foreach(tl, targetlist)
	{
		TargetEntry *curTle = lfirst_node(TargetEntry, tl);

		if (!curTle->resjunk)
			len++;
	}
	return len;
}

/*
 * Return a relInfo's tuple slot for a trigger's OLD tuples.
 */
TupleTableSlot *
ExecGetTriggerOldSlot(EState *estate, ResultRelInfo *relInfo)
{
	if (relInfo->ri_TrigOldSlot == NULL)
	{
		Relation	rel = relInfo->ri_RelationDesc;
		MemoryContext oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);

		relInfo->ri_TrigOldSlot =
			ExecInitExtraTupleSlot(estate,
								   RelationGetDescr(rel),
								   table_slot_callbacks(rel));

		MemoryContextSwitchTo(oldcontext);
	}

	return relInfo->ri_TrigOldSlot;
}

/*
 * Return a relInfo's tuple slot for a trigger's NEW tuples.
 */
TupleTableSlot *
ExecGetTriggerNewSlot(EState *estate, ResultRelInfo *relInfo)
{
	if (relInfo->ri_TrigNewSlot == NULL)
	{
		Relation	rel = relInfo->ri_RelationDesc;
		MemoryContext oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);

		relInfo->ri_TrigNewSlot =
			ExecInitExtraTupleSlot(estate,
								   RelationGetDescr(rel),
								   table_slot_callbacks(rel));

		MemoryContextSwitchTo(oldcontext);
	}

	return relInfo->ri_TrigNewSlot;
}

/*
 * Return a relInfo's tuple slot for processing returning tuples.
 */
TupleTableSlot *
ExecGetReturningSlot(EState *estate, ResultRelInfo *relInfo)
{
	if (relInfo->ri_ReturningSlot == NULL)
	{
		Relation	rel = relInfo->ri_RelationDesc;
		MemoryContext oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);

		relInfo->ri_ReturningSlot =
			ExecInitExtraTupleSlot(estate,
								   RelationGetDescr(rel),
								   table_slot_callbacks(rel));

		MemoryContextSwitchTo(oldcontext);
	}

	return relInfo->ri_ReturningSlot;
}

/*
 * During attribute re-mapping for heterogeneous partitions, we use
 * this struct to identify which varno's attributes will be re-mapped.
 * Using this struct as a *context* during expression tree walking, we
 * can skip varattnos that do not belong to a given varno.
 */
typedef struct AttrMapContext
{
	const AttrNumber *newattno; /* The mapping table to remap the varattno */
	Index		varno;			/* Which rte's varattno to re-map */
} AttrMapContext;

/*
 * Remaps the varattno of a varattno in a Var node using an attribute map.
 */
static bool
change_varattnos_varno_walker(Node *node, const AttrMapContext *attrMapCxt)
{
	if (node == NULL)
		return false;
	if (IsA(node, Var))
	{
		Var		   *var = (Var *) node;

		if (var->varlevelsup == 0 && (var->varno == attrMapCxt->varno) &&
			var->varattno > 0)
		{
			/*
			 * ??? the following may be a problem when the node is multiply
			 * referenced though stringToNode() doesn't create such a node
			 * currently.
			 */
			Assert(attrMapCxt->newattno[var->varattno - 1] > 0);
			var->varattno = var->varoattno = attrMapCxt->newattno[var->varattno - 1];
		}
		return false;
	}
	return expression_tree_walker(node, change_varattnos_varno_walker,
								  (void *) attrMapCxt);
}

/*
 * Replace varattno values for a given varno RTE index in an expression
 * tree according to the given map array, that is, varattno N is replaced
 * by newattno[N-1].  It is caller's responsibility to ensure that the array
 * is long enough to define values for all user varattnos present in the tree.
 * System column attnos remain unchanged.
 *
 * Note that the passed node tree is modified in-place!
 */
void
change_varattnos_of_a_varno(Node *node, const AttrNumber *newattno, Index varno)
{
	AttrMapContext attrMapCxt;

	attrMapCxt.newattno = newattno;
	attrMapCxt.varno = varno;

	(void) change_varattnos_varno_walker(node, &attrMapCxt);
}

相关信息

greenplumn 源码目录

相关文章

greenplumn execAmi 源码

greenplumn execCurrent 源码

greenplumn execExpr 源码

greenplumn execExprInterp 源码

greenplumn execGrouping 源码

greenplumn execIndexing 源码

greenplumn execJunk 源码

greenplumn execMain 源码

greenplumn execParallel 源码

greenplumn execPartition 源码

0  赞