greenplumn pg_recvlogical 源码

  • 2022-08-18
  • 浏览 (228)

greenplumn pg_recvlogical 代码

文件路径:/src/bin/pg_basebackup/pg_recvlogical.c

/*-------------------------------------------------------------------------
 *
 * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
 *					  fashion and write it to a local file.
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *		  src/bin/pg_basebackup/pg_recvlogical.c
 *-------------------------------------------------------------------------
 */

#include "postgres_fe.h"

#include <dirent.h>
#include <sys/stat.h>
#include <unistd.h>
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif

/* local includes */
#include "streamutil.h"

#include "access/xlog_internal.h"
#include "common/file_perm.h"
#include "common/fe_memutils.h"
#include "common/logging.h"
#include "getopt_long.h"
#include "libpq-fe.h"
#include "libpq/pqsignal.h"
#include "pqexpbuffer.h"


/* Time to sleep between reconnection attempts */
#define RECONNECT_SLEEP_TIME 5

/* Global Options */
static char *outfile = NULL;
static int	verbose = 0;
static int	noloop = 0;
static int	standby_message_timeout = 10 * 1000;	/* 10 sec = default */
static int	fsync_interval = 10 * 1000; /* 10 sec = default */
static XLogRecPtr startpos = InvalidXLogRecPtr;
static XLogRecPtr endpos = InvalidXLogRecPtr;
static bool do_create_slot = false;
static bool slot_exists_ok = false;
static bool do_start_slot = false;
static bool do_drop_slot = false;
static char *replication_slot = NULL;

/* filled pairwise with option, value. value may be NULL */
static char **options;
static size_t noptions = 0;
static const char *plugin = "test_decoding";

/* Global State */
static int	outfd = -1;
static volatile sig_atomic_t time_to_abort = false;
static volatile sig_atomic_t output_reopen = false;
static bool output_isfile;
static TimestampTz output_last_fsync = -1;
static bool output_needs_fsync = false;
static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;

static void usage(void);
static void StreamLogicalLog(void);
static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
							   bool keepalive, XLogRecPtr lsn);

