greenplumn ic_proxy_client 源码

  • 2022-08-18
  • 浏览 (261)

greenplumn ic_proxy_client 代码

文件路径:/src/backend/cdb/motion/ic_proxy_client.c

/*-------------------------------------------------------------------------
 *
 * ic_proxy_client.c
 *
 *    Interconnect Proxy Client
 *
 * A client lives in the proxy bgworker and connects to a backend, each logical
 * connection needs a client, so there could be multiple connections between
 * one backend process and the proxy bgworker, the same amount of clients are
 * needed, too.
 *
 * A local client communicates to exact one remote client, a special case is
 * that both clients are on the same segment, in which case both are called
 * loopback clients.
 *
 * A client is created on a new backend connection, it is registered after the
 * hand shaking.  The hand shaking is made between the backend and the client,
 * this is different with the ic-tcp or ic-udp modes, they do the hand shaking
 * between the local and remote backends.
 *
 * A client is identified with a key, every packet also contains such a key, so
 * when a packet is received we could know which client to route it to.
 *
 * Packets can arrive before the client registration, in such a case a
 * placeholder is registered to hold the early coming packets.  Once the client
 * finishes the hand shaking it replaces the placeholder and handles these
 * early packets in the arriving order.
 *
 * The interconnect treats the motion sender and the receiver differently,
 * however in ic-proxy we do not distinguish the sender client or the receiver
 * client.  Sometimes we will mention the words sender or receiver, they only
 * have logical meanings, and are not marked in the code.
 *
 * Sometimes a backend receives slower than the sender, so more and more
 * packets are put in the writing queue.  To prevent it consuming too many
 * memory we introduce an active flow control mechanism. The sender increases
 * the number of unack packets when it sends a packet to remote peer. The
 * sender will PAUSE itself when the number of unack packets exceeds the
 * threshold. The receiver will send a ACK message back to sender, when it
 * receives a batch of packets. After get ACK message, the sender will decrease
 * the number of unack packets. When the number of unack packets is below the
 * resume threshold, the sender continues to read data from the backend.
 *
 * Packets are routed in 2 directions, c2p and p2c:
 * - c2p packets are routed from a client to a peer;
 * - p2c packets are routed from a peer to a client;
 * - there is no term "c2c" because loopback packets are also handled as c2p
 *   by the sender and p2c by the receiver;
 *
 * The directions are not named as incoming and outgoing because the they can
 * lead to confusing descriptions: "a client receives an outgoing packet from
 * its backend", or, "a client sends an incoming packet to its backend".
 *
 *
 * Copyright (c) 2020-Present VMware, Inc. or its affiliates.
 *
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "ic_proxy_server.h"
#include "ic_proxy_pkt_cache.h"
#include "ic_proxy_router.h"
#include "utils/hsearch.h"

#include <uv.h>

typedef struct ICProxyClientEntry ICProxyClientEntry;


/*
 * TODO: a client only has one target, it's either a peer, or a loopback
 * client, there is no need to lookup it every time, the loopback client lookup
 * is expansive.
 *
 * We used to save the target since the very beginning, however it was buggy on
 * placeholders, so we disabled it.  If we want to reenable it be careful on
 * below notes:
 * - a peer can be a placeholder;
 * - a loopback client can be a placeholder;
 * - ibuf/obuf functions are not reentrantable, delayed queue is necessary;
 */
struct ICProxyClient
{
	uv_pipe_t	pipe;			/* the libuv handle */

	ICProxyKey	key;			/* the key to identify a client */

	uint32		state;			/* or'ed bits of below ones */
#define IC_PROXY_CLIENT_STATE_RECEIVED_HELLO  0x00000001
#define IC_PROXY_CLIENT_STATE_SENT_HELLO_ACK  0x00000002
#define IC_PROXY_CLIENT_STATE_REGISTERED      0x00000004
#define IC_PROXY_CLIENT_STATE_PLACEHOLDER     0x00000008
#define IC_PROXY_CLIENT_STATE_C2P_SHUTTING    0x00000010
#define IC_PROXY_CLIENT_STATE_C2P_SHUTTED     0x00000020
#define IC_PROXY_CLIENT_STATE_P2C_SHUTTING    0x00000040
#define IC_PROXY_CLIENT_STATE_P2C_SHUTTED     0x00000080
#define IC_PROXY_CLIENT_STATE_CLOSING         0x00000100
#define IC_PROXY_CLIENT_STATE_CLOSED          0x00000200
#define IC_PROXY_CLIENT_STATE_PAUSED          0x00000800

	int			unconsumed;		/* count of the packets that are unconsumed by
								 * the backend */
	int			sending;		/* count of the packets being sent, both c2p &
								 * p2c, both data & message */
	int			unackSendPkt;	/* count of the unack packets from sender */
	int			unackRecvPkt;	/* count of the unack packets from receiver */

	ICProxyOBuf	obuf;			/* obuf merges small outgoing packets into
								 * large ones to reduce network overhead */
	ICProxyIBuf	ibuf;			/* ibuf detects the packet boundaries */

	ICProxyClient *successor;	/* if a client is registered while a previous
								 * one with the same key is still alive, the
								 * registration is delayed, the new one marks
								 * itself as the successor of the previous one,
								 * when the previous one is deregistered the
								 * new one gets actually registered. */

	List	   *pkts;			/* early coming packets */

	char	   *name;			/* name of the client, only for logging */
#define IC_PROXY_CLIENT_NAME_SIZE 256

	/* TODO: statistics */
};

/*
 * Client hash table entry.
 */
struct ICProxyClientEntry
{
	ICProxyKey	key;			/* the hash key */

	ICProxyClient *client;		/* the client */
};


