greenplumn test_parallel_retrieve_cursor_extended_query 源码

  • 2022-08-18
  • 浏览 (471)

greenplumn test_parallel_retrieve_cursor_extended_query 代码

文件路径:/src/test/isolation2/test_parallel_retrieve_cursor_extended_query.c

/*
 * src/test/examples/test_parallel_cursor_extended_query.c
 *
 * this program is to show extended query with the PARALLEL RETRIEVE CURSOR.
 */
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include "libpq-fe.h"

#define MASTER_CONNECT_INDEX -1

static void
finish_conn_nicely(PGconn *master_conn, PGconn *endpoint_conns[], size_t endpoint_conns_num)
{
	int			i;

	if (master_conn)
		PQfinish(master_conn);

	for (i = 0; i < endpoint_conns_num; i++)
	{
		if (endpoint_conns[i])
			PQfinish(endpoint_conns[i]);
	}

	free(endpoint_conns);
	endpoint_conns_num = 0;
}

static void
check_prepare_conn(PGconn *conn, const char *dbName, int conn_idx)
{
	PGresult   *res;

	/* check to see that the backend connection was successfully made */
	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "Connection to database \"%s\" failed: %s",
				dbName, PQerrorMessage(conn));
		exit(1);
	}

	if (conn_idx == MASTER_CONNECT_INDEX)
	{
		/*
		 * Set always-secure search path, so malicous users can't take
		 * control.
		 */
		res = PQexec(conn,
					 "SELECT pg_catalog.set_config('search_path', '', false)");
		if (PQresultStatus(res) != PGRES_TUPLES_OK)
		{
			fprintf(stderr, "SET failed: %s", PQerrorMessage(conn));
			PQclear(res);
			exit(1);
		}
		PQclear(res);
	}
}

/* execute sql and check it is a command without result set returned */
static int
exec_sql_without_resultset(PGconn *conn, const char *sql, int conn_idx)
{
	PGresult   *res1;

	if (conn_idx == MASTER_CONNECT_INDEX)
		printf("\nExec SQL on Master:\n\t> %s\n", sql);
	else
		printf("\nExec SQL on EndPoint[%d]:\n\t> %s\n", conn_idx, sql);

	res1 = PQexec(conn, sql);
	if (PQresultStatus(res1) != PGRES_COMMAND_OK)
	{
		fprintf(stderr, "execute sql failed: \"%s\"\nfailed %s", sql, PQerrorMessage(conn));
		PQclear(res1);
		return 1;
	}

	/*
	 * make sure to PQclear() a PGresult whenever it is no longer needed to
	 * avoid memory leaks
	 */
	PQclear(res1);
	return 0;
}

/* execute sql and print the result set */
static int
exec_sql_with_resultset_in_extended_query_protocol(PGconn *conn, const char *sql, int conn_idx)
{
	PGresult   *res1;
	int			nFields;
	int			i,
				j;
	const char *paramValues[1];

	paramValues[0] = "0";

	if (conn_idx == MASTER_CONNECT_INDEX)
		printf("\nExec SQL on Master:\n\t> %s\n", sql);
	else
		printf("\nExec SQL on EndPoint[%d]:\n\t> %s\n", conn_idx, sql);

	/*
	 * Just for simplicity, here for different connect index, call different
	 * API in the extended query protocol
	 */
	switch (conn_idx)
	{
		case 0:
			res1 = PQprepare(conn, "extend_query_cursor",
							 sql,
							 0, NULL);

			if (PQresultStatus(res1) != PGRES_TUPLES_OK && PQresultStatus(res1) != PGRES_COMMAND_OK)
			{
				fprintf(stdout, "PQprepare() didn't return properly: resultStatus: %d: \"%s\"\nfailed %s", PQresultStatus(res1), sql, PQerrorMessage(conn));
				PQclear(res1);
				return 1;
			}
			res1 = PQexecPrepared(conn, "extend_query_cursor", 0,
								  paramValues, NULL, NULL, 0);

			if (PQresultStatus(res1) != PGRES_TUPLES_OK)
			{
				fprintf(stderr, "PQexecPrepared() didn't return tuples properly: \"%s\"\nfailed %s", sql, PQerrorMessage(conn));
				PQclear(res1);
				return 1;
			}
			break;
		case 1:
			res1 = PQexecParams(conn,
								sql,
								0,	/* one param */
								NULL,	/* let the backend deduce param type */
								paramValues,
								NULL,	/* don't need param lengths since text */
								NULL,	/* default to all text params */
								0); /* ask for binary results */
			if (PQresultStatus(res1) != PGRES_TUPLES_OK)
			{
				fprintf(stderr, "PQexecParams() didn't return tuples properly: \"%s\"\nfailed %s", sql, PQerrorMessage(conn));
				PQclear(res1);
				return 1;
			}
			break;
		case 2:
		default:
			res1 = PQexec(conn, sql);
			if (PQresultStatus(res1) != PGRES_TUPLES_OK)
			{
				fprintf(stderr, "PQexec() didn't return tuples properly: \"%s\"\nfailed %s", sql, PQerrorMessage(conn));
				PQclear(res1);
				return 1;
			}
			break;
	}

	/* first, print out the attribute names */
	nFields = PQnfields(res1);
	for (i = 0; i < nFields; i++)
		printf("%-15s", PQfname(res1, i));
	printf("\n---------\n");

	/* next, print out the instances */
	for (i = 0; i < PQntuples(res1); i++)
	{
		for (j = 0; j < nFields; j++)
			printf("%-15s", PQgetvalue(res1, i, j));
		printf("\n");
	}

	PQclear(res1);
	return 0;
}

