greenplumn ic_proxy_router 源码

  • 2022-08-18
  • 浏览 (387)

greenplumn ic_proxy_router 代码

文件路径:/src/backend/cdb/motion/ic_proxy_router.c

/*-------------------------------------------------------------------------
 *
 * ic_proxy_router.c
 *
 *    Interconnect Proxy Router
 *
 * A router routes a packet to the correct target, a client or a peer.
 *
 * Copyright (c) 2020-Present VMware, Inc. or its affiliates.
 *
 *
 *-------------------------------------------------------------------------
 */


#include "ic_proxy.h"
#include "ic_proxy_server.h"
#include "ic_proxy_router.h"
#include "ic_proxy_packet.h"
#include "ic_proxy_pkt_cache.h"
#include "ic_proxy_server.h"


typedef struct ICProxyWriteReq ICProxyWriteReq;
typedef struct ICProxyLoopback ICProxyLoopback;


/*
 * A router write request.
 *
 * It is similar to the libuv write request, however the router will take care
 * of the common part, such as the freeing of the packet and the request, so
 * the caller can focus on the real business.
 */
struct ICProxyWriteReq
{
	uv_write_t	req;			/* the libuv write request */

	ic_proxy_sent_cb callback;	/* the callback */
	void	   *opaque;			/* the callback data */
};

/*
 * The loopback packet queue.
 *
 * Loopback packets can not be routed immediately, refer to ICProxyDelay for
 * details.  They are first put in a queue, and are actually routed in a libuv
 * check callback, it is triggered after all the current I/O events are
 * handled, so there will be no misordered packets, and no reentrance to some
 * critical functions.
 */
struct ICProxyLoopback
{
	uv_check_t	check;					/* the libuv check handle */

	List	   *queue;					/* List<ICProxyWriteReq *> */
};


static ICProxyLoopback ic_proxy_router_loopback;


/*
 * The loopback check is triggered.
 */
static void
ic_proxy_router_loopback_on_check(uv_check_t *handle)
{
	List	   *queue;
	ListCell   *cell;

	/*
	 * Stop the check callback, all the queued loopback packets will be routed
	 * in this round.  This must happen before routing the packets, so if new
	 * loopback packets are queued, the check callback can be re-turned on,
	 * those packets will be handled next round.
	 *
	 * TODO: the new loopback packets can be handled in this round, too.
	 * Queueing them to next round means it will not be triggered until some
	 * I/O events happen.  In current logic there is the per second timer, so
	 * in worst case the new loopback packets are delayed for 1 second.  If the
	 * timer is paused in the future, as an optimization, then in worst case
	 * the new packets may not get a chance to be routed.  The only concern on
	 * handling them now is that if a infinite ping-pong happens, this function
	 * would never return to the mainloop.  It's unlikely to happen, though,
	 * and we could prevent that by adding a round count limit.
	 */
	uv_check_stop(&ic_proxy_router_loopback.check);

	/*
	 * We must detach the queue before handling it, in case some new packets
	 * are queued during the process
	 */
	queue = ic_proxy_router_loopback.queue;
	ic_proxy_router_loopback.queue = NIL;

	foreach(cell, queue)
	{
		ICProxyDelay *delay = lfirst(cell);
		ICProxyClient *client;
		ICProxyKey	key;

		/* Loopback packets are always to a loopback client */
		ic_proxy_key_from_p2c_pkt(&key, delay->pkt);
		client = ic_proxy_client_blessed_lookup(handle->loop, &key);

		ic_proxy_log(LOG, "ic-proxy-router: looped back %s to %s",
					 ic_proxy_pkt_to_str(delay->pkt),
					 ic_proxy_client_get_name(client));

		ic_proxy_client_on_p2c_data(client, delay->pkt,
									delay->callback, delay->opaque);

		/* do not forget to call the callback */
		if (delay->callback)
			delay->callback(delay->opaque, delay->pkt, 0);

		/* and do not forget to free the memory */
		ic_proxy_free(delay);
	}

	list_free(queue);
}

/*
 * Push a loopback packet to the queue.
 */
static void
ic_proxy_router_loopback_push(ICProxyPkt *pkt,
							  ic_proxy_sent_cb callback, void *opaque)
{
	ICProxyDelay *delay;

	ic_proxy_log(LOG, "ic-proxy-router: looping back %s",
				 ic_proxy_pkt_to_str(pkt));

	/*
	 * Enable the libuv check callback if not yet.
	 */
	if (ic_proxy_router_loopback.queue == NIL)
		uv_check_start(&ic_proxy_router_loopback.check,
					   ic_proxy_router_loopback_on_check);

	/*
	 * Loopback packets are always to a loopback client, so it's safe to pass
	 * NULL as the peer.
	 */
	delay = ic_proxy_peer_build_delay(NULL, pkt, callback, opaque);
	ic_proxy_router_loopback.queue = lappend(ic_proxy_router_loopback.queue, delay);
}

/*
 * Initialize the router.
 */
void
ic_proxy_router_init(uv_loop_t *loop)
{
	uv_check_init(loop, &ic_proxy_router_loopback.check);
	ic_proxy_router_loopback.queue = NIL;
}

