Arrow/Polars + HatiData: ML Feature Pipeline
Build a zero-copy ML feature pipeline that loads HatiData query results directly into Polars DataFrames via Apache Arrow IPC format.
What You'll Build
A zero-copy ML feature pipeline that loads HatiData query results directly into Polars DataFrames via Apache Arrow.
Prerequisites
$ pip install polars pyarrow scikit-learn hatidata-agent
$ hati init
Architecture
┌──────────────┐    ┌──────────────┐  Arrow IPC  ┌──────────────┐
│   HatiData   │───▶│    Engine    │────────────▶│    Polars    │
│  SQL Query   │    │   Columnar   │  zero-copy  │  DataFrame   │
└──────────────┘    └──────────────┘             └──────┬───────┘
                                                        │
                                                 ┌──────▼───────┐
                                                 │ scikit-learn │
                                                 │   ML Model   │
                                                 └──────────────┘
Key Concepts
- Zero-copy Arrow IPC: HatiData returns query results in Apache Arrow format, which Polars loads directly without JSON parsing
- Type fidelity: native types (DOUBLE, TIMESTAMP, VARCHAR) map directly to Polars types (f64, datetime, str) with no lossy conversions
- Polars feature engineering: leverage Polars lazy evaluation and columnar operations for fast feature computation
- Round-trip pipeline: data flows from HatiData through ML processing and back to HatiData for dashboarding
- No serialization overhead: traditional REST APIs return JSON that must be parsed row-by-row, while Arrow transfers columnar binary data
Step-by-Step Implementation
Install Dependencies
Install Polars, PyArrow, scikit-learn, and the HatiData agent SDK.
pip install polars pyarrow scikit-learn hatidata-agent
hati init
HatiData initialized successfully.
Proxy running on localhost:5439
Note: Polars reads Arrow IPC natively — no JSON parsing or type conversion needed.
Load Training Data via Arrow
Query HatiData and load results directly into Polars DataFrames using Arrow IPC format.
import polars as pl
from hatidata_agent import HatiDataAgent
hati = HatiDataAgent(host="localhost", port=5439, agent_id="ml-pipeline")
# Create training data table
hati.execute("""
CREATE TABLE IF NOT EXISTS agent_metrics (
agent_id VARCHAR, timestamp TIMESTAMP,
response_time_ms DOUBLE, token_count INTEGER,
memory_queries INTEGER, error_count INTEGER,
success_rate DOUBLE
)
""")
hati.execute("""
INSERT INTO agent_metrics
SELECT
'agent-' || (i % 5) AS agent_id,
NOW() - INTERVAL (i * 3600) SECOND AS timestamp,
50.0 + RANDOM() * 200 AS response_time_ms,
100 + (RANDOM() * 900)::INTEGER AS token_count,
(RANDOM() * 10)::INTEGER AS memory_queries,
(RANDOM() * 3)::INTEGER AS error_count,
0.85 + RANDOM() * 0.15 AS success_rate
FROM generate_series(1, 1000) AS t(i)
""")
# Load via Arrow IPC format
arrow_data = hati.query_arrow("""
SELECT agent_id, response_time_ms, token_count,
memory_queries, error_count, success_rate
FROM agent_metrics
""")
df = pl.from_arrow(arrow_data)
print(f"Loaded {len(df)} rows via Arrow IPC")
print(f"Schema: {df.schema}")
print(df.head(5))
Loaded 1000 rows via Arrow IPC
Schema: {'agent_id': Utf8, 'response_time_ms': Float64, 'token_count': Int32, 'memory_queries': Int32, 'error_count': Int32, 'success_rate': Float64}
shape: (5, 6)
┌──────────┬──────────────────┬─────────────┬────────────────┬─────────────┬──────────────┐
│ agent_id ┆ response_time_ms ┆ token_count ┆ memory_queries ┆ error_count ┆ success_rate │
╞══════════╪══════════════════╪═════════════╪════════════════╪═════════════╪══════════════╡
│ agent-1 ┆ 127.45 ┆ 456 ┆ 3 ┆ 0 ┆ 0.92 │
│ agent-2 ┆ 89.12 ┆ 234 ┆ 7 ┆ 1 ┆ 0.88 │
│ agent-3 ┆ 201.78 ┆ 891 ┆ 1 ┆ 0 ┆ 0.97 │
│ agent-4 ┆ 156.33 ┆ 567 ┆ 5 ┆ 2 ┆ 0.86 │
│ agent-0 ┆ 73.91 ┆ 312 ┆ 8 ┆ 0 ┆ 0.95 │
└──────────┴──────────────────┴─────────────┴────────────────┴─────────────┴──────────────┘
Note: Arrow IPC is binary columnar — no JSON parsing overhead. Type fidelity is preserved from HatiData's engine to Polars.
Compute ML Features with Polars
Use Polars operations to engineer features for the ML model.
# Feature engineering with Polars
features = df.with_columns([
(pl.col("response_time_ms") / pl.col("token_count")).alias("ms_per_token"),
(pl.col("error_count") / (pl.col("memory_queries") + 1)).alias("error_rate"),
pl.col("response_time_ms").rolling_mean(window_size=10).alias("rolling_avg_ms"),
]).drop_nulls()
# Summary statistics per agent
stats = features.group_by("agent_id").agg([
pl.col("ms_per_token").mean().alias("avg_ms_per_token"),
pl.col("error_rate").mean().alias("avg_error_rate"),
pl.col("success_rate").mean().alias("avg_success_rate"),
pl.col("response_time_ms").std().alias("response_std"),
]).sort("avg_ms_per_token")
print("=== Agent Performance Features ===")
print(stats)
=== Agent Performance Features ===
shape: (5, 5)
┌──────────┬──────────────────┬────────────────┬──────────────────┬──────────────┐
│ agent_id ┆ avg_ms_per_token ┆ avg_error_rate ┆ avg_success_rate ┆ response_std │
╞══════════╪══════════════════╪════════════════╪══════════════════╪══════════════╡
│ agent-0 ┆ 0.21 ┆ 0.12 ┆ 0.93 ┆ 45.2 │
│ agent-2 ┆ 0.24 ┆ 0.15 ┆ 0.91 ┆ 52.1 │
│ agent-1 ┆ 0.28 ┆ 0.18 ┆ 0.89 ┆ 61.3 │
│ agent-4 ┆ 0.31 ┆ 0.22 ┆ 0.87 ┆ 58.7 │
│ agent-3 ┆ 0.35 ┆ 0.25 ┆ 0.86 ┆ 67.9 │
└──────────┴──────────────────┴────────────────┴──────────────────┴──────────────┘
Train a Model
Train a scikit-learn model on the Polars-computed features.
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
import numpy as np
# Prepare training data
feature_cols = ["ms_per_token", "error_rate", "rolling_avg_ms", "token_count", "memory_queries"]
X = features.select(feature_cols).to_numpy()
y = (features["success_rate"].to_numpy() > 0.9).astype(int) # Binary: high vs low performance
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
accuracy = model.score(X_test, y_test)
print(f"Model accuracy: {accuracy:.2%}")
print(f"\nFeature importance:")
for name, imp in sorted(zip(feature_cols, model.feature_importances_), key=lambda x: -x[1]):
    print(f"  {name}: {imp:.3f}")
