airflow custom_param_types 源码

  • 2022-10-20
  • 浏览 (287)

airflow custom_param_types 代码

文件路径:/dev/breeze/src/airflow_breeze/utils/custom_param_types.py

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from dataclasses import dataclass
from re import match
from typing import Any, Sequence

import click
from click import Context, Parameter

from airflow_breeze.utils.cache import (
    check_if_values_allowed,
    read_and_validate_value_from_cache,
    read_from_cache_file,
    write_to_cache_file,
)
from airflow_breeze.utils.confirm import set_forced_answer
from airflow_breeze.utils.console import get_console
from airflow_breeze.utils.recording import generating_command_images


class BetterChoice(click.Choice):
    """
    Nicer formatted choice class for click.

    We sometimes have a lot of choices, and formatting them without spaces causes ugly
    artifacts because words get broken in the middle. This class separates the choices
    with " | " so that a long list of choices can wrap between words.
    """

    name = "BetterChoice"

    def __init__(self, *args, **kwargs):
        """
        Create the choice type.

        Positional and keyword arguments are forwarded unchanged to ``click.Choice``, so
        keyword options such as ``case_sensitive=False`` are accepted as well.
        """
        super().__init__(*args, **kwargs)
        # Choices rendered in the metavar. Kept separate from ``self.choices`` so that a
        # subclass can display extra entries without making them valid choices.
        self.all_choices: Sequence[str] = self.choices

    def get_metavar(self, param, ctx=None) -> str:
        """
        Return the metavar text listing the choices for ``param``.

        :param param: the click parameter this type is attached to
        :param ctx: optional click context. It is not used; it is accepted so the method
            also works with click releases that pass the context to ``get_metavar``.
        :return: choices joined with " | ", wrapped in curly braces for a required
            argument, left bare for a variadic argument and wrapped in square brackets
            otherwise
        """
        choices_str = " | ".join(self.all_choices)
        is_argument = param.param_type_name == "argument"

        # Use curly braces to indicate a required argument.
        if param.required and is_argument:
            return f"{{{choices_str}}}"

        if is_argument and param.nargs == -1:
            # avoid double [[ for multiple args
            return f"{choices_str}"

        # Use square braces to indicate an option or optional argument.
        return f"[{choices_str}]"


class NotVerifiedBetterChoice(BetterChoice):
    """
    Choice type that also lets through values that are not among the choices.

    The declared choices are still available (e.g. for autocomplete), but a value that
    matches none of them is returned instead of being rejected. This allows dynamic values,
    for example globs in package names for docs building.
    """

    name = "NotVerifiedBetterChoice"

    def convert(self, value: Any, param: Parameter | None, ctx: Context | None) -> Any:
        """
        Map ``value`` to the matching original choice, or pass it through when none matches.

        Matching is done after normalization: first the context's ``token_normalize_func``
        (when there is one), then case folding (when the type is not case sensitive).

        :return: the original spelling of the matched choice, or the normalized value when
            no choice matches
        """
        # Normalization stages, in the order they have to be applied.
        stages = []
        if ctx is not None and ctx.token_normalize_func is not None:
            stages.append(ctx.token_normalize_func)
        if not self.case_sensitive:
            stages.append(lambda token: token.casefold())

        candidate = value
        lookup = {choice: choice for choice in self.choices}
        # Apply each stage to the value and to the keys of the lookup table, one stage at
        # a time, so both sides are always normalized the same way.
        for normalize in stages:
            candidate = normalize(candidate)
            lookup = {normalize(key): original for key, original in lookup.items()}

        return lookup.get(candidate, candidate)


class AnswerChoice(BetterChoice):
    """
    Choice type that passes the selected value to ``set_forced_answer``.
    """

    name = "AnswerChoice"

    def convert(self, value, param, ctx):
        """Hand ``value`` to ``set_forced_answer`` and then convert it as a regular choice."""
        # Called before the choice conversion, so it runs even if the conversion
        # subsequently raises.
        set_forced_answer(value)
        converted = super().convert(value, param, ctx)
        return converted


@dataclass
class CacheableDefault:
    """
    Wrapper around a default value of a parameter.

    ``CacheableChoice.convert`` checks for this wrapper to tell a default value apart from
    a value that was passed explicitly.
    """

    # The wrapped default value.
    value: Any

    def __repr__(self) -> str:
        """Return the wrapped value rendered as text (without the class name)."""
        # ``__repr__`` has to return a ``str``; returning a non-string value directly
        # would make ``repr()``/``str()`` raise ``TypeError``.
        return str(self.value)


