greenplumn cdbcopy 源码

  • 2022-08-18
  • 浏览 (337)

greenplumn cdbcopy 代码

文件路径:/src/backend/cdb/cdbcopy.c

/*--------------------------------------------------------------------------
 *
 * cdbcopy.c
 * 	 Provides routines that executed a COPY command on an MPP cluster. These
 * 	 routines are called from the backend COPY command whenever MPP is in the
 * 	 default dispatch mode.
 *
 * Usage:
 *
 * CdbCopy cdbCopy = makeCdbCopy();
 *
 * PG_TRY();
 * {
 *     cdbCopyStart(cdbCopy, ...);
 *
 *     // process each row
 *     while (...)
 *     {
 *         cdbCopyGetData(cdbCopy, ...)
 *         or
 *         cdbCopySendData(cdbCopy, ...)
 *     }
 *     cdbCopyEnd(cdbCopy);
 * }
 * PG_CATCH();
 * {
 *     cdbCopyAbort(cdbCopy);
 * }
 * PG_END_TRY();
 *
 *
 * makeCdbCopy() creates a struct to hold information about the on-going COPY.
 * It does not change the state of the connection yet.
 *
 * cdbCopyStart() puts the connections in the gang into COPY mode. If an error
 * occurs during or after cdbCopyStart(), you must call cdbCopyAbort() to reset
 * the connections to normal state!
 *
 * cdbCopyGetData() and cdbCopySendData() call libpq's PQgetCopyData() and
 * PQputCopyData(), respectively. If an error occurs, it is thrown with ereport().
 *
 * When you're done, call cdbCopyEnd().
 *
 * Portions Copyright (c) 2005-2008, Greenplum inc
 * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
 *
 *
 * IDENTIFICATION
 *	    src/backend/cdb/cdbcopy.c
*
*--------------------------------------------------------------------------
*/

#include "postgres.h"
#include "miscadmin.h"
#include "libpq-fe.h"
#include "libpq-int.h"
#include "access/xact.h"
#include "cdb/cdbconn.h"
#include "cdb/cdbcopy.h"
#include "cdb/cdbdisp_query.h"
#include "cdb/cdbdispatchresult.h"
#include "cdb/cdbfts.h"
#include "cdb/cdbgang.h"
#include "cdb/cdbtm.h"
#include "cdb/cdbvars.h"
#include "commands/copy.h"
#include "commands/defrem.h"
#include "mb/pg_wchar.h"
#include "nodes/makefuncs.h"
#include "pgstat.h"
#include "storage/pmsignal.h"
#include "tcop/tcopprot.h"
#include "utils/faultinjector.h"
#include "utils/memutils.h"

#include <poll.h>

static void cdbCopyEndInternal(CdbCopy *c, char *abort_msg,
				   int64 *total_rows_completed_p,
				   int64 *total_rows_rejected_p);

static Gang *
getCdbCopyPrimaryGang(CdbCopy *c)
{
	if (!c || !c->dispatcherState)
		return NULL;

	return (Gang *)linitial(c->dispatcherState->allocatedGangs);
}

/*
 * Create a cdbCopy object that includes all the cdb
 * information and state needed by the backend COPY.
 */
CdbCopy *
makeCdbCopy(CopyState cstate, bool is_copy_in)
{
	CdbCopy		*c;
	GpPolicy	*policy;

	policy = cstate->rel->rd_cdbpolicy;
	Assert(policy);

	c = palloc0(sizeof(CdbCopy));

	/* fresh start */
	c->total_segs = 0;
	c->copy_in = is_copy_in;
	c->seglist = NIL;
	c->dispatcherState = NULL;
	initStringInfo(&(c->copy_out_buf));

	/*
	 * COPY replicated table TO file, pick only one replica, otherwise, duplicate
	 * rows will be copied.
	 */
	if (!is_copy_in && GpPolicyIsReplicated(policy) && !cstate->on_segment)
	{
		c->total_segs = 1;
		c->seglist = list_make1_int(gp_session_id % c->total_segs);
	}
	else
	{
		int			i;

		c->total_segs = policy->numsegments;

		for (i = 0; i < c->total_segs; i++)
			c->seglist = lappend_int(c->seglist, i);
	}

	cstate->cdbCopy = c;

	return c;
}


/*
 * starts a copy command on a specific segment database.
 *
 * may pg_throw via elog/ereport.
 */
