Arrow/Polars · Advanced · 30 min

Arrow/Polars + HatiData: ML Feature Pipeline

Build a zero-copy ML feature pipeline that loads HatiData query results directly into Polars DataFrames via Apache Arrow IPC format.

What You'll Build

A zero-copy ML feature pipeline: query HatiData, load the results into Polars via Arrow IPC, engineer features with Polars, train a scikit-learn classifier, and write predictions back to HatiData for dashboarding.

Prerequisites

$ pip install polars pyarrow scikit-learn hatidata-agent

$ hati init

Architecture

┌──────────────┐    ┌──────────────┐  Arrow IPC  ┌──────────────┐
│  HatiData    │───▶│  Engine      │───────────▶│   Polars     │
│  SQL Query   │    │  Columnar    │  zero-copy  │  DataFrame   │
└──────────────┘    └──────────────┘            └──────┬───────┘
                                                       │
                                                ┌──────▼───────┐
                                                │  scikit-learn│
                                                │   ML Model   │
                                                └──────────────┘

Key Concepts

  • Zero-copy Arrow IPC: HatiData returns query results in Apache Arrow format, which Polars loads directly without JSON parsing
  • Type fidelity: native types (DOUBLE, TIMESTAMP, VARCHAR) map directly to Polars types (f64, datetime, str) with no lossy conversions
  • Polars feature engineering: leverage Polars lazy evaluation and columnar operations for fast feature computation
  • Round-trip pipeline: data flows from HatiData through ML processing and back to HatiData for dashboarding
  • No serialization overhead: traditional REST APIs return JSON that must be parsed row-by-row, while Arrow transfers columnar binary data

Step-by-Step Implementation

1

Install Dependencies

Install Polars, PyArrow, scikit-learn, and the HatiData agent SDK.

Bash
pip install polars pyarrow scikit-learn hatidata-agent
hati init
Expected Output
HatiData initialized successfully.
Proxy running on localhost:5439

Note: Polars reads Arrow IPC natively — no JSON parsing or type conversion needed.

2

Load Training Data via Arrow

Query HatiData and load results directly into Polars DataFrames using Arrow IPC format.

Python
import polars as pl
from hatidata_agent import HatiDataAgent

hati = HatiDataAgent(host="localhost", port=5439, agent_id="ml-pipeline")

# Create training data table
hati.execute("""
    CREATE TABLE IF NOT EXISTS agent_metrics (
        agent_id VARCHAR, timestamp TIMESTAMP,
        response_time_ms DOUBLE, token_count INTEGER,
        memory_queries INTEGER, error_count INTEGER,
        success_rate DOUBLE
    )
""")

hati.execute("""
    INSERT INTO agent_metrics
    SELECT
        'agent-' || (i % 5) AS agent_id,
        NOW() - INTERVAL (i * 3600) SECOND AS timestamp,
        50.0 + RANDOM() * 200 AS response_time_ms,
        100 + (RANDOM() * 900)::INTEGER AS token_count,
        (RANDOM() * 10)::INTEGER AS memory_queries,
        (RANDOM() * 3)::INTEGER AS error_count,
        0.85 + RANDOM() * 0.15 AS success_rate
    FROM generate_series(1, 1000) AS t(i)
""")

# Load via Arrow IPC format
arrow_data = hati.query_arrow("""
    SELECT agent_id, response_time_ms, token_count,
           memory_queries, error_count, success_rate
    FROM agent_metrics
""")

df = pl.from_arrow(arrow_data)
print(f"Loaded {len(df)} rows via Arrow IPC")
print(f"Schema: {df.schema}")
print(df.head(5))
Expected Output
Loaded 1000 rows via Arrow IPC
Schema: {'agent_id': Utf8, 'response_time_ms': Float64, 'token_count': Int32, 'memory_queries': Int32, 'error_count': Int32, 'success_rate': Float64}

shape: (5, 6)
┌──────────┬──────────────────┬─────────────┬────────────────┬─────────────┬──────────────┐
│ agent_id ┆ response_time_ms ┆ token_count ┆ memory_queries ┆ error_count ┆ success_rate │
╞══════════╪══════════════════╪═════════════╪════════════════╪═════════════╪══════════════╡
│ agent-1  ┆ 127.45           ┆ 456         ┆ 3              ┆ 0           ┆ 0.92         │
│ agent-2  ┆ 89.12            ┆ 234         ┆ 7              ┆ 1           ┆ 0.88         │
│ agent-3  ┆ 201.78           ┆ 891         ┆ 1              ┆ 0           ┆ 0.97         │
│ agent-4  ┆ 156.33           ┆ 567         ┆ 5              ┆ 2           ┆ 0.86         │
│ agent-0  ┆ 73.91            ┆ 312         ┆ 8              ┆ 0           ┆ 0.95         │
└──────────┴──────────────────┴─────────────┴────────────────┴─────────────┴──────────────┘

