greenplumn CJobTest 源码

  • 2022-08-18
  • 浏览 (407)

greenplumn CJobTest 代码

文件路径:/src/backend/gporca/libgpopt/src/search/CJobTest.cpp

//---------------------------------------------------------------------------
//	Greenplum Database
//	Copyright (C) 2011 EMC Corp.
//
//	@filename:
//		CJobTest.cpp
//
//	@doc:
//		Implementation of optimizer job test class
//---------------------------------------------------------------------------

#include "gpopt/search/CJobTest.h"

#include "gpos/io/COstreamString.h"
#include "gpos/string/CWStringDynamic.h"

#include "gpopt/search/CJobFactory.h"
#include "gpopt/search/CScheduler.h"
#include "gpopt/search/CSchedulerContext.h"

using namespace gpopt;
using namespace gpos;

#define GPOPT_JOB_TEST_DUMMY_CONST 45

// initialization of static members
ULONG_PTR CJobTest::m_ulpCnt;

//---------------------------------------------------------------------------
//	@function:
//		CJobTest::CJobTest
//
//	@doc:
//		Ctor
//
//---------------------------------------------------------------------------
CJobTest::CJobTest() = default;


//---------------------------------------------------------------------------
//	@function:
//		CJobTest::~CJobTest
//
//	@doc:
//		Dtor
//
//---------------------------------------------------------------------------
CJobTest::~CJobTest() = default;


//---------------------------------------------------------------------------
//	@function:
//		CJobTest::FExecute
//
//	@doc:
//		Execution of test job
//
//---------------------------------------------------------------------------
BOOL
CJobTest::FExecute(CSchedulerContext *psc)
{
	BOOL fRes = false;

	switch (m_ett)
	{
		case EttSpawn:
			fRes = FSpawn(psc);
			break;

		case EttQueueu:
			fRes = FQueue(psc);
			break;

		case EttStartQueue:
			fRes = FStartQueue(psc);
			break;
	}

	return fRes;
}


//---------------------------------------------------------------------------
//	@function:
//		CJobTest::FSpawn
//
//	@doc:
//		Test job spawning
//
//---------------------------------------------------------------------------
BOOL
CJobTest::FSpawn(CSchedulerContext *psc)
{
	ULONG_PTR ulpOffset = m_ulpCnt++;

#ifdef GPOS_DEBUG
	if (10 == ulpOffset && psc->Psched()->FTrackingJobs())
	{
		CWStringDynamic str(psc->GetGlobalMemoryPool());
		COstreamString oss(&str);

		psc->Psched()->OsPrintActiveJobs(oss);

		GPOS_TRACE(str.GetBuffer());
	}
#endif	// GPOS_DEBUG

	if (m_ulRounds > ulpOffset)
	{
		for (ULONG i = 0; i < m_ulFanout; i++)
		{
			// get new job from factory
			CJob *pj = psc->Pjf()->PjCreate(CJob::EjtTest);

			// initialize test job
			CJobTest *pjt = PjConvert(pj);
			pjt->Init(this);

			// schedule new job for execution as child
			psc->Psched()->Add(pj, this);

			GPOS_CHECK_ABORT;
		}

		// after forking jobs burn a few CPU cycles
		// to simulate an actual transformation
		Loop();

		return false;
	}

	return true;
}


//---------------------------------------------------------------------------
//	@function:
//		CJobTest::FStartQueue
//
//	@doc:
//		Start jobs to be queued
//
//---------------------------------------------------------------------------
BOOL
CJobTest::FStartQueue(CSchedulerContext *psc)
{
	ULONG_PTR ulpOffset = m_ulpCnt++;

	if (0 == ulpOffset)
	{
		for (ULONG i = 0; i < m_ulFanout; i++)
		{
			// get new job from factory
			CJob *pj = psc->Pjf()->PjCreate(CJob::EjtTest);

			// initialize test job
			CJobTest *pjt = PjConvert(pj);
			pjt->Init(CJobTest::EttQueueu, m_ulRounds, m_ulFanout, m_ulIters,
					  m_pjq);

			// schedule new job for execution as child
			psc->Psched()->Add(pj, this);

			GPOS_CHECK_ABORT;
		}

		return false;
	}

	return true;
}


//---------------------------------------------------------------------------
//	@function:
//		CJobTest::FQueue
//
//	@doc:
//		Test job queueing
//
//---------------------------------------------------------------------------
BOOL
CJobTest::FQueue(CSchedulerContext *psc)
{
	BOOL fCompleted = true;

	switch (m_pjq->EjqrAdd(this))
	{
		case CJobQueue::EjqrMain:
			if (10 > m_ulFanout)
			{
				GPOS_TRACE(GPOS_WSZ_LIT("Queued job is not executing -> run"));
			}
			Loop();
#ifdef GPOS_DEBUG
			if (10 > m_ulFanout)
			{
				CWStringDynamic str(psc->GetGlobalMemoryPool());
				COstreamString oss(&str);
				m_pjq->OsPrintQueuedJobs(oss);

				GPOS_TRACE(str.GetBuffer());
			}
#endif	// GPOS_DEBUG
			m_pjq->NotifyCompleted(psc);
			break;
			;

		case CJobQueue::EjqrQueued:
			if (10 > m_ulFanout)
			{
				GPOS_TRACE(GPOS_WSZ_LIT("Queued job is executing -> wait"));
			}
			fCompleted = false;
			break;

		case CJobQueue::EjqrCompleted:
			if (10 > m_ulFanout)
			{
				GPOS_TRACE(GPOS_WSZ_LIT("Queued job has already completed"));
			}
			break;
	}

	return fCompleted;
}


//---------------------------------------------------------------------------
//	@function:
//		CJobTest::Loop
//
//	@doc:
//		Burn some CPU to simulate actual work
//
//---------------------------------------------------------------------------
void
CJobTest::Loop() const
{
	ULONG ulOuter = 0;
	while (ulOuter < m_ulIters)
	{
		for (ULONG ulInner = ulOuter; ulInner > 0; ulInner--)
		{
			if (0 < ulOuter * (ulOuter + GPOPT_JOB_TEST_DUMMY_CONST))
			{
				ulOuter++;
			}
		}
		ulOuter++;

		GPOS_CHECK_ABORT;
	}
}


#ifdef GPOS_DEBUG

//---------------------------------------------------------------------------
//	@function:
//		CJobTest::OsPrint
//
//	@doc:
//		Print job
//
//---------------------------------------------------------------------------
IOstream &
CJobTest::OsPrint(IOstream &os) const
{
	os << "Test job, ";
	return CJob::OsPrint(os);
}

#endif	// GPOS_DEBUG

// EOF

相关信息

greenplumn 源码目录

相关文章

greenplumn CBinding 源码

greenplumn CGroup 源码

greenplumn CGroupExpression 源码

greenplumn CGroupProxy 源码

greenplumn CJob 源码

greenplumn CJobFactory 源码

greenplumn CJobGroup 源码

greenplumn CJobGroupExploration 源码

greenplumn CJobGroupExpression 源码

greenplumn CJobGroupExpressionExploration 源码

0  赞