static void
usage(void)
{
	printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
		   progname);
	printf(_("Usage:\n"));
	printf(_("  %s [OPTION]...\n"), progname);
	printf(_("\nAction to be performed:\n"));
	printf(_("      --create-slot      create a new replication slot (for the slot's name see --slot)\n"));
	printf(_("      --drop-slot        drop the replication slot (for the slot's name see --slot)\n"));
	printf(_("      --start            start streaming in a replication slot (for the slot's name see --slot)\n"));
	printf(_("\nOptions:\n"));
	printf(_("  -E, --endpos=LSN       exit after receiving the specified LSN\n"));
	printf(_("  -f, --file=FILE        receive log into this file, - for stdout\n"));
	printf(_("  -F  --fsync-interval=SECS\n"
			 "                         time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
	printf(_("      --if-not-exists    do not error if slot already exists when creating a slot\n"));
	printf(_("  -I, --startpos=LSN     where in an existing slot should the streaming start\n"));
	printf(_("  -n, --no-loop          do not loop on connection lost\n"));
	printf(_("  -o, --option=NAME[=VALUE]\n"
			 "                         pass option NAME with optional value VALUE to the\n"
			 "                         output plugin\n"));
	printf(_("  -P, --plugin=PLUGIN    use output plugin PLUGIN (default: %s)\n"), plugin);
	printf(_("  -s, --status-interval=SECS\n"
			 "                         time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
	printf(_("  -S, --slot=SLOTNAME    name of the logical replication slot\n"));
	printf(_("  -v, --verbose          output verbose messages\n"));
	printf(_("  -V, --version          output version information, then exit\n"));
	printf(_("  -?, --help             show this help, then exit\n"));
	printf(_("\nConnection options:\n"));
	printf(_("  -d, --dbname=DBNAME    database to connect to\n"));
	printf(_("  -h, --host=HOSTNAME    database server host or socket directory\n"));
	printf(_("  -p, --port=PORT        database server port number\n"));
	printf(_("  -U, --username=NAME    connect as specified database user\n"));
	printf(_("  -w, --no-password      never prompt for password\n"));
	printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
	printf(_("\nReport bugs to <pgsql-bugs@lists.postgresql.org>.\n"));
}

/*
 * Send a Standby Status Update message to server.
 */
static bool
sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
{
	static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
	static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;

	char		replybuf[1 + 8 + 8 + 8 + 8 + 1];
	int			len = 0;

	/*
	 * we normally don't want to send superfluous feedback, but if it's
	 * because of a timeout we need to, otherwise wal_sender_timeout will kill
	 * us.
	 */
	if (!force &&
		last_written_lsn == output_written_lsn &&
		last_fsync_lsn != output_fsync_lsn)
		return true;

	if (verbose)
		pg_log_info("confirming write up to %X/%X, flush to %X/%X (slot %s)",
					(uint32) (output_written_lsn >> 32), (uint32) output_written_lsn,
					(uint32) (output_fsync_lsn >> 32), (uint32) output_fsync_lsn,
					replication_slot);

	replybuf[len] = 'r';
	len += 1;
	fe_sendint64(output_written_lsn, &replybuf[len]);	/* write */
	len += 8;
	fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
	len += 8;
	fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);	/* apply */
	len += 8;
	fe_sendint64(now, &replybuf[len]);	/* sendTime */
	len += 8;
	replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
	len += 1;

	startpos = output_written_lsn;
	last_written_lsn = output_written_lsn;
	last_fsync_lsn = output_fsync_lsn;

	if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
	{
		pg_log_error("could not send feedback packet: %s",
					 PQerrorMessage(conn));
		return false;
	}

	return true;
}

static void
disconnect_atexit(void)
{
	if (conn != NULL)
		PQfinish(conn);
}

static bool
OutputFsync(TimestampTz now)
{
	output_last_fsync = now;

	output_fsync_lsn = output_written_lsn;

	if (fsync_interval <= 0)
		return true;

	if (!output_needs_fsync)
		return true;

	output_needs_fsync = false;

	/* can only fsync if it's a regular file */
	if (!output_isfile)
		return true;

	if (fsync(outfd) != 0)
	{
		pg_log_error("could not fsync file \"%s\": %m", outfile);
		return false;
	}

	return true;
}

/*
 * Start the log streaming
 */
static void
StreamLogicalLog(void)
{
	PGresult   *res;
	char	   *copybuf = NULL;
	TimestampTz last_status = -1;
	int			i;
	PQExpBuffer query;

	output_written_lsn = InvalidXLogRecPtr;
	output_fsync_lsn = InvalidXLogRecPtr;

	query = createPQExpBuffer();

	/*
	 * Connect in replication mode to the server
	 */
	if (!conn)
		conn = GetConnection();
	if (!conn)
		/* Error message already written in GetConnection() */
		return;

	/*
	 * Start the replication
	 */
	if (verbose)
		pg_log_info("starting log streaming at %X/%X (slot %s)",
					(uint32) (startpos >> 32), (uint32) startpos,
					replication_slot);

	/* Initiate the replication stream at specified location */
	appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
					  replication_slot, (uint32) (startpos >> 32), (uint32) startpos);

	/* print options if there are any */
	if (noptions)
		appendPQExpBufferStr(query, " (");

	for (i = 0; i < noptions; i++)
	{
		/* separator */
		if (i > 0)
			appendPQExpBufferStr(query, ", ");

		/* write option name */
		appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);

		/* write option value if specified */
		if (options[(i * 2) + 1] != NULL)
			appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
	}

	if (noptions)
		appendPQExpBufferChar(query, ')');

	res = PQexec(conn, query->data);
	if (PQresultStatus(res) != PGRES_COPY_BOTH)
	{
		pg_log_error("could not send replication command \"%s\": %s",
					 query->data, PQresultErrorMessage(res));
		PQclear(res);
		goto error;
	}
	PQclear(res);
	resetPQExpBuffer(query);

	if (verbose)
		pg_log_info("streaming initiated");

	while (!time_to_abort)
	{
		int			r;
		int			bytes_left;
		int			bytes_written;
		TimestampTz now;
		int			hdr_len;
		XLogRecPtr	cur_record_lsn = InvalidXLogRecPtr;

		if (copybuf != NULL)
		{
			PQfreemem(copybuf);
			copybuf = NULL;
		}

		/*
		 * Potentially send a status message to the master
		 */
		now = feGetCurrentTimestamp();

		if (outfd != -1 &&
			feTimestampDifferenceExceeds(output_last_fsync, now,
										 fsync_interval))
		{
			if (!OutputFsync(now))
				goto error;
		}

		if (standby_message_timeout > 0 &&
			feTimestampDifferenceExceeds(last_status, now,
										 standby_message_timeout))
		{
			/* Time to send feedback! */
			if (!sendFeedback(conn, now, true, false))
				goto error;

			last_status = now;
		}

		/* got SIGHUP, close output file */
		if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
		{
			now = feGetCurrentTimestamp();
			if (!OutputFsync(now))
				goto error;
			close(outfd);
			outfd = -1;
		}
		output_reopen = false;

		/* open the output file, if not open yet */
		if (outfd == -1)
		{
			struct stat statbuf;

			if (strcmp(outfile, "-") == 0)
				outfd = fileno(stdout);
			else
				outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
							 S_IRUSR | S_IWUSR);
			if (outfd == -1)
			{
				pg_log_error("could not open log file \"%s\": %m", outfile);
				goto error;
			}

			if (fstat(outfd, &statbuf) != 0)
				pg_log_error("could not stat file \"%s\": %m", outfile);

			output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
		}

		r = PQgetCopyData(conn, &copybuf, 1);
		if (r == 0)
		{
			/*
			 * In async mode, and no data available. We block on reading but
			 * not more than the specified timeout, so that we can send a
			 * response back to the client.
			 */
			fd_set		input_mask;
			TimestampTz message_target = 0;
			TimestampTz fsync_target = 0;
			struct timeval timeout;
			struct timeval *timeoutptr = NULL;

			if (PQsocket(conn) < 0)
			{
				pg_log_error("invalid socket: %s", PQerrorMessage(conn));
				goto error;
			}

			FD_ZERO(&input_mask);
			FD_SET(PQsocket(conn), &input_mask);

			/* Compute when we need to wakeup to send a keepalive message. */
			if (standby_message_timeout)
				message_target = last_status + (standby_message_timeout - 1) *
					((int64) 1000);

			/* Compute when we need to wakeup to fsync the output file. */
			if (fsync_interval > 0 && output_needs_fsync)
				fsync_target = output_last_fsync + (fsync_interval - 1) *
					((int64) 1000);

			/* Now compute when to wakeup. */
			if (message_target > 0 || fsync_target > 0)
			{
				TimestampTz targettime;
				long		secs;
				int			usecs;

				targettime = message_target;

				if (fsync_target > 0 && fsync_target < targettime)
					targettime = fsync_target;

				feTimestampDifference(now,
									  targettime,
									  &secs,
									  &usecs);
				if (secs <= 0)
					timeout.tv_sec = 1; /* Always sleep at least 1 sec */
				else
					timeout.tv_sec = secs;
				timeout.tv_usec = usecs;
				timeoutptr = &timeout;
			}

			r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
			if (r == 0 || (r < 0 && errno == EINTR))
			{
				/*
				 * Got a timeout or signal. Continue the loop and either
				 * deliver a status packet to the server or just go back into
				 * blocking.
				 */
				continue;
			}
			else if (r < 0)
			{
				pg_log_error("select() failed: %m");
				goto error;
			}

			/* Else there is actually data on the socket */
			if (PQconsumeInput(conn) == 0)
			{
				pg_log_error("could not receive data from WAL stream: %s",
							 PQerrorMessage(conn));
				goto error;
			}
			continue;
		}

		/* End of copy stream */
		if (r == -1)
			break;

		/* Failure while reading the copy stream */
		if (r == -2)
		{
			pg_log_error("could not read COPY data: %s",
						 PQerrorMessage(conn));
			goto error;
		}

		/* Check the message type. */
		if (copybuf[0] == 'k')
		{
			int			pos;
			bool		replyRequested;
			XLogRecPtr	walEnd;
			bool		endposReached = false;

			/*
			 * Parse the keepalive message, enclosed in the CopyData message.
			 * We just check if the server requested a reply, and ignore the
			 * rest.
			 */
			pos = 1;			/* skip msgtype 'k' */
			walEnd = fe_recvint64(&copybuf[pos]);
			output_written_lsn = Max(walEnd, output_written_lsn);

			pos += 8;			/* read walEnd */

			pos += 8;			/* skip sendTime */

			if (r < pos + 1)
			{
				pg_log_error("streaming header too small: %d", r);
				goto error;
			}
			replyRequested = copybuf[pos];

			if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
			{
				/*
				 * If there's nothing to read on the socket until a keepalive
				 * we know that the server has nothing to send us; and if
				 * walEnd has passed endpos, we know nothing else can have
				 * committed before endpos.  So we can bail out now.
				 */
				endposReached = true;
			}

			/* Send a reply, if necessary */
			if (replyRequested || endposReached)
			{
				if (!flushAndSendFeedback(conn, &now))
					goto error;
				last_status = now;
			}

			if (endposReached)
			{
				prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
				time_to_abort = true;
				break;
			}

			continue;
		}
		else if (copybuf[0] != 'w')
		{
			pg_log_error("unrecognized streaming header: \"%c\"",
						 copybuf[0]);
			goto error;
		}

		/*
		 * Read the header of the XLogData message, enclosed in the CopyData
		 * message. We only need the WAL location field (dataStart), the rest
		 * of the header is ignored.
		 */
		hdr_len = 1;			/* msgtype 'w' */
		hdr_len += 8;			/* dataStart */
		hdr_len += 8;			/* walEnd */
		hdr_len += 8;			/* sendTime */
		if (r < hdr_len + 1)
		{
			pg_log_error("streaming header too small: %d", r);
			goto error;
		}

		/* Extract WAL location for this block */
		cur_record_lsn = fe_recvint64(&copybuf[1]);

		if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
		{
			/*
			 * We've read past our endpoint, so prepare to go away being
			 * cautious about what happens to our output data.
			 */
			if (!flushAndSendFeedback(conn, &now))
				goto error;
			prepareToTerminate(conn, endpos, false, cur_record_lsn);
			time_to_abort = true;
			break;
		}

		output_written_lsn = Max(cur_record_lsn, output_written_lsn);

		bytes_left = r - hdr_len;
		bytes_written = 0;

		/* signal that a fsync is needed */
		output_needs_fsync = true;

		while (bytes_left)
		{
			int			ret;

			ret = write(outfd,
						copybuf + hdr_len + bytes_written,
						bytes_left);

			if (ret < 0)
			{
				pg_log_error("could not write %u bytes to log file \"%s\": %m",
							 bytes_left, outfile);
				goto error;
			}

			/* Write was successful, advance our position */
			bytes_written += ret;
			bytes_left -= ret;
		}

		if (write(outfd, "\n", 1) != 1)
		{
			pg_log_error("could not write %u bytes to log file \"%s\": %m",
						 1, outfile);
			goto error;
		}

		if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
		{
			/* endpos was exactly the record we just processed, we're done */
			if (!flushAndSendFeedback(conn, &now))
				goto error;
			prepareToTerminate(conn, endpos, false, cur_record_lsn);
			time_to_abort = true;
			break;
		}
	}

	res = PQgetResult(conn);
	if (PQresultStatus(res) == PGRES_COPY_OUT)
	{
		/*
		 * We're doing a client-initiated clean exit and have sent CopyDone to
		 * the server. We've already sent replay confirmation and fsync'd so
		 * we can just clean up the connection now.
		 */
		goto error;
	}
	else if (PQresultStatus(res) != PGRES_COMMAND_OK)
	{
		pg_log_error("unexpected termination of replication stream: %s",
					 PQresultErrorMessage(res));
		goto error;
	}
	PQclear(res);

	if (outfd != -1 && strcmp(outfile, "-") != 0)
	{
		TimestampTz t = feGetCurrentTimestamp();

		/* no need to jump to error on failure here, we're finishing anyway */
		OutputFsync(t);

		if (close(outfd) != 0)
			pg_log_error("could not close file \"%s\": %m", outfile);
	}
	outfd = -1;