/*
 * Cleanup the router.
 */
void
ic_proxy_router_uninit(void)
{
	List	   *queue;
	ListCell   *cell;

	queue = ic_proxy_router_loopback.queue;
	ic_proxy_router_loopback.queue = NIL;

	uv_check_stop(&ic_proxy_router_loopback.check);

	foreach(cell, queue)
	{
		ICProxyDelay *delay = lfirst(cell);

		/*
		 * TODO: this function is only called on exiting, so it's better to
		 * drop the callbacks silently, right?
		 */
#if 0
		if (delay->callback)
			delay->callback(delay->opaque, pkt, UV_ECANCELED);
#endif

		ic_proxy_pkt_cache_free(delay->pkt);
		ic_proxy_free(delay);
	}

	list_free(queue);
}

/*
 * Route a packet.
 */
void
ic_proxy_router_route(uv_loop_t *loop, ICProxyPkt *pkt,
					  ic_proxy_sent_cb callback, void *opaque)
{
	if (pkt->dstDbid == pkt->srcDbid)
	{
		/*
		 * For a loopback target, we do not need to send the packet via a peer,
		 * we could pass the packet to the target client via
		 * ic_proxy_client_on_p2c_data(), however that function is not
		 * reentrantable, which happens on PAUSE & RESUME messages, so we must
		 * schedule it in a libuv check callback.
		 *
		 * TODO: when callback is NULL, we could pass the packet immediately.
		 */
		ic_proxy_router_loopback_push(pkt, callback, opaque);
	}
	else if (pkt->dstDbid == GpIdentity.dbid)
	{
		ICProxyClient *client;
		ICProxyKey	key;

		ic_proxy_key_from_p2c_pkt(&key, pkt);
		client = ic_proxy_client_blessed_lookup(loop, &key);

		ic_proxy_log(LOG, "ic-proxy-router: routing %s to %s",
					 ic_proxy_pkt_to_str(pkt),
					 ic_proxy_client_get_name(client));

		ic_proxy_client_on_p2c_data(client, pkt, callback, opaque);
	}
	else
	{
		ICProxyPeer *peer;

		peer = ic_proxy_peer_blessed_lookup(loop,
											pkt->dstContentId, pkt->dstDbid);

		ic_proxy_log(LOG, "ic-proxy-router: routing %s to %s",
					 ic_proxy_pkt_to_str(pkt), peer->name);

		ic_proxy_peer_route_data(peer, pkt, callback, opaque);
	}
}

/*
 * The packet is written.
 */
static void
ic_proxy_router_on_write(uv_write_t *req, int status)
{
	ICProxyWriteReq *wreq = (ICProxyWriteReq *) req;
	ICProxyPkt *pkt = req->data;

	if (status < 0)
		ic_proxy_log(LOG, "ic-proxy-router: fail to send %s: %s",
					 ic_proxy_pkt_to_str(pkt), uv_strerror(status));
	else
		ic_proxy_log(LOG, "ic-proxy-router: sent %s",
					 ic_proxy_pkt_to_str(pkt));

	if (wreq->callback)
		wreq->callback(wreq->opaque, pkt, status);

	ic_proxy_pkt_cache_free(pkt);
	ic_proxy_free(req);
}

/*
 * Write a packet to a libuv stream.
 *
 * This is a simple wrapper for the uv_write() function.  The boring parts,
 * like buffer & request management, are handled by this wrapper, so the caller
 * can focus on the real business.
 *
 * It can write the packet at a specific offset, this is useful when writing
 * data from the client to the backend, the backend wants headless data, so the
 * client can specify sizeof(ICProxyPkt) as the offset.
 *
 * - stream: the target stream, usually a peer or a client;
 * - pkt: the data to write, the ownership is taken;
 * - offset: the data offset to write from, usually 0 when writing to a peer,
 *   or sizeof(ICProxyPkt) when writing to a client;
 * - callback: the callback function;
 * - opaque: the callback data;
 */
void
ic_proxy_router_write(uv_stream_t *stream, ICProxyPkt *pkt, int32 offset,
					  ic_proxy_sent_cb callback, void *opaque)
{
	ICProxyWriteReq *wreq;
	uv_buf_t	wbuf;

	ic_proxy_log(LOG, "ic-proxy-router: sending %s", ic_proxy_pkt_to_str(pkt));

	wreq = ic_proxy_new(ICProxyWriteReq);

	wreq->req.data = pkt;
	wreq->callback = callback;
	wreq->opaque = opaque;

	wbuf.base = ((char *) pkt) + offset;
	wbuf.len = pkt->len - offset;

	uv_write(&wreq->req, stream, &wbuf, 1, ic_proxy_router_on_write);
}

相关信息

greenplumn 源码目录

相关文章

greenplumn cdbmotion 源码

greenplumn htupfifo 源码

greenplumn ic_common 源码

greenplumn ic_proxy 源码

greenplumn ic_proxy_addr 源码

greenplumn ic_proxy_addr 源码

greenplumn ic_proxy_backend 源码

greenplumn ic_proxy_backend 源码

greenplumn ic_proxy_bgworker 源码

greenplumn ic_proxy_client 源码

0  赞