static void ic_proxy_client_clear_name(ICProxyClient *client);
static void ic_proxy_client_shutdown_c2p(ICProxyClient *client);
static void ic_proxy_client_shutdown_p2c(ICProxyClient *client);
static void ic_proxy_client_close(ICProxyClient *client);
static void ic_proxy_client_read_data(ICProxyClient *client);
static void ic_proxy_client_cache_p2c_pkt(ICProxyClient *client,
										  ICProxyPkt *pkt);
static void ic_proxy_client_cache_p2c_pkts(ICProxyClient *client, List *pkts);
static void ic_proxy_client_drop_p2c_cache(ICProxyClient *client);
static void ic_proxy_client_handle_p2c_cache(ICProxyClient *client);
static void ic_proxy_client_maybe_start_read_data(ICProxyClient *client);
static void ic_proxy_client_maybe_pause(ICProxyClient *client);
static void ic_proxy_client_maybe_send_ack_message(ICProxyClient *client);
static void ic_proxy_client_maybe_resume(ICProxyClient *client);


/*
 * The client register table.
 */
static HTAB		   *ic_proxy_clients = NULL;


void
ic_proxy_client_table_init(void)
{
	HASHCTL		hashctl;

	hashctl.hash = (HashValueFunc) ic_proxy_key_hash;
	hashctl.match = (HashCompareFunc) ic_proxy_key_equal_for_hash;
	hashctl.keycopy = memcpy;

	hashctl.keysize = sizeof(ICProxyKey);
	hashctl.entrysize = sizeof(ICProxyClientEntry);

	ic_proxy_clients = hash_create("ic-proxy clients",
								   MaxConnections /* nelem */,
								   &hashctl,
								   HASH_ELEM | HASH_FUNCTION |
								   HASH_COMPARE | HASH_KEYCOPY);
}

void
ic_proxy_client_table_uninit(void)
{
	if (ic_proxy_clients)
	{
		hash_destroy(ic_proxy_clients);
		ic_proxy_clients = NULL;
	}
}

/*
 * Shutdown all the clients who communicate with dbid.
 *
 * When a proxy-proxy connection to dbid is lost, all the logical connections
 * from or to it must be dropped, too.
 */
void
ic_proxy_client_table_shutdown_by_dbid(uint16 dbid)
{
	ICProxyClientEntry *entry;
	HASH_SEQ_STATUS seq;

	ic_proxy_log(LOG,
				 "ic-proxy-clients: shutting down all the clients by dbid %hu",
				 dbid);

	hash_seq_init(&seq, ic_proxy_clients);

	while ((entry = hash_seq_search(&seq)))
	{
		ICProxyClient *client = entry->client;

		/* cached pkts must also be dropped */
		ic_proxy_client_drop_p2c_cache(client);

		if (client->state & IC_PROXY_CLIENT_STATE_PLACEHOLDER)
			continue;

		if (client->key.remoteDbid != dbid)
			continue;

		ic_proxy_client_shutdown_c2p(client);
		ic_proxy_client_shutdown_p2c(client);
	}
}

/*
 * Register a client with its key.
 *
 * - if there was a placeholder, replace it;
 * - if there was a legacy but still alive one, make the client as its
 *   successor and delay the registration;
 */
static void
ic_proxy_client_register(ICProxyClient *client)
{
	ICProxyClientEntry *entry;
	bool		found;

	/* this should never happen */
	if (client->state & IC_PROXY_CLIENT_STATE_REGISTERED)
	{
		ic_proxy_log(WARNING, "%s: already registered",
					 ic_proxy_client_get_name(client));
		return;
	}

	entry = hash_search(ic_proxy_clients, &client->key, HASH_ENTER, &found);

	if (found)
	{
		/* someone with the same identifier comes earlier than me */
		ICProxyClient *placeholder = entry->client;

		/* this should never happen, if it does, sth. serious is wrong */
		if (placeholder == client)
		{
			ic_proxy_log(ERROR,
						 "%s: unregistered client found in the client table",
						 ic_proxy_client_get_name(client));
			return;
		}

		if (!(placeholder->state & IC_PROXY_CLIENT_STATE_PLACEHOLDER))
		{
			/*
			 * it's the client of last statement, for example in the query
			 * "alter table set distributed by", it contains multiple
			 * statements, some of them share the same command id.
			 *
			 * TODO: we believe the old one is shutting down, but what if not?
			 */
			ic_proxy_log(WARNING,
						 "%s: delay the register as the previous client is still shutting down",
						 ic_proxy_client_get_name(client));

			/* this should never happen, if it does, sth. serious is wrong */
			if (placeholder->successor != NULL)
				ic_proxy_log(ERROR,
							 "%s: the previous client already has a successor",
							 ic_proxy_client_get_name(client));

			/*
			 * register client as a successor, it will be actually registered
			 * when the placeholder unregister.
			 */
			placeholder->successor = client;
			return;
		}

		/* it's a placeholder, this happens if the pkts arrived before me */
		ic_proxy_log(LOG, "%s: replace my placeholder %s",
					 ic_proxy_client_get_name(client),
					 ic_proxy_client_get_name(placeholder));

		/*
		 * the only purpose of a placeholder is to save the early coming
		 * pkts, there should be no chance for it to be empty; but in case
		 * it really happens, don't panic, nothing serious.
		 */
		if (placeholder->pkts == NIL)
			ic_proxy_log(LOG, "%s: no cached pkts in the placeholder %s",
						 ic_proxy_client_get_name(client),
						 ic_proxy_client_get_name(placeholder));

		/* replace the placeholder / legacy client */
		entry->client = client;

		/* transfer the cached pkts */
		ic_proxy_client_cache_p2c_pkts(client, placeholder->pkts);
		placeholder->pkts = NIL;

		if (placeholder->state & IC_PROXY_CLIENT_STATE_PLACEHOLDER)
		{
			/*
			 * need to free the placeholder, but no need to unregister it as we
			 * replaced it already.  The placeholder has nothing more than
			 * itself, so free it directly.
			 */
			ic_proxy_free(placeholder);
			ic_proxy_log(LOG, "%s: freed my placeholder",
						 ic_proxy_client_get_name(client));
		}
	}
	else
	{
		entry->client = client;
	}

	client->state |= IC_PROXY_CLIENT_STATE_REGISTERED;

	/* clear the name so we could show the new name */
	ic_proxy_client_clear_name(client);

	ic_proxy_log(LOG, "%s: registered, %ld in total",
				 ic_proxy_client_get_name(client),
				 hash_get_num_entries(ic_proxy_clients));
}

