← All Cookbooks
CrewAIIntermediate45 min

CrewAI + HatiData: Supply Chain War Room

Deploy a multi-agent crew that monitors suppliers, routes, and inventory with shared memory.

What You'll Build

A CrewAI crew with Analyst, Monitor, and Responder agents sharing a unified HatiData memory layer.

Prerequisites

$pip install hatidata-agent crewai crewai-hatidata

$hati init

Architecture

┌─────────────┐  ┌─────────────┐  ┌─────────────┐
│  Analyst    │  │  Monitor    │  │  Responder  │
└──────┬──────┘  └──────┬──────┘  └──────┬──────┘
       └────────────────┼────────────────┘
                 ┌──────▼───────┐
                 │  HatiData    │
                 │ Shared Memory│
                 └──────────────┘

Key Concepts

  • Shared memory between agents: all three CrewAI agents read and write to the same HatiData namespace, enabling coordination without direct message passing
  • Semantic triggers match current disruptions against historical patterns using JOIN_VECTOR, catching novel supply chain risks that rule-based alerts miss
  • Multi-agent coordination through a shared data layer means agents can be added, removed, or restarted without losing institutional knowledge
  • Crew task handoff is data-driven: each agent writes structured findings to memory, and downstream agents query that memory to build on previous work

Step-by-Step Implementation

1

Set up the environment

Install CrewAI, the HatiData CrewAI integration, and initialize the local database.

Bash
pip install hatidata-agent crewai crewai-hatidata
hati init

Note: hati init creates a .hati/config.toml config and initializes a local DuckDB database. crewai-hatidata provides pre-built CrewAI tools for queries and semantic search.

2

Define the supply chain data schema

Create tables for suppliers, inventory, and shipping routes, then load sample data for the agents to work with.

Python
from hatidata_agent import HatiDataAgent

client = HatiDataAgent(host="localhost", port=5439, agent_id="supply-chain-analyst", framework="crewai")

# Create supplier table
client.execute("""
    CREATE TABLE IF NOT EXISTS suppliers (
        supplier_id VARCHAR PRIMARY KEY,
        name VARCHAR NOT NULL,
        region VARCHAR,
        reliability_score DECIMAL(3, 2),
        lead_time_days INT,
        status VARCHAR DEFAULT 'active'
    )
""")

# Create inventory table
client.execute("""
    CREATE TABLE IF NOT EXISTS inventory (
        sku VARCHAR PRIMARY KEY,
        product_name VARCHAR,
        supplier_id VARCHAR,
        quantity INT,
        reorder_point INT,
        warehouse VARCHAR
    )
""")

# Create routes table
client.execute("""
    CREATE TABLE IF NOT EXISTS shipping_routes (
        route_id VARCHAR PRIMARY KEY,
        origin VARCHAR,
        destination VARCHAR,
        carrier VARCHAR,
        transit_days INT,
        status VARCHAR DEFAULT 'on_time'
    )
""")

# Load sample data
client.execute("""
    INSERT INTO suppliers VALUES
        ('SUP_001', 'TechParts Asia', 'APAC', 0.92, 14, 'active'),
        ('SUP_002', 'EuroComponents GmbH', 'EMEA', 0.88, 21, 'active'),
        ('SUP_003', 'GlobalChip Ltd', 'APAC', 0.95, 7, 'delayed'),
        ('SUP_004', 'AmeriParts Inc', 'NA', 0.91, 5, 'active')
""")

client.execute("""
    INSERT INTO inventory VALUES
        ('SKU_100', 'Processor Unit A', 'SUP_001', 150, 200, 'SG-warehouse'),
        ('SKU_101', 'Memory Module B', 'SUP_003', 45, 100, 'SG-warehouse'),
        ('SKU_102', 'Power Supply C', 'SUP_002', 500, 300, 'EU-warehouse'),
        ('SKU_103', 'Controller Board D', 'SUP_004', 80, 50, 'US-warehouse')
""")

client.execute("""
    INSERT INTO shipping_routes VALUES
        ('RT_001', 'Shenzhen', 'Singapore', 'MaerskLine', 12, 'delayed'),
        ('RT_002', 'Hamburg', 'Singapore', 'HapagLloyd', 18, 'on_time'),
        ('RT_003', 'Taipei', 'Singapore', 'Evergreen', 8, 'on_time'),
        ('RT_004', 'Los Angeles', 'Singapore', 'MSC', 21, 'on_time')
""")

