airflow secret 源码

  • 2022-10-20
  • 浏览 (453)

airflow secret 代码

文件路径:/airflow/kubernetes/secret.py

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Classes for interacting with Kubernetes API"""
from __future__ import annotations

import copy
import uuid

from kubernetes.client import models as k8s

from airflow.exceptions import AirflowConfigException
from airflow.kubernetes.k8s_model import K8SModel


class Secret(K8SModel):
    """
    Describes how one Kubernetes Secret is exposed to a pod.

    A secret can be exposed as a single environment variable (``env`` with a
    ``key``), as a bulk ``envFrom`` import (``env`` without a ``key``), or as
    a read-only mounted volume (``volume``).
    """

    def __init__(self, deploy_type, deploy_target, secret, key=None, items=None):
        """
        Initialize a Kubernetes Secret Object. Used to track requested secrets from
        the user.

        :param deploy_type: The type of secret deploy in Kubernetes, either `env` or
            `volume`
        :param deploy_target: (Optional) The environment variable name when
            `deploy_type` is `env`, or the file path when `deploy_type` is
            `volume`, at which the secret is exposed. If `key` is not provided
            the deploy target should be None.
        :param secret: Name of the secrets object in Kubernetes
        :param key: (Optional) Key of the secret within the Kubernetes Secret.
            If not provided with `deploy_type` `env`, all keys of the secret
            object are exposed.
        :param items: (Optional) items that can be added to a volume secret for
            specifying projections of secret keys to paths
            https://kubernetes.io/docs/concepts/configuration/secret/#projection-of-secret-keys-to-specific-paths
        :raises AirflowConfigException: if `deploy_type` is not `env` or `volume`,
            or if `key` is set while `deploy_target` is None.
        """
        if deploy_type not in ('env', 'volume'):
            raise AirflowConfigException("deploy_type must be env or volume")

        self.deploy_type = deploy_type
        self.deploy_target = deploy_target
        # Normalize a missing/empty ``items`` to a fresh list per instance.
        self.items = items or []

        if deploy_target is not None and deploy_type == 'env':
            # if deploying to env, capitalize the deploy target
            self.deploy_target = deploy_target.upper()

        if key is not None and deploy_target is None:
            raise AirflowConfigException('If `key` is set, `deploy_target` should not be None')

        self.secret = secret
        self.key = key

    def to_env_secret(self) -> k8s.V1EnvVar:
        """Build the env var that reads the single key ``self.key`` of the secret."""
        return k8s.V1EnvVar(
            name=self.deploy_target,
            value_from=k8s.V1EnvVarSource(
                secret_key_ref=k8s.V1SecretKeySelector(name=self.secret, key=self.key)
            ),
        )

    def to_env_from_secret(self) -> k8s.V1EnvFromSource:
        """Build the ``envFrom`` source that references the whole secret object."""
        return k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(name=self.secret))

    def to_volume_secret(self) -> tuple[k8s.V1Volume, k8s.V1VolumeMount]:
        """
        Build the volume backed by the secret and its read-only mount.

        :return: ``(volume, volume_mount)``; both share a freshly generated name.
        """
        # A random suffix gives every call its own volume name.
        vol_id = f'secretvol{uuid.uuid4()}'
        volume = k8s.V1Volume(name=vol_id, secret=k8s.V1SecretVolumeSource(secret_name=self.secret))
        if self.items:
            volume.secret.items = self.items
        return (volume, k8s.V1VolumeMount(mount_path=self.deploy_target, name=vol_id, read_only=True))

    def attach_to_pod(self, pod: k8s.V1Pod) -> k8s.V1Pod:
        """
        Return a deep copy of ``pod`` with this secret attached.

        The passed-in pod is not modified. Only the first container
        (``spec.containers[0]``) receives the env var / envFrom / volume mount.

        :param pod: pod to copy and extend
        :return: the modified copy
        """
        cp_pod = copy.deepcopy(pod)
        container = cp_pod.spec.containers[0]

        if self.deploy_type == 'volume':
            volume, volume_mount = self.to_volume_secret()
            if cp_pod.spec.volumes is None:
                cp_pod.spec.volumes = []
            cp_pod.spec.volumes.append(volume)
            if container.volume_mounts is None:
                container.volume_mounts = []
            container.volume_mounts.append(volume_mount)

        if self.deploy_type == 'env' and self.key is not None:
            # A specific key was requested: expose it as one env var.
            env = self.to_env_secret()
            if container.env is None:
                container.env = []
            container.env.append(env)

        if self.deploy_type == 'env' and self.key is None:
            # No key: reference the whole secret through envFrom.
            env_from = self.to_env_from_secret()
            if container.env_from is None:
                container.env_from = []
            container.env_from.append(env_from)

        return cp_pod

    def __eq__(self, other):
        """
        Compare by deploy type, deploy target, secret name and key.

        ``items`` does not take part in the comparison. Objects that are not
        ``Secret`` instances yield ``NotImplemented`` so Python falls back to
        its default handling instead of raising ``AttributeError``.
        """
        if not isinstance(other, Secret):
            return NotImplemented
        return (
            self.deploy_type == other.deploy_type
            and self.deploy_target == other.deploy_target
            and self.secret == other.secret
            and self.key == other.key
        )

    def __hash__(self):
        """Hash over the same fields ``__eq__`` compares, keeping instances hashable."""
        return hash((self.deploy_type, self.deploy_target, self.secret, self.key))

    def __repr__(self):
        return f'Secret({self.deploy_type}, {self.deploy_target}, {self.secret}, {self.key})'

相关信息

airflow 源码目录

相关文章

airflow init 源码

airflow k8s_model 源码

airflow kube_client 源码

airflow kube_config 源码

airflow kubernetes_helper_functions 源码

airflow pod 源码

airflow pod_generator 源码

airflow pod_generator_deprecated 源码

airflow pod_launcher 源码

airflow pod_launcher_deprecated 源码

0  赞