# Source file path: dev/breeze/src/airflow_breeze/commands/developer_commands.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import os
import shutil
import sys
import threading
from signal import SIGTERM
from time import sleep
from typing import Iterable
import click
from airflow_breeze.commands.ci_image_commands import rebuild_or_pull_ci_image_if_needed
from airflow_breeze.commands.main_command import main
from airflow_breeze.global_constants import (
DEFAULT_PYTHON_MAJOR_MINOR_VERSION,
DOCKER_DEFAULT_PLATFORM,
MOUNT_SELECTED,
get_available_documentation_packages,
)
from airflow_breeze.params.build_ci_params import BuildCiParams
from airflow_breeze.params.doc_build_params import DocBuildParams
from airflow_breeze.params.shell_params import ShellParams
from airflow_breeze.pre_commit_ids import PRE_COMMIT_LIST
from airflow_breeze.utils.cache import read_from_cache_file
from airflow_breeze.utils.common_options import (
option_airflow_constraints_reference,
option_airflow_extras,
option_answer,
option_backend,
option_db_reset,
option_dry_run,
option_force_build,
option_forward_credentials,
option_github_repository,
option_image_tag_for_running,
option_include_mypy_volume,
option_installation_package_format,
option_integration,
option_load_default_connection,
option_load_example_dags,
option_max_time,
option_mount_sources,
option_mssql_version,
option_mysql_version,
option_platform_single,
option_postgres_version,
option_python,
option_use_airflow_version,
option_use_packages_from_dist,
option_verbose,
)
from airflow_breeze.utils.confirm import set_forced_answer
from airflow_breeze.utils.console import get_console
from airflow_breeze.utils.custom_param_types import BetterChoice, NotVerifiedBetterChoice
from airflow_breeze.utils.docker_command_utils import (
DOCKER_COMPOSE_COMMAND,
check_docker_resources,
get_env_variables_for_docker_commands,
get_extra_docker_flags,
perform_environment_checks,
)
from airflow_breeze.utils.path_utils import AIRFLOW_SOURCES_ROOT, create_mypy_volume_if_needed
from airflow_breeze.utils.run_utils import (
RunCommandResult,
assert_pre_commit_installed,
filter_out_none,
run_command,
run_compile_www_assets,
)
from airflow_breeze.utils.visuals import ASCIIART, ASCIIART_STYLE, CHEATSHEET, CHEATSHEET_STYLE
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# Make sure that whatever you add here as an option is also
# Added in the "main" command in breeze.py. The min command above
# Is used for a shorthand of shell and except the extra
# Args it should have the same parameters.
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
class TimerThread(threading.Thread):
    """Daemon watchdog thread that terminates the whole process group after a timeout.

    Used by commands that accept ``--max-time`` to guarantee they cannot hang
    forever (e.g. in CI).
    """

    def __init__(self, max_time: int):
        super().__init__(daemon=True)
        # Number of seconds to wait before terminating the process group.
        self.max_time = max_time

    def run(self):
        """Sleep for ``max_time`` seconds, then SIGTERM our own process group."""
        console = get_console()
        console.print(f"[info]Setting timer to fail after {self.max_time} s.")
        sleep(self.max_time)
        console.print(f"[error]The command took longer than {self.max_time} s. Failing!")
        # Signal the whole process group (pgid of pid 0 == our own group) so
        # child processes started by the command are terminated too.
        os.killpg(os.getpgid(0), SIGTERM)
