airflow tutorial_taskflow_api_virtualenv 源码
airflow tutorial_taskflow_api_virtualenv 代码
文件路径:/airflow/example_dags/tutorial_taskflow_api_virtualenv.py
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import logging
import shutil
from datetime import datetime
from airflow.decorators import dag, task
log = logging.getLogger(__name__)
# The example DAG is only defined when a ``virtualenv`` executable can be found
# on PATH; otherwise a warning is logged and no DAG object is created.
if shutil.which("virtualenv") is None:
    log.warning("The tutorial_taskflow_api_virtualenv example DAG requires virtualenv, please install it.")
else:

    @dag(
        schedule=None,
        start_date=datetime(2021, 1, 1),
        catchup=False,
        tags=["example"],
    )
    def tutorial_taskflow_api_virtualenv():
        """
        ### TaskFlow API example using virtualenv
        This is a simple data pipeline example which demonstrates the use of
        the TaskFlow API using three simple tasks for Extract, Transform, and Load.
        """

        # Extract is declared as a virtualenv task with its own requirements
        # list and without access to the system site-packages.
        @task.virtualenv(
            requirements=["funcsigs"],
            system_site_packages=False,
            use_dill=True,
        )
        def extract():
            """
            #### Extract task
            A simple Extract task to get data ready for the rest of the data
            pipeline. In this case, getting data is simulated by reading from a
            hardcoded JSON string.
            """
            # The import lives in the function body so the task is self-contained.
            import json

            raw_orders = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
            return json.loads(raw_orders)

        @task(multiple_outputs=True)
        def transform(order_data_dict: dict):
            """
            #### Transform task
            A simple Transform task which takes in the collection of order data and
            computes the total order value.
            """
            # Plain left-to-right accumulation over the order amounts.
            running_total = 0
            for amount in order_data_dict.values():
                running_total = running_total + amount
            return {"total_order_value": running_total}

        @task()
        def load(total_order_value: float):
            """
            #### Load task
            A simple Load task which takes in the result of the Transform task and
            instead of saving it to end user review, just prints it out.
            """
            message = f"Total order value is: {total_order_value:.2f}"
            print(message)

        # Wire the pipeline: extract -> transform -> load, handing only the
        # "total_order_value" entry of the transform result to the load step.
        summary = transform(extract())
        load(summary["total_order_value"])

    # Calling the decorated function builds the DAG; the result is bound to a
    # module-level name.
    tutorial_dag = tutorial_taskflow_api_virtualenv()
相关信息
相关文章
airflow example_bash_operator 源码
airflow example_branch_datetime_operator 源码
airflow example_branch_day_of_week_operator 源码
airflow example_branch_labels 源码
airflow example_branch_operator 源码
airflow example_branch_operator_decorator 源码
                        
                            0
                        
                        
                             赞
                        
                    
                    
                热门推荐
- 
                        2、 - 优质文章
- 
                        3、 gate.io
- 
                        8、 openharmony
- 
                        9、 golang