greenplumn explain_gp 源码

  • 2022-08-18
  • 浏览 (276)

greenplumn explain_gp 代码

文件路径:/src/backend/commands/explain_gp.c

/*-------------------------------------------------------------------------
 *
 * explain_gp.c
 *	  Functions supporting the Greenplum extensions to EXPLAIN ANALYZE
 *
 * Portions Copyright (c) 2006-2008, Greenplum inc
 * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
 *
 *
 * IDENTIFICATION
 *	    src/backend/commands/explain_gp.c
 *
 *-------------------------------------------------------------------------
 */

#include <math.h>

#include "portability/instr_time.h"

#include "libpq-fe.h"
#include "libpq-int.h"
#include "cdb/cdbconn.h"		/* SegmentDatabaseDescriptor */
#include "cdb/cdbdisp.h"                /* CheckDispatchResult() */
#include "cdb/cdbdispatchresult.h"	/* CdbDispatchResults */
#include "cdb/cdbexplain.h"		/* me */
#include "cdb/cdbpathlocus.h"
#include "cdb/cdbutil.h"
#include "cdb/cdbvars.h"		/* GpIdentity.segindex */
#include "cdb/cdbendpoint.h"
#include "cdb/memquota.h"
#include "libpq/pqformat.h"		/* pq_beginmessage() etc. */
#include "miscadmin.h"
#include "utils/resscheduler.h"
#include "utils/tuplesort.h"
#include "utils/memutils.h"		/* MemoryContextGetPeakSpace() */
#include "utils/vmem_tracker.h"

#include "cdb/cdbexplain.h"             /* cdbexplain_recvExecStats */

/* Convert bytes into kilobytes */
#define kb(x) (floor((x + 1023.0) / 1024.0))

/* EXPLAIN ANALYZE statistics for one plan node of a slice */
typedef struct CdbExplain_StatInst
{
	NodeTag		pstype;			/* PlanState node type */

	/* fields from Instrumentation struct */
	instr_time	starttime;		/* Start time of current iteration of node */
	instr_time	counter;		/* Accumulated runtime for this node */
	double		firsttuple;		/* Time for first tuple of this cycle */
	double		startup;		/* Total startup time (in seconds) */
	double		total;			/* Total total time (in seconds) */
	double		ntuples;		/* Total tuples produced */
	double		ntuples2;
	double		nloops;			/* # of run cycles for this node */
	double		nfiltered1;
	double		nfiltered2;
	double		execmemused;	/* executor memory used (bytes) */
	double		workmemused;	/* work_mem actually used (bytes) */
	double		workmemwanted;	/* work_mem to avoid workfile i/o (bytes) */
	bool		workfileCreated;	/* workfile created in this node */
	instr_time	firststart;		/* Start time of first iteration of node */
	int			numPartScanned; /* Number of part tables scanned */

	TuplesortInstrumentation sortstats; /* Sort stats, if this is a Sort node */
	HashInstrumentation hashstats; /* Hash stats, if this is a Hash node */
	int			bnotes;			/* Offset to beginning of node's extra text */
	int			enotes;			/* Offset to end of node's extra text */
} CdbExplain_StatInst;


/* EXPLAIN ANALYZE statistics for one process working on one slice */
typedef struct CdbExplain_SliceWorker
{
	double		peakmemused;	/* bytes alloc in per-query mem context tree */
	double		vmem_reserved;	/* vmem reserved by a QE */
} CdbExplain_SliceWorker;


/* Header of EXPLAIN ANALYZE statistics message sent from qExec to qDisp */
typedef struct CdbExplain_StatHdr
{
	NodeTag		type;			/* T_CdbExplain_StatHdr */
	int			segindex;		/* segment id */
	int			nInst;			/* num of StatInst entries following StatHdr */
	int			bnotes;			/* offset to extra text area */
	int			enotes;			/* offset to end of extra text area */

	CdbExplain_SliceWorker worker;	/* qExec's overall stats for slice */

	/*
	 * During serialization, we use this as a temporary StatInst and save
	 * "one-at-a-time" StatInst into this variable. We then write this
	 * variable into buffer (serialize it) and then "recycle" the same inst
	 * for next plan node's StatInst. During deserialization, an Array
	 * [0..nInst-1] of StatInst entries is appended starting here.
	 */
	CdbExplain_StatInst inst[1];

	/* extra text is appended after that */
} CdbExplain_StatHdr;


/* Dispatch status summarized over workers in a slice */
typedef struct CdbExplain_DispatchSummary
{
	int			nResult;
	int			nOk;
	int			nError;
	int			nCanceled;
	int			nNotDispatched;
	int			nIgnorableError;
} CdbExplain_DispatchSummary;


/* One node's EXPLAIN ANALYZE statistics for all the workers of its segworker group */
typedef struct CdbExplain_NodeSummary
{
	/* Summary over all the node's workers */
	CdbExplain_Agg ntuples;
	CdbExplain_Agg execmemused;
	CdbExplain_Agg workmemused;
	CdbExplain_Agg workmemwanted;
	CdbExplain_Agg totalWorkfileCreated;
	/* Used for DynamicSeqScan, DynamicIndexScan and DynamicBitmapHeapScan */
	CdbExplain_Agg totalPartTableScanned;
	/* Summary of space used by sort */
	CdbExplain_Agg sortSpaceUsed[NUM_SORT_SPACE_TYPE][NUM_SORT_METHOD];

	/* insts array info */
	int			segindex0;		/* segment id of insts[0] */
	int			ninst;			/* num of StatInst entries in inst array */

	/* Array [0..ninst-1] of StatInst entries is appended starting here */
	CdbExplain_StatInst insts[1];	/* variable size - must be last */
} CdbExplain_NodeSummary;


/* One slice's statistics for all the workers of its segworker group */
typedef struct CdbExplain_SliceSummary
{
	ExecSlice  *slice;

	/* worker array */
	int			nworker;		/* num of SliceWorker slots in worker array */
	int			segindex0;		/* segment id of workers[0] */
	CdbExplain_SliceWorker *workers;	/* -> array [0..nworker-1] of
										 * SliceWorker */
	CdbExplain_Agg peakmemused; /* Summary of SliceWorker stats over all of
								 * the slice's workers */

	CdbExplain_Agg vmem_reserved;	/* vmem reserved by QEs */

	/* Rollup of per-node stats over all of the slice's workers and nodes */
	double		workmemused_max;
	double		workmemwanted_max;

	/* How many workers were dispatched and returned results? (0 if local) */
	CdbExplain_DispatchSummary dispatchSummary;
} CdbExplain_SliceSummary;


/* State for cdbexplain_showExecStats() */
typedef struct CdbExplain_ShowStatCtx
{
	StringInfoData extratextbuf;
	instr_time	querystarttime;

	/* Rollup of per-node stats over the entire query plan */
	double		workmemused_max;
	double		workmemwanted_max;

	bool		stats_gathered;
	/* Per-slice statistics are deposited in this SliceSummary array */
	int			nslice;			/* num of slots in slices array */
	CdbExplain_SliceSummary *slices;	/* -> array[0..nslice-1] of
										 * SliceSummary */
} CdbExplain_ShowStatCtx;


/* State for cdbexplain_sendStatWalker() and cdbexplain_collectStatsFromNode() */
typedef struct CdbExplain_SendStatCtx
{
	StringInfoData *notebuf;
	StringInfoData buf;
	CdbExplain_StatHdr hdr;
} CdbExplain_SendStatCtx;


/* State for cdbexplain_recvStatWalker() and cdbexplain_depositStatsToNode() */
typedef struct CdbExplain_RecvStatCtx
{
	/*
	 * iStatInst is the current StatInst serial during the depositing process
	 * for a slice. We walk the plan tree, and for each node we deposit stat
	 * from all the QEs of the segworker group for current slice. After we
	 * finish one node, we increase iStatInst, which means we are done with
	 * one plan node's stat across all segments and now moving forward to the
	 * next one. Once we are done processing all the plan node of a PARTICULAR
	 * slice, then we switch to the next slice, read the messages from all the
	 * QEs of the next slice (another segworker group) store them in the
	 * msgptrs, reset the iStatInst and then start parsing these messages and
	 * depositing them in the nodes of the new slice.
	 */
	int			iStatInst;

	/*
	 * nStatInst is the total number of StatInst for current slice. Typically
	 * this is the number of plan nodes in the current slice.
	 */
	int			nStatInst;

	/*
	 * segIndexMin is the min of segment index from which we collected message
	 * (i.e., saved msgptrs)
	 */
	int			segindexMin;

	/*
	 * segIndexMax is the max of segment index from which we collected message
	 * (i.e., saved msgptrs)
	 */
	int			segindexMax;

	/*
	 * We deposit stat for one slice at a time. sliceIndex saves the current
	 * slice
	 */
	int			sliceIndex;

	/*
	 * The number of msgptrs that we have saved for current slice. This is
	 * typically the number of QE processes
	 */
	int			nmsgptr;
	/* The actual messages. Contains an array of StatInst too */
	CdbExplain_StatHdr **msgptrs;
	CdbDispatchResults *dispatchResults;
	StringInfoData *extratextbuf;
	CdbExplain_ShowStatCtx *showstatctx;

	/* Rollup of per-node stats over all of the slice's workers and nodes */
	double		workmemused_max;
	double		workmemwanted_max;
} CdbExplain_RecvStatCtx;


/* State for cdbexplain_localStatWalker() */
typedef struct CdbExplain_LocalStatCtx
{
	CdbExplain_SendStatCtx send;
	CdbExplain_RecvStatCtx recv;
	CdbExplain_StatHdr *msgptrs[1];
} CdbExplain_LocalStatCtx;


static void
cdbexplain_showExecStatsEnd(struct PlannedStmt *stmt,
							struct CdbExplain_ShowStatCtx  *showstatctx,
                            struct EState                  *estate,
							ExplainState *es);

static void cdbexplain_showExecStats(struct PlanState *planstate,
									 ExplainState *es);
static CdbVisitOpt cdbexplain_localStatWalker(PlanState *planstate,
											  void *context);
static CdbVisitOpt cdbexplain_sendStatWalker(PlanState *planstate,
											 void *context);
static CdbVisitOpt cdbexplain_recvStatWalker(PlanState *planstate,
											 void *context);
static void cdbexplain_collectSliceStats(PlanState *planstate,
										 CdbExplain_SliceWorker *out_worker);
static void cdbexplain_depositSliceStats(CdbExplain_StatHdr *hdr,
										 CdbExplain_RecvStatCtx *recvstatctx);