# Default Breeze command: drops the user into an interactive shell in the CI
# container, forwarding all the common environment options.
@main.command()
@option_verbose
@option_dry_run
@option_python
@option_platform_single
@option_backend
@option_github_repository
@option_postgres_version
@option_mysql_version
@option_mssql_version
@option_forward_credentials
@option_force_build
@option_use_airflow_version
@option_airflow_extras
@option_airflow_constraints_reference
@option_use_packages_from_dist
@option_installation_package_format
@option_mount_sources
@option_integration
@option_db_reset
@option_image_tag_for_running
@option_answer
@option_max_time
@option_include_mypy_volume
@click.argument('extra-args', nargs=-1, type=click.UNPROCESSED)
def shell(
    verbose: bool,
    dry_run: bool,
    python: str,
    github_repository: str,
    backend: str,
    integration: tuple[str],
    postgres_version: str,
    mysql_version: str,
    mssql_version: str,
    forward_credentials: bool,
    mount_sources: str,
    use_packages_from_dist: bool,
    package_format: str,
    use_airflow_version: str | None,
    airflow_extras: str,
    airflow_constraints_reference: str,
    force_build: bool,
    db_reset: bool,
    include_mypy_volume: bool,
    answer: str | None,
    max_time: int | None,
    image_tag: str | None,
    platform: str | None,
    extra_args: tuple,
):
    """Enter breeze environment. This is the default command used when no other is selected."""
    if verbose or dry_run:
        get_console().print("\n[success]Welcome to breeze.py[/]\n")
        get_console().print(f"\n[success]Root of Airflow Sources = {AIRFLOW_SOURCES_ROOT}[/]\n")
    if max_time:
        # Watchdog: SIGTERM the whole process group if the command exceeds
        # max_time seconds, and force "yes" answers so no interactive prompt
        # can block until then.
        TimerThread(max_time=max_time).start()
        set_forced_answer('yes')
    enter_shell(
        verbose=verbose,
        dry_run=dry_run,
        python=python,
        github_repository=github_repository,
        backend=backend,
        integration=integration,
        postgres_version=postgres_version,
        mysql_version=mysql_version,
        mssql_version=mssql_version,
        # ShellParams expects string values for credential forwarding.
        forward_credentials=str(forward_credentials),
        mount_sources=mount_sources,
        use_airflow_version=use_airflow_version,
        airflow_extras=airflow_extras,
        airflow_constraints_reference=airflow_constraints_reference,
        use_packages_from_dist=use_packages_from_dist,
        package_format=package_format,
        force_build=force_build,
        db_reset=db_reset,
        include_mypy_volume=include_mypy_volume,
        # With a timer set we only want to verify that the shell can be
        # entered, so the shell command becomes a plain "exit".
        extra_args=extra_args if not max_time else ["exit"],
        answer=answer,
        image_tag=image_tag,
        platform=platform,
    )
@option_verbose
@main.command(name='start-airflow')
@option_dry_run
@option_python
@option_platform_single
@option_github_repository
@option_backend
@option_postgres_version
@option_load_example_dags
@option_load_default_connection
@option_mysql_version
@option_mssql_version
@option_forward_credentials
@option_force_build
@option_use_airflow_version
@option_airflow_extras
@option_airflow_constraints_reference
@option_use_packages_from_dist
@option_installation_package_format
@option_mount_sources
@option_integration
@option_image_tag_for_running
@click.option(
    '--skip-asset-compilation',
    help="Skips compilation of assets when starting airflow even if the content of www changed "
    "(mutually exclusive with --dev-mode).",
    is_flag=True,
)
@click.option(
    '--dev-mode',
    help="Starts webserver in dev mode (assets are always recompiled in this case when starting) "
    "(mutually exclusive with --skip-asset-compilation).",
    is_flag=True,
)
@option_db_reset
@option_answer
@click.argument('extra-args', nargs=-1, type=click.UNPROCESSED)
def start_airflow(
    verbose: bool,
    dry_run: bool,
    python: str,
    github_repository: str,
    backend: str,
    integration: tuple[str],
    postgres_version: str,
    load_example_dags: bool,
    load_default_connections: bool,
    mysql_version: str,
    mssql_version: str,
    forward_credentials: bool,
    mount_sources: str,
    use_airflow_version: str | None,
    airflow_extras: str,
    airflow_constraints_reference: str,
    use_packages_from_dist: bool,
    package_format: str,
    force_build: bool,
    skip_asset_compilation: bool,
    dev_mode: bool,
    image_tag: str | None,
    db_reset: bool,
    answer: str | None,
    platform: str | None,
    extra_args: tuple,
):
    """
    Enter breeze environment and starts all Airflow components in the tmux session.
    Compile assets if contents of www directory changed.
    """
    if dev_mode and skip_asset_compilation:
        get_console().print(
            '[warning]You cannot skip asset compilation in dev mode! Assets will be compiled!'
        )
        # NOTE(review): this sets the flag to True, which suppresses the
        # standalone compilation below even though the warning says assets
        # will be compiled -- presumably dev-mode recompiles them itself,
        # but confirm this is the intended behavior.
        skip_asset_compilation = True
    # Pre-compile www assets in the background, unless an installed airflow
    # version is used (sources not needed) or the user opted out.
    if use_airflow_version is None and not skip_asset_compilation:
        run_compile_www_assets(dev=dev_mode, run_in_background=True, verbose=verbose, dry_run=dry_run)
    enter_shell(
        verbose=verbose,
        dry_run=dry_run,
        python=python,
        github_repository=github_repository,
        backend=backend,
        integration=integration,
        postgres_version=postgres_version,
        load_default_connections=load_default_connections,
        load_example_dags=load_example_dags,
        mysql_version=mysql_version,
        mssql_version=mssql_version,
        # ShellParams expects string values for credential forwarding.
        forward_credentials=str(forward_credentials),
        mount_sources=mount_sources,
        use_airflow_version=use_airflow_version,
        airflow_extras=airflow_extras,
        airflow_constraints_reference=airflow_constraints_reference,
        use_packages_from_dist=use_packages_from_dist,
        package_format=package_format,
        force_build=force_build,
        db_reset=db_reset,
        # Tells the shell entrypoint to start all Airflow components in tmux.
        start_airflow=True,
        dev_mode=dev_mode,
        image_tag=image_tag,
        platform=platform,
        extra_args=extra_args,
        answer=answer,
    )