/*
 * Unregister a client.
 *
 * If it has a successor, trigger its registration.
 *
 * If there are still unhandled pkts, transfer them to the successor or the
 * placeholder.  This happens on loopback connections, due to the limitation of
 * the delayed delivery, pkts can be pushed to a p2c shutting/shutted client,
 * these pkts belong to the next subplan in the same query.
 */
static void
ic_proxy_client_unregister(ICProxyClient *client)
{
	if (!(client->state & IC_PROXY_CLIENT_STATE_REGISTERED))
		return;

	hash_search(ic_proxy_clients, &client->key, HASH_REMOVE, NULL);

	client->state &= ~IC_PROXY_CLIENT_STATE_REGISTERED;

	ic_proxy_log(LOG, "%s: unregistered, %ld in total",
				 ic_proxy_client_get_name(client),
				 hash_get_num_entries(ic_proxy_clients));

	if (client->successor)
	{
		ICProxyClient *successor;

		successor = client->successor;
		client->successor = NULL;

		if (client->pkts)
		{
			ic_proxy_log(LOG, "%s: transfer %d unhandled pkts to my successor",
						 ic_proxy_client_get_name(client),
						 list_length(client->pkts));

			ic_proxy_client_cache_p2c_pkts(successor, client->pkts);
			client->pkts = NIL;
		}

		ic_proxy_log(LOG, "%s: re-register my successor",
					 ic_proxy_client_get_name(client));

		/* the successor must have not registered */
		Assert(!(successor->state & IC_PROXY_CLIENT_STATE_REGISTERED));

		ic_proxy_client_register(successor);

		/* the successor must have successfully registered */
		Assert(successor->state & IC_PROXY_CLIENT_STATE_REGISTERED);

		ic_proxy_client_maybe_start_read_data(successor);
	}
	else if (client->pkts)
	{
		ICProxyClient *placeholder;

		ic_proxy_log(LOG, "%s: transfer %d unhandled pkts to my placeholder",
					 ic_proxy_client_get_name(client),
					 list_length(client->pkts));

		placeholder = ic_proxy_client_new(client->pipe.loop,
										  true /* placeholder */);
		placeholder->key = client->key;
		ic_proxy_client_register(placeholder);

		ic_proxy_client_cache_p2c_pkts(placeholder, client->pkts);
		client->pkts = NIL;
	}
}

/*
 * Look up a client with a key.
 */
static ICProxyClient *
ic_proxy_client_lookup(const ICProxyKey *key)
{
	ICProxyClientEntry *entry;
	bool		found;

	entry = hash_search(ic_proxy_clients, key, HASH_FIND, &found);

	return found ? entry->client : NULL;
}

/*
 * Look up a client with a key, create a placeholder if not found.
 */
ICProxyClient *
ic_proxy_client_blessed_lookup(uv_loop_t *loop, const ICProxyKey *key)
{
	ICProxyClient *client = ic_proxy_client_lookup(key);

	if (!client)
	{
		client = ic_proxy_client_new(loop, true);
		client->key = *key;
		ic_proxy_client_register(client);

		ic_proxy_log(LOG, "%s: registered as a placeholder",
					 ic_proxy_client_get_name(client));
	}

	return client;
}

/*
 * Pass a c2p packet to the router.
 */
static void
ic_proxy_client_route_c2p_data(void *opaque, const void *data, uint16 size)
{
	const ICProxyPkt *pkt = data;
	ICProxyClient *client = opaque;

	Assert(ic_proxy_pkt_is_from_client(pkt, &client->key));
	Assert(ic_proxy_pkt_is_live(pkt, &client->key));

	ic_proxy_router_route(client->pipe.loop, ic_proxy_pkt_dup(pkt), NULL, NULL);
}

/*
 * Received a complete c2p packet.
 */
static void
ic_proxy_client_on_c2p_data_pkt(void *opaque, const void *data, uint16 size)
{
	ICProxyClient *client = opaque;

	ic_proxy_log(LOG, "%s: received B2C PKT [%d bytes] from the backend",
				 ic_proxy_client_get_name(client), size);

	/* increase the number of unack packets */
	client->unackSendPkt++;

	/* check whether pause threshold is reached */
	ic_proxy_client_maybe_pause(client);

	/*
	 * Send it out, but maybe not immediately.  The obuf helps to merge small
	 * packets into large ones, which reduces the network overhead
	 * significantly.
	 */
	ic_proxy_obuf_push(&client->obuf, data, size,
					   ic_proxy_client_route_c2p_data, client);
}

/*
 * Received c2p data, or events (error, eof).
 */