static void cdbexplain_collectStatsFromNode(PlanState *planstate,
											CdbExplain_SendStatCtx *ctx);
static void cdbexplain_depositStatsToNode(PlanState *planstate,
										  CdbExplain_RecvStatCtx *ctx);
static int cdbexplain_collectExtraText(PlanState *planstate,
									   StringInfo notebuf);

static void show_motion_keys(PlanState *planstate, List *hashExpr, int nkeys,
							 AttrNumber *keycols, const char *qlabel,
							 List *ancestors, ExplainState *es);
static void
gpexplain_formatSlicesOutput(struct CdbExplain_ShowStatCtx *showstatctx,
                             struct EState *estate,
                             ExplainState *es);

/*
 * cdbexplain_localExecStats
 *	  Called by qDisp to build NodeSummary and SliceSummary blocks
 *	  containing EXPLAIN ANALYZE statistics for a root slice that
 *	  has been executed locally in the qDisp process.  Attaches these
 *	  structures to the PlanState nodes' Instrumentation objects for
 *	  later use by cdbexplain_showExecStats().
 *
 * 'planstate' is the top PlanState node of the slice.
 * 'showstatctx' is a CdbExplain_ShowStatCtx object which was created by
 *		calling cdbexplain_showExecStatsBegin().
 */
void
cdbexplain_localExecStats(struct PlanState *planstate,
						  struct CdbExplain_ShowStatCtx *showstatctx)
{
	CdbExplain_LocalStatCtx ctx;

	Assert(Gp_role != GP_ROLE_EXECUTE);

	Assert(planstate && planstate->instrument && showstatctx);

	memset(&ctx, 0, sizeof(ctx));

	/* Set up send context area. */
	ctx.send.notebuf = &showstatctx->extratextbuf;

	/* Set up a temporary StatHdr for both collecting and depositing stats. */
	ctx.msgptrs[0] = &ctx.send.hdr;
	ctx.send.hdr.segindex = GpIdentity.segindex;
	ctx.send.hdr.nInst = 1;

	/* Set up receive context area referencing our temp StatHdr. */
	ctx.recv.nStatInst = ctx.send.hdr.nInst;
	ctx.recv.segindexMin = ctx.recv.segindexMax = ctx.send.hdr.segindex;

	ctx.recv.sliceIndex = LocallyExecutingSliceIndex(planstate->state);
	ctx.recv.msgptrs = ctx.msgptrs;
	ctx.recv.nmsgptr = 1;
	ctx.recv.dispatchResults = NULL;
	ctx.recv.extratextbuf = NULL;
	ctx.recv.showstatctx = showstatctx;

	/*
	 * Collect and redeposit statistics from each PlanState node in this
	 * slice. Any extra message text will be appended directly to
	 * extratextbuf.
	 */
	planstate_walk_node(planstate, cdbexplain_localStatWalker, &ctx);

	/* Obtain per-slice stats and put them in SliceSummary. */
	cdbexplain_collectSliceStats(planstate, &ctx.send.hdr.worker);
	cdbexplain_depositSliceStats(&ctx.send.hdr, &ctx.recv);
}								/* cdbexplain_localExecStats */


/*
 * cdbexplain_localStatWalker
 */
static CdbVisitOpt
cdbexplain_localStatWalker(PlanState *planstate, void *context)
{
	CdbExplain_LocalStatCtx *ctx = (CdbExplain_LocalStatCtx *) context;

	/* Collect stats into our temporary StatInst and caller's extratextbuf. */
	cdbexplain_collectStatsFromNode(planstate, &ctx->send);

	/* Redeposit stats back into Instrumentation, and attach a NodeSummary. */
	cdbexplain_depositStatsToNode(planstate, &ctx->recv);

	/* Don't descend across a slice boundary. */
	if (IsA(planstate, MotionState))
		return CdbVisit_Skip;

	return CdbVisit_Walk;
}								/* cdbexplain_localStatWalker */


/*
 * cdbexplain_sendExecStats
 *	  Called by qExec process to send EXPLAIN ANALYZE statistics to qDisp.
 *	  On the qDisp, libpq will recognize our special message type ('Y') and
 *	  attach the message to the current command's PGresult object.
 */
void
cdbexplain_sendExecStats(QueryDesc *queryDesc)
{
	EState	   *estate;
	PlanState  *planstate;
	CdbExplain_SendStatCtx ctx;
	StringInfoData notebuf;

	/* Header offset (where header begins in the message buffer) */
	int			hoff;

	Assert(Gp_role == GP_ROLE_EXECUTE);

	if (!queryDesc ||
		!queryDesc->estate)
		return;

	/* If executing a root slice (UPD/DEL/INS), start at top of plan tree. */
	estate = queryDesc->estate;
	if (LocallyExecutingSliceIndex(estate) == RootSliceIndex(estate))
		planstate = queryDesc->planstate;

	/* Non-root slice: Start at child of our sending Motion node. */
	else
	{
		planstate = &(getMotionState(queryDesc->planstate, LocallyExecutingSliceIndex(estate))->ps);
		Assert(planstate &&
			   IsA(planstate, MotionState) &&
			   planstate->lefttree);
		planstate = planstate->lefttree;
	}

	if (planstate == NULL)
		return;

	/* Start building the message header in our context area. */
	memset(&ctx, 0, sizeof(ctx));
	ctx.hdr.type = T_CdbExplain_StatHdr;
	ctx.hdr.segindex = GpIdentity.segindex;
	ctx.hdr.nInst = 0;

	/* Allocate a separate buffer where nodes can append extra message text. */
	initStringInfo(&notebuf);
	ctx.notebuf = &notebuf;

	/* Reserve buffer space for the message header (excluding 'inst' array). */
	pq_beginmessage(&ctx.buf, 'Y');

	/* Where the actual StatHdr begins */
	hoff = ctx.buf.len;

	/*
	 * Write everything until inst member including "CdbExplain_SliceWorker
	 * worker"
	 */
	appendBinaryStringInfo(&ctx.buf, (char *) &ctx.hdr, sizeof(ctx.hdr) - sizeof(ctx.hdr.inst));

	/* Append statistics from each PlanState node in this slice. */
	planstate_walk_node(planstate, cdbexplain_sendStatWalker, &ctx);

	/* Obtain per-slice stats and put them in StatHdr. */
	cdbexplain_collectSliceStats(planstate, &ctx.hdr.worker);

	/* Append the extra message text. */
	ctx.hdr.bnotes = ctx.buf.len - hoff;
	appendBinaryStringInfo(&ctx.buf, notebuf.data, notebuf.len);
	ctx.hdr.enotes = ctx.buf.len - hoff;
	pfree(notebuf.data);

	/*
	 * Move the message header into the buffer. Rewrite the updated header
	 * (with bnotes, enotes, nInst etc.) Note: this is the second time we are
	 * writing the header. The first write merely reserves space for the
	 * header
	 */
	memcpy(ctx.buf.data + hoff, (char *) &ctx.hdr, sizeof(ctx.hdr) - sizeof(ctx.hdr.inst));

#ifdef FAULT_INJECTOR
	/* Inject a fault before sending a message to qDisp process */
	SIMPLE_FAULT_INJECTOR("send_exec_stats");
#endif /* FAULT_INJECTOR */

	/* Send message to qDisp process. */
	pq_endmessage(&ctx.buf);
}								/* cdbexplain_sendExecStats */


/*
 * cdbexplain_sendStatWalker
 */
static CdbVisitOpt
cdbexplain_sendStatWalker(PlanState *planstate, void *context)
{
	CdbExplain_SendStatCtx *ctx = (CdbExplain_SendStatCtx *) context;
	CdbExplain_StatInst *si = &ctx->hdr.inst[0];

	/* Stuff stats into our temporary StatInst.  Add extra text to notebuf. */
	cdbexplain_collectStatsFromNode(planstate, ctx);

	/* Append StatInst instance to message. */
	appendBinaryStringInfo(&ctx->buf, (char *) si, sizeof(*si));
	ctx->hdr.nInst++;

	/* Don't descend across a slice boundary. */
	if (IsA(planstate, MotionState))
		return CdbVisit_Skip;

	return CdbVisit_Walk;
}								/* cdbexplain_sendStatWalker */


/*
 * cdbexplain_recvExecStats
 *	  Called by qDisp to transfer a slice's EXPLAIN ANALYZE statistics
 *	  from the CdbDispatchResults structures to the PlanState tree.
 *	  Recursively does the same for slices that are descendants of the
 *	  one specified.
 *
 * 'showstatctx' is a CdbExplain_ShowStatCtx object which was created by
 *		calling cdbexplain_showExecStatsBegin().
 */
void
cdbexplain_recvExecStats(struct PlanState *planstate,
						 struct CdbDispatchResults *dispatchResults,
						 int sliceIndex,
						 struct CdbExplain_ShowStatCtx *showstatctx)
{
	CdbDispatchResult *dispatchResultBeg;
	CdbDispatchResult *dispatchResultEnd;
	CdbExplain_RecvStatCtx ctx;
	CdbExplain_DispatchSummary ds;
	int			gpsegmentCount = getgpsegmentCount();
	int			iDispatch;
	int			nDispatch;
	int			imsgptr;

	if (!planstate ||
		!planstate->instrument ||
		!showstatctx)
		return;

	/*
	 * Note that the caller may free the CdbDispatchResults upon return, maybe
	 * before EXPLAIN ANALYZE examines the PlanState tree.  Consequently we
	 * must not return ptrs into the dispatch result buffers, but must copy
	 * any needed information into a sufficiently long-lived memory context.
	 */

	/* Initialize treewalk context. */
	memset(&ctx, 0, sizeof(ctx));
	ctx.dispatchResults = dispatchResults;
	ctx.extratextbuf = &showstatctx->extratextbuf;
	ctx.showstatctx = showstatctx;
	ctx.sliceIndex = sliceIndex;

	/* Find the slice's CdbDispatchResult objects. */
	dispatchResultBeg = cdbdisp_resultBegin(dispatchResults, sliceIndex);
	dispatchResultEnd = cdbdisp_resultEnd(dispatchResults, sliceIndex);
	nDispatch = dispatchResultEnd - dispatchResultBeg;

	/* Initialize worker counts. */
	memset(&ds, 0, sizeof(ds));
	ds.nResult = nDispatch;