@main.command(name='build-docs')
@option_verbose
@option_dry_run
@option_github_repository
@click.option('-d', '--docs-only', help="Only build documentation.", is_flag=True)
@click.option('-s', '--spellcheck-only', help="Only run spell checking.", is_flag=True)
@click.option(
    '--package-filter',
    help="List of packages to consider.",
    type=NotVerifiedBetterChoice(get_available_documentation_packages()),
    multiple=True,
)
@click.option(
    '--clean-build',
    help="Clean inventories of Inter-Sphinx documentation and generated APIs and sphinx artifacts "
    "before the build - useful for a clean build.",
    is_flag=True,
)
@click.option(
    '--for-production',
    help="Builds documentation for official release i.e. all links point to stable version. "
    "Implies --clean-build",
    is_flag=True,
)
def build_docs(
    verbose: bool,
    dry_run: bool,
    github_repository: str,
    docs_only: bool,
    spellcheck_only: bool,
    for_production: bool,
    clean_build: bool,
    package_filter: tuple[str],
):
    """Build documentation in the container.

    Ensures the CI image is available, optionally cleans previously generated
    sphinx artifacts, then runs the in-container docs build script and exits
    with its return code.
    """
    if for_production and not clean_build:
        # --for-production implies --clean-build (fixed typo: was "clan-build").
        get_console().print("\n[warning]When building docs for production, clean-build is forced\n")
        clean_build = True
    perform_environment_checks(verbose=verbose)
    params = BuildCiParams(github_repository=github_repository, python=DEFAULT_PYTHON_MAJOR_MINOR_VERSION)
    rebuild_or_pull_ci_image_if_needed(command_params=params, dry_run=dry_run, verbose=verbose)
    if clean_build:
        # Remove all generated sphinx artifacts anywhere under docs/.
        docs_dir = AIRFLOW_SOURCES_ROOT / "docs"
        for dir_name in ['_build', "_doctrees", '_inventory_cache', '_api']:
            # Renamed from `dir` to avoid shadowing the builtin.
            for directory in docs_dir.rglob(dir_name):
                get_console().print(f"[info]Removing {directory}")
                shutil.rmtree(directory, ignore_errors=True)
    ci_image_name = params.airflow_image_name
    doc_builder = DocBuildParams(
        package_filter=package_filter,
        docs_only=docs_only,
        spellcheck_only=spellcheck_only,
        for_production=for_production,
        skip_environment_initialization=True,
    )
    extra_docker_flags = get_extra_docker_flags(MOUNT_SELECTED)
    env = get_env_variables_for_docker_commands(params)
    # `--pull never`: the image was already built/pulled above.
    cmd = [
        "docker",
        "run",
        "-t",
        *extra_docker_flags,
        "--pull",
        "never",
        ci_image_name,
        "/opt/airflow/scripts/in_container/run_docs_build.sh",
        *doc_builder.args_doc_builder,
    ]
    process = run_command(cmd, verbose=verbose, dry_run=dry_run, text=True, env=env, check=False)
    # Propagate the container's exit code as our own.
    sys.exit(process.returncode)