static void
ic_proxy_client_on_c2p_data(uv_stream_t *stream,
							ssize_t nread, const uv_buf_t *buf)
{
	ICProxyClient *client = CONTAINER_OF((void *) stream, ICProxyClient, pipe);

	if (unlikely(nread < 0))
	{
		if (nread != UV_EOF)
			ic_proxy_log(WARNING, "%s: fail to receive c2p DATA: %s",
						 ic_proxy_client_get_name(client), uv_strerror(nread));
		else
			ic_proxy_log(LOG, "%s: received EOF while waiting for c2p DATA",
						 ic_proxy_client_get_name(client));

		if (buf->base)
			ic_proxy_pkt_cache_free(buf->base);

		/*
		 * If a backend shuts down normally the ibuf should be empty, or
		 * contains exactly 1 byte, the STOP message.  However it's possible
		 * that the backend shuts down due to exception, in such a case there
		 * can be any amount of data left in the ibuf.
		 */
		if (client->ibuf.len > 0)
		{
			ic_proxy_log(LOG,
						 "%s: the ibuf still contains %d bytes,"
						 " flush before shutting down",
						 ic_proxy_client_get_name(client), client->ibuf.len);

			ic_proxy_ibuf_push(&client->ibuf, NULL, 0,
							   ic_proxy_client_on_c2p_data_pkt, client);
		}

		/* flush unsent data */
		ic_proxy_obuf_push(&client->obuf, NULL, 0,
						   ic_proxy_client_route_c2p_data, client);

		/* stop reading from the backend */
		uv_read_stop((uv_stream_t *) &client->pipe);

		/* inform the other side of the logical connection to close, too */
		ic_proxy_client_shutdown_c2p(client);
		return;
	}
	else if (unlikely(nread == 0))
	{
		if (buf->base)
			ic_proxy_pkt_cache_free(buf->base);

		/* EAGAIN or EWOULDBLOCK, retry */
		return;
	}

	/* TODO: Will this really happen? */
	if (client->state & IC_PROXY_CLIENT_STATE_PAUSED)
		ic_proxy_log(ERROR,
					 "%s: paused already, but still received DATA[%zd bytes] from the backend, state is 0x%08x",
					 ic_proxy_client_get_name(client), nread, client->state);

	ic_proxy_log(LOG, "%s: received DATA[%zd bytes] from the backend",
				 ic_proxy_client_get_name(client), nread);

	/*
	 * The c2p data needs to be handled as packets, the ibuf helps to find the
	 * boundaries.
	 */
	ic_proxy_ibuf_push(&client->ibuf, buf->base, nread,
					   ic_proxy_client_on_c2p_data_pkt, client);
	ic_proxy_pkt_cache_free(buf->base);
}

/*
 * Start reading from backend if the client is successfully registered.
 */
static void
ic_proxy_client_maybe_start_read_data(ICProxyClient *client)
{
	if (!(client->state & IC_PROXY_CLIENT_STATE_REGISTERED))
		/*
		 * the register is delayed as the previous client is still shutting
		 * down, do not send the ACK until that's done.
		 */
		return;

	ic_proxy_log(LOG, "%s: start receiving DATA",
				 ic_proxy_client_get_name(client));

	/* since now on, the client and backend communicate only via b2c packets */
	Assert(ic_proxy_ibuf_empty(&client->ibuf));
	ic_proxy_ibuf_uninit(&client->ibuf);
	ic_proxy_ibuf_init_b2c(&client->ibuf);

	/* and do not forget to init the obuf header */
	ic_proxy_message_init(ic_proxy_obuf_ensure_buffer(&client->obuf),
						  0, &client->key);

#ifdef USE_ASSERT_CHECKING
	/*
	 * this should never happen because no p2c pkt is handled yet, the early
	 * coming pkts are in the cache list, we are just going to handle them.
	 */
	if (client->state &
		(IC_PROXY_CLIENT_STATE_C2P_SHUTTING |
		 IC_PROXY_CLIENT_STATE_P2C_SHUTTING))
	{
		ic_proxy_log(WARNING,
					 "%s: state=0x%08x: unexpected shutting down in progress",
					 ic_proxy_client_get_name(client), client->state);
		return;
	}
#endif /* USE_ASSERT_CHECKING */

	/*
	 * now it's time to receive the normal data, it is important to start the
	 * reading before handling the cached pkts, so if the cached pkts contain
	 * the BYE we could stop the reading in time.
	 */
	ic_proxy_client_read_data(client);

	ic_proxy_client_handle_p2c_cache(client);
}

/*
 * The HELLO ACK is sent out.
 */
static void
ic_proxy_client_on_sent_hello_ack(void *opaque,
								  const ICProxyPkt *pkt, int status)
{
	ICProxyClient *client = opaque;

	if (status < 0)
	{
		ic_proxy_client_shutdown_p2c(client);
		return;
	}

	client->state |= IC_PROXY_CLIENT_STATE_SENT_HELLO_ACK;

	ic_proxy_client_register(client);

	ic_proxy_client_maybe_start_read_data(client);
}

/*
 * Received the complete HELLO message.
 */
static void
ic_proxy_client_on_hello_pkt(void *opaque, const void *data, uint16 size)
{
	const ICProxyPkt *pkt = data;
	ICProxyClient *client = opaque;
	ICProxyKey	key;
	ICProxyPkt *ackpkt;

	/* we only expect one HELLO message */
	uv_read_stop((uv_stream_t *) &client->pipe);

	if (!ic_proxy_pkt_is(pkt, IC_PROXY_MESSAGE_HELLO))
	{
		ic_proxy_log(WARNING, "%s: invalid %s",
					 ic_proxy_client_get_name(client),
					 ic_proxy_pkt_to_str(pkt));

		ic_proxy_client_shutdown_p2c(client);
		return;
	}

	ic_proxy_log(LOG, "%s: received %s from the backend",
				 ic_proxy_client_get_name(client), ic_proxy_pkt_to_str(pkt));

	client->state |= IC_PROXY_CLIENT_STATE_RECEIVED_HELLO;

	ic_proxy_key_from_c2p_pkt(&key, pkt);

	/*
	 * If we register before the HELLO ACK is sent, the peer has a chance to
	 * route data before HELLO ACK.  So we must register in the write callback.
	 */

	client->key = key;

	/* clear the name so we could show the new name */
	ic_proxy_client_clear_name(client);

	/* build a HELLO ACK */
	ic_proxy_key_reverse(&key);
	ackpkt = ic_proxy_message_new(IC_PROXY_MESSAGE_HELLO_ACK, &key);

	ic_proxy_router_write((uv_stream_t *) &client->pipe, ackpkt, 0,
						  ic_proxy_client_on_sent_hello_ack, client);
}

