# Event and Alert Hook System

Baselinr includes a lightweight, pluggable event emission and alert hook system that allows you to react to runtime events such as data drift detection, schema changes, and profiling lifecycle events.

## Table of Contents

- Overview
- Core Concepts
- Event Types
- Built-in Hooks
- Configuration
- Custom Hooks
- Usage Examples
- Best Practices
## Overview

The event system enables Baselinr to:

- Emit events during profiling and drift detection operations
- Process events through multiple registered hooks
- Alert on or persist events based on your configuration
- Remain orchestration-agnostic: the core tool has no external dependencies

Events are emitted in-memory and dispatched synchronously to all registered hooks. Hooks can fail independently without stopping other hooks or the profiling process.
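Conceptually, the dispatch loop looks like the sketch below. This is illustrative only, not Baselinr's actual `EventBus` internals:

```python
import logging

logger = logging.getLogger(__name__)


def emit_to_hooks(hooks, event):
    """Sketch: dispatch one event synchronously, isolating hook failures."""
    for hook in hooks:
        try:
            hook.handle_event(event)
        except Exception:
            # A failing hook is logged and skipped; the remaining hooks
            # still run, and the profiling run itself is never interrupted.
            logger.exception(
                "Hook %s failed while handling %s",
                type(hook).__name__,
                event.event_type,
            )
```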
## Core Concepts

### EventBus

The `EventBus` is the central component that manages hook registration and event emission:

```python
from baselinr.events import EventBus, LoggingAlertHook

# Create event bus
bus = EventBus()

# Register hooks
bus.register(LoggingAlertHook())

# Emit an event
bus.emit(event)
```
### Events

Events are dataclasses that represent specific occurrences in the profiling lifecycle:

```python
from baselinr.events import DataDriftDetected
from datetime import datetime

event = DataDriftDetected(
    event_type="DataDriftDetected",
    timestamp=datetime.utcnow(),
    table="users",
    column="age",
    metric="mean",
    baseline_value=30.5,
    current_value=35.2,
    change_percent=15.4,
    drift_severity="high",
    metadata={},
)
```
### Alert Hooks

Hooks are objects that implement the `AlertHook` protocol:

```python
from baselinr.events import AlertHook, BaseEvent

class MyCustomHook:
    def handle_event(self, event: BaseEvent) -> None:
        # Process the event
        print(f"Event: {event.event_type}")
```
## Event Types

### DataDriftDetected

Emitted when data drift is detected between profiling runs.

Fields:

- `table`: Table name
- `column`: Column name
- `metric`: Metric name (e.g., `"mean"`, `"count"`, `"null_percent"`)
- `baseline_value`: Value from the baseline run
- `current_value`: Value from the current run
- `change_percent`: Percentage change
- `drift_severity`: Severity level (`"low"`, `"medium"`, or `"high"`)

Example:

```python
DataDriftDetected(
    event_type="DataDriftDetected",
    timestamp=datetime.utcnow(),
    table="orders",
    column="total_amount",
    metric="mean",
    baseline_value=100.50,
    current_value=150.75,
    change_percent=50.0,
    drift_severity="high",
    metadata={},
)
```
### SchemaChangeDetected

Emitted when schema changes are detected.

Fields:

- `table`: Table name
- `change_type`: Type of change (`"column_added"`, `"column_removed"`, `"type_changed"`)
- `column`: Column name (if applicable)
- `old_type`: Previous data type (for type changes)
- `new_type`: New data type (for type changes)

Example:

```python
SchemaChangeDetected(
    event_type="SchemaChangeDetected",
    timestamp=datetime.utcnow(),
    table="users",
    change_type="column_added",
    column="email",
    metadata={},
)
```
### ProfilingStarted

Emitted when profiling begins for a table.

Fields:

- `table`: Table name
- `run_id`: Unique run identifier
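Example (mirroring the constructor pattern of the events above; values are illustrative):

```python
ProfilingStarted(
    event_type="ProfilingStarted",
    timestamp=datetime.utcnow(),
    table="users",
    run_id="a1b2c3d4",  # illustrative run identifier
    metadata={},
)
```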
### ProfilingCompleted

Emitted when profiling completes successfully.

Fields:

- `table`: Table name
- `run_id`: Unique run identifier
- `row_count`: Number of rows profiled
- `column_count`: Number of columns profiled
- `duration_seconds`: Profiling duration
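Example (same assumptions as above):

```python
ProfilingCompleted(
    event_type="ProfilingCompleted",
    timestamp=datetime.utcnow(),
    table="users",
    run_id="a1b2c3d4",  # illustrative run identifier
    row_count=1000,
    column_count=12,
    duration_seconds=4.2,
    metadata={},
)
```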
### ProfilingFailed

Emitted when profiling fails.

Fields:

- `table`: Table name
- `run_id`: Unique run identifier
- `error`: Error message
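Example (same assumptions as above):

```python
ProfilingFailed(
    event_type="ProfilingFailed",
    timestamp=datetime.utcnow(),
    table="users",
    run_id="a1b2c3d4",  # illustrative run identifier
    error="connection timed out",
    metadata={},
)
```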
## Built-in Hooks

### LoggingAlertHook

Logs events to stdout using Python's `logging` module:

```yaml
hooks:
  enabled: true
  hooks:
    - type: logging
      log_level: INFO
```

Use Case: Development, debugging, simple monitoring
### SQLEventHook

Persists events to any SQL database (Postgres, MySQL, SQLite):

```yaml
hooks:
  enabled: true
  hooks:
    - type: sql
      table_name: baselinr_events
      connection:
        type: postgres
        host: localhost
        port: 5432
        database: monitoring
        username: user
        password: pass
```

Use Case: Historical event tracking, audit trails
### SnowflakeEventHook

Persists events to Snowflake with VARIANT support for metadata:

```yaml
hooks:
  enabled: true
  hooks:
    - type: snowflake
      table_name: baselinr_events
      connection:
        type: snowflake
        account: myaccount
        database: monitoring
        warehouse: compute_wh
        username: user
        password: pass
```

Use Case: Enterprise data warehousing, Snowflake-native monitoring
### SlackAlertHook

Sends formatted alerts to Slack channels when drift or other events occur:

```yaml
hooks:
  enabled: true
  hooks:
    - type: slack
      webhook_url: ${SLACK_WEBHOOK_URL}
      channel: "#data-alerts"
      username: "Baselinr Bot"
      min_severity: medium
      alert_on_drift: true
      alert_on_schema_change: true
      alert_on_profiling_failure: true
      timeout: 10
```

Use Case: Real-time team notifications, incident response, monitoring dashboards

Features:

- Severity-based filtering (low, medium, high)
- Rich formatted messages with color coding
- Separate alerts for drift, schema changes, and failures
- Configurable channel and username
- Environment variable support for webhook URLs
## Configuration

### Master Switch

Enable or disable all hooks with the master switch:

```yaml
hooks:
  enabled: true  # Set to false to disable all hooks
  hooks:
    # ... hook configurations
```
### Multiple Hooks

You can register multiple hooks; each one receives every event:

```yaml
hooks:
  enabled: true
  hooks:
    # Log all events
    - type: logging
      log_level: INFO

    # Persist events to a database
    - type: sql
      table_name: baselinr_events
      connection:
        type: postgres
        host: localhost
        database: monitoring
        username: user
        password: pass

    # Send to a custom webhook
    - type: custom
      module: my_hooks
      class_name: WebhookAlertHook
      params:
        webhook_url: https://api.example.com/alerts
```
### Selective Enablement

Disable individual hooks without removing their configuration:

```yaml
hooks:
  enabled: true
  hooks:
    - type: logging
      enabled: true   # Active
    - type: sql
      enabled: false  # Temporarily disabled
      connection:
        # ... config preserved
```
## Custom Hooks

Create custom hooks by implementing the `AlertHook` protocol.

### 1. Create Hook Class

```python
# my_hooks.py
from typing import Optional

import requests

from baselinr.events import BaseEvent


class WebhookAlertHook:
    """Send events to a webhook endpoint."""

    def __init__(self, webhook_url: str, auth_token: Optional[str] = None):
        self.webhook_url = webhook_url
        self.auth_token = auth_token

    def handle_event(self, event: BaseEvent) -> None:
        """Send event to webhook."""
        headers = {}
        if self.auth_token:
            headers["Authorization"] = f"Bearer {self.auth_token}"

        payload = {
            "event_type": event.event_type,
            "timestamp": event.timestamp.isoformat(),
            "metadata": event.metadata,
        }

        try:
            response = requests.post(
                self.webhook_url,
                json=payload,
                headers=headers,
                timeout=5,
            )
            response.raise_for_status()
        except Exception as e:
            # Log but don't fail: hook failures shouldn't stop profiling
            print(f"Webhook alert failed: {e}")
```
### 2. Configure Hook

```yaml
hooks:
  enabled: true
  hooks:
    - type: custom
      module: my_hooks
      class_name: WebhookAlertHook
      params:
        webhook_url: https://api.example.com/alerts
        auth_token: ${WEBHOOK_AUTH_TOKEN}  # Use environment variable
```
### 3. Advanced Custom Hook Example

```python
# my_hooks.py
import smtplib
from email.mime.text import MIMEText

from baselinr.events import BaseEvent, DataDriftDetected

# Ordering used to compare severities against the threshold
SEVERITY_ORDER = {"low": 0, "medium": 1, "high": 2}


class EmailAlertHook:
    """Send email alerts for high-severity drift."""

    def __init__(self, smtp_host: str, smtp_port: int,
                 from_email: str, to_emails: list,
                 severity_threshold: str = "high"):
        self.smtp_host = smtp_host
        self.smtp_port = smtp_port
        self.from_email = from_email
        self.to_emails = to_emails
        self.severity_threshold = severity_threshold

    def handle_event(self, event: BaseEvent) -> None:
        """Send email for drift events at or above the threshold."""
        # Only handle drift events
        if not isinstance(event, DataDriftDetected):
            return

        # Only alert at or above the configured severity
        if (SEVERITY_ORDER.get(event.drift_severity, 0)
                < SEVERITY_ORDER[self.severity_threshold]):
            return

        # Compose email
        subject = f"[ALERT] Data Drift Detected: {event.table}.{event.column}"
        body = f"""
High-severity data drift detected:

Table: {event.table}
Column: {event.column}
Metric: {event.metric}
Baseline: {event.baseline_value}
Current: {event.current_value}
Change: {event.change_percent:+.2f}%
Severity: {event.drift_severity.upper()}
"""

        msg = MIMEText(body)
        msg["Subject"] = subject
        msg["From"] = self.from_email
        msg["To"] = ", ".join(self.to_emails)

        try:
            with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
                server.send_message(msg)
        except Exception as e:
            print(f"Email alert failed: {e}")
```
## Usage Examples

### Example 1: Basic Logging

```yaml
# config.yml
hooks:
  enabled: true
  hooks:
    - type: logging
      log_level: INFO
```

```bash
baselinr profile --config config.yml
```

Output:

```
[ALERT] ProfilingStarted: {'table': 'users', 'run_id': '...'}
[ALERT] ProfilingCompleted: {'table': 'users', 'row_count': 1000, ...}
```
### Example 2: Database Persistence

```yaml
# config.yml
hooks:
  enabled: true
  hooks:
    - type: sql
      table_name: baselinr_events
      connection:
        type: sqlite
        database: monitoring
        filepath: ./monitoring.db
```

Events are automatically stored in the `baselinr_events` table:

```sql
SELECT * FROM baselinr_events
WHERE event_type = 'DataDriftDetected'
ORDER BY timestamp DESC;
```
### Example 3: Multi-Environment Setup

```yaml
# config-dev.yml
hooks:
  enabled: true
  hooks:
    - type: logging  # Dev: just log
      log_level: DEBUG
```

```yaml
# config-prod.yml
hooks:
  enabled: true
  hooks:
    - type: logging  # Still log
      log_level: WARNING
    - type: snowflake  # Persist in production
      table_name: prod.monitoring.baselinr_events
      connection:
        type: snowflake
        account: ${SNOWFLAKE_ACCOUNT}
        database: monitoring
        warehouse: compute_wh
        username: ${SNOWFLAKE_USER}
        password: ${SNOWFLAKE_PASSWORD}
```
### Example 4: Slack Alerts for Drift Detection

Configure Slack alerts using the built-in `SlackAlertHook`:

```yaml
# config.yml
hooks:
  enabled: true
  hooks:
    - type: slack
      webhook_url: ${SLACK_WEBHOOK_URL}
      channel: "#data-alerts"
      username: "Baselinr Bot"
      min_severity: medium
      alert_on_drift: true
      alert_on_schema_change: true
      alert_on_profiling_failure: true
```

Or use it programmatically:

```python
import os

from baselinr.drift import DriftDetector
from baselinr.events import EventBus, SlackAlertHook

# Create event bus with Slack hook
bus = EventBus()
bus.register(SlackAlertHook(
    webhook_url=os.getenv("SLACK_WEBHOOK_URL"),
    channel="#data-alerts",
    min_severity="medium",
))

# Run drift detection with Slack alerts
# (storage_config and drift_config are assumed to be defined elsewhere)
detector = DriftDetector(
    storage_config=storage_config,
    drift_config=drift_config,
    event_bus=bus,
)
report = detector.detect_drift("orders")
# Drift events are automatically sent to Slack
```

See the Slack Alerts Guide for detailed setup instructions.
## Best Practices

### 1. Use Master Switch for Environments

Enable hooks in production, disable them in development:

```yaml
hooks:
  enabled: ${ENABLE_HOOKS:-false}  # Defaults to false
```
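Assuming your environment expands the shell-style default above (so hooks stay off unless `ENABLE_HOOKS` is set), enabling them per run looks like:

```bash
# Development: hooks disabled by default
baselinr profile --config config.yml

# Production: enable hooks explicitly
ENABLE_HOOKS=true baselinr profile --config config.yml
```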
### 2. Handle Hook Failures Gracefully

Hooks should catch and log exceptions, not raise them:

```python
def handle_event(self, event: BaseEvent) -> None:
    try:
        # Process event
        self._send_alert(event)
    except Exception as e:
        logger.error(f"Alert failed: {e}")
        # Don't re-raise; let profiling continue
```
### 3. Filter Events in Hooks

Don't process every event if you only care about specific types:

```python
def handle_event(self, event: BaseEvent) -> None:
    if not isinstance(event, DataDriftDetected):
        return  # Skip non-drift events

    if event.drift_severity != "high":
        return  # Only alert on high severity

    # Process high-severity drift
    self._send_alert(event)
```
### 4. Use Async for External Calls

For production systems with external APIs, consider async. Note that `handle_event` is invoked synchronously, so no event loop is running there and `asyncio.create_task` would fail; one option is to run the coroutine in a background thread:

```python
import asyncio
import threading

import aiohttp

class AsyncWebhookHook:
    def handle_event(self, event: BaseEvent) -> None:
        # Fire and forget: no event loop runs in the synchronous hook
        # path, so run the coroutine on its own loop in a daemon thread
        threading.Thread(
            target=asyncio.run, args=(self._send_async(event),), daemon=True
        ).start()

    async def _send_async(self, event: BaseEvent):
        async with aiohttp.ClientSession() as session:
            await session.post(self.webhook_url, json=event.to_dict())
```
### 5. Monitor Hook Performance

Log hook execution time for debugging:

```python
import time

def handle_event(self, event: BaseEvent) -> None:
    start = time.time()
    try:
        self._process_event(event)
    finally:
        duration = time.time() - start
        logger.debug(f"Hook processed event in {duration:.3f}s")
```
### 6. Use SQL Schema for Persistence

When using the SQL/Snowflake hooks, create the events table first:

```sql
-- Run this before enabling SQL hooks
CREATE TABLE baselinr_events (
    event_id VARCHAR(36) PRIMARY KEY,
    event_type VARCHAR(100) NOT NULL,
    table_name VARCHAR(255),
    column_name VARCHAR(255),
    metric_name VARCHAR(100),
    baseline_value FLOAT,
    current_value FLOAT,
    change_percent FLOAT,
    drift_severity VARCHAR(20),
    timestamp TIMESTAMP NOT NULL,
    metadata TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
```

See `baselinr/storage/schema.sql` and `baselinr/storage/schema_snowflake.sql` for the full schemas.
## Integration with Orchestrators

### Dagster

```python
# dagster_repository.py
from dagster import asset

from baselinr.events import EventBus, LoggingAlertHook
from baselinr.profiling import ProfileEngine


@asset
def profile_users():
    config = load_config()  # user-defined config loader

    # Create event bus
    bus = EventBus()
    bus.register(LoggingAlertHook())

    # Profile with events
    engine = ProfileEngine(config, event_bus=bus)
    results = engine.profile()
    return results
```
### Airflow

```python
# dags/profiling_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator

from baselinr.events import EventBus, SQLEventHook
from baselinr.profiling import ProfileEngine


def profile_with_alerts():
    config = load_config()  # user-defined config loader

    # Register hooks (get_airflow_db_engine is a user-defined engine factory)
    bus = EventBus()
    bus.register(SQLEventHook(engine=get_airflow_db_engine()))

    engine = ProfileEngine(config, event_bus=bus)
    engine.profile()


dag = DAG('profile_users', schedule_interval='@daily')
task = PythonOperator(
    task_id='profile',
    python_callable=profile_with_alerts,
    dag=dag,
)
```
## Troubleshooting

### Hooks Not Firing

- Check that `hooks.enabled` is set to `true`
- Verify each hook's individual `enabled` field
- Check logs for hook registration messages

### Event Persistence Failing

- Ensure the `baselinr_events` table exists
- Verify the database connection configuration
- Check that the database user has INSERT permissions
- Review hook error logs

### Custom Hook Not Loading

- Verify the module path is correct and importable (see the snippet below)
- Check the class name spelling
- Ensure the hook implements a `handle_event(event: BaseEvent)` method
- Review import errors in the logs
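A quick sanity check for the first three points, loading the hook the same way the `custom` hook config names it (the `module` and `class_name` values here come from the examples above):

```python
# Run from a directory where my_hooks.py is importable (or add it to PYTHONPATH)
import importlib

module = importlib.import_module("my_hooks")      # config: module
hook_cls = getattr(module, "WebhookAlertHook")    # config: class_name
hook = hook_cls(webhook_url="https://api.example.com/alerts")

# The hook must expose a callable handle_event method
assert callable(getattr(hook, "handle_event", None)), "missing handle_event"
print("Custom hook loads correctly")
```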
## Future Enhancements
Planned improvements to the event system:
- Event Filtering: Configure which events each hook receives
- Async Hooks: Native async/await support for non-blocking hooks
- Event Batching: Batch multiple events for efficient persistence
- Retry Logic: Automatic retry for failed hook executions
- Rate Limiting: Prevent alert fatigue with configurable limits
- Event Streaming: Kafka/Kinesis integration for event streams