Implement LLM-driven governance architecture with structured memory

This commit completes the transition to a pure LLM-driven agentic
governance system with no hard-coded governance logic.

Core Architecture Changes:
- Add structured memory system (memory.py) for tracking governance processes
- Add LLM tools (tools.py) for deterministic operations (math, dates, random)
- Add audit trail system (audit.py) for human-readable decision explanations
- Add LLM-driven agent (agent_refactored.py) that interprets constitution

Documentation:
- Add ARCHITECTURE.md describing process-centric design
- Add ARCHITECTURE_EXAMPLE.md with complete workflow walkthrough
- Update README.md to reflect current LLM-driven architecture
- Simplify constitution.md to benevolent dictator model for testing

Templates:
- Add 8 governance templates (petition, consensus, do-ocracy, jury, etc.)
- Add 8 dispute resolution templates
- All templates work with generic process-based architecture

Key Design Principles:
- "Process" is central abstraction (not "proposal")
- No hard-coded process types or thresholds
- LLM interprets constitution to understand governance rules
- Tools ensure correctness for calculations
- Complete auditability with reasoning and citations

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
Nathan Schneider
2026-02-08 14:24:23 -07:00
parent 5fe22060e1
commit bda868cb45
26 changed files with 8683 additions and 187 deletions

View File