/*
 * Received the HELLO message, maybe only part of.
 */
static void
ic_proxy_client_on_hello_data(uv_stream_t *stream,
							  ssize_t nread, const uv_buf_t *buf)
{
	ICProxyClient *client = CONTAINER_OF((void *) stream, ICProxyClient, pipe);

	if (unlikely(nread < 0))
	{
		if (nread != UV_EOF)
			ic_proxy_log(WARNING, "%s: fail to receive HELLO: %s",
						 ic_proxy_client_get_name(client), uv_strerror(nread));
		else
			ic_proxy_log(LOG, "%s: received EOF while waiting for HELLO",
						 ic_proxy_client_get_name(client));

		if (buf->base)
			ic_proxy_pkt_cache_free(buf->base);

		ic_proxy_log(WARNING, "%s, TODO: fail to receive HELLO",
					 ic_proxy_client_get_name(client));
		/* 
		 * Pending data in a failed handshake is useless,
		 * no need to call shutdown() explicitly.
		 */
		ic_proxy_client_close(client);
		return;
	}
	else if (unlikely(nread == 0))
	{
		if (buf->base)
			ic_proxy_pkt_cache_free(buf->base);

		/* EAGAIN or EWOULDBLOCK, retry */
		return;
	}

	/* Hand shaking is done via p2p packets */
	ic_proxy_ibuf_push(&client->ibuf, buf->base, nread,
					   ic_proxy_client_on_hello_pkt, client);
	ic_proxy_pkt_cache_free(buf->base);
}

/*
 * Start reading the HELLO message.
 */
int
ic_proxy_client_read_hello(ICProxyClient *client)
{
	/* start reading the HELLO message */
	return uv_read_start((uv_stream_t *) &client->pipe,
						 ic_proxy_pkt_cache_alloc_buffer,
						 ic_proxy_client_on_hello_data);
}

/*
 * Start reading the DATA.
 */
static void
ic_proxy_client_read_data(ICProxyClient *client)
{
	int			ret;

	ret = uv_read_start((uv_stream_t *) &client->pipe,
						ic_proxy_pkt_cache_alloc_buffer,
						ic_proxy_client_on_c2p_data);

	if (ret < 0)
	{
		ic_proxy_log(WARNING, "%s: state=0x%08x: fail to start reading data: %s",
					 ic_proxy_client_get_name(client),
					 client->state, uv_strerror(ret));

		ic_proxy_client_shutdown_c2p(client);
	}
}

/*
 * Get the client name.
 *
 * The name is constructed from the state lazily.  When the state gets changed,
 * call ic_proxy_client_clear_name() to clear the name, so we could reconstruct
 * the name from the new state.
 */
const char *
ic_proxy_client_get_name(ICProxyClient *client)
{
	if (unlikely(client->name == NULL))
	{
		uint32		namesize = IC_PROXY_CLIENT_NAME_SIZE;
		const char *keystr = "";
		const char *suffix = "";

		if (client->state & IC_PROXY_CLIENT_STATE_REGISTERED)
			keystr = ic_proxy_key_to_str(&client->key);

		if (client->state & IC_PROXY_CLIENT_STATE_PLACEHOLDER)
			suffix = ".placeholder";

		client->name = ic_proxy_alloc(namesize);
		snprintf(client->name, namesize, "client%s%s", suffix, keystr);
	}

	return client->name;
}

static void
ic_proxy_client_clear_name(ICProxyClient *client)
{
	if (client->name)
	{
		ic_proxy_free(client->name);
		client->name = NULL;
	}
}

/*
 * Return the libuv stream handle of a client.
 */
uv_stream_t *
ic_proxy_client_get_stream(ICProxyClient *client)
{
	return (uv_stream_t *) &client->pipe;
}

ICProxyClient *
ic_proxy_client_new(uv_loop_t *loop, bool placeholder)
{
	ICProxyClient *client;

	client = ic_proxy_new(ICProxyClient);
	uv_pipe_init(loop, &client->pipe, false);

	client->pkts = NIL;
	client->state = 0;
	client->unconsumed = 0;
	client->sending = 0;
	client->unackSendPkt = 0;
	client->unackRecvPkt = 0;
	client->successor = NULL;
	client->name = NULL;

	ic_proxy_obuf_init_p2p(&client->obuf);

	/* hand shaking messages are in the p2p format */
	ic_proxy_ibuf_init_p2p(&client->ibuf);

	if (placeholder)
		client->state |= IC_PROXY_CLIENT_STATE_PLACEHOLDER;

	return client;
}

/*
 * Free a non-placeholder client.
 *
 * The placeholders are free()'ed directly in ic_proxy_client_register(), only
 * the non-placeholder clients will need this function.
 */
static void
ic_proxy_client_free(ICProxyClient *client)
{
	ic_proxy_log(LOG, "%s: freeing", ic_proxy_client_get_name(client));

	/*
	 * This should not happen, but in case it happens we want to leave a
	 * message in the logs to help us understand why.
	 */
	ic_proxy_client_drop_p2c_cache(client);

	ic_proxy_obuf_uninit(&client->obuf);
	ic_proxy_ibuf_uninit(&client->ibuf);

	Assert(client->successor == NULL);

	ic_proxy_client_clear_name(client);
	ic_proxy_free(client);
}

