Cloud-Native Workflows with Object Storage¶
Added in version 2.8.
Welcome to the final tutorial in our Airflow series! By now, you've built DAGs with Python and the TaskFlow API, passed data with XComs, and chained tasks together into clear, reusable workflows.
In this tutorial we'll take it a step further by introducing the Object Storage API. This API makes it easier to read from and write to cloud storage such as Amazon S3, Google Cloud Storage (GCS), or Azure Blob Storage, without having to worry about provider-specific SDKs or low-level credential management.
We'll walk you through a real-world use case:
Pulling data from a public API
Saving that data to object storage in Parquet format
Analyzing it using SQL with DuckDB
Along the way, we'll highlight the new ObjectStoragePath abstraction, explain how Airflow handles cloud credentials via connections, and show how this enables portable, cloud-agnostic pipelines.
Many data workflows depend on files, whether it's raw CSVs, intermediate Parquet files, or model artifacts. Traditionally, you'd need to write S3-specific or GCS-specific code for this. Now, with ObjectStoragePath, you can write generic code that works across providers, as long as you've configured the right Airflow connection.
Let's get started!
Prerequisites¶
Before diving in, make sure you have the following:
DuckDB, an in-process SQL database: Install with pip install duckdb
Amazon S3 access and Amazon Provider with s3fs: pip install apache-airflow-providers-amazon[s3fs]
(You can substitute your preferred provider by changing the storage URL protocol and installing the relevant provider. A sketch of configuring the aws_default connection used in this tutorial follows this list.)
Pandas for working with tabular data: pip install pandas
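Both tasks in this tutorial authenticate through an Airflow connection named aws_default. Below is a minimal sketch of one way to supply that connection for local development, using an environment variable with placeholder credentials; you can just as well create it in the Airflow UI or with the airflow connections CLI.

# One way to define the aws_default connection for local runs: Airflow resolves
# connections from environment variables named AIRFLOW_CONN_<CONN_ID>.
# The access key and secret below are placeholders, not real credentials;
# URL-encode any special characters if your real secret contains them.
import os

os.environ["AIRFLOW_CONN_AWS_DEFAULT"] = "aws://AKIAEXAMPLEKEY:example-secret@"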
Creating an ObjectStoragePath¶
At the heart of this tutorial is ObjectStoragePath, a new abstraction for handling paths on cloud object stores. Think of it like pathlib.Path, but for buckets instead of filesystems.
base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")
The URL syntax is simple: protocol://bucket/path/to/file
The protocol (like s3, gs, or azure) determines the backend
The "username" part of the URL can be a conn_id, telling Airflow how to authenticate
If the conn_id is omitted, Airflow will fall back to the default connection for that backend
You can also provide the conn_id as a keyword argument for clarity:
ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")
This is especially handy when reusing a path defined elsewhere (like in an Asset), or when the connection isn't baked into the URL. The keyword argument always takes precedence.
Tip
You can safely create an ObjectStoragePath in your global DAG scope. Connections are resolved only when the path is used, not when it's created.
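To make the path semantics concrete, here is a short sketch of the kind of operations ObjectStoragePath supports. The bucket, prefix, and file names are purely illustrative, and the snippet assumes the aws_default connection and the bucket already exist.

from airflow.sdk import ObjectStoragePath

base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")

# Paths compose like pathlib.Path objects; nothing touches the bucket yet.
report = base / "reports" / "2024" / "summary.txt"

# Familiar file semantics for writing objects.
with report.open("w") as f:
    f.write("hello object storage")

print(report.exists())  # True once the object has been written

# Listing works on "directories" (prefixes) as well.
for obj in (base / "reports").iterdir():
    print(obj)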
Saving Data to Object Storage¶
Let's fetch some data and save it to the cloud.
@task
def get_air_quality_data(**kwargs) -> ObjectStoragePath:
    """
    #### Get Air Quality Data
    This task gets air quality data from the Finnish Meteorological Institute's
    open data API. The data is saved as parquet.
    """
    import pandas as pd

    logical_date = kwargs["logical_date"]
    start_time = kwargs["data_interval_start"]

    params = {
        "format": "json",
        "precision": "double",
        "groupareas": "0",
        "producer": "airquality_urban",
        "area": "Uusimaa",
        "param": ",".join(aq_fields.keys()),
        "starttime": start_time.isoformat(timespec="seconds"),
        "endtime": logical_date.isoformat(timespec="seconds"),
        "tz": "UTC",
    }

    response = requests.get(API, params=params)
    response.raise_for_status()

    # ensure the bucket exists
    base.mkdir(exist_ok=True)

    formatted_date = logical_date.format("YYYYMMDD")
    path = base / f"air_quality_{formatted_date}.parquet"

    df = pd.DataFrame(response.json()).astype(aq_fields)
    with path.open("wb") as file:
        df.to_parquet(file)

    return path
Here's what's happening:
We call a public API from the Finnish Meteorological Institute for Helsinki air quality data
The JSON response is parsed into a pandas DataFrame
We generate a filename based on the task's logical date
Using ObjectStoragePath, we write the data directly to cloud storage as Parquet
This is a classic Taskflow pattern. The object key changes each day, allowing us to run this daily and build a dataset over time. We return the final object path to be used in the next task.
Why this is cool: No boto3, no GCS client setup, no credentials juggling. Just simple file semantics that work across storage backends.
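Reading the file back is just as backend-agnostic. As a quick illustration, the following sketch loads a previously written object into pandas; the object name is hypothetical and assumes an earlier run (here, 1 January 2024) already produced it.

import pandas as pd
from airflow.sdk import ObjectStoragePath

# Hypothetical object written by get_air_quality_data on an earlier run.
path = ObjectStoragePath(
    "s3://airflow-tutorial-data/air_quality_20240101.parquet", conn_id="aws_default"
)

# The same file semantics work for reading: open a file-like object and hand it to pandas.
with path.open("rb") as f:
    df = pd.read_parquet(f)

print(df.dtypes)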
Analyzing the Data with DuckDB¶
Now let's analyze that data using SQL with DuckDB.
@task
def analyze(path: ObjectStoragePath, **kwargs):
    """
    #### Analyze
    This task analyzes the air quality data, prints the results
    """
    import duckdb

    conn = duckdb.connect(database=":memory:")
    conn.register_filesystem(path.fs)
    conn.execute(
        f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')"
    )

    df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()

    print(df2.head())
A few key things to note:
DuckDB supports reading Parquet natively
DuckDB and ObjectStoragePath both rely on fsspec, which makes it easy to register the object storage backend
We use path.fs to grab the right filesystem object and register it with DuckDB
Finally, we query the Parquet file using SQL, fetch the result into a pandas DataFrame, and print the first few rows
Notice that the function doesn't recreate the path manually; it gets the full path from the previous task via XCom. This makes the task portable and decoupled from earlier logic.
Bringing It All Together¶
Here's the full DAG that ties everything together:
import pendulum
import requests

from airflow.sdk import ObjectStoragePath, dag, task

API = "https://opendata.fmi.fi/timeseries"

aq_fields = {
    "fmisid": "int32",
    "time": "datetime64[ns]",
    "AQINDEX_PT1H_avg": "float64",
    "PM10_PT1H_avg": "float64",
    "PM25_PT1H_avg": "float64",
    "O3_PT1H_avg": "float64",
    "CO_PT1H_avg": "float64",
    "SO2_PT1H_avg": "float64",
    "NO2_PT1H_avg": "float64",
    "TRSC_PT1H_avg": "float64",
}

base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_objectstorage():
    """
    ### Object Storage Tutorial Documentation
    This is a tutorial DAG to showcase the usage of the Object Storage API.
    Documentation that goes along with the Airflow Object Storage tutorial is
    located
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/objectstorage.html)
    """

    @task
    def get_air_quality_data(**kwargs) -> ObjectStoragePath:
        """
        #### Get Air Quality Data
        This task gets air quality data from the Finnish Meteorological Institute's
        open data API. The data is saved as parquet.
        """
        import pandas as pd

        logical_date = kwargs["logical_date"]
        start_time = kwargs["data_interval_start"]

        params = {
            "format": "json",
            "precision": "double",
            "groupareas": "0",
            "producer": "airquality_urban",
            "area": "Uusimaa",
            "param": ",".join(aq_fields.keys()),
            "starttime": start_time.isoformat(timespec="seconds"),
            "endtime": logical_date.isoformat(timespec="seconds"),
            "tz": "UTC",
        }

        response = requests.get(API, params=params)
        response.raise_for_status()

        # ensure the bucket exists
        base.mkdir(exist_ok=True)

        formatted_date = logical_date.format("YYYYMMDD")
        path = base / f"air_quality_{formatted_date}.parquet"

        df = pd.DataFrame(response.json()).astype(aq_fields)
        with path.open("wb") as file:
            df.to_parquet(file)

        return path

    @task
    def analyze(path: ObjectStoragePath, **kwargs):
        """
        #### Analyze
        This task analyzes the air quality data, prints the results
        """
        import duckdb

        conn = duckdb.connect(database=":memory:")
        conn.register_filesystem(path.fs)
        conn.execute(
            f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')"
        )

        df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()

        print(df2.head())

    obj_path = get_air_quality_data()
    analyze(obj_path)


tutorial_objectstorage()
You can trigger this DAG and view it in the Graph View in the Airflow UI. Each task logs its inputs and outputs clearly, and you can inspect returned paths in the XCom tab.
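If you prefer to iterate locally before deploying, a DAG defined this way can usually be executed in a single process with dag.test(). The sketch below is a development convenience, not part of the tutorial DAG: it assumes your Airflow version supports dag.test() and that the aws_default connection is resolvable (for example via the environment variable shown earlier), and it would replace the bare tutorial_objectstorage() call at the bottom of the file.

# Capture the DAG object and run it in-process for quick debugging.
dag_object = tutorial_objectstorage()

if __name__ == "__main__":
    dag_object.test()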
What to Explore Next¶
Here are some ways to take this further:
Use object sensors (like S3KeySensor) to wait for files uploaded by external systems (a sketch follows this list)
Orchestrate S3-to-GCS transfers or cross-region data syncs
Add branching logic to handle missing or malformed files
Experiment with different formats like CSV or JSON
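For the first idea, a minimal sketch of a sensor-driven DAG might look like the following; the bucket, key pattern, schedule, and task id are assumptions for illustration, not part of the tutorial.

import pendulum
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sdk import dag


@dag(
    schedule="@daily",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
)
def wait_then_analyze():
    # Block until the day's file shows up in the bucket before running downstream
    # analysis tasks (which would be chained after this sensor).
    wait_for_upload = S3KeySensor(
        task_id="wait_for_upload",
        bucket_name="airflow-tutorial-data",
        bucket_key="air_quality_{{ ds_nodash }}.parquet",  # templated per run date
        aws_conn_id="aws_default",
        poke_interval=60,  # check once a minute
        timeout=6 * 60 * 60,  # give up after six hours
    )


wait_then_analyze()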
See Also
Learn how to securely access cloud services by configuring Airflow connections in the Managing Connections guide
Build event-driven pipelines that respond to file uploads or external triggers using the Event-Driven Scheduling framework
Reinforce your understanding of decorators, return values, and task chaining with the TaskFlow API guide