@@ -0,0 +1,561 @@
"""
Refactored Agentic Governance Bot.
This refactored agent relies on:
1. LLM interpretation (not hard-coded logic)
2. Structured memory (not just database records)
3. Tools for correctness (calculator, datetime, etc.)
4. Audit trails (human-readable explanations)
Key principle: Constitution defines ALL governance rules in natural language.
The agent interprets and executes based on constitution + memory + tools.
"""
import json
from typing import Dict, Any, Optional, List
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
from .governance.constitution import ConstitutionalReasoner
from .memory import GovernanceMemory, ProcessMemory, ProcessStatus, Event, Decision
from .tools import GovernanceTools, ToolRegistry
from .audit import AuditTrail, create_audit_trail
class AgenticGovernanceBot:
"""
Agentic governance bot that interprets constitution and uses memory + tools.
This bot does NOT have hard-coded governance logic. Instead:
- Reads constitution to understand rules
- Uses memory to track state
- Uses tools for calculations
- Makes decisions through LLM reasoning
"""
def __init__(
self,
db_session: Session,
constitution_path: str,
model: Optional[str] = None,
api_keys: Optional[Dict[str, str]] = None,
):
"""
Initialize the agentic governance bot.
Args:
db_session: Database session
constitution_path: Path to constitution file
model: LLM model to use (None for default)
api_keys: Dict with 'openai' and/or 'anthropic' API keys
"""
self.db = db_session
self.constitution = ConstitutionalReasoner(constitution_path, model, api_keys)
self.memory = GovernanceMemory(db_session)
self.tools = GovernanceTools()
self.model = model
self.api_keys = api_keys or {}
def process_request(
self,
request: str,
actor: str,
context: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Process a governance request using agentic interpretation.
Flow:
1. Understand what the user wants (intent parsing)
2. Query constitution for relevant rules
3. Query memory for current state
4. Reason about what to do (using LLM)
5. Execute actions using tools
6. Update memory with decision
7. Generate audit trail
Args:
request: Natural language request
actor: Who made the request
context: Optional context (thread ID, etc.)
Returns:
Response dictionary with action taken and audit trail
"""
# Step 1: Parse intent
intent_result = self._parse_intent_with_llm(request, actor)
if "error" in intent_result:
return {"response": intent_result["error"], "success": False}
# Step 2: Query constitution
constitutional_guidance = self.constitution.query(
question=intent_result["query"],
context=f"Actor: {actor}\nRequest: {request}"
)
# Step 3: Query memory for relevant state
memory_context = self._gather_memory_context(intent_result, actor)
# Step 4: Use LLM to decide what to do
decision_result = self._make_decision_with_llm(
intent=intent_result,
constitution=constitutional_guidance,
memory=memory_context,
actor=actor,
context=context
)
# Step 5: Execute the decision
execution_result = self._execute_decision(
decision_result,
actor,
context
)
# Step 6: Generate audit trail
if execution_result.get("process_id"):
process = self.memory.get_process(execution_result["process_id"])
if process and process.decisions:
audit = create_audit_trail(process, process.decisions[-1])
execution_result["audit_trail"] = audit
return execution_result
def _parse_intent_with_llm(self, request: str, actor: str) -> Dict[str, Any]:
"""
Use LLM to understand what the user wants to do.
Returns:
Dict with:
- intent_type: "create_proposal", "cast_vote", "query", etc.
- query: Question to ask constitution
- parameters: Extracted parameters
"""
prompt = f"""Analyze this governance request and determine the intent.
Request: {request}
Actor: {actor}
What is the user trying to do? Choose from:
- create_proposal: User wants to create a governance proposal
- cast_vote: User wants to vote on something
- query_status: User wants to know the status of something
- query_constitution: User wants to know governance rules
- dispute_initiation: User wants to start a dispute resolution process
- other: Something else
Also extract any relevant parameters (proposal text, vote type, process ID, etc.).
Return your analysis as JSON:
{{
"intent_type": "...",
"query": "question to ask the constitution",
"parameters": {{}},
"confidence": "high|medium|low"
}}
"""
try:
result = self.constitution._call_llm(prompt)
# Parse JSON from response
# (In production, would use proper JSON parsing from LLM response)
return json.loads(result.get("answer", "{}"))
except Exception as e:
return {"error": f"Failed to parse intent: {e}"}
def _gather_memory_context(
self,
intent: Dict[str, Any],
actor: str
) -> Dict[str, Any]:
"""
Gather relevant information from memory based on intent.
Returns:
Dict with memory context
"""
context = {}
# Get active processes
active_processes = self.memory.get_active_processes()
context["active_processes"] = [
self.memory.summarize_for_llm(p.id) for p in active_processes[:5]
]
# Get overdue processes
overdue = self.memory.get_overdue_processes()
if overdue:
context["overdue_processes"] = [p.id for p in overdue]
# If specific process mentioned, get its details
if "process_id" in intent.get("parameters", {}):
process_id = intent["parameters"]["process_id"]
process = self.memory.get_process(process_id)
if process:
context["target_process"] = self.memory.summarize_for_llm(process_id)
# Get recent precedent if relevant
if intent.get("intent_type") == "query_constitution":
recent_decisions = self.memory.search_decisions(limit=5)
context["recent_precedent"] = [
f"{d.decision_type}: {d.result}" for d in recent_decisions
]
return context
def _make_decision_with_llm(
self,
intent: Dict[str, Any],
constitution: Dict[str, Any],
memory: Dict[str, Any],
actor: str,
context: Optional[Dict[str, Any]]
) -> Dict[str, Any]:
"""
Use LLM to decide what action to take.
This is where the agent interprets the constitution and decides
how to handle the request.
Returns:
Decision dict with:
- action: What to do
- reasoning: Why
- constitution_citations: Which articles apply
- calculations: Any math needed
- state_updates: Changes to memory
"""
prompt = f"""You are a governance bot interpreting a community constitution.
INTENT:
{json.dumps(intent, indent=2)}
CONSTITUTIONAL GUIDANCE:
{json.dumps(constitution, indent=2)}
CURRENT MEMORY STATE:
{json.dumps(memory, indent=2)}
ACTOR: {actor}
Based on the constitution and current state, decide what action to take.
For proposals:
- What type of proposal is this? (standard, urgent, constitutional, etc.)
- What discussion period does the constitution specify?
- What voting threshold is required?
- Are there any special requirements?
For votes:
- Is this person eligible to vote on this process?
- What vote types are allowed?
- Should this vote be recorded?
For deadline checks:
- Has the deadline passed?
- What votes were cast?
- What threshold does the constitution require?
- Does the proposal pass or fail?
Available tools for calculations:
- calculate(expression, variables): Evaluate math expressions
- get_datetime(): Get current time
- is_past_deadline(deadline): Check if deadline passed
- tally(votes, key): Count votes by type
- random_select(items, count): Random selection
Return your decision as JSON:
{{
"action": "create_process|record_vote|complete_process|query_response",
"reasoning": "explain your interpretation",
"constitution_citations": ["Article X, Section Y", ...],
"parameters": {{
// Action-specific parameters
"process_type": "...",
"deadline_days": X,
"threshold_expression": "agree > disagree",
// etc.
}},
"calculations": [
{{
"tool": "calculate",
"expression": "agree > disagree",
"variables": {{"agree": 10, "disagree": 3}}
}}
]
}}
"""
try:
result = self.constitution._call_llm(prompt)
decision = json.loads(result.get("answer", "{}"))
# Execute any calculations using tools
if "calculations" in decision:
for calc in decision["calculations"]:
tool_name = calc["tool"]
if tool_name == "calculate":
calc["result"] = self.tools.calculate(
calc["expression"],
calc["variables"]
)
# Handle other tools...
return decision
except Exception as e:
return {
"action": "error",
"reasoning": f"Failed to make decision: {e}",
"constitution_citations": []
}
def _execute_decision(
self,
decision: Dict[str, Any],
actor: str,
context: Optional[Dict[str, Any]]
) -> Dict[str, Any]:
"""
Execute the decision made by the LLM.
This updates memory and performs platform actions.
"""
action = decision.get("action")
params = decision.get("parameters", {})
try:
if action == "create_process":
return self._create_process_from_decision(decision, actor, context)
elif action == "record_vote":
return self._record_vote_from_decision(decision, actor, context)
elif action == "complete_process":
return self._complete_process_from_decision(decision, actor)
elif action == "query_response":
return {
"response": decision.get("reasoning"),
"constitution_citations": decision.get("constitution_citations", []),
"success": True
}
else:
return {
"response": f"Unknown action: {action}",
"success": False
}
except Exception as e:
return {
"response": f"Error executing decision: {e}",
"success": False
}
def _create_process_from_decision(
self,
decision: Dict[str, Any],
actor: str,
context: Optional[Dict[str, Any]]
) -> Dict[str, Any]:
"""Create a new governance process based on LLM decision"""
params = decision["parameters"]
# Calculate deadline
deadline_days = params.get("deadline_days", 7)
deadline = self.tools.datetime_add(
self.tools.get_datetime(),
days=deadline_days
)
# Generate process ID
process_id = f"process_{int(datetime.utcnow().timestamp())}"
# Create in memory
process = self.memory.create_process(
process_id=process_id,
process_type=params.get("process_type", "unknown"),
created_by=actor,
constitution_basis=decision.get("constitution_citations", []),
deadline=deadline,
initial_state=params.get("initial_state", {}),
metadata=context or {}
)
# Log creation event
self.memory.add_event(
process_id=process_id,
actor=actor,
event_type="process_created",
data=params,
context=f"Process created by {actor}"
)
# Log creation decision
self.memory.add_decision(
process_id=process_id,
decision_type="process_creation",
reasoning=decision["reasoning"],
constitution_citations=decision.get("constitution_citations", []),
result="created"
)
return {
"response": f"Created {params.get('process_type')} (ID: {process_id}). Deadline: {deadline.strftime('%Y-%m-%d %H:%M UTC')}",
"process_id": process_id,
"deadline": deadline.isoformat(),
"success": True
}
def _record_vote_from_decision(
self,
decision: Dict[str, Any],
actor: str,
context: Optional[Dict[str, Any]]
) -> Dict[str, Any]:
"""Record a vote based on LLM decision"""
params = decision["parameters"]
process_id = params.get("process_id")
if not process_id:
return {"response": "No process ID specified", "success": False}
# Get process
process = self.memory.get_process(process_id)
if not process:
return {"response": f"Process {process_id} not found", "success": False}
# Record vote in state
votes = process.state.get("votes", {})
votes[actor] = {
"vote": params.get("vote_type"),
"timestamp": datetime.utcnow().isoformat()
}
self.memory.update_process(
process_id=process_id,
state_updates={"votes": votes}
)
# Log event
self.memory.add_event(
process_id=process_id,
actor=actor,
event_type="vote_cast",
data={"vote": params.get("vote_type")},
context=f"{actor} voted {params.get('vote_type')}"
)
return {
"response": f"Vote recorded: {params.get('vote_type')}",
"process_id": process_id,
"success": True
}
def _complete_process_from_decision(
self,
decision: Dict[str, Any],
actor: str
) -> Dict[str, Any]:
"""Complete a process based on LLM decision"""
params = decision["parameters"]
process_id = params.get("process_id")
process = self.memory.get_process(process_id)
if not process:
return {"response": f"Process {process_id} not found", "success": False}
# Update status
self.memory.update_process(
process_id=process_id,
status=ProcessStatus.COMPLETED
)
# Log completion decision
calculation_used = None
calculation_vars = None
calculation_result = None
if decision.get("calculations"):
calc = decision["calculations"][0] # Use first calculation
calculation_used = calc.get("expression")
calculation_vars = calc.get("variables")
calculation_result = calc.get("result")
self.memory.add_decision(
process_id=process_id,
decision_type="process_completion",
reasoning=decision["reasoning"],
constitution_citations=decision.get("constitution_citations", []),
result=params.get("outcome", "completed"),
calculation_used=calculation_used,
calculation_variables=calculation_vars,
calculation_result=calculation_result
)
# Log event
self.memory.add_event(
process_id=process_id,
actor="bot",
event_type="process_completed",
data={"outcome": params.get("outcome")},
context=f"Process completed with outcome: {params.get('outcome')}"
)
return {
"response": f"Process {process_id} completed: {params.get('outcome')}",
"process_id": process_id,
"outcome": params.get("outcome"),
"success": True
}
def check_deadlines(self) -> List[Dict[str, Any]]:
"""
Check for processes that have reached their deadline.
This is called periodically (e.g., every hour) to evaluate
processes that need decisions.
Returns:
List of completed process results
"""
results = []
overdue = self.memory.get_overdue_processes()
for process in overdue:
# Ask LLM to evaluate this process
evaluation = self._evaluate_process_deadline(process)
results.append(evaluation)
return results
def _evaluate_process_deadline(self, process: ProcessMemory) -> Dict[str, Any]:
"""
Use LLM to evaluate a process that reached its deadline.
"""
# Get vote tally
votes = process.state.get("votes", {})
vote_tally = self.tools.tally(
[{"vote": v["vote"]} for v in votes.values()],
"vote"
)
# Ask constitution what threshold is needed
threshold_query = self.constitution.query(
question=f"For a {process.type}, what voting threshold is required for passage?",
context=self.memory.summarize_for_llm(process.id)
)
# Use LLM to decide if threshold is met
decision_result = self._make_decision_with_llm(
intent={"intent_type": "deadline_check"},
constitution=threshold_query,
memory={"votes": vote_tally, "process": process.to_dict()},
actor="bot",
context=None
)
# Execute the decision
return self._execute_decision(decision_result, "bot", None)

386
src/govbot/audit.py Normal file
View File

@@ -0,0 +1,386 @@
"""
Audit Trail System for Governance Bot.
This module generates human-readable audit trails that explain:
- What decision was made
- Why it was made (reasoning)
- What constitutional rules apply
- What calculations were performed
- What precedent exists
Auditability is a core design goal - every governance action must be explainable
and inspectable by community members.
"""
from datetime import datetime
from typing import Dict, Any, List, Optional
from .memory import ProcessMemory, Decision, Event
class AuditTrail:
"""
Generate human-readable audit trails for governance decisions.
"""
@staticmethod
def format_decision(
decision: Decision,
process: ProcessMemory,
include_precedent: bool = True
) -> str:
"""
Format a decision as a human-readable audit trail.
Args:
decision: The decision to format
process: The process this decision belongs to
include_precedent: Whether to include precedent references
Returns:
Formatted audit trail as markdown
"""
lines = []
# Header
lines.append("# GOVERNANCE DECISION AUDIT TRAIL")
lines.append("")
lines.append(f"**Decision ID**: {process.id}_{len(process.decisions)}")
lines.append(f"**Timestamp**: {decision.timestamp.strftime('%Y-%m-%d %H:%M:%S UTC')}")
lines.append(f"**Decision Type**: {decision.decision_type}")
lines.append(f"**Result**: {decision.result}")
lines.append("")
# Process context
lines.append("## Process Context")
lines.append("")
lines.append(f"- **Process ID**: {process.id}")
lines.append(f"- **Process Type**: {process.type}")
lines.append(f"- **Created By**: {process.created_by}")
lines.append(f"- **Created At**: {process.created_at.strftime('%Y-%m-%d %H:%M:%S UTC')}")
if process.deadline:
lines.append(f"- **Deadline**: {process.deadline.strftime('%Y-%m-%d %H:%M:%S UTC')}")
lines.append("")
# Constitutional basis
lines.append("## Constitutional Basis")
lines.append("")
if decision.constitution_citations:
for citation in decision.constitution_citations:
lines.append(f"- {citation}")
else:
lines.append("- No specific citations (general interpretation)")
lines.append("")
# Reasoning
lines.append("## Reasoning")
lines.append("")
lines.append(decision.reasoning)
lines.append("")
# Calculation details (if any)
if decision.calculation_used:
lines.append("## Calculation Details")
lines.append("")
lines.append(f"**Expression**: `{decision.calculation_used}`")
lines.append("")
lines.append("**Variables**:")
if decision.calculation_variables:
for var, value in decision.calculation_variables.items():
lines.append(f"- `{var}` = {value}")
lines.append("")
lines.append(f"**Result**: `{decision.calculation_result}`")
lines.append("")
# Precedent (if requested and available)
if include_precedent and decision.precedent_references:
lines.append("## Related Precedent")
lines.append("")
for ref in decision.precedent_references:
lines.append(f"- {ref}")
lines.append("")
# Footer
lines.append("---")
lines.append("")
lines.append("*This audit trail was generated automatically by the governance bot.*")
lines.append("*All decisions can be reviewed and appealed according to the constitution.*")
return "\n".join(lines)
@staticmethod
def format_process_summary(process: ProcessMemory, include_events: bool = True) -> str:
"""
Format a process summary for human review.
Args:
process: Process to summarize
include_events: Whether to include event timeline
Returns:
Formatted summary as markdown
"""
lines = []
# Header
lines.append(f"# Process Summary: {process.id}")
lines.append("")
lines.append(f"**Type**: {process.type}")
lines.append(f"**Status**: {process.status.value}")
lines.append(f"**Created By**: {process.created_by}")
lines.append(f"**Created At**: {process.created_at.strftime('%Y-%m-%d %H:%M:%S UTC')}")
if process.deadline:
lines.append(f"**Deadline**: {process.deadline.strftime('%Y-%m-%d %H:%M:%S UTC')}")
lines.append("")
# Constitutional basis
lines.append("## Constitutional Basis")
lines.append("")
if process.constitution_basis:
for citation in process.constitution_basis:
lines.append(f"- {citation}")
else:
lines.append("- No specific citations recorded")
lines.append("")
# Current state
lines.append("## Current State")
lines.append("")
if process.state:
for key, value in process.state.items():
if isinstance(value, dict):
lines.append(f"- **{key}**: {len(value)} items")
elif isinstance(value, list):
lines.append(f"- **{key}**: {len(value)} items")
else:
lines.append(f"- **{key}**: {value}")
else:
lines.append("- No state data recorded")
lines.append("")
# Events timeline
if include_events and process.events:
lines.append("## Event Timeline")
lines.append("")
for event in process.events:
timestamp = event.timestamp.strftime('%Y-%m-%d %H:%M')
lines.append(f"**[{timestamp}]** {event.event_type}")
lines.append(f"- Actor: {event.actor}")
lines.append(f"- Context: {event.context}")
lines.append("")
# Decisions
if process.decisions:
lines.append("## Decisions")
lines.append("")
for i, decision in enumerate(process.decisions, 1):
timestamp = decision.timestamp.strftime('%Y-%m-%d %H:%M')
lines.append(f"**{i}. [{timestamp}]** {decision.decision_type}")
lines.append(f"- Result: {decision.result}")
lines.append(f"- Reasoning: {decision.reasoning[:100]}...") # Truncate
lines.append("")
return "\n".join(lines)
@staticmethod
def format_vote_tally(
process: ProcessMemory,
votes: Dict[str, Any],
threshold_expression: str,
threshold_result: bool
) -> str:
"""
Format a vote tally with threshold evaluation.
Args:
process: Process being voted on
votes: Vote data (tally of vote types)
threshold_expression: Expression used to check threshold
threshold_result: Whether threshold was met
Returns:
Formatted vote tally as markdown
"""
lines = []
lines.append(f"# Vote Tally: {process.id}")
lines.append("")
lines.append(f"**Process**: {process.type}")
lines.append(f"**Deadline**: {process.deadline.strftime('%Y-%m-%d %H:%M:%S UTC') if process.deadline else 'N/A'}")
lines.append("")
# Vote counts
lines.append("## Vote Counts")
lines.append("")
total = sum(v for v in votes.values() if isinstance(v, int))
for vote_type, count in sorted(votes.items()):
if isinstance(count, int):
percentage = (count / total * 100) if total > 0 else 0
lines.append(f"- **{vote_type.capitalize()}**: {count} ({percentage:.1f}%)")
lines.append(f"- **Total**: {total}")
lines.append("")
# Threshold evaluation
lines.append("## Threshold Evaluation")
lines.append("")
lines.append(f"**Expression**: `{threshold_expression}`")
lines.append(f"**Variables**: {votes}")
lines.append(f"**Result**: {'✅ THRESHOLD MET' if threshold_result else '❌ THRESHOLD NOT MET'}")
lines.append("")
# Outcome
lines.append("## Outcome")
lines.append("")
if threshold_result:
lines.append("**The proposal PASSES** based on the constitutional threshold.")
else:
lines.append("**The proposal FAILS** based on the constitutional threshold.")
lines.append("")
return "\n".join(lines)
@staticmethod
def generate_process_audit_file(process: ProcessMemory) -> str:
"""
Generate a complete audit file for a process.
This creates a comprehensive document that can be saved for permanent record.
Args:
process: Process to audit
Returns:
Complete audit document as markdown
"""
lines = []
# Title
lines.append(f"# Governance Process Audit: {process.id}")
lines.append("")
lines.append(f"**Generated**: {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')}")
lines.append("")
lines.append("---")
lines.append("")
# Process summary
lines.append(AuditTrail.format_process_summary(process, include_events=True))
lines.append("")
lines.append("---")
lines.append("")
# All decisions
if process.decisions:
lines.append("# Detailed Decision Log")
lines.append("")
for decision in process.decisions:
lines.append(AuditTrail.format_decision(decision, process, include_precedent=True))
lines.append("")
lines.append("---")
lines.append("")
# Metadata
if process.metadata:
lines.append("# Metadata")
lines.append("")
for key, value in process.metadata.items():
lines.append(f"- **{key}**: {value}")
lines.append("")
# Footer
lines.append("---")
lines.append("")
lines.append("## Audit Trail Integrity")
lines.append("")
lines.append("This audit trail represents the complete record of this governance process.")
lines.append("All decisions were made according to the community constitution and are")
lines.append("subject to review and appeal by community members.")
lines.append("")
lines.append("For questions or appeals, please refer to the constitution's dispute")
lines.append("resolution procedures.")
return "\n".join(lines)
class AuditQuery:
"""
Query and analyze audit trails.
"""
@staticmethod
def find_similar_decisions(
decision: Decision,
all_decisions: List[Decision],
similarity_threshold: int = 1
) -> List[Decision]:
"""
Find similar past decisions (for precedent).
Args:
decision: Decision to find matches for
all_decisions: All past decisions to search
similarity_threshold: Minimum number of matching citations
Returns:
List of similar decisions
"""
similar = []
for past_decision in all_decisions:
# Count matching citations
matching_citations = sum(
1 for citation in decision.constitution_citations
if citation in past_decision.constitution_citations
)
if matching_citations >= similarity_threshold:
similar.append(past_decision)
return similar
@staticmethod
def generate_precedent_summary(decisions: List[Decision]) -> str:
"""
Generate a summary of precedent decisions.
Args:
decisions: List of decisions to summarize
Returns:
Summary as markdown
"""
lines = []
lines.append("# Precedent Summary")
lines.append("")
lines.append(f"**Found {len(decisions)} relevant past decisions**")
lines.append("")
for i, decision in enumerate(decisions, 1):
lines.append(f"## {i}. {decision.decision_type} ({decision.timestamp.strftime('%Y-%m-%d')})")
lines.append("")
lines.append(f"**Result**: {decision.result}")
lines.append(f"**Constitutional Basis**: {', '.join(decision.constitution_citations)}")
lines.append(f"**Reasoning**: {decision.reasoning[:200]}...") # Truncated
lines.append("")
return "\n".join(lines)
def create_audit_trail(
process: ProcessMemory,
decision: Decision,
include_precedent: bool = True
) -> str:
"""
Convenience function to create an audit trail.
Args:
process: Process the decision belongs to
decision: Decision to audit
include_precedent: Whether to include precedent
Returns:
Formatted audit trail
"""
return AuditTrail.format_decision(decision, process, include_precedent)

476
src/govbot/memory.py Normal file
View File

@@ -0,0 +1,476 @@
"""
Structured Memory System for Agentic Governance.
This module provides a memory system that the LLM can query and update to track
governance state. The memory is structured, queryable, and human-readable for
auditability.
Key Design Goals:
1. LLM can query memory to understand current state
2. LLM can update memory with decisions and events
3. Memory is human-readable for audit
4. Memory preserves history and precedent
"""
from dataclasses import dataclass, field, asdict
from datetime import datetime
from typing import Dict, Any, List, Optional
from enum import Enum
import json
class ProcessStatus(Enum):
"""Status of a governance process"""
ACTIVE = "active"
COMPLETED = "completed"
CANCELLED = "cancelled"
SUSPENDED = "suspended"
@dataclass
class Event:
"""An event that occurred in a governance process"""
timestamp: datetime
actor: str # Who caused this event
event_type: str # "vote_cast", "proposal_submitted", "concern_raised", etc.
data: Dict[str, Any] # Event-specific data
context: str # Human-readable description
def to_dict(self) -> Dict[str, Any]:
return {
"timestamp": self.timestamp.isoformat(),
"actor": self.actor,
"event_type": self.event_type,
"data": self.data,
"context": self.context
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'Event':
data = data.copy()
data['timestamp'] = datetime.fromisoformat(data['timestamp'])
return cls(**data)
@dataclass
class Decision:
"""A decision made by the governance bot"""
timestamp: datetime
decision_type: str # "threshold_evaluation", "deadline_reached", etc.
reasoning: str # Natural language explanation
constitution_citations: List[str] # References to constitution sections
calculation_used: Optional[str] = None # Math expression if tool was used
calculation_variables: Optional[Dict[str, Any]] = None
calculation_result: Optional[Any] = None
result: Any = None # Decision outcome
precedent_references: List[str] = field(default_factory=list) # Related past decisions
def to_dict(self) -> Dict[str, Any]:
return {
"timestamp": self.timestamp.isoformat(),
"decision_type": self.decision_type,
"reasoning": self.reasoning,
"constitution_citations": self.constitution_citations,
"calculation_used": self.calculation_used,
"calculation_variables": self.calculation_variables,
"calculation_result": self.calculation_result,
"result": self.result,
"precedent_references": self.precedent_references
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'Decision':
data = data.copy()
data['timestamp'] = datetime.fromisoformat(data['timestamp'])
if 'precedent_references' not in data:
data['precedent_references'] = []
return cls(**data)
@dataclass
class ProcessMemory:
"""Memory of a governance process (proposal, dispute, election, etc.)"""
id: str # Unique identifier
type: str # "proposal", "dispute_resolution", "election", etc.
status: ProcessStatus
created_at: datetime
created_by: str
deadline: Optional[datetime] = None
constitution_basis: List[str] = field(default_factory=list) # Article/section citations
state: Dict[str, Any] = field(default_factory=dict) # Flexible process-specific state
events: List[Event] = field(default_factory=list) # Timeline of events
decisions: List[Decision] = field(default_factory=list) # Bot decisions
metadata: Dict[str, Any] = field(default_factory=dict) # Platform-specific data (thread IDs, etc.)
def add_event(self, event: Event):
"""Add an event to the process timeline"""
self.events.append(event)
def add_decision(self, decision: Decision):
"""Add a decision to the process"""
self.decisions.append(decision)
def update_state(self, updates: Dict[str, Any]):
"""Update process state"""
self.state.update(updates)
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"type": self.type,
"status": self.status.value,
"created_at": self.created_at.isoformat(),
"created_by": self.created_by,
"deadline": self.deadline.isoformat() if self.deadline else None,
"constitution_basis": self.constitution_basis,
"state": self.state,
"events": [e.to_dict() for e in self.events],
"decisions": [d.to_dict() for d in self.decisions],
"metadata": self.metadata
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'ProcessMemory':
data = data.copy()
data['status'] = ProcessStatus(data['status'])
data['created_at'] = datetime.fromisoformat(data['created_at'])
if data.get('deadline'):
data['deadline'] = datetime.fromisoformat(data['deadline'])
data['events'] = [Event.from_dict(e) for e in data.get('events', [])]
data['decisions'] = [Decision.from_dict(d) for d in data.get('decisions', [])]
return cls(**data)
class GovernanceMemory:
"""
Memory system for tracking governance state.
This provides a queryable interface for the LLM to understand current state
and update it with new events and decisions.
"""
def __init__(self, db_session):
"""Initialize with database session for persistence"""
self.db = db_session
self._cache: Dict[str, ProcessMemory] = {}
# Query operations
def get_process(self, process_id: str) -> Optional[ProcessMemory]:
"""Retrieve a specific process by ID"""
if process_id in self._cache:
return self._cache[process_id]
# Load from database
from .db import queries
db_process = queries.get_process(self.db, int(process_id.split('_')[-1]))
if db_process:
memory = self._db_to_memory(db_process)
self._cache[process_id] = memory
return memory
return None
def query_processes(
self,
status: Optional[ProcessStatus] = None,
process_type: Optional[str] = None,
created_by: Optional[str] = None,
has_deadline: Optional[bool] = None,
deadline_before: Optional[datetime] = None,
limit: int = 50
) -> List[ProcessMemory]:
"""
Query processes by criteria.
This is the primary way the LLM discovers relevant processes.
"""
from .db import queries
# Build query
db_processes = queries.get_active_processes(self.db)
# Filter in memory (could optimize with SQL)
results = []
for db_proc in db_processes:
memory = self._db_to_memory(db_proc)
# Apply filters
if status and memory.status != status:
continue
if process_type and memory.type != process_type:
continue
if created_by and memory.created_by != created_by:
continue
if has_deadline is not None:
if has_deadline and not memory.deadline:
continue
if not has_deadline and memory.deadline:
continue
if deadline_before and memory.deadline:
if memory.deadline >= deadline_before:
continue
results.append(memory)
self._cache[memory.id] = memory
if len(results) >= limit:
break
return results
def get_active_processes(self) -> List[ProcessMemory]:
"""Get all active processes (convenience method)"""
return self.query_processes(status=ProcessStatus.ACTIVE)
def get_overdue_processes(self) -> List[ProcessMemory]:
"""Get processes past their deadline"""
return self.query_processes(
status=ProcessStatus.ACTIVE,
deadline_before=datetime.utcnow()
)
def search_decisions(
self,
decision_type: Optional[str] = None,
has_citation: Optional[str] = None,
limit: int = 20
) -> List[Decision]:
"""
Search past decisions (for precedent).
This helps the LLM find similar past decisions.
"""
all_decisions = []
# Get completed processes
completed = self.query_processes(status=ProcessStatus.COMPLETED, limit=100)
for proc in completed:
for decision in proc.decisions:
# Apply filters
if decision_type and decision.decision_type != decision_type:
continue
if has_citation and has_citation not in decision.constitution_citations:
continue
all_decisions.append(decision)
if len(all_decisions) >= limit:
return all_decisions
return all_decisions
# Update operations
def create_process(
self,
process_id: str,
process_type: str,
created_by: str,
constitution_basis: List[str],
deadline: Optional[datetime] = None,
initial_state: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None
) -> ProcessMemory:
"""Create a new process in memory"""
memory = ProcessMemory(
id=process_id,
type=process_type,
status=ProcessStatus.ACTIVE,
created_at=datetime.utcnow(),
created_by=created_by,
deadline=deadline,
constitution_basis=constitution_basis,
state=initial_state or {},
metadata=metadata or {}
)
self._cache[process_id] = memory
self._persist_memory(memory)
return memory
def update_process(
self,
process_id: str,
status: Optional[ProcessStatus] = None,
state_updates: Optional[Dict[str, Any]] = None,
new_deadline: Optional[datetime] = None
) -> ProcessMemory:
"""Update an existing process"""
memory = self.get_process(process_id)
if not memory:
raise ValueError(f"Process {process_id} not found")
if status:
memory.status = status
if state_updates:
memory.update_state(state_updates)
if new_deadline:
memory.deadline = new_deadline
self._persist_memory(memory)
return memory
def add_event(
self,
process_id: str,
actor: str,
event_type: str,
data: Dict[str, Any],
context: str
) -> Event:
"""Add an event to a process"""
memory = self.get_process(process_id)
if not memory:
raise ValueError(f"Process {process_id} not found")
event = Event(
timestamp=datetime.utcnow(),
actor=actor,
event_type=event_type,
data=data,
context=context
)
memory.add_event(event)
self._persist_memory(memory)
return event
def add_decision(
self,
process_id: str,
decision_type: str,
reasoning: str,
constitution_citations: List[str],
result: Any,
calculation_used: Optional[str] = None,
calculation_variables: Optional[Dict[str, Any]] = None,
calculation_result: Optional[Any] = None,
precedent_references: Optional[List[str]] = None
) -> Decision:
"""Add a decision to a process"""
memory = self.get_process(process_id)
if not memory:
raise ValueError(f"Process {process_id} not found")
decision = Decision(
timestamp=datetime.utcnow(),
decision_type=decision_type,
reasoning=reasoning,
constitution_citations=constitution_citations,
calculation_used=calculation_used,
calculation_variables=calculation_variables,
calculation_result=calculation_result,
result=result,
precedent_references=precedent_references or []
)
memory.add_decision(decision)
self._persist_memory(memory)
return decision
# Persistence
def _persist_memory(self, memory: ProcessMemory):
"""Save memory to database"""
from .db import queries
# Convert process_id to int for database
db_id = int(memory.id.split('_')[-1]) if '_' in memory.id else int(memory.id)
# Check if exists
db_process = queries.get_process(self.db, db_id)
if db_process:
# Update existing
queries.update_process_state(
self.db,
db_id,
{
"status": memory.status.value,
"state": memory.state,
"events": [e.to_dict() for e in memory.events],
"decisions": [d.to_dict() for d in memory.decisions]
}
)
else:
# Create new
queries.create_process(
session=self.db,
process_type=memory.type,
creator=memory.created_by,
constitutional_basis=", ".join(memory.constitution_basis),
deadline=memory.deadline or datetime.utcnow(),
state_data=memory.state
)
def _db_to_memory(self, db_process) -> ProcessMemory:
"""Convert database process to memory format"""
# Extract events and decisions from state_data if present
state_data = db_process.state_data or {}
events = [Event.from_dict(e) for e in state_data.get('events', [])]
decisions = [Decision.from_dict(d) for d in state_data.get('decisions', [])]
return ProcessMemory(
id=f"process_{db_process.id}",
type=db_process.process_type,
status=ProcessStatus(db_process.status),
created_at=db_process.created_at,
created_by=db_process.creator,
deadline=db_process.deadline,
constitution_basis=[db_process.constitutional_basis] if db_process.constitutional_basis else [],
state=state_data.get('state', state_data), # Migrate format
events=events,
decisions=decisions,
metadata={"mastodon_thread_id": db_process.mastodon_thread_id} if db_process.mastodon_thread_id else {}
)
# Utility
def summarize_for_llm(self, process_id: str) -> str:
"""
Create a human-readable summary of a process for LLM context.
This helps the LLM understand process state without overwhelming context.
"""
memory = self.get_process(process_id)
if not memory:
return f"Process {process_id} not found."
summary = f"""
Process: {memory.id}
Type: {memory.type}
Status: {memory.status.value}
Created: {memory.created_at.strftime('%Y-%m-%d %H:%M UTC')} by {memory.created_by}
Constitutional Basis: {', '.join(memory.constitution_basis)}
"""
if memory.deadline:
summary += f"Deadline: {memory.deadline.strftime('%Y-%m-%d %H:%M UTC')}\n"
# Add state summary
summary += f"\nCurrent State:\n"
for key, value in memory.state.items():
if isinstance(value, dict):
summary += f" {key}: {len(value)} items\n"
elif isinstance(value, list):
summary += f" {key}: {len(value)} items\n"
else:
summary += f" {key}: {value}\n"
# Add recent events
if memory.events:
summary += f"\nRecent Events ({len(memory.events)} total):\n"
for event in memory.events[-5:]: # Last 5 events
summary += f" [{event.timestamp.strftime('%m-%d %H:%M')}] {event.context}\n"
# Add decisions
if memory.decisions:
summary += f"\nDecisions ({len(memory.decisions)} total):\n"
for decision in memory.decisions:
summary += f" [{decision.timestamp.strftime('%m-%d %H:%M')}] {decision.decision_type}: {decision.result}\n"
return summary

424
src/govbot/tools.py Normal file
View File

@@ -0,0 +1,424 @@
"""
Tools for LLM to use in governance decisions.
These tools provide deterministic operations that the LLM can call to:
- Perform calculations correctly
- Work with dates and times
- Generate random selections
- Query structured data
Design principle: LLM reasons about WHAT to do, tools provide HOW to do it correctly.
"""
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional
import random
import re
class GovernanceTools:
"""
Tools that the LLM can use for governance operations.
Each tool is designed to be:
1. Deterministic (same input → same output)
2. Verifiable (result can be checked by humans)
3. Safe (no arbitrary code execution)
"""
@staticmethod
def calculate(expression: str, variables: Dict[str, Any]) -> Any:
"""
Safely evaluate a mathematical expression.
Args:
expression: Math expression (e.g., "agree > disagree * 3")
variables: Variable values (e.g., {"agree": 10, "disagree": 3})
Returns:
Result of calculation
Examples:
>>> calculate("agree > disagree", {"agree": 10, "disagree": 3})
True
>>> calculate("agree >= disagree * 3", {"agree": 9, "disagree": 3})
True
>>> calculate("agree / (agree + disagree)", {"agree": 10, "disagree": 5})
0.6666666666666666
"""
# Safe evaluation using eval with restricted globals
allowed_builtins = {
"abs": abs,
"max": max,
"min": min,
"sum": sum,
"len": len,
"int": int,
"float": float,
"round": round,
}
# Combine allowed builtins with variables
namespace = {"__builtins__": {}}
namespace.update(allowed_builtins)
namespace.update(variables)
try:
result = eval(expression, namespace)
return result
except Exception as e:
raise ValueError(f"Invalid expression '{expression}': {e}")
@staticmethod
def validate_expression(expression: str) -> bool:
"""
Check if an expression is safe to evaluate.
Args:
expression: Expression to validate
Returns:
True if safe, False otherwise
"""
# Check for dangerous patterns
dangerous_patterns = [
r'__', # Dunder attributes
r'import', # Import statements
r'eval', # Nested eval
r'exec', # Execution
r'open', # File operations
r'compile', # Code compilation
]
expression_lower = expression.lower()
for pattern in dangerous_patterns:
if re.search(pattern, expression_lower):
return False
return True
@staticmethod
def get_datetime() -> datetime:
"""Get current UTC datetime"""
return datetime.utcnow()
@staticmethod
def datetime_add(dt: datetime, days: int = 0, hours: int = 0, minutes: int = 0) -> datetime:
"""
Add time to a datetime.
Args:
dt: Starting datetime
days: Days to add
hours: Hours to add
minutes: Minutes to add
Returns:
New datetime
"""
return dt + timedelta(days=days, hours=hours, minutes=minutes)
@staticmethod
def datetime_diff(dt1: datetime, dt2: datetime) -> Dict[str, float]:
"""
Calculate difference between two datetimes.
Args:
dt1: First datetime
dt2: Second datetime
Returns:
Dictionary with 'days', 'hours', 'minutes', 'seconds'
"""
diff = dt1 - dt2
total_seconds = diff.total_seconds()
return {
"days": diff.days,
"hours": total_seconds / 3600,
"minutes": total_seconds / 60,
"seconds": total_seconds,
}
@staticmethod
def is_past_deadline(deadline: datetime, current_time: Optional[datetime] = None) -> bool:
"""
Check if a deadline has passed.
Args:
deadline: Deadline to check
current_time: Current time (defaults to now)
Returns:
True if deadline has passed
"""
if current_time is None:
current_time = datetime.utcnow()
return current_time >= deadline
@staticmethod
def random_select(items: List[Any], count: int, seed: Optional[int] = None) -> List[Any]:
"""
Randomly select items from a list (for jury selection, etc.).
Args:
items: List of items to select from
count: Number of items to select
seed: Random seed for reproducibility (optional)
Returns:
List of randomly selected items
Raises:
ValueError: If count > len(items)
"""
if count > len(items):
raise ValueError(f"Cannot select {count} items from list of {len(items)}")
if seed is not None:
random.seed(seed)
return random.sample(items, count)
@staticmethod
def count_occurrences(items: List[Any], target: Any) -> int:
"""
Count how many times a value appears in a list.
Args:
items: List to search
target: Value to count
Returns:
Count of occurrences
"""
return items.count(target)
@staticmethod
def group_by(items: List[Dict[str, Any]], key: str) -> Dict[Any, List[Dict[str, Any]]]:
"""
Group items by a key value.
Args:
items: List of dictionaries
key: Key to group by
Returns:
Dictionary mapping key values to lists of items
Example:
>>> votes = [
... {"user": "alice", "vote": "agree"},
... {"user": "bob", "vote": "disagree"},
... {"user": "carol", "vote": "agree"}
... ]
>>> group_by(votes, "vote")
{
"agree": [{"user": "alice", "vote": "agree"}, {"user": "carol", "vote": "agree"}],
"disagree": [{"user": "bob", "vote": "disagree"}]
}
"""
groups: Dict[Any, List[Dict[str, Any]]] = {}
for item in items:
group_key = item.get(key)
if group_key not in groups:
groups[group_key] = []
groups[group_key].append(item)
return groups
@staticmethod
def tally(items: List[Dict[str, Any]], value_key: str) -> Dict[str, int]:
"""
Count occurrences of values.
Args:
items: List of items
value_key: Key to tally
Returns:
Dictionary mapping values to counts
Example:
>>> votes = [
... {"user": "alice", "vote": "agree"},
... {"user": "bob", "vote": "disagree"},
... {"user": "carol", "vote": "agree"}
... ]
>>> tally(votes, "vote")
{"agree": 2, "disagree": 1}
"""
counts: Dict[str, int] = {}
for item in items:
value = item.get(value_key)
if value is not None:
counts[value] = counts.get(value, 0) + 1
return counts
@staticmethod
def filter_items(
items: List[Dict[str, Any]],
criteria: Dict[str, Any]
) -> List[Dict[str, Any]]:
"""
Filter items by criteria.
Args:
items: List of items to filter
criteria: Dictionary of key-value pairs to match
Returns:
Filtered list
Example:
>>> users = [
... {"name": "alice", "role": "member"},
... {"name": "bob", "role": "admin"},
... {"name": "carol", "role": "member"}
... ]
>>> filter_items(users, {"role": "member"})
[{"name": "alice", "role": "member"}, {"name": "carol", "role": "member"}]
"""
return [
item for item in items
if all(item.get(k) == v for k, v in criteria.items())
]
@staticmethod
def percentage(numerator: float, denominator: float, decimals: int = 2) -> float:
"""
Calculate percentage.
Args:
numerator: Top number
denominator: Bottom number
decimals: Decimal places to round to
Returns:
Percentage value
Raises:
ValueError: If denominator is zero
"""
if denominator == 0:
raise ValueError("Cannot divide by zero")
return round((numerator / denominator) * 100, decimals)
class ToolRegistry:
"""
Registry of available tools for the LLM.
This provides metadata about tools for LLM function calling.
"""
@staticmethod
def get_tool_definitions() -> List[Dict[str, Any]]:
"""
Get tool definitions in a format suitable for LLM function calling.
Returns:
List of tool definitions
"""
return [
{
"name": "calculate",
"description": "Safely evaluate a mathematical expression with variables",
"parameters": {
"expression": "string: Math expression (e.g., 'agree > disagree * 3')",
"variables": "dict: Variable values (e.g., {'agree': 10, 'disagree': 3})"
},
"returns": "boolean or number: Result of calculation"
},
{
"name": "get_datetime",
"description": "Get the current UTC datetime",
"parameters": {},
"returns": "datetime: Current UTC time"
},
{
"name": "datetime_add",
"description": "Add days/hours/minutes to a datetime",
"parameters": {
"dt": "datetime: Starting datetime",
"days": "int: Days to add (optional, default 0)",
"hours": "int: Hours to add (optional, default 0)",
"minutes": "int: Minutes to add (optional, default 0)"
},
"returns": "datetime: New datetime"
},
{
"name": "is_past_deadline",
"description": "Check if a deadline has passed",
"parameters": {
"deadline": "datetime: Deadline to check",
"current_time": "datetime: Current time (optional, defaults to now)"
},
"returns": "boolean: True if deadline has passed"
},
{
"name": "random_select",
"description": "Randomly select items from a list (for jury selection, sortition)",
"parameters": {
"items": "list: Items to select from",
"count": "int: Number of items to select",
"seed": "int: Random seed for reproducibility (optional)"
},
"returns": "list: Randomly selected items"
},
{
"name": "tally",
"description": "Count occurrences of values in a list of dictionaries",
"parameters": {
"items": "list[dict]: Items to tally",
"value_key": "string: Key to tally by"
},
"returns": "dict: Mapping of values to counts"
},
{
"name": "filter_items",
"description": "Filter a list of dictionaries by criteria",
"parameters": {
"items": "list[dict]: Items to filter",
"criteria": "dict: Key-value pairs to match"
},
"returns": "list[dict]: Filtered items"
},
{
"name": "percentage",
"description": "Calculate a percentage",
"parameters": {
"numerator": "number: Top number",
"denominator": "number: Bottom number",
"decimals": "int: Decimal places (optional, default 2)"
},
"returns": "number: Percentage value"
}
]
@staticmethod
def execute_tool(tool_name: str, **kwargs) -> Any:
"""
Execute a tool by name.
Args:
tool_name: Name of tool to execute
**kwargs: Tool parameters
Returns:
Tool result
Raises:
ValueError: If tool not found
"""
tools = GovernanceTools()
if not hasattr(tools, tool_name):
raise ValueError(f"Tool '{tool_name}' not found")
tool_method = getattr(tools, tool_name)
return tool_method(**kwargs)