/*
 * The client is closed.
 */
static void
ic_proxy_client_on_close(uv_handle_t *handle)
{
	ICProxyClient *client = CONTAINER_OF((void *) handle, ICProxyClient, pipe);

	if (client->state & IC_PROXY_CLIENT_STATE_CLOSED)
		ic_proxy_log(ERROR, "%s: double close",
					 ic_proxy_client_get_name(client));

	client->state |= IC_PROXY_CLIENT_STATE_CLOSED;

	ic_proxy_log(LOG, "%s: closed", ic_proxy_client_get_name(client));

	ic_proxy_client_free(client);
}

/*
 * Close a client.
 */
static void
ic_proxy_client_close(ICProxyClient *client)
{
	if (client->state & IC_PROXY_CLIENT_STATE_CLOSING)
		return;

	client->state |= IC_PROXY_CLIENT_STATE_CLOSING;

	ic_proxy_client_unregister(client);

	ic_proxy_log(LOG, "%s: closing", ic_proxy_client_get_name(client));
	uv_close((uv_handle_t *) &client->pipe, ic_proxy_client_on_close);
}

/*
 * Close a client if it is ready to close.
 *
 * Where ready means below conditions are both met:
 * - no data or message pkts being sent;
 * - both c2p and p2c are shutted down;
 *
 * Return true if it is ready to close, false otherwise.
 */
static bool
ic_proxy_client_maybe_close(ICProxyClient *client)
{
	uint32		bits = 0;

	bits |= IC_PROXY_CLIENT_STATE_C2P_SHUTTED;
	bits |= IC_PROXY_CLIENT_STATE_P2C_SHUTTED;

	if (client->sending == 0 && ((client->state & bits) == bits))
	{
		ic_proxy_client_close(client);
		return true;
	}

	return false;
}

/*
 * Sent the c2p BYE message.
 */
static void
ic_proxy_client_on_sent_c2p_bye(void *opaque, const ICProxyPkt *pkt, int status)
{
	ICProxyClient *client = opaque;

	if (status < 0)
	{
		/*
		 * Failed to send the c2p BYE, maybe the peer connection is already
		 * lost.  This can be safely ignored, we could just behave as it is
		 * sent out.
		 */
		ic_proxy_log(LOG, "%s: fail to shutdown c2p",
					 ic_proxy_client_get_name(client));

		/*
		 * When we fail to send the BYE, should we trigger the shutdown_p2c
		 * process immediately?
		 */
	}

	ic_proxy_log(LOG, "%s: shutted down c2p", ic_proxy_client_get_name(client));

	client->sending--;

	client->state |= IC_PROXY_CLIENT_STATE_C2P_SHUTTED;
	ic_proxy_client_maybe_close(client);
}

/*
 * Shutdown in c2p direction.
 */
static void
ic_proxy_client_shutdown_c2p(ICProxyClient *client)
{
	/* Do not re-shutdown if we are already doing so or even have done */
	if (client->state & IC_PROXY_CLIENT_STATE_C2P_SHUTTING)
	{
		ic_proxy_client_maybe_close(client);
		return;
	}

	ic_proxy_log(LOG, "%s: shutting down c2p",
				 ic_proxy_client_get_name(client));

	client->state |= IC_PROXY_CLIENT_STATE_C2P_SHUTTING;

	if (client->state & IC_PROXY_CLIENT_STATE_REGISTERED)
	{
		/*
		 * For a registered client, try to send a c2p BYE message to the
		 * remote.
		 */

		ICProxyPkt *pkt = ic_proxy_message_new(IC_PROXY_MESSAGE_BYE,
											   &client->key);

		client->sending++;

		ic_proxy_router_route(client->pipe.loop, pkt,
							  ic_proxy_client_on_sent_c2p_bye, client);
	}
	else
	{
		/*
		 * For a non-registered client, simply behave as if there is a remote
		 * and it has received the BYE and sent back a P2C BYE.
		 */

		ic_proxy_log(LOG, "%s: shutted down c2p",
					 ic_proxy_client_get_name(client));

		client->state |= IC_PROXY_CLIENT_STATE_C2P_SHUTTED;

		ic_proxy_client_shutdown_p2c(client);
	}
}

/*
 * Shutted down in p2c direction.
 */
static void
ic_proxy_client_on_shutdown_p2c(uv_shutdown_t *req, int status)
{
	ICProxyClient *client = CONTAINER_OF((void *) req->handle, ICProxyClient, pipe);

	ic_proxy_free(req);

	if (status < 0)
		ic_proxy_log(WARNING, "%s: fail to shutdown p2c: %s",
					 ic_proxy_client_get_name(client), uv_strerror(status));
	else
		ic_proxy_log(LOG, "%s: shutted down p2c",
					 ic_proxy_client_get_name(client));

	client->state |= IC_PROXY_CLIENT_STATE_P2C_SHUTTED;
	ic_proxy_client_maybe_close(client);
}

/*
 * Shutdown in p2c direction.
 */
static void
ic_proxy_client_shutdown_p2c(ICProxyClient *client)
{
	uv_shutdown_t *req;

	if (client->state & IC_PROXY_CLIENT_STATE_P2C_SHUTTING)
	{
		ic_proxy_client_maybe_close(client);
		return;
	}

	ic_proxy_log(LOG, "%s: shutting down p2c",
				 ic_proxy_client_get_name(client));

	client->state |= IC_PROXY_CLIENT_STATE_P2C_SHUTTING;

	req = ic_proxy_new(uv_shutdown_t);
	uv_shutdown(req, (uv_stream_t *) &client->pipe,
				ic_proxy_client_on_shutdown_p2c);
}