void
cdbCopyStart(CdbCopy *c, CopyStmt *stmt, int file_encoding)
{
	int			flags;

	stmt = copyObject(stmt);

	/*
	 * If the output needs to be in a different encoding, tell the segment.
	 * Normally, when we run normal queries, we keep the segment connections
	 * in database encoding, and do the encoding conversions in the QD, just
	 * before sending results to the client. But in COPY TO, we don't do
	 * any conversions to the data we receive from the segments, so they
	 * must produce the output in the correct encoding.
	 *
	 * We do this by adding "ENCODING 'xxx'" option to the options list of
	 * the CopyStmt that we dispatch.
	 */
	if (file_encoding != GetDatabaseEncoding())
	{
		bool		found;
		ListCell   *option;

		/*
		 * But first check if the encoding option is already in the options
		 * list (i.e the user specified it explicitly in the COPY command)
		 */
		found = false;
		foreach(option, stmt->options)
		{
			DefElem    *defel = (DefElem *) lfirst(option);

			if (strcmp(defel->defname, "encoding") == 0)
			{
				/*
				 * The 'file_encoding' came from the options, so they should match, but
				 * let's sanity-check.
				 */
				if (pg_char_to_encoding(defGetString(defel)) != file_encoding)
					elog(ERROR, "encoding option in original COPY command does not match encoding being dispatched");
				found = true;
			}
		}

		if (!found)
		{
			const char *encname = pg_encoding_to_char(file_encoding);

			stmt->options = lappend(stmt->options,
									makeDefElem("encoding",
												(Node *) makeString(pstrdup(encname)), -1));
		}
	}

	flags = DF_WITH_SNAPSHOT | DF_CANCEL_ON_ERROR;
	if (c->copy_in)
		flags |= DF_NEED_TWO_PHASE;

	CdbDispatchCopyStart(c, (Node *) stmt, flags);

	SIMPLE_FAULT_INJECTOR("cdb_copy_start_after_dispatch");
}

/*
 * sends data to a copy command on all segments.
 */
void
cdbCopySendDataToAll(CdbCopy *c, const char *buffer, int nbytes)
{
	Gang	   *gp = getCdbCopyPrimaryGang(c);

	Assert(gp);

	for (int i = 0; i < gp->size; ++i)
	{
		int			seg = gp->db_descriptors[i]->segindex;

		cdbCopySendData(c, seg, buffer, nbytes);
	}
}

/*
 * sends data to a copy command on a specific segment (usually
 * the hash result of the data value).
 */
void
cdbCopySendData(CdbCopy *c, int target_seg, const char *buffer,
				int nbytes)
{
	SegmentDatabaseDescriptor *q;
	Gang	   *gp;
	int			result;

	/*
	 * NOTE!! note that another DELIM was added, for the buf_converted in the
	 * code above. I didn't do it because it's broken right now
	 */

	gp = getCdbCopyPrimaryGang(c);
	Assert(gp);
	q = getSegmentDescriptorFromGang(gp, target_seg);

	/* transmit the COPY data */
	result = PQputCopyData(q->conn, buffer, nbytes);

	if (result != 1)
	{
		if (result == 0)
		{
			/* We don't use blocking mode, so this shouldn't happen */
			ereport(ERROR,
					(errcode(ERRCODE_IO_ERROR),
					 errmsg("could not send COPY data to segment %d, attempt blocked",
							target_seg)));
		}
		else
			ereport(ERROR,
					(errcode(ERRCODE_IO_ERROR),
					 errmsg("could not send COPY data to segment %d: %s",
							target_seg, PQerrorMessage(q->conn))));
	}
}

/*
 * gets a chunk of rows of data from a copy command.
 * returns boolean true if done. Caller should still
 * empty the leftovers in the outbuf in that case.
 */
