greenplumn url_curl 源码

  • 2022-08-18
  • 浏览 (302)

greenplumn url_curl 代码

文件路径:/src/backend/access/external/url_curl.c

/*-------------------------------------------------------------------------
 *
 * url_curl.c
 *	  Core support for opening external relations via a URL with curl
 *
 * Portions Copyright (c) 2007-2008, Greenplum inc
 * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
 *
 * IDENTIFICATION
 *	  src/backend/access/external/url_curl.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/url.h"

#ifdef USE_CURL

#include <arpa/inet.h>

#include <curl/curl.h>
#include <time.h>

#include "cdb/cdbsreh.h"
#include "cdb/cdbutil.h"
#include "cdb/cdbvars.h"
#include "miscadmin.h"
#include "utils/guc.h"
#include "utils/resowner.h"
#include "utils/uri.h"

/*
 * This struct encapsulates the libcurl resources that need to be explicitly
 * cleaned up on error. We use the resource owner mechanism to make sure
 * these are not leaked. When a ResourceOwner is released, our hook will
 * walk the list of open curlhandles, and releases any that were owned by
 * the released resource owner.
 */
typedef struct curlhandle_t
{
	CURL	   *handle;		/* The curl handle */
	struct curl_slist *x_httpheader;	/* list of headers */
	bool		in_multi_handle;	/* T, if the handle is in global
									 * multi_handle */

	ResourceOwner owner;	/* owner of this handle */
	struct curlhandle_t *next;
	struct curlhandle_t *prev;
} curlhandle_t;

/*
 * Private state for a web external table, implemented with libcurl.
 *
 * This struct encapsulates working state of an open curl-flavored external
 * table. This is allocated in a suitable MemoryContext, and will therefore
 * be freed automatically on abort.
 */
typedef struct
{
	URL_FILE	common;
	bool		for_write;		/* 'f' when we SELECT, 't' when we INSERT	 */

	curlhandle_t *curl;		/* resources tracked by resource owner */

	char	   *curl_url;

	int64		seq_number;

	struct
	{
		char	   *ptr;		/* palloc-ed buffer */
		int			max;
		int			bot,
					top;
	} in;

	struct
	{
		char	   *ptr;		/* palloc-ed buffer */
		int			max;
		int			bot,
					top;
	} out;

	int			still_running;	/* Is background url fetch still in progress */
	int			error,
				eof;			/* error & eof flags */
	int			gp_proto;
	char	   *http_response;

	struct
	{
		int			datalen;	/* remaining datablock length */
	} block;

} URL_CURL_FILE;

#if BYTE_ORDER == BIG_ENDIAN
#define local_htonll(n)  (n)
#define local_ntohll(n)  (n)
#else
#define local_htonll(n)  ((((uint64) htonl(n)) << 32LL) | htonl((n) >> 32LL))
#define local_ntohll(n)  ((((uint64) ntohl(n)) << 32LL) | (uint32) ntohl(((uint64)n) >> 32LL))
#endif

#define HOST_NAME_SIZE 100
#define FDIST_TIMEOUT  408
#define MAX_TRY_WAIT_TIME 64

/*
 * SSL support GUCs - should be added soon. Until then we will use stubs
 *
 *  SSL Params
 *	extssl_protocol  CURL_SSLVERSION_TLSv1 				
 *  extssl_verifycert 	1
 *  extssl_verifyhost 	2
 *  extssl_no_verifycert 	0
 *  extssl_no_verifyhost 	0
 *  extssl_cert 		"gpfdists/client.crt"
 *  extssl_key 			"gpfdists/client.key"
 *  extssl_pass 		"?" 										
 *  extssl_crl 			NULL 									
 *  Misc Params
 *  extssl_libcurldebug 1 	
 */

const static int extssl_protocol  = CURL_SSLVERSION_TLSv1;
const static int extssl_verifycert = 1;
const static int extssl_verifyhost = 2;
const static int extssl_no_verifycert = 0;
const static int extssl_no_verifyhost = 0;
const char* extssl_cert = "gpfdists/client.crt";
const char* extssl_key = "gpfdists/client.key";
const char* extssl_ca = "gpfdists/root.crt";
const char* extssl_pass = NULL;
const char* extssl_crl = NULL;
static int extssl_libcurldebug = 1;
char extssl_key_full[MAXPGPATH] = {0};
char extssl_cer_full[MAXPGPATH] = {0};
char extssl_cas_full[MAXPGPATH] = {0};

/* Will hold the last curl error					*/
/* Currently it is in use only for SSL connection,	*/
/* but we should consider using it always			*/
static char curl_Error_Buffer[CURL_ERROR_SIZE];

static void gp_proto0_write_done(URL_CURL_FILE *file);
static void extract_http_domain(char* i_path, char* o_domain, int dlen);
static char * make_url(const char *url, bool is_ipv6);

/* we use a global one for convenience */
static CURLM *multi_handle = 0;

static int
fill_buffer(URL_CURL_FILE *curl, int want);

/*
 * A helper macro, to call curl_easy_setopt(), and ereport() if it fails.
 */
#define CURL_EASY_SETOPT(h, opt, val) \
	do { \
		int			e; \
\
		if ((e = curl_easy_setopt(h, opt, val)) != CURLE_OK) \
			elog(ERROR, "internal error: curl_easy_setopt \"%s\" error (%d - %s)", \
				 CppAsString(opt), e, curl_easy_strerror(e)); \
	} while(0)

/*
 * Linked list of open curl handles. These are allocated in TopMemoryContext,
 * and tracked by resource owners.
 */
static curlhandle_t *open_curl_handles;

static bool url_curl_resowner_callback_registered;

static curlhandle_t *
create_curlhandle(void)
{
	curlhandle_t *h;

	h = MemoryContextAlloc(TopMemoryContext, sizeof(curlhandle_t));
	h->handle = NULL;
	h->x_httpheader = NULL;
	h->in_multi_handle = false;

	h->owner = CurrentResourceOwner;
	h->prev = NULL;
	h->next = open_curl_handles;
	if (open_curl_handles)
		open_curl_handles->prev = h;
	open_curl_handles = h;

	return h;
}

static void
destroy_curlhandle(curlhandle_t *h)
{
	/* unlink from linked list first */
	if (h->prev)
		h->prev->next = h->next;
	else
		open_curl_handles = open_curl_handles->next;
	if (h->next)
		h->next->prev = h->prev;

	if (h->x_httpheader)
	{
		curl_slist_free_all(h->x_httpheader);
		h->x_httpheader = NULL;
	}

	if (h->handle)
	{
		/* If this handle was registered in the multi-handle, remove it */
		if (h->in_multi_handle)
		{
			CURLMcode e = curl_multi_remove_handle(multi_handle, h->handle);

			if (CURLM_OK != e)
				elog(LOG, "internal error curl_multi_remove_handle (%d - %s)", e, curl_easy_strerror(e));
			h->in_multi_handle = false;
		}

		/* cleanup */
		curl_easy_cleanup(h->handle);
		h->handle = NULL;
	}

	pfree(h);
}

/*
 * Close any open curl handles on abort.
 *
 * Note that this only releases the low-level curl objects, in the
 * curlhandle_t struct. The UTL_CURL_FILE struct itself is allocated
 * in a memory context, and will go away with the context.
 */
