Use a ShortCircuitOperator to stop downstream tasks when the value stored under a given XCom key does not meet a threshold.
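A minimal sketch of such a gate follows. It assumes an upstream task with the hypothetical id `extract` pushes a number under the hypothetical key `row_count`, and that 100 is the threshold; none of these names come from the article. The callable is kept free of Airflow imports so it can be exercised on its own, and the operator wiring is shown as a comment using the Airflow 2 import path.

```python
from typing import Any, Optional

THRESHOLD = 100  # hypothetical threshold, chosen for illustration


def value_meets_threshold(value: Optional[float], threshold: float = THRESHOLD) -> bool:
    """Return True when the value exists and reaches the threshold."""
    return value is not None and value >= threshold


def check_row_count(ti: Any = None, **_: Any) -> bool:
    """Callable for a ShortCircuitOperator.

    Airflow passes the running TaskInstance as `ti`. Returning False makes
    the operator skip downstream tasks; returning True lets them run.
    """
    # "extract" and "row_count" are assumed names, not part of any real DAG.
    value = ti.xcom_pull(task_ids="extract", key="row_count")
    return value_meets_threshold(value)


# Wiring inside a DAG definition (requires Airflow, so shown as a comment):
#
#   from airflow.operators.python import ShortCircuitOperator
#
#   gate = ShortCircuitOperator(
#       task_id="gate_on_row_count",
#       python_callable=check_row_count,
#   )
#   extract >> gate >> load
```

Treating a missing value as a failed check is a design choice here: if the upstream task never pushed the key, the gate closes rather than letting downstream tasks run on incomplete input.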
Configuring the Object Storage XCom Backend is a straightforward process involving environment variables or modifications to your airflow.cfg file.
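As a rough illustration of the environment-variable route, the sketch below builds the variable names from Airflow's `AIRFLOW__{SECTION}__{KEY}` convention and prints `export` lines. The option names are the ones documented for the `common.io` provider as far as I know; the bucket, connection id, and threshold are hypothetical values, so check them against the provider version you run.

```python
def airflow_env_var(section: str, key: str) -> str:
    """Build the AIRFLOW__{SECTION}__{KEY} name Airflow reads for a config option."""
    return f"AIRFLOW__{section.replace('.', '_').upper()}__{key.upper()}"


# Hypothetical values: replace the bucket, connection id, and threshold.
object_storage_settings = {
    airflow_env_var("core", "xcom_backend"):
        "airflow.providers.common.io.xcom.backend.XComObjectStorageBackend",
    airflow_env_var("common.io", "xcom_objectstorage_path"):
        "s3://aws_default@my-airflow-xcom-bucket/xcom",
    # Values larger than this many bytes go to object storage (assumed 1 MiB).
    airflow_env_var("common.io", "xcom_objectstorage_threshold"): "1048576",
}

for name, value in object_storage_settings.items():
    print(f"export {name}={value}")
```

The same three options can instead be written under `[core]` and `[common.io]` in airflow.cfg.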
Push only once and never overwrite a key. Use execution_date + task_id as part of the key. Airflow has no built-in "exclusive mode" switch, so enforce the push-once rule in your own task code to prevent an accidental re-push.
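One way to enforce that rule in task code is sketched below. Both helpers, `build_xcom_key` and `push_once`, are hypothetical names introduced for illustration; they rely only on the `xcom_pull` and `xcom_push` methods of the task instance.

```python
from typing import Any


def build_xcom_key(name: str, task_id: str, execution_date: str) -> str:
    """Compose a key that embeds the task id and execution date."""
    return f"{name}__{task_id}__{execution_date}"


def push_once(ti: Any, key: str, value: Any) -> None:
    """Push `value` under `key` unless this task already pushed that key.

    `ti` is the running TaskInstance. This is a convention enforced in
    user code, not an Airflow setting.
    """
    existing = ti.xcom_pull(task_ids=ti.task_id, key=key)
    if existing is not None:
        raise ValueError(f"XCom key {key!r} was already pushed; refusing to overwrite")
    ti.xcom_push(key=key, value=value)
```

Inside a task this might be called as `push_once(ti, build_xcom_key("row_count", ti.task_id, ds), 42)`, where `ds` is the date string from the task context. Note that the guard cannot distinguish a previously pushed `None` from a missing key.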
```python
import io
import uuid
from typing import Any

import pandas as pd
from airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


class S3XComBackend(BaseXCom):
    PREFIX = "s3://my-airflow-xcom-bucket/data/"

    # Airflow calls serialize_value / deserialize_value on the configured backend.
    @staticmethod
    def serialize_value(value: Any, **kwargs) -> Any:
        if isinstance(value, pd.DataFrame):
            key = f"{S3XComBackend.PREFIX}{uuid.uuid4()}.parquet"
            hook = S3Hook(aws_conn_id="aws_default")
            # Convert the DataFrame to Parquet bytes
            file_buffer = value.to_parquet(index=False)
            # Upload to S3; the bucket name is parsed from the s3:// URL
            hook.load_bytes(file_buffer, key=key, replace=True)
            # Return the reference path to be stored in the DB
            return BaseXCom.serialize_value(key)
        # Fall back to default serialization for small native data types
        return BaseXCom.serialize_value(value)

    @staticmethod
    def deserialize_value(result) -> Any:
        deserialized_value = BaseXCom.deserialize_value(result)
        # Check if the stored value is an S3 pointer
        if isinstance(deserialized_value, str) and deserialized_value.startswith(S3XComBackend.PREFIX):
            hook = S3Hook(aws_conn_id="aws_default")
            # read_key() returns decoded text, so fetch the raw bytes instead
            file_bytes = hook.get_key(deserialized_value).get()["Body"].read()
            # Reconstruct the DataFrame
            return pd.read_parquet(io.BytesIO(file_bytes))
        return deserialized_value
```

Activating the Custom Backend
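Airflow selects a custom backend through the `xcom_backend` option in the `[core]` section (or the `AIRFLOW__CORE__XCOM_BACKEND` environment variable), set to the dotted import path of the class. Before setting it, it can help to confirm that the path is importable from the scheduler and worker environment. The helper below is a hypothetical sanity check written for this purpose, not part of Airflow, and `my_plugins.s3_xcom_backend` is an assumed module path.

```python
import importlib


def load_backend_class(dotted_path: str) -> type:
    """Import `package.module.ClassName` and return the class object."""
    module_path, _, class_name = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError(f"Expected 'module.ClassName', got {dotted_path!r}")
    module = importlib.import_module(module_path)
    return getattr(module, class_name)


# Example with an assumed module path (run where that module is installed):
#   backend = load_backend_class("my_plugins.s3_xcom_backend.S3XComBackend")
#   print(backend.__name__)
```

If the import fails here, it will also fail when Airflow tries to resolve the backend, so this catches packaging and PYTHONPATH problems early.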
Use XCom for metadata only: this is the most important rule.
This is the most critical constraint. Because XComs live in the metadata database, they are limited in size.
Mastering Apache Airflow XComs: Managing Exclusive Data Exchange
What database (PostgreSQL, MySQL, etc.) handles your metadata?
For MySQL, the effective per-row limit is about 64KB, which aligns with the 48KB recommendation to stay safely within database constraints.
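To make the 48KB recommendation concrete, the sketch below measures a value's JSON-encoded size before it is pushed. The helper names are hypothetical, and JSON encoding is used as an approximation of what the default backend stores; the real stored size may differ slightly.

```python
import json
from typing import Any

XCOM_SOFT_LIMIT_BYTES = 48 * 1024  # the 48KB recommendation, in bytes


def xcom_payload_size(value: Any) -> int:
    """Approximate stored size: length of the UTF-8 JSON encoding."""
    return len(json.dumps(value).encode("utf-8"))


def fits_in_xcom(value: Any, limit: int = XCOM_SOFT_LIMIT_BYTES) -> bool:
    """Return True when the encoded value is within the soft limit."""
    return xcom_payload_size(value) <= limit


# A small pointer to external data fits easily; a large string does not.
pointer = {"path": "s3://my-airflow-xcom-bucket/data/file.parquet", "rows": 1000}
print(fits_in_xcom(pointer))
print(fits_in_xcom("x" * 50_000))
```

A check like this fits naturally at the end of a task, just before returning a value, so that oversized results are written to external storage and only the pointer is pushed.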
While there is no single feature or official Airflow term known as "Airflow XCom Exclusive," the phrase typically refers to specific configurations or high-level design patterns within Airflow's cross-communication (XCom) system.

Mutually Exclusive XCom Configurations
```python
def clear(self, dag_id, task_id, run_id, **kwargs):
    """Custom cleanup logic."""
    # Delete stored XComs from your backend
    pass
```