Vertex AI + HatiData: Enterprise Reasoning Engine
Build an enterprise reasoning engine with Vertex AI and HatiData. Deploy in your VPC with full compliance.
What You'll Build
A Vertex AI Reasoning Engine with HatiData tools for memory, CoT logging, and branch isolation.
Prerequisites
- pip install hatidata-agent google-cloud-aiplatform
- hati init
- A GCP project with billing and the Vertex AI API enabled
Architecture
┌──────────────┐    ┌──────────────┐    ┌──────────────┐
│  Vertex AI   │───▶│   HatiData   │───▶│  Enterprise  │
│  Reasoning   │    │   (In-VPC)   │    │  Data Lake   │
└──────┬───────┘    └──────┬───────┘    └──────────────┘
       │            ┌──────▼───────┐
       └───────────▶│ Memory + CoT │
                    └──────────────┘
Key Concepts
- VPC deployment: HatiData runs inside your cloud VPC with PrivateLink connectivity, so enterprise data never leaves your network perimeter.
- Branch isolation for safe speculation: agents create isolated DuckDB schemas to run hypothetical analyses without risk to production data -- zero-copy on create, copy-on-write on mutation.
- Compliance audit trails: the Chain-of-Thought Ledger records every reasoning step with SHA-256 hash chaining, making tampering detectable and enabling full decision replay for regulators; a minimal hash-chain sketch follows this list.
- Enterprise-grade memory: decisions are stored as both structured SQL rows and semantic embeddings, enabling precedent search, analytics, and long-term institutional knowledge.
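To make the hash-chaining idea concrete, here is a minimal, self-contained sketch of how an append-only ledger can chain SHA-256 hashes so that editing any earlier step invalidates everything after it. This is illustrative only; the exact hashing scheme and column layout HatiData uses internally are not specified here.
import hashlib

def chain_hash(prev_hash: str, content: str) -> str:
    """Hash the previous link together with the new step content."""
    return hashlib.sha256((prev_hash + content).encode("utf-8")).hexdigest()

# Build a tiny ledger of reasoning steps
steps = [
    "observation: received procurement request",
    "analysis: policy POL-001 triggered",
    "decision: approve with conditions",
]
ledger = []
prev = "0" * 64  # genesis hash
for content in steps:
    prev = chain_hash(prev, content)
    ledger.append({"content": content, "hash": prev})

def verify(entries: list) -> bool:
    """Recompute the chain and compare against the stored hashes."""
    prev = "0" * 64
    for entry in entries:
        prev = chain_hash(prev, entry["content"])
        if prev != entry["hash"]:
            return False
    return True

print(verify(ledger))                    # True
ledger[1]["content"] += " (tampered)"
print(verify(ledger))                    # False -- tampering breaks the chain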
Step-by-Step Implementation
Set up GCP environment
Install dependencies and authenticate with Google Cloud for Vertex AI access.
# Install Python dependencies
pip install hatidata-agent google-cloud-aiplatform
# Authenticate with GCP
gcloud auth application-default login
gcloud config set project your-gcp-project-id
# Verify Vertex AI API is enabled
gcloud services enable aiplatform.googleapis.com
Note: Requires an active GCP project with billing enabled and the Vertex AI API enabled. The gcloud CLI must be installed.
Configure HatiData for enterprise deployment
Set up HatiData with enterprise settings including VPC-ready configuration and compliance features.
# .hati/config.toml
[storage]
path = "./enterprise_data"
[memory]
default_namespace = "vertex_enterprise"
embedding_dimensions = 768
[proxy]
port = 5439
host = "127.0.0.1"
[cot]
enabled = true
retention_days = 2555
[branching]
enabled = true
max_branches = 100
# Initialize HatiData
# In your terminal:
# hati init
Note: Run 'hati init' to create the local database. For VPC deployment, use the cloud provisioning flow.
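As a quick sanity check, you can parse the config with Python's standard tomllib and confirm the values the later steps rely on (proxy port 5439, CoT enabled, roughly seven years of retention). This only reads the file above; it is not part of the HatiData API.
import tomllib  # Python 3.11+

with open(".hati/config.toml", "rb") as f:
    cfg = tomllib.load(f)

# Values the later steps rely on
assert cfg["proxy"]["port"] == 5439
assert cfg["cot"]["enabled"] is True
print(f"CoT retention: {cfg['cot']['retention_days']} days "
      f"(~{cfg['cot']['retention_days'] / 365:.1f} years)")
print(f"Max branches: {cfg['branching']['max_branches']}")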
Define enterprise data schemas
Create the tables that the Vertex AI agent will query -- decisions, policies, and audit logs.
-- Connect to HatiData: psql -h localhost -p 5439 -U admin
-- Decision tracking table
CREATE TABLE decisions (
decision_id VARCHAR PRIMARY KEY,
department VARCHAR NOT NULL,
category VARCHAR NOT NULL,
description TEXT NOT NULL,
amount DECIMAL(15, 2),
risk_level VARCHAR DEFAULT 'medium',
status VARCHAR DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Policy rules table
CREATE TABLE policies (
policy_id VARCHAR PRIMARY KEY,
name VARCHAR NOT NULL,
department VARCHAR NOT NULL,
rule_text TEXT NOT NULL,
max_amount DECIMAL(15, 2),
requires_approval BOOLEAN DEFAULT false,
active BOOLEAN DEFAULT true
);
-- Audit log with immutable trail
CREATE TABLE audit_log (
log_id VARCHAR PRIMARY KEY,
decision_id VARCHAR REFERENCES decisions(decision_id),
action VARCHAR NOT NULL,
actor VARCHAR NOT NULL,
reasoning TEXT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Insert sample policies
INSERT INTO policies VALUES
('POL-001', 'Procurement Limit', 'finance',
'Purchases over $50,000 require VP approval', 50000, true, true),
('POL-002', 'Vendor Onboarding', 'procurement',
'New vendors require security review', NULL, true, true),
('POL-003', 'Travel Policy', 'operations',
'International travel requires director approval', 10000, true, true);
Note: These tables are stored in HatiData's DuckDB engine with Iceberg format. The agent queries them via standard SQL through the Postgres wire protocol.
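Because HatiData speaks the Postgres wire protocol, any standard Postgres client can read these tables, not just the hatidata-agent SDK. A minimal sketch with psycopg2, reusing the 'admin' user from the psql example above; the dbname and any password depend on your HatiData setup and are placeholders here.
import psycopg2  # any Postgres wire-protocol client works

conn = psycopg2.connect(host="localhost", port=5439,
                        user="admin", dbname="hatidata")  # placeholder dbname
cur = conn.cursor()
cur.execute("SELECT policy_id, name, max_amount FROM policies WHERE active = true")
for policy_id, name, max_amount in cur.fetchall():
    print(policy_id, name, max_amount)
cur.close()
conn.close()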
Build the Vertex AI reasoning agent
Create a Vertex AI agent that uses HatiData for persistent memory and decision tracking.
import vertexai
from vertexai.generative_models import GenerativeModel
from hatidata_agent import HatiDataAgent
vertexai.init(project="your-gcp-project-id", location="us-central1")
hati = HatiDataAgent(host="localhost", port=5439, agent_id="vertex-enterprise", framework="vertex")
def check_policy(department: str, amount: float) -> dict:
"""Check if a decision complies with department policies."""
results = hati.query(f"""
SELECT policy_id, name, rule_text, max_amount,
requires_approval
FROM policies
WHERE department = '{department}'
AND active = true
AND (max_amount IS NULL OR max_amount <= {amount})
ORDER BY max_amount DESC
""")
return {"policies": results, "compliant": len(results) == 0}
def store_decision(decision_id: str, department: str,
category: str, description: str,
amount: float, risk_level: str) -> dict:
"""Store a business decision with full audit trail."""
hati.execute(f"""
INSERT INTO decisions
(decision_id, department, category,
description, amount, risk_level)
VALUES
('{decision_id}', '{department}', '{category}',
'{description}', {amount}, '{risk_level}')
""")
hati.execute(f"""
INSERT INTO _hatidata_memory.memories
(content, tags, namespace)
VALUES
('Decision {decision_id}: {description} (${amount:,.2f}, {risk_level} risk)',
'{department},{category},{risk_level}', 'vertex_enterprise')
""")
return {"stored": True, "decision_id": decision_id}
def search_precedents(query: str) -> dict:
"""Search for similar past decisions."""
results = hati.query(f"""
SELECT content, tags,
semantic_rank(embedding, '{query}') AS relevance
FROM _hatidata_memory.memories
WHERE namespace = 'vertex_enterprise'
ORDER BY relevance DESC
LIMIT 5
""")
return {"precedents": results}
model = GenerativeModel(
"gemini-1.5-pro",
system_instruction=(
"You are an enterprise reasoning agent. For every "
"decision request:\n"
"1. Search for precedents\n"
"2. Check applicable policies\n"
"3. Store the decision\n"
"4. Provide a clear recommendation with reasoning"
),
)
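# Optional: instead of pasting tool output into the prompt (as done below), the
# same helpers could be exposed to Gemini as function-calling tools. This is a
# sketch using the Vertex AI SDK's FunctionDeclaration/Tool types, not part of
# the HatiData integration itself; adapt the parameter schemas to your needs.
from vertexai.generative_models import FunctionDeclaration, Tool

check_policy_decl = FunctionDeclaration(
    name="check_policy",
    description="Check whether a spend amount triggers department policies.",
    parameters={
        "type": "object",
        "properties": {
            "department": {"type": "string"},
            "amount": {"type": "number"},
        },
        "required": ["department", "amount"],
    },
)
search_precedents_decl = FunctionDeclaration(
    name="search_precedents",
    description="Search HatiData memory for similar past decisions.",
    parameters={
        "type": "object",
        "properties": {"query": {"type": "string"}},
        "required": ["query"],
    },
)
# A tool-enabled model; the manual flow below works without it.
tool_model = GenerativeModel(
    "gemini-1.5-pro",
    tools=[Tool(function_declarations=[check_policy_decl, search_precedents_decl])],
)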
precedents = search_precedents("software procurement finance")
policy_check = check_policy("finance", 75000)
chat = model.start_chat()
response = chat.send_message(
f"Approve a $75,000 software procurement for the finance "
f"department. Vendor: DataCorp. Category: infrastructure.\n"
f"Precedents: {precedents}\n"
f"Policy check: {policy_check}"
)
print(response.text)
store_decision(
"DEC-2025-0142", "finance", "infrastructure",
"DataCorp software procurement", 75000.0, "medium"
)
I've analyzed this procurement request:
**Policy Check:** This exceeds the $50,000 procurement limit
(POL-001) and requires VP approval.
**Precedent Search:** Found 2 similar past decisions -- both
required additional security review for new vendors above $50K.
**Recommendation:** APPROVE WITH CONDITIONS
- Requires VP Finance sign-off (policy POL-001)
- Requires vendor security review (policy POL-002)
- Decision stored as DEC-2025-0142
Implement branch isolation for safe reasoning
Use HatiData branches to let the agent run hypothetical analyses without touching production data.
from hatidata_agent import HatiDataAgent
hati = HatiDataAgent(host="localhost", port=5439, agent_id="vertex-enterprise", framework="vertex")
# Create an isolated branch schema for hypothetical analysis
hati.execute("CREATE SCHEMA branch_budget_q3")
print("Created branch: branch_budget_q3")
# Run analysis in the branch -- production data is untouched
impact = hati.query("""
SELECT department,
SUM(amount) AS total_spend,
SUM(amount) + 75000 AS projected_spend,
100000 - (SUM(amount) + 75000) AS remaining
FROM decisions
WHERE department = 'finance'
AND status = 'approved'
AND created_at >= '2025-01-01'
GROUP BY department
""")
print("Budget impact analysis:")
for row in impact:
print(f" Department: {row['department']}")
print(f" Current spend: ${row['total_spend']:,.2f}")
print(f" Projected: ${row['projected_spend']:,.2f}")
print(f" Remaining: ${row['remaining']:,.2f}")
# Discard the branch -- zero impact on production
hati.execute("DROP SCHEMA branch_budget_q3 CASCADE")
print("\nBranch branch_budget_q3 discarded. Production unchanged.")Created branch: branch_budget_q3
Budget impact analysis:
Department: finance
Current spend: $142,500.00
Projected: $217,500.00
Remaining: -$117,500.00
Branch branch_budget_q3 discarded. Production unchanged.
Note: Branches use DuckDB schema isolation. Zero-copy on creation and copy-on-write on first mutation.
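Since a forgotten DROP SCHEMA would leave stale branches behind, it can help to wrap the create/drop pair in a small context manager. This is a convenience sketch built from the same hati.execute calls shown above, not a HatiData API; it assumes the hati connection from the previous snippet.
from contextlib import contextmanager

@contextmanager
def branch(hati, name: str):
    """Create an isolated branch schema and always drop it afterwards."""
    hati.execute(f"CREATE SCHEMA {name}")
    try:
        yield name
    finally:
        # Discard the branch even if the analysis raises
        hati.execute(f"DROP SCHEMA {name} CASCADE")

# Usage
with branch(hati, "branch_budget_q4") as b:
    impact = hati.query("""
        SELECT SUM(amount) AS total_spend
        FROM decisions
        WHERE department = 'finance' AND status = 'approved'
    """)
    print(f"Analysis ran in {b}; production untouched.")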
Add compliance audit trail
Log every reasoning step to the Chain-of-Thought Ledger and replay decisions for compliance review.
from hatidata_agent import HatiDataAgent
hati = HatiDataAgent(host="localhost", port=5439, agent_id="vertex-enterprise", framework="vertex")
session_id = "audit_session_001"
# Log reasoning steps to the CoT ledger via SQL
hati.execute(f"""
INSERT INTO _hatidata_cot.agent_traces
(session_id, step_type, content, confidence)
VALUES
('{session_id}', 'observation',
'Received procurement request: $75K for DataCorp', 1.0)
""")
hati.execute(f"""
INSERT INTO _hatidata_cot.agent_traces
(session_id, step_type, content, confidence)
VALUES
('{session_id}', 'analysis',
'Policy POL-001 triggered: amount exceeds $50K', 0.95)
""")
hati.execute(f"""
INSERT INTO _hatidata_cot.agent_traces
(session_id, step_type, content, confidence)
VALUES
('{session_id}', 'decision',
'APPROVE WITH CONDITIONS: VP sign-off + security review', 0.88)
""")
# Replay the decision trail
replay = hati.query(f"""
SELECT step_number, step_type, content, confidence, hash
FROM _hatidata_cot.agent_traces
WHERE session_id = '{session_id}'
ORDER BY step_number
""")
print("=== Compliance Audit Trail ===")
for step in replay:
print(f"Step {step['step_number']}: [{step['step_type'].upper()}]")
print(f" {step['content']}")
print(f" Confidence: {step['confidence']}")
print(f" Hash: {step['hash'][:16]}...")
print()
=== Compliance Audit Trail ===
Step 1: [OBSERVATION]
Received procurement request: $75K for DataCorp
Confidence: 1.0
Hash: a3f8b2c1d4e5f6a7...
Step 2: [ANALYSIS]
Policy POL-001 triggered: amount exceeds $50K
Confidence: 0.95
Hash: b7d2e9f1a3c4b5d6...
Step 3: [DECISION]
APPROVE WITH CONDITIONS: VP sign-off + security review
Confidence: 0.88
Hash: c1a5d8e2f6b3c4a7...
Note: Every step is SHA-256 hash-chained. Tampering breaks the chain. The CoT Ledger is append-only -- UPDATE, DELETE, and TRUNCATE are blocked.
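To spot-check the append-only guarantee mentioned in the note, you can attempt to mutate a trace and expect the request to be rejected. A minimal check continuing the script above (reusing hati and session_id), assuming HatiData surfaces the rejection as a Python exception from execute():
# Attempt to rewrite history -- this should fail on an append-only ledger
try:
    hati.execute(f"""
        UPDATE _hatidata_cot.agent_traces
        SET content = 'edited after the fact'
        WHERE session_id = '{session_id}'
    """)
    print("WARNING: update succeeded; ledger is not append-only")
except Exception as exc:
    print(f"Update rejected as expected: {exc}")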
Deploy and test end-to-end
Run the complete pipeline: receive a request, check policies, branch for analysis, and produce an auditable decision.
import time
import vertexai
from vertexai.generative_models import GenerativeModel
from hatidata_agent import HatiDataAgent
vertexai.init(project="your-gcp-project-id", location="us-central1")
hati = HatiDataAgent(host="localhost", port=5439, agent_id="vertex-enterprise", framework="vertex")
def enterprise_decision_pipeline(request: str) -> dict:
"""Full enterprise reasoning pipeline."""
session_id = f"pipeline_{int(time.time())}"
# Step 1: Log the incoming request to CoT ledger
hati.execute(f"""
INSERT INTO _hatidata_cot.agent_traces
(session_id, step_type, content, confidence)
VALUES
('{session_id}', 'observation',
'Incoming request: {request}', 1.0)
""")
# Step 2: Search for precedents in memory
precedents = hati.query(f"""
SELECT content, tags,
semantic_rank(embedding, '{request}') AS relevance
FROM _hatidata_memory.memories
WHERE namespace = 'vertex_enterprise'
ORDER BY relevance DESC
LIMIT 3
""")
# Step 3: Create branch for safe analysis
branch_name = f"analysis_{session_id}"
hati.execute(f"CREATE SCHEMA {branch_name}")
analysis = hati.query("""
SELECT COUNT(*) AS total_decisions,
SUM(CASE WHEN risk_level = 'high'
THEN 1 ELSE 0 END) AS high_risk,
AVG(amount) AS avg_amount
FROM decisions WHERE status = 'approved'
""")
# Step 4: Get AI recommendation
model = GenerativeModel("gemini-1.5-pro")
response = model.generate_content(
f"Request: {request}\n"
f"Precedents: {precedents}\n"
f"Portfolio: {analysis}\n"
f"Provide a recommendation with risk assessment."
)
# Step 5: Log the decision (escape single quotes so the SQL literal stays valid)
decision_text = response.text[:500].replace("'", "''")
hati.execute(f"""
INSERT INTO _hatidata_cot.agent_traces
(session_id, step_type, content, confidence)
VALUES
('{session_id}', 'decision',
'{decision_text}', 0.90)
""")
# Step 6: Clean up branch
hati.execute(f"DROP SCHEMA {branch_name} CASCADE")
# Step 7: Store decision in memory for future precedent search (quotes escaped)
memory_text = response.text[:200].replace("'", "''")
hati.execute(f"""
INSERT INTO _hatidata_memory.memories
(content, tags, namespace)
VALUES
('Decision: {request} -> {memory_text}',
'decision,enterprise', 'vertex_enterprise')
""")
return {
"session_id": session_id,
"recommendation": response.text,
"precedents_found": len(precedents),
}
result = enterprise_decision_pipeline(
"Approve $120,000 annual contract with CloudVendor "
"for data infrastructure in engineering"
)
print(f"Session: {result['session_id']}")
print(f"Precedents found: {result['precedents_found']}")
print(f"\nRecommendation:\n{result['recommendation']}")Session: pipeline_1705312800
Precedents found: 2
Recommendation:
APPROVE WITH CONDITIONS: The $120,000 CloudVendor contract
falls within engineering budget limits but requires:
1. Director of Engineering approval (amount > $100K)
2. Security review for new vendor onboarding (POL-002)
3. 90-day performance review clause recommended
Risk Assessment: MEDIUM -- similar past decisions were approved
with the above conditions.
Note: This pipeline demonstrates the three pillars of enterprise AI: persistent memory (precedent search), safe speculation (branch isolation), and compliance (CoT audit trail).
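To close the loop for compliance review, you can replay the pipeline's own reasoning trail with the same query used in the audit step, keyed by the returned session_id. A short sketch continuing the script above (reusing hati and result):
# Replay the pipeline's reasoning trail for compliance review
trail = hati.query(f"""
    SELECT step_number, step_type, content, confidence
    FROM _hatidata_cot.agent_traces
    WHERE session_id = '{result['session_id']}'
    ORDER BY step_number
""")
for step in trail:
    print(f"{step['step_number']:>2} [{step['step_type']}] {step['content'][:80]}")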