	/* Find and validate the statistics returned from each qExec. */
	if (nDispatch > 0)
		ctx.msgptrs = (CdbExplain_StatHdr **) palloc0(nDispatch * sizeof(ctx.msgptrs[0]));
	for (iDispatch = 0; iDispatch < nDispatch; iDispatch++)
	{
		CdbDispatchResult *dispatchResult = &dispatchResultBeg[iDispatch];
		PGresult   *pgresult;
		CdbExplain_StatHdr *hdr;
		pgCdbStatCell *statcell;

		/* Update worker counts. */
		if (!dispatchResult->hasDispatched)
			ds.nNotDispatched++;
		else if (dispatchResult->wasCanceled)
			ds.nCanceled++;
		else if (dispatchResult->errcode)
			ds.nError++;
		else if (dispatchResult->okindex >= 0)
			ds.nOk++;			/* qExec returned successful completion */
		else
			ds.nIgnorableError++;	/* qExec returned an error that's likely a
									 * side-effect of another qExec's failure,
									 * e.g. an interconnect error */

		/* Find this qExec's last PGresult.  If none, skip to next qExec. */
		pgresult = cdbdisp_getPGresult(dispatchResult, -1);
		if (!pgresult)
			continue;

		/* Find our statistics in list of response messages.  If none, skip. */
		for (statcell = pgresult->cdbstats; statcell; statcell = statcell->next)
		{
			if (IsA((Node *) statcell->data, CdbExplain_StatHdr))
				break;
		}
		if (!statcell)
			continue;

		/* Validate the message header. */
		hdr = (CdbExplain_StatHdr *) statcell->data;
		if ((size_t) statcell->len < sizeof(*hdr) ||
			(size_t) statcell->len != (sizeof(*hdr) - sizeof(hdr->inst) +
									   hdr->nInst * sizeof(hdr->inst) +
									   hdr->enotes - hdr->bnotes) ||
			statcell->len != hdr->enotes ||
			hdr->segindex < -1 ||
			hdr->segindex >= gpsegmentCount)
		{
			ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
							errmsg_internal("Invalid execution statistics "
											"response returned from seg%d.  "
											"length=%d",
											hdr->segindex,
											statcell->len),
							errhint("Please verify that all instances are using "
									"the correct %s software version.",
									PACKAGE_NAME)
							));
		}

		/* Slice should have same number of plan nodes on every qExec. */
		if (iDispatch == 0)
			ctx.nStatInst = hdr->nInst;
		else
		{
			/* Check for stats corruption */
			if (ctx.nStatInst != hdr->nInst)
				ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
								errmsg("Invalid execution statistics "
									   "received stats node-count mismatch: cdbexplain_recvExecStats() ctx.nStatInst %d hdr->nInst %d", ctx.nStatInst, hdr->nInst),
								errhint("Please verify that all instances are using "
										"the correct %s software version.",
										PACKAGE_NAME)));

			Assert(ctx.nStatInst == hdr->nInst);
		}

		/* Save lowest and highest segment id for which we have stats. */
		if (iDispatch == 0)
			ctx.segindexMin = ctx.segindexMax = hdr->segindex;
		else if (ctx.segindexMax < hdr->segindex)
			ctx.segindexMax = hdr->segindex;
		else if (ctx.segindexMin > hdr->segindex)
			ctx.segindexMin = hdr->segindex;

		/* Save message ptr for easy reference. */
		ctx.msgptrs[ctx.nmsgptr] = hdr;
		ctx.nmsgptr++;
	}

	/* Attach NodeSummary to each PlanState node's Instrumentation node. */
	planstate_walk_node(planstate, cdbexplain_recvStatWalker, &ctx);

	/* Make sure we visited the right number of PlanState nodes. */
	Assert(ctx.iStatInst == ctx.nStatInst);

	/* Transfer per-slice stats from message headers to the SliceSummary. */
	for (imsgptr = 0; imsgptr < ctx.nmsgptr; imsgptr++)
		cdbexplain_depositSliceStats(ctx.msgptrs[imsgptr], &ctx);

	/* Transfer worker counts to SliceSummary. */
	showstatctx->slices[sliceIndex].dispatchSummary = ds;

	/* Signal that we've gathered all the statistics
	 * For some query, which has initplan on top of the plan,
	 * its `ANALYZE EXPLAIN` invoke `cdbexplain_recvExecStats`
	 * multi-times in different recursive routine to collect
	 * metrics on both initplan and plan. Thus, this variable
	 * should only assign on slice 0 after gather result done
	 * to promise all slices information have been collected.
	 */
	if (sliceIndex == 0)
		showstatctx->stats_gathered = true;

	/* Clean up. */
	if (ctx.msgptrs)
		pfree(ctx.msgptrs);
}								/* cdbexplain_recvExecStats */


/*
 * cdbexplain_recvStatWalker
 *	  Update the given PlanState node's Instrument node with statistics
 *	  received from qExecs.  Attach a CdbExplain_NodeSummary block to
 *	  the Instrument node.  At a MotionState node, descend to child slice.
 */
static CdbVisitOpt
cdbexplain_recvStatWalker(PlanState *planstate, void *context)
{
	CdbExplain_RecvStatCtx *ctx = (CdbExplain_RecvStatCtx *) context;

	/* If slice was dispatched to qExecs, and stats came back, grab 'em. */
	if (ctx->nmsgptr > 0)
	{
		/* Transfer received stats to Instrumentation, NodeSummary, etc. */
		cdbexplain_depositStatsToNode(planstate, ctx);

		/* Advance to next node's entry in all of the StatInst arrays. */
		ctx->iStatInst++;
	}

	/* Motion operator?  Descend to next slice. */
	if (IsA(planstate, MotionState))
	{
		cdbexplain_recvExecStats(planstate->lefttree,
								 ctx->dispatchResults,
								 ((Motion *) planstate->plan)->motionID,
								 ctx->showstatctx);
		return CdbVisit_Skip;
	}

	return CdbVisit_Walk;
}								/* cdbexplain_recvStatWalker */


/*
 * cdbexplain_collectSliceStats
 *	  Obtain per-slice statistical observations from the current slice
 *	  (which has just completed execution in the current process) and
 *	  store the information in the given SliceWorker struct.
 *
 * 'planstate' is the top PlanState node of the current slice.
 */
static void
cdbexplain_collectSliceStats(PlanState *planstate,
							 CdbExplain_SliceWorker *out_worker)
{
	EState	   *estate = planstate->state;

	/* Max bytes malloc'ed under executor's per-query memory context. */
	out_worker->peakmemused =
		(double) MemoryContextGetPeakSpace(estate->es_query_cxt);

	out_worker->vmem_reserved = (double) VmemTracker_GetMaxReservedVmemBytes();
}								/* cdbexplain_collectSliceStats */


/*
 * cdbexplain_depositSliceStats
 *	  Transfer a worker's per-slice stats contribution from StatHdr into the
 *	  SliceSummary array in the ShowStatCtx.  Transfer the rollup of per-node
 *	  stats from the RecvStatCtx into the SliceSummary.
 *
 * Kludge: In a non-parallel plan, slice numbers haven't been assigned, so we
 * may be called more than once with sliceIndex == 0: once for the outermost
 * query and once for each InitPlan subquery.  In this case we dynamically
 * expand the SliceSummary array.  CDB TODO: Always assign proper root slice
 * ids (in qDispSliceId field of SubPlan node); then remove this kludge.
 */
static void
cdbexplain_depositSliceStats(CdbExplain_StatHdr *hdr,
							 CdbExplain_RecvStatCtx *recvstatctx)
{
	int			sliceIndex = recvstatctx->sliceIndex;
	CdbExplain_ShowStatCtx *showstatctx = recvstatctx->showstatctx;
	CdbExplain_SliceSummary *ss = &showstatctx->slices[sliceIndex];
	CdbExplain_SliceWorker *ssw;
	int			iworker;

	Assert(sliceIndex >= 0 &&
		   sliceIndex < showstatctx->nslice);

	/* Kludge:	QD can have more than one 'Slice 0' if plan is non-parallel. */
	if (sliceIndex == 0 &&
		recvstatctx->dispatchResults == NULL &&
		ss->workers)
	{
		Assert(ss->nworker == 1 &&
			   recvstatctx->segindexMin == hdr->segindex &&
			   recvstatctx->segindexMax == hdr->segindex);

		/* Expand the SliceSummary array to make room for InitPlan subquery. */
		sliceIndex = showstatctx->nslice++;
		showstatctx->slices = (CdbExplain_SliceSummary *)
			repalloc(showstatctx->slices, showstatctx->nslice * sizeof(showstatctx->slices[0]));
		ss = &showstatctx->slices[sliceIndex];
		memset(ss, 0, sizeof(*ss));
	}

	/* Slice's first worker? */
	if (!ss->workers)
	{
		/* Allocate SliceWorker array and attach it to the SliceSummary. */
		ss->segindex0 = recvstatctx->segindexMin;
		ss->nworker = recvstatctx->segindexMax + 1 - ss->segindex0;
		ss->workers = (CdbExplain_SliceWorker *) palloc0(ss->nworker * sizeof(ss->workers[0]));
	}

	/* Save a copy of this SliceWorker instance in the worker array. */
	iworker = hdr->segindex - ss->segindex0;
	ssw = &ss->workers[iworker];
	Assert(iworker >= 0 && iworker < ss->nworker);
	Assert(ssw->peakmemused == 0);	/* each worker should be seen just once */
	*ssw = hdr->worker;

	/* Rollup of per-worker stats into SliceSummary */
	cdbexplain_agg_upd(&ss->peakmemused, hdr->worker.peakmemused, hdr->segindex);
	cdbexplain_agg_upd(&ss->vmem_reserved, hdr->worker.vmem_reserved, hdr->segindex);

	/* Rollup of per-node stats over all nodes of the slice into SliceSummary */
	ss->workmemused_max = recvstatctx->workmemused_max;
	ss->workmemwanted_max = recvstatctx->workmemwanted_max;

	/* Rollup of per-node stats over the whole query into ShowStatCtx. */
	showstatctx->workmemused_max = Max(showstatctx->workmemused_max, recvstatctx->workmemused_max);
	showstatctx->workmemwanted_max = Max(showstatctx->workmemwanted_max, recvstatctx->workmemwanted_max);
}								/* cdbexplain_depositSliceStats */


/*
 * cdbexplain_collectStatsFromNode
 *
 * Called by sendStatWalker and localStatWalker to obtain a node's statistics
 * and transfer them into the temporary StatHdr and StatInst in the SendStatCtx.
 * Also obtains the node's extra message text, which it appends to the caller's
 * cxt->nodebuf.
 */