print("Supply chain schema created with sample data.")
Expected Output
Supply chain schema created with sample data.
3

Create the Analyst agent

Build a CrewAI Agent that queries inventory levels, analyzes supplier data, and stores findings in shared HatiData memory.

Python
from crewai import Agent
from crewai_hatidata import (
    HatiDataQueryTool,
    HatiDataDescribeTableTool,
    HatiDataContextSearchTool,
)

# HatiData tools for the analyst
query_tool = HatiDataQueryTool(host="localhost", port=5439, agent_id="supply-analyst")
describe_tool = HatiDataDescribeTableTool(host="localhost", port=5439, agent_id="supply-analyst")
context_tool = HatiDataContextSearchTool(host="localhost", port=5439, agent_id="supply-analyst")

analyst_agent = Agent(
    role="Supply Chain Analyst",
    goal="Identify inventory risks and supplier bottlenecks by querying "
         "live supply chain data. Store all findings in shared memory.",
    backstory="You are a senior supply chain analyst with 15 years of "
              "experience in semiconductor logistics. You query databases "
              "to find patterns and flag risks before they become crises.",
    tools=[query_tool, describe_tool, context_tool],
    verbose=True,
)

# Example: the analyst checks for low inventory
# (In a real Crew run, CrewAI orchestrates this automatically)
low_stock = query_tool._run("""
    SELECT i.sku, i.product_name, i.quantity, i.reorder_point,
           s.name AS supplier, s.status AS supplier_status
    FROM inventory i
    JOIN suppliers s ON i.supplier_id = s.supplier_id
    WHERE i.quantity < i.reorder_point
    ORDER BY (i.reorder_point - i.quantity) DESC
""")

# Store finding in shared memory via SQL INSERT
query_tool._run(
    "INSERT INTO _hatidata_memory.memories (content, tags, namespace) VALUES ("
    "'CRITICAL: 2 SKUs below reorder point. "
    "SKU_101 (Memory Module B) at 45 units vs 100 reorder point. "
    "Supplier GlobalChip Ltd status: DELAYED.', "
    "'inventory-alert,critical,SKU_101', 'supply_chain_warroom')"
)

print("Analyst agent created and initial scan complete.")
Expected Output
Analyst agent created and initial scan complete.

Note: The Analyst stores findings in the 'supply_chain_warroom' namespace so other agents can read them.

4

Create the Monitor agent

Build an Agent that uses semantic triggers to watch for supply chain anomalies and route disruptions.

Python
from crewai import Agent
from hatidata_agent import HatiDataAgent
from crewai_hatidata import HatiDataQueryTool, HatiDataContextSearchTool

query_tool = HatiDataQueryTool(host="localhost", port=5439, agent_id="supply-monitor")
context_tool = HatiDataContextSearchTool(host="localhost", port=5439, agent_id="supply-monitor")
client = HatiDataAgent(host="localhost", port=5439, agent_id="supply-monitor", framework="crewai")

monitor_agent = Agent(
    role="Disruption Monitor",
    goal="Continuously watch for supply chain disruptions using semantic "
         "triggers. Alert when shipping delays, supplier issues, or port "
         "congestion patterns match historical disruption signatures.",
    backstory="You are an AI operations monitor that never sleeps. You "
              "watch global logistics feeds and compare current events "
              "against historical disruption patterns stored in memory.",
    tools=[query_tool, context_tool],
    verbose=True,
)

# Store historical disruption patterns for semantic matching
client.execute(
    "INSERT INTO _hatidata_memory.memories (content, tags, namespace) VALUES ("
    "'Port congestion in Shenzhen caused 3-week delays for APAC "
    "semiconductor shipments in Q3 2024. Affected suppliers: "
    "TechParts Asia, GlobalChip Ltd.', "
    "'disruption-pattern,port-congestion,APAC', 'supply_chain_warroom')"
)