/* execute sql and print the result set */
static int
exec_sql_with_resultset(PGconn *conn, const char *sql, int conn_idx)
{
	PGresult   *res1;
	int			nFields;
	int			i,
				j;

	if (conn_idx == MASTER_CONNECT_INDEX)
		printf("\nExec SQL on Master:\n\t> %s\n", sql);
	else
		printf("\nExec SQL on EndPoint[%d]:\n\t> %s\n", conn_idx, sql);;

	res1 = PQexec(conn, sql);
	if (PQresultStatus(res1) != PGRES_TUPLES_OK)
	{
		fprintf(stderr, "Query didn't return tuples properly: \"%s\"\nfailed %s", sql, PQerrorMessage(conn));
		PQclear(res1);
		return 1;
	}

	/* first, print out the attribute names */
	nFields = PQnfields(res1);
	for (i = 0; i < nFields; i++)
		printf("%-15s", PQfname(res1, i));
	printf("\n---------\n");

	/* next, print out the instances */
	for (i = 0; i < PQntuples(res1); i++)
	{
		for (j = 0; j < nFields; j++)
			printf("%-15s", PQgetvalue(res1, i, j));
		printf("\n");
	}

	PQclear(res1);
	return 0;
}

/* this function is in the master connection */
static int
exec_check_parallel_cursor(PGconn *master_conn, int isCheckFinish)
{
	int			result = 0;
	PGresult   *res1;
	const char *check_sql = "SELECT * FROM pg_catalog.gp_wait_parallel_retrieve_cursor('myportal', 0);";


	printf("\n------ Begin checking parallel retrieve cursor status ------\n");
	/* call wait mode monitor UDF and it will wait for finish retrieving. */
	if (!isCheckFinish)
	{
		result = exec_sql_with_resultset(master_conn, check_sql, MASTER_CONNECT_INDEX);
	}
	else
	{
		printf("\nExec SQL on Master:\n\t> %s\n", check_sql);
		res1 = PQexec(master_conn, check_sql);
		if (PQresultStatus(res1) != PGRES_TUPLES_OK)
		{
			fprintf(stderr, "\"%s\" didn't return tuples properly\n", check_sql);
			PQclear(res1);
			return 1;
		}
		/* firstly check that the row == 1 */
		int			ntup = PQntuples(res1);

		if (ntup != 1)
		{
			fprintf(stderr, "\"%s\" doesn't return correct row number\n", check_sql);
			return 1;
		}

		result = strcmp(PQgetvalue(res1, 0, 0), "t");
		PQclear(res1);
		if (result)
		{
			fprintf(stderr, "\"%s\" doesn't return 'true'\n", check_sql);
			return 1;
		}
	}

	printf("\n------ End checking parallel retrieve cursor status ------\n");
	return result;
}