static void
url_curl_abort_callback(ResourceReleasePhase phase,
						bool isCommit,
						bool isTopLevel,
						void *arg)
{
	curlhandle_t *curr;
	curlhandle_t *next;

	if (phase != RESOURCE_RELEASE_AFTER_LOCKS)
		return;

	next = open_curl_handles;
	while (next)
	{
		curr = next;
		next = curr->next;

		if (curr->owner == CurrentResourceOwner)
		{
			if (isCommit)
				elog(LOG, "url_curl reference leak: %p still referenced", curr);

			destroy_curlhandle(curr);
		}
	}
}

/*
 * header_callback
 *
 * when a header arrives from the server curl calls this routine. In here we
 * extract the information we are interested in from the header, and store it
 * in the passed in callback argument (URL_FILE *) which lives in our
 * application.
 */
static size_t
header_callback(void *ptr_, size_t size, size_t nmemb, void *userp)
{
    URL_CURL_FILE *url = (URL_CURL_FILE *) userp;
	char*		ptr = ptr_;
	int 		len = size * nmemb;
	int 		i;
	char 		buf[20];

	Assert(size == 1);

	/*
	 * parse the http response line (code and message) from
	 * the http header that we get. Basically it's the whole
	 * first line (e.g: "HTTP/1.0 400 time out"). We do this
	 * in order to capture any error message that comes from
	 * gpfdist, and later use it to report the error string in
	 * check_response() to the database user.
	 */
	if (url->http_response == 0)
	{
		int 	n = nmemb;
		char* 	p;

		if (n > 0 && 0 != (p = palloc(n+1)))
		{
			memcpy(p, ptr, n);
			p[n] = 0;

			if (n > 0 && (p[n-1] == '\r' || p[n-1] == '\n'))
				p[--n] = 0;

			if (n > 0 && (p[n-1] == '\r' || p[n-1] == '\n'))
				p[--n] = 0;

			url->http_response = p;
		}
	}

	/*
	 * extract the GP-PROTO value from the HTTP header.
	 */
	if (len > 10 && *ptr == 'X' && 0 == strncmp("X-GP-PROTO", ptr, 10))
	{
		ptr += 10;
		len -= 10;

		while (len > 0 && (*ptr == ' ' || *ptr == '\t'))
		{
			ptr++;
			len--;
		}

		if (len > 0 && *ptr == ':')
		{
			ptr++;
			len--;

			while (len > 0 && (*ptr == ' ' || *ptr == '\t'))
			{
				ptr++;
				len--;
			}

			for (i = 0; i < sizeof(buf) - 1 && i < len; i++)
				buf[i] = ptr[i];

			buf[i] = 0;
			url->gp_proto = strtol(buf, 0, 0);
		}
	}

	return size * nmemb;
}


/*
 * write_callback
 *
 * when data arrives from gpfdist server and curl is ready to write it
 * to our application, it calls this routine. In here we will store the
 * data in the application variable (URL_FILE *)file which is the passed
 * in the forth argument as a part of the callback settings.
 *
 * we return the number of bytes written to the application buffer
 */
static size_t
write_callback(char *buffer, size_t size, size_t nitems, void *userp)
{
    URL_CURL_FILE *curl = (URL_CURL_FILE *) userp;
	const int 	nbytes = size * nitems;
	int 		n;

	/*
	 * if insufficient space in buffer make more space
	 */
	if (curl->in.top + nbytes >= curl->in.max)
	{
		/* compact ? */
		if (curl->in.bot)
		{
			n = curl->in.top - curl->in.bot;
			memmove(curl->in.ptr, curl->in.ptr + curl->in.bot, n);
			curl->in.bot = 0;
			curl->in.top = n;
		}

		/* if still insufficient space in buffer, enlarge it */
		if (curl->in.top + nbytes >= curl->in.max)
		{
			char *newbuf;

			n = curl->in.top - curl->in.bot + nbytes + 1024;
			newbuf = repalloc(curl->in.ptr, n);

			curl->in.ptr = newbuf;
			curl->in.max = n;

			Assert(curl->in.top + nbytes < curl->in.max);
		}
	}

	/* enough space. copy buffer into curl->buf */
	memcpy(curl->in.ptr + curl->in.top, buffer, nbytes);
	curl->in.top += nbytes;

	return nbytes;
}

/*
 * check_response
 *
 * If got an HTTP response with an error code from the server (gpfdist), report
 * the error code and message it to the database user and abort operation.
 */
static int
check_response(URL_CURL_FILE *file, int *rc, char **response_string)
{
	long 		response_code;
	char*		effective_url = NULL;
	CURL* 		curl = file->curl->handle;
	char		buffer[30];

	/* get the response code from curl */
	if (curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code) != CURLE_OK)
	{
		*rc = 500;
		*response_string = pstrdup("curl_easy_getinfo failed");
		return -1;
	}
	*rc = response_code;
	snprintf(buffer, sizeof buffer, "Response Code=%d", (int)response_code);
	*response_string = pstrdup(buffer);

	if (curl_easy_getinfo(curl, CURLINFO_EFFECTIVE_URL, &effective_url) != CURLE_OK)
		return -1;
	if (effective_url == NULL)
		effective_url = "";

	if (! (200 <= response_code && response_code < 300))
	{
		if (response_code == 0)
		{
			long 		oserrno = 0;
			static char	connmsg[64];

			/* get the os level errno, and string representation of it */
			if (curl_easy_getinfo(curl, CURLINFO_OS_ERRNO, &oserrno) == CURLE_OK)
			{
				if (oserrno == EHOSTUNREACH)
				{
					return oserrno;
				}
				if (oserrno != 0)
					snprintf(connmsg, sizeof connmsg, "error code = %d (%s)",
							 (int) oserrno, strerror((int)oserrno));
			}

			ereport(ERROR,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("connection with gpfdist failed for \"%s\", effective url: \"%s\": %s; %s",
							file->common.url, effective_url,
							(oserrno != 0 ? connmsg : ""),
							(curl_Error_Buffer[0] != '\0' ? curl_Error_Buffer : ""))));
		}
		else if (response_code == FDIST_TIMEOUT)	// gpfdist server return timeout code
		{
			return FDIST_TIMEOUT;
		}
		else
		{
			/* we need to sleep 1 sec to avoid this condition:
			   1- seg X gets an error message from gpfdist
			   2- seg Y gets a 500 error
			   3- seg Y report error before seg X, and error message
			   in seg X is thrown away.
			*/
			pg_usleep(1000000);

			ereport(ERROR,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("http response code %ld from gpfdist (%s): %s",
							response_code, file->common.url,
							file->http_response ? file->http_response : "?")));
		}
	}

	return 0;
}

// callback for request /gpfdist/status for debugging purpose.
static size_t
log_http_body(char *buffer, size_t size, size_t nitems, void *userp)
{
	char body[256] = {0};
	int  nbytes = size * nitems;
	int  len = sizeof(body) - 1 > nbytes ? nbytes : sizeof(body) - 1;

	memcpy(body, buffer, len);

	elog(LOG, "gpfdist/status: %s", body);

	return nbytes;
}