error:
	if (copybuf != NULL)
	{
		PQfreemem(copybuf);
		copybuf = NULL;
	}
	destroyPQExpBuffer(query);
	PQfinish(conn);
	conn = NULL;
}

/*
 * Unfortunately we can't do sensible signal handling on windows...
 */
#ifndef WIN32

/*
 * When sigint is called, just tell the system to exit at the next possible
 * moment.
 */
static void
sigint_handler(int signum)
{
	time_to_abort = true;
}

/*
 * Trigger the output file to be reopened.
 */
static void
sighup_handler(int signum)
{
	output_reopen = true;
}
#endif


int
main(int argc, char **argv)
{
	static struct option long_options[] = {
/* general options */
		{"file", required_argument, NULL, 'f'},
		{"fsync-interval", required_argument, NULL, 'F'},
		{"no-loop", no_argument, NULL, 'n'},
		{"verbose", no_argument, NULL, 'v'},
		{"version", no_argument, NULL, 'V'},
		{"help", no_argument, NULL, '?'},
/* connection options */
		{"dbname", required_argument, NULL, 'd'},
		{"host", required_argument, NULL, 'h'},
		{"port", required_argument, NULL, 'p'},
		{"username", required_argument, NULL, 'U'},
		{"no-password", no_argument, NULL, 'w'},
		{"password", no_argument, NULL, 'W'},
/* replication options */
		{"startpos", required_argument, NULL, 'I'},
		{"endpos", required_argument, NULL, 'E'},
		{"option", required_argument, NULL, 'o'},
		{"plugin", required_argument, NULL, 'P'},
		{"status-interval", required_argument, NULL, 's'},
		{"slot", required_argument, NULL, 'S'},
/* action */
		{"create-slot", no_argument, NULL, 1},
		{"start", no_argument, NULL, 2},
		{"drop-slot", no_argument, NULL, 3},
		{"if-not-exists", no_argument, NULL, 4},
		{NULL, 0, NULL, 0}
	};
	int			c;
	int			option_index;
	uint32		hi,
				lo;
	char	   *db_name;

	pg_logging_init(argv[0]);
	progname = get_progname(argv[0]);
	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));

	if (argc > 1)
	{
		if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
		{
			usage();
			exit(0);
		}
		else if (strcmp(argv[1], "-V") == 0 ||
				 strcmp(argv[1], "--version") == 0)
		{
			puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
			exit(0);
		}
	}

	while ((c = getopt_long(argc, argv, "E:f:F:nvd:h:p:U:wWI:o:P:s:S:",
							long_options, &option_index)) != -1)
	{
		switch (c)
		{
/* general options */
			case 'f':
				outfile = pg_strdup(optarg);
				break;
			case 'F':
				fsync_interval = atoi(optarg) * 1000;
				if (fsync_interval < 0)
				{
					pg_log_error("invalid fsync interval \"%s\"", optarg);
					exit(1);
				}
				break;
			case 'n':
				noloop = 1;
				break;
			case 'v':
				verbose++;
				break;
/* connection options */
			case 'd':
				dbname = pg_strdup(optarg);
				break;
			case 'h':
				dbhost = pg_strdup(optarg);
				break;
			case 'p':
				if (atoi(optarg) <= 0)
				{
					pg_log_error("invalid port number \"%s\"", optarg);
					exit(1);
				}
				dbport = pg_strdup(optarg);
				break;
			case 'U':
				dbuser = pg_strdup(optarg);
				break;
			case 'w':
				dbgetpassword = -1;
				break;
			case 'W':
				dbgetpassword = 1;
				break;
/* replication options */
			case 'I':
				if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
				{
					pg_log_error("could not parse start position \"%s\"", optarg);
					exit(1);
				}
				startpos = ((uint64) hi) << 32 | lo;
				break;
			case 'E':
				if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
				{
					pg_log_error("could not parse end position \"%s\"", optarg);
					exit(1);
				}
				endpos = ((uint64) hi) << 32 | lo;
				break;
			case 'o':
				{
					char	   *data = pg_strdup(optarg);
					char	   *val = strchr(data, '=');

					if (val != NULL)
					{
						/* remove =; separate data from val */
						*val = '\0';
						val++;
					}

					noptions += 1;
					options = pg_realloc(options, sizeof(char *) * noptions * 2);

					options[(noptions - 1) * 2] = data;
					options[(noptions - 1) * 2 + 1] = val;
				}

				break;
			case 'P':
				plugin = pg_strdup(optarg);
				break;
			case 's':
				standby_message_timeout = atoi(optarg) * 1000;
				if (standby_message_timeout < 0)
				{
					pg_log_error("invalid status interval \"%s\"", optarg);
					exit(1);
				}
				break;
			case 'S':
				replication_slot = pg_strdup(optarg);
				break;
/* action */
			case 1:
				do_create_slot = true;
				break;
			case 2:
				do_start_slot = true;
				break;
			case 3:
				do_drop_slot = true;
				break;
			case 4:
				slot_exists_ok = true;
				break;

			default:

				/*
				 * getopt_long already emitted a complaint
				 */
				fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
						progname);
				exit(1);
		}
	}

	/*
	 * Any non-option arguments?
	 */
	if (optind < argc)
	{
		pg_log_error("too many command-line arguments (first is \"%s\")",
					 argv[optind]);
		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
				progname);
		exit(1);
	}

	/*
	 * Required arguments
	 */
	if (replication_slot == NULL)
	{
		pg_log_error("no slot specified");
		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
				progname);
		exit(1);
	}

	if (do_start_slot && outfile == NULL)
	{
		pg_log_error("no target file specified");
		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
				progname);
		exit(1);
	}

	if (!do_drop_slot && dbname == NULL)
	{
		pg_log_error("no database specified");
		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
				progname);
		exit(1);
	}

	if (!do_drop_slot && !do_create_slot && !do_start_slot)
	{
		pg_log_error("at least one action needs to be specified");
		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
				progname);
		exit(1);
	}

	if (do_drop_slot && (do_create_slot || do_start_slot))
	{
		pg_log_error("cannot use --create-slot or --start together with --drop-slot");
		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
				progname);
		exit(1);
	}

	if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot))
	{
		pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
				progname);
		exit(1);
	}

	if (endpos != InvalidXLogRecPtr && !do_start_slot)
	{
		pg_log_error("--endpos may only be specified with --start");
		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
				progname);
		exit(1);
	}

