airflow env 源码

  • 2022-10-20
  • 浏览 (319)

airflow env 代码

文件路径:/airflow/migrations/env.py

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import contextlib
import sys
from logging.config import fileConfig

from alembic import context

from airflow import models, settings
from airflow.utils.db import compare_server_default, compare_type


def include_object(_, name, type_, *args):
    """Filter objects for autogenerating revisions"""
    # Ignore _anything_ to do with Celery, or FlaskSession's tables
    if type_ == "table" and (name.startswith("celery_") or name == "session"):
        return False
    else:
        return True


# Make sure everything is imported so that alembic can find it all
models.import_all_models()

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name, disable_existing_loggers=False)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = models.base.Base.metadata

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.


def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    context.configure(
        url=settings.SQL_ALCHEMY_CONN,
        target_metadata=target_metadata,
        literal_binds=True,
        compare_type=compare_type,
        compare_server_default=compare_server_default,
        render_as_batch=True,
    )

    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online():
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.

    """
    with contextlib.ExitStack() as stack:
        connection = config.attributes.get('connection', None)

        if not connection:
            connection = stack.push(settings.engine.connect())

        context.configure(
            connection=connection,
            transaction_per_migration=True,
            target_metadata=target_metadata,
            compare_type=compare_type,
            compare_server_default=compare_server_default,
            include_object=include_object,
            render_as_batch=True,
        )

        with context.begin_transaction():
            context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

if 'airflow.www.app' in sys.modules:
    # Already imported, make sure we clear out any cached app
    from airflow.www.app import purge_cached_app

    purge_cached_app()

相关信息

airflow 源码目录

相关文章

airflow init 源码

airflow db_types 源码

airflow utils 源码

0  赞