// GET /gpfdist/status to get gpfdist status.
static void
get_gpfdist_status(URL_CURL_FILE *file)
{
	CURL * status_handle = NULL;
	char status_url[256];
	char domain[HOST_NAME_SIZE] = {0};
	CURLcode e;

	extract_http_domain(file->common.url, domain, HOST_NAME_SIZE);
	snprintf(status_url, sizeof(status_url), "http://%s/gpfdist/status", domain);

	do
	{
		if (! (status_handle = curl_easy_init()))
		{
		    elog(LOG, "internal error: get_gpfdist_status.curl_easy_init failed");
			break;
		}
		if (CURLE_OK != (e = curl_easy_setopt(status_handle, CURLOPT_TIMEOUT, 60L)))
		{
		    elog(LOG, "internal error: get_gpfdist_status.curl_easy_setopt CURLOPT_TIMEOUT error (%d - %s)",
		         e, curl_easy_strerror(e));
			break;
		}
		if (CURLE_OK != (e = curl_easy_setopt(status_handle, CURLOPT_URL, status_url)))
		{
		    elog(LOG, "internal error: get_gpfdist_status.curl_easy_setopt CURLOPT_URL error (%d - %s)",
		         e, curl_easy_strerror(e));
			break;
		}
		if (CURLE_OK != (e = curl_easy_setopt(status_handle, CURLOPT_WRITEFUNCTION, log_http_body)))
		{
			elog(LOG, "internal error: get_gpfdist_status.curl_easy_setopt CURLOPT_WRITEFUNCTION error (%d - %s)",
				 e, curl_easy_strerror(e));
			break;
		}
		if (CURLE_OK != (e = curl_easy_perform(status_handle)))
		{
			elog(LOG, "send status request failed: %s", curl_easy_strerror(e));
		}
	} while (0);

	curl_easy_cleanup(status_handle);
}

/* Return true to retry */
typedef bool (*perform_func)(URL_CURL_FILE *file);

static void
gp_perform_backoff_and_check_response(URL_CURL_FILE *file, perform_func perform)
{
	/* retry in case server return timeout error */
	unsigned int wait_time = 1;
	unsigned int retry_count = 0;
	/* retry at most 300s by default when any error happens */
	time_t start_time = time(NULL);
	time_t now;
	time_t end_time = start_time + gpfdist_retry_timeout;

	while (true)
	{
		if (!perform(file))
		{
			return;
		}
		/*
		 * Retry until end_time is reached
		 */
		now = time(NULL);
		if (now >= end_time)
		{
			elog(LOG, "abort writing data to gpfdist, wait_time = %d, duration = %ld, gpfdist_retry_timeout = %d",
				wait_time, now - start_time, gpfdist_retry_timeout);
			ereport(ERROR,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("error when connecting to gpfdist %s, quit after %d tries",
							file->curl_url, retry_count+1)));
		}
		else
		{
			unsigned int for_wait = 0;
			wait_time = wait_time > MAX_TRY_WAIT_TIME ? MAX_TRY_WAIT_TIME : wait_time;
			/* For last retry before end_time */
			wait_time = wait_time > end_time - now ? end_time - now : wait_time;
			elog(LOG, "failed to send request to gpfdist (%s), will retry after %d seconds", file->curl_url, wait_time);
			while (for_wait++ < wait_time)
			{
				pg_usleep(1000000);
				CHECK_FOR_INTERRUPTS();
			}
			wait_time = wait_time + wait_time;
			retry_count++;
		}
	}
}

static bool multi_perform_work(URL_CURL_FILE *file)
{
	int 		response_code;
	char	   *response_string = NULL;
	int e;
	char *effective_url;

	if (CURLE_OK != (e = curl_multi_add_handle(multi_handle, file->curl->handle)))
	{
		if (CURLM_CALL_MULTI_PERFORM != e)
			elog(ERROR, "internal error: curl_multi_add_handle failed (%d - %s)",
				 e, curl_easy_strerror(e));
	}
	file->curl->in_multi_handle = true;

	while (CURLM_CALL_MULTI_PERFORM ==
		   (e = curl_multi_perform(multi_handle, &file->still_running)));
	if (e != CURLE_OK)
		elog(ERROR, "internal error: curl_multi_perform failed (%d - %s)",
			 e, curl_easy_strerror(e));
	/* read some bytes to make sure the connection is established */
	fill_buffer(file, 1);

	/* check the connection for GET request */
	int code = check_response(file, &response_code, &response_string);
	switch (code)
	{
		case 0:
			return false;
		case EHOSTUNREACH:
			curl_easy_getinfo(file->curl->handle, CURLINFO_EFFECTIVE_URL, &effective_url);
			elog(LOG, "gpfdist request failed on seg%d, error: %s, effective url %s",
				GpIdentity.segindex, strerror(EHOSTUNREACH), effective_url);
			curl_multi_remove_handle(multi_handle, file->curl->handle);
			curl_multi_cleanup(multi_handle);
			multi_handle = curl_multi_init();
			return true;
		default:
			ereport(ERROR,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("could not open \"%s\" for reading", file->common.url),
					 errdetail("Unexpected response from gpfdist server: %d - %s",
							   response_code, response_string)));
	}
}

static bool easy_perform_work(URL_CURL_FILE *file)
{
	int 		response_code;
	char	   *response_string = NULL;
	/*
	 * Use backoff policy to call curl_easy_perform to fix following error
	 * when work load is high:
	 *	- 'could not connect to server'
	 *	- gpfdist return timeout (HTTP 408)
	 * By default it will wait at least gpfdist_retry_timeout seconds before abort.
	 */
	CURLcode e = curl_easy_perform(file->curl->handle);
	if (CURLE_OK != e)
	{
		elog(LOG, "%s response (%d - %s)", file->curl_url, e, curl_easy_strerror(e));
	}
	else
	{
		/* check the response from server */
		response_code = check_response(file, &response_code, &response_string);
		switch (response_code)
		{
			case 0:
				/* Success! */
				return false;
			case FDIST_TIMEOUT:
				elog(LOG, "%s timeout from gpfdist", file->curl_url);
				break;
			default:
				ereport(ERROR,
						(errcode(ERRCODE_CONNECTION_FAILURE),
						 errmsg("error while getting response from gpfdist on %s (code %d, msg %s)",
								file->curl_url, response_code, response_string)));
		}
		if (response_string)
			pfree(response_string);
		response_string = NULL;
	}
	return true;
}

/*
 * fill_buffer
 *
 * Attempt to fill the read buffer up to requested number of bytes.
 * We first check if we already have the number of bytes that we
 * want already in the buffer (from write_callback), and we do
 * a select on the socket only if we don't have enough.
 *
 * return 0 if successful; raises ERROR otherwise.
 */
