
Cloud-Native Workflows with Object Storage

Added in version 2.8.

Welcome to the final tutorial in our Airflow series! By now, you’ve built DAGs with Python and the Taskflow API, passed data with XComs, and chained tasks together into clear, reusable workflows.

In this tutorial we’ll take it a step further by introducing the Object Storage API. This API makes it easier to read from and write to cloud storage – like Amazon S3, Google Cloud Storage (GCS), or Azure Blob Storage – without having to worry about provider-specific SDKs or low-level credentials management.

We’ll walk you through a real-world use case:

  1. Pulling data from a public API

  2. Saving that data to object storage in Parquet format

  3. Analyzing it using SQL with DuckDB

Along the way, we’ll highlight the new ObjectStoragePath abstraction, explain how Airflow handles cloud credentials via connections, and show how this enables portable, cloud-agnostic pipelines.

Why This Matters

Many data workflows depend on files – whether it’s raw CSVs, intermediate Parquet files, or model artifacts. Traditionally, you’d need to write S3-specific or GCS-specific code for this. Now, with ObjectStoragePath, you can write generic code that works across providers, as long as you’ve configured the right Airflow connection.
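As a concrete illustration, here is a minimal sketch of what that portability looks like. The bucket names and the google_cloud_default connection below are placeholders, and using GCS would additionally require the Google provider with its fsspec-based filesystem support (gcsfs) installed:

    from airflow.sdk import ObjectStoragePath

    # Same generic code, different backends – only the URL scheme and the connection change.
    s3_base = ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")
    gcs_base = ObjectStoragePath("gs://airflow-tutorial-data/", conn_id="google_cloud_default")

    # Path joining works the same way regardless of the backend.
    s3_report = s3_base / "reports" / "summary.parquet"
    gcs_report = gcs_base / "reports" / "summary.parquet"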

Let’s get started!

Prerequisites

Before diving in, make sure you have the following:

  1. A working Airflow environment and familiarity with DAGs and the Taskflow API, as covered in the earlier tutorials.

  2. An object storage bucket you can write to, plus an Airflow connection for it. This tutorial uses an S3 bucket with the aws_default connection, but any supported backend works.

  3. The provider package for your storage backend installed together with its filesystem extra, for example apache-airflow-providers-amazon[s3fs] for Amazon S3.

  4. The requests, pandas, and duckdb Python packages, which the example DAG imports.

Creating an ObjectStoragePath

At the heart of this tutorial is ObjectStoragePath, a new abstraction for handling paths on cloud object stores. Think of it like pathlib.Path, but for buckets instead of filesystems.

base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")

The URL syntax is simple: protocol://bucket/path/to/file. In the example above, the connection ID (aws_default) is supplied as the user part of the URL, telling Airflow which connection to use for the bucket.

You can also provide the conn_id as a keyword argument for clarity:

ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")

This is especially handy when reusing a path defined elsewhere (like in an Asset), or when the connection isn’t baked into the URL. The keyword argument always takes precedence.

Tip

You can safely create an ObjectStoragePath in your global DAG scope. Connections are resolved only when the path is used, not when it’s created.
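To make the pathlib analogy concrete, here is a minimal sketch using only the operations this tutorial relies on – joining with /, mkdir, and open. The bucket and file names are placeholders:

    from airflow.sdk import ObjectStoragePath

    base = ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")

    # Join segments with the / operator, just like pathlib.Path.
    target = base / "raw" / "example.parquet"

    # Make sure the bucket exists before writing to it.
    base.mkdir(exist_ok=True)

    # Read and write through ordinary file objects.
    with target.open("wb") as f:
        f.write(b"hello object storage")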

Saving Data to Object Storage

Let’s fetch some data and save it to the cloud.

    @task
    def get_air_quality_data(**kwargs) -> ObjectStoragePath:
        """
        #### Get Air Quality Data
        This task gets air quality data from the Finnish Meteorological Institute's
        open data API. The data is saved as parquet.
        """
        import pandas as pd

        logical_date = kwargs["logical_date"]
        start_time = kwargs["data_interval_start"]

        params = {
            "format": "json",
            "precision": "double",
            "groupareas": "0",
            "producer": "airquality_urban",
            "area": "Uusimaa",
            "param": ",".join(aq_fields.keys()),
            "starttime": start_time.isoformat(timespec="seconds"),
            "endtime": logical_date.isoformat(timespec="seconds"),
            "tz": "UTC",
        }

        response = requests.get(API, params=params)
        response.raise_for_status()

        # ensure the bucket exists
        base.mkdir(exist_ok=True)

        formatted_date = logical_date.format("YYYYMMDD")
        path = base / f"air_quality_{formatted_date}.parquet"

        df = pd.DataFrame(response.json()).astype(aq_fields)
        with path.open("wb") as file:
            df.to_parquet(file)

        return path

Here’s what’s happening:

  1. The task reads the logical date and the start of the data interval from the task context and uses them as the time window for the API request.

  2. It fetches air quality measurements as JSON from the Finnish Meteorological Institute’s open data API and loads them into a pandas DataFrame, casting the columns to the types declared in aq_fields.

  3. It makes sure the target bucket exists, then writes the DataFrame as a Parquet file whose object key includes the logical date.

This is a classic Taskflow pattern. The object key changes each day, allowing us to run this daily and build a dataset over time. We return the final object path to be used in the next task.

Why this is cool: No boto3, no GCS client setup, no credentials juggling. Just simple file semantics that work across storage backends.
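Because every run writes a new date-stamped object, the bucket gradually accumulates a dataset. As a hypothetical extra task in the same file, you could list what has been collected so far with the same path API (this assumes ObjectStoragePath’s pathlib-style iterdir and is_file helpers):

    @task
    def list_air_quality_files() -> list[str]:
        """List the Parquet files written so far (illustrative helper, not part of the tutorial DAG)."""
        files = [str(p) for p in base.iterdir() if p.is_file() and str(p).endswith(".parquet")]
        print(files)
        return files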

Analyzing the Data with DuckDB

Now let’s analyze that data using SQL with DuckDB.

    @task
    def analyze(path: ObjectStoragePath, **kwargs):
        """
        #### Analyze
        This task analyzes the air quality data, prints the results
        """
        import duckdb

        conn = duckdb.connect(database=":memory:")
        conn.register_filesystem(path.fs)
        conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")

        df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()

        print(df2.head())

A few key things to note:

  1. DuckDB can query the Parquet file directly from object storage: registering the path’s underlying filesystem with conn.register_filesystem(path.fs) lets the read_parquet call resolve the s3:// URL using the same connection and credentials as the rest of the pipeline.

  2. Notice that the function doesn’t recreate the path manually – it gets the full path from the previous task using XCom. This makes the task portable and decoupled from earlier logic.
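Once the table exists, any further SQL can be run against it. For example, a small aggregation over the columns declared in aq_fields could be appended to the end of the analyze task (the query itself is illustrative, not part of the tutorial DAG):

        # Illustrative follow-up query: average PM2.5 and PM10 per timestamp.
        df_summary = conn.execute(
            """
            SELECT "time",
                   avg(PM25_PT1H_avg) AS pm25_avg,
                   avg(PM10_PT1H_avg) AS pm10_avg
            FROM airquality_urban
            GROUP BY "time"
            ORDER BY "time"
            """
        ).fetchdf()
        print(df_summary.head())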

Bringing It All Together

Here’s the full DAG that ties everything together:


import pendulum
import requests

from airflow.sdk import ObjectStoragePath, dag, task

API = "https://opendata.fmi.fi/timeseries"

aq_fields = {
    "fmisid": "int32",
    "time": "datetime64[ns]",
    "AQINDEX_PT1H_avg": "float64",
    "PM10_PT1H_avg": "float64",
    "PM25_PT1H_avg": "float64",
    "O3_PT1H_avg": "float64",
    "CO_PT1H_avg": "float64",
    "SO2_PT1H_avg": "float64",
    "NO2_PT1H_avg": "float64",
    "TRSC_PT1H_avg": "float64",
}
base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_objectstorage():
    """
    ### Object Storage Tutorial Documentation
    This is a tutorial DAG to showcase the usage of the Object Storage API.
    Documentation that goes along with the Airflow Object Storage tutorial is
    located
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/objectstorage.html)
    """
    @task
    def get_air_quality_data(**kwargs) -> ObjectStoragePath:
        """
        #### Get Air Quality Data
        This task gets air quality data from the Finnish Meteorological Institute's
        open data API. The data is saved as parquet.
        """
        import pandas as pd

        logical_date = kwargs["logical_date"]
        start_time = kwargs["data_interval_start"]

        params = {
            "format": "json",
            "precision": "double",
            "groupareas": "0",
            "producer": "airquality_urban",
            "area": "Uusimaa",
            "param": ",".join(aq_fields.keys()),
            "starttime": start_time.isoformat(timespec="seconds"),
            "endtime": logical_date.isoformat(timespec="seconds"),
            "tz": "UTC",
        }

        response = requests.get(API, params=params)
        response.raise_for_status()

        # ensure the bucket exists
        base.mkdir(exist_ok=True)

        formatted_date = logical_date.format("YYYYMMDD")
        path = base / f"air_quality_{formatted_date}.parquet"

        df = pd.DataFrame(response.json()).astype(aq_fields)
        with path.open("wb") as file:
            df.to_parquet(file)

        return path

    @task
    def analyze(path: ObjectStoragePath, **kwargs):
        """
        #### Analyze
        This task analyzes the air quality data, prints the results
        """
        import duckdb

        conn = duckdb.connect(database=":memory:")
        conn.register_filesystem(path.fs)
        conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")

        df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()

        print(df2.head())

    obj_path = get_air_quality_data()
    analyze(obj_path)


tutorial_objectstorage()

You can trigger this DAG and view it in the Graph View in the Airflow UI. Each task logs its inputs and outputs clearly, and you can inspect returned paths in the XCom tab.
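If you want to iterate on the DAG locally without a scheduler, one option (a sketch, assuming the DAG-level test() helper available on the object returned by the @dag-decorated function) is to replace the bare tutorial_objectstorage() call at the bottom of the file with:

    # Keep a reference to the DAG object so it can also be run in a single process for debugging.
    dag_object = tutorial_objectstorage()

    if __name__ == "__main__":
        dag_object.test()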

What to Explore Next

Here are some ways to take this further:

  1. Put the DAG on a daily schedule so each run adds a new date-stamped Parquet file and the dataset grows over time.

  2. Swap the storage backend by changing only the URL and connection – for example to GCS or Azure Blob Storage – and leave the rest of the code untouched.

  3. Reuse the object path elsewhere, for example by defining it as an Asset that downstream DAGs can depend on.
