airflow workflows 源码

  • 2022-10-20
  • 浏览 (344)

airflow workflows 代码

文件路径:/airflow/providers/google/cloud/sensors/workflows.py

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, Sequence

from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
from google.api_core.retry import Retry
from google.cloud.workflows.executions_v1beta import Execution

from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.hooks.workflows import WorkflowsHook
from airflow.sensors.base import BaseSensorOperator

if TYPE_CHECKING:
    from airflow.utils.context import Context


class WorkflowExecutionSensor(BaseSensorOperator):
    """
    Checks state of an execution for the given ``workflow_id`` and ``execution_id``.

    :param workflow_id: Required. The ID of the workflow.
    :param execution_id: Required. The ID of the execution.
    :param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
    :param location: Required. The Cloud Dataproc region in which to handle the request.
    :param success_states: Execution states to be considered as successful, by default
        it's only ``SUCCEEDED`` state
    :param failure_states: Execution states to be considered as failures, by default
        they are ``FAILED`` and ``CANCELLED`` states.
    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
        retried.
    :param request_timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
        ``retry`` is specified, the timeout applies to each individual attempt.
    :param metadata: Additional metadata that is provided to the method.
    """

    template_fields: Sequence[str] = ("location", "workflow_id", "execution_id")

    def __init__(
        self,
        *,
        workflow_id: str,
        execution_id: str,
        location: str,
        project_id: str | None = None,
        success_states: set[Execution.State] | None = None,
        failure_states: set[Execution.State] | None = None,
        retry: Retry | _MethodDefault = DEFAULT,
        request_timeout: float | None = None,
        metadata: Sequence[tuple[str, str]] = (),
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        **kwargs,
    ):
        super().__init__(**kwargs)

        self.success_states = success_states or {Execution.State(Execution.State.SUCCEEDED)}
        self.failure_states = failure_states or {
            Execution.State(Execution.State.FAILED),
            Execution.State(Execution.State.CANCELLED),
        }
        self.workflow_id = workflow_id
        self.execution_id = execution_id
        self.location = location
        self.project_id = project_id
        self.retry = retry
        self.request_timeout = request_timeout
        self.metadata = metadata
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain

    def poke(self, context: Context):
        hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
        self.log.info("Checking state of execution %s for workflow %s", self.execution_id, self.workflow_id)
        execution: Execution = hook.get_execution(
            workflow_id=self.workflow_id,
            execution_id=self.execution_id,
            location=self.location,
            project_id=self.project_id,
            retry=self.retry,
            timeout=self.request_timeout,
            metadata=self.metadata,
        )

        state = execution.state
        if state in self.failure_states:
            raise AirflowException(
                f"Execution {self.execution_id} for workflow {self.execution_id} "
                f"failed and is in `{state}` state",
            )

        if state in self.success_states:
            self.log.info(
                "Execution %s for workflow %s completed with state: %s",
                self.execution_id,
                self.workflow_id,
                state,
            )
            return True

        self.log.info(
            "Execution %s for workflow %s does not completed yet, current state: %s",
            self.execution_id,
            self.workflow_id,
            state,
        )
        return False

相关信息

airflow 源码目录

相关文章

airflow init 源码

airflow bigquery 源码

airflow bigquery_dts 源码

airflow bigtable 源码

airflow cloud_storage_transfer_service 源码

airflow dataflow 源码

airflow dataform 源码

airflow datafusion 源码

airflow dataplex 源码

airflow dataproc 源码

0  赞