Airflow Integration Guide
Baselinr provides comprehensive integration with Apache Airflow 2.x to enable scalable profiling and drift detection within your Airflow workflows.
Overview
The Airflow integration includes:
- Operators: BaselinrProfileOperator, BaselinrDriftOperator, BaselinrQueryOperator
- RCA Collector: Automatic collection of Airflow DAG run metadata for root cause analysis
- XCom Support: Results are automatically passed via XCom for downstream tasks
Installation
Install Baselinr with Airflow support:
pip install "baselinr[airflow]"
Or install Airflow separately:
pip install baselinr
pip install "apache-airflow>=2.0.0,<3.0.0"
Quick Start
Basic Profiling DAG
from airflow import DAG
from airflow.utils.dates import days_ago
from baselinr.integrations.airflow import BaselinrProfileOperator
default_args = {
"owner": "data-engineering",
"depends_on_past": False,
"retries": 1,
}
dag = DAG(
"baselinr_profiling",
default_args=default_args,
description="Basic Baselinr profiling",
schedule_interval="@daily",
start_date=days_ago(1),
catchup=False,
)
profile_task = BaselinrProfileOperator(
task_id="profile_tables",
config_path="/path/to/baselinr/config.yml",
dag=dag,
)
Operators
BaselinrProfileOperator
Runs Baselinr profiling and returns results via XCom.
Parameters:
- config_path (str, optional): Path to the Baselinr configuration file
- config (dict, optional): Baselinr configuration dictionary (alternative to config_path)
- table_patterns (list, optional): List of table patterns to profile. If not provided, uses patterns from the config.
- dry_run (bool): If True, profile but don't write to storage (default: False)
- Standard Airflow operator parameters (task_id, dag, etc.)
XCom Return Value:
{
"run_ids": ["run-123", "run-456"],
"tables_profiled": ["customers", "orders"],
"tables_count": 2,
"results_count": 2,
}
Example:
profile_task = BaselinrProfileOperator(
task_id="profile_customers",
config_path="/path/to/config.yml",
table_patterns=[{"table": "customers", "schema": "public"}],
dag=dag,
)
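To validate a configuration without persisting anything, the dry_run flag described above can be combined with the same pattern. A minimal sketch; the task ID and paths are illustrative:

```python
# Profiles the table but skips writing results to storage (dry_run=True).
validate_profiles = BaselinrProfileOperator(
    task_id="profile_customers_dry_run",  # illustrative task_id
    config_path="/path/to/config.yml",
    table_patterns=[{"table": "customers", "schema": "public"}],
    dry_run=True,
    dag=dag,
)
```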
BaselinrDriftOperator
Detects drift between profiling runs and can optionally fail the task when drift is detected.
Parameters:
- config_path (str, optional): Path to the Baselinr configuration file
- config (dict, optional): Baselinr configuration dictionary (alternative to config_path)
- dataset_name (str, required): Name of the dataset/table to check
- schema_name (str, optional): Schema name
- baseline_run_id (str, optional): Run ID to use as the baseline (default: auto-selected)
- current_run_id (str, optional): Run ID to compare (default: latest run)
- fail_on_drift (bool): If True, raise an exception if any drift is detected (default: False)
- fail_on_severity (str, optional): Only fail on drift of this severity or higher (low/medium/high)
XCom Return Value:
{
"has_drift": True,
"column_drifts_count": 3,
"schema_changes_count": 0,
"severity_counts": {"low": 1, "medium": 1, "high": 1},
"drift_report": {...},
}
Example:
drift_task = BaselinrDriftOperator(
task_id="check_drift",
config_path="/path/to/config.yml",
dataset_name="customers",
fail_on_severity="high", # Only fail on high severity drift
dag=dag,
)
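To compare two specific runs instead of relying on automatic baseline selection, pin the run IDs explicitly. A sketch assuming both run IDs are already known; the IDs shown are placeholders:

```python
# Compare two known profiling runs; fail the task if any drift is found.
pinned_drift_check = BaselinrDriftOperator(
    task_id="check_drift_pinned",
    config_path="/path/to/config.yml",
    dataset_name="customers",
    schema_name="public",
    baseline_run_id="run-123",  # placeholder: run ID used as the baseline
    current_run_id="run-456",   # placeholder: run ID compared against the baseline
    fail_on_drift=True,
    dag=dag,
)
```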
BaselinrQueryOperator
Queries Baselinr metadata (runs, drift events, table history, etc.).
Parameters:
- config_path (str, optional): Path to the Baselinr configuration file
- config (dict, optional): Baselinr configuration dictionary (alternative to config_path)
- query_type (str): Type of query; one of runs, drift, table_history, run_details
- Query-specific parameters (passed as kwargs):
  - For runs: schema, table, status, environment, days, limit, offset
  - For drift: table, schema, severity, days, limit, offset
  - For table_history: table (required), schema, days, limit
  - For run_details: run_id (required), dataset_name
Example:
query_task = BaselinrQueryOperator(
task_id="query_recent_runs",
config_path="/path/to/config.yml",
query_type="runs",
days=7,
limit=50,
dag=dag,
)
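The other query types follow the same pattern. For example, a table_history query (which requires table) might look like this sketch; the table name and limits are illustrative:

```python
# Fetch profiling history for one table over the last 30 days.
history_task = BaselinrQueryOperator(
    task_id="query_customers_history",
    config_path="/path/to/config.yml",
    query_type="table_history",
    table="customers",  # required for table_history
    schema="public",
    days=30,
    limit=20,
    dag=dag,
)
```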
Common Patterns
Pattern 1: Profile Then Check Drift
from baselinr.integrations.airflow import (
BaselinrProfileOperator,
BaselinrDriftOperator,
)
# Profile tables
profile_task = BaselinrProfileOperator(
task_id="profile_tables",
config_path="/path/to/config.yml",
dag=dag,
)
# Check drift for critical tables
drift_check = BaselinrDriftOperator(
task_id="check_drift",
config_path="/path/to/config.yml",
dataset_name="customers",
fail_on_severity="high",
dag=dag,
)
profile_task >> drift_check
Pattern 2: Integration with dbt
from airflow.operators.bash import BashOperator
from baselinr.integrations.airflow import BaselinrProfileOperator
# Run dbt
dbt_run = BashOperator(
task_id="dbt_run",
bash_command="cd /path/to/dbt && dbt run",
dag=dag,
)
# Profile dbt models
profile_dbt = BaselinrProfileOperator(
task_id="profile_dbt_models",
config_path="/path/to/config.yml",
table_patterns=[{"schema": "analytics", "select_schema": True}],
dag=dag,
)
dbt_run >> profile_dbt
Pattern 3: Dynamic Task Generation
tables_to_profile = ["customers", "orders", "products"]
profiling_tasks = []
for table in tables_to_profile:
task = BaselinrProfileOperator(
task_id=f"profile_{table}",
config_path="/path/to/config.yml",
table_patterns=[{"table": table}],
dag=dag,
)
profiling_tasks.append(task)
Pattern 4: Scheduled Monitoring
dag = DAG(
"baselinr_monitoring",
schedule_interval="0 */6 * * *", # Every 6 hours
start_date=days_ago(1),
catchup=False,
)
# Query recent drift events
query_drift = BaselinrQueryOperator(
task_id="query_drift",
config_path="/path/to/config.yml",
query_type="drift",
days=7,
severity="high",
dag=dag,
)
# Check specific table
check_drift = BaselinrDriftOperator(
task_id="check_critical_drift",
config_path="/path/to/config.yml",
dataset_name="customers",
fail_on_severity="high",
dag=dag,
)
query_drift >> check_drift
RCA Collector Configuration
The Airflow RCA collector automatically collects DAG run metadata for root cause analysis. Configure it in your Baselinr config:
rca:
enabled: true
collectors:
airflow: true
airflow_api_url: "http://localhost:8080/api/v1"
airflow_api_version: "v1" # or "v2"
airflow_username: "admin" # Optional
airflow_password: "admin" # Optional
airflow_metadata_db_connection: "postgresql://user:pass@localhost/airflow" # Optional
airflow_dag_ids: # Optional: filter specific DAGs
- "my_dag"
- "another_dag"
Collection Methods
The collector supports three methods, tried in the following order:
- REST API: Uses Airflow REST API (v1 or v2)
- Metadata Database: Direct SQL access to Airflow's metadata database
- Environment Variables: For Airflow Cloud/Managed environments
Environment Variables
The collector can also be configured via environment variables:
- AIRFLOW_API_URL: Airflow API URL
- AIRFLOW_API_VERSION: API version (v1 or v2)
- AIRFLOW_USERNAME: Username for API auth
- AIRFLOW_PASSWORD: Password for API auth
- AIRFLOW_METADATA_DB_CONNECTION: Database connection string
- AIRFLOW_CTX_DAG_ID: Current DAG ID (set by Airflow)
- AIRFLOW_CTX_RUN_ID: Current run ID (set by Airflow)
Best Practices
1. Use Config Files
Store Baselinr configuration in version-controlled YAML files rather than passing dicts:
# Good
BaselinrProfileOperator(
task_id="profile",
config_path="/path/to/config.yml",
dag=dag,
)
# Less ideal
BaselinrProfileOperator(
task_id="profile",
config={"source": {...}, "storage": {...}}, # Hard to maintain
dag=dag,
)
2. Fail on High Severity Only
Use fail_on_severity instead of fail_on_drift to avoid failing on minor drifts:
BaselinrDriftOperator(
task_id="check_drift",
dataset_name="customers",
fail_on_severity="high", # Only fail on high severity
dag=dag,
)
3. Use XCom for Downstream Tasks
Access profiling results in downstream tasks:
from airflow.operators.python import PythonOperator

def process_profiling_results(**context):
    ti = context["ti"]
    profiling_summary = ti.xcom_pull(task_ids="profile_tables")
    run_ids = profiling_summary["run_ids"]
    # Process results...

process_task = PythonOperator(
    task_id="process_results",
    python_callable=process_profiling_results,
    dag=dag,
)
profile_task >> process_task
4. Separate Profiling and Drift Detection
Run profiling and drift detection in separate tasks for better observability:
profile_task = BaselinrProfileOperator(...)
drift_task = BaselinrDriftOperator(...)
profile_task >> drift_task
5. Use Dynamic Task Generation for Many Tables
For large numbers of tables, use dynamic task generation:
tables = get_tables_from_config() # Your function
tasks = [
BaselinrProfileOperator(
task_id=f"profile_{table}",
table_patterns=[{"table": table}],
...
)
for table in tables
]
Troubleshooting
ImportError: Airflow is not installed
Install Airflow:
pip install "apache-airflow>=2.0.0,<3.0.0"
Operator fails with "Provide either config_path or config"
You must provide exactly one of config_path or config:
# Correct
BaselinrProfileOperator(config_path="/path/to/config.yml", ...)
# Also correct
BaselinrProfileOperator(config={"source": {...}}, ...)
# Wrong
BaselinrProfileOperator(config_path="...", config={...}, ...) # Both provided
BaselinrProfileOperator(...) # Neither provided
RCA Collector not collecting runs
Check:
- Airflow API is accessible: curl http://localhost:8080/api/v1/health
- Credentials are correct (if using auth)
- DAG IDs filter is not excluding your DAGs
- Check collector logs for errors
XCom data too large
If profiling results are too large for XCom:
- Use dry_run=True to test without storing results
- Store results in external storage and pass references via XCom
- Use BaselinrQueryOperator to query results instead of passing via XCom (see the sketch below)
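For the last option, a downstream task can re-query Baselinr metadata instead of receiving the full report through XCom. A sketch assuming the profiling task variable is profile_task, as in the earlier examples:

```python
# Query recent run metadata from Baselinr storage rather than passing
# large profiling results between tasks via XCom.
fetch_recent_runs = BaselinrQueryOperator(
    task_id="fetch_recent_runs",
    config_path="/path/to/config.yml",
    query_type="runs",
    days=1,
    limit=10,
    dag=dag,
)

profile_task >> fetch_recent_runs
```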
Examples
See examples/airflow_dag_example.py for comprehensive examples including:
- Basic profiling DAG
- Profiling with drift detection
- Scheduled profiling with alerts
- Integration with dbt
- Multi-table profiling with dynamic task generation
- Query and monitor DAG