static void
cdbexplain_collectStatsFromNode(PlanState *planstate, CdbExplain_SendStatCtx *ctx)
{
	CdbExplain_StatInst *si = &ctx->hdr.inst[0];
	Instrumentation *instr = planstate->instrument;

	Assert(instr);

	/* We have to finalize statistics, since ExecutorEnd hasn't been called. */
	InstrEndLoop(instr);

	/* Initialize the StatInst slot in the temporary StatHdr. */
	memset(si, 0, sizeof(*si));
	si->pstype = planstate->type;

	/* Add this node's extra message text to notebuf.  Store final stats. */
	si->bnotes = cdbexplain_collectExtraText(planstate, ctx->notebuf);
	si->enotes = ctx->notebuf->len;

	/* Make sure there is a '\0' between this node's message and the next. */
	if (si->bnotes < si->enotes)
		appendStringInfoChar(ctx->notebuf, '\0');

	/* Use the instrument's memory record if exists, or query the memory context. */
	if (instr->execmemused)
	{
		si->execmemused = instr->execmemused;
	}
	else if (planstate->node_context)
	{
		si->execmemused = (double) MemoryContextGetPeakSpace(planstate->node_context);
	}

	/* Transfer this node's statistics from Instrumentation into StatInst. */
	si->starttime = instr->starttime;
	si->counter = instr->counter;
	si->firsttuple = instr->firsttuple;
	si->startup = instr->startup;
	si->total = instr->total;
	si->ntuples = instr->ntuples;
	si->ntuples2 = instr->ntuples2;
	si->nloops = instr->nloops;
	si->nfiltered1 = instr->nfiltered1;
	si->nfiltered2 = instr->nfiltered2;
	si->workmemused = instr->workmemused;
	si->workmemwanted = instr->workmemwanted;
	si->workfileCreated = instr->workfileCreated;
	si->firststart = instr->firststart;
	si->numPartScanned = instr->numPartScanned;

	if (IsA(planstate, SortState))
	{
		SortState *sortstate = (SortState *) planstate;

		si->sortstats = sortstate->sortstats;
	}
	if (IsA(planstate, HashState))
	{
		HashState *hashstate = (HashState *) planstate;

		if (hashstate->hashtable)
			ExecHashGetInstrumentation(&si->hashstats, hashstate->hashtable);
	}
}								/* cdbexplain_collectStatsFromNode */


/*
 * CdbExplain_DepStatAcc
 *	  Segment statistic accumulator used by cdbexplain_depositStatsToNode().
 */
typedef struct CdbExplain_DepStatAcc
{
	/* vmax, vsum, vcnt, segmax */
	CdbExplain_Agg agg;
	/* max's received StatHdr */
	CdbExplain_StatHdr *rshmax;
	/* max's received inst in StatHdr */
	CdbExplain_StatInst *rsimax;
	/* max's inst in NodeSummary */
	CdbExplain_StatInst *nsimax;
	/* max run-time of all the segments */
	double		max_total;
	/* start time of the first iteration for node with maximum runtime */
	instr_time	firststart_of_max_total;
} CdbExplain_DepStatAcc;

static void
cdbexplain_depStatAcc_init0(CdbExplain_DepStatAcc *acc)
{
	cdbexplain_agg_init0(&acc->agg);
	acc->rshmax = NULL;
	acc->rsimax = NULL;
	acc->nsimax = NULL;
	acc->max_total = 0;
	INSTR_TIME_SET_ZERO(acc->firststart_of_max_total);
}								/* cdbexplain_depStatAcc_init0 */

static inline void
cdbexplain_depStatAcc_upd(CdbExplain_DepStatAcc *acc,
						  double v,
						  CdbExplain_StatHdr *rsh,
						  CdbExplain_StatInst *rsi,
						  CdbExplain_StatInst *nsi)
{
	if (cdbexplain_agg_upd(&acc->agg, v, rsh->segindex))
	{
		acc->rshmax = rsh;
		acc->rsimax = rsi;
		acc->nsimax = nsi;
	}

	if (acc->max_total < nsi->total)
	{
		acc->max_total = nsi->total;
		INSTR_TIME_ASSIGN(acc->firststart_of_max_total, nsi->firststart);
	}
}								/* cdbexplain_depStatAcc_upd */

static void
cdbexplain_depStatAcc_saveText(CdbExplain_DepStatAcc *acc,
							   StringInfoData *extratextbuf,
							   bool *saved_inout)
{
	CdbExplain_StatHdr *rsh = acc->rshmax;
	CdbExplain_StatInst *rsi = acc->rsimax;
	CdbExplain_StatInst *nsi = acc->nsimax;

	if (acc->agg.vcnt > 0 &&
		nsi->bnotes == nsi->enotes &&
		rsi->bnotes < rsi->enotes)
	{
		/* Locate extra message text in dispatch result buffer. */
		int			notelen = rsi->enotes - rsi->bnotes;
		const char *notes = (const char *) rsh + rsh->bnotes + rsi->bnotes;

		Assert(rsh->bnotes + rsi->enotes < rsh->enotes &&
			   notes[notelen] == '\0');

		/* Append to extratextbuf. */
		nsi->bnotes = extratextbuf->len;
		appendBinaryStringInfo(extratextbuf, notes, notelen);
		nsi->enotes = extratextbuf->len;

		/* Tell caller that some extra text has been saved. */
		if (saved_inout)
			*saved_inout = true;
	}
}								/* cdbexplain_depStatAcc_saveText */


/*
 * cdbexplain_depositStatsToNode
 *
 * Called by recvStatWalker and localStatWalker to update the given
 * PlanState node's Instrument node with statistics received from
 * workers or collected locally.  Attaches a CdbExplain_NodeSummary
 * block to the Instrument node.  If top node of slice, per-slice
 * statistics are transferred from the StatHdr to the SliceSummary.
 */