static int
fill_buffer(URL_CURL_FILE *curl, int want)
{
	fd_set 	fdread;
	fd_set 	fdwrite;
	fd_set 	fdexcep;
	int 	maxfd = 0;
	struct 	timeval timeout;
	int 	nfds = 0, e = 0;
	int     timeout_count = 0;

	/* elog(NOTICE, "= still_running %d, bot %d, top %d, want %d",
	   file->u.curl.still_running, curl->in.bot, curl->in.top, want);
	*/

	/* attempt to fill buffer */
	while (curl->still_running && curl->in.top - curl->in.bot < want)
	{
		FD_ZERO(&fdread);
		FD_ZERO(&fdwrite);
		FD_ZERO(&fdexcep);

		CHECK_FOR_INTERRUPTS();

		/* set a suitable timeout to fail on */
		timeout.tv_sec = 5;
		timeout.tv_usec = 0;

		/* get file descriptors from the transfers */
		if (0 != (e = curl_multi_fdset(multi_handle, &fdread, &fdwrite, &fdexcep, &maxfd)))
		{
			elog(ERROR, "internal error: curl_multi_fdset failed (%d - %s)",
						e, curl_easy_strerror(e));
		}

		if (maxfd == 0)
		{
			elog(LOG, "curl_multi_fdset set maxfd = %d", maxfd);
			curl->still_running = 0;
			break;
		}
		/* When libcurl returns -1 in max_fd, it is because libcurl currently does something
		 * that isn't possible for your application to monitor with a socket and unfortunately
		 * you can then not know exactly when the current action is completed using select().
		 * You then need to wait a while before you proceed and call curl_multi_perform anyway
		 */
		if (maxfd == -1)
		{
			elog(DEBUG2, "curl_multi_fdset set maxfd = %d", maxfd);
			pg_usleep(100000);
			// to call curl_multi_perform
			nfds = 1;
		}
		else
		{
			nfds = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout);
		}
		if (nfds == -1)
		{
			int save_errno = errno;
			if (errno == EINTR || errno == EAGAIN)
			{
				elog(DEBUG2, "select failed on curl_multi_fdset (maxfd %d) (%d - %s)", maxfd,
					 save_errno, strerror(save_errno));
				continue;
			}
			elog(ERROR, "internal error: select failed on curl_multi_fdset (maxfd %d) (%d - %s)",
				 maxfd, save_errno, strerror(save_errno));
		}
		else if (nfds == 0)
		{
			// timeout
			timeout_count++;

			if (timeout_count % 12 == 0)
			{
				elog(LOG, "segment has not received data from gpfdist for about 1 minute, waiting for %d bytes.",
					 (want - (curl->in.top - curl->in.bot)));
			}

			if (readable_external_table_timeout != 0 && timeout_count * 5 > readable_external_table_timeout)
			{
				elog(LOG, "bot = %d, top = %d, want = %d, maxfd = %d, nfds = %d, e = %d, "
						  "still_running = %d, for_write = %d, error = %d, eof = %d, datalen = %d",
						  curl->in.bot, curl->in.top, want, maxfd, nfds, e, curl->still_running,
						  curl->for_write, curl->error, curl->eof, curl->block.datalen);
				get_gpfdist_status(curl);
				ereport(ERROR,
						(errcode(ERRCODE_CONNECTION_FAILURE),
						 errmsg("segment has not received data from gpfdist for long time, cancelling the query.")));
				break;
			}
		}
		else if (nfds > 0)
		{
			/* timeout or readable/writable sockets */
			/* note we *could* be more efficient and not wait for
			 * CURLM_CALL_MULTI_PERFORM to clear here and check it on re-entry
			 * but that gets messy */
			while (CURLM_CALL_MULTI_PERFORM ==
				   (e = curl_multi_perform(multi_handle, &curl->still_running)));

			if (e != 0)
			{
				elog(ERROR, "internal error: curl_multi_perform failed (%d - %s)",
					 e, curl_easy_strerror(e));
			}
		}
		else
		{
			elog(ERROR, "select return unexpected result");
		}

		/* elog(NOTICE, "- still_running %d, bot %d, top %d, want %d",
		   file->u.curl.still_running, curl->in.bot, curl->in.top, want);
		*/
	}

	if (curl->still_running == 0)
	{
		elog(LOG, "quit fill_buffer due to still_running = 0, bot = %d, top = %d, want = %d, "
				"for_write = %d, error = %d, eof = %d, datalen = %d, maxfd = %d, nfds = %d, e = %d",
				curl->in.bot, curl->in.top, want, curl->for_write, curl->error,
				curl->eof, curl->block.datalen, maxfd, nfds, e);
	}


	return 0;
}


static void
set_httpheader(URL_CURL_FILE *fcurl, const char *name, const char *value)
{
	struct curl_slist *new_httpheader;
	char		tmp[1024];

	if (strlen(name) + strlen(value) + 5 > sizeof(tmp))
		elog(ERROR, "set_httpheader name/value is too long. name = %s, value=%s",
			 name, value);

	snprintf(tmp, sizeof(tmp), "%s: %s", name, value);

	new_httpheader = curl_slist_append(fcurl->curl->x_httpheader, tmp);
	if (new_httpheader == NULL)
		elog(ERROR, "could not set curl HTTP header \"%s\" to \"%s\"", name, value);

	fcurl->curl->x_httpheader = new_httpheader;
}

static void
replace_httpheader(URL_CURL_FILE *fcurl, const char *name, const char *value)
{
	struct curl_slist *new_httpheader;
	char		tmp[1024];

	if (strlen(name) + strlen(value) + 5 > sizeof(tmp))
		elog(ERROR, "replace_httpheader name/value is too long. name = %s, value=%s", name, value);

	sprintf(tmp, "%s: %s", name, value);

	/* Find existing header, if any */
	struct curl_slist *p = fcurl->curl->x_httpheader;
	while (p != NULL)
	{
		if (!strncmp(name, p->data, strlen(name)))
		{
			/*
			 * NOTE: p->data is not palloc'd! It is originally allocated
			 * by curl_slist_append, so use plain malloc/free here as well.
			 */
			char	   *dupdata = strdup(tmp);

			if (dupdata == NULL)
				elog(ERROR, "out of memory");

			free(p->data);
			p->data = dupdata;
			return;
		}
		p = p->next;
	}

	/* No existing header, add a new one */

	new_httpheader = curl_slist_append(fcurl->curl->x_httpheader, tmp);
	if (new_httpheader == NULL)
		elog(ERROR, "could not append HTTP header \"%s\"", name);
	fcurl->curl->x_httpheader = new_httpheader;
}

static char *
local_strstr(const char *str1, const char *str2)
{	
	char *cp = (char *) str1;
	char *s1, *s2;

	if ( !*str2 )
		return((char *)str1);

	while (*cp)
    {
		s1 = cp;
		s2 = (char *) str2;

		while (*s1 && (*s1==*s2))
			s1++, s2++;

		if (!*s2)
			return(cp);

		cp++;
	}

	return(NULL);
}

/*
 * make_url
 *				Address resolve a URL to contain only IP number
 *
 * Resolve the hostname in the URL to an IP number, and return a new URL with
 * the same scheme and parameters using the resolved IP address. If the passed
 * url is using an IP number, the return value will be a copy of the input.
 * The output is a palloced string, it's the callers responsibility to free it
 * when no longer needed. This function will error out in case a URL cannot be
 * formed, NULL or an empty string are never returned.
 */