@main.command(
    name="static-checks",
    help="Run static checks.",
    context_settings=dict(
        ignore_unknown_options=True,
        allow_extra_args=True,
    ),
)
@click.option(
    '-t',
    '--type',
    help="Type(s) of the static checks to run (multiple can be added).",
    type=BetterChoice(PRE_COMMIT_LIST),
    multiple=True,
)
@click.option('-a', '--all-files', help="Run checks on all files.", is_flag=True)
@click.option('-f', '--file', help="List of files to run the checks on.", type=click.Path(), multiple=True)
@click.option(
    '-s', '--show-diff-on-failure', help="Show diff for files modified by the checks.", is_flag=True
)
@click.option(
    '-c',
    '--last-commit',
    help="Run checks for all files in last commit. Mutually exclusive with --commit-ref.",
    is_flag=True,
)
@click.option(
    '-r',
    '--commit-ref',
    help="Run checks for this commit reference only "
    "(can be any git commit-ish reference). "
    "Mutually exclusive with --last-commit.",
)
@option_verbose
@option_dry_run
@option_github_repository
@click.argument('precommit_args', nargs=-1, type=click.UNPROCESSED)
def static_checks(
    verbose: bool,
    dry_run: bool,
    github_repository: str,
    all_files: bool,
    show_diff_on_failure: bool,
    last_commit: bool,
    commit_ref: str,
    type: tuple[str],
    file: Iterable[str],
    precommit_args: tuple,
):
    """Translate breeze options into a `pre-commit run` invocation and execute it."""
    assert_pre_commit_installed(verbose=verbose)
    perform_environment_checks(verbose=verbose)
    command_to_execute = [sys.executable, "-m", "pre_commit", 'run']
    if last_commit and commit_ref:
        # The two commit-selection modes are mutually exclusive.
        get_console().print("\n[error]You cannot specify both --last-commit and --commit-ref[/]\n")
        sys.exit(1)
    # Each selected check id becomes a positional argument for pre-commit.
    for single_check in type:
        command_to_execute.append(single_check)
    if all_files:
        command_to_execute.append("--all-files")
    if show_diff_on_failure:
        command_to_execute.append("--show-diff-on-failure")
    if last_commit:
        # Run only on files changed in HEAD.
        command_to_execute.extend(["--from-ref", "HEAD^", "--to-ref", "HEAD"])
    if commit_ref:
        # Run only on files changed in the given commit.
        command_to_execute.extend(["--from-ref", f"{commit_ref}^", "--to-ref", f"{commit_ref}"])
    if verbose or dry_run:
        command_to_execute.append("--verbose")
    if file:
        command_to_execute.append("--files")
        command_to_execute.extend(file)
    # Unknown extra args are forwarded to pre-commit as-is.
    if precommit_args:
        command_to_execute.extend(precommit_args)
    env = os.environ.copy()
    env['GITHUB_REPOSITORY'] = github_repository
    static_checks_result = run_command(
        command_to_execute,
        verbose=verbose,
        dry_run=dry_run,
        check=False,
        no_output_dump_on_exception=True,
        text=True,
        env=env,
    )
    if static_checks_result.returncode != 0:
        if os.environ.get('CI'):
            # Extra hint in CI, where the diff output may be easy to miss.
            get_console().print("[error]There were errors during pre-commit check. They should be fixed[/]")
    sys.exit(static_checks_result.returncode)