static void
cdbexplain_depositStatsToNode(PlanState *planstate, CdbExplain_RecvStatCtx *ctx)
{
	Instrumentation *instr = planstate->instrument;
	CdbExplain_StatHdr *rsh;	/* The header (which includes StatInst) */
	CdbExplain_StatInst *rsi;	/* The current StatInst */

	/*
	 * Points to the insts array of node summary (CdbExplain_NodeSummary).
	 * Used for saving every rsi in the node summary (in addition to saving
	 * the max/avg).
	 */
	CdbExplain_StatInst *nsi;

	/*
	 * ns is the node summary across all QEs of the segworker group. It also
	 * contains detailed "unsummarized" raw stat for a node across all QEs in
	 * current segworker group (in the insts array)
	 */
	CdbExplain_NodeSummary *ns;
	CdbExplain_DepStatAcc ntuples;
	CdbExplain_DepStatAcc execmemused;
	CdbExplain_DepStatAcc workmemused;
	CdbExplain_DepStatAcc workmemwanted;
	CdbExplain_DepStatAcc totalWorkfileCreated;
	CdbExplain_DepStatAcc peakmemused;
	CdbExplain_DepStatAcc vmem_reserved;
	CdbExplain_DepStatAcc totalPartTableScanned;
	CdbExplain_DepStatAcc sortSpaceUsed[NUM_SORT_SPACE_TYPE][NUM_SORT_METHOD];
	int			imsgptr;
	int			nInst;

	Assert(instr &&
		   ctx->iStatInst < ctx->nStatInst);

	/* Allocate NodeSummary block. */
	nInst = ctx->segindexMax + 1 - ctx->segindexMin;
	ns = (CdbExplain_NodeSummary *) palloc0(sizeof(*ns) - sizeof(ns->insts) +
											nInst * sizeof(ns->insts[0]));
	ns->segindex0 = ctx->segindexMin;
	ns->ninst = nInst;

	/* Attach our new NodeSummary to the Instrumentation node. */
	instr->cdbNodeSummary = ns;

	/* Initialize per-node accumulators. */
	cdbexplain_depStatAcc_init0(&ntuples);
	cdbexplain_depStatAcc_init0(&execmemused);
	cdbexplain_depStatAcc_init0(&workmemused);
	cdbexplain_depStatAcc_init0(&workmemwanted);
	cdbexplain_depStatAcc_init0(&totalWorkfileCreated);
	cdbexplain_depStatAcc_init0(&totalPartTableScanned);
	for (int i = 0; i < NUM_SORT_METHOD; i++)
		for (int j = 0; j < NUM_SORT_SPACE_TYPE; j++)
			cdbexplain_depStatAcc_init0(&sortSpaceUsed[j][i]);

	/* Initialize per-slice accumulators. */
	cdbexplain_depStatAcc_init0(&peakmemused);
	cdbexplain_depStatAcc_init0(&vmem_reserved);

	/* Examine the statistics from each qExec. */
	for (imsgptr = 0; imsgptr < ctx->nmsgptr; imsgptr++)
	{
		/* Locate PlanState node's StatInst received from this qExec. */
		rsh = ctx->msgptrs[imsgptr];
		rsi = &rsh->inst[ctx->iStatInst];

		Assert(rsi->pstype == planstate->type &&
			   ns->segindex0 <= rsh->segindex &&
			   rsh->segindex < ns->segindex0 + ns->ninst);

		/* Locate this qExec's StatInst slot in node's NodeSummary block. */
		nsi = &ns->insts[rsh->segindex - ns->segindex0];

		/* Copy the StatInst to NodeSummary from dispatch result buffer. */
		*nsi = *rsi;

		/*
		 * Drop qExec's extra text.  We rescue it below if qExec is a winner.
		 * For local qDisp slice, ctx->extratextbuf is NULL, which tells us to
		 * leave the extra text undisturbed in its existing buffer.
		 */
		if (ctx->extratextbuf)
			nsi->bnotes = nsi->enotes = 0;

		/* Update per-node accumulators. */
		cdbexplain_depStatAcc_upd(&ntuples, rsi->ntuples, rsh, rsi, nsi);
		cdbexplain_depStatAcc_upd(&execmemused, rsi->execmemused, rsh, rsi, nsi);
		cdbexplain_depStatAcc_upd(&workmemused, rsi->workmemused, rsh, rsi, nsi);
		cdbexplain_depStatAcc_upd(&workmemwanted, rsi->workmemwanted, rsh, rsi, nsi);
		cdbexplain_depStatAcc_upd(&totalWorkfileCreated, (rsi->workfileCreated ? 1 : 0), rsh, rsi, nsi);
		cdbexplain_depStatAcc_upd(&totalPartTableScanned, rsi->numPartScanned, rsh, rsi, nsi);
		Assert(rsi->sortstats.sortMethod < NUM_SORT_METHOD);
		Assert(rsi->sortstats.spaceType < NUM_SORT_SPACE_TYPE);
		if (rsi->sortstats.sortMethod != SORT_TYPE_STILL_IN_PROGRESS)
		{
			cdbexplain_depStatAcc_upd(&sortSpaceUsed[rsi->sortstats.spaceType][rsi->sortstats.sortMethod],
									  (double) rsi->sortstats.spaceUsed, rsh, rsi, nsi);
		}

		/* Update per-slice accumulators. */
		cdbexplain_depStatAcc_upd(&peakmemused, rsh->worker.peakmemused, rsh, rsi, nsi);
		cdbexplain_depStatAcc_upd(&vmem_reserved, rsh->worker.vmem_reserved, rsh, rsi, nsi);
	}

	/* Save per-node accumulated stats in NodeSummary. */
	ns->ntuples = ntuples.agg;
	ns->execmemused = execmemused.agg;
	ns->workmemused = workmemused.agg;
	ns->workmemwanted = workmemwanted.agg;
	ns->totalWorkfileCreated = totalWorkfileCreated.agg;
	ns->totalPartTableScanned = totalPartTableScanned.agg;
	for (int i = 0; i < NUM_SORT_METHOD; i++)
		for (int j = 0; j < NUM_SORT_SPACE_TYPE; j++)
			ns->sortSpaceUsed[j][i] = sortSpaceUsed[j][i].agg;

	/* Roll up summary over all nodes of slice into RecvStatCtx. */
	ctx->workmemused_max = Max(ctx->workmemused_max, workmemused.agg.vmax);
	ctx->workmemwanted_max = Max(ctx->workmemwanted_max, workmemwanted.agg.vmax);

	instr->total = ntuples.max_total;
	INSTR_TIME_ASSIGN(instr->firststart, ntuples.firststart_of_max_total);

	/*
	 * Put winner's stats into qDisp PlanState's Instrument node.
	 */
	if (ntuples.agg.vcnt > 0)
	{
		instr->starttime = ntuples.nsimax->starttime;
		instr->counter = ntuples.nsimax->counter;
		instr->firsttuple = ntuples.nsimax->firsttuple;
		instr->startup = ntuples.nsimax->startup;
		instr->total = ntuples.nsimax->total;
		instr->ntuples = ntuples.nsimax->ntuples;
		instr->ntuples2 = ntuples.nsimax->ntuples2;
		instr->nloops = ntuples.nsimax->nloops;
		instr->nfiltered1 = ntuples.nsimax->nfiltered1;
		instr->nfiltered2 = ntuples.nsimax->nfiltered2;
		instr->execmemused = ntuples.nsimax->execmemused;
		instr->workmemused = ntuples.nsimax->workmemused;
		instr->workmemwanted = ntuples.nsimax->workmemwanted;
		instr->workfileCreated = ntuples.nsimax->workfileCreated;
		instr->firststart = ntuples.nsimax->firststart;
	}

	/* Save extra message text for the most interesting winning qExecs. */
	if (ctx->extratextbuf)
	{
		bool		saved = false;

		/* One worker which used or wanted the most work_mem */
		if (workmemwanted.agg.vmax >= workmemused.agg.vmax)
			cdbexplain_depStatAcc_saveText(&workmemwanted, ctx->extratextbuf, &saved);
		else if (workmemused.agg.vmax > 1.05 * cdbexplain_agg_avg(&workmemused.agg))
			cdbexplain_depStatAcc_saveText(&workmemused, ctx->extratextbuf, &saved);

		/* Worker which used the most executor memory (this node's usage) */
		if (execmemused.agg.vmax > 1.05 * cdbexplain_agg_avg(&execmemused.agg))
			cdbexplain_depStatAcc_saveText(&execmemused, ctx->extratextbuf, &saved);

		/*
		 * For the worker which had the highest peak executor memory usage
		 * overall across the whole slice, we'll report the extra message text
		 * from all of the nodes in the slice.  But only if that worker stands
		 * out more than 5% above the average.
		 */
		if (peakmemused.agg.vmax > 1.05 * cdbexplain_agg_avg(&peakmemused.agg))
			cdbexplain_depStatAcc_saveText(&peakmemused, ctx->extratextbuf, &saved);

		/*
		 * One worker which produced the greatest number of output rows.
		 * (Always give at least one node a chance to have its extra message
		 * text seen.  In case no node stood out above the others, make a
		 * repeatable choice based on the number of output rows.)
		 */
		if (!saved ||
			ntuples.agg.vmax > 1.05 * cdbexplain_agg_avg(&ntuples.agg))
			cdbexplain_depStatAcc_saveText(&ntuples, ctx->extratextbuf, &saved);
	}

	/*
	 * If this is a HashState, construct a SharedHashInfo with the stats from
	 * all the QEs. In PostgreSQL, SharedHashInfo is used to show stats of all
	 * the worker processes, we use it to show stats from all the QEs instead.
	 *
	 * GPDB_12_MERGE_FIXME: Should we do the same for Sort stats nowadays?
	 */
	if (IsA(planstate, HashState))
	{
		/* GPDB: Collect the results from all QE processes */
		HashState *hashstate = (HashState *) planstate;
		SharedHashInfo *shared_state;

		size_t		size;

		size = offsetof(SharedHashInfo, hinstrument) +
			ctx->nmsgptr * sizeof(HashInstrumentation);
		shared_state = palloc0(size);
		shared_state->num_workers = ctx->nmsgptr;

		/* Examine the statistics from each qExec. */
		for (imsgptr = 0; imsgptr < ctx->nmsgptr; imsgptr++)
		{
			/* Locate PlanState node's StatInst received from this qExec. */
			rsh = ctx->msgptrs[imsgptr];
			rsi = &rsh->inst[ctx->iStatInst];

			memcpy(&shared_state->hinstrument[imsgptr], &rsi->hashstats, sizeof(HashInstrumentation));
		}

		hashstate->shared_info = shared_state;
	}
}								/* cdbexplain_depositStatsToNode */


/*
 * cdbexplain_collectExtraText
 *	  Allow a node to supply additional text for its EXPLAIN ANALYZE report.
 *
 * Returns the starting offset of the extra message text from notebuf->data.
 * The caller can compute the length as notebuf->len minus the starting offset.
 * If the node did not provide any extra message text, the length will be 0.
 */
static int
cdbexplain_collectExtraText(PlanState *planstate, StringInfo notebuf)
{
	int			bnotes = notebuf->len;

	/*
	 * Invoke node's callback.  It may append to our notebuf and/or its own
	 * cdbexplainbuf; and store final statistics in its Instrumentation node.
	 */
	if (planstate->cdbexplainfun)
		planstate->cdbexplainfun(planstate, notebuf);

	/*
	 * Append contents of node's extra message buffer.  This allows nodes to
	 * contribute EXPLAIN ANALYZE info without having to set up a callback.
	 */
	if (planstate->cdbexplainbuf && planstate->cdbexplainbuf->len > 0)
	{
		/* If callback added to notebuf, make sure text ends with a newline. */
		if (bnotes < notebuf->len &&
			notebuf->data[notebuf->len - 1] != '\n')
			appendStringInfoChar(notebuf, '\n');

		appendBinaryStringInfo(notebuf, planstate->cdbexplainbuf->data,
							   planstate->cdbexplainbuf->len);

		resetStringInfo(planstate->cdbexplainbuf);
	}

	return bnotes;
}								/* cdbexplain_collectExtraText */


/*
 * cdbexplain_formatExtraText
 *	  Format extra message text into the EXPLAIN output buffer.
 */
static void
cdbexplain_formatExtraText(StringInfo str,
						   int indent,
						   int segindex,
						   const char *notes,
						   int notelen)
{
	const char *cp = notes;
	const char *ep = notes + notelen;

	/* Could be more than one line... */
	while (cp < ep)
	{
		const char *nlp = memchr(cp, '\n', ep - cp);
		const char *dp = nlp ? nlp : ep;

		/* Strip trailing whitespace. */
		while (cp < dp &&
			   isspace(dp[-1]))
			dp--;

		/* Add to output buffer. */
		if (cp < dp)
		{
			appendStringInfoSpaces(str, indent * 2);
			if (segindex >= 0)
			{
				appendStringInfo(str, "(seg%d) ", segindex);
				if (segindex < 10)
					appendStringInfoChar(str, ' ');
				if (segindex < 100)
					appendStringInfoChar(str, ' ');
			}
			appendBinaryStringInfo(str, cp, dp - cp);
			if (nlp)
				appendStringInfoChar(str, '\n');
		}

		if (!nlp)
			break;
		cp = nlp + 1;
	}
}								/* cdbexplain_formatExtraText */



/*
 * cdbexplain_formatMemory
 *	  Convert memory size to string from (double) bytes.
 *
 *		outbuf:  [output] pointer to a char buffer to be filled
 *		bufsize: [input] maximum number of characters to write to outbuf (must be set by the caller)
 *		bytes:	 [input] a value representing memory size in bytes to be written to outbuf
 */
static void
cdbexplain_formatMemory(char *outbuf, int bufsize, double bytes)
{
	Assert(outbuf != NULL && "CDBEXPLAIN: char buffer is null");
	Assert(bufsize > 0 && "CDBEXPLAIN: size of char buffer is zero");
	/* check if truncation occurs */
#ifdef USE_ASSERT_CHECKING
	int			nchars_written =
#endif							/* USE_ASSERT_CHECKING */
	snprintf(outbuf, bufsize, "%.0fK bytes", kb(bytes));

	Assert(nchars_written < bufsize &&
		   "CDBEXPLAIN:  size of char buffer is smaller than the required number of chars");
}								/* cdbexplain_formatMemory */



/*
 * cdbexplain_formatSeconds
 *	  Convert time in seconds to readable string
 *
 *		outbuf:  [output] pointer to a char buffer to be filled
 *		bufsize: [input] maximum number of characters to write to outbuf (must be set by the caller)
 *		seconds: [input] a value representing no. of seconds to be written to outbuf
 */
static void
cdbexplain_formatSeconds(char *outbuf, int bufsize, double seconds, bool unit)
{
	Assert(outbuf != NULL && "CDBEXPLAIN: char buffer is null");
	Assert(bufsize > 0 && "CDBEXPLAIN: size of char buffer is zero");
	double		ms = seconds * 1000.0;

	/* check if truncation occurs */
#ifdef USE_ASSERT_CHECKING
	int			nchars_written =
#endif							/* USE_ASSERT_CHECKING */
	snprintf(outbuf, bufsize, "%.*f%s",
			 (ms < 10.0 && ms != 0.0 && ms > -10.0) ? 3 : 0,
			 ms, (unit ? " ms" : ""));

	Assert(nchars_written < bufsize &&
		   "CDBEXPLAIN:  size of char buffer is smaller than the required number of chars");
}								/* cdbexplain_formatSeconds */