bool
cdbCopyGetData(CdbCopy *c, bool copy_cancel, uint64 *rows_processed)
{
	SegmentDatabaseDescriptor *q;
	Gang	   *gp;
	int			nbytes;

	/* clean out buf data */
	resetStringInfo(&c->copy_out_buf);

	gp = getCdbCopyPrimaryGang(c);

	/*
	 * MPP-7712: we used to issue the cancel-requests for each *row* we got
	 * back from each segment -- this is potentially millions of
	 * cancel-requests. Cancel requests consist of an out-of-band connection
	 * to the segment-postmaster, this is *not* a lightweight operation!
	 */
	if (copy_cancel)
	{
		ListCell   *cur;

		/* iterate through all the segments that still have data to give */
		foreach(cur, c->seglist)
		{
			int			source_seg = lfirst_int(cur);

			q = getSegmentDescriptorFromGang(gp, source_seg);

			/* send a query cancel request to that segdb */
			PQrequestCancel(q->conn);
		}
	}

	/*
	 * Collect data rows from the segments that still have rows to give until
	 * chunk minimum size is reached
	 */
	while (c->copy_out_buf.len < COPYOUT_CHUNK_SIZE)
	{
		ListCell   *cur;

		/* iterate through all the segments that still have data to give */
		foreach(cur, c->seglist)
		{
			int			source_seg = lfirst_int(cur);
			char	   *buffer;

			q = getSegmentDescriptorFromGang(gp, source_seg);

			/* get 1 row of COPY data */
			nbytes = PQgetCopyData(q->conn, &buffer, false);

			/*
			 * SUCCESS -- got a row of data
			 */
			if (nbytes > 0 && buffer)
			{
				/* append the data row to the data chunk */
				appendBinaryStringInfo(&(c->copy_out_buf), buffer, nbytes);

				/* increment the rows processed counter for the end tag */
				(*rows_processed)++;

				PQfreemem(buffer);
			}

			/*
			 * DONE -- Got all the data rows from this segment, or a cancel
			 * request.
			 *
			 * Remove the segment that completed sending data, from the list
			 * of in-progress segments.
			 *
			 * Note: After PQgetCopyData() returns -1, you need to call
			 * PGgetResult() to get any possible errors. But we don't do that
			 * here. That's done later, in the call to cdbCopyEnd() (or
			 * cdbCopyAbort(), if something went wrong.)
			 */
			else if (nbytes == -1)
			{
				c->seglist = list_delete_int(c->seglist, source_seg);

				if (list_length(c->seglist) == 0)
					return true;	/* all segments are done */

				/* start over from first seg as we just changed the seg list */
				break;
			}
			/*
			 * ERROR!
			 */
			else
			{
				/*
				 * should never happen since we are blocking. Don't bother to
				 * try again, exit with error.
				 */
				if (nbytes == 0)
					ereport(ERROR,
							(errcode(ERRCODE_IO_ERROR),
							 errmsg("could not send COPY data to segment %d, attempt blocked",
									source_seg)));

				if (nbytes == -2)
					ereport(ERROR,
							(errcode(ERRCODE_IO_ERROR),
							 errmsg("could not receive COPY data from segment %d: %s",
									source_seg, PQerrorMessage(q->conn))));
			}
		}

		if (c->copy_out_buf.len > COPYOUT_CHUNK_SIZE)
			break;
	}

	return false;
}

/*
 * Commands to end the cdbCopy.
 *
 * If an error occurrs, or if an error is reported by one of the segments,
 * cdbCopyEnd() throws it with ereport(), after closing the COPY and cleaning
 * up any resources associated with it.
 *
 * cdbCopyAbort() usually does not throw an error. It is used in error-recovery
 * codepaths, typically in a PG_CATCH() block, and the caller is about to
 * re-throw the original error that caused the abortion.
 */
void
cdbCopyAbort(CdbCopy *c)
{
	cdbCopyEndInternal(c, "aborting COPY in QE due to error in QD",
					   NULL, NULL);
}

/*
 * End the copy command on all segment databases,
 * and fetch the total number of rows completed by all QEs
 */
void
cdbCopyEnd(CdbCopy *c,
		   int64 *total_rows_completed_p,
		   int64 *total_rows_rejected_p)
{
	CHECK_FOR_INTERRUPTS();

	cdbCopyEndInternal(c, NULL,
					   total_rows_completed_p,
					   total_rows_rejected_p);
}

