""" Audit Trail System for Governance Bot. This module generates human-readable audit trails that explain: - What decision was made - Why it was made (reasoning) - What constitutional rules apply - What calculations were performed - What precedent exists Auditability is a core design goal - every governance action must be explainable and inspectable by community members. """ from datetime import datetime from typing import Dict, Any, List, Optional from .memory import ProcessMemory, Decision, Event class AuditTrail: """ Generate human-readable audit trails for governance decisions. """ @staticmethod def format_decision( decision: Decision, process: ProcessMemory, include_precedent: bool = True ) -> str: """ Format a decision as a human-readable audit trail. Args: decision: The decision to format process: The process this decision belongs to include_precedent: Whether to include precedent references Returns: Formatted audit trail as markdown """ lines = [] # Header lines.append("# GOVERNANCE DECISION AUDIT TRAIL") lines.append("") lines.append(f"**Decision ID**: {process.id}_{len(process.decisions)}") lines.append(f"**Timestamp**: {decision.timestamp.strftime('%Y-%m-%d %H:%M:%S UTC')}") lines.append(f"**Decision Type**: {decision.decision_type}") lines.append(f"**Result**: {decision.result}") lines.append("") # Process context lines.append("## Process Context") lines.append("") lines.append(f"- **Process ID**: {process.id}") lines.append(f"- **Process Type**: {process.type}") lines.append(f"- **Created By**: {process.created_by}") lines.append(f"- **Created At**: {process.created_at.strftime('%Y-%m-%d %H:%M:%S UTC')}") if process.deadline: lines.append(f"- **Deadline**: {process.deadline.strftime('%Y-%m-%d %H:%M:%S UTC')}") lines.append("") # Constitutional basis lines.append("## Constitutional Basis") lines.append("") if decision.constitution_citations: for citation in decision.constitution_citations: lines.append(f"- {citation}") else: lines.append("- No specific citations (general interpretation)") lines.append("") # Reasoning lines.append("## Reasoning") lines.append("") lines.append(decision.reasoning) lines.append("") # Calculation details (if any) if decision.calculation_used: lines.append("## Calculation Details") lines.append("") lines.append(f"**Expression**: `{decision.calculation_used}`") lines.append("") lines.append("**Variables**:") if decision.calculation_variables: for var, value in decision.calculation_variables.items(): lines.append(f"- `{var}` = {value}") lines.append("") lines.append(f"**Result**: `{decision.calculation_result}`") lines.append("") # Precedent (if requested and available) if include_precedent and decision.precedent_references: lines.append("## Related Precedent") lines.append("") for ref in decision.precedent_references: lines.append(f"- {ref}") lines.append("") # Footer lines.append("---") lines.append("") lines.append("*This audit trail was generated automatically by the governance bot.*") lines.append("*All decisions can be reviewed and appealed according to the constitution.*") return "\n".join(lines) @staticmethod def format_process_summary(process: ProcessMemory, include_events: bool = True) -> str: """ Format a process summary for human review. Args: process: Process to summarize include_events: Whether to include event timeline Returns: Formatted summary as markdown """ lines = [] # Header lines.append(f"# Process Summary: {process.id}") lines.append("") lines.append(f"**Type**: {process.type}") lines.append(f"**Status**: {process.status.value}") lines.append(f"**Created By**: {process.created_by}") lines.append(f"**Created At**: {process.created_at.strftime('%Y-%m-%d %H:%M:%S UTC')}") if process.deadline: lines.append(f"**Deadline**: {process.deadline.strftime('%Y-%m-%d %H:%M:%S UTC')}") lines.append("") # Constitutional basis lines.append("## Constitutional Basis") lines.append("") if process.constitution_basis: for citation in process.constitution_basis: lines.append(f"- {citation}") else: lines.append("- No specific citations recorded") lines.append("") # Current state lines.append("## Current State") lines.append("") if process.state: for key, value in process.state.items(): if isinstance(value, dict): lines.append(f"- **{key}**: {len(value)} items") elif isinstance(value, list): lines.append(f"- **{key}**: {len(value)} items") else: lines.append(f"- **{key}**: {value}") else: lines.append("- No state data recorded") lines.append("") # Events timeline if include_events and process.events: lines.append("## Event Timeline") lines.append("") for event in process.events: timestamp = event.timestamp.strftime('%Y-%m-%d %H:%M') lines.append(f"**[{timestamp}]** {event.event_type}") lines.append(f"- Actor: {event.actor}") lines.append(f"- Context: {event.context}") lines.append("") # Decisions if process.decisions: lines.append("## Decisions") lines.append("") for i, decision in enumerate(process.decisions, 1): timestamp = decision.timestamp.strftime('%Y-%m-%d %H:%M') lines.append(f"**{i}. [{timestamp}]** {decision.decision_type}") lines.append(f"- Result: {decision.result}") lines.append(f"- Reasoning: {decision.reasoning[:100]}...") # Truncate lines.append("") return "\n".join(lines) @staticmethod def format_vote_tally( process: ProcessMemory, votes: Dict[str, Any], threshold_expression: str, threshold_result: bool ) -> str: """ Format a vote tally with threshold evaluation. Args: process: Process being voted on votes: Vote data (tally of vote types) threshold_expression: Expression used to check threshold threshold_result: Whether threshold was met Returns: Formatted vote tally as markdown """ lines = [] lines.append(f"# Vote Tally: {process.id}") lines.append("") lines.append(f"**Process**: {process.type}") lines.append(f"**Deadline**: {process.deadline.strftime('%Y-%m-%d %H:%M:%S UTC') if process.deadline else 'N/A'}") lines.append("") # Vote counts lines.append("## Vote Counts") lines.append("") total = sum(v for v in votes.values() if isinstance(v, int)) for vote_type, count in sorted(votes.items()): if isinstance(count, int): percentage = (count / total * 100) if total > 0 else 0 lines.append(f"- **{vote_type.capitalize()}**: {count} ({percentage:.1f}%)") lines.append(f"- **Total**: {total}") lines.append("") # Threshold evaluation lines.append("## Threshold Evaluation") lines.append("") lines.append(f"**Expression**: `{threshold_expression}`") lines.append(f"**Variables**: {votes}") lines.append(f"**Result**: {'✅ THRESHOLD MET' if threshold_result else '❌ THRESHOLD NOT MET'}") lines.append("") # Outcome lines.append("## Outcome") lines.append("") if threshold_result: lines.append("**The proposal PASSES** based on the constitutional threshold.") else: lines.append("**The proposal FAILS** based on the constitutional threshold.") lines.append("") return "\n".join(lines) @staticmethod def generate_process_audit_file(process: ProcessMemory) -> str: """ Generate a complete audit file for a process. This creates a comprehensive document that can be saved for permanent record. Args: process: Process to audit Returns: Complete audit document as markdown """ lines = [] # Title lines.append(f"# Governance Process Audit: {process.id}") lines.append("") lines.append(f"**Generated**: {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')}") lines.append("") lines.append("---") lines.append("") # Process summary lines.append(AuditTrail.format_process_summary(process, include_events=True)) lines.append("") lines.append("---") lines.append("") # All decisions if process.decisions: lines.append("# Detailed Decision Log") lines.append("") for decision in process.decisions: lines.append(AuditTrail.format_decision(decision, process, include_precedent=True)) lines.append("") lines.append("---") lines.append("") # Metadata if process.metadata: lines.append("# Metadata") lines.append("") for key, value in process.metadata.items(): lines.append(f"- **{key}**: {value}") lines.append("") # Footer lines.append("---") lines.append("") lines.append("## Audit Trail Integrity") lines.append("") lines.append("This audit trail represents the complete record of this governance process.") lines.append("All decisions were made according to the community constitution and are") lines.append("subject to review and appeal by community members.") lines.append("") lines.append("For questions or appeals, please refer to the constitution's dispute") lines.append("resolution procedures.") return "\n".join(lines) class AuditQuery: """ Query and analyze audit trails. """ @staticmethod def find_similar_decisions( decision: Decision, all_decisions: List[Decision], similarity_threshold: int = 1 ) -> List[Decision]: """ Find similar past decisions (for precedent). Args: decision: Decision to find matches for all_decisions: All past decisions to search similarity_threshold: Minimum number of matching citations Returns: List of similar decisions """ similar = [] for past_decision in all_decisions: # Count matching citations matching_citations = sum( 1 for citation in decision.constitution_citations if citation in past_decision.constitution_citations ) if matching_citations >= similarity_threshold: similar.append(past_decision) return similar @staticmethod def generate_precedent_summary(decisions: List[Decision]) -> str: """ Generate a summary of precedent decisions. Args: decisions: List of decisions to summarize Returns: Summary as markdown """ lines = [] lines.append("# Precedent Summary") lines.append("") lines.append(f"**Found {len(decisions)} relevant past decisions**") lines.append("") for i, decision in enumerate(decisions, 1): lines.append(f"## {i}. {decision.decision_type} ({decision.timestamp.strftime('%Y-%m-%d')})") lines.append("") lines.append(f"**Result**: {decision.result}") lines.append(f"**Constitutional Basis**: {', '.join(decision.constitution_citations)}") lines.append(f"**Reasoning**: {decision.reasoning[:200]}...") # Truncated lines.append("") return "\n".join(lines) def create_audit_trail( process: ProcessMemory, decision: Decision, include_precedent: bool = True ) -> str: """ Convenience function to create an audit trail. Args: process: Process the decision belongs to decision: Decision to audit include_precedent: Whether to include precedent Returns: Formatted audit trail """ return AuditTrail.format_decision(decision, process, include_precedent)