/*
 * cdbexplain_formatSeg
 *	  Convert segment id to string.
 *
 *		outbuf:  [output] pointer to a char buffer to be filled
 *		bufsize: [input] maximum number of characters to write to outbuf (must be set by the caller)
 *		segindex:[input] a value representing segment index to be written to outbuf
 *		nInst:	 [input] no. of stat instances
 */
static void
cdbexplain_formatSeg(char *outbuf, int bufsize, int segindex, int nInst)
{
	Assert(outbuf != NULL && "CDBEXPLAIN: char buffer is null");
	Assert(bufsize > 0 && "CDBEXPLAIN: size of char buffer is zero");

	if (nInst > 1 && segindex >= 0)
	{
		/* check if truncation occurs */
#ifdef USE_ASSERT_CHECKING
		int			nchars_written =
#endif							/* USE_ASSERT_CHECKING */
		snprintf(outbuf, bufsize, " (seg%d)", segindex);

		Assert(nchars_written < bufsize &&
			   "CDBEXPLAIN:  size of char buffer is smaller than the required number of chars");
	}
	else
	{
		outbuf[0] = '\0';
	}
}								/* cdbexplain_formatSeg */


/*
 * cdbexplain_showExecStatsBegin
 *	  Called by qDisp process to create a CdbExplain_ShowStatCtx structure
 *	  in which to accumulate overall statistics for a query.
 *
 * 'querystarttime' is the timestamp of the start of the query, in a
 *		platform-dependent format.
 *
 * Note this function is called before ExecutorStart(), so there is no EState
 * or SliceTable yet.
 */
struct CdbExplain_ShowStatCtx *
cdbexplain_showExecStatsBegin(struct QueryDesc *queryDesc,
							  instr_time querystarttime)
{
	CdbExplain_ShowStatCtx *ctx;
	int			nslice;

	Assert(Gp_role != GP_ROLE_EXECUTE);

	/* Allocate and zero the ShowStatCtx */
	ctx = (CdbExplain_ShowStatCtx *) palloc0(sizeof(*ctx));

	ctx->querystarttime = querystarttime;

	/* Determine number of slices.  (SliceTable hasn't been built yet.) */
	nslice = queryDesc->plannedstmt->numSlices;

	/* Allocate and zero the SliceSummary array. */
	ctx->nslice = nslice;
	ctx->slices = (CdbExplain_SliceSummary *) palloc0(nslice * sizeof(ctx->slices[0]));

	/* Allocate a buffer in which we can collect any extra message text. */
	initStringInfoOfSize(&ctx->extratextbuf, 4000);

	return ctx;
}								/* cdbexplain_showExecStatsBegin */

/*
 * nodeSupportWorkfileCaching
 *	 Return true if a given node supports workfile caching.
 */
static bool
nodeSupportWorkfileCaching(PlanState *planstate)
{
	return (IsA(planstate, SortState) ||
			IsA(planstate, HashJoinState) ||
			(IsA(planstate, AggState) &&((Agg *) planstate->plan)->aggstrategy == AGG_HASHED) ||
			IsA(planstate, MaterialState));
}

/*
 * cdbexplain_showExecStats
 *	  Called by qDisp process to format a node's EXPLAIN ANALYZE statistics.
 *
 * 'planstate' is the node whose statistics are to be displayed.
 * 'str' is the output buffer.
 * 'indent' is the root indentation for all the text generated for explain output
 * 'ctx' is a CdbExplain_ShowStatCtx object which was created by a call to
 *		cdbexplain_showExecStatsBegin().
 */
static void
cdbexplain_showExecStats(struct PlanState *planstate, ExplainState *es)
{
	struct CdbExplain_ShowStatCtx *ctx = es->showstatctx;
	Instrumentation *instr = planstate->instrument;
	CdbExplain_NodeSummary *ns = instr->cdbNodeSummary;
	instr_time	timediff;
	int			i;

	char		totalbuf[50];
	char		avgbuf[50];
	char		maxbuf[50];
	char		segbuf[50];
	char		startbuf[50];

	/* Might not have received stats from qExecs if they hit errors. */
	if (!ns)
		return;

	Assert(instr != NULL);

	/*
	 * Executor memory used by this individual node, if it allocates from a
	 * memory context of its own instead of sharing the per-query context.
	 */
	if (es->analyze && ns->execmemused.vcnt > 0)
	{
		if (es->format == EXPLAIN_FORMAT_TEXT)
		{
			appendStringInfoSpaces(es->str, es->indent * 2);
			appendStringInfo(es->str, "Executor Memory: %ldkB  Segments: %d  Max: %ldkB (segment %d)\n",
							 (long) kb(ns->execmemused.vsum),
							 ns->execmemused.vcnt,
							 (long) kb(ns->execmemused.vmax),
							 ns->execmemused.imax);
		}
		else
		{
			ExplainPropertyInteger("Executor Memory", "kB", kb(ns->execmemused.vsum), es);
			ExplainPropertyInteger("Executor Memory Segments", NULL, ns->execmemused.vcnt, es);
			ExplainPropertyInteger("Executor Max Memory", "kB", kb(ns->execmemused.vmax), es);
			ExplainPropertyInteger("Executor Max Memory Segment", NULL, ns->execmemused.imax, es);
		}
	}

	/*
	 * Actual work_mem used and wanted
	 */
	if (es->analyze && es->verbose && ns->workmemused.vcnt > 0)
	{
		if (es->format == EXPLAIN_FORMAT_TEXT)
		{
			appendStringInfoSpaces(es->str, es->indent * 2);
			appendStringInfo(es->str, "work_mem: %ldkB  Segments: %d  Max: %ldkB (segment %d)",
							 (long) kb(ns->workmemused.vsum),
							 ns->workmemused.vcnt,
							 (long) kb(ns->workmemused.vmax),
							 ns->workmemused.imax);

			/*
			 * Total number of segments in which this node reuses cached or
			 * creates workfiles.
			 */
			if (nodeSupportWorkfileCaching(planstate))
				appendStringInfo(es->str, "  Workfile: (%d spilling)",
								 ns->totalWorkfileCreated.vcnt);

			appendStringInfo(es->str, "\n");

			if (ns->workmemwanted.vcnt > 0)
			{
				appendStringInfoSpaces(es->str, es->indent * 2);
				cdbexplain_formatMemory(maxbuf, sizeof(maxbuf), ns->workmemwanted.vmax);
				if (ns->ninst == 1)
				{
					appendStringInfo(es->str,
								 "Work_mem wanted: %s to lessen workfile I/O.",
								 maxbuf);
				}
				else
				{
					cdbexplain_formatMemory(avgbuf, sizeof(avgbuf), cdbexplain_agg_avg(&ns->workmemwanted));
					cdbexplain_formatSeg(segbuf, sizeof(segbuf), ns->workmemwanted.imax, ns->ninst);
					appendStringInfo(es->str,
									 "Work_mem wanted: %s avg, %s max%s"
									 " to lessen workfile I/O affecting %d workers.",
									 avgbuf, maxbuf, segbuf, ns->workmemwanted.vcnt);
				}

				appendStringInfo(es->str, "\n");
			}
		}
		else
		{
			ExplainOpenGroup("work_mem", "work_mem", true, es);
			ExplainPropertyInteger("Used", "kB", kb(ns->workmemused.vsum), es);
			ExplainPropertyInteger("Segments", NULL, ns->workmemused.vcnt, es);
			ExplainPropertyInteger("Max Memory", "kB", kb(ns->workmemused.vmax), es);
			ExplainPropertyInteger("Max Memory Segment", NULL, ns->workmemused.imax, es);

			/*
			 * Total number of segments in which this node reuses cached or
			 * creates workfiles.
			 */
			if (nodeSupportWorkfileCaching(planstate))
				ExplainPropertyInteger("Workfile Spilling", NULL, ns->totalWorkfileCreated.vcnt, es);

			if (ns->workmemwanted.vcnt > 0)
			{
				ExplainPropertyInteger("Max Memory Wanted", "kB", kb(ns->workmemwanted.vmax), es);

				if (ns->ninst > 1)
				{
					ExplainPropertyInteger("Max Memory Wanted Segment", NULL, ns->workmemwanted.imax, es);
					ExplainPropertyInteger("Avg Memory Wanted", "kB", kb(cdbexplain_agg_avg(&ns->workmemwanted)), es);
					ExplainPropertyInteger("Segments Affected", NULL, ns->ninst, es);
				}
			}

			ExplainCloseGroup("work_mem", "work_mem", true, es);
		}
	}

	/*
	 * Print number of partitioned tables scanned for dynamic scans.
	 */
	if (0 <= ns->totalPartTableScanned.vcnt && (T_DynamicSeqScanState == planstate->type
												|| T_DynamicIndexScanState == planstate->type
												|| T_DynamicBitmapHeapScanState == planstate->type))
	{
		/*
		 * FIXME: Only displayed in TEXT format
		 * [#159443692]
		 */
		if (es->format == EXPLAIN_FORMAT_TEXT)
		{
			double		nPartTableScanned_avg = cdbexplain_agg_avg(&ns->totalPartTableScanned);

			if (0 == nPartTableScanned_avg)
			{
				if (T_DynamicBitmapHeapScanState == planstate->type)
				{
					appendStringInfoSpaces(es->str, es->indent * 2);
					appendStringInfo(es->str,
									 "Partitions scanned:  0 .\n");
				}
			}
			else
			{
				cdbexplain_formatSeg(segbuf, sizeof(segbuf), ns->totalPartTableScanned.imax, ns->ninst);

				appendStringInfoSpaces(es->str, es->indent * 2);

				/* only 1 segment scans partitions */
				if (1 == ns->totalPartTableScanned.vcnt)
				{
					/* rescan */
					if (1 < instr->nloops)
					{
						double		totalPartTableScannedPerRescan = ns->totalPartTableScanned.vmax / instr->nloops;

						appendStringInfo(es->str,
										 "Partitions scanned:  %.0f %s of %ld scans.\n",
										 totalPartTableScannedPerRescan,
										 segbuf,
										 instr->nloops);
					}
					else
					{
						appendStringInfo(es->str,
										 "Partitions scanned:  %.0f %s.\n",
										 ns->totalPartTableScanned.vmax,
										 segbuf);
					}
				}
				else
				{
					/* rescan */
					if (1 < instr->nloops)
					{
						double		totalPartTableScannedPerRescan = nPartTableScanned_avg / instr->nloops;
						double		maxPartTableScannedPerRescan = ns->totalPartTableScanned.vmax / instr->nloops;

						appendStringInfo(es->str,
										 "Partitions scanned:  Avg %.1f x %d workers of %ld scans."
										 "  Max %.0f parts%s.\n",
										 totalPartTableScannedPerRescan,
										 ns->totalPartTableScanned.vcnt,
										 instr->nloops,
										 maxPartTableScannedPerRescan,
										 segbuf
							);
					}
					else
					{
						appendStringInfo(es->str,
										 "Partitions scanned:  Avg %.1f x %d workers."
										 "  Max %.0f parts%s.\n",
										 nPartTableScanned_avg,
										 ns->totalPartTableScanned.vcnt,
										 ns->totalPartTableScanned.vmax,
										 segbuf);
					}
				}
			}
		}
	}

	bool 			haveExtraText = false;
	StringInfoData	extraData;

	initStringInfo(&extraData);

	for (i = 0; i < ns->ninst; i++)
	{
		CdbExplain_StatInst *nsi = &ns->insts[i];

		if (nsi->bnotes < nsi->enotes)
		{
			if (!haveExtraText)
			{
				ExplainOpenGroup("Extra Text", "Extra Text", false, es);
				ExplainOpenGroup("Segment", NULL, true, es);
				haveExtraText = true;
			}
			
			resetStringInfo(&extraData);

			cdbexplain_formatExtraText(&extraData,
									   0,
									   (ns->ninst == 1) ? -1
									   : ns->segindex0 + i,
									   ctx->extratextbuf.data + nsi->bnotes,
									   nsi->enotes - nsi->bnotes);
			ExplainPropertyStringInfo("Extra Text", es, "%s", extraData.data);
		}
	}

	if (haveExtraText)
	{
		ExplainCloseGroup("Segment", NULL, true, es);
		ExplainCloseGroup("Extra Text", "Extra Text", false, es);
	}
	pfree(extraData.data);

	/*
	 * Dump stats for all workers.
	 */
	if (gp_enable_explain_allstat && ns->segindex0 >= 0 && ns->ninst > 0)
	{
		if (es->format == EXPLAIN_FORMAT_TEXT)
		{
			/*
			 * create a header for all stats: separate each individual stat by an
			 * underscore, separate the grouped stats for each node by a slash
			 */
			appendStringInfoSpaces(es->str, es->indent * 2);
			appendStringInfoString(es->str,
								   "allstat: seg_firststart_total_ntuples");
		}
		else
			ExplainOpenGroup("Allstat", "Allstat", true, es);

		for (i = 0; i < ns->ninst; i++)
		{
			CdbExplain_StatInst *nsi = &ns->insts[i];

			if (INSTR_TIME_IS_ZERO(nsi->firststart))
				continue;

			/* Time from start of query on qDisp to worker's first result row */
			INSTR_TIME_SET_ZERO(timediff);
			INSTR_TIME_ACCUM_DIFF(timediff, nsi->firststart, ctx->querystarttime);

			if (es->format == EXPLAIN_FORMAT_TEXT)
			{
				cdbexplain_formatSeconds(startbuf, sizeof(startbuf),
										 INSTR_TIME_GET_DOUBLE(timediff), true);
				cdbexplain_formatSeconds(totalbuf, sizeof(totalbuf),
										 nsi->total, true);
				appendStringInfo(es->str,
								 "/seg%d_%s_%s_%.0f",
								 ns->segindex0 + i,
								 startbuf,
								 totalbuf,
								 nsi->ntuples);
			}
			else
			{
				cdbexplain_formatSeconds(startbuf, sizeof(startbuf),
										 INSTR_TIME_GET_DOUBLE(timediff), false);
				cdbexplain_formatSeconds(totalbuf, sizeof(totalbuf),
										 nsi->total, false);

				ExplainOpenGroup("Segment", NULL, false, es);
				ExplainPropertyInteger("Segment index", NULL, ns->segindex0 + i, es);
				ExplainPropertyText("Time To First Result", startbuf, es);
				ExplainPropertyText("Time To Total Result", totalbuf, es);
				ExplainPropertyFloat("Tuples", NULL, nsi->ntuples, 1, es);
				ExplainCloseGroup("Segment", NULL, false, es);
			}
		}

		if (es->format == EXPLAIN_FORMAT_TEXT)
			appendStringInfoString(es->str, "//end\n");
		else
			ExplainCloseGroup("Allstat", "Allstat", true, es);
	}
}								/* cdbexplain_showExecStats */