static char *
make_url(const char *url, bool is_ipv6)
{
	char *authority_start = local_strstr(url, "//");
	char *authority_end;
	char *hostname_start;
	char *hostname_end;
	char hostname[HOST_NAME_SIZE];
	char *hostip = NULL;
	char portstr[9];
	int port = 80; /* default for http */
	bool  domain_resolved_to_ipv6 = false;
	StringInfoData buf;

	if (!authority_start)
		elog(ERROR, "illegal url '%s'", url);

	authority_start += 2;
	authority_end = strchr(authority_start, '/');
	if (!authority_end)
		authority_end = authority_start + strlen(authority_start);

	hostname_start = strchr(authority_start, '@');
	if (!(hostname_start && hostname_start < authority_end))
		hostname_start = authority_start;

	if (is_ipv6) /* IPV6 */
	{
		int len;

		hostname_end = strchr(hostname_start, ']');
		if (hostname_end == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_NAME),
					 errmsg("unexpected IPv6 format %s", url)));
		hostname_end += 1;

		if (hostname_end[0] == ':')
		{
			/* port number exists in this url. get it */
			len = authority_end - hostname_end;
			if (len > 8)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("<port> substring size must not exceed 8 characters")));

			memcpy(portstr, hostname_end + 1, len);
			portstr[len] = '\0';
			port = atoi(portstr);
		}

		/* skippping the brackets */
		hostname_end -= 1;
		hostname_start += 1;
	}
	else
	{
		hostname_end = strchr(hostname_start, ':');
		if (!(hostname_end && hostname_end < authority_end))
		{
			hostname_end = authority_end;
		}
		else
		{
			/* port number exists in this url. get it */
			int len = authority_end - hostname_end;
			if (len > 8)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("<port> substring size must not exceed 8 characters")));

			memcpy(portstr, hostname_end + 1, len);
			portstr[len] = '\0';
			port = atoi(portstr);
		}
	}

	if (!port)
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("<port> substring must contain only digits")));	

	if (hostname_end - hostname_start >= sizeof(hostname))
		elog(ERROR, "hostname too long for url '%s'", url);

	memcpy(hostname, hostname_start, hostname_end - hostname_start);
	hostname[hostname_end - hostname_start] = 0;

	hostip = getDnsAddress(hostname, port, ERROR);

	if (hostip == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_FAILURE),
				 errmsg("hostname cannot be resolved '%s'", url)));

	/*
	 * test for the case where the URL originally contained a domain name
	 * (so is_ipv6 was set to false) but the DNS resolution in getDnsAddress
	 * returned an IPv6 address so know we also have to put the square
	 * brackets [..] in the URL string.
	 */
	if (strchr(hostip, ':') != NULL && !is_ipv6)
		domain_resolved_to_ipv6 = true;

	initStringInfo(&buf);

	for (int i = 0; i < (hostname_start - url); i++)
		appendStringInfoChar(&buf, *(url + i));
	if (domain_resolved_to_ipv6)
		appendStringInfoChar(&buf, '[');
	appendStringInfoString(&buf, hostip);
	if (domain_resolved_to_ipv6)
		appendStringInfoChar(&buf, ']');
	appendStringInfoString(&buf, url + (strlen(hostname) + (hostname_start - url)));

	return buf.data;
}

/*
 * extract_http_domain
 *
 * extracts the domain string from a http url
 */
static void
extract_http_domain(char *i_path, char *o_domain, int dlen)
{
	int domsz, cpsz;
	char* p_st = (char*)local_strstr(i_path, "//");
	p_st = p_st + 2;
	char* p_en = strchr(p_st, '/');

	domsz = p_en - p_st;
	cpsz = ( domsz < dlen ) ? domsz : dlen;
	memcpy(o_domain, p_st, cpsz);
}

static bool
url_has_ipv6_format (char *url)
{
	bool is6 = false;
	char *ipv6 = local_strstr(url, "://[");

	if ( ipv6 )
		ipv6 = strchr(ipv6, ']');
	if ( ipv6 )
		is6 = true;
		
	return is6;
}

static int
is_file_exists(const char* filename)
{
	FILE* file;
	file = fopen(filename, "r");
	if (file)
	{
		fclose(file);
		return 1;
	}
	return 0;
}