static void
cdbCopyEndInternal(CdbCopy *c, char *abort_msg,
				   int64 *total_rows_completed_p,
				   int64 *total_rows_rejected_p)
{
	Gang	   *gp;
	int			num_bad_connections = 0;
	int64		total_rows_completed = 0;	/* total num rows completed by all
											 * QEs */
	int64		total_rows_rejected = 0;	/* total num rows rejected by all
											 * QEs */
	ErrorData *first_error = NULL;
	int			seg;
	struct pollfd	*pollRead;
	bool		io_errors = false;
	StringInfoData io_err_msg;
	List           *oidList = NIL;
	int				nest_level;

	SIMPLE_FAULT_INJECTOR("cdb_copy_end_internal_start");

	initStringInfo(&io_err_msg);

	/*
	 * Don't try to end a copy that already ended with the destruction of the
	 * writer gang. We know that this has happened if the CdbCopy's
	 * primary_writer is NULL.
	 *
	 * GPDB_91_MERGE_FIXME: ugh, this is nasty. We shouldn't be calling
	 * cdbCopyEnd twice on the same CdbCopy in the first place!
	 */
	gp = getCdbCopyPrimaryGang(c);
	if (!gp)
	{
		if (total_rows_completed_p != NULL)
			*total_rows_completed_p = 0;
		if (total_rows_rejected_p != NULL)
			*total_rows_completed_p = -1;
		return;
	}

	/*
	 * In COPY in mode, call PQputCopyEnd() to tell the segments that we're done.
	 */
	if (c->copy_in)
	{
		for (seg = 0; seg < gp->size; seg++)
		{
			SegmentDatabaseDescriptor *q = gp->db_descriptors[seg];
			int			result;

			elog(DEBUG1, "PQputCopyEnd seg %d    ", q->segindex);
			/* end this COPY command */
			result = PQputCopyEnd(q->conn, abort_msg);

			/* get command end status */
			if (result == -1)
			{
				/* error */
				appendStringInfo(&io_err_msg,
								 "Failed to send end-of-copy to segment %d: %s",
								 seg, PQerrorMessage(q->conn));
				io_errors = true;
			}
			if (result == 0)
			{
				/* attempt blocked */

				/*
				 * CDB TODO: Can this occur?  The libpq documentation says, "this
				 * case is only possible if the connection is in nonblocking
				 * mode... wait for write-ready and try again", i.e., the proper
				 * response would be to retry, not error out.
				 */
				appendStringInfo(&io_err_msg,
								 "primary segment %d, dbid %d, attempt blocked\n",
								 seg, q->segment_database_info->config->dbid);
				io_errors = true;
			}
		}
	}

	nest_level = GetCurrentTransactionNestLevel();

	pollRead = (struct pollfd *) palloc(sizeof(struct pollfd));
	for (seg = 0; seg < gp->size; seg++)
	{
		SegmentDatabaseDescriptor *q = gp->db_descriptors[seg];
		int			result;
		PGresult   *res;
		int64		segment_rows_completed = 0; /* # of rows completed by this QE */
		int64		segment_rows_rejected = 0;	/* # of rows rejected by this QE */

		pollRead->fd = PQsocket(q->conn);
		pollRead->events = POLLIN;
		pollRead->revents = 0;

		while (PQisBusy(q->conn) && PQstatus(q->conn) == CONNECTION_OK)
		{
			if ((Gp_role == GP_ROLE_DISPATCH) && CancelRequested())
			{
				PQrequestCancel(q->conn);
			}

			if (poll(pollRead, 1, 200) > 0)
			{
				break;
			}
		}

		forwardQENotices();

		/*
		 * Fetch any error status existing on completion of the COPY command.
		 * It is critical that for any connection that had an asynchronous
		 * command sent thru it, we call PQgetResult until it returns NULL.
		 * Otherwise, the next time a command is sent to that connection, it
		 * will return an error that there's a command pending.
		 */
		HOLD_INTERRUPTS();
		while ((res = PQgetResult(q->conn)) != NULL && PQstatus(q->conn) != CONNECTION_BAD)
		{
			elog(DEBUG1, "PQgetResult got status %d seg %d    ",
				 PQresultStatus(res), q->segindex);

			forwardQENotices();

			/* if the COPY command had a data error */
			if (PQresultStatus(res) == PGRES_FATAL_ERROR)
			{
				/*
				 * Always append error from the primary. Append error from
				 * mirror only if its primary didn't have an error.
				 *
				 * For now, we only report the first error we get from the
				 * QE's.
				 *
				 * We get the error message in pieces so that we could append
				 * whoami to the primary error message only.
				 */
				if (!first_error)
					first_error = cdbdisp_get_PQerror(res);
			}

			pgstat_combine_one_qe_result(&oidList, res, nest_level, q->segindex);

			if (q->conn->wrote_xlog)
			{
				MarkTopTransactionWriteXLogOnExecutor();

				/*
				* Reset the worte_xlog here. Since if the received pgresult not process
				* the xlog write message('x' message sends from QE in ReadyForQuery),
				* the value may still refer to previous dispatch statement. Which may
				* always mark current top transaction has wrote xlog on executor.
				*/
				q->conn->wrote_xlog = false;
			}

			/*
			 * If we are still in copy mode, tell QE to stop it.  COPY_IN
			 * protocol has a way to say 'end of copy' but COPY_OUT doesn't.
			 * We have no option but sending cancel message and consume the
			 * output until the state transition to non-COPY.
			 */
			if (PQresultStatus(res) == PGRES_COPY_IN)
			{
				elog(LOG, "Segment still in copy in, retrying the putCopyEnd");
				result = PQputCopyEnd(q->conn, NULL);
			}
			else if (PQresultStatus(res) == PGRES_COPY_OUT)
			{
				char	   *buffer = NULL;
				int			ret;

				elog(LOG, "Segment still in copy out, canceling QE");

				/*
				 * I'm a bit worried about sending a cancel, as if this is a
				 * success case the QE gets inconsistent state than QD.  But
				 * this code path is mostly for error handling and in a
				 * success case we wouldn't see COPY_OUT here. It's not clear
				 * what to do if this cancel failed, since this is not a path
				 * we can error out.  FATAL maybe the way, but I leave it for
				 * now.
				 */
				PQrequestCancel(q->conn);

				/*
				 * Need to consume data from the QE until cancellation is
				 * recognized. PQgetCopyData() returns -1 when the COPY is
				 * done, a non-zero result indicates data was returned and in
				 * that case we'll drop it immediately since we aren't
				 * interested in the contents.
				 */
				while ((ret = PQgetCopyData(q->conn, &buffer, false)) != -1)
				{
					if (ret > 0)
					{
						if (buffer)
							PQfreemem(buffer);
						continue;
					}

					/* An error occurred, log the error and break out */
					if (ret == -2)
					{
						ereport(WARNING,
								(errmsg("Error during cancellation: \"%s\"",
										PQerrorMessage(q->conn))));
						break;
					}
				}
				if (buffer)
					PQfreemem(buffer);
			}

			/* in SREH mode, check if this seg rejected (how many) rows */
			if (res->numRejected > 0)
				segment_rows_rejected = res->numRejected;

			/*
			 * When COPY FROM, need to calculate the number of this
			 * segment's completed rows
			 */
			if (res->numCompleted > 0)
				segment_rows_completed = res->numCompleted;

			/* free the PGresult object */
			PQclear(res);
		}
		RESUME_INTERRUPTS();

		/*
		 * add up the number of rows completed and rejected from this segment
		 * to the totals. Only count from primary segs.
		 */
		if (segment_rows_rejected > 0)
			total_rows_rejected += segment_rows_rejected;
		if (segment_rows_completed > 0)
			total_rows_completed += segment_rows_completed;

		/* Lost the connection? */
		if (PQstatus(q->conn) == CONNECTION_BAD)
		{
			/* command error */
			io_errors = true;
			appendStringInfo(&io_err_msg,
							 "Primary segment %d, dbid %d, with error: %s\n",
							 seg, q->segment_database_info->config->dbid,
							 PQerrorMessage(q->conn));

			/* Free the PGconn object. */
			PQfinish(q->conn);
			q->conn = NULL;

			/* Let FTS deal with it! */
			num_bad_connections++;
		}
	}

	CdbDispatchCopyEnd(c);

	/* If lost contact with segment db, try to reconnect. */
	if (num_bad_connections > 0)
	{
		elog(LOG, "error occurred while ending COPY: %s", io_err_msg.data);
		elog(LOG, "COPY signals FTS to probe segments");

		SendPostmasterSignal(PMSIGNAL_WAKEN_FTS);
		/*
		 * Before error out, we need to reset the session. Gang will be cleaned up
		 * when next transaction start, since it will find FTS version bump and
		 * call cdbcomponent_updateCdbComponents().
		 */
		resetSessionForPrimaryGangLoss();

		ereport(ERROR,
				(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
				 (errmsg("MPP detected %d segment failures, system is reconnected",
						 num_bad_connections))));
	}

	/*
	 * Unless we are aborting the COPY, report any errors with ereport()
	 */
	if (!abort_msg)
	{
		/* errors reported by the segments */
		if (first_error)
		{
			FlushErrorState();
			ReThrowError(first_error);
		}

		/* errors that occurred in the COPY itself */
		if (io_errors)
			ereport(ERROR,
					(errcode(ERRCODE_IO_ERROR),
					 errmsg("could not complete COPY on some segments"),
					 errdetail("%s", io_err_msg.data)));
	}

	if (total_rows_completed_p != NULL)
		*total_rows_completed_p = total_rows_completed;
	if (total_rows_rejected_p != NULL)
		*total_rows_rejected_p = total_rows_rejected;
	return;
}

相关信息

greenplumn 源码目录

相关文章

greenplumn cdbappendonlystorageformat 源码

greenplumn cdbappendonlystorageread 源码

greenplumn cdbappendonlystoragewrite 源码

greenplumn cdbappendonlyxlog 源码

greenplumn cdbbufferedappend 源码

greenplumn cdbbufferedread 源码

greenplumn cdbcat 源码

greenplumn cdbdistributedsnapshot 源码

greenplumn cdbdistributedxacts 源码

greenplumn cdbdistributedxid 源码

0  赞