client.execute(
    "INSERT INTO _hatidata_memory.memories (content, tags, namespace) VALUES ("
    "'Carrier delays from MaerskLine on Shenzhen-Singapore route "
    "historically correlate with inventory shortfalls within 2 weeks.', "
    "'disruption-pattern,carrier-delay,correlation', 'supply_chain_warroom')"
)

# Check current routes against disruption patterns
disruptions = client.query("""
    SELECT r.route_id, r.origin, r.carrier, r.status,
           semantic_rank(m.embedding, 'shipping delay port congestion') AS risk
    FROM shipping_routes r
    JOIN_VECTOR agent_memories m
      ON semantic_match(m.embedding, 'shipping delay port congestion', 0.7)
    WHERE r.status = 'delayed'
    ORDER BY risk DESC
""")

for d in disruptions:
    print(f"  ALERT: Route {d['route_id']} ({d['origin']} via {d['carrier']}) "
          f"- risk score: {d['risk']:.3f}")

    # Store alert in shared memory for the Responder
    client.execute(
        "INSERT INTO _hatidata_memory.memories (content, tags, namespace) VALUES ("
        f"'DISRUPTION DETECTED: Route {d["route_id"]} from {d["origin"]} "
        f"via {d["carrier"]} is delayed. Risk score {d["risk"]:.3f}. "
        f"Matches historical port congestion pattern.', "
        f"'active-disruption,{d["route_id"]},high-priority', 'supply_chain_warroom')"
    )

print("\nMonitor agent created. Disruption alerts stored in shared memory.")
Expected Output
  ALERT: Route RT_001 (Shenzhen via MaerskLine) - risk score: 0.912

Monitor agent created. Disruption alerts stored in shared memory.
5

Create the Responder agent

Build an Agent that reads shared memory from the Analyst and Monitor, then generates coordinated action plans.

Python
from crewai import Agent
from hatidata_agent import HatiDataAgent
from crewai_hatidata import HatiDataQueryTool, HatiDataContextSearchTool

query_tool = HatiDataQueryTool(host="localhost", port=5439, agent_id="supply-responder")
context_tool = HatiDataContextSearchTool(host="localhost", port=5439, agent_id="supply-responder")
client = HatiDataAgent(host="localhost", port=5439, agent_id="supply-responder", framework="crewai")

responder_agent = Agent(
    role="Response Coordinator",
    goal="Read alerts from the Analyst and Monitor agents via shared memory. "
         "Generate actionable response plans that combine inventory data, "
         "supplier alternatives, and route options.",
    backstory="You are a crisis response coordinator who synthesizes inputs "
              "from multiple intelligence sources to produce clear action "
              "plans. You always check shared memory for the latest alerts.",
    tools=[query_tool, context_tool],
    verbose=True,
)

# Read all active alerts from shared memory via SQL
alerts = client.query("""
    SELECT content, semantic_rank(embedding,
        'active disruption critical inventory alert') AS score
    FROM _hatidata_memory.memories
    WHERE namespace = 'supply_chain_warroom'
    ORDER BY score DESC LIMIT 10
""")

print(f"Found {len(alerts)} active alerts in shared memory:\n")
for alert in alerts:
    print(f"  [{alert['score']:.2f}] {alert['content'][:100]}...")

# Generate response plan using SQL data + memory context
alternative_suppliers = client.query("""
    SELECT s.supplier_id, s.name, s.region, s.reliability_score, s.lead_time_days
    FROM suppliers s
    WHERE s.status = 'active'
      AND s.region = 'APAC'
    ORDER BY s.reliability_score DESC
""")

# Store the response plan in shared memory
response_plan = (
    "ACTION PLAN: SKU_101 shortage mitigation. "
    "1) Redirect orders to TechParts Asia (reliability: 0.92, lead: 14d). "
    "2) Reroute via Taipei-Singapore (Evergreen, RT_003, 8 days). "
    "3) Increase reorder point for SKU_101 from 100 to 150 units. "
    "4) Notify procurement team for emergency PO."
)

client.execute(
    "INSERT INTO _hatidata_memory.memories (content, tags, namespace) VALUES ("
    f"'{response_plan}', 'action-plan,SKU_101,approved', 'supply_chain_warroom')"
)