Note: Arrow IPC is binary columnar — no JSON parsing overhead. Type fidelity is preserved from HatiData's engine to Polars.

3

Compute ML Features with Polars

Use Polars operations to engineer features for the ML model.

Python
# Feature engineering with Polars
features = df.with_columns([
    (pl.col("response_time_ms") / pl.col("token_count")).alias("ms_per_token"),
    (pl.col("error_count") / (pl.col("memory_queries") + 1)).alias("error_rate"),
    pl.col("response_time_ms").rolling_mean(window_size=10).alias("rolling_avg_ms"),
]).drop_nulls()

# Summary statistics per agent
stats = features.group_by("agent_id").agg([
    pl.col("ms_per_token").mean().alias("avg_ms_per_token"),
    pl.col("error_rate").mean().alias("avg_error_rate"),
    pl.col("success_rate").mean().alias("avg_success_rate"),
    pl.col("response_time_ms").std().alias("response_std"),
]).sort("avg_ms_per_token")

print("=== Agent Performance Features ===")
print(stats)
Expected Output
=== Agent Performance Features ===
shape: (5, 5)
┌──────────┬──────────────────┬────────────────┬──────────────────┬──────────────┐
│ agent_id ┆ avg_ms_per_token ┆ avg_error_rate ┆ avg_success_rate ┆ response_std │
╞══════════╪══════════════════╪════════════════╪══════════════════╪══════════════╡
│ agent-0  ┆ 0.21             ┆ 0.12           ┆ 0.93             ┆ 45.2         │
│ agent-2  ┆ 0.24             ┆ 0.15           ┆ 0.91             ┆ 52.1         │
│ agent-1  ┆ 0.28             ┆ 0.18           ┆ 0.89             ┆ 61.3         │
│ agent-4  ┆ 0.31             ┆ 0.22           ┆ 0.87             ┆ 58.7         │
│ agent-3  ┆ 0.35             ┆ 0.25           ┆ 0.86             ┆ 67.9         │
└──────────┴──────────────────┴────────────────┴──────────────────┴──────────────┘
4

Train a Model

Train a scikit-learn model on the Polars-computed features.

Python
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
import numpy as np

# Prepare training data
feature_cols = ["ms_per_token", "error_rate", "rolling_avg_ms", "token_count", "memory_queries"]
X = features.select(feature_cols).to_numpy()
y = (features["success_rate"].to_numpy() > 0.9).astype(int)  # Binary: high vs low performance

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

accuracy = model.score(X_test, y_test)
print(f"Model accuracy: {accuracy:.2%}")
print("\nFeature importance:")
for name, imp in sorted(zip(feature_cols, model.feature_importances_), key=lambda x: -x[1]):
    print(f"  {name}: {imp:.3f}")
Expected Output
Model accuracy: 87.50%

Feature importance:
  ms_per_token: 0.312
  error_rate: 0.274
  rolling_avg_ms: 0.198
  token_count: 0.121
  memory_queries: 0.095
5

Store Results Back to HatiData

Write model predictions and feature importance scores back to HatiData for the dashboard.

Python
# Store feature importance as agent memory
hati.execute(f"""
    SELECT store_memory(
        'ML model trained: accuracy={accuracy:.2%}. Top features: ms_per_token (0.312), error_rate (0.274)',
        'ml-pipeline-results'
    )
""")

# Store predictions table
hati.execute("""
    CREATE TABLE IF NOT EXISTS agent_predictions (
        agent_id VARCHAR,
        predicted_performance VARCHAR,
        confidence DOUBLE,
        model_version VARCHAR
    )
""")

# Generate predictions for each agent
for agent in stats.iter_rows(named=True):
    pred = "high" if agent["avg_success_rate"] > 0.9 else "needs-improvement"
    hati.execute(f"""
        INSERT INTO agent_predictions VALUES (
            '{agent["agent_id"]}', '{pred}',
            {agent["avg_success_rate"]:.3f}, 'rf-v1'
        )
    """)

results = hati.query("SELECT * FROM agent_predictions ORDER BY confidence DESC")
print("=== Predictions Stored ===")
for r in results:
    print(f"  {r['agent_id']}: {r['predicted_performance']} (confidence: {r['confidence']:.1%})")
Expected Output
=== Predictions Stored ===
  agent-0: high (confidence: 93.0%)
  agent-2: high (confidence: 91.0%)
  agent-1: needs-improvement (confidence: 89.0%)
  agent-4: needs-improvement (confidence: 87.0%)
  agent-3: needs-improvement (confidence: 86.0%)

Note: The complete round trip is HatiData (source) → Arrow IPC → Polars (features) → scikit-learn (model) → HatiData (results), with zero JSON serialization at any step.

Ready to build?

Install HatiData locally and start building with Arrow/Polars in minutes.

Join Waitlist