greenplum CTask 源码

  • 2022-08-18
  • 浏览 (399)

greenplum CTask 代码

文件路径:/src/backend/gporca/libgpos/include/gpos/task/CTask.h

//---------------------------------------------------------------------------
//	Greenplum Database
//	Copyright (C) 2008 - 2010 Greenplum, Inc.
//
//	@filename:
//		CTask.h
//
//	@doc:
//		Interface class for task abstraction
//---------------------------------------------------------------------------
#ifndef GPOS_CTask_H
#define GPOS_CTask_H

#include "gpos/base.h"
#include "gpos/common/CList.h"
#include "gpos/error/CErrorContext.h"
#include "gpos/error/CException.h"
#include "gpos/memory/CMemoryPoolManager.h"
#include "gpos/task/CTaskContext.h"
#include "gpos/task/CTaskId.h"
#include "gpos/task/CTaskLocalStorage.h"
#include "gpos/task/ITask.h"

namespace gpos
{
//---------------------------------------------------------------------------
//	@class:
//		CTask
//
//	@doc:
//		Interface to abstract task (work unit);
//		provides asynchronous task execution and error handling;
//
//---------------------------------------------------------------------------
class CTask : public ITask
{
	// The constructor, Bind() and Execute() are private, so only the
	// classes befriended here can create, bind and run a task.
	friend class CAutoTaskProxy;
	friend class CAutoTaskProxyTest;
	friend class CTaskSchedulerFifo;
	friend class CWorker;
	friend class CWorkerPoolManager;
	friend class CUnittest;
	friend class CAutoSuspendAbort;

private:
	// task memory pool -- exclusively used by this task
	CMemoryPool *m_mp;

	// task context; loggers, trace flags and locale are all served from it
	CTaskContext *m_task_ctxt;

	// error context
	IErrorContext *m_err_ctxt;

	// error handler stack
	CErrorHandler *m_err_handle;

	// function to execute
	void *(*m_func)(void *);

	// function argument
	void *m_arg;

	// function result
	void *m_res;

	// TLS
	CTaskLocalStorage m_tls;

	// task status
	ETaskStatus m_status;

	// cancellation flag; every cancel-related accessor dereferences this
	// pointer, so it must be non-null for the lifetime of the task
	BOOL *m_cancel;

	// local cancellation flag; used when no flag is externally passed
	BOOL m_cancel_local;

	// counter of requests to suspend cancellation
	ULONG m_abort_suspend_count;

	// flag denoting task completion report
	BOOL m_reported;

	// task identifier
	CTaskId m_tid;

	// ctor
	CTask(CMemoryPool *mp, CTaskContext *task_ctxt, IErrorContext *err_ctxt,
		  BOOL *cancel);

	// binding a task structure to a function and its arguments
	void Bind(void *(*func)(void *), void *arg);

	// execution, called by the owning worker
	void Execute();

	// check if task has been scheduled
	BOOL IsScheduled() const;

	// check if task finished executing
	BOOL IsFinished() const;

	// check if task is currently executing
	BOOL
	IsRunning() const
	{
		return EtsRunning == m_status;
	}

	// reported flag accessor
	BOOL
	IsReported() const
	{
		return m_reported;
	}

	// set reported flag; asserts that the flag has not been set before
	void
	SetReported()
	{
		GPOS_ASSERT(!m_reported && "Task already reported as completed");

		m_reported = true;
	}

public:
	// tasks are neither copy-constructible nor copy-assignable
	CTask(const CTask &) = delete;
	CTask &operator=(const CTask &) = delete;

	// dtor
	~CTask() override;

	// accessor for memory pool, e.g. used for allocating task parameters
	CMemoryPool *
	Pmp() const override
	{
		return m_mp;
	}

	// TLS accessor
	CTaskLocalStorage &
	GetTls() override
	{
		return m_tls;
	}

	// task id accessor
	CTaskId &
	GetTid()
	{
		return m_tid;
	}

	// task context accessor
	CTaskContext *
	GetTaskCtxt() const override
	{
		return m_task_ctxt;
	}

	// basic output streams; both loggers are obtained from the task context
	ILogger *
	GetOutputLogger() const override
	{
		return m_task_ctxt->GetOutputLogger();
	}

	ILogger *
	GetErrorLogger() const override
	{
		return m_task_ctxt->GetErrorLogger();
	}

	// set a trace flag; forwards to the task context and returns its result
	BOOL
	SetTrace(ULONG trace, BOOL val) override
	{
		return m_task_ctxt->SetTrace(trace, val);
	}

	// check a trace flag; forwards to the task context
	BOOL
	IsTraceSet(ULONG trace) override
	{
		return m_task_ctxt->IsTraceSet(trace);
	}


	// locale
	ELocale
	Locale() const override
	{
		return m_task_ctxt->Locale();
	}

	// check if task is canceled
	BOOL
	IsCanceled() const
	{
		return *m_cancel;
	}

	// reset cancel flag
	void
	ResetCancel()
	{
		*m_cancel = false;
	}

	// set cancel flag
	void
	Cancel()
	{
		*m_cancel = true;
	}

	// check if a request to suspend abort was received
	BOOL
	IsAbortSuspended() const
	{
		return (0 < m_abort_suspend_count);
	}

	// increment counter for requests to suspend abort
	void
	SuspendAbort()
	{
		m_abort_suspend_count++;
	}

	// decrement counter for requests to suspend abort
	void ResumeAbort();

	// task status accessor
	ETaskStatus
	GetStatus() const
	{
		return m_status;
	}

	// set task status
	void SetStatus(ETaskStatus status);

	// task result accessor
	void *
	GetRes() const
	{
		return m_res;
	}

	// error context
	IErrorContext *
	GetErrCtxt() const override
	{
		return m_err_ctxt;
	}

	// error context downcast to the concrete CErrorContext type;
	// yields a null pointer if the stored context is not a CErrorContext
	CErrorContext *
	ConvertErrCtxt()
	{
		return dynamic_cast<CErrorContext *>(m_err_ctxt);
	}

	// pending exceptions
	BOOL
	HasPendingExceptions() const override
	{
		return m_err_ctxt->IsPending();
	}

#ifdef GPOS_DEBUG
	// check if task has expected status
	BOOL CheckStatus(BOOL completed);
#endif	// GPOS_DEBUG

	// slink for auto task proxy
	SLink m_proxy_link;

	// slink for task scheduler
	SLink m_task_scheduler_link;

	// slink for worker pool manager
	SLink m_worker_pool_manager_link;

	// current task, downcast from ITask::Self(); yields a null pointer if
	// ITask::Self() is null or does not refer to a CTask
	static CTask *
	Self()
	{
		return dynamic_cast<CTask *>(ITask::Self());
	}

};	// class CTask

}  // namespace gpos

#endif	// !GPOS_CTask_H

// EOF

相关信息

greenplum 源码目录

相关文章

greenplum CAutoSuspendAbort 源码

greenplum CAutoTaskProxy 源码

greenplum CAutoTraceFlag 源码

greenplum CTaskContext 源码

greenplum CTaskId 源码

greenplum CTaskLocalStorage 源码

greenplum CTaskLocalStorageObject 源码

greenplum CTaskSchedulerFifo 源码

greenplum CTraceFlagIter 源码

greenplum CWorker 源码

0  赞