#ifndef WIN32
	pqsignal(SIGINT, sigint_handler);
	pqsignal(SIGHUP, sighup_handler);
#endif

	/*
	 * Obtain a connection to server. This is not really necessary but it
	 * helps to get more precise error messages about authentication, required
	 * GUC parameters and such.
	 */
	conn = GetConnection();
	if (!conn)
		/* Error message already written in GetConnection() */
		exit(1);
	atexit(disconnect_atexit);

	/*
	 * Run IDENTIFY_SYSTEM to make sure we connected using a database specific
	 * replication connection.
	 */
	if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
		exit(1);

	if (db_name == NULL)
	{
		pg_log_error("could not establish database-specific replication connection");
		exit(1);
	}

	/*
	 * Set umask so that directories/files are created with the same
	 * permissions as directories/files in the source data directory.
	 *
	 * pg_mode_mask is set to owner-only by default and then updated in
	 * GetConnection() where we get the mode from the server-side with
	 * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
	 */
	umask(pg_mode_mask);

	/* Drop a replication slot. */
	if (do_drop_slot)
	{
		if (verbose)
			pg_log_info("dropping replication slot \"%s\"", replication_slot);

		if (!DropReplicationSlot(conn, replication_slot))
			exit(1);
	}

	/* Create a replication slot. */
	if (do_create_slot)
	{
		if (verbose)
			pg_log_info("creating replication slot \"%s\"", replication_slot);

		if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
								   false, false, slot_exists_ok))
			exit(1);
		startpos = InvalidXLogRecPtr;
	}

	if (!do_start_slot)
		exit(0);

	/* Stream loop */
	while (true)
	{
		StreamLogicalLog();
		if (time_to_abort)
		{
			/*
			 * We've been Ctrl-C'ed or reached an exit limit condition. That's
			 * not an error, so exit without an errorcode.
			 */
			exit(0);
		}
		else if (noloop)
		{
			pg_log_error("disconnected");
			exit(1);
		}
		else
		{
			/* translator: check source for value for %d */
			pg_log_info("disconnected; waiting %d seconds to try again",
						RECONNECT_SLEEP_TIME);
			pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
		}
	}
}