URL_FILE *
url_curl_fopen(char *url, bool forwrite, extvar_t *ev, CopyState pstate)
{
	URL_CURL_FILE *file;
	int         ip_mode;
	int 		e;
	bool		is_ipv6 = url_has_ipv6_format(url);
	char	   *tmp;

	/* Reset curl_Error_Buffer */
	curl_Error_Buffer[0] = '\0';

	Assert(IS_HTTP_URI(url) || IS_GPFDIST_URI(url) || IS_GPFDISTS_URI(url));

	if (!url_curl_resowner_callback_registered)
	{
		RegisterResourceReleaseCallback(url_curl_abort_callback, NULL);
		url_curl_resowner_callback_registered = true;
	}

	tmp = make_url(url, is_ipv6);

	file = (URL_CURL_FILE *) palloc0(sizeof(URL_CURL_FILE));
	file->common.type = CFTYPE_CURL;
	file->common.url = pstrdup(url);
	file->for_write = forwrite;
	file->curl = create_curlhandle();

	/*
	 * We need to call is_url_ipv6 for the case where inside make_url
	 * function a domain name was transformed to an IPv6 address.
	 */
	if (!is_ipv6)
		is_ipv6 = url_has_ipv6_format(tmp);

	if (!IS_GPFDISTS_URI(url))
		file->curl_url = tmp;
	else
	{
		/*
		 * SSL support addition
		 *
		 * negotiation will fail if verifyhost is on, so we *must*
		 * not resolve the hostname in this case. I have decided
		 * to not resolve it anyway and let libcurl do the work.
		 */
		file->curl_url = pstrdup(file->common.url);
		pfree(tmp);
	}

	if (IS_GPFDIST_URI(file->curl_url) || IS_GPFDISTS_URI(file->curl_url))
	{
		/* replace gpfdist:// with http:// or gpfdists:// with https://
		 * by overriding 'dist' with 'http' */
		unsigned int tmp_len = strlen(file->curl_url) + 1;
		memmove(file->curl_url, file->curl_url + 3, tmp_len - 3);
		memcpy(file->curl_url, "http", 4);
		pstate->header_line = 0;
	}

	/* initialize a curl session and get a libcurl handle for it */
	if (! (file->curl->handle = curl_easy_init()))
		elog(ERROR, "internal error: curl_easy_init failed");

	CURL_EASY_SETOPT(file->curl->handle, CURLOPT_URL, file->curl_url);

	CURL_EASY_SETOPT(file->curl->handle, CURLOPT_VERBOSE, 0L /* FALSE */);

	/* set callback for each header received from server */
	CURL_EASY_SETOPT(file->curl->handle, CURLOPT_HEADERFUNCTION, header_callback);

	/* 'file' is the application variable that gets passed to header_callback */
	CURL_EASY_SETOPT(file->curl->handle, CURLOPT_WRITEHEADER, file);

	/* set callback for each data block arriving from server to be written to application */
	CURL_EASY_SETOPT(file->curl->handle, CURLOPT_WRITEFUNCTION, write_callback);

	/* 'file' is the application variable that gets passed to write_callback */
	CURL_EASY_SETOPT(file->curl->handle, CURLOPT_WRITEDATA, file);

	if (!is_ipv6)
		ip_mode = CURL_IPRESOLVE_V4;
	else
		ip_mode = CURL_IPRESOLVE_V6;
	CURL_EASY_SETOPT(file->curl->handle, CURLOPT_IPRESOLVE, ip_mode);

	/*
	 * set up a linked list of http headers. start with common headers
	 * needed for read and write operations, and continue below with
	 * more specifics
	 */			
	Assert(file->curl->x_httpheader == NULL);

	/*
	 * support multihomed http use cases. see MPP-11874
	 */
	if (IS_HTTP_URI(url))
	{
		char domain[HOST_NAME_SIZE] = {0};

		extract_http_domain(file->common.url, domain, HOST_NAME_SIZE);
		set_httpheader(file, "Host", domain);
	}

	set_httpheader(file, "X-GP-XID", ev->GP_XID);
	set_httpheader(file, "X-GP-CID", ev->GP_CID);
	set_httpheader(file, "X-GP-SN", ev->GP_SN);
	set_httpheader(file, "X-GP-SEGMENT-ID", ev->GP_SEGMENT_ID);
	set_httpheader(file, "X-GP-SEGMENT-COUNT", ev->GP_SEGMENT_COUNT);
	set_httpheader(file, "X-GP-LINE-DELIM-STR", ev->GP_LINE_DELIM_STR);
	set_httpheader(file, "X-GP-LINE-DELIM-LENGTH", ev->GP_LINE_DELIM_LENGTH);

	if (forwrite)
	{
		// TIMEOUT for POST only, GET is single HTTP request,
		// probablity take long time.
		elog(LOG, "gpfdist_retry_timeout = %d", gpfdist_retry_timeout);
		CURL_EASY_SETOPT(file->curl->handle, CURLOPT_TIMEOUT, (long)gpfdist_retry_timeout);

		/*init sequence number*/
		file->seq_number = 1;

		/* write specific headers */
		set_httpheader(file, "X-GP-PROTO", "0");
		set_httpheader(file, "X-GP-SEQ", "1");
		set_httpheader(file, "Content-Type", "text/xml");
	}
	else
	{
		/* read specific - (TODO: unclear why some of these are needed) */
		set_httpheader(file, "X-GP-PROTO", "1");
		set_httpheader(file, "X-GP-MASTER_HOST", ev->GP_MASTER_HOST);
		set_httpheader(file, "X-GP-MASTER_PORT", ev->GP_MASTER_PORT);
		set_httpheader(file, "X-GP-CSVOPT", ev->GP_CSVOPT);
		set_httpheader(file, "X-GP_SEG_PG_CONF", ev->GP_SEG_PG_CONF);
		set_httpheader(file, "X-GP_SEG_DATADIR", ev->GP_SEG_DATADIR);
		set_httpheader(file, "X-GP-DATABASE", ev->GP_DATABASE);
		set_httpheader(file, "X-GP-USER", ev->GP_USER);
		set_httpheader(file, "X-GP-SEG-PORT", ev->GP_SEG_PORT);
		set_httpheader(file, "X-GP-SESSION-ID", ev->GP_SESSION_ID);
	}
		
	{
		/*
		 * MPP-13031
		 * copy #transform fragment, if present, into X-GP-TRANSFORM header
		 */
		char* p = local_strstr(file->common.url, "#transform=");
		if (p && p[11])
			set_httpheader(file, "X-GP-TRANSFORM", p + 11);
	}

	CURL_EASY_SETOPT(file->curl->handle, CURLOPT_HTTPHEADER, file->curl->x_httpheader);

	if (!multi_handle)
	{
		if (! (multi_handle = curl_multi_init()))
			elog(ERROR, "internal error: curl_multi_init failed");
	}

	/*
	 * SSL configuration
	 */
	if (IS_GPFDISTS_URI(url))
	{
		Assert(PointerIsValid(DataDir));
		elog(LOG,"trying to load certificates from %s", DataDir);

		/* curl will save its last error in curlErrorBuffer */
		CURL_EASY_SETOPT(file->curl->handle, CURLOPT_ERRORBUFFER, curl_Error_Buffer);

		/* cert is stored PEM coded in file... */
		CURL_EASY_SETOPT(file->curl->handle, CURLOPT_SSLCERTTYPE, "PEM");

		/* set the cert for client authentication */
		if (extssl_cert != NULL)
		{
			memset(extssl_cer_full, 0, MAXPGPATH);
			snprintf(extssl_cer_full, MAXPGPATH, "%s/%s", DataDir, extssl_cert);

			if (!is_file_exists(extssl_cer_full))
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not open certificate file \"%s\": %m",
								extssl_cer_full)));

			CURL_EASY_SETOPT(file->curl->handle, CURLOPT_SSLCERT, extssl_cer_full);
		}

		/* set the key passphrase */
		if (extssl_pass != NULL)
			CURL_EASY_SETOPT(file->curl->handle, CURLOPT_KEYPASSWD, extssl_pass);

		CURL_EASY_SETOPT(file->curl->handle, CURLOPT_SSLKEYTYPE,"PEM");

		/* set the private key (file or ID in engine) */
		if (extssl_key != NULL)
		{
			memset(extssl_key_full, 0, MAXPGPATH);
			snprintf(extssl_key_full, MAXPGPATH, "%s/%s", DataDir, extssl_key);

			if (!is_file_exists(extssl_key_full))
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not open private key file \"%s\": %m",
								extssl_key_full)));

			CURL_EASY_SETOPT(file->curl->handle, CURLOPT_SSLKEY, extssl_key_full);
		}

		/* set the file with the CA certificates, for validating the server */
		if (extssl_ca != NULL)
		{
			memset(extssl_cas_full, 0, MAXPGPATH);
			snprintf(extssl_cas_full, MAXPGPATH, "%s/%s", DataDir, extssl_ca);

			if (!is_file_exists(extssl_cas_full))
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not open private key file \"%s\": %m",
								extssl_cas_full)));

			CURL_EASY_SETOPT(file->curl->handle, CURLOPT_CAINFO, extssl_cas_full);
		}

		/* set cert verification */
		CURL_EASY_SETOPT(file->curl->handle, CURLOPT_SSL_VERIFYPEER,
				(long)(verify_gpfdists_cert ? extssl_verifycert : extssl_no_verifycert));

		/* set host verification */
		CURL_EASY_SETOPT(file->curl->handle, CURLOPT_SSL_VERIFYHOST,
				(long)(verify_gpfdists_cert ? extssl_verifyhost : extssl_no_verifyhost));

		/* set protocol */
		CURL_EASY_SETOPT(file->curl->handle, CURLOPT_SSLVERSION, extssl_protocol);

		/* disable session ID cache */
		CURL_EASY_SETOPT(file->curl->handle, CURLOPT_SSL_SESSIONID_CACHE, 0);

		/* set debug */
		if (CURLE_OK != (e = curl_easy_setopt(file->curl->handle, CURLOPT_VERBOSE, (long)extssl_libcurldebug)))
		{
			if (extssl_libcurldebug)
			{
				elog(INFO, "internal error: curl_easy_setopt CURLOPT_VERBOSE error (%d - %s)",
					 e, curl_easy_strerror(e));
			}
		}
	}

	/* Allocate input and output buffers. */
	file->in.ptr = palloc(1024);		/* 1 kB buffer initially */
	file->in.max = 1024;
	file->in.bot = file->in.top = 0;

	if (forwrite)
	{
		int			bufsize = writable_external_table_bufsize * 1024;

		file->out.ptr = (char *) palloc(bufsize);
		file->out.max = bufsize;
		file->out.bot = file->out.top = 0;
	}

	/*
	 * lets check our connection.
	 * start the fetch if we're SELECTing (GET request), or write an
	 * empty message if we're INSERTing (POST request)
	 */
	if (!forwrite)
	{
		gp_perform_backoff_and_check_response(file, multi_perform_work);
	}
	else
	{
		/* use empty message */
		CURL_EASY_SETOPT(file->curl->handle, CURLOPT_POSTFIELDS, "");
		CURL_EASY_SETOPT(file->curl->handle, CURLOPT_POSTFIELDSIZE, 0);

		/* post away and check response, retry if failed (timeout or * connect error) */
		gp_perform_backoff_and_check_response(file, easy_perform_work);
		file->seq_number++;
	}

	return (URL_FILE *) file;
}

