airflow pool_command 源码

  • 2022-10-20
  • 浏览 (338)

airflow pool_command 代码

文件路径:/airflow/cli/commands/pool_command.py

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Pools sub-commands"""
from __future__ import annotations

import json
import os
from json import JSONDecodeError

from airflow.api.client import get_current_api_client
from airflow.cli.simple_table import AirflowConsole
from airflow.exceptions import PoolNotFound
from airflow.utils import cli as cli_utils
from airflow.utils.cli import suppress_logs_and_warning


def _show_pools(pools, output):
    """Print pool records via ``AirflowConsole`` using the requested output format.

    :param pools: iterable of records indexed positionally as
        (name, slots, description)
    :param output: output format forwarded unchanged to ``AirflowConsole.print_as``
    """

    def _as_row(record):
        # Positional layout of a pool record: 0 = name, 1 = slots, 2 = description.
        return {
            "pool": record[0],
            "slots": record[1],
            "description": record[2],
        }

    console = AirflowConsole()
    console.print_as(data=pools, output=output, mapper=_as_row)


@suppress_logs_and_warning
def pool_list(args):
    """Print every pool returned by the API client, formatted per ``args.output``."""
    all_pools = get_current_api_client().get_pools()
    _show_pools(pools=all_pools, output=args.output)


@suppress_logs_and_warning
def pool_get(args):
    """Print the pool named ``args.pool``, formatted per ``args.output``.

    :raises SystemExit: if the lookup raises ``PoolNotFound``
    """
    client = get_current_api_client()
    try:
        found = client.get_pool(name=args.pool)
        # _show_pools expects a collection of records, so wrap the single hit.
        _show_pools(pools=[found], output=args.output)
    except PoolNotFound:
        raise SystemExit(f"Pool {args.pool} does not exist")


@cli_utils.action_cli
@suppress_logs_and_warning
def pool_set(args):
    """Create a pool from ``args.pool``, ``args.slots`` and ``args.description``."""
    get_current_api_client().create_pool(
        name=args.pool,
        slots=args.slots,
        description=args.description,
    )
    print(f"Pool {args.pool} created")


@cli_utils.action_cli
@suppress_logs_and_warning
def pool_delete(args):
    """Delete the pool named ``args.pool`` and confirm on stdout.

    :raises SystemExit: if the deletion raises ``PoolNotFound``
    """
    client = get_current_api_client()
    try:
        client.delete_pool(name=args.pool)
        print(f"Pool {args.pool} deleted")
    except PoolNotFound:
        raise SystemExit(f"Pool {args.pool} does not exist")


@cli_utils.action_cli
@suppress_logs_and_warning
def pool_import(args):
    """Import pools from the JSON file at ``args.file`` and report the count.

    :raises SystemExit: if the file does not exist, or if the helper reports
        any entries it could not import
    """
    file_is_present = os.path.exists(args.file)
    if not file_is_present:
        raise SystemExit(f"Missing pools file {args.file}")

    pools, failed = pool_import_helper(args.file)
    # The helper returns a list of rejected names; any entry means failure.
    if failed:
        raise SystemExit(f"Failed to update pool(s): {', '.join(failed)}")

    print(f"Uploaded {len(pools)} pool(s)")


def pool_export(args):
    """Write all pools to the file at ``args.file`` and print how many were exported."""
    pools = pool_export_helper(args.file)
    summary = f"Exported {len(pools)} pools to {args.file}"
    print(summary)


def pool_import_helper(filepath):
    """Create pools from a JSON file and report which entries were rejected.

    The file must contain a JSON object mapping each pool name to an object
    with exactly the keys ``"slots"`` and ``"description"``. Entries of any
    other shape are not sent to the API; their names are collected instead.
    Valid entries are created in file order as they are encountered, so
    earlier pools may already exist when a later entry is rejected.

    :param filepath: path of the JSON file to read
    :return: tuple ``(pools, failed)`` where ``pools`` holds the results of
        each ``create_pool`` call and ``failed`` holds the names of the
        rejected entries
    :raises SystemExit: if the file is not valid JSON, or if its top-level
        value is not a JSON object
    """
    with open(filepath) as poolfile:
        data = poolfile.read()
    try:
        pools_json = json.loads(data)
    except JSONDecodeError as e:
        raise SystemExit(f"Invalid json file: {e}")
    # A valid JSON document may still be a list or scalar; without this check
    # the ``.items()`` call below would fail with an AttributeError traceback.
    if not isinstance(pools_json, dict):
        raise SystemExit(
            "Invalid json file: expected an object mapping pool names to "
            f"pool definitions, got {type(pools_json).__name__}"
        )

    # Obtain the client only after the input has been validated.
    api_client = get_current_api_client()
    required_keys = {"slots", "description"}
    pools = []
    failed = []
    for k, v in pools_json.items():
        # Compare the key set rather than just its size: a two-key dict with
        # other key names would otherwise raise KeyError on the lookups below.
        if isinstance(v, dict) and v.keys() == required_keys:
            pools.append(api_client.create_pool(name=k, slots=v["slots"], description=v["description"]))
        else:
            failed.append(k)
    return pools, failed


def pool_export_helper(filepath):
    """Dump every pool from the API client to ``filepath`` as JSON.

    The file holds one object keyed by pool name, each value carrying the
    pool's ``slots`` and ``description``; keys are sorted and the output is
    indented by four spaces.

    :param filepath: destination path, opened for writing
    :return: the pools collection as returned by the API client
    """
    pools = get_current_api_client().get_pools()
    # Records are positional: 0 = name, 1 = slots, 2 = description.
    pool_dict = {
        record[0]: {"slots": record[1], "description": record[2]}
        for record in pools
    }
    with open(filepath, 'w') as poolfile:
        serialized = json.dumps(pool_dict, sort_keys=True, indent=4)
        poolfile.write(serialized)
    return pools

相关信息

airflow 源码目录

相关文章

airflow init 源码

airflow celery_command 源码

airflow cheat_sheet_command 源码

airflow config_command 源码

airflow connection_command 源码

airflow dag_command 源码

airflow dag_processor_command 源码

airflow db_command 源码

airflow info_command 源码

airflow jobs_command 源码

0  赞