/*
 * Sent c2p RESUME message.
 */
static void
ic_proxy_client_on_sent_c2p_resume(void *opaque,
								   const ICProxyPkt *pkt, int status)
{
	ICProxyClient *client = opaque;

	if (status < 0)
	{
		/*
		 * TODO: Fail to send the RESUME, should we retry instead of shutting
		 * down?
		 */
		ic_proxy_client_shutdown_p2c(client);
	}

	client->sending--;

	ic_proxy_client_maybe_close(client);
}

/*
 * Routed p2c DATA to the backend.
 */
static void
ic_proxy_client_on_sent_p2c_data(void *opaque,
								 const ICProxyPkt *pkt, int status)
{
	ICProxyClient *client = opaque;

	/* this could happen if the backend errored out */
	if (status < 0)
		ic_proxy_client_shutdown_c2p(client);

	client->unconsumed--;
	client->sending--;
	client->unackRecvPkt++;

	ic_proxy_client_maybe_send_ack_message(client);
}

/*
 * Received a complete p2c message.
 *
 * This should not be called by a placeholder.
 */
static void
ic_proxy_client_on_p2c_message(ICProxyClient *client, const ICProxyPkt *pkt,
							   ic_proxy_sent_cb callback, void *opaque)
{
	if (ic_proxy_pkt_is(pkt, IC_PROXY_MESSAGE_BYE))
	{
		ic_proxy_log(LOG, "%s: received %s",
					 ic_proxy_client_get_name(client),
					 ic_proxy_pkt_to_str(pkt));

		ic_proxy_client_maybe_resume(client);

		ic_proxy_client_shutdown_p2c(client);
	}
	else if (ic_proxy_pkt_is(pkt, IC_PROXY_MESSAGE_DATA_ACK))
	{
		ic_proxy_log(LOG, "%s: received %s, with %d existing unack packets",
					 ic_proxy_client_get_name(client),
					 ic_proxy_pkt_to_str(pkt),
					 client->unackSendPkt);

		client->unackSendPkt -= IC_PROXY_ACK_INTERVAL;

#if 0
		/* for debug purpose */
		if (client->unackSendPkt < 0)
			ic_proxy_log(WARNING, "%s: unexpected number of unack packets: %d",
						 ic_proxy_client_get_name(client),
						 client->unackSendPkt);
#endif

		ic_proxy_client_maybe_resume(client);
	}
	else
	{
		ic_proxy_log(ERROR, "%s: unsupported message",
					 ic_proxy_client_get_name(client));
	}
}

/*
 * Received a complete p2c DATA / MESSAGE packet.
 *
 * The client will takes the ownership of the pkt.
 */
void
ic_proxy_client_on_p2c_data(ICProxyClient *client, ICProxyPkt *pkt,
							ic_proxy_sent_cb callback, void *opaque)
{
	/* A placeholder does not send any packets, it always cache them */
	if (client->state & IC_PROXY_CLIENT_STATE_PLACEHOLDER)
	{
		/* FIXME: also save the callback */
		ic_proxy_client_cache_p2c_pkt(client, pkt);
	}
#ifdef USE_ASSERT_CHECKING
	else if (!ic_proxy_pkt_is_to_client(pkt, &client->key))
	{
		ic_proxy_log(WARNING, "%s: the %s is not to me",
					 ic_proxy_client_get_name(client),
					 ic_proxy_pkt_to_str(pkt));
		/* TODO: callback? */
		ic_proxy_pkt_cache_free(pkt);
	}
#endif /* USE_ASSERT_CHECKING */
	else if (!ic_proxy_pkt_is_live(pkt, &client->key))
	{
		if (ic_proxy_pkt_is_out_of_date(pkt, &client->key))
		{
			ic_proxy_log(LOG, "%s: drop out-of-date %s",
						 ic_proxy_client_get_name(client),
						 ic_proxy_pkt_to_str(pkt));
			/* TODO: callback? */
			ic_proxy_pkt_cache_free(pkt);
		}
		else
		{
			Assert(ic_proxy_pkt_is_in_the_future(pkt, &client->key));

			ic_proxy_log(LOG, "%s: future %s",
						 ic_proxy_client_get_name(client),
						 ic_proxy_pkt_to_str(pkt));
			/* FIXME: also save the callback */
			ic_proxy_client_cache_p2c_pkt(client, pkt);
		}
	}
	/* the pkt is valid and live, send it to the backend */
	else if (ic_proxy_pkt_is(pkt, IC_PROXY_MESSAGE_DATA))
	{
		if (client->state & IC_PROXY_CLIENT_STATE_P2C_SHUTTING)
		{
			/*
			 * a special kind of future packet happens between different stats
			 * of a query, the client and its successor have the same keys,
			 * such a future packet can not be detected in previous checks,
			 * we could only tell accoding to the client state.
			 */
			ic_proxy_client_cache_p2c_pkt(client, pkt);
			return;
		}

		client->unconsumed++;
		client->sending++;

		Assert(callback == NULL);
		ic_proxy_router_write((uv_stream_t *) &client->pipe,
							  pkt, sizeof(*pkt),
							  ic_proxy_client_on_sent_p2c_data, client);
	}
	else
	{
		ic_proxy_client_on_p2c_message(client, pkt, callback, opaque);
		ic_proxy_pkt_cache_free(pkt);
	}
}

/*
 * Save a packet for later handling.
 *
 * The client will takes the ownership of the pkt.
 */
