CrewAI + HatiData: Supply Chain War Room
Deploy a multi-agent crew that monitors suppliers, routes, and inventory with shared memory.
What You'll Build
A CrewAI crew with Analyst, Monitor, and Responder agents sharing a unified HatiData memory layer.
Prerequisites
$ pip install hatidata-agent crewai crewai-hatidata
$ hati init
Architecture
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Analyst │ │ Monitor │ │ Responder │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
└────────────────┼────────────────┘
┌──────▼───────┐
│ HatiData │
│ Shared Memory│
└──────────────┘
Key Concepts
- Shared memory between agents: all three CrewAI agents read and write to the same HatiData namespace, enabling coordination without direct message passing
- Semantic triggers match current disruptions against historical patterns using JOIN_VECTOR, catching novel supply chain risks that rule-based alerts miss
- Multi-agent coordination through a shared data layer means agents can be added, removed, or restarted without losing institutional knowledge
- Crew task handoff is data-driven: each agent writes structured findings to memory, and downstream agents query that memory to build on previous work
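The shared-memory coordination pattern above can be pictured with a minimal stand-in before touching any real infrastructure. This sketch uses a plain in-memory dict in place of HatiData (names like write_memory and read_memory are illustrative, not part of any API): agents never call each other, they only read and write the same namespace.

```python
# Illustrative sketch only: blackboard-style coordination via a shared
# namespace, modeled with an in-memory dict instead of HatiData.
from collections import defaultdict

shared_memory = defaultdict(list)  # namespace -> list of memory records

def write_memory(namespace, content, tags):
    shared_memory[namespace].append({"content": content, "tags": tags})

def read_memory(namespace, tag=None):
    # Downstream agents filter by tag to build on upstream findings.
    return [r for r in shared_memory[namespace] if tag is None or tag in r["tags"]]

# The Analyst writes a finding; the Responder later reads it -- no direct
# message passing between the two agents.
write_memory("supply_chain_warroom", "SKU_101 below reorder point", ["inventory-alert"])
alerts = read_memory("supply_chain_warroom", tag="inventory-alert")
```

Because coordination happens through data rather than messages, any agent can restart or be replaced without the others noticing, which is the property the real HatiData namespace provides.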
Step-by-Step Implementation
Set up the environment
Install CrewAI, the HatiData CrewAI integration, and initialize the local database.
pip install hatidata-agent crewai crewai-hatidata
hati init
Note: hati init creates a .hati/config.toml config and initializes a local DuckDB database. crewai-hatidata provides pre-built CrewAI tools for queries and semantic search.
Define the supply chain data schema
Create tables for suppliers, inventory, and shipping routes, then load sample data for the agents to work with.
from hatidata_agent import HatiDataAgent
client = HatiDataAgent(host="localhost", port=5439, agent_id="supply-chain-analyst", framework="crewai")
# Create supplier table
client.execute("""
CREATE TABLE IF NOT EXISTS suppliers (
supplier_id VARCHAR PRIMARY KEY,
name VARCHAR NOT NULL,
region VARCHAR,
reliability_score DECIMAL(3, 2),
lead_time_days INT,
status VARCHAR DEFAULT 'active'
)
""")
# Create inventory table
client.execute("""
CREATE TABLE IF NOT EXISTS inventory (
sku VARCHAR PRIMARY KEY,
product_name VARCHAR,
supplier_id VARCHAR,
quantity INT,
reorder_point INT,
warehouse VARCHAR
)
""")
# Create routes table
client.execute("""
CREATE TABLE IF NOT EXISTS shipping_routes (
route_id VARCHAR PRIMARY KEY,
origin VARCHAR,
destination VARCHAR,
carrier VARCHAR,
transit_days INT,
status VARCHAR DEFAULT 'on_time'
)
""")
# Load sample data
client.execute("""
INSERT INTO suppliers VALUES
('SUP_001', 'TechParts Asia', 'APAC', 0.92, 14, 'active'),
('SUP_002', 'EuroComponents GmbH', 'EMEA', 0.88, 21, 'active'),
('SUP_003', 'GlobalChip Ltd', 'APAC', 0.95, 7, 'delayed'),
('SUP_004', 'AmeriParts Inc', 'NA', 0.91, 5, 'active')
""")
client.execute("""
INSERT INTO inventory VALUES
('SKU_100', 'Processor Unit A', 'SUP_001', 150, 200, 'SG-warehouse'),
('SKU_101', 'Memory Module B', 'SUP_003', 45, 100, 'SG-warehouse'),
('SKU_102', 'Power Supply C', 'SUP_002', 500, 300, 'EU-warehouse'),
('SKU_103', 'Controller Board D', 'SUP_004', 80, 50, 'US-warehouse')
""")
client.execute("""
INSERT INTO shipping_routes VALUES
('RT_001', 'Shenzhen', 'Singapore', 'MaerskLine', 12, 'delayed'),
('RT_002', 'Hamburg', 'Singapore', 'HapagLloyd', 18, 'on_time'),
('RT_003', 'Taipei', 'Singapore', 'Evergreen', 8, 'on_time'),
('RT_004', 'Los Angeles', 'Singapore', 'MSC', 21, 'on_time')
""")
print("Supply chain schema created with sample data.")
Supply chain schema created with sample data.
Create the Analyst agent
Build a CrewAI Agent that queries inventory levels, analyzes supplier data, and stores findings in shared HatiData memory.
from crewai import Agent
from crewai_hatidata import (
HatiDataQueryTool,
HatiDataDescribeTableTool,
HatiDataContextSearchTool,
)
# HatiData tools for the analyst
query_tool = HatiDataQueryTool(host="localhost", port=5439, agent_id="supply-analyst")
describe_tool = HatiDataDescribeTableTool(host="localhost", port=5439, agent_id="supply-analyst")
context_tool = HatiDataContextSearchTool(host="localhost", port=5439, agent_id="supply-analyst")
analyst_agent = Agent(
role="Supply Chain Analyst",
goal="Identify inventory risks and supplier bottlenecks by querying "
"live supply chain data. Store all findings in shared memory.",
backstory="You are a senior supply chain analyst with 15 years of "
"experience in semiconductor logistics. You query databases "
"to find patterns and flag risks before they become crises.",
tools=[query_tool, describe_tool, context_tool],
verbose=True,
)
# Example: the analyst checks for low inventory
# (In a real Crew run, CrewAI orchestrates this automatically)
low_stock = query_tool._run("""
SELECT i.sku, i.product_name, i.quantity, i.reorder_point,
s.name AS supplier, s.status AS supplier_status
FROM inventory i
JOIN suppliers s ON i.supplier_id = s.supplier_id
WHERE i.quantity < i.reorder_point
ORDER BY (i.reorder_point - i.quantity) DESC
""")
# Store finding in shared memory via SQL INSERT
query_tool._run(
"INSERT INTO _hatidata_memory.memories (content, tags, namespace) VALUES ("
"'CRITICAL: 2 SKUs below reorder point. "
"SKU_101 (Memory Module B) at 45 units vs 100 reorder point. "
"Supplier GlobalChip Ltd status: DELAYED.', "
"'inventory-alert,critical,SKU_101', 'supply_chain_warroom')"
)
print("Analyst agent created and initial scan complete.")
Analyst agent created and initial scan complete.
Note: The Analyst stores findings in the 'supply_chain_warroom' namespace so other agents can read them.
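The INSERT above builds its SQL by concatenating string literals, which is brittle once findings contain apostrophes or arbitrary text. Parameterized queries are the safer pattern; here is a sketch using Python's stdlib sqlite3 as a stand-in for the HatiData client (the client's own placeholder syntax may differ, so check its docs before copying this):

```python
import sqlite3

# Stand-in for the HatiData memory table, using stdlib sqlite3 purely to
# illustrate parameterized inserts.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE memories (content TEXT, tags TEXT, namespace TEXT)")

finding = "CRITICAL: 2 SKUs below reorder point."
# Placeholders avoid quoting bugs when findings contain quotes or commas.
conn.execute(
    "INSERT INTO memories (content, tags, namespace) VALUES (?, ?, ?)",
    (finding, "inventory-alert,critical,SKU_101", "supply_chain_warroom"),
)
rows = conn.execute(
    "SELECT content FROM memories WHERE namespace = ?", ("supply_chain_warroom",)
).fetchall()
```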
Create the Monitor agent
Build an Agent that uses semantic triggers to watch for supply chain anomalies and route disruptions.
from crewai import Agent
from hatidata_agent import HatiDataAgent
from crewai_hatidata import HatiDataQueryTool, HatiDataContextSearchTool
query_tool = HatiDataQueryTool(host="localhost", port=5439, agent_id="supply-monitor")
context_tool = HatiDataContextSearchTool(host="localhost", port=5439, agent_id="supply-monitor")
client = HatiDataAgent(host="localhost", port=5439, agent_id="supply-monitor", framework="crewai")
monitor_agent = Agent(
role="Disruption Monitor",
goal="Continuously watch for supply chain disruptions using semantic "
"triggers. Alert when shipping delays, supplier issues, or port "
"congestion patterns match historical disruption signatures.",
backstory="You are an AI operations monitor that never sleeps. You "
"watch global logistics feeds and compare current events "
"against historical disruption patterns stored in memory.",
tools=[query_tool, context_tool],
verbose=True,
)
# Store historical disruption patterns for semantic matching
client.execute(
"INSERT INTO _hatidata_memory.memories (content, tags, namespace) VALUES ("
"'Port congestion in Shenzhen caused 3-week delays for APAC "
"semiconductor shipments in Q3 2024. Affected suppliers: "
"TechParts Asia, GlobalChip Ltd.', "
"'disruption-pattern,port-congestion,APAC', 'supply_chain_warroom')"
)
client.execute(
"INSERT INTO _hatidata_memory.memories (content, tags, namespace) VALUES ("
"'Carrier delays from MaerskLine on Shenzhen-Singapore route "
"historically correlate with inventory shortfalls within 2 weeks.', "
"'disruption-pattern,carrier-delay,correlation', 'supply_chain_warroom')"
)
# Check current routes against disruption patterns
disruptions = client.query("""
SELECT r.route_id, r.origin, r.carrier, r.status,
semantic_rank(m.embedding, 'shipping delay port congestion') AS risk
FROM shipping_routes r
JOIN_VECTOR _hatidata_memory.memories m
ON semantic_match(m.embedding, 'shipping delay port congestion', 0.7)
WHERE r.status = 'delayed'
ORDER BY risk DESC
""")
for d in disruptions:
    print(f" ALERT: Route {d['route_id']} ({d['origin']} via {d['carrier']}) "
          f"- risk score: {d['risk']:.3f}")
    # Store alert in shared memory for the Responder
    client.execute(
        "INSERT INTO _hatidata_memory.memories (content, tags, namespace) VALUES ("
        f"'DISRUPTION DETECTED: Route {d['route_id']} from {d['origin']} "
        f"via {d['carrier']} is delayed. Risk score {d['risk']:.3f}. "
        f"Matches historical port congestion pattern.', "
        f"'active-disruption,{d['route_id']},high-priority', 'supply_chain_warroom')"
    )
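As a rough mental model of what a semantic_match(embedding, query, threshold) predicate computes, here is a toy cosine-similarity check in plain Python. This is illustrative only; the vectors are made-up three-dimensional stand-ins, and HatiData's actual JOIN_VECTOR implementation is not shown here.

```python
import math

# Toy cosine similarity: the score that a semantic-match predicate
# conceptually compares against its threshold.
def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm

pattern_embedding = [0.9, 0.1, 0.3]  # stored disruption-pattern vector (toy)
query_embedding = [0.8, 0.2, 0.4]    # embedded query text (toy)
risk = cosine(pattern_embedding, query_embedding)
matches = risk >= 0.7  # plays the role of the 0.7 threshold in semantic_match
```

Real embeddings have hundreds or thousands of dimensions and are produced by an embedding model, but the match decision reduces to the same similarity-versus-threshold comparison.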
print("\nMonitor agent created. Disruption alerts stored in shared memory.")
 ALERT: Route RT_001 (Shenzhen via MaerskLine) - risk score: 0.912
Monitor agent created. Disruption alerts stored in shared memory.
Create the Responder agent
Build an Agent that reads shared memory from the Analyst and Monitor, then generates coordinated action plans.
from crewai import Agent
from hatidata_agent import HatiDataAgent
from crewai_hatidata import HatiDataQueryTool, HatiDataContextSearchTool
query_tool = HatiDataQueryTool(host="localhost", port=5439, agent_id="supply-responder")
context_tool = HatiDataContextSearchTool(host="localhost", port=5439, agent_id="supply-responder")
client = HatiDataAgent(host="localhost", port=5439, agent_id="supply-responder", framework="crewai")
responder_agent = Agent(
role="Response Coordinator",
goal="Read alerts from the Analyst and Monitor agents via shared memory. "
"Generate actionable response plans that combine inventory data, "
"supplier alternatives, and route options.",
backstory="You are a crisis response coordinator who synthesizes inputs "
"from multiple intelligence sources to produce clear action "
"plans. You always check shared memory for the latest alerts.",
tools=[query_tool, context_tool],
verbose=True,
)
# Read all active alerts from shared memory via SQL
alerts = client.query("""
SELECT content, semantic_rank(embedding,
'active disruption critical inventory alert') AS score
FROM _hatidata_memory.memories
WHERE namespace = 'supply_chain_warroom'
ORDER BY score DESC LIMIT 10
""")
print(f"Found {len(alerts)} active alerts in shared memory:\n")
for alert in alerts:
print(f" [{alert['score']:.2f}] {alert['content'][:100]}...")
# Generate response plan using SQL data + memory context
alternative_suppliers = client.query("""
SELECT s.supplier_id, s.name, s.region, s.reliability_score, s.lead_time_days
FROM suppliers s
WHERE s.status = 'active'
AND s.region = 'APAC'
ORDER BY s.reliability_score DESC
""")
# Store the response plan in shared memory
response_plan = (
"ACTION PLAN: SKU_101 shortage mitigation. "
"1) Redirect orders to TechParts Asia (reliability: 0.92, lead: 14d). "
"2) Reroute via Taipei-Singapore (Evergreen, RT_003, 8 days). "
"3) Increase reorder point for SKU_101 from 100 to 150 units. "
"4) Notify procurement team for emergency PO."
)
client.execute(
"INSERT INTO _hatidata_memory.memories (content, tags, namespace) VALUES ("
f"'{response_plan}', 'action-plan,SKU_101,approved', 'supply_chain_warroom')"
)
print(f"\nResponse plan generated and stored in shared memory.")
print(f"Plan: {response_plan}")
Found 3 active alerts in shared memory:
[0.95] CRITICAL: 2 SKUs below reorder point. SKU_101 (Memory Module B) at 45 un...
[0.91] DISRUPTION DETECTED: Route RT_001 from Shenzhen via MaerskLine is delayed...
[0.78] Carrier delays from MaerskLine on Shenzhen-Singapore route historically c...
Response plan generated and stored in shared memory.
Plan: ACTION PLAN: SKU_101 shortage mitigation. 1) Redirect orders to TechParts Asia (reliability: 0.92, lead: 14d). 2) Reroute via Taipei-Singapore (Evergreen, RT_003, 8 days). 3) Increase reorder point for SKU_101 from 100 to 150 units. 4) Notify procurement team for emergency PO.
Orchestrate the crew
Wire all three agents into a CrewAI Crew with task assignments and run the full supply chain war room.
from crewai import Crew, Task
from crewai_hatidata import HatiDataQueryTool, HatiDataContextSearchTool
query_tool = HatiDataQueryTool(host="localhost", port=5439, agent_id="supply-crew")
context_tool = HatiDataContextSearchTool(host="localhost", port=5439, agent_id="supply-crew")
# Re-create agents with tools
from crewai import Agent
analyst = Agent(
role="Supply Chain Analyst",
goal="Query inventory and supplier data. Flag risks in shared memory.",
backstory="Senior supply chain analyst with deep SQL expertise.",
tools=[query_tool, context_tool],
)
monitor = Agent(
role="Disruption Monitor",
goal="Detect disruptions by matching current events against memory.",
backstory="AI operations monitor watching global logistics 24/7.",
tools=[query_tool, context_tool],
)
responder = Agent(
role="Response Coordinator",
goal="Read all alerts from shared memory and produce action plans.",
backstory="Crisis coordinator who synthesizes multi-source intelligence.",
tools=[query_tool, context_tool],
)
# Define tasks
analyze_task = Task(
description="Query inventory table for SKUs below reorder point. "
"Check supplier status. Store all findings in shared memory "
"with namespace 'supply_chain_warroom'.",
expected_output="List of at-risk SKUs with supplier status.",
agent=analyst,
)
monitor_task = Task(
description="Check shipping_routes for delays. Use JOIN_VECTOR to match "
"delayed routes against historical disruption patterns in memory. "
"Store alerts in shared memory.",
expected_output="List of disruption alerts with risk scores.",
agent=monitor,
)
respond_task = Task(
description="Read all alerts from shared memory. Query alternative suppliers "
"and routes. Generate a prioritized action plan. Store the plan "
"in shared memory.",
expected_output="Actionable response plan with specific recommendations.",
agent=responder,
)
# Run the crew
crew = Crew(
agents=[analyst, monitor, responder],
tasks=[analyze_task, monitor_task, respond_task],
verbose=True,
)
result = crew.kickoff()
print("\n" + "=" * 60)
print("SUPPLY CHAIN WAR ROOM - COMPLETE")
print("=" * 60)
print(result)
[Analyst] Querying inventory levels...
[Analyst] Found 2 SKUs below reorder point
[Analyst] Stored findings in shared memory
[Monitor] Checking shipping routes for disruptions...
[Monitor] Route RT_001 matches historical port congestion pattern (risk: 0.912)
[Monitor] Stored disruption alert in shared memory
[Responder] Reading 3 alerts from shared memory...
[Responder] Generating response plan...
[Responder] Plan stored in shared memory
============================================================
SUPPLY CHAIN WAR ROOM - COMPLETE
============================================================
ACTION PLAN: SKU_101 shortage mitigation.
1) Redirect orders to TechParts Asia (reliability: 0.92, lead: 14d)
2) Reroute via Taipei-Singapore (Evergreen, RT_003, 8 days)
3) Increase reorder point for SKU_101 from 100 to 150 units
4) Notify procurement team for emergency PO
Note: All three agents share a single HatiData memory namespace. The Analyst writes findings, the Monitor writes alerts, and the Responder reads everything to produce a coordinated plan.
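Stripped of LLM reasoning, tool calls, and retries, the war-room flow is a data-driven pipeline: each stage writes to shared memory and the next stage reads from it. A framework-free sketch of that handoff (illustrative only; stage names and contents are made up, and CrewAI's real orchestration is far richer):

```python
# Framework-free sketch of the sequential handoff: three stages sharing one
# memory list, mirroring how the agents share a HatiData namespace.
memory = []  # stands in for the 'supply_chain_warroom' namespace

def analyst(mem):
    # Writes structured findings for downstream stages.
    mem.append(("finding", "SKU_101 below reorder point"))

def monitor(mem):
    # Writes disruption alerts alongside the Analyst's findings.
    mem.append(("alert", "RT_001 delayed; matches congestion pattern"))

def responder(mem):
    # Reads everything written so far and synthesizes a plan.
    inputs = [content for kind, content in mem if kind in ("finding", "alert")]
    plan = "ACTION PLAN based on: " + "; ".join(inputs)
    mem.append(("plan", plan))
    return plan

# Stages run in order, like a sequential Crew; each builds on prior writes.
analyst(memory)
monitor(memory)
result = responder(memory)
```

Because each stage only depends on what is in memory, reordering, rerunning, or replacing a stage never requires changing the others, which is the restart-safety property the Key Concepts section describes.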