Arrow-Native Queries + Polars: Zero-Copy Analytics
Query HatiData and load results directly into polars or pandas DataFrames via Apache Arrow IPC — zero parsing, full type fidelity.
What You'll Build
A data analysis pipeline that queries HatiData's Arrow endpoint and processes results in polars and pandas with zero serialization overhead.
Prerequisites
$ pip install pyarrow polars pandas requests
$ hati init
HatiData running on port 5440
Architecture
┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│    Python    │     │   HatiData   │     │    DuckDB    │
│   (polars)   │────▶│  /v1/query/  │────▶│   (Arrow)    │
│              │     │    arrow     │     │              │
└──────────────┘     └──────────────┘     └──────────────┘
Zero-Copy: Arrow IPC → polars DataFrame
Key Concepts
- Zero-copy Arrow IPC: HatiData returns query results in Apache Arrow IPC format, which polars and pandas can load directly without JSON parsing or type conversion
- polars and pandas interop: the same Arrow IPC response works with both libraries, and you can convert freely between them via PyArrow as the common interchange format (see the sketch after this list)
- No JSON serialization overhead: traditional REST APIs return JSON that must be parsed and type-coerced row by row, while Arrow IPC transfers columnar binary data already in the correct memory layout
- Type fidelity from DuckDB to DataFrame: DuckDB native types (DOUBLE, TIMESTAMP, VARCHAR) map directly to polars and pandas types (f64, datetime64, str) with zero lossy conversions
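To make the interchange concrete, here is a minimal, self-contained sketch. The one-row table is a hypothetical stand-in for the pyarrow.Table you would read from HatiData's Arrow response; the actual endpoint call is shown in the steps below.
import pandas as pd
import polars as pl
import pyarrow as pa
# Stand-in for the pyarrow.Table you'd get from HatiData's Arrow IPC response
arrow_table = pa.table({"agent_id": ["agent_001"], "value": [142.5]})
df_pl = pl.from_arrow(arrow_table)              # Arrow -> polars (zero-copy where dtypes allow)
df_pd = arrow_table.to_pandas()                 # Arrow -> pandas
back_from_polars = df_pl.to_arrow()             # polars -> Arrow
back_from_pandas = pa.Table.from_pandas(df_pd)  # pandas -> Arrow
print(df_pl.dtypes)   # e.g. [String, Float64]
print(df_pd.dtypes)   # e.g. agent_id: object, value: float64
PyArrow is the common hub here: each library hands columnar buffers to the other without going through JSON or Python objects.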
Step-by-Step Implementation
Install Dependencies
Install PyArrow, polars, pandas, and requests for querying HatiData's Arrow endpoint.
pip install pyarrow polars pandas requests
Note: polars and pandas are both supported. PyArrow is the bridge that enables zero-copy data transfer from HatiData's DuckDB engine.
Start HatiData and Create Sample Data
Initialize HatiData, start the server, and create a table with analytics data for testing the Arrow endpoint.
from hatidata_agent import HatiDataAgent
client = HatiDataAgent(host="localhost", port=5439, agent_id="analytics-agent", framework="polars")
# Create a table with analytics data
client.execute("""
CREATE TABLE IF NOT EXISTS agent_metrics (
agent_id VARCHAR NOT NULL,
metric_name VARCHAR NOT NULL,
value DOUBLE NOT NULL,
region VARCHAR,
recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Insert sample metrics
client.execute("""
INSERT INTO agent_metrics (agent_id, metric_name, value, region) VALUES
('agent_001', 'response_time_ms', 142.5, 'us-east'),
('agent_001', 'memory_searches', 847.0, 'us-east'),
('agent_001', 'tokens_used', 15420.0, 'us-east'),
('agent_002', 'response_time_ms', 98.3, 'eu-west'),
('agent_002', 'memory_searches', 1203.0, 'eu-west'),
('agent_002', 'tokens_used', 22150.0, 'eu-west'),
('agent_003', 'response_time_ms', 215.7, 'ap-south'),
('agent_003', 'memory_searches', 456.0, 'ap-south'),
('agent_003', 'tokens_used', 8930.0, 'ap-south'),
('agent_001', 'response_time_ms', 135.2, 'us-east'),
('agent_002', 'response_time_ms', 102.1, 'eu-west'),
('agent_003', 'response_time_ms', 198.4, 'ap-south')
""")
row_count = client.query("SELECT COUNT(*) AS n FROM agent_metrics")
print(f"Created agent_metrics table with {row_count[0]['n']} rows.")Created agent_metrics table with 12 rows.Query via Arrow Endpoint
POST a SQL query to HatiData's /v1/query/arrow endpoint and load the Arrow IPC response directly into a polars DataFrame with zero serialization.
import pyarrow as pa
import pyarrow.ipc as ipc
import polars as pl
import requests
import io
import time
HATIDATA_URL = "http://localhost:5440/v1/query/arrow"
def query_arrow(sql: str) -> pl.DataFrame:
"""Query HatiData and return a polars DataFrame via Arrow IPC."""
start = time.perf_counter()
response = requests.post(
HATIDATA_URL,
json={"sql": sql},
headers={"Content-Type": "application/json"},
)
response.raise_for_status()
# Read Arrow IPC stream directly into polars — zero parsing
reader = ipc.open_stream(io.BytesIO(response.content))
arrow_table = reader.read_all()
df = pl.from_arrow(arrow_table)
elapsed = (time.perf_counter() - start) * 1000
print(f"Query returned {len(df)} rows in {elapsed:.1f}ms (Arrow IPC)")
return df
# Run a query via the Arrow endpoint
df = query_arrow("""
SELECT agent_id, metric_name, value, region, recorded_at
FROM agent_metrics
ORDER BY recorded_at DESC
""")
print(df)
Query returned 12 rows in 8.3ms (Arrow IPC)
shape: (12, 5)
+-----------+------------------+---------+----------+---------------------+
| agent_id | metric_name | value | region | recorded_at |
| --- | --- | --- | --- | --- |
| str | str | f64 | str | datetime[us] |
+-----------+------------------+---------+----------+---------------------+
| agent_001 | response_time_ms | 142.5 | us-east | 2025-01-15 14:30:00 |
| agent_001 | memory_searches | 847.0 | us-east | 2025-01-15 14:30:00 |
| agent_001 | tokens_used | 15420.0 | us-east | 2025-01-15 14:30:00 |
| agent_002 | response_time_ms | 98.3 | eu-west | 2025-01-15 14:30:00 |
| ... | ... | ... | ... | ... |
+-----------+------------------+---------+----------+---------------------+
Note: The Arrow IPC format preserves DuckDB's native types (f64, str, datetime) exactly. No JSON parsing, no type coercion, no string-to-number conversion.
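As an optional sanity check of that type fidelity, you can inspect the Arrow schema before converting and compare it with the polars dtypes. This sketch reuses the HATIDATA_URL constant and the df returned by query_arrow above:
import io
import pyarrow.ipc as ipc
import requests
resp = requests.post(HATIDATA_URL, json={"sql": "SELECT * FROM agent_metrics LIMIT 1"})
resp.raise_for_status()
# Schema as encoded in the Arrow IPC stream (from DuckDB's VARCHAR, DOUBLE, TIMESTAMP)
with ipc.open_stream(io.BytesIO(resp.content)) as reader:
    print(reader.schema)
# Schema after pl.from_arrow (polars String, Float64, Datetime)
print(df.schema)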
Analyze with polars
Run polars operations on the Arrow-loaded DataFrame: group-by aggregations, filters, and performance analysis.
# Group by agent and compute summary statistics
summary = df.filter(
pl.col("metric_name") == "response_time_ms"
).group_by("agent_id", "region").agg([
pl.col("value").mean().alias("avg_response_ms"),
pl.col("value").min().alias("min_response_ms"),
pl.col("value").max().alias("max_response_ms"),
pl.col("value").count().alias("sample_count"),
]).sort("avg_response_ms")
print("=== Response Time by Agent ===")
print(summary)
# Pivot: metrics as columns per agent
pivot = df.pivot(
on="metric_name",
index=["agent_id", "region"],
values="value",
aggregate_function="mean",
)
print("\n=== Agent Metrics Pivot ===")
print(pivot)
# Filter: find agents with high response times
slow_agents = df.filter(
(pl.col("metric_name") == "response_time_ms") &
(pl.col("value") > 150.0)
)
print(f"\nAgents with response time > 150ms: {len(slow_agents)}")
print(slow_agents.select("agent_id", "value", "region"))
=== Response Time by Agent ===
shape: (3, 6)
+-----------+----------+-----------------+-----------------+-----------------+--------------+
| agent_id | region | avg_response_ms | min_response_ms | max_response_ms | sample_count |
| --- | --- | --- | --- | --- | --- |
| str | str | f64 | f64 | f64 | u32 |
+-----------+----------+-----------------+-----------------+-----------------+--------------+
| agent_002 | eu-west | 100.2 | 98.3 | 102.1 | 2 |
| agent_001 | us-east | 138.85 | 135.2 | 142.5 | 2 |
| agent_003 | ap-south | 207.05 | 198.4 | 215.7 | 2 |
+-----------+----------+-----------------+-----------------+-----------------+--------------+
=== Agent Metrics Pivot ===
shape: (3, 5)
+-----------+----------+------------------+-----------------+-------------+
| agent_id | region | response_time_ms | memory_searches | tokens_used |
+-----------+----------+------------------+-----------------+-------------+
| agent_001 | us-east | 138.85 | 847.0 | 15420.0 |
| agent_002 | eu-west | 100.2 | 1203.0 | 22150.0 |
| agent_003 | ap-south | 207.05 | 456.0 | 8930.0 |
+-----------+----------+------------------+-----------------+-------------+
Agents with response time > 150ms: 2
Note: polars operations run at native speed because the data arrived via Arrow IPC with correct types. No time wasted parsing JSON strings into numbers.
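To illustrate what that note means mechanically, the following self-contained micro-benchmark round-trips the already-loaded df through a simulated JSON payload and through an in-memory Arrow IPC stream, then times each load. It does not hit any HatiData endpoint, and with only 12 rows the absolute numbers are tiny and machine-dependent; the point is the parse-and-infer work that JSON requires and Arrow avoids.
import io
import json
import time
import polars as pl
import pyarrow.ipc as ipc
# Simulated JSON response body: rows as a list of dicts (timestamps become strings)
json_bytes = json.dumps(df.to_dicts(), default=str).encode()
# Simulated Arrow IPC response body for the same data
arrow_src = df.to_arrow()
sink = io.BytesIO()
with ipc.new_stream(sink, arrow_src.schema) as writer:
    writer.write_table(arrow_src)
arrow_bytes = sink.getvalue()
start = time.perf_counter()
_ = pl.DataFrame(json.loads(json_bytes))  # parse text, re-infer types row by row
json_ms = (time.perf_counter() - start) * 1000
start = time.perf_counter()
_ = pl.from_arrow(ipc.open_stream(io.BytesIO(arrow_bytes)).read_all())  # columnar, already typed
arrow_ms = (time.perf_counter() - start) * 1000
print(f"JSON load: {json_ms:.2f}ms vs Arrow IPC load: {arrow_ms:.2f}ms")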
Compare with pandas
Load the same Arrow IPC data into pandas to show seamless interoperability. Both polars and pandas work with the same Arrow response.
import pandas as pd
import pyarrow as pa
import pyarrow.ipc as ipc
import polars as pl
import requests
import io
# Same query, same Arrow endpoint
response = requests.post(
"http://localhost:5440/v1/query/arrow",
json={"sql": "SELECT * FROM agent_metrics ORDER BY agent_id"},
headers={"Content-Type": "application/json"},
)
# Load Arrow IPC into pandas via PyArrow
reader = ipc.open_stream(io.BytesIO(response.content))
arrow_table = reader.read_all()
pdf = arrow_table.to_pandas()
print(f"pandas DataFrame: {pdf.shape[0]} rows, {pdf.shape[1]} columns")
print(f"dtypes:\n{pdf.dtypes}\n")
# pandas operations work as expected
print("=== pandas GroupBy ===")
print(pdf[pdf["metric_name"] == "response_time_ms"]
.groupby("region")["value"]
.agg(["mean", "count"])
.round(1))
# Convert between polars and pandas freely
df_polars = pl.from_pandas(pdf)
print(f"\nConverted to polars: {df_polars.shape}")
pdf_back = df_polars.to_pandas()
print(f"Converted to pandas: {pdf_back.shape}")
print("\nArrow IPC enables seamless interop between polars and pandas.")pandas DataFrame: 12 rows, 5 columns
dtypes:
agent_id object
metric_name object
value float64
region object
recorded_at datetime64[us]
=== pandas GroupBy ===
mean count
region
ap-south 207.1 2
eu-west 100.2 2
us-east 138.9 2
Converted to polars: (12, 5)
Converted to pandas: (12, 5)
Arrow IPC enables seamless interop between polars and pandas.
Note: Both polars and pandas consume the same Arrow IPC bytes. Types are preserved end-to-end: DuckDB DOUBLE becomes f64 in polars and float64 in pandas. No manual type casting required.
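If you would rather not issue a second request at all, the arrow_table already read in this step can feed both frameworks directly; a minimal sketch continuing from the code above:
# Reuse the pyarrow.Table read above instead of querying again
df_pl = pl.from_arrow(arrow_table)   # polars view of the same Arrow data
df_pd = arrow_table.to_pandas()      # pandas copy of the same Arrow data
print(df_pl.shape, df_pd.shape)                      # both (12, 5) for the sample table
print(df_pl.schema["value"], df_pd["value"].dtype)   # Float64 vs float64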
Ready to build?
Install HatiData locally and start building with polars and pandas in minutes.