print(f"\nResponse plan generated and stored in shared memory.")
print(f"Plan: {response_plan}")
Expected Output
Found 3 active alerts in shared memory:

  [0.95] CRITICAL: 1 SKUs below reorder point. SKU_101 (Memory Module B) at 45 un...
  [0.91] DISRUPTION DETECTED: Route RT_001 from Shenzhen via MaerskLine is delayed...
  [0.78] Carrier delays from MaerskLine on Shenzhen-Singapore route historically c...

Response plan generated and stored in shared memory.
Plan: ACTION PLAN: SKU_101 shortage mitigation. 1) Redirect orders to TechParts Asia (reliability: 0.92, lead: 14d). 2) Reroute via Taipei-Singapore (Evergreen, RT_003, 8 days). 3) Increase reorder point for SKU_101 from 100 to 150 units. 4) Notify procurement team for emergency PO.
6

Orchestrate the crew

Wire all three agents into a CrewAI Crew with task assignments and run the full supply chain war room.

Python
from crewai import Crew, Task
from crewai_hatidata import HatiDataQueryTool, HatiDataContextSearchTool

query_tool = HatiDataQueryTool(host="localhost", port=5439, agent_id="supply-crew")
context_tool = HatiDataContextSearchTool(host="localhost", port=5439, agent_id="supply-crew")

# Re-create agents with tools
from crewai import Agent

analyst = Agent(
    role="Supply Chain Analyst",
    goal="Query inventory and supplier data. Flag risks in shared memory.",
    backstory="Senior supply chain analyst with deep SQL expertise.",
    tools=[query_tool, context_tool],
)

monitor = Agent(
    role="Disruption Monitor",
    goal="Detect disruptions by matching current events against memory.",
    backstory="AI operations monitor watching global logistics 24/7.",
    tools=[query_tool, context_tool],
)

responder = Agent(
    role="Response Coordinator",
    goal="Read all alerts from shared memory and produce action plans.",
    backstory="Crisis coordinator who synthesizes multi-source intelligence.",
    tools=[query_tool, context_tool],
)

# Define tasks
analyze_task = Task(
    description="Query inventory table for SKUs below reorder point. "
                "Check supplier status. Store all findings in shared memory "
                "with namespace 'supply_chain_warroom'.",
    expected_output="List of at-risk SKUs with supplier status.",
    agent=analyst,
)

monitor_task = Task(
    description="Check shipping_routes for delays. Use JOIN_VECTOR to match "
                "delayed routes against historical disruption patterns in memory. "
                "Store alerts in shared memory.",
    expected_output="List of disruption alerts with risk scores.",
    agent=monitor,
)

respond_task = Task(
    description="Read all alerts from shared memory. Query alternative suppliers "
                "and routes. Generate a prioritized action plan. Store the plan "
                "in shared memory.",
    expected_output="Actionable response plan with specific recommendations.",
    agent=responder,
)

# Run the crew
crew = Crew(
    agents=[analyst, monitor, responder],
    tasks=[analyze_task, monitor_task, respond_task],
    verbose=True,
)

result = crew.kickoff()
print("\n" + "=" * 60)
print("SUPPLY CHAIN WAR ROOM - COMPLETE")
print("=" * 60)
print(result)
Expected Output
[Analyst] Querying inventory levels...
[Analyst] Found 2 SKUs below reorder point
[Analyst] Stored findings in shared memory

[Monitor] Checking shipping routes for disruptions...
[Monitor] Route RT_001 matches historical port congestion pattern (risk: 0.912)
[Monitor] Stored disruption alert in shared memory

[Responder] Reading 3 alerts from shared memory...
[Responder] Generating response plan...
[Responder] Plan stored in shared memory

============================================================
SUPPLY CHAIN WAR ROOM - COMPLETE
============================================================
ACTION PLAN: SKU_101 shortage mitigation.
1) Redirect orders to TechParts Asia (reliability: 0.92, lead: 14d)
2) Reroute via Taipei-Singapore (Evergreen, RT_003, 8 days)
3) Increase reorder point for SKU_101 from 100 to 150 units
4) Notify procurement team for emergency PO

Note: All three agents share a single HatiData memory namespace. The Analyst writes findings, the Monitor writes alerts, and the Responder reads everything to produce a coordinated plan.

Ready to build?

Install HatiData locally and start building with CrewAI in minutes.

Join Waitlist