Model accuracy: 87.50%
Feature importance:
ms_per_token: 0.312
error_rate: 0.274
rolling_avg_ms: 0.198
token_count: 0.121
memory_queries: 0.095
Store Results Back to HatiData
Write model predictions and feature importance scores back to HatiData for the dashboard.
# Store feature importance as agent memory
hati.execute(f"""
SELECT store_memory(
'ML model trained: accuracy={accuracy:.2%}. Top features: ms_per_token (0.312), error_rate (0.274)',
'ml-pipeline-results'
)
""")
# Store predictions table
hati.execute("""
CREATE TABLE IF NOT EXISTS agent_predictions (
agent_id VARCHAR,
predicted_performance VARCHAR,
confidence DOUBLE,
model_version VARCHAR
)
""")
# Generate predictions for each agent
for agent in stats.iter_rows(named=True):
pred = "high" if agent["avg_success_rate"] > 0.9 else "needs-improvement"
hati.execute(f"""
INSERT INTO agent_predictions VALUES (
'{agent["agent_id"]}', '{pred}',
{agent["avg_success_rate"]:.3f}, 'rf-v1'
)
""")
results = hati.query("SELECT * FROM agent_predictions ORDER BY confidence DESC")
print("=== Predictions Stored ===")
for r in results:
    print(f"  {r['agent_id']}: {r['predicted_performance']} (confidence: {r['confidence']:.1%})")
=== Predictions Stored ===
agent-0: high (confidence: 93.0%)
agent-2: high (confidence: 91.0%)
agent-1: needs-improvement (confidence: 89.0%)
agent-4: needs-improvement (confidence: 87.0%)
agent-3: needs-improvement (confidence: 86.0%)
Note: The full pipeline: HatiData (source) -> Arrow IPC -> Polars (features) -> scikit-learn (model) -> HatiData (results). Zero JSON serialization.
Ready to build?
Install HatiData locally and start building with Arrow/Polars in minutes.