airflow provider_command 源码

  • 2022-10-20
  • 浏览 (410)

airflow provider_command 代码

文件路径:/airflow/cli/commands/provider_command.py

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Providers sub-commands"""
from __future__ import annotations

import re

from airflow.cli.simple_table import AirflowConsole
from airflow.providers_manager import ProvidersManager
from airflow.utils.cli import suppress_logs_and_warning

ERROR_IMPORTING_HOOK = "Error when importing hook!"


def _remove_rst_syntax(value: str) -> str:
    return re.sub("[`_<>]", "", value.strip(" \n."))


@suppress_logs_and_warning
def provider_get(args):
    """Get a provider info."""
    providers = ProvidersManager().providers
    if args.provider_name in providers:
        provider_version = providers[args.provider_name].version
        provider_info = providers[args.provider_name].data
        if args.full:
            provider_info["description"] = _remove_rst_syntax(provider_info["description"])
            AirflowConsole().print_as(
                data=[provider_info],
                output=args.output,
            )
        else:
            print(f"Provider: {args.provider_name}")
            print(f"Version: {provider_version}")
    else:
        raise SystemExit(f"No such provider installed: {args.provider_name}")


@suppress_logs_and_warning
def providers_list(args):
    """Lists all providers at the command line"""
    AirflowConsole().print_as(
        data=list(ProvidersManager().providers.values()),
        output=args.output,
        mapper=lambda x: {
            "package_name": x.data["package-name"],
            "description": _remove_rst_syntax(x.data["description"]),
            "version": x.version,
        },
    )


@suppress_logs_and_warning
def hooks_list(args):
    """Lists all hooks at the command line"""
    AirflowConsole().print_as(
        data=list(ProvidersManager().hooks.items()),
        output=args.output,
        mapper=lambda x: {
            "connection_type": x[0],
            "class": x[1].hook_class_name if x[1] else ERROR_IMPORTING_HOOK,
            "conn_id_attribute_name": x[1].connection_id_attribute_name if x[1] else ERROR_IMPORTING_HOOK,
            'package_name': x[1].package_name if x[1] else ERROR_IMPORTING_HOOK,
            'hook_name': x[1].hook_name if x[1] else ERROR_IMPORTING_HOOK,
        },
    )


@suppress_logs_and_warning
def connection_form_widget_list(args):
    """Lists all custom connection form fields at the command line"""
    AirflowConsole().print_as(
        data=list(sorted(ProvidersManager().connection_form_widgets.items())),
        output=args.output,
        mapper=lambda x: {
            "connection_parameter_name": x[0],
            "class": x[1].hook_class_name,
            'package_name': x[1].package_name,
            'field_type': x[1].field.field_class.__name__,
        },
    )


@suppress_logs_and_warning
def connection_field_behaviours(args):
    """Lists field behaviours"""
    AirflowConsole().print_as(
        data=list(ProvidersManager().field_behaviours.keys()),
        output=args.output,
        mapper=lambda x: {
            "field_behaviours": x,
        },
    )


@suppress_logs_and_warning
def extra_links_list(args):
    """Lists all extra links at the command line"""
    AirflowConsole().print_as(
        data=ProvidersManager().extra_links_class_names,
        output=args.output,
        mapper=lambda x: {
            "extra_link_class_name": x,
        },
    )


@suppress_logs_and_warning
def logging_list(args):
    """Lists all log task handlers at the command line"""
    AirflowConsole().print_as(
        data=list(ProvidersManager().logging_class_names),
        output=args.output,
        mapper=lambda x: {
            "logging_class_name": x,
        },
    )


@suppress_logs_and_warning
def secrets_backends_list(args):
    """Lists all secrets backends at the command line"""
    AirflowConsole().print_as(
        data=list(ProvidersManager().secrets_backend_class_names),
        output=args.output,
        mapper=lambda x: {
            "secrets_backend_class_name": x,
        },
    )


@suppress_logs_and_warning
def auth_backend_list(args):
    """Lists all API auth backend modules at the command line"""
    AirflowConsole().print_as(
        data=list(ProvidersManager().auth_backend_module_names),
        output=args.output,
        mapper=lambda x: {
            "api_auth_backand_module": x,
        },
    )

相关信息

airflow 源码目录

相关文章

airflow init 源码

airflow celery_command 源码

airflow cheat_sheet_command 源码

airflow config_command 源码

airflow connection_command 源码

airflow dag_command 源码

airflow dag_processor_command 源码

airflow db_command 源码

airflow info_command 源码

airflow jobs_command 源码

0  赞