/*
 * url_fclose: Disposes of resources associated with this external web table.
 *
 * If failOnError is true, errors encountered while closing the resource results
 * in raising an ERROR.  This is particularly true for "execute:" resources where
 * command termination is not reflected until close is called.  If failOnClose is
 * false, close errors are just logged.  failOnClose should be false when closure
 * is due to LIMIT clause satisfaction.
 *
 * relname is passed in for being available in data messages only.
 */
void
url_curl_fclose(URL_FILE *fileg, bool failOnError, const char *relname)
{
	URL_CURL_FILE *file = (URL_CURL_FILE *) fileg;
	StringInfoData sinfo;

	initStringInfo(&sinfo);

	/*
	 * if WET, send a final "I'm done" request from this segment.
	 */
	if (file->for_write && file->curl->handle != NULL)
		gp_proto0_write_done(file);

	destroy_curlhandle(file->curl);
	file->curl = NULL;

	/* free any allocated buffer space */
	if (file->in.ptr)
	{
		pfree(file->in.ptr);
		file->in.ptr = NULL;
	}

	if (file->curl_url)
	{
		pfree(file->curl_url);
		file->curl_url = NULL;
	}

	if (file->out.ptr)
	{
		Assert(file->for_write);
		pfree(file->out.ptr);
		file->out.ptr = NULL;
	}

	file->gp_proto = 0;
	file->error = file->eof = 0;
	memset(&file->in, 0, sizeof(file->in));
	memset(&file->block, 0, sizeof(file->block));

	pfree(file->common.url);

	pfree(file);
}

bool
url_curl_feof(URL_FILE *file, int bytesread)
{
	URL_CURL_FILE *cfile = (URL_CURL_FILE *) file;

	return (cfile->eof != 0);
}


bool
url_curl_ferror(URL_FILE *file, int bytesread, char *ebuf, int ebuflen)
{
	URL_CURL_FILE *cfile = (URL_CURL_FILE *) file;

	return (cfile->error != 0);
}

/*
 * gp_proto0_read
 *
 * get data from the server and handle it according to PROTO 0. In PROTO 0 we
 * expect the content of the file without any kind of meta info. Simple.
 */
static size_t
gp_proto0_read(char *buf, int bufsz, URL_CURL_FILE *file)
{
	int 		n = 0;

	fill_buffer(file, bufsz);

	/* check if there's data in the buffer - if not fill_buffer()
	 * either errored or EOF. For proto0, we cannot distinguish
	 * between error and EOF. */
	n = file->in.top - file->in.bot;
	if (n == 0 && !file->still_running)
		file->eof = 1;

	if (n > bufsz)
		n = bufsz;

	/* xfer data to caller */
	memcpy(buf, file->in.ptr + file->in.bot, n);
	file->in.bot += n;

	return n;
}

/*
 * gp_proto1_read
 *
 * get data from the server and handle it according to PROTO 1. In this protocol
 * each data block is tagged by meta info like this:
 * byte 0: type (can be 'F'ilename, 'O'ffset, 'D'ata, 'E'rror, 'L'inenumber)
 * byte 1-4: length. # bytes of following data block. in network-order.
 * byte 5-X: the block itself.
 */
static size_t
gp_proto1_read(char *buf, int bufsz, URL_CURL_FILE *file, CopyState pstate, char *buf2)
{
	char type;
	int  n, len;

	/*
	 * Loop through and get all types of messages, until we get actual data,
	 * or until there's no more data. Then quit the loop to process it and
	 * return it.
	 */
	while (file->block.datalen == 0 && !file->eof)
	{
		/* need 5 bytes, 1 byte type + 4 bytes length */
		fill_buffer(file, 5);
		n = file->in.top - file->in.bot;

		if (n == 0)
			ereport(ERROR,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("gpfdist error: server closed connection")));

		if (n < 5)
			ereport(ERROR,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("gpfdist error: incomplete packet - packet len %d", n)));

		/* read type */
		type = file->in.ptr[file->in.bot++];

		/* read len */
		memcpy(&len, &file->in.ptr[file->in.bot], 4);
		len = ntohl(len);		/* change order */
		file->in.bot += 4;

		if (len < 0)
			elog(ERROR, "gpfdist error: bad packet type %d len %d",
				 type, len);

		/* Error */
		if (type == 'E')
		{
			fill_buffer(file, len);
			n = file->in.top - file->in.bot;

			if (n > len)
				n = len;

			if (n > 0)
			{
				/*
				 * cheat a little. swap last char and
				 * NUL-terminator. then print string (without last
				 * char) and print last char artificially
				 */
				char x = file->in.ptr[file->in.bot + n - 1];
				file->in.ptr[file->in.bot + n - 1] = 0;
				ereport(ERROR,
						(errcode(ERRCODE_CONNECTION_FAILURE),
						 errmsg("gpfdist error - %s%c", &file->in.ptr[file->in.bot], x)));
			}

			elog(ERROR, "gpfdist error: please check gpfdist log messages.");

			return -1;
		}

		/* Filename */
		if (type == 'F')
		{
			if (buf != buf2)
			{
				file->in.bot -= 5;
				return 0;
			}
			if (len > 256)
				elog(ERROR, "gpfdist error: filename too long (%d)", len);

			if (-1 == fill_buffer(file, len))
				elog(ERROR, "gpfdist error: stream ends suddenly");

			/*
			 * If SREH is used we now update it with the actual file that the
			 * gpfdist server is reading. This is because SREH (or the client
			 * in general) doesn't know which file gpfdist is reading, since
			 * the original URL may include a wildcard or a directory listing.
			 */
			if (pstate->cdbsreh)
			{
				char fname[257];

				memcpy(fname, file->in.ptr + file->in.bot, len);
				fname[len] = 0;
				snprintf(pstate->cdbsreh->filename, sizeof pstate->cdbsreh->filename, "%s [%s]",
						 file->common.url, fname);
			}

			file->in.bot += len;
			Assert(file->in.bot <= file->in.top);
			continue;
		}

		/* Offset */
		if (type == 'O')
		{
			if (len != 8)
				elog(ERROR, "gpfdist error: offset not of length 8 (%d)", len);

			if (-1 == fill_buffer(file, len))
				elog(ERROR, "gpfdist error: stream ends suddenly");

			file->in.bot += 8;
			Assert(file->in.bot <= file->in.top);
			continue;
		}

		/* Line number */
		if (type == 'L')
		{
			int64 line_number;

			if (len != 8)
				elog(ERROR, "gpfdist error: line number not of length 8 (%d)", len);

			if (-1 == fill_buffer(file, len))
				elog(ERROR, "gpfdist error: stream ends suddenly");

			/*
			 * update the line number of the first line we're about to get from
			 * gpfdist. pstate will update the following lines when processing
			 * the data
			 */
			memcpy(&line_number, file->in.ptr + file->in.bot, len);
			line_number = local_ntohll(line_number);
			pstate->cur_lineno = line_number ? line_number : INT64_MIN;
			file->in.bot += 8;
			Assert(file->in.bot <= file->in.top);
			continue;
		}

		/* Data */
		if (type == 'D')
		{
			file->block.datalen = len;
			file->eof = (len == 0);
			break;
		}

		elog(ERROR, "gpfdist error: unknown meta type %d", type);
	}

	/* read data block */
	if (bufsz > file->block.datalen)
		bufsz = file->block.datalen;

	fill_buffer(file, bufsz);
	n = file->in.top - file->in.bot;

	/* if gpfdist closed connection prematurely or died catch it here */
	if (n == 0 && !file->eof)
	{
		file->error = 1;
		
		if (!file->still_running)
			ereport(ERROR,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("gpfdist server closed connection"),
					 errhint("The root cause is likely to be an overload of the ETL host or "
							 "a temporary network glitch between the database and the ETL host "
							 "causing the connection between the gpfdist and database to disconnect.")));
	}

	if (n > bufsz)
		n = bufsz;

	memcpy(buf, file->in.ptr + file->in.bot, n);

	file->in.bot += n;
	file->block.datalen -= n;
	return n;
}