/*
 *	ExplainPrintExecStatsEnd
 *			External API wrapper for cdbexplain_showExecStatsEnd
 *
 * This is an externally exposed wrapper for cdbexplain_showExecStatsEnd such
 * that extensions, such as auto_explain, can leverage the Greenplum specific
 * parts of the EXPLAIN machinery.
 */
void
ExplainPrintExecStatsEnd(ExplainState *es, QueryDesc *queryDesc)
{
	cdbexplain_showExecStatsEnd(queryDesc->plannedstmt,
								queryDesc->showstatctx,
								queryDesc->estate, es);
}

/*
 * cdbexplain_showExecStatsEnd
 *	  Called by qDisp process to format the overall statistics for a query
 *	  into the caller's buffer.
 *
 * 'ctx' is the CdbExplain_ShowStatCtx object which was created by a call to
 *		cdbexplain_showExecStatsBegin() and contains statistics which have
 *		been accumulated over a series of calls to cdbexplain_showExecStats().
 *		Invalid on return (it is freed).
 *
 * This doesn't free the CdbExplain_ShowStatCtx object or buffers, because
 * they will be free'd shortly by the end of statement anyway.
 */
static void
cdbexplain_showExecStatsEnd(struct PlannedStmt *stmt,
							struct CdbExplain_ShowStatCtx *showstatctx,
							struct EState *estate,
							ExplainState *es)
{
	if (!es->summary)
		return;

    gpexplain_formatSlicesOutput(showstatctx, estate, es);

	if (!IsResManagerMemoryPolicyNone())
	{
		ExplainOpenGroup("Statement statistics", "Statement statistics", true, es);
		if (es->format == EXPLAIN_FORMAT_TEXT)
			appendStringInfo(es->str, "Memory used:  %ldkB\n", (long) kb(stmt->query_mem));
		else
			ExplainPropertyInteger("Memory used", "kB", kb(stmt->query_mem), es);

		if (showstatctx->workmemwanted_max > 0)
		{
			long mem_wanted;

			mem_wanted = (long) PolicyAutoStatementMemForNoSpill(stmt,
							(uint64) showstatctx->workmemwanted_max);

			/*
			 * Round up to a kilobyte in case we end up requiring less than
			 * that.
			 */
			if (mem_wanted <= 1024L)
				mem_wanted = 1L;
			else
				mem_wanted = mem_wanted / 1024L;

			if (es->format == EXPLAIN_FORMAT_TEXT)
				appendStringInfo(es->str, "Memory wanted:  %ldkB\n", mem_wanted);
			else
				ExplainPropertyInteger("Memory wanted", "kB", mem_wanted, es);
		}

		ExplainCloseGroup("Statement statistics", "Statement statistics", true, es);
	}
}								/* cdbexplain_showExecStatsEnd */

/*
 * Given a statistics context search for all the slice statistics
 * and format them to the correct layout
 */
