greenplum CJobQueue 源码

  • 2022-08-18
  • 浏览 (340)

greenplum CJobQueue 代码

文件路径:/src/backend/gporca/libgpopt/src/search/CJobQueue.cpp

//---------------------------------------------------------------------------
//	Greenplum Database
//	Copyright (C) 2011 EMC Corp.
//
//	@filename:
//		CJobQueue.cpp
//
//	@doc:
//		Implementation of class controlling unique execution of an operation
//		that is potentially assigned to many jobs.

//	@owner:
//
//
//	@test:
//
//
//---------------------------------------------------------------------------

#include "gpopt/search/CJobQueue.h"

#include "gpopt/search/CJobFactory.h"
#include "gpopt/search/CScheduler.h"
#include "gpopt/search/CSchedulerContext.h"

using namespace gpos;
using namespace gpopt;


//---------------------------------------------------------------------------
//	@function:
//		CJobQueue::EjqrAdd
//
//	@doc:
//		Register a job with the queue.
//
//		Returns EjqrCompleted when the queue is already marked completed;
//		the job is then neither recorded nor ref-counted. Returns EjqrMain
//		when the job is, or becomes, the queue's main job, and EjqrQueued
//		when it is appended behind an existing main job. In the latter two
//		cases the job's ref count is incremented.
//
//---------------------------------------------------------------------------
CJobQueue::EJobQueueResult
CJobQueue::EjqrAdd(CJob *pj)
{
	GPOS_ASSERT(nullptr != pj);

	// the guarded operation has already finished; nothing to wait for
	if (m_fCompleted)
	{
		return EjqrCompleted;
	}

	// job is already the main job: take a reference, but do not put it
	// on the waiter list
	if (m_pj == pj)
	{
		pj->IncRefs();
		return EjqrMain;
	}

	// any other job is recorded on the list and holds a reference
	m_listjQueued.Append(pj);
	pj->IncRefs();

	// a job that is not at the head of the list waits for the main job
	if (m_listjQueued.First() != pj)
	{
		return EjqrQueued;
	}

	// the head of the list is the first caller and becomes the main job
	GPOS_ASSERT(nullptr == m_pj);
	m_pj = pj;

	return EjqrMain;
}


//---------------------------------------------------------------------------
//	@function:
//		CJobQueue::NotifyCompleted
//
//	@doc:
//		Mark the queue as completed and drain the list of queued jobs.
//		Each removed job has its ref count decremented; when that call
//		returns 1 the job is reported to the scheduler as completed and
//		then released through the job factory of the scheduler context.
//
//---------------------------------------------------------------------------
void
CJobQueue::NotifyCompleted(CSchedulerContext *psc)
{
	// must be called once only, and with at least one job on the list
	GPOS_ASSERT(!m_fCompleted);
	GPOS_ASSERT(!m_listjQueued.IsEmpty());

	m_fCompleted = true;

	while (!m_listjQueued.IsEmpty())
	{
		CJob *pjWaiter = m_listjQueued.RemoveHead();

		// any result other than 1 means the job's execution has not
		// completed yet, so it is left alone
		if (1 != pjWaiter->UlpDecrRefs())
		{
			continue;
		}

		// update job as completed, then recycle it
		psc->Psched()->CompleteQueued(pjWaiter);
		psc->Pjf()->Release(pjWaiter);
	}
}

#ifdef GPOS_DEBUG

//---------------------------------------------------------------------------
//	@function:
//		CJobQueue::OsPrintQueuedJobs
//
//	@doc:
//		Write a header line followed by every job currently on the queued
//		list, in list order, to the given stream; returns that stream.
//		Not thread-safe.
//
//---------------------------------------------------------------------------
IOstream &
CJobQueue::OsPrintQueuedJobs(IOstream &os)
{
	os << "Job queue: " << std::endl;

	// walk the list from head to tail, letting each job print itself
	for (CJob *pjCurrent = m_listjQueued.First(); nullptr != pjCurrent;
		 pjCurrent = m_listjQueued.Next(pjCurrent))
	{
		pjCurrent->OsPrint(os);
	}

	return os;
}

#endif	// GPOS_DEBUG

// EOF

相关信息

greenplum 源码目录

相关文章

greenplum CBinding 源码

greenplum CGroup 源码

greenplum CGroupExpression 源码

greenplum CGroupProxy 源码

greenplum CJob 源码

greenplum CJobFactory 源码

greenplum CJobGroup 源码

greenplum CJobGroupExploration 源码

greenplum CJobGroupExpression 源码

greenplum CJobGroupExpressionExploration 源码

0  赞