/*
 * Fsync our output data, and send a feedback message to the server.  Returns
 * true if successful, false otherwise.
 *
 * If successful, *now is updated to the current timestamp just before sending
 * feedback.
 */
static bool
flushAndSendFeedback(PGconn *conn, TimestampTz *now)
{
	/* flush data to disk, so that we send a recent flush pointer */
	if (!OutputFsync(*now))
		return false;
	*now = feGetCurrentTimestamp();
	if (!sendFeedback(conn, *now, true, false))
		return false;

	return true;
}

/*
 * Try to inform the server about our upcoming demise, but don't wait around or
 * retry on failure.
 */
static void
prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
{
	(void) PQputCopyEnd(conn, NULL);
	(void) PQflush(conn);

	if (verbose)
	{
		if (keepalive)
			pg_log_info("endpos %X/%X reached by keepalive",
						(uint32) (endpos >> 32), (uint32) endpos);
		else
			pg_log_info("endpos %X/%X reached by record at %X/%X",
						(uint32) (endpos >> 32), (uint32) (endpos),
						(uint32) (lsn >> 32), (uint32) lsn);

	}
}

相关信息

greenplumn 源码目录

相关文章

greenplumn pg_basebackup 源码

greenplumn pg_receivewal 源码

greenplumn receivelog 源码

greenplumn receivelog 源码

greenplumn streamutil 源码

greenplumn streamutil 源码

greenplumn walmethods 源码

greenplumn walmethods 源码

0  赞