airflow info_command 源码

  • 2022-10-20
  • 浏览 (314)

airflow info_command 代码

文件路径:/airflow/cli/commands/info_command.py

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Config sub-commands"""
from __future__ import annotations

import locale
import logging
import os
import platform
import subprocess
import sys
from urllib.parse import urlsplit, urlunsplit

import httpx
import tenacity

from airflow import configuration
from airflow.cli.simple_table import AirflowConsole
from airflow.providers_manager import ProvidersManager
from airflow.typing_compat import Protocol
from airflow.utils.cli import suppress_logs_and_warning
from airflow.utils.platform import getuser
from airflow.version import version as airflow_version

log = logging.getLogger(__name__)


class Anonymizer(Protocol):
    """Anonymizer protocol."""

    def process_path(self, value) -> str:
        """Remove pii from paths"""

    def process_username(self, value) -> str:
        """Remove pii from username"""

    def process_url(self, value) -> str:
        """Remove pii from URL"""


class NullAnonymizer(Anonymizer):
    """Do nothing."""

    def _identity(self, value) -> str:
        return value

    process_path = process_username = process_url = _identity

    del _identity


class PiiAnonymizer(Anonymizer):
    """Remove personally identifiable info from path."""

    def __init__(self):
        home_path = os.path.expanduser("~")
        username = getuser()
        self._path_replacements = {home_path: "${HOME}", username: "${USER}"}

    def process_path(self, value) -> str:
        if not value:
            return value
        for src, target in self._path_replacements.items():
            value = value.replace(src, target)
        return value

    def process_username(self, value) -> str:
        if not value:
            return value
        return value[0] + "..." + value[-1]

    def process_url(self, value) -> str:
        if not value:
            return value

        url_parts = urlsplit(value)
        netloc = None
        if url_parts.netloc:
            # unpack
            userinfo = None
            username = None
            password = None

            if "@" in url_parts.netloc:
                userinfo, _, host = url_parts.netloc.partition("@")
            else:
                host = url_parts.netloc
            if userinfo:
                if ":" in userinfo:
                    username, _, password = userinfo.partition(":")
                else:
                    username = userinfo

            # anonymize
            username = self.process_username(username) if username else None
            password = "PASSWORD" if password else None

            # pack
            if username and password and host:
                netloc = username + ":" + password + "@" + host
            elif username and host:
                netloc = username + "@" + host
            elif password and host:
                netloc = ":" + password + "@" + host
            elif host:
                netloc = host
            else:
                netloc = ""

        return urlunsplit((url_parts.scheme, netloc, url_parts.path, url_parts.query, url_parts.fragment))


class OperatingSystem:
    """Operating system"""

    WINDOWS = "Windows"
    LINUX = "Linux"
    MACOSX = "Mac OS"
    CYGWIN = "Cygwin"

    @staticmethod
    def get_current() -> str | None:
        """Get current operating system"""
        if os.name == "nt":
            return OperatingSystem.WINDOWS
        elif "linux" in sys.platform:
            return OperatingSystem.LINUX
        elif "darwin" in sys.platform:
            return OperatingSystem.MACOSX
        elif "cygwin" in sys.platform:
            return OperatingSystem.CYGWIN
        return None


class Architecture:
    """Compute architecture"""

    X86_64 = "x86_64"
    X86 = "x86"
    PPC = "ppc"
    ARM = "arm"

    @staticmethod
    def get_current():
        """Get architecture"""
        return _MACHINE_TO_ARCHITECTURE.get(platform.machine().lower())


_MACHINE_TO_ARCHITECTURE = {
    "amd64": Architecture.X86_64,
    "x86_64": Architecture.X86_64,
    "i686-64": Architecture.X86_64,
    "i386": Architecture.X86,
    "i686": Architecture.X86,
    "x86": Architecture.X86,
    "ia64": Architecture.X86,  # Itanium is different x64 arch, treat it as the common x86.
    "powerpc": Architecture.PPC,
    "power macintosh": Architecture.PPC,
    "ppc64": Architecture.PPC,
    "armv6": Architecture.ARM,
    "armv6l": Architecture.ARM,
    "arm64": Architecture.ARM,
    "armv7": Architecture.ARM,
    "armv7l": Architecture.ARM,
}


class AirflowInfo:
    """Renders information about Airflow instance"""

    def __init__(self, anonymizer):
        self.anonymizer = anonymizer

    @staticmethod
    def _get_version(cmd: list[str], grep: bytes | None = None):
        """Return tools version."""
        try:
            with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) as proc:
                stdoutdata, _ = proc.communicate()
                data = [f for f in stdoutdata.split(b"\n") if f]
                if grep:
                    data = [line for line in data if grep in line]
                if len(data) != 1:
                    return "NOT AVAILABLE"
                else:
                    return data[0].decode()
        except OSError:
            return "NOT AVAILABLE"

    @staticmethod
    def _task_logging_handler():
        """Returns task logging handler."""

        def get_fullname(o):
            module = o.__class__.__module__
            if module is None or module == str.__class__.__module__:
                return o.__class__.__name__  # Avoid reporting __builtin__
            else:
                return module + '.' + o.__class__.__name__

        try:
            handler_names = [get_fullname(handler) for handler in logging.getLogger('airflow.task').handlers]
            return ", ".join(handler_names)
        except Exception:
            return "NOT AVAILABLE"

    @property
    def _airflow_info(self):
        executor = configuration.conf.get("core", "executor")
        sql_alchemy_conn = self.anonymizer.process_url(
            configuration.conf.get("database", "SQL_ALCHEMY_CONN", fallback="NOT AVAILABLE")
        )
        dags_folder = self.anonymizer.process_path(
            configuration.conf.get("core", "dags_folder", fallback="NOT AVAILABLE")
        )
        plugins_folder = self.anonymizer.process_path(
            configuration.conf.get("core", "plugins_folder", fallback="NOT AVAILABLE")
        )
        base_log_folder = self.anonymizer.process_path(
            configuration.conf.get("logging", "base_log_folder", fallback="NOT AVAILABLE")
        )
        remote_base_log_folder = self.anonymizer.process_path(
            configuration.conf.get("logging", "remote_base_log_folder", fallback="NOT AVAILABLE")
        )

        return [
            ("version", airflow_version),
            ("executor", executor),
            ("task_logging_handler", self._task_logging_handler()),
            ("sql_alchemy_conn", sql_alchemy_conn),
            ("dags_folder", dags_folder),
            ("plugins_folder", plugins_folder),
            ("base_log_folder", base_log_folder),
            ("remote_base_log_folder", remote_base_log_folder),
        ]

    @property
    def _system_info(self):
        operating_system = OperatingSystem.get_current()
        arch = Architecture.get_current()
        uname = platform.uname()
        _locale = locale.getdefaultlocale()
        python_location = self.anonymizer.process_path(sys.executable)
        python_version = sys.version.replace("\n", " ")

        return [
            ("OS", operating_system or "NOT AVAILABLE"),
            ("architecture", arch or "NOT AVAILABLE"),
            ("uname", str(uname)),
            ("locale", str(_locale)),
            ("python_version", python_version),
            ("python_location", python_location),
        ]

    @property
    def _tools_info(self):
        git_version = self._get_version(["git", "--version"])
        ssh_version = self._get_version(["ssh", "-V"])
        kubectl_version = self._get_version(["kubectl", "version", "--short=True", "--client=True"])
        gcloud_version = self._get_version(["gcloud", "version"], grep=b"Google Cloud SDK")
        cloud_sql_proxy_version = self._get_version(["cloud_sql_proxy", "--version"])
        mysql_version = self._get_version(["mysql", "--version"])
        sqlite3_version = self._get_version(["sqlite3", "--version"])
        psql_version = self._get_version(["psql", "--version"])

        return [
            ("git", git_version),
            ("ssh", ssh_version),
            ("kubectl", kubectl_version),
            ("gcloud", gcloud_version),
            ("cloud_sql_proxy", cloud_sql_proxy_version),
            ("mysql", mysql_version),
            ("sqlite3", sqlite3_version),
            ("psql", psql_version),
        ]

    @property
    def _paths_info(self):
        system_path = os.environ.get("PATH", "").split(os.pathsep)
        airflow_home = self.anonymizer.process_path(configuration.get_airflow_home())
        system_path = [self.anonymizer.process_path(p) for p in system_path]
        python_path = [self.anonymizer.process_path(p) for p in sys.path]
        airflow_on_path = any(os.path.exists(os.path.join(path_elem, "airflow")) for path_elem in system_path)

        return [
            ("airflow_home", airflow_home),
            ("system_path", os.pathsep.join(system_path)),
            ("python_path", os.pathsep.join(python_path)),
            ("airflow_on_path", str(airflow_on_path)),
        ]

    @property
    def _providers_info(self):
        return [(p.data['package-name'], p.version) for p in ProvidersManager().providers.values()]

    def show(self, output: str, console: AirflowConsole | None = None) -> None:
        """Shows information about Airflow instance"""
        all_info = {
            "Apache Airflow": self._airflow_info,
            "System info": self._system_info,
            "Tools info": self._tools_info,
            "Paths info": self._paths_info,
            "Providers info": self._providers_info,
        }

        console = console or AirflowConsole(show_header=False)
        if output in ("table", "plain"):
            # Show each info as table with key, value column
            for key, info in all_info.items():
                console.print(f"\n[bold][green]{key}[/bold][/green]", highlight=False)
                console.print_as(data=[{"key": k, "value": v} for k, v in info], output=output)
        else:
            # Render info in given format, change keys to snake_case
            console.print_as(
                data=[{k.lower().replace(" ", "_"): dict(v)} for k, v in all_info.items()], output=output
            )

    def render_text(self, output: str) -> str:
        """Exports the info to string"""
        console = AirflowConsole(record=True)
        with console.capture():
            self.show(output=output, console=console)
        return console.export_text()


class FileIoException(Exception):
    """Raises when error happens in FileIo.io integration"""


@tenacity.retry(
    stop=tenacity.stop_after_attempt(5),
    wait=tenacity.wait_exponential(multiplier=1, max=10),
    retry=tenacity.retry_if_exception_type(FileIoException),
    before=tenacity.before_log(log, logging.DEBUG),
    after=tenacity.after_log(log, logging.DEBUG),
)
def _upload_text_to_fileio(content):
    """Upload text file to File.io service and return lnk"""
    resp = httpx.post("https://file.io", content=content)
    if resp.status_code not in [200, 201]:
        print(resp.json())
        raise FileIoException("Failed to send report to file.io service.")
    try:
        return resp.json()["link"]
    except ValueError as e:
        log.debug(e)
        raise FileIoException("Failed to send report to file.io service.")


def _send_report_to_fileio(info):
    print("Uploading report to file.io service.")
    try:
        link = _upload_text_to_fileio(str(info))
        print("Report uploaded.")
        print(link)
        print()
    except FileIoException as ex:
        print(str(ex))


@suppress_logs_and_warning
def show_info(args):
    """Show information related to Airflow, system and other."""
    # Enforce anonymization, when file_io upload is tuned on.
    anonymizer = PiiAnonymizer() if args.anonymize or args.file_io else NullAnonymizer()
    info = AirflowInfo(anonymizer)
    if args.file_io:
        _send_report_to_fileio(info.render_text(args.output))
    else:
        info.show(args.output)

相关信息

airflow 源码目录

相关文章

airflow init 源码

airflow celery_command 源码

airflow cheat_sheet_command 源码

airflow config_command 源码

airflow connection_command 源码

airflow dag_command 源码

airflow dag_processor_command 源码

airflow db_command 源码

airflow jobs_command 源码

airflow kerberos_command 源码

0  赞