Gemini + HatiData: Fraud Detection Pipeline
Build an intelligent fraud detection agent using Google Gemini and HatiData's hybrid SQL.
What You'll Build
A Gemini-powered agent that monitors transactions, builds fraud pattern memory, and flags suspicious activity using JOIN_VECTOR and semantic_rank().
Prerequisites
- pip install hatidata-agent google-generativeai
- hati init
- A Google AI API key
Architecture
┌──────────────┐    ┌──────────────┐    ┌──────────────┐
│    Gemini    │───▶│   HatiData   │───▶│ Transactions │
│    Agent     │    │  MCP Server  │    │   (DuckDB)   │
└──────┬───────┘    └──────┬───────┘    └──────────────┘
       │                   │
       │            ┌──────▼───────┐
       └───────────▶│ Fraud Memory │
                    │  + CoT Log   │
                    └──────────────┘
Key Concepts
- JOIN_VECTOR merges structured transaction data with semantic fraud pattern memory in a single SQL query
- Semantic fraud detection finds novel patterns that rule-based systems miss by matching against learned fraud signatures
- Chain-of-thought audit trails provide hash-chained, tamper-evident reasoning logs for regulatory compliance
- Real-time pattern memory lets the agent learn from every flagged transaction and improve detection over time
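The first two concepts rest on semantic similarity. This guide does not specify how HatiData's semantic_match() and semantic_rank() score text; a common approach, assumed here, is to embed text as vectors and compare them by cosine similarity. The sketch illustrates that idea: semantic_match_sketch keeps memories whose similarity to a query clears a threshold (the JOIN_VECTOR query later in this guide passes 0.7) and sorts the survivors by score, roughly the roles the two SQL functions play. The function names and the three-dimensional vectors are hypothetical toy values, not real embeddings and not HatiData's API.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two equal-length vectors (0.0 if either is all zeros)."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def semantic_match_sketch(memories: dict[str, list[float]],
                          query: list[float],
                          threshold: float = 0.7) -> list[tuple[str, float]]:
    """Hypothetical stand-in for semantic_match + semantic_rank:
    keep memories scoring at or above the threshold, best match first."""
    scored = [(name, cosine_similarity(vec, query)) for name, vec in memories.items()]
    kept = [(name, score) for name, score in scored if score >= threshold]
    return sorted(kept, key=lambda item: item[1], reverse=True)


# Invented 3-d "embeddings", for illustration only
memories = {
    "offshore-wire-pattern": [0.9, 0.1, 0.0],
    "structuring-pattern": [0.2, 0.9, 0.1],
    "streaming-subscription": [0.0, 0.1, 0.9],
}
query = [1.0, 0.2, 0.0]  # stands in for "offshore wire transfer shell company"

for name, score in semantic_match_sketch(memories, query):
    print(f"{name}: {score:.3f}")
```

With these toy vectors only the offshore-wire memory clears the 0.7 threshold; the structuring and subscription memories point in different directions and are filtered out before ranking.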
Step-by-Step Implementation
Initialize project & install dependencies
Set up a new Python project with HatiData and the Google Generative AI SDK.
pip install hatidata-agent google-generativeai
hati init
Note: hati init creates a .hati/config.toml config file and initializes a local DuckDB-backed database, which HatiData serves over the Postgres wire protocol on port 5439.
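Because every later step connects to localhost:5439, it can save debugging time to confirm that something is listening there first. A minimal standard-library sketch; is_port_open is a hypothetical helper name, and a successful TCP connection only shows that some server is listening, not that it is HatiData or that the admin/admin credentials work.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, unresolvable or unreachable hosts, and timeouts
        return False


if __name__ == "__main__":
    if is_port_open("localhost", 5439):
        print("Something is listening on localhost:5439")
    else:
        print("Nothing is listening on localhost:5439; check that hati init completed")
```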
Configure HatiData with a transactions schema
Define the transactions table that the fraud detection agent will query and populate it with sample data.
import psycopg2
# Connect via Postgres wire protocol (DuckDB-backed)
conn = psycopg2.connect(
    host="localhost", port=5439,
    user="admin", password="admin",
    dbname="fraud_detection"
)
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS transactions (
id VARCHAR PRIMARY KEY,
customer_id VARCHAR NOT NULL,
amount DECIMAL(12, 2),
counterparty VARCHAR,
country VARCHAR,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
category VARCHAR,
flagged BOOLEAN DEFAULT FALSE
)
""")
# Insert sample transactions
cur.execute("""
INSERT INTO transactions (id, customer_id, amount, counterparty, country, category)
VALUES
('tx_001', 'cust_100', 150000.00, 'OffshoreHoldings Ltd', 'KY', 'wire'),
('tx_002', 'cust_100', 2400.00, 'Amazon', 'US', 'purchase'),
('tx_003', 'cust_101', 89000.00, 'ShellCorp SA', 'PA', 'wire'),
('tx_004', 'cust_102', 45.99, 'Netflix', 'US', 'subscription'),
('tx_005', 'cust_101', 120000.00, 'OffshoreHoldings Ltd', 'KY', 'wire')
""")
conn.commit()
cur.close()
conn.close()
print("Schema created and sample data loaded.")
Output: Schema created and sample data loaded.
Build the Gemini fraud detection agent
Create an agent class that uses Google Gemini for reasoning and HatiData for persistent fraud pattern memory.
import os

import google.generativeai as genai
from hatidata_agent import HatiDataAgent

# Read the key from the environment instead of hard-coding it in source
genai.configure(api_key=os.environ["GOOGLE_API_KEY"])


def sql_str(value) -> str:
    """Escape a value for use inside a single-quoted SQL string literal."""
    return str(value).replace("'", "''")


class FraudDetectionAgent:
    def __init__(self):
        self.model = genai.GenerativeModel("gemini-1.5-pro")
        self.hati = HatiDataAgent(host="localhost", port=5439, agent_id="fraud-detector", framework="gemini")

    def analyze_transaction(self, tx_id: str) -> dict:
        """Analyze a single transaction for fraud indicators."""
        # Fetch transaction details via SQL
        rows = self.hati.query(f"""
            SELECT id, customer_id, amount, counterparty, country, category
            FROM transactions WHERE id = '{sql_str(tx_id)}'
        """)
        if not rows:
            raise ValueError(f"Transaction {tx_id} not found")
        tx = rows[0]

        # Ask Gemini to assess the transaction
        prompt = f"""Analyze this financial transaction for fraud risk:
Amount: ${tx['amount']}, Counterparty: {tx['counterparty']},
Country: {tx['country']}, Category: {tx['category']}
Consider: unusual amounts, high-risk jurisdictions, shell companies.
Return a JSON object with: risk_score (0-1), reasons (list), recommended_action."""
        response = self.model.generate_content(prompt)
        analysis = response.text

        # Store the analysis in HatiData memory. Model output is free-form text,
        # so quotes are escaped to keep the SQL string literal intact.
        self.hati.execute(
            "INSERT INTO _hatidata_memory.memories (content, tags, namespace) "
            f"VALUES ('Transaction {sql_str(tx_id)}: {sql_str(analysis)}', "
            f"'fraud-analysis,{sql_str(tx['country'])},{sql_str(tx['category'])}', 'fraud_patterns')"
        )
        return {"tx_id": tx_id, "analysis": analysis}


agent = FraudDetectionAgent()
Note: The agent stores every analysis as a memory so it can reference past fraud patterns when evaluating new transactions.
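analyze_transaction stores response.text verbatim, but the prompt asks for JSON with risk_score, reasons, and recommended_action. Models often wrap JSON in a Markdown code fence, so acting on the score programmatically calls for a defensive parser. This is a minimal sketch assuming the model returns a single JSON object with those three keys; parse_fraud_assessment is a hypothetical helper, not part of hatidata-agent or google-generativeai.

```python
import json
import re

# A Markdown code fence: three backticks, an optional "json" tag, the payload, three backticks
_FENCE_RE = re.compile(r"^\s*`{3}(?:json)?\s*(.*?)\s*`{3}\s*$", re.DOTALL)

REQUIRED_KEYS = {"risk_score", "reasons", "recommended_action"}


def parse_fraud_assessment(text: str) -> dict:
    """Parse the JSON assessment the prompt asks for and validate its fields.

    Raises ValueError (json.JSONDecodeError is a subclass) if the text is not
    a JSON object with the expected keys and a risk_score between 0 and 1.
    """
    match = _FENCE_RE.match(text)
    payload = match.group(1) if match else text.strip()
    data = json.loads(payload)

    if not isinstance(data, dict):
        raise ValueError("assessment must be a JSON object")
    missing = REQUIRED_KEYS - set(data)
    if missing:
        raise ValueError(f"assessment is missing keys: {sorted(missing)}")

    score = float(data["risk_score"])
    if not 0.0 <= score <= 1.0:
        raise ValueError(f"risk_score out of range: {score}")
    data["risk_score"] = score
    return data
```

For example, parse_fraud_assessment(agent.analyze_transaction("tx_001")["analysis"]) would yield a dict whose risk_score can be compared against the same 0.8 threshold the pipeline uses. Raising instead of returning a default keeps malformed model output from silently passing as "low risk".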
Store and query transaction patterns
Use SQL INSERTs to store fraud patterns in HatiData memory and JOIN_VECTOR to find transactions semantically similar to known fraud.
# Store known fraud patterns as agent memories
agent.hati.execute(
    "INSERT INTO _hatidata_memory.memories (content, tags, namespace) VALUES ("
    "'Large wire transfers to offshore jurisdictions (Cayman Islands, Panama) "
    "with shell company counterparties are high-risk indicators.', "
    "'fraud-pattern,wire,offshore', 'fraud_patterns')"
)
agent.hati.execute(
    "INSERT INTO _hatidata_memory.memories (content, tags, namespace) VALUES ("
    "'Rapid successive transactions from the same customer to the same "
    "counterparty within 24 hours suggest structuring.', "
    "'fraud-pattern,structuring', 'fraud_patterns')"
)

# Query: find transactions similar to known fraud patterns
results = agent.hati.query("""
    SELECT t.id, t.amount, t.counterparty, t.country,
           semantic_rank(m.embedding, 'offshore wire transfer shell company') AS risk_score
    FROM transactions t
    JOIN_VECTOR agent_memories m
        ON semantic_match(m.embedding, 'offshore wire transfer shell company', 0.7)
    WHERE t.amount > 10000
    ORDER BY risk_score DESC
    LIMIT 10
""")
for row in results:
    print(f" {row['id']}: ${row['amount']:,.2f} to {row['counterparty']} "
          f"({row['country']}) - risk: {row['risk_score']:.3f}")
Output:
 tx_001: $150,000.00 to OffshoreHoldings Ltd (KY) - risk: 0.934
 tx_005: $120,000.00 to OffshoreHoldings Ltd (KY) - risk: 0.921
 tx_003: $89,000.00 to ShellCorp SA (PA) - risk: 0.887
Add chain-of-thought logging
Log every detection decision with hash-chained reasoning steps for regulatory compliance and audit trails.
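The audit trail in this step relies on hash chaining: each trace entry's SHA-256 digest covers the entry's own content plus the previous entry's digest, so altering any earlier step changes every digest after it. This guide does not describe HatiData's exact scheme (which columns are hashed, how they are serialized, what the first entry chains from), so the following is an illustrative sketch of the general technique, not HatiData's implementation. The prev_hash/hash field names, the canonical-JSON serialization, the all-zero genesis value, and the example step contents are all assumptions.

```python
import hashlib
import json

GENESIS_HASH = "0" * 64  # assumed "previous hash" for the first entry in a session


def entry_hash(prev_hash: str, step: dict) -> str:
    """SHA-256 of the previous hash followed by a canonical JSON encoding of the step."""
    canonical = json.dumps(step, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256((prev_hash + canonical).encode("utf-8")).hexdigest()


def build_chain(steps: list[dict]) -> list[dict]:
    """Return copies of the steps with prev_hash and hash fields attached, in order."""
    chain = []
    prev = GENESIS_HASH
    for step in steps:
        digest = entry_hash(prev, step)
        chain.append({**step, "prev_hash": prev, "hash": digest})
        prev = digest
    return chain


def verify_chain(chain: list[dict]) -> bool:
    """Recompute each hash; an edited, reordered, or removed (non-final) entry fails."""
    prev = GENESIS_HASH
    for entry in chain:
        step = {k: v for k, v in entry.items() if k not in ("prev_hash", "hash")}
        if entry["prev_hash"] != prev or entry_hash(prev, step) != entry["hash"]:
            return False
        prev = entry["hash"]
    return True


# Invented example steps mirroring the observation -> analysis -> recommendation sequence
steps = [
    {"session_id": "fraud_review_tx_001", "step_type": "observation",
     "content": "Transaction tx_001: $150,000.00 wire to OffshoreHoldings Ltd in KY"},
    {"session_id": "fraud_review_tx_001", "step_type": "analysis",
     "content": "Gemini assessment: high-risk jurisdiction"},
    {"session_id": "fraud_review_tx_001", "step_type": "recommendation",
     "content": "FLAGGED for human review"},
]
chain = build_chain(steps)
print(verify_chain(chain))  # True

chain[1]["content"] = "Gemini assessment: low risk"  # tamper with a middle step
print(verify_chain(chain))  # False
```

One limitation of this design: deleting the most recent entries leaves a valid, shorter chain, so detecting truncation requires recording the latest hash somewhere independent of the log itself.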
# Log reasoning steps for each flagged transaction
for row in results:
    if row["risk_score"] > 0.8:
        session = f"fraud_review_{row['id']}"

        # Step 1: Log the initial detection
        agent.hati.execute(
            "INSERT INTO _hatidata_cot.agent_traces "
            "(session_id, step_type, content, confidence) VALUES ("
            f"'{session}', 'observation', "
            f"'Transaction {row['id']}: ${row['amount']:,.2f} wire to "
            f"{row['counterparty']} in {row['country']}', {row['risk_score']})"
        )

        # Step 2: Log the Gemini analysis. Single quotes are doubled so that
        # free-form model output cannot break the SQL string literal.
        analysis = agent.analyze_transaction(row["id"])
        analysis_sql = analysis["analysis"].replace("'", "''")
        agent.hati.execute(
            "INSERT INTO _hatidata_cot.agent_traces "
            "(session_id, step_type, content, confidence) VALUES ("
            f"'{session}', 'analysis', "
            f"'Gemini assessment: {analysis_sql}', {row['risk_score']})"
        )

        # Step 3: Log the recommendation
        agent.hati.execute(
            "INSERT INTO _hatidata_cot.agent_traces "
            "(session_id, step_type, content, confidence) VALUES ("
            f"'{session}', 'recommendation', "
            f"'FLAGGED for human review. Risk score: {row['risk_score']:.3f}. "
            "Matched known offshore shell company pattern.', "
            f"{row['risk_score']})"
        )

print("Chain-of-thought logged for all flagged transactions.")
print("Verify with: SELECT * FROM _hatidata_cot.agent_traces "
      "WHERE session_id LIKE 'fraud_review_%' ORDER BY step_number;")
Output:
Chain-of-thought logged for all flagged transactions.
Verify with: SELECT * FROM _hatidata_cot.agent_traces WHERE session_id LIKE 'fraud_review_%' ORDER BY step_number;
Note: Each CoT entry is SHA-256 hash-chained to the previous step. This creates a tamper-evident audit trail that regulators can verify.
Run the pipeline end-to-end
Execute the full fraud detection pipeline and review the results.
def run_fraud_pipeline():
    agent = FraudDetectionAgent()

    # Scan all recent transactions
    transactions = agent.hati.query("""
        SELECT id, amount, counterparty, country, category
        FROM transactions
        WHERE timestamp > CURRENT_TIMESTAMP - INTERVAL '24 hours'
    """)
    print(f"Scanning {len(transactions)} transactions...\n")

    flagged = []
    for tx in transactions:
        # Rank stored fraud patterns against a text description of the
        # transaction; ranking against the bare ID carries no semantic signal
        description = (
            f"{tx['category']} of {tx['amount']} to {tx['counterparty']} "
            f"in {tx['country']}"
        ).replace("'", "''")
        matches = agent.hati.query(
            "SELECT content, semantic_rank(embedding, "
            f"'{description}') AS score "
            "FROM _hatidata_memory.memories "
            "WHERE namespace = 'fraud_patterns' "
            "ORDER BY score DESC LIMIT 1"
        )
        risk_score = matches[0]["score"] if matches else 0.0

        if risk_score > 0.8:
            result = agent.analyze_transaction(tx["id"])
            flagged.append(result)
            print(f"FLAGGED: {tx['id']} (risk: {risk_score:.3f})")
        else:
            print(f" CLEAR: {tx['id']}")

    print(f"\nPipeline complete: {len(flagged)}/{len(transactions)} flagged")
    patterns = agent.hati.query(
        "SELECT COUNT(*) AS cnt FROM _hatidata_memory.memories "
        "WHERE namespace = 'fraud_patterns'"
    )
    print(f"Fraud patterns in memory: {patterns[0]['cnt']}")


run_fraud_pipeline()
Output:
Scanning 5 transactions...

FLAGGED: tx_001 (risk: 0.934)
FLAGGED: tx_005 (risk: 0.921)
FLAGGED: tx_003 (risk: 0.887)
 CLEAR: tx_002
 CLEAR: tx_004

Pipeline complete: 3/5 flagged
Fraud patterns in memory: 5
Ready to build?
Install HatiData locally and start building with Google Gemini in minutes.