Google Gemini · Intermediate · 30 min

Gemini + HatiData: Fraud Detection Pipeline

Build an intelligent fraud detection agent using Google Gemini and HatiData's hybrid SQL.

What You'll Build

A Gemini-powered agent that monitors transactions, builds fraud pattern memory, and flags suspicious activity using JOIN_VECTOR and semantic_rank().

Prerequisites

  • Install the packages: pip install hatidata-agent google-generativeai
  • Initialize HatiData: hati init
  • A Google AI API key

Architecture

┌──────────────┐    ┌──────────────┐    ┌──────────────┐
│   Gemini     │───▶│  HatiData    │───▶│ Transactions │
│   Agent      │    │  MCP Server  │    │   (DuckDB)   │
└──────┬───────┘    └──────┬───────┘    └──────────────┘
       │                   │
       │            ┌──────▼───────┐
       └───────────▶│ Fraud Memory │
                    │  + CoT Log   │
                    └──────────────┘

Key Concepts

  • JOIN_VECTOR merges structured transaction data with semantic fraud pattern memory in a single SQL query
  • Semantic fraud detection finds novel patterns that rule-based systems miss by matching against learned fraud signatures
  • Chain-of-thought audit trails provide hash-chained, tamper-evident reasoning logs for regulatory compliance
  • Real-time pattern memory lets the agent learn from every flagged transaction and improve detection over time
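
To make the idea of semantic matching concrete, here is a minimal sketch of similarity ranking over embedding vectors. It is not HatiData's implementation of JOIN_VECTOR or semantic_rank(); the three-dimensional vectors and the pattern_memory dictionary are hypothetical stand-ins, and cosine similarity is assumed as the scoring function.

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors (1.0 means same direction)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # an all-zero vector has no direction to compare
    return dot / (norm_a * norm_b)

# Hypothetical toy "embeddings"; real ones have hundreds of dimensions.
pattern_memory = {
    "offshore wire to shell company": [0.9, 0.1, 0.0],
    "recurring streaming subscription": [0.0, 0.2, 0.9],
}
query_vector = [0.8, 0.2, 0.1]  # stands in for an embedded transaction description

# Rank stored patterns by similarity to the query, best match first.
ranked = sorted(
    ((cosine_similarity(query_vector, vec), text) for text, vec in pattern_memory.items()),
    reverse=True,
)
for score, text in ranked:
    print(f"{score:.3f}  {text}")
```

The offshore pattern ranks first because its vector points in nearly the same direction as the query. A similarity threshold, like the 0.7 passed to semantic_match later in this cookbook, would then decide which matches count.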

Step-by-Step Implementation

1

Initialize project & install dependencies

Set up a new Python project with HatiData and the Google Generative AI SDK.

Bash
pip install hatidata-agent google-generativeai
hati init

Note: hati init creates a .hati/config.toml file and initializes a local DuckDB-backed database that accepts connections on port 5439.

2

Configure HatiData with a transactions schema

Define the transactions table that the fraud detection agent will query and populate it with sample data.

Python
import psycopg2

# Connect via Postgres wire protocol (DuckDB-backed)
conn = psycopg2.connect(
    host="localhost", port=5439,
    user="admin", password="admin",
    dbname="fraud_detection"
)
cur = conn.cursor()

cur.execute("""
    CREATE TABLE IF NOT EXISTS transactions (
        id VARCHAR PRIMARY KEY,
        customer_id VARCHAR NOT NULL,
        amount DECIMAL(12, 2),
        counterparty VARCHAR,
        country VARCHAR,
        timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        category VARCHAR,
        flagged BOOLEAN DEFAULT FALSE
    )
""")

# Insert sample transactions
cur.execute("""
    INSERT INTO transactions (id, customer_id, amount, counterparty, country, category)
    VALUES
        ('tx_001', 'cust_100', 150000.00, 'OffshoreHoldings Ltd', 'KY', 'wire'),
        ('tx_002', 'cust_100', 2400.00, 'Amazon', 'US', 'purchase'),
        ('tx_003', 'cust_101', 89000.00, 'ShellCorp SA', 'PA', 'wire'),
        ('tx_004', 'cust_102', 45.99, 'Netflix', 'US', 'subscription'),
        ('tx_005', 'cust_101', 120000.00, 'OffshoreHoldings Ltd', 'KY', 'wire')
""")
conn.commit()
cur.close()
conn.close()
print("Schema created and sample data loaded.")
Expected Output
Schema created and sample data loaded.
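
The sample rows above are hard-coded, so plain SQL strings are safe there. If rows come from a file or an API instead, bound parameters avoid quoting problems. The sketch below uses Python's built-in sqlite3 as a stand-in database so it runs anywhere; with psycopg2 the same pattern applies, using %s placeholders instead of ?. The tx_006 row is a hypothetical example chosen because its counterparty contains an apostrophe.

```python
import sqlite3

# In-memory SQLite stands in for the HatiData connection in this sketch.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE transactions (
        id TEXT PRIMARY KEY,
        customer_id TEXT NOT NULL,
        amount NUMERIC,
        counterparty TEXT,
        country TEXT,
        category TEXT
    )
""")

rows = [
    ("tx_001", "cust_100", 150000.00, "OffshoreHoldings Ltd", "KY", "wire"),
    # Hypothetical row: the apostrophe would break a hand-built SQL string.
    ("tx_006", "cust_103", 310.50, "O'Brien & Sons", "IE", "purchase"),
]

# Placeholders let the driver handle quoting for every value.
conn.executemany("INSERT INTO transactions VALUES (?, ?, ?, ?, ?, ?)", rows)
conn.commit()

lookup = conn.execute(
    "SELECT counterparty FROM transactions WHERE id = ?", ("tx_006",)
).fetchone()
row_count = conn.execute("SELECT COUNT(*) FROM transactions").fetchone()[0]
print(lookup[0], row_count)
```
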
3

Build the Gemini fraud detection agent

Create an agent class that uses Google Gemini for reasoning and HatiData for persistent fraud pattern memory.

Python
import os

import google.generativeai as genai
from hatidata_agent import HatiDataAgent

# Read the key from the environment rather than hard-coding it in source
genai.configure(api_key=os.environ["GOOGLE_API_KEY"])

class FraudDetectionAgent:
    def __init__(self):
        self.model = genai.GenerativeModel("gemini-1.5-pro")
        self.hati = HatiDataAgent(host="localhost", port=5439, agent_id="fraud-detector", framework="gemini")

    def analyze_transaction(self, tx_id: str) -> dict:
        """Analyze a single transaction for fraud indicators."""
        # Fetch transaction details via SQL
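        # Double any single quotes so the ID is safe inside a SQL string literal
        safe_id = tx_id.replace("'", "''")
        rows = self.hati.query(f"""
            SELECT id, customer_id, amount, counterparty, country, category
            FROM transactions WHERE id = '{safe_id}'
        """)
        if not rows:
            raise ValueError(f"Transaction {tx_id} not found")
        tx = rows[0]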

        # Ask Gemini to assess the transaction
        prompt = f"""Analyze this financial transaction for fraud risk:
        Amount: ${tx['amount']}, Counterparty: {tx['counterparty']},
        Country: {tx['country']}, Category: {tx['category']}

        Consider: unusual amounts, high-risk jurisdictions, shell companies.
        Return a JSON with: risk_score (0-1), reasons (list), recommended_action."""

        response = self.model.generate_content(prompt)
        analysis = response.text

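        # Store the fraud pattern in HatiData memory.
        # Double any single quotes so model output cannot break the SQL literal.
        content = f"Transaction {tx_id}: {analysis}".replace("'", "''")
        tags = f"fraud-analysis,{tx['country']},{tx['category']}"
        self.hati.execute(
            "INSERT INTO _hatidata_memory.memories (content, tags, namespace) "
            f"VALUES ('{content}', '{tags}', 'fraud_patterns')"
        )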

        return {"tx_id": tx_id, "analysis": analysis}

agent = FraudDetectionAgent()

Note: The agent stores every analysis as a memory so it can reference past fraud patterns when evaluating new transactions.
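
The prompt asks Gemini for JSON, but the reply arrives as free text and may include surrounding prose or Markdown fencing. A minimal sketch of a parser that pulls out the first JSON object and clamps the score to the 0-1 range could look like this. The function name parse_risk_assessment and the fallback values are assumptions for illustration, not part of either SDK.

```python
import json
import re

FALLBACK = {"risk_score": 0.0, "reasons": [], "recommended_action": "manual_review"}

def parse_risk_assessment(text: str) -> dict:
    """Extract the first JSON object from a model reply, or return a fallback."""
    # Grab everything from the first "{" to the last "}" in the reply.
    match = re.search(r"\{.*\}", text, flags=re.DOTALL)
    if not match:
        return dict(FALLBACK)
    try:
        data = json.loads(match.group(0))
        score = float(data.get("risk_score", 0.0))
    except (json.JSONDecodeError, TypeError, ValueError):
        return dict(FALLBACK)
    reasons = data.get("reasons", [])
    if not isinstance(reasons, list):
        reasons = [str(reasons)]
    return {
        "risk_score": min(max(score, 0.0), 1.0),  # clamp to the requested range
        "reasons": reasons,
        "recommended_action": str(data.get("recommended_action", "manual_review")),
    }

# Hypothetical reply text, shaped like what the prompt above requests.
reply = (
    "Here is my assessment:\n"
    '{"risk_score": 0.92, "reasons": ["offshore jurisdiction"], '
    '"recommended_action": "escalate"}\n'
    "Let me know if you need more detail."
)
parsed = parse_risk_assessment(reply)
print(parsed["risk_score"], parsed["recommended_action"])
```

Replies that cannot be parsed fall back to manual_review, which keeps an unreadable answer from silently passing as low risk.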

4

Store and query transaction patterns

Use SQL INSERTs to store fraud patterns in HatiData memory and JOIN_VECTOR to find transactions semantically similar to known fraud.

Python
# Store known fraud patterns as agent memories
agent.hati.execute(
    "INSERT INTO _hatidata_memory.memories (content, tags, namespace) VALUES ("
    "'Large wire transfers to offshore jurisdictions (Cayman Islands, Panama) "
    "with shell company counterparties are high-risk indicators.', "
    "'fraud-pattern,wire,offshore', 'fraud_patterns')"
)

agent.hati.execute(
    "INSERT INTO _hatidata_memory.memories (content, tags, namespace) VALUES ("
    "'Rapid successive transactions from the same customer to the same "
    "counterparty within 24 hours suggest structuring.', "
    "'fraud-pattern,structuring', 'fraud_patterns')"
)

# Query: find transactions similar to known fraud patterns
results = agent.hati.query("""
    SELECT t.id, t.amount, t.counterparty, t.country,
           semantic_rank(m.embedding, 'offshore wire transfer shell company') AS risk_score
    FROM transactions t
    JOIN_VECTOR agent_memories m
      ON semantic_match(m.embedding, 'offshore wire transfer shell company', 0.7)
    WHERE t.amount > 10000
    ORDER BY risk_score DESC
    LIMIT 10
""")

for row in results:
    print(f"  {row['id']}: ${row['amount']:,.2f} to {row['counterparty']} "
          f"({row['country']}) - risk: {row['risk_score']:.3f}")
Expected Output
  tx_001: $150,000.00 to OffshoreHoldings Ltd (KY) - risk: 0.934
  tx_005: $120,000.00 to OffshoreHoldings Ltd (KY) - risk: 0.921
  tx_003: $89,000.00 to ShellCorp SA (PA) - risk: 0.887
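
The second stored pattern (repeated transactions from one customer to one counterparty within 24 hours) can also be checked with a plain rule alongside the semantic query. The sketch below is a pure-Python illustration under assumed inputs: find_structuring is a hypothetical helper, and the sample timestamps are invented for the example.

```python
from collections import defaultdict
from datetime import datetime, timedelta

def find_structuring(transactions, window=timedelta(hours=24), min_count=2):
    """Return (customer, counterparty) pairs with min_count transactions inside window."""
    grouped = defaultdict(list)
    for tx in transactions:
        grouped[(tx["customer_id"], tx["counterparty"])].append(tx["timestamp"])

    suspicious = []
    for pair, stamps in grouped.items():
        stamps.sort()
        # Compare each timestamp with the one min_count - 1 positions later.
        for i in range(len(stamps) - min_count + 1):
            if stamps[i + min_count - 1] - stamps[i] <= window:
                suspicious.append(pair)
                break
    return suspicious

# Hypothetical sample data with explicit timestamps.
sample = [
    {"customer_id": "cust_101", "counterparty": "ShellCorp SA",
     "timestamp": datetime(2024, 1, 1, 9, 0)},
    {"customer_id": "cust_101", "counterparty": "ShellCorp SA",
     "timestamp": datetime(2024, 1, 1, 15, 30)},
    {"customer_id": "cust_100", "counterparty": "Amazon",
     "timestamp": datetime(2024, 1, 1, 10, 0)},
    {"customer_id": "cust_100", "counterparty": "Amazon",
     "timestamp": datetime(2024, 1, 5, 10, 0)},
]
print(find_structuring(sample))  # [('cust_101', 'ShellCorp SA')]
```
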
5

Add chain-of-thought logging

Log every detection decision with hash-chained reasoning steps for regulatory compliance and audit trails.

Python
# Log reasoning steps for each flagged transaction
def log_trace(session_id: str, step_type: str, content: str, confidence: float) -> None:
    """Insert one reasoning step; single quotes are doubled for SQL safety."""
    safe_content = content.replace("'", "''")
    agent.hati.execute(
        "INSERT INTO _hatidata_cot.agent_traces "
        "(session_id, step_type, content, confidence) VALUES ("
        f"'{session_id}', '{step_type}', '{safe_content}', {confidence})"
    )

for row in results:
    if row["risk_score"] > 0.8:
        session_id = f"fraud_review_{row['id']}"
        score = row["risk_score"]

        # Step 1: Log the initial detection
        log_trace(session_id, "observation",
                  f"Transaction {row['id']}: ${row['amount']:,.2f} wire to "
                  f"{row['counterparty']} in {row['country']}", score)

        # Step 2: Log the Gemini analysis
        analysis = agent.analyze_transaction(row["id"])
        log_trace(session_id, "analysis",
                  f"Gemini assessment: {analysis['analysis']}", score)

        # Step 3: Log the recommendation
        log_trace(session_id, "recommendation",
                  f"FLAGGED for human review. Risk score: {score:.3f}. "
                  "Matched known offshore shell company pattern.", score)

print("Chain-of-thought logged for all flagged transactions.")
print("Verify with: SELECT * FROM _hatidata_cot.agent_traces "
      "WHERE session_id LIKE 'fraud_review_%' ORDER BY step_number;")
Expected Output
Chain-of-thought logged for all flagged transactions.

Note: Each CoT entry is SHA-256 hash-chained to the previous step. This creates a tamper-evident audit trail that regulators can verify.
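
To show why a hash chain is tamper-evident, here is a minimal sketch built only on Python's hashlib. It does not reproduce HatiData's internal format: the entry fields, the all-zero genesis hash, and the helper names append_entry and verify_chain are assumptions for illustration.

```python
import hashlib
import json

GENESIS_HASH = "0" * 64  # assumed starting value for the first entry

def _digest(step_type: str, content: str, prev_hash: str) -> str:
    """SHA-256 over the entry fields plus the previous entry's hash."""
    payload = json.dumps(
        {"step_type": step_type, "content": content, "prev_hash": prev_hash},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()

def append_entry(chain: list, step_type: str, content: str) -> dict:
    """Append a new entry linked to the hash of the previous one."""
    prev_hash = chain[-1]["hash"] if chain else GENESIS_HASH
    entry = {
        "step_type": step_type,
        "content": content,
        "prev_hash": prev_hash,
        "hash": _digest(step_type, content, prev_hash),
    }
    chain.append(entry)
    return entry

def verify_chain(chain: list) -> bool:
    """Recompute every hash; any edited or reordered entry breaks the chain."""
    prev_hash = GENESIS_HASH
    for entry in chain:
        if entry["prev_hash"] != prev_hash:
            return False
        if _digest(entry["step_type"], entry["content"], prev_hash) != entry["hash"]:
            return False
        prev_hash = entry["hash"]
    return True

trace = []
append_entry(trace, "observation", "tx_001: large wire to KY")
append_entry(trace, "analysis", "Counterparty resembles a shell company")
append_entry(trace, "recommendation", "FLAGGED for human review")
intact = verify_chain(trace)
print(intact)  # True

trace[1]["content"] = "Nothing unusual"  # simulate tampering with one step
tampered = verify_chain(trace)
print(tampered)  # False
```

Because each hash covers the previous entry's hash, changing one step invalidates that entry and every later link, so an auditor only needs to recompute the chain to detect edits.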

6

Run the pipeline end-to-end

Execute the full fraud detection pipeline and review the results.

Python
def run_fraud_pipeline():
    agent = FraudDetectionAgent()

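    # Scan all recent transactions, fetching the fields needed to describe them
    transactions = agent.hati.query("""
        SELECT id, amount, counterparty, country, category FROM transactions
        WHERE timestamp > CURRENT_TIMESTAMP - INTERVAL '24 hours'
    """)

    print(f"Scanning {len(transactions)} transactions...\n")

    flagged = []
    for tx in transactions:
        # Describe the transaction in words so it can be compared semantically
        # with stored fraud patterns (a bare ID carries no meaning to match on).
        description = (
            f"{tx['category']} of {tx['amount']} to {tx['counterparty']} "
            f"in {tx['country']}"
        ).replace("'", "''")
        matches = agent.hati.query(
            "SELECT content, semantic_rank(embedding, "
            f"'{description}') AS score "
            "FROM _hatidata_memory.memories "
            "WHERE namespace = 'fraud_patterns' "
            "ORDER BY score DESC LIMIT 1"
        )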

        risk_score = matches[0]["score"] if matches else 0.0

        if risk_score > 0.8:
            result = agent.analyze_transaction(tx["id"])
            flagged.append(result)
            print(f"FLAGGED: {tx['id']} (risk: {risk_score:.3f})")
        else:
            print(f"  CLEAR: {tx['id']}")

    print(f"\nPipeline complete: {len(flagged)}/{len(transactions)} flagged")
    patterns = agent.hati.query(
        "SELECT COUNT(*) AS cnt FROM _hatidata_memory.memories "
        "WHERE namespace = 'fraud_patterns'"
    )
    print(f"Fraud patterns in memory: {patterns[0]['cnt']}")

run_fraud_pipeline()
Expected Output
Scanning 5 transactions...

FLAGGED: tx_001 (risk: 0.934)
FLAGGED: tx_005 (risk: 0.921)
FLAGGED: tx_003 (risk: 0.887)
  CLEAR: tx_002
  CLEAR: tx_004

Pipeline complete: 3/5 flagged
Fraud patterns in memory: 5