int
main(int argc, char **argv)
{
	char	   *pghost,
			   *pgport,
			   *pgoptions,
			   *pgoptions_retrieve_mode,
			   *pgtty;
	char	   *dbName,
			   *dbUser;
	int			i;
	int			retVal;			/* return value for this func */

	PGconn	   *master_conn,
			  **endpoint_conns = NULL;
	size_t		endpoint_conns_num = 0;
	char	  **tokens = NULL,
			  **endpoint_names = NULL;


	/*
	 * PGresult   *res1, *res2;
	 */
	PGresult   *res1;

	if (argc != 3)
	{
		fprintf(stderr, "usage: %s dbUser dbName\n", argv[0]);
		fprintf(stderr, "      show how to use PARALLEL RETRIEVE CURSOR to parallelly retrieve data from multiple endpoints.\n");
		exit(1);
	}
	dbUser = argv[1];
	dbName = argv[2];

	/*
	 * begin, by setting the parameters for a backend connection if the
	 * parameters are null, then the system will try to use reasonable
	 * defaults by looking up environment variables or, failing that, using
	 * hardwired constants
	 */
	pghost = NULL;				/* host name of the backend */
	pgport = NULL;				/* port of the backend */
	pgoptions = NULL;			/* special options to start up the backend
								 * server */
	pgoptions_retrieve_mode = "-c gp_retrieve_conn=true";	/* specify this
															 * connection is for
															 * retrieve only */
	pgtty = NULL;				/* debugging tty for the backend */

	/* make a connection to the database */
	master_conn = PQsetdb(pghost, pgport, pgoptions, pgtty, dbName);
	check_prepare_conn(master_conn, dbName, MASTER_CONNECT_INDEX);

	/* do some preparation for test */
	if (exec_sql_without_resultset(master_conn, "DROP TABLE IF EXISTS public.tab_parallel_cursor;", MASTER_CONNECT_INDEX) != 0)
		goto LABEL_ERR;
	if (exec_sql_without_resultset(master_conn, "CREATE TABLE public.tab_parallel_cursor AS SELECT id FROM pg_catalog.generate_series(1,100) id;", MASTER_CONNECT_INDEX) != 0)
		goto LABEL_ERR;

	/*
	 * start a transaction block because PARALLEL RETRIEVE CURSOR only support
	 * WITHOUT HOLD option
	 */
	if (exec_sql_without_resultset(master_conn, "BEGIN;", MASTER_CONNECT_INDEX) != 0)
		goto LABEL_ERR;

	/* declare PARALLEL RETRIEVE CURSOR for this table */
	if (exec_sql_without_resultset(master_conn, "DECLARE myportal PARALLEL RETRIEVE CURSOR FOR select * from public.tab_parallel_cursor;", MASTER_CONNECT_INDEX) != 0)
		goto LABEL_ERR;

	/*
	 * get the endpoints info of this PARALLEL RETRIEVE CURSOR
	 */
	const char *sql1 = "select hostname,port,auth_token,endpointname from pg_catalog.gp_get_endpoints() where cursorname='myportal';";

	printf("\nExec SQL on the coordinator:\n\t> %s\n", sql1);
	res1 = PQexec(master_conn, sql1);
	if (PQresultStatus(res1) != PGRES_TUPLES_OK)
	{
		fprintf(stderr, "Cannot get the endpoint information for cursor myportal\n");
		PQclear(res1);
		goto LABEL_ERR;
	}
	/* firstly check that the endpoint info rows > 0 */
	int			ntup = PQntuples(res1);

	if (ntup <= 0)
	{
		fprintf(stderr, "No rows for the endpoint (cursor myportal)\n");
		goto LABEL_ERR;
	}

	endpoint_conns = malloc(ntup * sizeof(PGconn *));
	tokens = malloc(ntup * sizeof(char **));
	endpoint_names = malloc(ntup * sizeof(char *));
	endpoint_conns_num = ntup;

	/*
	 * create retrieve mode connection to endpoints according to the endpoints
	 * info fetched
	 */
	for (i = 0; i < ntup; i++)
	{
		char	   *host = PQgetvalue(res1, i, 0);
		char	   *port = PQgetvalue(res1, i, 1);

		tokens[i] = strdup(PQgetvalue(res1, i, 2));
		endpoint_names[i] = strdup(PQgetvalue(res1, i, 3));

		endpoint_conns[i] = PQsetdbLogin(host, port, pgoptions_retrieve_mode,
										 pgtty, dbName,
										 dbUser, tokens[i]);
		check_prepare_conn(endpoint_conns[i], dbName, i);
	}
	PQclear(res1);

	/*
	 * Now the endpoint becomes 'READY' after "DECLARE ... PARALLEL RETRIEVE
	 * CURSOR" returns, then we can retrieve the result of the endpoints in
	 * parallel. This section can be executed parallely on different host or
	 * in different threads/processes on the same host. For simplicity, here
	 * just use loop in one process.
	 */
	for (i = 0; i < endpoint_conns_num; i++)
	{
		char		sql[256];

		/*
		 * Run nowait mode monitor UDF to check the parallel retrieve cursor
		 * status at specific time in master connection, so that if any error
		 * occurs, it will detect ASAP.
		 */
		if (exec_check_parallel_cursor(master_conn, 0))
		{
			fprintf(stderr, "Error during check the PARALLEL RETRIEVE CURSOR\n");
			goto LABEL_ERR;
		}

		/*
		 * the endpoint is ready to be retrieved when 'DECLARE PARALLEL
		 * RETRIEVE CURSOR returns, here begin to retrieve
		 */
		printf("\n------ Begin retrieving data from Endpoint %d# ------\n", i);
		snprintf(sql, sizeof(sql), "RETRIEVE ALL FROM ENDPOINT %s;", endpoint_names[i]);
		if (exec_sql_with_resultset_in_extended_query_protocol(endpoint_conns[i], sql, i))
		{
			fprintf(stderr, "Error during retrieving result on endpoint.\n");
			goto LABEL_ERR;
		}

		printf("\n------ End retrieving data from Endpoint %d# ------.\n", i);
	}

	/* Check the status returns finished */
	if (exec_check_parallel_cursor(master_conn, 1))
	{
		fprintf(stderr, "Error during check the PARALLEL RETRIEVE CURSOR\n");
		goto LABEL_ERR;
	}

	/* close the cursor */
	if (exec_sql_without_resultset(master_conn, "CLOSE myportal;", MASTER_CONNECT_INDEX) != 0)
		goto LABEL_ERR;

	/* end the transaction */
	if (exec_sql_without_resultset(master_conn, "END;", MASTER_CONNECT_INDEX) != 0)
		goto LABEL_ERR;

	/*
	 * Test the fast path(PQfn) is forbidden in retrieve mode.
	 */
	{
		printf("\n------ Begin to test fast path(PQfn) in retrieve mode # ------.\n");
		Oid			foid;
		PQArgBlock	argv[2];
		PGresult   *res;
		int			ret;
		int			result_len;
		const char *query;

		query = "SELECT oid FROM pg_catalog.pg_proc WHERE proname LIKE 'mod' "
			"AND prosrc LIKE 'int4mod'";
		res = PQexec(master_conn, query);
		if (res == NULL)
		{
			printf("Failed in executing SQL: %s \n", query);
			goto LABEL_ERR;
		}

		if (PQresultStatus(res) != PGRES_TUPLES_OK)
		{
			PQclear(res);
			printf("Can not find the mod() function. \n");
			goto LABEL_ERR;
		}

		foid = (Oid) atoi(PQgetvalue(res, 0, 0));
		PQclear(res);

		if (foid == InvalidOid)
		{
			goto LABEL_ERR;
		}

		/*
		 * Calculate the mod of argv[0] and argv[1], mod(10, 3) via master
		 * connection, can get the correct result.
		 */
		argv[0].isint = 1;
		argv[0].len = 4;
		argv[0].u.integer = 10;

		argv[1].isint = 1;
		argv[1].len = 4;
		argv[1].u.integer = 3;

		res = PQfn(master_conn, foid, &ret, &result_len, 1, argv, 2);
		if (PQresultStatus(res) == PGRES_COMMAND_OK)
		{
			PQclear(res);
			printf("Get the result of mod(10, 3) : %d \n", ret);
		}
		else
		{
			printf("test PQfn error: %s \n", PQresultErrorMessage(res));
			PQclear(res);
			goto LABEL_ERR;
		}

		/*
		 * Call PQfn() via retrieve mode connection should be forbidden.
		 */
		res = PQfn(endpoint_conns[0], foid, &ret, &result_len, 1, argv, 2);
		if (PQresultStatus(res) == PGRES_COMMAND_OK)
		{
			printf("Call PQfn() via retrieve mode connection should be forbidden: %d \n", PQresultStatus(res));
			PQclear(res);
			goto LABEL_ERR;
		}
		printf("Expected forbidden: Call PQfn() via retrieve mode connection: %s \n", PQresultErrorMessage(res));

		PQclear(res);
		printf("\n------ End of testing fast path(PQfn) in retrieve mode # ------.\n");
	}

	/* fclose(debug); */
	retVal = 0;
	goto LABEL_FINISH;

LABEL_ERR:
	retVal = 1;

LABEL_FINISH:
	/* close the connections to the database and cleanup */
	finish_conn_nicely(master_conn, endpoint_conns, endpoint_conns_num);

	if (tokens)
	{
		int			i;

		for (i = 0; i < endpoint_conns_num; i++)
		{
			if (tokens[i])
				free(tokens[i]);
		}
		free(tokens);
	}

	if (endpoint_names)
	{
		for (i = 0; i < endpoint_conns_num; i++)
		{
			if (endpoint_names[i])
				free(endpoint_names[i]);
		}
		free(endpoint_names);
	}

	return retVal;
}

相关信息

greenplumn 源码目录

相关文章

greenplumn extended_protocol_test 源码

greenplumn isolation2_main 源码

greenplumn isolation2_regress 源码

greenplumn test_parallel_retrieve_cursor_extended_query_error 源码

greenplumn workfile_mgr_test 源码

0  赞