static void
ic_proxy_client_cache_p2c_pkt(ICProxyClient *client, ICProxyPkt *pkt)
{
	/* TODO: drop out-of-date pkt directly */
	/* TODO: verify the pkt is to client */

	client->pkts = lappend(client->pkts, pkt);

	ic_proxy_log(LOG, "%s: cached a %s for future use, %d in the list",
				 ic_proxy_client_get_name(client), ic_proxy_pkt_to_str(pkt),
				 list_length(client->pkts));
}

/*
 * Save a list of packets for later handling.
 *
 * The client will takes the ownership of the pkts.
 */
static void
ic_proxy_client_cache_p2c_pkts(ICProxyClient *client, List *pkts)
{
	/* TODO: verify the pkts is to client */

	client->pkts = list_concat(client->pkts, pkts);

	ic_proxy_log(LOG,
				 "%s: cached %d pkts for future use, %d in the list",
				 ic_proxy_client_get_name(client),
				 list_length(pkts), list_length(client->pkts));
}

/*
 * Drop all the cached p2c packets.
 */
static void
ic_proxy_client_drop_p2c_cache(ICProxyClient *client)
{
	ListCell   *cell;

	if (!client->pkts)
		return;

	foreach(cell, client->pkts)
	{
		ICProxyPkt *pkt = lfirst(cell);

		ic_proxy_log(WARNING, "%s: unhandled cached %s, dropping it",
					 ic_proxy_client_get_name(client),
					 ic_proxy_pkt_to_str(pkt));

		ic_proxy_pkt_cache_free(pkt);
	}

	list_free(client->pkts);
	client->pkts = NIL;
}

/*
 * Handle all the cached p2c packets.
 *
 * Out-of-date ones will be droped, future ones will still be cached, only the
 * live ones are handled.
 *
 * This function is only called on a new client, so it is not so expansive
 * to rebuild the cache list.
 */
static void
ic_proxy_client_handle_p2c_cache(ICProxyClient *client)
{
	int			count = 0;

	/* A placeholder does not handle any packets */
	if (client->state & IC_PROXY_CLIENT_STATE_PLACEHOLDER)
		return;

	if (client->pkts == NIL)
		return;

	ic_proxy_log(LOG, "%s: trying to consume the %d cached pkts",
				 ic_proxy_client_get_name(client), list_length(client->pkts));

	/*
	 * Consume the pkts one by one, stop immediately if the client begin to
	 * shutdown in the p2c direction.
	 */
	while (client->pkts != NIL &&
		   !(client->state & IC_PROXY_CLIENT_STATE_P2C_SHUTTING))
	{
		ICProxyPkt *pkt = linitial(client->pkts);

		count++;

		/*
		 * The pkt's ownership is taken by ic_proxy_client_on_p2c_data(), so
		 * only need to free the list cell itself.
		 */
		client->pkts = list_delete_first(client->pkts);

		/* FIXME: callback */
		ic_proxy_client_on_p2c_data(client, pkt, NULL, NULL);
	}

	ic_proxy_log(LOG, "%s: consumed %d cached pkts",
				 ic_proxy_client_get_name(client), count);
}

/*
 * Receiver sends ack message to the sender
 */
static void
ic_proxy_client_maybe_send_ack_message(ICProxyClient *client)
{
	ic_proxy_log(LOG, "%s: %d unconsumed packets to the backend",
				 ic_proxy_client_get_name(client), client->unconsumed);

	/*
	 * Send ack message when the unackRecvPkt exceeds the threshold
	 */
	if (client->unackRecvPkt >= IC_PROXY_ACK_INTERVAL)
	{
		ICProxyPkt *pkt = ic_proxy_message_new(IC_PROXY_MESSAGE_DATA_ACK,
											   &client->key);

		client->sending++;
		client->unackRecvPkt -= IC_PROXY_ACK_INTERVAL;

		ic_proxy_router_route(client->pipe.loop, pkt,
							  ic_proxy_client_on_sent_c2p_resume, client);
	}
}

/*
 * PAUSE if the number of unack-packets execeeds the pause threshold
 */
static void
ic_proxy_client_maybe_pause(ICProxyClient *client)
{
	/*
	 * If the number of unack-packets execeeds the pause threshold, the
	 * ic_proxy sender should stop reading from backend.
	 *
	 * The ic_proxy receiver will send ack message back to the sender, when
	 * the number of unack-packets is below the resume threshold, the sender
	 * continues to read from the backend again.
	 */
	if (client->unackSendPkt >= IC_PROXY_TRESHOLD_UNACK_PACKET_PAUSE
		&& !(client->state & IC_PROXY_CLIENT_STATE_PAUSED))
	{
		client->state |= IC_PROXY_CLIENT_STATE_PAUSED;
		uv_read_stop((uv_stream_t *) &client->pipe);

		ic_proxy_log(LOG, "%s: paused", ic_proxy_client_get_name(client));
	}
}

/*
 * RESUME if the number of unack packets is below the resume threshold
 */
static void
ic_proxy_client_maybe_resume(ICProxyClient *client)
{
	if (client->unackSendPkt <= IC_PROXY_TRESHOLD_UNACK_PACKET_RESUME
		&& (client->state & IC_PROXY_CLIENT_STATE_PAUSED))
	{
		ic_proxy_client_read_data(client);

		client->state &= ~IC_PROXY_CLIENT_STATE_PAUSED;

		ic_proxy_log(LOG, "%s: resumed", ic_proxy_client_get_name(client));
	}
}

相关信息

greenplumn 源码目录

相关文章

greenplumn cdbmotion 源码

greenplumn htupfifo 源码

greenplumn ic_common 源码

greenplumn ic_proxy 源码

greenplumn ic_proxy_addr 源码

greenplumn ic_proxy_addr 源码

greenplumn ic_proxy_backend 源码

greenplumn ic_proxy_backend 源码

greenplumn ic_proxy_bgworker 源码

greenplumn ic_proxy_iobuf 源码

0  赞