airflow state 源码
airflow state 代码
文件路径:/airflow/utils/state.py
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from enum import Enum
from airflow.settings import STATE_COLORS
class TaskInstanceState(str, Enum):
"""
Enum that represents all possible states that a Task Instance can be in.
Note that None is also allowed, so always use this in a type hint with Optional.
"""
# The scheduler sets a TaskInstance state to None when it's created but not
# yet run, but we don't list it here since TaskInstance is a string enum.
# Use None instead if need this state.
# Set by the scheduler
REMOVED = "removed" # Task vanished from DAG before it ran
SCHEDULED = "scheduled" # Task should run and will be handed to executor soon
# Set by the task instance itself
QUEUED = "queued" # Executor has enqueued the task
RUNNING = "running" # Task is executing
SUCCESS = "success" # Task completed
SHUTDOWN = "shutdown" # External request to shut down (e.g. marked failed when running)
RESTARTING = "restarting" # External request to restart (e.g. cleared when running)
FAILED = "failed" # Task errored out
UP_FOR_RETRY = "up_for_retry" # Task failed but has retries left
UP_FOR_RESCHEDULE = "up_for_reschedule" # A waiting `reschedule` sensor
UPSTREAM_FAILED = "upstream_failed" # One or more upstream deps failed
SKIPPED = "skipped" # Skipped by branching or some other mechanism
DEFERRED = "deferred" # Deferrable operator waiting on a trigger
def __str__(self) -> str:
return self.value
class DagRunState(str, Enum):
"""
Enum that represents all possible states that a DagRun can be in.
These are "shared" with TaskInstanceState in some parts of the code,
so please ensure that their values always match the ones with the
same name in TaskInstanceState.
"""
QUEUED = "queued"
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
def __str__(self) -> str:
return self.value
class State:
"""
Static class with task instance state constants and color methods to
avoid hardcoding.
"""
# Backwards-compat constants for code that does not yet use the enum
# These first three are shared by DagState and TaskState
SUCCESS = TaskInstanceState.SUCCESS
RUNNING = TaskInstanceState.RUNNING
FAILED = TaskInstanceState.FAILED
# These are TaskState only
NONE = None
REMOVED = TaskInstanceState.REMOVED
SCHEDULED = TaskInstanceState.SCHEDULED
QUEUED = TaskInstanceState.QUEUED
SHUTDOWN = TaskInstanceState.SHUTDOWN
RESTARTING = TaskInstanceState.RESTARTING
UP_FOR_RETRY = TaskInstanceState.UP_FOR_RETRY
UP_FOR_RESCHEDULE = TaskInstanceState.UP_FOR_RESCHEDULE
UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED
SKIPPED = TaskInstanceState.SKIPPED
DEFERRED = TaskInstanceState.DEFERRED
task_states: tuple[TaskInstanceState | None, ...] = (None,) + tuple(TaskInstanceState)
dag_states: tuple[DagRunState, ...] = (
DagRunState.QUEUED,
DagRunState.SUCCESS,
DagRunState.RUNNING,
DagRunState.FAILED,
)
state_color: dict[TaskInstanceState | None, str] = {
None: 'lightblue',
TaskInstanceState.QUEUED: 'gray',
TaskInstanceState.RUNNING: 'lime',
TaskInstanceState.SUCCESS: 'green',
TaskInstanceState.SHUTDOWN: 'blue',
TaskInstanceState.RESTARTING: 'violet',
TaskInstanceState.FAILED: 'red',
TaskInstanceState.UP_FOR_RETRY: 'gold',
TaskInstanceState.UP_FOR_RESCHEDULE: 'turquoise',
TaskInstanceState.UPSTREAM_FAILED: 'orange',
TaskInstanceState.SKIPPED: 'hotpink',
TaskInstanceState.REMOVED: 'lightgrey',
TaskInstanceState.SCHEDULED: 'tan',
TaskInstanceState.DEFERRED: 'mediumpurple',
}
state_color.update(STATE_COLORS) # type: ignore
@classmethod
def color(cls, state):
"""Returns color for a state."""
return cls.state_color.get(state, 'white')
@classmethod
def color_fg(cls, state):
"""Black&white colors for a state."""
color = cls.color(state)
if color in ['green', 'red']:
return 'white'
return 'black'
running: frozenset[TaskInstanceState] = frozenset([TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED])
"""
A list of states indicating that a task is being executed.
"""
finished: frozenset[TaskInstanceState] = frozenset(
[
TaskInstanceState.SUCCESS,
TaskInstanceState.FAILED,
TaskInstanceState.SKIPPED,
TaskInstanceState.UPSTREAM_FAILED,
TaskInstanceState.REMOVED,
]
)
"""
A list of states indicating a task has reached a terminal state (i.e. it has "finished") and needs no
further action.
Note that the attempt could have resulted in failure or have been
interrupted; or perhaps never run at all (skip, or upstream_failed) in any
case, it is no longer running.
"""
unfinished: frozenset[TaskInstanceState | None] = frozenset(
[
None,
TaskInstanceState.SCHEDULED,
TaskInstanceState.QUEUED,
TaskInstanceState.RUNNING,
TaskInstanceState.SHUTDOWN,
TaskInstanceState.RESTARTING,
TaskInstanceState.UP_FOR_RETRY,
TaskInstanceState.UP_FOR_RESCHEDULE,
TaskInstanceState.DEFERRED,
]
)
"""
A list of states indicating that a task either has not completed
a run or has not even started.
"""
failed_states: frozenset[TaskInstanceState] = frozenset(
[TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED]
)
"""
A list of states indicating that a task or dag is a failed state.
"""
success_states: frozenset[TaskInstanceState] = frozenset(
[TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED]
)
"""
A list of states indicating that a task or dag is a success state.
"""
terminating_states = frozenset([TaskInstanceState.SHUTDOWN, TaskInstanceState.RESTARTING])
"""
A list of states indicating that a task has been terminated.
"""
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