static void
gpexplain_formatSlicesOutput(struct CdbExplain_ShowStatCtx *showstatctx,
                             struct EState *estate,
                             ExplainState *es)
{
	ExecSlice  *slice;
	int			sliceIndex;
	int			flag;
	double		total_memory_across_slices = 0;

	char		avgbuf[50];
	char		maxbuf[50];
	char		segbuf[50];

    if (showstatctx->nslice > 0)
        ExplainOpenGroup("Slice statistics", "Slice statistics", false, es);

    for (sliceIndex = 0; sliceIndex < showstatctx->nslice; sliceIndex++)
    {
        CdbExplain_SliceSummary *ss = &showstatctx->slices[sliceIndex];
        CdbExplain_DispatchSummary *ds = &ss->dispatchSummary;
        
        flag = es->str->len;
        if (es->format == EXPLAIN_FORMAT_TEXT)
        {

            appendStringInfo(es->str, "  (slice%d) ", sliceIndex);
            if (sliceIndex < 10)
                appendStringInfoChar(es->str, ' ');

            appendStringInfoString(es->str, "  ");
        }
        else 
        {
            ExplainOpenGroup("Slice", NULL, true, es);
            ExplainPropertyInteger("Slice", NULL, sliceIndex, es);
        }

        /* Worker counts */
        slice = getCurrentSlice(estate, sliceIndex);
        if (slice &&
			list_length(slice->segments) > 0 &&
			list_length(slice->segments) != ss->dispatchSummary.nOk)
        {
			int			nNotDispatched;
			StringInfoData workersInformationText;

			nNotDispatched = list_length(slice->segments) - ds->nResult + ds->nNotDispatched;

			es->str->data[flag] = (ss->dispatchSummary.nError > 0) ? 'X' : '_';

			initStringInfo(&workersInformationText);
			appendStringInfo(&workersInformationText, "Workers:");

            if (es->format == EXPLAIN_FORMAT_TEXT)
            {
                if (ds->nError == 1)
                {
                    appendStringInfo(&workersInformationText,
                                     " %d error;",
                                     ds->nError);
                }
                else if (ds->nError > 1)
                {
                    appendStringInfo(&workersInformationText,
                                     " %d errors;",
                                     ds->nError);
                }
            }
            else
            {
                ExplainOpenGroup("Workers", "Workers", true, es);
                if (ds->nError > 0)
                    ExplainPropertyInteger("Errors", NULL, ds->nError, es);
            }

            if (ds->nCanceled > 0)
            {
                if (es->format == EXPLAIN_FORMAT_TEXT)
                {
                    appendStringInfo(&workersInformationText,
                                     " %d canceled;",
                                     ds->nCanceled);
                }
                else
                {
                    ExplainPropertyInteger("Canceled", NULL, ds->nCanceled, es);
                }
            }

            if (nNotDispatched > 0)
            {
                if (es->format == EXPLAIN_FORMAT_TEXT)
                {
                    appendStringInfo(&workersInformationText,
                                     " %d not dispatched;",
                                     nNotDispatched);
                }
                else
                {
                    ExplainPropertyInteger("Not Dispatched", NULL, nNotDispatched, es);
                }
            }

            if (ds->nIgnorableError > 0)
            {
                if (es->format == EXPLAIN_FORMAT_TEXT)
                {
                    appendStringInfo(&workersInformationText,
                                     " %d aborted;",
                                     ds->nIgnorableError);
                }
                else
                {
                    ExplainPropertyInteger("Aborted", NULL, ds->nIgnorableError, es);
                }
            }

            if (ds->nOk > 0)
            {
                if (es->format == EXPLAIN_FORMAT_TEXT)
                {
                    appendStringInfo(&workersInformationText,
                                     " %d ok;",
                                     ds->nOk);
                }
                else
                {
                    ExplainPropertyInteger("Ok", NULL, ds->nOk, es);
                }
            }

            if (es->format == EXPLAIN_FORMAT_TEXT)
            {
                workersInformationText.len--;
                ExplainPropertyStringInfo("Workers", es, "%s.  ", workersInformationText.data);
            }
            else
            {
                ExplainCloseGroup("Workers", "Workers", true, es);
            }
        }

        /* Executor memory high-water mark */
        cdbexplain_formatMemory(maxbuf, sizeof(maxbuf), ss->peakmemused.vmax);
        if (ss->peakmemused.vcnt == 1)
        {
            if (es->format == EXPLAIN_FORMAT_TEXT)
            {
                const char *seg = segbuf;

                if (ss->peakmemused.imax >= 0)
                {
                    cdbexplain_formatSeg(segbuf, sizeof(segbuf), ss->peakmemused.imax, 999);
                }
                else if (slice && list_length(slice->segments) > 0)
                {
                    seg = " (entry db)";
                }
                else
                {
                    seg = "";
                }
                appendStringInfo(es->str,
                                 "Executor memory: %s%s.",
                                 maxbuf,
                                 seg);
            }
            else
            {
                ExplainPropertyInteger("Executor Memory", "kB", ss->peakmemused.vmax, es);
            }
        }
        else if (ss->peakmemused.vcnt > 1)
        {
            if (es->format == EXPLAIN_FORMAT_TEXT)
            {
                cdbexplain_formatMemory(avgbuf, sizeof(avgbuf), cdbexplain_agg_avg(&ss->peakmemused));
                cdbexplain_formatSeg(segbuf, sizeof(segbuf), ss->peakmemused.imax, ss->nworker);
                appendStringInfo(es->str,
                                 "Executor memory: %s avg x %d workers, %s max%s.",
                                 avgbuf,
                                 ss->peakmemused.vcnt,
                                 maxbuf,
                                 segbuf);
            }
            else
            {
                ExplainOpenGroup("Executor Memory", "Executor Memory", true, es);
                ExplainPropertyInteger("Average", "kB", cdbexplain_agg_avg(&ss->peakmemused), es);
                ExplainPropertyInteger("Workers", NULL, ss->peakmemused.vcnt, es);
                ExplainPropertyInteger("Maximum Memory Used", "kB", ss->peakmemused.vmax, es);
                ExplainCloseGroup("Executor Memory", "Executor Memory", true, es);
            }
        }

        if (EXPLAIN_MEMORY_VERBOSITY_SUPPRESS < explain_memory_verbosity)
        {
            /* Vmem reserved by QEs */
            cdbexplain_formatMemory(maxbuf, sizeof(maxbuf), ss->vmem_reserved.vmax);
            if (ss->vmem_reserved.vcnt == 1)
            {

                if (es->format == EXPLAIN_FORMAT_TEXT)
                {
                    const char *seg = segbuf;

                    if (ss->vmem_reserved.imax >= 0)
                    {
                        cdbexplain_formatSeg(segbuf, sizeof(segbuf), ss->vmem_reserved.imax, 999);
                    }
                    else if (slice && list_length(slice->segments) > 0)
                    {
                        seg = " (entry db)";
                    }
                    else
                    {
                        seg = "";
                    }
                    appendStringInfo(es->str,
                                     "  Vmem reserved: %s%s.",
                                     maxbuf,
                                     seg);
                }
                else
                {
                    ExplainPropertyInteger("Virtual Memory", "kB", ss->vmem_reserved.vmax, es);
                }
            }
            else if (ss->vmem_reserved.vcnt > 1)
            {
                if (es->format == EXPLAIN_FORMAT_TEXT)
                {
                    cdbexplain_formatMemory(avgbuf, sizeof(avgbuf), cdbexplain_agg_avg(&ss->vmem_reserved));
                    cdbexplain_formatSeg(segbuf, sizeof(segbuf), ss->vmem_reserved.imax, ss->nworker);
                    appendStringInfo(es->str,
                                     "  Vmem reserved: %s avg x %d workers, %s max%s.",
                                     avgbuf,
                                     ss->vmem_reserved.vcnt,
                                     maxbuf,
                                     segbuf);
                }
                else
                {
                    ExplainOpenGroup("Virtual Memory", "Virtual Memory", true, es);
                    ExplainPropertyInteger("Average", "kB", cdbexplain_agg_avg(&ss->vmem_reserved), es);
                    ExplainPropertyInteger("Workers", NULL, ss->vmem_reserved.vcnt, es);
                    ExplainPropertyInteger("Maximum Memory Used", "kB", ss->vmem_reserved.vmax, es);
                    ExplainCloseGroup("Virtual Memory", "Virtual Memory", true, es);
                }

            }
        }

        /* Work_mem used/wanted (max over all nodes and workers of slice) */
        if (ss->workmemused_max + ss->workmemwanted_max > 0)
        {
            if (es->format == EXPLAIN_FORMAT_TEXT)
            {
                cdbexplain_formatMemory(maxbuf, sizeof(maxbuf), ss->workmemused_max);
                appendStringInfo(es->str, "  Work_mem: %s max", maxbuf);
                if (ss->workmemwanted_max > 0)
                {
                    es->str->data[flag] = '*';	/* draw attention to this slice */
                    cdbexplain_formatMemory(maxbuf, sizeof(maxbuf), ss->workmemwanted_max);
                    appendStringInfo(es->str, ", %s wanted", maxbuf);
                }
                appendStringInfoChar(es->str, '.');
            }
            else
            {
                ExplainPropertyInteger("Work Maximum Memory", "kB", ss->workmemused_max, es);
            }
        }

        if (es->format == EXPLAIN_FORMAT_TEXT)
            appendStringInfoChar(es->str, '\n');

        ExplainCloseGroup("Slice", NULL, true, es);
    }

    if (showstatctx->nslice > 0)
        ExplainCloseGroup("Slice statistics", "Slice statistics", false, es);

    if (total_memory_across_slices > 0)
    {
        if (es->format == EXPLAIN_FORMAT_TEXT)
        {
            appendStringInfo(es->str, "Total memory used across slices: %.0fK bytes \n", total_memory_across_slices);
        }
        else
        {
            ExplainPropertyInteger("Total memory used across slices", "bytes", total_memory_across_slices, es);
        }
    }
}

/*
 * Show the hash and merge keys for a Motion node.
 */
static void
show_motion_keys(PlanState *planstate, List *hashExpr, int nkeys, AttrNumber *keycols,
			     const char *qlabel, List *ancestors, ExplainState *es)
{
	Plan	   *plan = planstate->plan;
	List	   *context;
	char	   *exprstr;
	bool		useprefix = list_length(es->rtable) > 1;
	int			keyno;
	List	   *result = NIL;

	if (!nkeys && !hashExpr)
		return;

	/* Set up deparse context */
	context = set_deparse_context_planstate(es->deparse_cxt,
											(Node *) planstate,
											ancestors);

    /* Merge Receive ordering key */
    for (keyno = 0; keyno < nkeys; keyno++)
    {
	    /* find key expression in tlist */
	    AttrNumber	keyresno = keycols[keyno];
	    TargetEntry *target = get_tle_by_resno(plan->targetlist, keyresno);

	    /* Deparse the expression, showing any top-level cast */
	    if (target)
	        exprstr = deparse_expression((Node *) target->expr, context,
								         useprefix, true);
        else
        {
            elog(WARNING, "Gather Motion %s error: no tlist item %d",
                 qlabel, keyresno);
            exprstr = "*BOGUS*";
        }

		result = lappend(result, exprstr);
    }

	if (list_length(result) > 0)
		ExplainPropertyList(qlabel, result, es);

    /* Hashed repartitioning key */
    if (hashExpr)
    {
	    /* Deparse the expression */
	    exprstr = deparse_expression((Node *)hashExpr, context, useprefix, true);
		ExplainPropertyText("Hash Key", exprstr, es);
    }
}

/*
 * Explain a parallel retrieve cursor,
 * indicate the endpoints exist on entry DB, or on some segments,
 * or on all segments.
 */
void ExplainParallelRetrieveCursor(ExplainState *es, QueryDesc* queryDesc)
{
	PlannedStmt *plan = queryDesc->plannedstmt;
	SliceTable *sliceTable = queryDesc->estate->es_sliceTable;
	StringInfoData            endpointInfoStr;
	enum EndPointExecPosition endPointExecPosition;

	initStringInfo(&endpointInfoStr);

	endPointExecPosition = GetParallelCursorEndpointPosition(plan);
	ExplainOpenGroup("Cursor", "Cursor", true, es);
	switch(endPointExecPosition)
	{
		case ENDPOINT_ON_ENTRY_DB:
		{
			appendStringInfo(&endpointInfoStr, "\"on coordinator\"");
			break;
		}
		case ENDPOINT_ON_SINGLE_QE:
		{
			appendStringInfo(
							 &endpointInfoStr, "\"on segment: contentid [%d]\"",
							 gp_session_id % plan->planTree->flow->numsegments);
			break;
		}
		case ENDPOINT_ON_SOME_QE:
		{
			ListCell * cell;
			bool isFirst = true;
			appendStringInfo(&endpointInfoStr, "on segments: contentid [");
			ExecSlice *slice = &sliceTable->slices[0];
			foreach(cell, slice->segments)
			{
				int contentid = lfirst_int(cell);
				appendStringInfo(&endpointInfoStr, (isFirst)?"%d":", %d", contentid);
				isFirst = false;
			}
			appendStringInfo(&endpointInfoStr, "]");
			break;
		}
		case ENDPOINT_ON_ALL_QE:
		{
			appendStringInfo(&endpointInfoStr, "on all %d segments", getgpsegmentCount());
			break;
		}
		default:
		{
			elog(ERROR, "invalid endpoint position : %d", endPointExecPosition);
			break;
		}
	}
	ExplainPropertyText("Endpoint", endpointInfoStr.data, es);
	ExplainCloseGroup("Cursor", "Cursor", true, es);
}

相关信息

greenplumn 源码目录

相关文章

greenplumn aggregatecmds 源码

greenplumn alter 源码

greenplumn amcmds 源码

greenplumn analyze 源码

greenplumn analyzefuncs 源码

greenplumn analyzeutils 源码

greenplumn async 源码

greenplumn cluster 源码

greenplumn collationcmds 源码

greenplumn comment 源码

0  赞