/*
 * gp_proto0_write
 *
 * use curl to write data to a the remote gpfdist server. We use
 * a push model with a POST request.
 */
static void
gp_proto0_write(URL_CURL_FILE *file, CopyState pstate)
{
	char*		buf = file->out.ptr;
	int		nbytes = file->out.top;

	if (nbytes == 0)
		return;

	/* post binary data */
	CURL_EASY_SETOPT(file->curl->handle, CURLOPT_POSTFIELDS, buf);

	/* set the size of the postfields data */
	CURL_EASY_SETOPT(file->curl->handle, CURLOPT_POSTFIELDSIZE, nbytes);

	/* set sequence number */
	char seq[128] = {0};
	snprintf(seq, sizeof(seq), INT64_FORMAT, file->seq_number);

	replace_httpheader(file, "X-GP-SEQ", seq);

	gp_perform_backoff_and_check_response(file, easy_perform_work);
	file->seq_number++;
}


/*
 * Send an empty POST request, with an added X-GP-DONE header.
 */
static void
gp_proto0_write_done(URL_CURL_FILE *file)
{
	set_httpheader(file, "X-GP-DONE", "1");

	/* use empty message */
	CURL_EASY_SETOPT(file->curl->handle, CURLOPT_POSTFIELDS, "");
	CURL_EASY_SETOPT(file->curl->handle, CURLOPT_POSTFIELDSIZE, 0);

	/* post away! */
	gp_perform_backoff_and_check_response(file, easy_perform_work);
}

static size_t
curl_fread(char *buf, int bufsz, URL_CURL_FILE *file, CopyState pstate)
{
	char*		p = buf;
	char*		q = buf + bufsz;
	int 		n;
	const int 	gp_proto = file->gp_proto;

	if (gp_proto != 0 && gp_proto != 1)
	{
		elog(ERROR, "unknown gp protocol %d", file->gp_proto);
		return 0;
	}

	for (; p < q; p += n)
	{
		if (gp_proto == 0)
			n = gp_proto0_read(p, q - p, file);
		else
			n = gp_proto1_read(p, q - p, file, pstate, buf);

		if (n <= 0)
			break;
	}

	return p - buf;
}

static size_t
curl_fwrite(char *buf, int nbytes, URL_CURL_FILE *file, CopyState pstate)
{
	if (!file->for_write)
		elog(ERROR, "cannot write to a read-mode external table");

	if (file->gp_proto != 0 && file->gp_proto != 1)
		elog(ERROR, "unknown gp protocol %d", file->gp_proto);

	/*
	 * if buffer is full (current item can't fit) - write it out to
	 * the server. if item still doesn't fit after we emptied the
	 * buffer, make more room.
	 */
	if (file->out.top + nbytes >= file->out.max)
	{
		/* item doesn't fit */
		if (file->out.top > 0)
		{
			/* write out existing data, empty the buffer */
			gp_proto0_write(file, pstate);
			file->out.top = 0;
		}
		
		/* does it still not fit? enlarge buffer */
		if (file->out.top + nbytes >= file->out.max)
		{
			int 	n = nbytes + 1024;
			char*	newbuf;

			newbuf = repalloc(file->out.ptr, n);

			if (!newbuf)
				elog(ERROR, "out of memory (curl_fwrite)");

			file->out.ptr = newbuf;
			file->out.max = n;

			Assert(nbytes < file->out.max);
		}
	}

	/* copy buffer into file->buf */
	memcpy(file->out.ptr + file->out.top, buf, nbytes);
	file->out.top += nbytes;

	return nbytes;
}

size_t
url_curl_fread(void *ptr, size_t size, URL_FILE *file, CopyState pstate)
{
	URL_CURL_FILE *cfile = (URL_CURL_FILE *) file;

	/* get data (up size) from the http/gpfdist server */
	return curl_fread(ptr, size, cfile, pstate);
}

size_t
url_curl_fwrite(void *ptr, size_t size, URL_FILE *file, CopyState pstate)
{
	URL_CURL_FILE *cfile = (URL_CURL_FILE *) file;

	/* write data to the gpfdist server via curl */
	return curl_fwrite(ptr, size, cfile, pstate);
}

/*
 * flush all remaining buffered data waiting to be written out to external source
 */
void
url_curl_fflush(URL_FILE *file, CopyState pstate)
{
	gp_proto0_write((URL_CURL_FILE *) file, pstate);
}

#else /* USE_CURL */


/* Dummy versions of all the url_curl_* functions, when built without libcurl. */

static void
curl_not_compiled_error(void)
{
	ereport(ERROR,
			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
			 errmsg("unsupported external table URI"),
			 errdetail("This functionality requires the server to be built with libcurl support."),
			 errhint("You need to rebuild the server using --with-libcurl.")));
}

URL_FILE *
url_curl_fopen(char *url, bool forwrite, extvar_t *ev, CopyState pstate)
{
	curl_not_compiled_error();
	return NULL; /* keep compiler quiet */
}
void
url_curl_fclose(URL_FILE *file, bool failOnError, const char *relname)
{
	curl_not_compiled_error();
}
bool url_curl_feof(URL_FILE *file, int bytesread)
{
	curl_not_compiled_error();
	return false; /* keep compiler quiet */
}
bool url_curl_ferror(URL_FILE *file, int bytesread, char *ebuf, int ebuflen)
{
	curl_not_compiled_error();
	return false; /* keep compiler quiet */
}
size_t url_curl_fread(void *ptr, size_t size, URL_FILE *file, CopyState pstate)
{
	curl_not_compiled_error();
	return 0; /* keep compiler quiet */
}
size_t url_curl_fwrite(void *ptr, size_t size, URL_FILE *file, CopyState pstate)
{
	curl_not_compiled_error();
	return 0; /* keep compiler quiet */
}
void url_curl_fflush(URL_FILE *file, CopyState pstate)
{
	curl_not_compiled_error();
}

#endif /* USE_CURL */

相关信息

greenplumn 源码目录

相关文章

greenplumn external 源码

greenplumn url 源码

greenplumn url_custom 源码

greenplumn url_execute 源码

greenplumn url_file 源码

0  赞