greenplumn barrier source code

  • 2022-08-18

greenplumn barrier code

File path: /src/backend/storage/ipc/barrier.c

/*-------------------------------------------------------------------------
 *
 * barrier.c
 *	  Barriers for synchronizing cooperating processes.
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * From Wikipedia[1]: "In parallel computing, a barrier is a type of
 * synchronization method.  A barrier for a group of threads or processes in
 * the source code means any thread/process must stop at this point and cannot
 * proceed until all other threads/processes reach this barrier."
 *
 * This implementation of barriers allows for static sets of participants
 * known up front, or dynamic sets of participants which processes can join or
 * leave at any time.  In the dynamic case, a phase number can be used to
 * track progress through a parallel algorithm, and may be necessary to
 * synchronize with the current phase of a multi-phase algorithm when a new
 * participant joins.  In the static case, the phase number is used
 * internally, but it isn't strictly necessary for client code to access it
 * because the phase can only advance when the declared number of participants
 * reaches the barrier, so client code should be in no doubt about the current
 * phase of computation at all times.
 *
 * Consider a parallel algorithm that involves separate phases of computation
 * A, B and C where the output of each phase is needed before the next phase
 * can begin.
 *
 * In the case of a static barrier initialized with 4 participants, each
 * participant works on phase A, then calls BarrierArriveAndWait to wait until
 * all 4 participants have reached that point.  When BarrierArriveAndWait
 * returns control, each participant can work on B, and so on.  Because the
 * barrier knows how many participants to expect, the phases of computation
 * don't need labels or numbers, since each process's program counter implies
 * the current phase.  Even if some of the processes are slow to start up and
 * begin running phase A, the other participants are expecting them and will
 * patiently wait at the barrier.  The code could be written as follows:
 *
 *     perform_a();
 *     BarrierArriveAndWait(&barrier, ...);
 *     perform_b();
 *     BarrierArriveAndWait(&barrier, ...);
 *     perform_c();
 *     BarrierArriveAndWait(&barrier, ...);
 *
 * If the number of participants is not known up front, then a dynamic barrier
 * is needed and the number should be set to zero at initialization.  New
 * complications arise because the number necessarily changes over time as
 * participants attach and detach, and therefore phases B, C or even the end
 * of processing may be reached before any given participant has started
 * running and attached.  Therefore the client code must perform an initial
 * test of the phase number after attaching, because it needs to find out
 * which phase of the algorithm has been reached by any participants that are
 * already attached in order to synchronize with that work.  Once the program
 * counter or some other representation of current progress is synchronized
 * with the barrier's phase, normal control flow can be used just as in the
 * static case.  Our example could be written using a switch statement with
 * cases that fall-through, as follows:
 *
 *     phase = BarrierAttach(&barrier);
 *     switch (phase)
 *     {
 *     case PHASE_A:
 *         perform_a();
 *         BarrierArriveAndWait(&barrier, ...);
 *     case PHASE_B:
 *         perform_b();
 *         BarrierArriveAndWait(&barrier, ...);
 *     case PHASE_C:
 *         perform_c();
 *         BarrierArriveAndWait(&barrier, ...);
 *     }
 *     BarrierDetach(&barrier);
 *
 * Static barriers behave similarly to POSIX's pthread_barrier_t.  Dynamic
 * barriers behave similarly to Java's java.util.concurrent.Phaser.
 *
 * [1] https://en.wikipedia.org/wiki/Barrier_(computer_science)
 *
 * IDENTIFICATION
 *	  src/backend/storage/ipc/barrier.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"
#include "storage/barrier.h"

static inline bool BarrierDetachImpl(Barrier *barrier, bool arrive);

/*
 * Initialize this barrier.  To use a static party size, provide the number of
 * participants to wait for at each phase indicating that that number of
 * backends is implicitly attached.  To use a dynamic party size, specify zero
 * here and then use BarrierAttach() and
 * BarrierDetach()/BarrierArriveAndDetach() to register and deregister
 * participants explicitly.
 */
void
BarrierInit(Barrier *barrier, int participants)
{
	SpinLockInit(&barrier->mutex);
	barrier->participants = participants;
	barrier->arrived = 0;
	barrier->phase = 0;
	barrier->elected = 0;
	barrier->static_party = participants > 0;
	ConditionVariableInit(&barrier->condition_variable);
}

/*
 * Arrive at this barrier, wait for all other attached participants to arrive
 * too and then return.  Increments the current phase.  The caller must be
 * attached.
 *
 * While waiting, pg_stat_activity shows a wait_event_type and wait_event
 * controlled by the wait_event_info passed in, which should be a value from
 * one of the WaitEventXXX enums defined in pgstat.h.
 *
 * Return true in one arbitrarily chosen participant.  Return false in all
 * others.  The return code can be used to elect one participant to execute a
 * phase of work that must be done serially while other participants wait.
 */
bool
BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
{
	bool		release = false;
	bool		elected;
	int			start_phase;
	int			next_phase;

	SpinLockAcquire(&barrier->mutex);
	start_phase = barrier->phase;
	next_phase = start_phase + 1;
	++barrier->arrived;
	if (barrier->arrived == barrier->participants)
	{
		release = true;
		barrier->arrived = 0;
		barrier->phase = next_phase;
		barrier->elected = next_phase;
	}
	SpinLockRelease(&barrier->mutex);

	/*
	 * If we were the last expected participant to arrive, we can release our
	 * peers and return true to indicate that this backend has been elected to
	 * perform any serial work.
	 */
	if (release)
	{
		ConditionVariableBroadcast(&barrier->condition_variable);

		return true;
	}

	/*
	 * Otherwise we have to wait for the last participant to arrive and
	 * advance the phase.
	 */
	elected = false;
	ConditionVariablePrepareToSleep(&barrier->condition_variable);
	for (;;)
	{
		/*
		 * We know that phase must either be start_phase, indicating that we
		 * need to keep waiting, or next_phase, indicating that the last
		 * participant that we were waiting for has either arrived or detached
		 * so that the next phase has begun.  The phase cannot advance any
		 * further than that without this backend's participation, because
		 * this backend is attached.
		 */
		SpinLockAcquire(&barrier->mutex);
		Assert(barrier->phase == start_phase || barrier->phase == next_phase);
		release = barrier->phase == next_phase;
		if (release && barrier->elected != next_phase)
		{
			/*
			 * Usually the backend that arrives last and releases the other
			 * backends is elected to return true (see above), so that it can
			 * begin processing serial work while it has a CPU timeslice.
			 * However, if the barrier advanced because someone detached, then
			 * one of the backends that is awoken will need to be elected.
			 */
			barrier->elected = barrier->phase;
			elected = true;
		}
		SpinLockRelease(&barrier->mutex);
		if (release)
			break;
		ConditionVariableSleep(&barrier->condition_variable, wait_event_info);
	}
	ConditionVariableCancelSleep();

	return elected;
}

/*
 * Arrive at this barrier, but detach rather than waiting.  Returns true if
 * the caller was the last to detach.
 */
bool
BarrierArriveAndDetach(Barrier *barrier)
{
	return BarrierDetachImpl(barrier, true);
}

/*
 * Attach to a barrier.  All waiting participants will now wait for this
 * participant to call BarrierArriveAndWait(), BarrierDetach() or
 * BarrierArriveAndDetach().  Return the current phase.
 */
int
BarrierAttach(Barrier *barrier)
{
	int			phase;

	Assert(!barrier->static_party);

	SpinLockAcquire(&barrier->mutex);
	++barrier->participants;
	phase = barrier->phase;
	SpinLockRelease(&barrier->mutex);

	return phase;
}

/*
 * Detach from a barrier.  This may release other waiters from
 * BarrierArriveAndWait() and advance the phase if they were only waiting for
 * this backend.  Return true if this participant was the last to detach.
 */
bool
BarrierDetach(Barrier *barrier)
{
	return BarrierDetachImpl(barrier, false);
}

/*
 * Return the current phase of a barrier.  The caller must be attached.
 */
int
BarrierPhase(Barrier *barrier)
{
	/*
	 * It is OK to read barrier->phase without locking, because it can't
	 * change without us (we are attached to it), and we executed a memory
	 * barrier when we either attached or participated in changing it last
	 * time.
	 */
	return barrier->phase;
}

/*
 * Return an instantaneous snapshot of the number of participants currently
 * attached to this barrier.  For debugging purposes only.
 */
int
BarrierParticipants(Barrier *barrier)
{
	int			participants;

	SpinLockAcquire(&barrier->mutex);
	participants = barrier->participants;
	SpinLockRelease(&barrier->mutex);

	return participants;
}

/*
 * Detach from a barrier.  If 'arrive' is true then also increment the phase
 * if there are no other participants.  If there are other participants
 * waiting, then the phase will be advanced and they'll be released if they
 * were only waiting for the caller.  Return true if this participant was the
 * last to detach.
 */
static inline bool
BarrierDetachImpl(Barrier *barrier, bool arrive)
{
	bool		release;
	bool		last;

	Assert(!barrier->static_party);

	SpinLockAcquire(&barrier->mutex);
	Assert(barrier->participants > 0);
	--barrier->participants;

	/*
	 * If any other participants are waiting and we were the last participant
	 * waited for, release them.  If no other participants are waiting, but
	 * this is a BarrierArriveAndDetach() call, then advance the phase too.
	 */
	if ((arrive || barrier->participants > 0) &&
		barrier->arrived == barrier->participants)
	{
		release = true;
		barrier->arrived = 0;
		++barrier->phase;
	}
	else
		release = false;

	last = barrier->participants == 0;
	SpinLockRelease(&barrier->mutex);

	if (release)
		ConditionVariableBroadcast(&barrier->condition_variable);

	return last;
}
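
Below is a minimal usage sketch of the static-party case, loosely following the A/B/C example in the file's header comment. It is not part of barrier.c: the helpers perform_a(), perform_b() and merge_results(), the leader/worker split, and the choice of WAIT_EVENT_PARALLEL_FINISH (one of the WaitEventXXX values from pgstat.h) as the reported wait event are illustrative assumptions.

#include "postgres.h"

#include "pgstat.h"
#include "storage/barrier.h"

/* Hypothetical per-phase work, declared only for this sketch. */
extern void perform_a(void);
extern void perform_b(void);
extern void merge_results(void);

/* Leader: the Barrier lives in shared memory and the worker count is known. */
void
leader_init_barrier(Barrier *barrier, int nworkers)
{
	BarrierInit(barrier, nworkers);		/* static party: nworkers > 0 */
}

/* Each of the nworkers backends runs this. */
void
static_worker(Barrier *barrier)
{
	perform_a();

	/*
	 * Wait until all nworkers have finished phase A.  Exactly one backend
	 * gets back true and is thereby elected to run the serial step.
	 */
	if (BarrierArriveAndWait(barrier, WAIT_EVENT_PARALLEL_FINISH))
		merge_results();

	/* Wait again so that perform_b() can rely on the merged result. */
	BarrierArriveAndWait(barrier, WAIT_EVENT_PARALLEL_FINISH);

	perform_b();
}

Because the party size is fixed at nworkers, no participant needs to inspect the phase number: each worker's program counter already implies the current phase, exactly as the header comment describes.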

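A corresponding sketch of the dynamic-party case, mirroring the switch/fall-through pattern from the header comment. PHASE_A/B/C, the perform_*() helpers and the wait event are again hypothetical; the assumption is that some backend has already called BarrierInit(barrier, 0), so the party starts empty and grows as workers attach.

#include "postgres.h"

#include "pgstat.h"
#include "storage/barrier.h"

/* Hypothetical phase numbers and per-phase work, declared only for this sketch. */
enum { PHASE_A = 0, PHASE_B, PHASE_C };
extern void perform_a(void);
extern void perform_b(void);
extern void perform_c(void);

/*
 * A worker that may start at any time.  BarrierAttach() reports which phase
 * the already-attached participants have reached, and the fall-through
 * switch skips work that has already been done by others.
 */
void
dynamic_worker(Barrier *barrier)
{
	switch (BarrierAttach(barrier))
	{
		case PHASE_A:
			perform_a();
			BarrierArriveAndWait(barrier, WAIT_EVENT_PARALLEL_FINISH);
			/* FALLTHROUGH */
		case PHASE_B:
			perform_b();
			BarrierArriveAndWait(barrier, WAIT_EVENT_PARALLEL_FINISH);
			/* FALLTHROUGH */
		case PHASE_C:
			perform_c();
			BarrierArriveAndWait(barrier, WAIT_EVENT_PARALLEL_FINISH);
			break;
		default:
			break;				/* attached after all phases had completed */
	}

	/* Stop participating; waiters no longer count this backend. */
	BarrierDetach(barrier);
}

A participant that has nothing more to contribute can instead call BarrierArriveAndDetach(), which counts as an arrival for the current phase (possibly advancing it and releasing waiters) without waiting for the phase to complete.
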
Related information

greenplumn source code directory

Related articles

greenplumn dsm source code

greenplumn dsm_impl source code

greenplumn ipc source code

greenplumn ipci source code

greenplumn latch source code

greenplumn pmsignal source code

greenplumn procarray source code

greenplumn procsignal source code

greenplumn shm_mq source code

greenplumn shm_toc source code
