airflow logging_config 源码

  • 2022-10-20
  • 浏览 (392)

airflow logging_config 代码

文件路径:/airflow/logging_config.py

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import logging
import warnings
from logging.config import dictConfig

from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
from airflow.utils.module_loading import import_string

log = logging.getLogger(__name__)


def configure_logging():
    """Configure & Validate Airflow Logging"""
    logging_class_path = ''
    try:
        logging_class_path = conf.get('logging', 'logging_config_class')
    except AirflowConfigException:
        log.debug('Could not find key logging_config_class in config')

    if logging_class_path:
        try:
            logging_config = import_string(logging_class_path)

            # Make sure that the variable is in scope
            if not isinstance(logging_config, dict):
                raise ValueError("Logging Config should be of dict type")

            log.info('Successfully imported user-defined logging config from %s', logging_class_path)
        except Exception as err:
            # Import default logging configurations.
            raise ImportError(f'Unable to load custom logging from {logging_class_path} due to {err}')
    else:
        logging_class_path = 'airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG'
        logging_config = import_string(logging_class_path)
        log.debug('Unable to load custom logging, using default config instead')

    try:
        # Ensure that the password masking filter is applied to the 'task' handler
        # no matter what the user did.
        if 'filters' in logging_config and 'mask_secrets' in logging_config['filters']:
            # But if they replace the logging config _entirely_, don't try to set this, it won't work
            task_handler_config = logging_config['handlers']['task']

            task_handler_config.setdefault('filters', [])

            if 'mask_secrets' not in task_handler_config['filters']:
                task_handler_config['filters'].append('mask_secrets')

        # Try to init logging
        dictConfig(logging_config)
    except (ValueError, KeyError) as e:
        log.error('Unable to load the config, contains a configuration error.')
        # When there is an error in the config, escalate the exception
        # otherwise Airflow would silently fall back on the default config
        raise e

    validate_logging_config(logging_config)

    return logging_class_path


def validate_logging_config(logging_config):
    """Validate the provided Logging Config"""
    # Now lets validate the other logging-related settings
    task_log_reader = conf.get('logging', 'task_log_reader')

    logger = logging.getLogger('airflow.task')

    def _get_handler(name):
        return next((h for h in logger.handlers if h.name == name), None)

    if _get_handler(task_log_reader) is None:
        # Check for pre 1.10 setting that might be in deployed airflow.cfg files
        if task_log_reader == "file.task" and _get_handler("task"):
            warnings.warn(
                f"task_log_reader setting in [logging] has a deprecated value of {task_log_reader!r}, "
                "but no handler with this name was found. Please update your config to use task. "
                "Running config has been adjusted to match",
                DeprecationWarning,
            )
            conf.set('logging', 'task_log_reader', 'task')
        else:
            raise AirflowConfigException(
                f"Configured task_log_reader {task_log_reader!r} was not a handler of "
                f"the 'airflow.task' logger."
            )

相关信息

airflow 源码目录

相关文章

airflow init 源码

airflow main 源码

airflow configuration 源码

airflow exceptions 源码

airflow plugins_manager 源码

airflow providers_manager 源码

airflow sentry 源码

airflow settings 源码

airflow stats 源码

airflow templates 源码

0  赞