class CacheableChoice(click.Choice):
    """
    Choice type that caches the value from the last use.

    The cache entry is keyed by the parameter's ``envvar`` when it is set, otherwise by the
    upper-cased parameter name.
    """

    def __init__(self, choices, case_sensitive: bool = True) -> None:
        super().__init__(choices=choices, case_sensitive=case_sensitive)

    @staticmethod
    def _cache_key(param) -> str:
        """Return the name under which the value of ``param`` is cached."""
        return param.envvar if param.envvar else param.name.upper()

    def convert(self, value, param, ctx):
        """
        Resolve ``value`` against the cache and convert it as a regular choice.

        A ``CacheableDefault`` is resolved through ``read_and_validate_value_from_cache``.
        Any other value is checked with ``check_if_values_allowed``; when it is not allowed,
        the first allowed value is used instead and a warning is printed. In the
        non-default case the resulting value is written to the cache.
        """
        param_name = self._cache_key(param)
        if isinstance(value, CacheableDefault):
            is_cached, new_value = read_and_validate_value_from_cache(param_name, value.value)
            if not is_cached:
                get_console().print(f"\n[info]Default value of {param.name} parameter {new_value} used.[/]\n")
            return super().convert(new_value, param, ctx)

        allowed, allowed_values = check_if_values_allowed(param_name, value)
        if allowed:
            new_value = value
        else:
            new_value = allowed_values[0]
            get_console().print(
                f"\n[warning]The value {value} is not allowed for parameter {param.name}. "
                f"Setting default value to {new_value}"
            )
        # Explicitly passed values are cached, whether accepted or replaced.
        write_to_cache_file(param_name, new_value, check_allowed_values=False)
        return super().convert(new_value, param, ctx)

    def get_metavar(self, param, ctx=None) -> str:
        """
        Return the metavar text listing the choices, with the current value marked.

        The current value is the cached one, or the parameter default while command images
        are being generated. It is rendered as ``>value<``.

        :param param: the click parameter this type is attached to
        :param ctx: optional click context. It is not used; it is accepted so the method
            also works with click releases that pass the context to ``get_metavar``.
        """
        if generating_command_images():
            # The default is normally a ``CacheableDefault``; fall back to the default
            # itself when it is a plain value instead of failing on a missing attribute.
            current_value = getattr(param.default, "value", param.default)
        else:
            current_value = read_from_cache_file(self._cache_key(param))

        if not current_value:
            current_choices = self.choices
        else:
            current_choices = [
                f">{choice}<" if choice == current_value else choice for choice in self.choices
            ]
        choices_str = " | ".join(current_choices)
        is_argument = param.param_type_name == "argument"

        # Use curly braces to indicate a required argument.
        if param.required and is_argument:
            return f"{{{choices_str}}}"

        if is_argument and param.nargs == -1:
            # avoid double [[ for multiple args
            return f"{choices_str}"

        # Use square braces to indicate an option or optional argument.
        return f"[{choices_str}]"


class UseAirflowVersionType(BetterChoice):
    """
    Extends choice with dynamic version number.

    Besides the declared choices, any value that looks like a version number
    (``<digits>.<digits>.<digits>`` optionally followed by a non-whitespace suffix)
    is accepted as-is.
    """

    def __init__(self, *args):
        super().__init__(*args)
        # Show a placeholder for the free-form version in the help text; it is display
        # only and is not added to the valid choices.
        self.all_choices = [*self.choices, "<airflow_version>"]

    def convert(self, value, param, ctx):
        """
        Return ``value`` unchanged when it looks like a version number, otherwise convert
        it as a regular choice.
        """
        # Each of the three components needs at least one digit, so strings such as ".."
        # are not mistaken for a version and go through the normal choice validation.
        if match(r"^\d+\.\d+\.\d+\S*$", value):
            return value
        return super().convert(value, param, ctx)

相关信息

airflow 源码目录

相关文章

airflow init 源码

airflow cache 源码

airflow ci_group 源码

airflow click_utils 源码

airflow common_options 源码

airflow confirm 源码

airflow console 源码

airflow docker_command_utils 源码

airflow find_newer_dependencies 源码

airflow github_actions 源码

0  赞