@main.command(
    name="compile-www-assets",
    help="Compiles www assets.",
)
@click.option(
    "--dev",
    help="Run development version of assets compilation - it will not quit and automatically "
    "recompile assets on-the-fly when they are changed.",
    is_flag=True,
)
@option_verbose
@option_dry_run
def compile_www_assets(
    dev: bool,
    verbose: bool,
    dry_run: bool,
):
    """Compile the www assets using the pre-commit hook (optionally in watch/dev mode)."""
    perform_environment_checks(verbose=verbose)
    assert_pre_commit_installed(verbose=verbose)
    compile_www_assets_result = run_compile_www_assets(
        dev=dev, run_in_background=False, verbose=verbose, dry_run=dry_run
    )
    if compile_www_assets_result.returncode != 0:
        # NOTE(review): pre-commit exits non-zero when it modified files,
        # which is presumably why a non-zero code is reported as "new assets
        # generated" rather than as an error -- confirm genuine failures
        # should also be treated this way.
        get_console().print("[warn]New assets were generated[/]")
    # Always exit successfully regardless of the compilation result.
    sys.exit(0)
@main.command(name="stop", help="Stop running breeze environment.")
@option_verbose
@option_dry_run
@click.option(
    "-p",
    "--preserve-volumes",
    help="Skip removing volumes when stopping Breeze.",
    is_flag=True,
)
def stop(verbose: bool, dry_run: bool, preserve_volumes: bool):
    """Bring the Breeze docker-compose environment down, removing volumes unless preserved."""
    perform_environment_checks(verbose=verbose)
    # Volumes are removed by default; --preserve-volumes keeps them.
    volume_flags = [] if preserve_volumes else ["--volumes"]
    down_command = [*DOCKER_COMPOSE_COMMAND, 'down', "--remove-orphans", *volume_flags]
    # "all" backend + mypy volume ensures every possible resource is covered.
    params = ShellParams(verbose=verbose, backend="all", include_mypy_volume=True)
    env = get_env_variables_for_docker_commands(params)
    run_command(down_command, verbose=verbose, dry_run=dry_run, env=env)
@main.command(name='exec', help='Joins the interactive shell of running airflow container.')
@option_verbose
@option_dry_run
@click.argument('exec_args', nargs=-1, type=click.UNPROCESSED)
def exec(verbose: bool, dry_run: bool, exec_args: tuple):
    """Attach an interactive shell to the running airflow container via docker exec."""
    perform_environment_checks(verbose=verbose)
    # find_airflow_container exits on failure; if it somehow returns a falsy
    # value the command silently does nothing.
    container_running = find_airflow_container(verbose, dry_run)
    if container_running:
        cmd_to_run = [
            "docker",
            "exec",
            "-it",
            container_running,
            "/opt/airflow/scripts/docker/entrypoint_exec.sh",
        ]
        if exec_args:
            # Extra arguments are passed straight to the entrypoint script.
            cmd_to_run.extend(exec_args)
        process = run_command(
            cmd_to_run,
            verbose=verbose,
            dry_run=dry_run,
            check=False,
            no_output_dump_on_exception=False,
            text=True,
        )
        if not process:
            sys.exit(1)
        # Propagate the docker exec exit code.
        sys.exit(process.returncode)
def enter_shell(**kwargs) -> RunCommandResult:
    """
    Executes entering shell using the parameters passed as kwargs:

    * checks if docker version is good
    * checks if docker-compose version is good
    * updates kwargs with cached parameters
    * displays ASCIIART and CHEATSHEET unless disabled
    * build ShellParams from the updated kwargs
    * executes the command to drop the user to Breeze shell

    :param kwargs: all shell parameters; must contain at least ``verbose`` and ``dry_run``.
    :return: result of the docker-compose command that ran the shell.
    """
    verbose = kwargs['verbose']
    dry_run = kwargs['dry_run']
    perform_environment_checks(verbose=verbose)
    # The ASCII art / cheatsheet banners can be suppressed via cached settings.
    if read_from_cache_file('suppress_asciiart') is None:
        get_console().print(ASCIIART, style=ASCIIART_STYLE)
    if read_from_cache_file('suppress_cheatsheet') is None:
        get_console().print(CHEATSHEET, style=CHEATSHEET_STYLE)
    # Drop None-valued kwargs so ShellParams applies its own defaults for them.
    enter_shell_params = ShellParams(**filter_out_none(**kwargs))
    rebuild_or_pull_ci_image_if_needed(command_params=enter_shell_params, dry_run=dry_run, verbose=verbose)
    if enter_shell_params.include_mypy_volume:
        # The mypy cache volume speeds up repeated mypy runs in the container.
        create_mypy_volume_if_needed()
    return run_shell(verbose, dry_run, enter_shell_params)
