airflow cli_action_loggers 源码

  • 2022-10-20
  • 浏览 (364)

airflow cli_action_loggers 代码

文件路径:/airflow/utils/cli_action_loggers.py

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
An Action Logger module. Singleton pattern has been applied into this module
so that registered callbacks can be used all through the same python process.
"""
from __future__ import annotations

import json
import logging
from typing import Callable, List


def register_pre_exec_callback(action_logger):
    """
    Registers more action_logger function callback for pre-execution.
    This function callback is expected to be called with keyword args.
    For more about the arguments that is being passed to the callback,
    refer to airflow.utils.cli.action_logging()

    :param action_logger: An action logger function
    :return: None
    """
    logging.debug("Adding %s to pre execution callback", action_logger)
    __pre_exec_callbacks.append(action_logger)


def register_post_exec_callback(action_logger):
    """
    Registers more action_logger function callback for post-execution.
    This function callback is expected to be called with keyword args.
    For more about the arguments that is being passed to the callback,
    refer to airflow.utils.cli.action_logging()

    :param action_logger: An action logger function
    :return: None
    """
    logging.debug("Adding %s to post execution callback", action_logger)
    __post_exec_callbacks.append(action_logger)


def on_pre_execution(**kwargs):
    """
    Calls callbacks before execution.
    Note that any exception from callback will be logged but won't be propagated.

    :param kwargs:
    :return: None
    """
    logging.debug("Calling callbacks: %s", __pre_exec_callbacks)
    for callback in __pre_exec_callbacks:
        try:
            callback(**kwargs)
        except Exception:
            logging.exception('Failed on pre-execution callback using %s', callback)


def on_post_execution(**kwargs):
    """
    Calls callbacks after execution.
    As it's being called after execution, it can capture status of execution,
    duration, etc. Note that any exception from callback will be logged but
    won't be propagated.

    :param kwargs:
    :return: None
    """
    logging.debug("Calling callbacks: %s", __post_exec_callbacks)
    for callback in __post_exec_callbacks:
        try:
            callback(**kwargs)
        except Exception:
            logging.exception('Failed on post-execution callback using %s', callback)


def default_action_log(sub_command, user, task_id, dag_id, execution_date, host_name, full_command, **_):
    """
    A default action logger callback that behave same as www.utils.action_logging
    which uses global session and pushes log ORM object.

    :param log: An log ORM instance
    :param **_: other keyword arguments that is not being used by this function
    :return: None
    """
    from airflow.models.log import Log
    from airflow.utils import timezone
    from airflow.utils.session import create_session

    try:
        with create_session() as session:
            extra = json.dumps({'host_name': host_name, 'full_command': full_command})
            # Use bulk_insert_mappings here to avoid importing all models (which using the classes does) early
            # on in the CLI
            session.bulk_insert_mappings(
                Log,
                [
                    {
                        'event': f'cli_{sub_command}',
                        'task_instance': None,
                        'owner': user,
                        'extra': extra,
                        'task_id': task_id,
                        'dag_id': dag_id,
                        'execution_date': execution_date,
                        'dttm': timezone.utcnow(),
                    }
                ],
            )
    except Exception as error:
        logging.warning("Failed to log action with %s", error)


__pre_exec_callbacks = []  # type: List[Callable]
__post_exec_callbacks = []  # type: List[Callable]

# By default, register default action log into pre-execution callback
register_pre_exec_callback(default_action_log)

相关信息

airflow 源码目录

相关文章

airflow init 源码

airflow airflow_flask_app 源码

airflow cli 源码

airflow code_utils 源码

airflow compression 源码

airflow configuration 源码

airflow context 源码

airflow dag_edges 源码

airflow dag_parsing_context 源码

airflow dates 源码

0  赞