AutoGen + HatiData: Research Team
Build a multi-agent research team with AutoGen GroupChat and shared HatiData memory.
What You'll Build
An AutoGen GroupChat with Researcher, Analyst, and Writer agents sharing HatiData memory.
Prerequisites
$ pip install hatidata-agent autogen-agentchat "autogen-ext[openai]"
$ hati init
Architecture
┌─────────────┐   ┌─────────────┐   ┌─────────────┐
│ Researcher  │   │   Analyst   │   │   Writer    │
└──────┬──────┘   └──────┬──────┘   └──────┬──────┘
       └─────────────────┼─────────────────┘
                  ┌──────▼───────┐
                  │   HatiData   │
                  │Shared Memory │
                  └──────────────┘
Key Concepts
- Shared memory between GroupChat agents: all agents read and write to the same HatiData namespace, enabling true collaboration without passing messages through the chat alone.
- Role-based memory access: each agent writes with its own agent_role tag, making it easy to trace which agent contributed which finding and to audit the research process.
- SQL analytics on research data: the Analyst agent runs SQL queries directly on research tables to identify patterns, gaps, and statistical insights that chat-only workflows cannot surface.
- Automated synthesis: the Writer agent gathers all structured data from shared memory and produces a coherent report with confidence scores and source attribution from the original agents.
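The collaboration pattern behind these concepts can be sketched in plain Python before wiring up HatiData: each agent appends role-tagged records to one shared store, and any agent can filter by role to audit contributions. The `SharedMemory` class below is a hypothetical stdlib-only stand-in for illustration, not part of the HatiData API:

```python
from dataclasses import dataclass, field

@dataclass
class SharedMemory:
    """In-memory stand-in for a shared HatiData namespace."""
    records: list = field(default_factory=list)

    def write(self, agent_role: str, topic: str, content: str) -> None:
        # Every write carries the author's role for traceability.
        self.records.append(
            {"agent_role": agent_role, "topic": topic, "content": content}
        )

    def by_role(self, agent_role: str) -> list:
        # Audit which agent contributed which entries.
        return [r for r in self.records if r["agent_role"] == agent_role]

mem = SharedMemory()
mem.write("researcher", "code generation", "30-50% gains on boilerplate")
mem.write("analyst", "code generation", "confidence trend is stable")
print(len(mem.by_role("researcher")))  # one entry from the researcher
```

The HatiData tables built in the steps below provide the persistent, SQL-queryable version of this same pattern.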
Step-by-Step Implementation
Install dependencies
Install the HatiData client library and AutoGen framework.
pip install hatidata-agent autogen-agentchat "autogen-ext[openai]"
Note: Requires Python 3.10+. The autogen-ext[openai] package provides the OpenAI model client for AutoGen agents.
Set up HatiData for shared research memory
Create the database schema for collaborative research -- tables for notes, findings, and sources that all agents share.
from hatidata_agent import HatiDataAgent
client = HatiDataAgent(host="localhost", port=5439, agent_id="research-admin", framework="autogen")
client.execute("""
    CREATE TABLE IF NOT EXISTS research_notes (
        note_id VARCHAR PRIMARY KEY,
        agent_role VARCHAR NOT NULL,
        topic VARCHAR NOT NULL,
        content TEXT NOT NULL,
        confidence FLOAT DEFAULT 0.0,
        sources TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
""")
client.execute("""
    CREATE TABLE IF NOT EXISTS findings (
        finding_id VARCHAR PRIMARY KEY,
        agent_role VARCHAR NOT NULL,
        category VARCHAR NOT NULL,
        title VARCHAR NOT NULL,
        summary TEXT NOT NULL,
        evidence TEXT,
        status VARCHAR DEFAULT 'draft',
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
""")
client.execute("""
    CREATE TABLE IF NOT EXISTS sources (
        source_id VARCHAR PRIMARY KEY,
        url TEXT,
        title VARCHAR NOT NULL,
        credibility_score FLOAT DEFAULT 0.0,
        added_by VARCHAR NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
""")

# Store initial research context in agent memory
client.execute("""
    INSERT INTO _hatidata_memory.memories
        (content, tags, namespace)
    VALUES
        ('Research topic: Impact of large language models on software development productivity. Focus areas: code generation, debugging, documentation, testing.',
         'research-brief,llm,productivity', 'research_team')
""")
print("Research database initialized.")
print("Tables: research_notes, findings, sources")
Research database initialized.
Tables: research_notes, findings, sources
Note: All three agents share these tables and the 'research_team' memory namespace. Each agent writes with its own agent_role for traceability.
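One caveat before the agent code in the following steps: the tool functions interpolate topic and content strings directly into SQL, so a value containing a single quote would terminate the literal early and break the INSERT. A small escaping helper is worth keeping nearby (a sketch assuming HatiData follows standard SQL quote doubling; `sql_quote` is not part of the client library, and parameterized queries would be preferable if the client supports them):

```python
def sql_quote(value: str) -> str:
    """Escape single quotes so a value can sit inside a SQL string literal."""
    return value.replace("'", "''")

# An apostrophe in research content no longer terminates the literal early.
content = "Developers' productivity rose sharply"
statement = f"INSERT INTO research_notes (content) VALUES ('{sql_quote(content)}')"
print(statement)
```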
Create the Researcher agent
Build an AutoGen AssistantAgent that searches for information and stores findings in HatiData shared memory.
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient
from hatidata_agent import HatiDataAgent
import uuid
hati = HatiDataAgent(host="localhost", port=5439, agent_id="researcher", framework="autogen")
model_client = OpenAIChatCompletionClient(model="gpt-4o")
async def store_research_note(
    topic: str, content: str, sources: str
) -> str:
    """Store a research note in shared HatiData memory."""
    note_id = f"note_{uuid.uuid4().hex[:8]}"
    hati.execute(f"""
        INSERT INTO research_notes
            (note_id, agent_role, topic, content,
             confidence, sources)
        VALUES
            ('{note_id}', 'researcher', '{topic}',
             '{content}', 0.85, '{sources}')
    """)
    hati.execute(f"""
        INSERT INTO _hatidata_memory.memories
            (content, tags, namespace)
        VALUES
            ('Research note on {topic}: {content}',
             'research,note,{topic.lower().replace(" ", "-")}',
             'research_team')
    """)
    return f"Stored research note {note_id} on '{topic}'"

async def search_existing_research(query: str) -> str:
    """Search shared memory for existing research."""
    results = hati.query(f"""
        SELECT content, tags,
               semantic_rank(embedding, '{query}') AS relevance
        FROM _hatidata_memory.memories
        WHERE namespace = 'research_team'
        ORDER BY relevance DESC
        LIMIT 5
    """)
    if not results:
        return "No existing research found on this topic."
    summaries = []
    for r in results:
        summaries.append(
            f"- [{r['relevance']:.2f}] {r['content'][:150]}"
        )
    return "Existing research:\n" + "\n".join(summaries)

researcher = AssistantAgent(
    name="Researcher",
    model_client=model_client,
    system_message=(
        "You are a research specialist. Your job is to:\n"
        "1. Search existing shared memory for prior research\n"
        "2. Gather new information on the assigned topic\n"
        "3. Store findings using store_research_note\n"
        "4. Cite sources for every claim\n"
        "Always check shared memory first to avoid duplicates."
    ),
    tools=[store_research_note, search_existing_research],
)
print("Researcher agent created with HatiData memory tools.")
Researcher agent created with HatiData memory tools.
Create the Analyst agent
Build an AssistantAgent that queries shared HatiData memory and runs SQL analytics on collected findings.
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient
from hatidata_agent import HatiDataAgent
hati = HatiDataAgent(host="localhost", port=5439, agent_id="analyst", framework="autogen")
model_client = OpenAIChatCompletionClient(model="gpt-4o")
async def analyze_findings(category: str) -> str:
    """Run SQL analytics on research notes for a topic area."""
    results = hati.query(f"""
        SELECT topic,
               COUNT(*) AS total_notes,
               AVG(confidence) AS avg_confidence,
               MIN(created_at) AS earliest,
               MAX(created_at) AS latest
        FROM research_notes
        WHERE topic LIKE '%{category}%'
        GROUP BY topic
        ORDER BY total_notes DESC
    """)
    if not results:
        return f"No findings yet for '{category}'."
    lines = [f"Analysis for '{category}':"]
    for row in results:
        lines.append(
            f"  - {row['total_notes']} notes, "
            f"avg confidence: {row['avg_confidence']:.2f}"
        )
    return "\n".join(lines)

async def query_research_data(sql: str) -> str:
    """Run a custom SQL query against the research database."""
    results = hati.query(sql)
    if not results:
        return "Query returned no results."
    lines = []
    for row in results:
        lines.append(str(dict(row)))
    return (
        f"Query returned {len(results)} rows:\n"
        + "\n".join(lines)
    )

async def find_gaps(topic: str) -> str:
    """Identify gaps in research coverage."""
    existing = hati.query(f"""
        SELECT content, tags,
               semantic_rank(embedding, '{topic}') AS relevance
        FROM _hatidata_memory.memories
        WHERE namespace = 'research_team'
        ORDER BY relevance DESC
        LIMIT 10
    """)
    covered = [
        r["content"][:100]
        for r in existing
    ]
    findings = hati.query("""
        SELECT category, COUNT(*) AS count
        FROM findings
        GROUP BY category
        ORDER BY count ASC LIMIT 5
    """)
    gaps = [f["category"] for f in findings] if findings else []
    return (
        f"Covered topics ({len(covered)} memories): "
        f"{', '.join(covered[:3])}...\n"
        f"Under-researched: "
        f"{', '.join(gaps) if gaps else 'None yet'}"
    )

analyst = AssistantAgent(
    name="Analyst",
    model_client=model_client,
    system_message=(
        "You are a research analyst. Your job is to:\n"
        "1. Run SQL queries on shared data with query_research_data\n"
        "2. Analyze patterns with analyze_findings\n"
        "3. Identify gaps with find_gaps\n"
        "4. Provide data-driven insights to guide the team\n"
        "Focus on quantitative analysis and evidence quality."
    ),
    tools=[analyze_findings, query_research_data, find_gaps],
)
print("Analyst agent created with SQL analytics tools.")
Analyst agent created with SQL analytics tools.
Create the Writer agent
Build an AssistantAgent that reads shared memory from both the Researcher and Analyst, then synthesizes a final report.
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient
from hatidata_agent import HatiDataAgent
hati = HatiDataAgent(host="localhost", port=5439, agent_id="writer", framework="autogen")
model_client = OpenAIChatCompletionClient(model="gpt-4o")
async def gather_all_research() -> str:
    """Read all research notes and findings from shared memory."""
    notes = hati.query("""
        SELECT agent_role, topic, content, confidence, sources
        FROM research_notes
        ORDER BY confidence DESC, created_at DESC
    """)
    findings = hati.query("""
        SELECT category, title, summary, evidence, status
        FROM findings
        WHERE status IN ('draft', 'verified')
        ORDER BY category, created_at DESC
    """)
    report_data = "=== RESEARCH NOTES ===\n"
    for note in notes:
        report_data += (
            f"[{note['agent_role']}] {note['topic']} "
            f"(confidence: {note['confidence']:.2f})\n"
            f"  {note['content']}\n\n"
        )
    report_data += "=== VERIFIED FINDINGS ===\n"
    for f in findings:
        report_data += (
            f"[{f['category']}] {f['title']}\n"
            f"  {f['summary']}\n"
            f"  Evidence: {f['evidence']}\n\n"
        )
    return report_data

async def store_final_report(title: str, content: str) -> str:
    """Store the synthesized report in HatiData memory."""
    hati.execute(f"""
        INSERT INTO _hatidata_memory.memories
            (content, tags, namespace)
        VALUES
            ('Final Report - {title}: {content[:500]}',
             'report,final,synthesis', 'research_team')
    """)
    hati.execute(f"""
        INSERT INTO findings
            (finding_id, agent_role, category,
             title, summary, status)
        VALUES
            ('report_final', 'writer', 'synthesis',
             '{title}', '{content[:500]}', 'published')
    """)
    return f"Final report '{title}' stored in shared memory."

writer = AssistantAgent(
    name="Writer",
    model_client=model_client,
    system_message=(
        "You are a research report writer. Your job is to:\n"
        "1. Gather all research with gather_all_research\n"
        "2. Synthesize findings into a structured report\n"
        "3. Highlight key insights and contradictions\n"
        "4. Store the final report with store_final_report\n"
        "Write for a technical audience. Cite confidence scores."
    ),
    tools=[gather_all_research, store_final_report],
)
print("Writer agent created with synthesis tools.")
Writer agent created with synthesis tools.
Run the GroupChat
Orchestrate all three agents in an AutoGen GroupChat where they collaborate through shared HatiData memory.
import asyncio
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
termination = TextMentionTermination("RESEARCH_COMPLETE")
team = RoundRobinGroupChat(
    participants=[researcher, analyst, writer],
    termination_condition=termination,
    max_turns=12,
)

async def run_research():
    """Execute the collaborative research workflow."""
    task = (
        "Research the impact of large language models on software "
        "development productivity. The Researcher should gather "
        "findings on code generation, debugging, and testing. "
        "The Analyst should identify patterns and gaps. "
        "The Writer should synthesize a final report. "
        "Say RESEARCH_COMPLETE when the report is stored."
    )
    result = await team.run(task=task)
    print("\n=== GroupChat Complete ===")
    print(f"Total messages: {len(result.messages)}")
    from hatidata_agent import HatiDataAgent
    hati = HatiDataAgent(host="localhost", port=5439, agent_id="research-admin", framework="autogen")
    notes = hati.query(
        "SELECT COUNT(*) AS cnt FROM research_notes"
    )
    findings = hati.query(
        "SELECT COUNT(*) AS cnt FROM findings"
    )
    memories = hati.query(
        "SELECT COUNT(*) AS cnt FROM _hatidata_memory.memories WHERE namespace = 'research_team'"
    )
    print(f"Research notes stored: {notes[0]['cnt']}")
    print(f"Findings stored: {findings[0]['cnt']}")
    print(f"Shared memories: {memories[0]['cnt']}")
    print("\nAll data queryable via SQL at localhost:5439")
asyncio.run(run_research())
Researcher: Searching shared memory for existing research...
Researcher: No prior research found. Starting fresh.
Researcher: Stored note on 'LLM Code Generation' -- studies show
30-50% productivity gains for boilerplate tasks.
Researcher: Stored note on 'LLM Debugging' -- developers report
25% faster bug resolution with AI-assisted debugging.
Researcher: Stored note on 'LLM Testing' -- auto-generated test
suites achieve 60-80% coverage on first pass.
Analyst: Running SQL analytics on research notes...
Analyst: 3 notes collected, avg confidence: 0.83
Analyst: Gap identified: no research on documentation generation.
Researcher: Storing additional note on 'LLM Documentation'...
Researcher: Stored note -- 40% reduction in documentation time.
Writer: Gathering all research from shared memory...
Writer: Synthesizing report from 4 notes and 2 findings...
Writer: Final report stored: "LLM Impact on Developer Productivity"
Writer: RESEARCH_COMPLETE
=== GroupChat Complete ===
Total messages: 9
Research notes stored: 4
Findings stored: 3
Shared memories: 6
All data queryable via SQL at localhost:5439
Note: Each agent reads and writes to the same HatiData namespace. The Researcher stores notes, the Analyst queries them with SQL, and the Writer synthesizes the results. All data persists after the GroupChat ends.
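TextMentionTermination ends the chat as soon as the marker string appears in a message. Conceptually it reduces to a check like this simplified stand-in (an illustration, not AutoGen's actual implementation):

```python
def should_terminate(messages: list[str], marker: str = "RESEARCH_COMPLETE") -> bool:
    """Stop once any message in the transcript mentions the marker."""
    return any(marker in m for m in messages)

transcript = [
    "Researcher: Stored note on 'LLM Code Generation'",
    "Writer: Final report stored.",
    "Writer: RESEARCH_COMPLETE",
]
print(should_terminate(transcript))  # True
```

This is why the task prompt tells the team to "Say RESEARCH_COMPLETE when the report is stored": the Writer's final message is what trips the condition.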