airflow functions 源码

  • 2022-10-20
  • 浏览 (246)

airflow functions 代码

文件路径:/airflow/providers/google/cloud/hooks/functions.py

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains a Google Cloud Functions Hook."""
from __future__ import annotations

import time
from typing import Any, Optional, Sequence

import requests
from googleapiclient.discovery import build

from airflow.exceptions import AirflowException
from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID, GoogleBaseHook

# Number of seconds to sleep between two consecutive status checks while
# polling a long-running Cloud Functions operation for completion.
TIME_TO_SLEEP_IN_SECONDS = 1


class CloudFunctionsHook(GoogleBaseHook):
    """
    Hook for the Google Cloud Functions APIs.

    All the methods in the hook where project_id is used must be called with
    keyword arguments rather than positional.

    :param api_version: Version of the Cloud Functions API handed to the
        discovery client when the connection is built.
    :param gcp_conn_id: Airflow connection id, forwarded to ``GoogleBaseHook``.
    :param delegate_to: Forwarded unchanged to ``GoogleBaseHook``.
    :param impersonation_chain: Forwarded unchanged to ``GoogleBaseHook``.
    """

    # Lazily-built discovery service object; populated on first ``get_conn`` call.
    _conn: Optional[Any] = None

    def __init__(
        self,
        api_version: str,
        gcp_conn_id: str = "google_cloud_default",
        delegate_to: str | None = None,
        impersonation_chain: str | Sequence[str] | None = None,
    ) -> None:
        super().__init__(
            gcp_conn_id=gcp_conn_id,
            delegate_to=delegate_to,
            impersonation_chain=impersonation_chain,
        )
        self.api_version = api_version

    @staticmethod
    def _full_location(project_id: str, location: str) -> str:
        """
        Retrieve full location of the function in the form of
        ``projects/<GCP_PROJECT_ID>/locations/<GCP_LOCATION>``

        :param project_id: The Google Cloud Project project_id where the function belongs.
        :param location: The location where the function is created.
        :return: The fully qualified location path built from both arguments.
        :rtype: str
        """
        return f'projects/{project_id}/locations/{location}'

    def get_conn(self) -> build:
        """
        Retrieves the connection to Cloud Functions.

        The service object is built on the first call and cached on the
        instance, so subsequent calls return the same object.

        :return: Google Cloud Function services object (the value returned by
            ``googleapiclient.discovery.build``).
        """
        if not self._conn:
            http_authorized = self._authorize()
            self._conn = build(
                'cloudfunctions', self.api_version, http=http_authorized, cache_discovery=False
            )
        return self._conn

    def get_function(self, name: str) -> dict:
        """
        Returns the Cloud Function with the given name.

        :param name: Name of the function.
        :return: A Cloud Functions object representing the function.
        :rtype: dict
        """
        # fmt: off
        return self.get_conn().projects().locations().functions().get(
            name=name).execute(num_retries=self.num_retries)
        # fmt: on

    @GoogleBaseHook.fallback_to_default_project_id
    def create_new_function(self, location: str, body: dict, project_id: str = PROVIDE_PROJECT_ID) -> None:
        """
        Creates a new function in Cloud Function in the location specified in the body.

        Blocks until the operation returned by the API has completed.

        :param location: The location of the function.
        :param body: The body required by the Cloud Functions insert API.
        :param project_id: Optional, Google Cloud Project project_id where the function belongs.
            If set to None or missing, the default project_id from the Google Cloud connection is used.
        :return: None
        :raises AirflowException: If the completed operation reports an error.
        """
        # fmt: off
        response = self.get_conn().projects().locations().functions().create(
            location=self._full_location(project_id, location),
            body=body
        ).execute(num_retries=self.num_retries)
        # fmt: on
        operation_name = response["name"]
        self._wait_for_operation_to_complete(operation_name=operation_name)

    def update_function(self, name: str, body: dict, update_mask: list[str]) -> None:
        """
        Updates Cloud Functions according to the specified update mask.

        Blocks until the operation returned by the API has completed.

        :param name: The name of the function.
        :param body: The body required by the cloud function patch API.
        :param update_mask: The update mask - array of fields that should be patched.
            The entries are joined with commas before being sent.
        :return: None
        :raises AirflowException: If the completed operation reports an error.
        """
        # fmt: off
        response = self.get_conn().projects().locations().functions().patch(
            updateMask=",".join(update_mask),
            name=name,
            body=body
        ).execute(num_retries=self.num_retries)
        # fmt: on
        operation_name = response["name"]
        self._wait_for_operation_to_complete(operation_name=operation_name)

    @GoogleBaseHook.fallback_to_default_project_id
    def upload_function_zip(self, location: str, zip_path: str, project_id: str = PROVIDE_PROJECT_ID) -> str:
        """
        Uploads zip file with sources.

        A signed upload URL is requested from the API and the file at
        ``zip_path`` is then sent to it with an HTTP PUT.

        :param location: The location where the function is created.
        :param zip_path: The path of the valid .zip file to upload.
        :param project_id: Optional, Google Cloud Project project_id where the function belongs.
            If set to None or missing, the default project_id from the Google Cloud connection is used.
        :return: The upload URL that was returned by generateUploadUrl method.
        :rtype: str
        :raises AirflowException: If the upload request is answered with an HTTP
            error status (4xx or 5xx).
        """
        # fmt: off

        response = \
            self.get_conn().projects().locations().functions().generateUploadUrl(
                parent=self._full_location(project_id, location)
            ).execute(num_retries=self.num_retries)
        # fmt: on

        upload_url = response.get('uploadUrl')
        with open(zip_path, 'rb') as file:
            upload_response = requests.put(
                url=upload_url,
                data=file,
                # Those two headers needs to be specified according to:
                # https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions/generateUploadUrl
                # nopep8
                headers={
                    'Content-type': 'application/zip',
                    'x-goog-content-length-range': '0,104857600',
                },
            )
        # A rejected upload must not pass silently: returning the URL would make
        # the caller believe the sources are in place. The signed URL itself is
        # deliberately kept out of the message.
        if not upload_response.ok:
            raise AirflowException(
                f"Uploading '{zip_path}' to the generated upload URL failed with HTTP status "
                f"{upload_response.status_code}: {upload_response.reason}"
            )
        return upload_url

    def delete_function(self, name: str) -> None:
        """
        Deletes the specified Cloud Function.

        Blocks until the operation returned by the API has completed.

        :param name: The name of the function.
        :return: None
        :raises AirflowException: If the completed operation reports an error.
        """
        # fmt: off
        response = self.get_conn().projects().locations().functions().delete(
            name=name).execute(num_retries=self.num_retries)
        # fmt: on
        operation_name = response["name"]
        self._wait_for_operation_to_complete(operation_name=operation_name)

    @GoogleBaseHook.fallback_to_default_project_id
    def call_function(
        self,
        function_id: str,
        input_data: dict,
        location: str,
        project_id: str = PROVIDE_PROJECT_ID,
    ) -> dict:
        """
        Synchronously invokes a deployed Cloud Function. To be used for testing
        purposes as very limited traffic is allowed.

        :param function_id: ID of the function to be called
        :param input_data: Input to be passed to the function
        :param location: The location where the function is located.
        :param project_id: Optional, Google Cloud Project project_id where the function belongs.
            If set to None or missing, the default project_id from the Google Cloud connection is used.
        :return: The response returned by the call API.
        :rtype: dict
        :raises AirflowException: If the response contains an ``error`` key.
        """
        name = f"projects/{project_id}/locations/{location}/functions/{function_id}"
        # fmt: off
        response = self.get_conn().projects().locations().functions().call(
            name=name,
            body=input_data
        ).execute(num_retries=self.num_retries)
        # fmt: on
        if 'error' in response:
            raise AirflowException(response['error'])
        return response

    def _wait_for_operation_to_complete(self, operation_name: str) -> dict:
        """
        Waits for the named operation to complete - checks status of the
        asynchronous call.

        The operation is polled every ``TIME_TO_SLEEP_IN_SECONDS`` seconds until
        it reports ``done``; there is no upper bound on the waiting time.

        :param operation_name: The name of the operation.
        :return: The response returned by the operation.
        :rtype: dict
        :raises AirflowException: In case an error is returned by the operation.
        """
        service = self.get_conn()
        while True:
            # fmt: off
            operation_response = service.operations().get(
                name=operation_name,
            ).execute(num_retries=self.num_retries)
            # fmt: on
            if operation_response.get("done"):
                response = operation_response.get("response")
                error = operation_response.get("error")
                # Note, according to documentation always either response or error is
                # set when "done" == True
                if error:
                    raise AirflowException(str(error))
                return response
            time.sleep(TIME_TO_SLEEP_IN_SECONDS)

相关信息

airflow 源码目录

相关文章

airflow init 源码

airflow automl 源码

airflow bigquery 源码

airflow bigquery_dts 源码

airflow bigtable 源码

airflow cloud_build 源码

airflow cloud_composer 源码

airflow cloud_memorystore 源码

airflow cloud_sql 源码

airflow cloud_storage_transfer_service 源码

0  赞