airflow dot_renderer 源码

  • 2022-10-20
  • 浏览 (268)

airflow dot_renderer 代码

文件路径:/airflow/utils/dot_renderer.py

#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Renderer DAG (tasks and dependencies) to the graphviz object."""
from __future__ import annotations

from typing import Any

import graphviz

from airflow import AirflowException
from airflow.models import TaskInstance
from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG
from airflow.models.mappedoperator import MappedOperator
from airflow.models.taskmixin import DependencyMixin
from airflow.serialization.serialized_objects import DagDependency
from airflow.utils.dag_edges import dag_edges
from airflow.utils.state import State
from airflow.utils.task_group import TaskGroup


def _refine_color(color: str):
    """
    Converts color in #RGB (12 bits) format to #RRGGBB (32 bits), if it possible.
    Otherwise, it returns the original value. Graphviz does not support colors in #RGB format.

    :param color: Text representation of color
    :return: Refined representation of color
    """
    if len(color) == 4 and color[0] == "#":
        color_r = color[1]
        color_g = color[2]
        color_b = color[3]
        return "#" + color_r + color_r + color_g + color_g + color_b + color_b
    return color


def _draw_task(
    task: MappedOperator | BaseOperator,
    parent_graph: graphviz.Digraph,
    states_by_task_id: dict[Any, Any] | None,
) -> None:
    """Draw a single task on the given parent_graph"""
    if states_by_task_id:
        state = states_by_task_id.get(task.task_id, State.NONE)
        color = State.color_fg(state)
        fill_color = State.color(state)
    else:
        color = task.ui_fgcolor
        fill_color = task.ui_color

    parent_graph.node(
        task.task_id,
        _attributes={
            "label": task.label,
            "shape": "rectangle",
            "style": "filled,rounded",
            "color": _refine_color(color),
            "fillcolor": _refine_color(fill_color),
        },
    )


def _draw_task_group(
    task_group: TaskGroup, parent_graph: graphviz.Digraph, states_by_task_id: dict[str, str] | None
) -> None:
    """Draw the given task_group and its children on the given parent_graph"""
    # Draw joins
    if task_group.upstream_group_ids or task_group.upstream_task_ids:
        parent_graph.node(
            task_group.upstream_join_id,
            _attributes={
                "label": "",
                "shape": "circle",
                "style": "filled,rounded",
                "color": _refine_color(task_group.ui_fgcolor),
                "fillcolor": _refine_color(task_group.ui_color),
                "width": "0.2",
                "height": "0.2",
            },
        )

    if task_group.downstream_group_ids or task_group.downstream_task_ids:
        parent_graph.node(
            task_group.downstream_join_id,
            _attributes={
                "label": "",
                "shape": "circle",
                "style": "filled,rounded",
                "color": _refine_color(task_group.ui_fgcolor),
                "fillcolor": _refine_color(task_group.ui_color),
                "width": "0.2",
                "height": "0.2",
            },
        )

    # Draw children
    for child in sorted(task_group.children.values(), key=lambda t: t.node_id if t.node_id else ""):
        _draw_nodes(child, parent_graph, states_by_task_id)


def _draw_nodes(
    node: DependencyMixin, parent_graph: graphviz.Digraph, states_by_task_id: dict[str, str] | None
) -> None:
    """Draw the node and its children on the given parent_graph recursively."""
    if isinstance(node, BaseOperator) or isinstance(node, MappedOperator):
        _draw_task(node, parent_graph, states_by_task_id)
    else:
        if not isinstance(node, TaskGroup):
            raise AirflowException(f"The node {node} should be TaskGroup and is not")
        # Draw TaskGroup
        if node.is_root:
            # No need to draw background for root TaskGroup.
            _draw_task_group(node, parent_graph, states_by_task_id)
        else:
            with parent_graph.subgraph(name=f"cluster_{node.group_id}") as sub:
                sub.attr(
                    shape="rectangle",
                    style="filled",
                    color=_refine_color(node.ui_fgcolor),
                    # Partially transparent CornflowerBlue
                    fillcolor="#6495ed7f",
                    label=node.label,
                )
                _draw_task_group(node, sub, states_by_task_id)


def render_dag_dependencies(deps: dict[str, list[DagDependency]]) -> graphviz.Digraph:
    """
    Renders the DAG dependency to the DOT object.

    :param deps: List of DAG dependencies
    :return: Graphviz object
    :rtype: graphviz.Digraph
    """
    dot = graphviz.Digraph(graph_attr={"rankdir": "LR"})

    for dag, dependencies in deps.items():
        for dep in dependencies:
            with dot.subgraph(
                name=dag,
                graph_attr={
                    "rankdir": "LR",
                    "labelloc": "t",
                    "label": dag,
                },
            ) as dep_subgraph:
                dep_subgraph.edge(dep.source, dep.dependency_id)
                dep_subgraph.edge(dep.dependency_id, dep.target)

    return dot


def render_dag(dag: DAG, tis: list[TaskInstance] | None = None) -> graphviz.Digraph:
    """
    Renders the DAG object to the DOT object.

    If an task instance list is passed, the nodes will be painted according to task statuses.

    :param dag: DAG that will be rendered.
    :param tis: List of task instances
    :return: Graphviz object
    :rtype: graphviz.Digraph
    """
    dot = graphviz.Digraph(
        dag.dag_id,
        graph_attr={
            "rankdir": dag.orientation if dag.orientation else "LR",
            "labelloc": "t",
            "label": dag.dag_id,
        },
    )
    states_by_task_id = None
    if tis is not None:
        states_by_task_id = {ti.task_id: ti.state for ti in tis}

    _draw_nodes(dag.task_group, dot, states_by_task_id)

    for edge in dag_edges(dag):
        # Gets an optional label for the edge; this will be None if none is specified.
        label = dag.get_edge_info(edge["source_id"], edge["target_id"]).get("label")
        # Add the edge to the graph with optional label
        # (we can just use the maybe-None label variable directly)
        dot.edge(edge["source_id"], edge["target_id"], label)

    return dot

相关信息

airflow 源码目录

相关文章

airflow init 源码

airflow airflow_flask_app 源码

airflow cli 源码

airflow cli_action_loggers 源码

airflow code_utils 源码

airflow compression 源码

airflow configuration 源码

airflow context 源码

airflow dag_edges 源码

airflow dag_parsing_context 源码

0  赞