def run_shell(verbose: bool, dry_run: bool, shell_params: ShellParams) -> RunCommandResult:
    """
    Executes a shell command built from params passed.

    * prints information about the build
    * constructs docker compose command to enter shell
    * executes it

    :param verbose: print commands when running
    :param dry_run: do not execute "write" commands - just print what would happen
    :param shell_params: parameters of the execution
    :return: the docker-compose command result (only on success; exits otherwise).
    """
    shell_params.print_badge_info()
    cmd = [*DOCKER_COMPOSE_COMMAND, 'run', '--service-ports', "-e", "BREEZE", '--rm', 'airflow']
    cmd_added = shell_params.command_passed
    env_variables = get_env_variables_for_docker_commands(shell_params)
    if cmd_added is not None:
        # A concrete command was passed to run inside the shell instead of
        # an interactive session.
        cmd.extend(['-c', cmd_added])
    if "arm64" in DOCKER_DEFAULT_PLATFORM:
        # These backends are rejected on ARM - fail fast with a clear message.
        if shell_params.backend == "mysql":
            get_console().print('\n[error]MySQL is not supported on ARM architecture.[/]\n')
            sys.exit(1)
        if shell_params.backend == "mssql":
            get_console().print('\n[error]MSSQL is not supported on ARM architecture[/]\n')
            sys.exit(1)
    command_result = run_command(
        cmd, verbose=verbose, dry_run=dry_run, env=env_variables, text=True, check=False
    )
    if command_result.returncode == 0:
        return command_result
    else:
        get_console().print(f"[red]Error {command_result.returncode} returned[/]")
        if verbose:
            get_console().print(command_result.stderr)
        # Propagate the failing exit code to the caller's shell.
        sys.exit(command_result.returncode)
def stop_exec_on_error(returncode: int):
    """Print an error about failing to locate the airflow container and exit with ``returncode``."""
    get_console().print('\n[error]ERROR in finding the airflow docker-compose process id[/]\n')
    sys.exit(returncode)
def find_airflow_container(verbose, dry_run) -> str | None:
    """Return the identifier of the running airflow container, exiting on failure.

    In dry-run mode the placeholder ``"CONTAINER_ID"`` is returned so callers
    can still print the command they would run.
    """
    exec_shell_params = ShellParams(verbose=verbose, dry_run=dry_run)
    check_docker_resources(exec_shell_params.airflow_image_name, verbose=verbose, dry_run=dry_run)
    exec_shell_params.print_badge_info()
    env_variables = get_env_variables_for_docker_commands(exec_shell_params)
    # List only the running "airflow" service containers.
    cmd = [*DOCKER_COMPOSE_COMMAND, 'ps', '--all', '--filter', 'status=running', 'airflow']
    docker_compose_ps_command = run_command(
        cmd, verbose=verbose, dry_run=dry_run, text=True, capture_output=True, env=env_variables, check=False
    )
    if dry_run:
        # Placeholder so the caller can still display its docker command.
        return "CONTAINER_ID"
    if docker_compose_ps_command.returncode != 0:
        if verbose:
            get_console().print(docker_compose_ps_command.stdout)
            get_console().print(docker_compose_ps_command.stderr)
        # stop_exec_on_error calls sys.exit, so the return below is never reached.
        stop_exec_on_error(docker_compose_ps_command.returncode)
        return None
    output = docker_compose_ps_command.stdout
    container_info = output.strip().split('\n')
    if container_info:
        # The container id/name is the first column of the last output line.
        container_running = container_info[-1].split(' ')[0]
        if container_running.startswith('-'):
            # On docker-compose v1 we get '--------' as output here
            stop_exec_on_error(docker_compose_ps_command.returncode)
        return container_running
    else:
        stop_exec_on_error(1)
        return None