Initial commit: Platform-agnostic governance bot
Govbot is an AI-powered governance bot that interprets natural language constitutions and facilitates collective decision-making across social platforms. Core features: - Agentic architecture with constitutional reasoning (RAG) - Platform-agnostic design (Mastodon, Discord, Telegram, etc.) - Action primitives for flexible governance processes - Temporal awareness for multi-day proposals and voting - Audit trail with constitutional citations - Reversible actions with supermajority veto - Works with local (Ollama) and cloud AI models Platform support: - Mastodon: Full implementation with streaming, moderation, and admin skills - Discord/Telegram: Platform abstraction ready for implementation Documentation: - README.md: Architecture and overview - QUICKSTART.md: Getting started guide - PLATFORMS.md: Platform implementation guide for developers - MASTODON_SETUP.md: Complete Mastodon deployment guide - constitution.md: Example governance constitution Technical stack: - Python 3.11+ - SQLAlchemy for state management - llm CLI for model abstraction - Mastodon.py for Mastodon integration - Pydantic for configuration validation Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
0
src/govbot/__init__.py
Normal file
0
src/govbot/__init__.py
Normal file
8
src/govbot/__main__.py
Normal file
8
src/govbot/__main__.py
Normal file
@@ -0,0 +1,8 @@
|
||||
"""
|
||||
Makes the package runnable with: python -m src.govbot
|
||||
"""
|
||||
|
||||
from .cli import main
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
562
src/govbot/agent.py
Normal file
562
src/govbot/agent.py
Normal file
@@ -0,0 +1,562 @@
|
||||
"""
|
||||
AI Agent Orchestration for Governance Bot.
|
||||
|
||||
This is the core agentic system that:
|
||||
1. Receives governance requests
|
||||
2. Consults the constitution (via RAG)
|
||||
3. Plans appropriate actions
|
||||
4. Executes using primitives
|
||||
5. Maintains audit trail
|
||||
"""
|
||||
|
||||
import json
|
||||
import subprocess
|
||||
from typing import Dict, Any, Optional, List
|
||||
from datetime import datetime, timedelta
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from .governance.constitution import ConstitutionalReasoner
|
||||
from .governance.primitives import GovernancePrimitives
|
||||
from .db import queries
|
||||
|
||||
|
||||
class GovernanceAgent:
|
||||
"""
|
||||
The AI agent that interprets requests and orchestrates governance actions.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
db_session: Session,
|
||||
constitution_path: str,
|
||||
model: Optional[str] = None,
|
||||
):
|
||||
"""
|
||||
Initialize the governance agent.
|
||||
|
||||
Args:
|
||||
db_session: Database session
|
||||
constitution_path: Path to constitution file
|
||||
model: LLM model to use (None for default)
|
||||
"""
|
||||
self.db = db_session
|
||||
self.constitution = ConstitutionalReasoner(constitution_path, model)
|
||||
self.primitives = GovernancePrimitives(db_session)
|
||||
self.model = model
|
||||
|
||||
def process_request(
|
||||
self, request: str, actor: str, context: Optional[Dict[str, Any]] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Process a governance request from a user.
|
||||
|
||||
This is the main agentic loop:
|
||||
1. Parse intent
|
||||
2. Consult constitution
|
||||
3. Plan actions
|
||||
4. Execute with audit trail
|
||||
5. Return response
|
||||
|
||||
Args:
|
||||
request: Natural language request from user
|
||||
actor: Who made the request (Mastodon handle)
|
||||
context: Optional context (thread ID, etc.)
|
||||
|
||||
Returns:
|
||||
Dict with 'response', 'actions_taken', 'process_id', etc.
|
||||
"""
|
||||
# Step 1: Parse intent
|
||||
intent = self._parse_intent(request, actor)
|
||||
|
||||
if intent.get("error"):
|
||||
return {"response": intent["error"], "success": False}
|
||||
|
||||
# Step 2: Consult constitution
|
||||
constitutional_guidance = self.constitution.query(
|
||||
question=intent["intent_description"],
|
||||
context=f"Actor: {actor}, Request: {request}",
|
||||
)
|
||||
|
||||
# Step 3: Check for ambiguity
|
||||
if constitutional_guidance.get("confidence") == "low":
|
||||
return self._handle_ambiguity(
|
||||
request, actor, constitutional_guidance
|
||||
)
|
||||
|
||||
# Step 4: Plan actions
|
||||
action_plan = self._plan_actions(
|
||||
intent, constitutional_guidance, actor, context
|
||||
)
|
||||
|
||||
# Step 5: Execute plan
|
||||
result = self._execute_plan(action_plan, actor)
|
||||
|
||||
return result
|
||||
|
||||
def _parse_intent(self, request: str, actor: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Use AI to parse user intent from natural language.
|
||||
|
||||
Args:
|
||||
request: User's request
|
||||
actor: Who made the request
|
||||
|
||||
Returns:
|
||||
Dict with 'intent_type', 'intent_description', 'parameters'
|
||||
"""
|
||||
prompt = f"""Parse this governance request and extract structured information.
|
||||
|
||||
REQUEST: "{request}"
|
||||
ACTOR: {actor}
|
||||
|
||||
Identify:
|
||||
1. Intent type (e.g., "create_proposal", "cast_vote", "query_constitution", "appeal", etc.)
|
||||
2. Clear description of what the user wants
|
||||
3. Key parameters extracted from request
|
||||
|
||||
Respond with JSON:
|
||||
{{
|
||||
"intent_type": "the type of intent",
|
||||
"intent_description": "clear description of what user wants",
|
||||
"parameters": {{
|
||||
"key": "value"
|
||||
}}
|
||||
}}
|
||||
"""
|
||||
|
||||
try:
|
||||
result = self._call_llm(prompt)
|
||||
parsed = self._extract_json(result)
|
||||
return parsed
|
||||
except Exception as e:
|
||||
return {"error": f"Could not parse request: {str(e)}"}
|
||||
|
||||
def _plan_actions(
|
||||
self,
|
||||
intent: Dict[str, Any],
|
||||
constitutional_guidance: Dict[str, Any],
|
||||
actor: str,
|
||||
context: Optional[Dict[str, Any]],
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Plan the sequence of primitive actions to fulfill the intent.
|
||||
|
||||
Args:
|
||||
intent: Parsed intent
|
||||
constitutional_guidance: Constitutional interpretation
|
||||
actor: Who initiated
|
||||
context: Additional context
|
||||
|
||||
Returns:
|
||||
Action plan dictionary
|
||||
"""
|
||||
intent_type = intent.get("intent_type")
|
||||
|
||||
# Route to specific planning function based on intent
|
||||
if intent_type == "create_proposal":
|
||||
return self._plan_proposal_creation(
|
||||
intent, constitutional_guidance, actor, context
|
||||
)
|
||||
elif intent_type == "cast_vote":
|
||||
return self._plan_vote_casting(
|
||||
intent, constitutional_guidance, actor, context
|
||||
)
|
||||
elif intent_type == "query_constitution":
|
||||
return self._plan_constitutional_query(
|
||||
intent, constitutional_guidance, actor
|
||||
)
|
||||
elif intent_type == "appeal":
|
||||
return self._plan_appeal(
|
||||
intent, constitutional_guidance, actor, context
|
||||
)
|
||||
else:
|
||||
# Generic planning using AI
|
||||
return self._plan_generic(
|
||||
intent, constitutional_guidance, actor, context
|
||||
)
|
||||
|
||||
def _plan_proposal_creation(
|
||||
self,
|
||||
intent: Dict[str, Any],
|
||||
constitutional_guidance: Dict[str, Any],
|
||||
actor: str,
|
||||
context: Optional[Dict[str, Any]],
|
||||
) -> Dict[str, Any]:
|
||||
"""Plan actions for creating a proposal"""
|
||||
params = intent.get("parameters", {})
|
||||
proposal_text = params.get("proposal_text", intent.get("intent_description"))
|
||||
|
||||
# Interpret proposal to determine type and requirements
|
||||
proposal_info = self.constitution.interpret_proposal(proposal_text)
|
||||
|
||||
# Build action plan
|
||||
plan = {
|
||||
"intent_type": "create_proposal",
|
||||
"constitutional_basis": constitutional_guidance.get("citations", []),
|
||||
"actions": [
|
||||
{
|
||||
"primitive": "create_process",
|
||||
"args": {
|
||||
"process_type": f"{proposal_info['proposal_type']}_proposal",
|
||||
"creator": actor,
|
||||
"deadline_days": proposal_info.get("discussion_period_days", 6),
|
||||
"constitutional_basis": str(constitutional_guidance.get("citations")),
|
||||
"initial_state": {
|
||||
"proposal_text": proposal_text,
|
||||
"proposal_type": proposal_info["proposal_type"],
|
||||
"voting_threshold": proposal_info.get("voting_threshold"),
|
||||
"votes": {},
|
||||
},
|
||||
"mastodon_thread_id": context.get("thread_id")
|
||||
if context
|
||||
else None,
|
||||
},
|
||||
},
|
||||
{
|
||||
"primitive": "schedule_reminder",
|
||||
"args": {
|
||||
"when": "deadline", # Will be calculated from process deadline
|
||||
"message": f"Proposal by {actor} has reached its deadline. Counting votes.",
|
||||
},
|
||||
},
|
||||
],
|
||||
"response_template": f"""Proposal created: {proposal_text[:100]}...
|
||||
|
||||
Type: {proposal_info['proposal_type']}
|
||||
Discussion period: {proposal_info.get('discussion_period_days')} days
|
||||
Voting threshold: {proposal_info.get('voting_threshold')}
|
||||
|
||||
Constitutional basis: {', '.join(constitutional_guidance.get('citations', []))}
|
||||
|
||||
Reply with 'agree', 'disagree', 'abstain', or 'block' to vote.
|
||||
Process ID: {{process_id}}
|
||||
""",
|
||||
}
|
||||
|
||||
return plan
|
||||
|
||||
def _plan_vote_casting(
|
||||
self,
|
||||
intent: Dict[str, Any],
|
||||
constitutional_guidance: Dict[str, Any],
|
||||
actor: str,
|
||||
context: Optional[Dict[str, Any]],
|
||||
) -> Dict[str, Any]:
|
||||
"""Plan actions for casting a vote"""
|
||||
params = intent.get("parameters", {})
|
||||
vote_type = params.get("vote_type", "agree").lower()
|
||||
process_id = params.get("process_id")
|
||||
|
||||
if not process_id:
|
||||
return {
|
||||
"error": "Could not identify which proposal to vote on. Please reply to a proposal thread."
|
||||
}
|
||||
|
||||
plan = {
|
||||
"intent_type": "cast_vote",
|
||||
"constitutional_basis": constitutional_guidance.get("citations", []),
|
||||
"actions": [
|
||||
{
|
||||
"primitive": "update_process_state",
|
||||
"args": {
|
||||
"process_id": process_id,
|
||||
"state_updates": {
|
||||
f"votes.{actor}": {
|
||||
"vote": vote_type,
|
||||
"timestamp": datetime.utcnow().isoformat(),
|
||||
}
|
||||
},
|
||||
"actor": actor,
|
||||
},
|
||||
}
|
||||
],
|
||||
"response_template": f"""Vote recorded: {vote_type}
|
||||
|
||||
Voter: {actor}
|
||||
Process: {{process_id}}
|
||||
""",
|
||||
}
|
||||
|
||||
return plan
|
||||
|
||||
def _plan_constitutional_query(
|
||||
self,
|
||||
intent: Dict[str, Any],
|
||||
constitutional_guidance: Dict[str, Any],
|
||||
actor: str,
|
||||
) -> Dict[str, Any]:
|
||||
"""Plan response for constitutional query"""
|
||||
return {
|
||||
"intent_type": "query_constitution",
|
||||
"actions": [], # No state changes needed
|
||||
"response_template": f"""Constitutional Interpretation:
|
||||
|
||||
{constitutional_guidance['answer']}
|
||||
|
||||
Citations: {', '.join(constitutional_guidance.get('citations', []))}
|
||||
Confidence: {constitutional_guidance.get('confidence', 'medium')}
|
||||
""",
|
||||
}
|
||||
|
||||
def _plan_appeal(
|
||||
self,
|
||||
intent: Dict[str, Any],
|
||||
constitutional_guidance: Dict[str, Any],
|
||||
actor: str,
|
||||
context: Optional[Dict[str, Any]],
|
||||
) -> Dict[str, Any]:
|
||||
"""Plan actions for an appeal"""
|
||||
params = intent.get("parameters", {})
|
||||
action_id = params.get("action_id")
|
||||
|
||||
plan = {
|
||||
"intent_type": "appeal",
|
||||
"constitutional_basis": constitutional_guidance.get("citations", []),
|
||||
"actions": [
|
||||
{
|
||||
"primitive": "create_process",
|
||||
"args": {
|
||||
"process_type": "appeal",
|
||||
"creator": actor,
|
||||
"deadline_days": 3,
|
||||
"constitutional_basis": "Article 6: Appeals",
|
||||
"initial_state": {
|
||||
"appealed_action_id": action_id,
|
||||
"appellant": actor,
|
||||
"votes": {},
|
||||
},
|
||||
},
|
||||
}
|
||||
],
|
||||
"response_template": f"""Appeal initiated by {actor}
|
||||
|
||||
Appealing action: {{action_id}}
|
||||
Discussion period: 3 days
|
||||
|
||||
Community members can vote on whether to override the action.
|
||||
""",
|
||||
}
|
||||
|
||||
return plan
|
||||
|
||||
def _plan_generic(
|
||||
self,
|
||||
intent: Dict[str, Any],
|
||||
constitutional_guidance: Dict[str, Any],
|
||||
actor: str,
|
||||
context: Optional[Dict[str, Any]],
|
||||
) -> Dict[str, Any]:
|
||||
"""Use AI to plan generic actions"""
|
||||
# This is a fallback for intents we haven't explicitly coded
|
||||
prompt = f"""Based on this intent and constitutional guidance, plan the primitive actions needed.
|
||||
|
||||
INTENT: {json.dumps(intent, indent=2)}
|
||||
|
||||
CONSTITUTIONAL GUIDANCE: {json.dumps(constitutional_guidance, indent=2)}
|
||||
|
||||
Available primitives:
|
||||
- create_process(process_type, creator, deadline_days, constitutional_basis, initial_state)
|
||||
- update_process_state(process_id, state_updates, actor)
|
||||
- store_record(record_type, data, actor, reasoning, citation)
|
||||
- schedule_reminder(when, message)
|
||||
|
||||
Plan the actions as JSON:
|
||||
{{
|
||||
"actions": [
|
||||
{{"primitive": "name", "args": {{...}}}}
|
||||
],
|
||||
"response_template": "Message to send user"
|
||||
}}
|
||||
"""
|
||||
|
||||
try:
|
||||
result = self._call_llm(prompt)
|
||||
plan = self._extract_json(result)
|
||||
plan["intent_type"] = intent.get("intent_type")
|
||||
plan["constitutional_basis"] = constitutional_guidance.get("citations", [])
|
||||
return plan
|
||||
except Exception as e:
|
||||
return {
|
||||
"error": f"Could not plan actions: {str(e)}",
|
||||
"intent": intent,
|
||||
"guidance": constitutional_guidance,
|
||||
}
|
||||
|
||||
def _execute_plan(
|
||||
self, plan: Dict[str, Any], actor: str
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Execute the planned actions using primitives.
|
||||
|
||||
Args:
|
||||
plan: Action plan
|
||||
actor: Who initiated
|
||||
|
||||
Returns:
|
||||
Execution result
|
||||
"""
|
||||
if plan.get("error"):
|
||||
return {"response": plan["error"], "success": False}
|
||||
|
||||
executed_actions = []
|
||||
process_id = None
|
||||
|
||||
try:
|
||||
for action in plan.get("actions", []):
|
||||
primitive = action["primitive"]
|
||||
args = action["args"]
|
||||
|
||||
# Get the primitive function
|
||||
if hasattr(self.primitives, primitive):
|
||||
func = getattr(self.primitives, primitive)
|
||||
|
||||
# Handle special cases like deadline calculation
|
||||
if "when" in args and args["when"] == "deadline":
|
||||
# Calculate from process deadline
|
||||
if process_id:
|
||||
process = queries.get_process(self.db, process_id)
|
||||
args["when"] = process.deadline
|
||||
|
||||
result = func(**args)
|
||||
|
||||
# Track process ID for response
|
||||
if primitive == "create_process":
|
||||
process_id = result
|
||||
|
||||
executed_actions.append(
|
||||
{"primitive": primitive, "result": result}
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"Unknown primitive: {primitive}")
|
||||
|
||||
# Build response
|
||||
response_template = plan.get("response_template", "Action completed.")
|
||||
response = response_template.format(
|
||||
process_id=process_id, action_id=executed_actions[0].get("result")
|
||||
if executed_actions
|
||||
else None
|
||||
)
|
||||
|
||||
return {
|
||||
"response": response,
|
||||
"success": True,
|
||||
"process_id": process_id,
|
||||
"actions_taken": executed_actions,
|
||||
"constitutional_basis": plan.get("constitutional_basis"),
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
return {
|
||||
"response": f"Error executing actions: {str(e)}",
|
||||
"success": False,
|
||||
"partial_actions": executed_actions,
|
||||
}
|
||||
|
||||
def _handle_ambiguity(
|
||||
self,
|
||||
request: str,
|
||||
actor: str,
|
||||
constitutional_guidance: Dict[str, Any],
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Handle constitutional ambiguity by requesting clarification.
|
||||
|
||||
Args:
|
||||
request: Original request
|
||||
actor: Who made request
|
||||
constitutional_guidance: The ambiguous guidance
|
||||
|
||||
Returns:
|
||||
Response explaining ambiguity
|
||||
"""
|
||||
ambiguity = constitutional_guidance.get("ambiguity", "Constitutional interpretation unclear")
|
||||
|
||||
# Create clarification request
|
||||
clarification = queries.create_clarification(
|
||||
session=self.db,
|
||||
question=f"Ambiguity in request '{request}': {ambiguity}",
|
||||
)
|
||||
|
||||
response = f"""I encountered constitutional ambiguity in processing your request.
|
||||
|
||||
Question: {ambiguity}
|
||||
|
||||
This requires community clarification. Members can discuss and provide guidance.
|
||||
|
||||
Clarification ID: {clarification.id}
|
||||
"""
|
||||
|
||||
return {
|
||||
"response": response,
|
||||
"success": False,
|
||||
"requires_clarification": True,
|
||||
"clarification_id": clarification.id,
|
||||
}
|
||||
|
||||
def check_deadlines(self) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Check for processes that have passed their deadline.
|
||||
This should be called periodically by a background task.
|
||||
|
||||
Returns:
|
||||
List of processes that were completed
|
||||
"""
|
||||
overdue_processes = queries.get_processes_past_deadline(self.db)
|
||||
completed = []
|
||||
|
||||
for process in overdue_processes:
|
||||
# Count votes
|
||||
counts = self.primitives.count_votes(process.id)
|
||||
|
||||
# Determine threshold from process state
|
||||
threshold_type = process.state_data.get(
|
||||
"voting_threshold", "simple_majority"
|
||||
)
|
||||
|
||||
# Check if passed
|
||||
passed = self.primitives.check_threshold(counts, threshold_type)
|
||||
|
||||
outcome = "passed" if passed else "failed"
|
||||
|
||||
# Complete the process
|
||||
self.primitives.complete_process(
|
||||
process_id=process.id,
|
||||
outcome=outcome,
|
||||
reasoning=f"Vote counts: {counts}. Threshold: {threshold_type}. Result: {outcome}",
|
||||
)
|
||||
|
||||
completed.append(
|
||||
{
|
||||
"process_id": process.id,
|
||||
"outcome": outcome,
|
||||
"vote_counts": counts,
|
||||
}
|
||||
)
|
||||
|
||||
return completed
|
||||
|
||||
def _call_llm(self, prompt: str) -> str:
|
||||
"""Call the LLM via llm CLI"""
|
||||
cmd = ["llm", "prompt"]
|
||||
if self.model:
|
||||
cmd.extend(["-m", self.model])
|
||||
cmd.append(prompt)
|
||||
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
|
||||
return result.stdout.strip()
|
||||
|
||||
def _extract_json(self, text: str) -> Dict[str, Any]:
|
||||
"""Extract JSON from LLM response"""
|
||||
# Handle markdown code blocks
|
||||
if "```json" in text:
|
||||
start = text.find("```json") + 7
|
||||
end = text.find("```", start)
|
||||
json_str = text[start:end].strip()
|
||||
elif "```" in text:
|
||||
start = text.find("```") + 3
|
||||
end = text.find("```", start)
|
||||
json_str = text[start:end].strip()
|
||||
else:
|
||||
json_str = text
|
||||
|
||||
return json.loads(json_str)
|
||||
277
src/govbot/bot.py
Normal file
277
src/govbot/bot.py
Normal file
@@ -0,0 +1,277 @@
|
||||
"""
|
||||
Main Govbot entry point.
|
||||
|
||||
This module provides the main bot class that integrates:
|
||||
- Platform adapter (Mastodon, Discord, etc.)
|
||||
- AI agent for governance
|
||||
- Background task scheduler
|
||||
- Audit logging
|
||||
|
||||
The bot is platform-agnostic and works with any platform that implements
|
||||
the PlatformAdapter interface.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import time
|
||||
from typing import Optional
|
||||
from pathlib import Path
|
||||
import sys
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from .utils.config import load_config, BotConfig
|
||||
from .db.models import init_db, get_session
|
||||
from .agent import GovernanceAgent
|
||||
from .scheduler import GovernanceScheduler
|
||||
from .platforms.base import PlatformAdapter, PlatformMessage, MockPlatformAdapter
|
||||
from .platforms.mastodon import MastodonAdapter
|
||||
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
||||
)
|
||||
logger = logging.getLogger("govbot")
|
||||
|
||||
|
||||
class Govbot:
|
||||
"""
|
||||
Main governance bot class.
|
||||
|
||||
Integrates all components:
|
||||
- Platform adapter (for message handling)
|
||||
- AI agent (for governance logic)
|
||||
- Database (for state and audit trail)
|
||||
- Scheduler (for deadlines and reminders)
|
||||
|
||||
Platform-agnostic design allows the same governance logic to work
|
||||
across Mastodon, Discord, Telegram, Matrix, etc.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
config_path: str = "config/config.yaml",
|
||||
platform_adapter: Optional[PlatformAdapter] = None,
|
||||
):
|
||||
"""
|
||||
Initialize the governance bot.
|
||||
|
||||
Args:
|
||||
config_path: Path to configuration file
|
||||
platform_adapter: Pre-configured platform adapter (optional).
|
||||
If None, will be created from config.
|
||||
"""
|
||||
logger.info("Initializing Govbot...")
|
||||
|
||||
# Load configuration
|
||||
try:
|
||||
self.config = load_config(config_path)
|
||||
except FileNotFoundError as e:
|
||||
logger.error(f"Configuration error: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
# Set log level from config
|
||||
log_level = getattr(logging, self.config.log_level.upper(), logging.INFO)
|
||||
logger.setLevel(log_level)
|
||||
|
||||
# Initialize database
|
||||
logger.info(f"Initializing database: {self.config.governance.db_path}")
|
||||
self.engine = init_db(self.config.governance.db_path)
|
||||
self.db_session = get_session(self.engine)
|
||||
|
||||
# Initialize AI agent
|
||||
logger.info("Initializing AI agent...")
|
||||
self.agent = GovernanceAgent(
|
||||
db_session=self.db_session,
|
||||
constitution_path=self.config.governance.constitution_path,
|
||||
model=self.config.ai.default_model,
|
||||
)
|
||||
|
||||
# Initialize scheduler for background tasks
|
||||
logger.info("Initializing scheduler...")
|
||||
self.scheduler = GovernanceScheduler(self.agent, self.db_session)
|
||||
|
||||
# Initialize platform adapter
|
||||
if platform_adapter:
|
||||
self.platform = platform_adapter
|
||||
else:
|
||||
self.platform = self._create_platform_adapter()
|
||||
|
||||
logger.info("Govbot initialized successfully!")
|
||||
|
||||
def _create_platform_adapter(self) -> PlatformAdapter:
|
||||
"""
|
||||
Create platform adapter from configuration.
|
||||
|
||||
Returns:
|
||||
Configured platform adapter
|
||||
|
||||
Raises:
|
||||
ValueError: If platform type unknown or config invalid
|
||||
"""
|
||||
platform_type = self.config.platform.type.lower()
|
||||
|
||||
logger.info(f"Creating {platform_type} platform adapter...")
|
||||
|
||||
if platform_type == "mastodon":
|
||||
return MastodonAdapter(self.config.platform.mastodon.model_dump())
|
||||
elif platform_type == "mock":
|
||||
return MockPlatformAdapter({})
|
||||
else:
|
||||
raise ValueError(
|
||||
f"Unknown platform type: {platform_type}. "
|
||||
f"Supported: mastodon, mock"
|
||||
)
|
||||
|
||||
def run(self):
|
||||
"""
|
||||
Start the bot.
|
||||
|
||||
This will:
|
||||
1. Connect to platform
|
||||
2. Start background scheduler
|
||||
3. Listen for messages and respond
|
||||
"""
|
||||
logger.info("Starting Govbot...")
|
||||
|
||||
# Start scheduler in background
|
||||
self.scheduler.start()
|
||||
|
||||
# Connect to platform
|
||||
try:
|
||||
if self.platform.connect():
|
||||
logger.info("Connected to platform successfully")
|
||||
else:
|
||||
logger.error("Failed to connect to platform")
|
||||
return
|
||||
except Exception as e:
|
||||
logger.error(f"Platform connection error: {e}")
|
||||
if isinstance(self.platform, MockPlatformAdapter):
|
||||
logger.info("Running in mock/test mode - use CLI for testing")
|
||||
else:
|
||||
return
|
||||
|
||||
# Start listening for messages
|
||||
try:
|
||||
self.platform.start_listening(self._handle_message)
|
||||
logger.info("Started listening for messages")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to start listening: {e}")
|
||||
|
||||
try:
|
||||
# Keep running and let scheduler handle background tasks
|
||||
logger.info("Bot is running. Press Ctrl+C to stop.")
|
||||
while True:
|
||||
time.sleep(60) # Check every minute
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Shutting down...")
|
||||
self.scheduler.stop()
|
||||
self.platform.disconnect()
|
||||
logger.info("Govbot stopped.")
|
||||
|
||||
def _handle_message(self, message: PlatformMessage):
|
||||
"""
|
||||
Handle incoming message from platform.
|
||||
|
||||
Args:
|
||||
message: Normalized platform message
|
||||
"""
|
||||
logger.info(
|
||||
f"Received message from @{message.author_handle}: {message.text[:50]}..."
|
||||
)
|
||||
|
||||
# Process the request through the agent
|
||||
context = {
|
||||
"thread_id": message.thread_id,
|
||||
"reply_to_id": message.reply_to_id,
|
||||
"platform_message": message,
|
||||
}
|
||||
|
||||
result = self.agent.process_request(
|
||||
request=message.text,
|
||||
actor=f"@{message.author_handle}",
|
||||
context=context,
|
||||
)
|
||||
|
||||
# Post response
|
||||
response = result.get("response", "Sorry, I couldn't process that request.")
|
||||
|
||||
try:
|
||||
self.platform.post(
|
||||
message=response,
|
||||
thread_id=message.thread_id,
|
||||
reply_to_id=message.id, # Reply to the message that mentioned us
|
||||
visibility=message.visibility,
|
||||
)
|
||||
logger.info("Posted response")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to post response: {e}")
|
||||
|
||||
def process_mention(self, mention_text: str, author: str, thread_id: Optional[str] = None):
|
||||
"""
|
||||
Process a mention of the bot.
|
||||
|
||||
Args:
|
||||
mention_text: Text of the mention
|
||||
author: Who mentioned the bot
|
||||
thread_id: ID of the thread (for context)
|
||||
|
||||
Returns:
|
||||
Response text to post
|
||||
"""
|
||||
logger.info(f"Processing mention from {author}: {mention_text}")
|
||||
|
||||
context = {"thread_id": thread_id} if thread_id else None
|
||||
|
||||
result = self.agent.process_request(
|
||||
request=mention_text,
|
||||
actor=author,
|
||||
context=context,
|
||||
)
|
||||
|
||||
response = result.get("response", "Sorry, I couldn't process that request.")
|
||||
|
||||
if result.get("success"):
|
||||
logger.info(f"Successfully processed request. Process ID: {result.get('process_id')}")
|
||||
else:
|
||||
logger.warning(f"Request failed: {response}")
|
||||
|
||||
return response
|
||||
|
||||
def close(self):
|
||||
"""Clean up resources"""
|
||||
if self.scheduler:
|
||||
self.scheduler.stop()
|
||||
if self.platform and self.platform.connected:
|
||||
self.platform.disconnect()
|
||||
if self.db_session:
|
||||
self.db_session.close()
|
||||
|
||||
|
||||
def main():
|
||||
"""Main entry point for running the bot"""
|
||||
# Check if config exists
|
||||
config_path = Path("config/config.yaml")
|
||||
|
||||
if not config_path.exists():
|
||||
logger.error(
|
||||
"Configuration file not found!\n"
|
||||
"Please copy config/config.example.yaml to config/config.yaml "
|
||||
"and edit with your settings."
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
# Create and run bot
|
||||
bot = Govbot()
|
||||
|
||||
try:
|
||||
bot.run()
|
||||
except Exception as e:
|
||||
logger.error(f"Fatal error: {e}", exc_info=True)
|
||||
bot.close()
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
226
src/govbot/cli.py
Normal file
226
src/govbot/cli.py
Normal file
@@ -0,0 +1,226 @@
|
||||
"""
|
||||
Command-line interface for testing Govbot.
|
||||
|
||||
Allows you to interact with the bot without Mastodon,
|
||||
useful for development and testing.
|
||||
"""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
import logging
|
||||
|
||||
from .bot import Govbot
|
||||
from .db.models import init_db, get_session
|
||||
from .utils.config import load_config
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s - %(levelname)s - %(message)s",
|
||||
)
|
||||
logger = logging.getLogger("govbot.cli")
|
||||
|
||||
|
||||
def print_banner():
|
||||
"""Print welcome banner"""
|
||||
print("\n" + "=" * 60)
|
||||
print("GOVBOT - Agentic Governance Bot")
|
||||
print("=" * 60)
|
||||
print("\nType 'help' for commands, 'exit' to quit\n")
|
||||
|
||||
|
||||
def print_help():
|
||||
"""Print help text"""
|
||||
help_text = """
|
||||
Available commands:
|
||||
|
||||
help Show this help
|
||||
exit, quit Exit the CLI
|
||||
constitution Show the full constitution
|
||||
sections List constitutional sections
|
||||
query <question> Query the constitution
|
||||
propose <text> Create a proposal
|
||||
vote <process_id> <vote> Cast a vote (agree/disagree/abstain/block)
|
||||
status <process_id> Check process status
|
||||
processes List active processes
|
||||
actions List recent actions
|
||||
veto <action_id> Cast veto vote on action
|
||||
|
||||
Examples:
|
||||
|
||||
query What are the rules for proposals?
|
||||
propose We should update the moderation policy
|
||||
vote 1 agree
|
||||
status 1
|
||||
"""
|
||||
print(help_text)
|
||||
|
||||
|
||||
class GovbotCLI:
|
||||
"""Interactive CLI for Govbot"""
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize the CLI"""
|
||||
# Check config
|
||||
config_path = Path("config/config.yaml")
|
||||
if not config_path.exists():
|
||||
logger.error(
|
||||
"Config file not found. Copy config/config.example.yaml to config/config.yaml"
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
# Initialize bot
|
||||
self.bot = Govbot(str(config_path))
|
||||
self.current_user = "@testuser" # Simulated user for testing
|
||||
|
||||
def run(self):
|
||||
"""Run the interactive CLI"""
|
||||
print_banner()
|
||||
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
# Get input
|
||||
cmd_input = input(f"{self.current_user}> ").strip()
|
||||
|
||||
if not cmd_input:
|
||||
continue
|
||||
|
||||
# Parse command
|
||||
parts = cmd_input.split(maxsplit=1)
|
||||
command = parts[0].lower()
|
||||
args = parts[1] if len(parts) > 1 else ""
|
||||
|
||||
# Execute command
|
||||
self.execute_command(command, args)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\nUse 'exit' to quit")
|
||||
continue
|
||||
except EOFError:
|
||||
break
|
||||
|
||||
finally:
|
||||
print("\nGoodbye!")
|
||||
self.bot.close()
|
||||
|
||||
def execute_command(self, command: str, args: str):
|
||||
"""Execute a CLI command"""
|
||||
|
||||
if command in ["exit", "quit"]:
|
||||
sys.exit(0)
|
||||
|
||||
elif command == "help":
|
||||
print_help()
|
||||
|
||||
elif command == "constitution":
|
||||
text = self.bot.agent.constitution.get_full_constitution()
|
||||
print("\n" + text + "\n")
|
||||
|
||||
elif command == "sections":
|
||||
sections = self.bot.agent.constitution.list_sections()
|
||||
print("\nConstitutional Sections:")
|
||||
for i, section in enumerate(sections, 1):
|
||||
print(f" {i}. {section}")
|
||||
print()
|
||||
|
||||
elif command == "query":
|
||||
if not args:
|
||||
print("Usage: query <question>")
|
||||
return
|
||||
result = self.bot.agent.constitution.query(args)
|
||||
print(f"\nAnswer: {result['answer']}")
|
||||
print(f"Citations: {', '.join(result['citations'])}")
|
||||
print(f"Confidence: {result['confidence']}\n")
|
||||
|
||||
elif command == "propose":
|
||||
if not args:
|
||||
print("Usage: propose <proposal text>")
|
||||
return
|
||||
response = self.bot.process_mention(
|
||||
mention_text=f"I propose: {args}",
|
||||
author=self.current_user,
|
||||
)
|
||||
print(f"\n{response}\n")
|
||||
|
||||
elif command == "vote":
|
||||
parts = args.split(maxsplit=1)
|
||||
if len(parts) < 2:
|
||||
print("Usage: vote <process_id> <agree|disagree|abstain|block>")
|
||||
return
|
||||
process_id = parts[0]
|
||||
vote_type = parts[1]
|
||||
response = self.bot.process_mention(
|
||||
mention_text=f"vote {vote_type} on process {process_id}",
|
||||
author=self.current_user,
|
||||
)
|
||||
print(f"\n{response}\n")
|
||||
|
||||
elif command == "status":
|
||||
if not args:
|
||||
print("Usage: status <process_id>")
|
||||
return
|
||||
try:
|
||||
process_id = int(args)
|
||||
from .db import queries
|
||||
|
||||
process = queries.get_process(self.bot.db_session, process_id)
|
||||
if process:
|
||||
print(f"\nProcess {process.id}:")
|
||||
print(f" Type: {process.process_type}")
|
||||
print(f" Creator: {process.creator}")
|
||||
print(f" Status: {process.status}")
|
||||
print(f" Deadline: {process.deadline}")
|
||||
print(f" State: {process.state_data}")
|
||||
print()
|
||||
else:
|
||||
print(f"Process {process_id} not found\n")
|
||||
except ValueError:
|
||||
print("Invalid process ID\n")
|
||||
|
||||
elif command == "processes":
|
||||
from .db import queries
|
||||
|
||||
processes = queries.get_active_processes(self.bot.db_session)
|
||||
if processes:
|
||||
print("\nActive Processes:")
|
||||
for p in processes:
|
||||
print(f" #{p.id}: {p.process_type} by {p.creator} (deadline: {p.deadline})")
|
||||
print()
|
||||
else:
|
||||
print("\nNo active processes\n")
|
||||
|
||||
elif command == "actions":
|
||||
from .db import queries
|
||||
|
||||
actions = queries.get_recent_actions(self.bot.db_session, limit=10)
|
||||
if actions:
|
||||
print("\nRecent Actions:")
|
||||
for a in actions:
|
||||
print(f" #{a.id}: {a.action_type} by {a.actor} ({a.status})")
|
||||
print()
|
||||
else:
|
||||
print("\nNo recent actions\n")
|
||||
|
||||
elif command == "veto":
|
||||
if not args:
|
||||
print("Usage: veto <action_id>")
|
||||
return
|
||||
response = self.bot.process_mention(
|
||||
mention_text=f"veto action {args}",
|
||||
author=self.current_user,
|
||||
)
|
||||
print(f"\n{response}\n")
|
||||
|
||||
else:
|
||||
print(f"Unknown command: {command}")
|
||||
print("Type 'help' for available commands\n")
|
||||
|
||||
|
||||
def main():
|
||||
"""CLI entry point"""
|
||||
cli = GovbotCLI()
|
||||
cli.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
0
src/govbot/db/__init__.py
Normal file
0
src/govbot/db/__init__.py
Normal file
202
src/govbot/db/models.py
Normal file
202
src/govbot/db/models.py
Normal file
@@ -0,0 +1,202 @@
|
||||
"""
|
||||
Database models for governance bot using SQLAlchemy.
|
||||
|
||||
These models support flexible, agentic governance by storing:
|
||||
- Actions with constitutional reasoning and audit trail
|
||||
- Governance processes with flexible state
|
||||
- Clarification requests for constitutional ambiguity
|
||||
- Veto votes for safety mechanism
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
import json
|
||||
|
||||
from sqlalchemy import (
|
||||
create_engine,
|
||||
Column,
|
||||
Integer,
|
||||
String,
|
||||
Text,
|
||||
DateTime,
|
||||
Boolean,
|
||||
ForeignKey,
|
||||
JSON,
|
||||
)
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy.orm import relationship, Session
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
|
||||
class Action(Base):
|
||||
"""
|
||||
Records all bot actions with constitutional grounding and audit trail.
|
||||
Every action must cite constitutional basis and can be reversed.
|
||||
"""
|
||||
|
||||
__tablename__ = "actions"
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
timestamp = Column(DateTime, default=datetime.utcnow, nullable=False)
|
||||
action_type = Column(String(100), nullable=False) # e.g., "proposal_created", "vote_cast"
|
||||
actor = Column(String(500), nullable=False) # Mastodon handle or "bot"
|
||||
bot_reasoning = Column(Text) # AI's explanation of why it took this action
|
||||
constitutional_citation = Column(Text) # Which part of constitution was applied
|
||||
data = Column(JSON) # Flexible storage for action-specific data
|
||||
reversible = Column(Boolean, default=True) # Can this action be reversed?
|
||||
reversed_by = Column(Integer, ForeignKey("actions.id"), nullable=True)
|
||||
status = Column(
|
||||
String(50), default="executed"
|
||||
) # pending, executed, reversed, vetoed
|
||||
|
||||
# Relationship to reversal action
|
||||
reversal = relationship("Action", remote_side=[id], foreign_keys=[reversed_by])
|
||||
|
||||
# Relationship to veto votes
|
||||
veto_votes = relationship("VetoVote", back_populates="action")
|
||||
|
||||
def __repr__(self):
|
||||
return f"<Action {self.id}: {self.action_type} by {self.actor} at {self.timestamp}>"
|
||||
|
||||
def to_dict(self):
|
||||
"""Convert to dictionary for JSON serialization"""
|
||||
return {
|
||||
"id": self.id,
|
||||
"timestamp": self.timestamp.isoformat() if self.timestamp else None,
|
||||
"action_type": self.action_type,
|
||||
"actor": self.actor,
|
||||
"bot_reasoning": self.bot_reasoning,
|
||||
"constitutional_citation": self.constitutional_citation,
|
||||
"data": self.data,
|
||||
"reversible": self.reversible,
|
||||
"reversed_by": self.reversed_by,
|
||||
"status": self.status,
|
||||
}
|
||||
|
||||
|
||||
class GovernanceProcess(Base):
|
||||
"""
|
||||
Represents an ongoing governance process (proposal, vote, etc.).
|
||||
Flexible schema allows AI to create different process types dynamically.
|
||||
"""
|
||||
|
||||
__tablename__ = "governance_processes"
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
process_type = Column(String(100), nullable=False) # Dynamically determined by AI
|
||||
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
|
||||
deadline = Column(DateTime) # When this process concludes
|
||||
status = Column(String(50), default="active") # active, completed, cancelled
|
||||
state_data = Column(JSON) # Flexible storage for process-specific state
|
||||
constitutional_basis = Column(Text) # Citation of relevant constitutional provisions
|
||||
creator = Column(String(500)) # Who initiated this process
|
||||
mastodon_thread_id = Column(String(500)) # Link to Mastodon discussion thread
|
||||
|
||||
def __repr__(self):
|
||||
return f"<GovernanceProcess {self.id}: {self.process_type} ({self.status})>"
|
||||
|
||||
def to_dict(self):
|
||||
"""Convert to dictionary for JSON serialization"""
|
||||
return {
|
||||
"id": self.id,
|
||||
"process_type": self.process_type,
|
||||
"created_at": self.created_at.isoformat() if self.created_at else None,
|
||||
"deadline": self.deadline.isoformat() if self.deadline else None,
|
||||
"status": self.status,
|
||||
"state_data": self.state_data,
|
||||
"constitutional_basis": self.constitutional_basis,
|
||||
"creator": self.creator,
|
||||
"mastodon_thread_id": self.mastodon_thread_id,
|
||||
}
|
||||
|
||||
|
||||
class Clarification(Base):
|
||||
"""
|
||||
Records constitutional ambiguities that need human clarification.
|
||||
AI uses these when it encounters serious uncertainty about interpretation.
|
||||
"""
|
||||
|
||||
__tablename__ = "clarifications"
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
question = Column(Text, nullable=False) # What's ambiguous?
|
||||
process_id = Column(
|
||||
Integer, ForeignKey("governance_processes.id"), nullable=True
|
||||
) # Related process if any
|
||||
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
|
||||
resolution = Column(Text) # Community's answer
|
||||
resolved_at = Column(DateTime)
|
||||
constitutional_update = Column(
|
||||
Boolean, default=False
|
||||
) # Should this update the constitution?
|
||||
|
||||
# Relationship to governance process
|
||||
process = relationship("GovernanceProcess")
|
||||
|
||||
def __repr__(self):
|
||||
status = "resolved" if self.resolved_at else "pending"
|
||||
return f"<Clarification {self.id}: {status}>"
|
||||
|
||||
def to_dict(self):
|
||||
"""Convert to dictionary for JSON serialization"""
|
||||
return {
|
||||
"id": self.id,
|
||||
"question": self.question,
|
||||
"process_id": self.process_id,
|
||||
"created_at": self.created_at.isoformat() if self.created_at else None,
|
||||
"resolution": self.resolution,
|
||||
"resolved_at": self.resolved_at.isoformat() if self.resolved_at else None,
|
||||
"constitutional_update": self.constitutional_update,
|
||||
}
|
||||
|
||||
|
||||
class VetoVote(Base):
|
||||
"""
|
||||
Tracks supermajority veto votes on bot actions.
|
||||
Safety mechanism allowing community to halt problematic actions.
|
||||
"""
|
||||
|
||||
__tablename__ = "veto_votes"
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
action_id = Column(Integer, ForeignKey("actions.id"), nullable=False)
|
||||
voter = Column(String(500), nullable=False) # Mastodon handle
|
||||
vote = Column(String(20), nullable=False) # "veto" or "support"
|
||||
timestamp = Column(DateTime, default=datetime.utcnow, nullable=False)
|
||||
rationale = Column(Text) # Why veto/support?
|
||||
|
||||
# Relationship to action
|
||||
action = relationship("Action", back_populates="veto_votes")
|
||||
|
||||
def __repr__(self):
|
||||
return f"<VetoVote {self.id}: {self.voter} {self.vote}s action {self.action_id}>"
|
||||
|
||||
def to_dict(self):
|
||||
"""Convert to dictionary for JSON serialization"""
|
||||
return {
|
||||
"id": self.id,
|
||||
"action_id": self.action_id,
|
||||
"voter": self.voter,
|
||||
"vote": self.vote,
|
||||
"timestamp": self.timestamp.isoformat() if self.timestamp else None,
|
||||
"rationale": self.rationale,
|
||||
}
|
||||
|
||||
|
||||
# Database initialization and utility functions
|
||||
|
||||
|
||||
def init_db(db_path: str = "govbot.db"):
|
||||
"""Initialize the database with all tables"""
|
||||
engine = create_engine(f"sqlite:///{db_path}")
|
||||
Base.metadata.create_all(engine)
|
||||
return engine
|
||||
|
||||
|
||||
def get_session(engine):
|
||||
"""Create a database session"""
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
Session = sessionmaker(bind=engine)
|
||||
return Session()
|
||||
277
src/govbot/db/queries.py
Normal file
277
src/govbot/db/queries.py
Normal file
@@ -0,0 +1,277 @@
|
||||
"""
|
||||
Database query functions for governance operations.
|
||||
|
||||
Provides high-level functions for common governance queries,
|
||||
abstracting SQLAlchemy details for easier use by the agent.
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
from typing import List, Optional, Dict, Any
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy import func, and_, or_
|
||||
|
||||
from .models import Action, GovernanceProcess, Clarification, VetoVote
|
||||
|
||||
|
||||
# Action queries
|
||||
|
||||
|
||||
def create_action(
|
||||
session: Session,
|
||||
action_type: str,
|
||||
actor: str,
|
||||
data: Dict[str, Any],
|
||||
bot_reasoning: Optional[str] = None,
|
||||
constitutional_citation: Optional[str] = None,
|
||||
reversible: bool = True,
|
||||
) -> Action:
|
||||
"""Create a new action with audit trail"""
|
||||
action = Action(
|
||||
action_type=action_type,
|
||||
actor=actor,
|
||||
data=data,
|
||||
bot_reasoning=bot_reasoning,
|
||||
constitutional_citation=constitutional_citation,
|
||||
reversible=reversible,
|
||||
status="executed",
|
||||
)
|
||||
session.add(action)
|
||||
session.commit()
|
||||
return action
|
||||
|
||||
|
||||
def get_action(session: Session, action_id: int) -> Optional[Action]:
|
||||
"""Retrieve an action by ID"""
|
||||
return session.query(Action).filter(Action.id == action_id).first()
|
||||
|
||||
|
||||
def reverse_action(
|
||||
session: Session, action_id: int, reversing_actor: str, reason: str
|
||||
) -> Action:
|
||||
"""Reverse an action by creating a reversal record"""
|
||||
original = get_action(session, action_id)
|
||||
if not original:
|
||||
raise ValueError(f"Action {action_id} not found")
|
||||
if not original.reversible:
|
||||
raise ValueError(f"Action {action_id} is not reversible")
|
||||
|
||||
# Mark original as reversed
|
||||
original.status = "reversed"
|
||||
|
||||
# Create reversal action
|
||||
reversal = Action(
|
||||
action_type=f"reverse_{original.action_type}",
|
||||
actor=reversing_actor,
|
||||
data={"original_action_id": action_id, "reason": reason},
|
||||
bot_reasoning=reason,
|
||||
reversed_by=action_id,
|
||||
reversible=False,
|
||||
status="executed",
|
||||
)
|
||||
session.add(reversal)
|
||||
session.commit()
|
||||
return reversal
|
||||
|
||||
|
||||
def get_recent_actions(
|
||||
session: Session, limit: int = 50, action_type: Optional[str] = None
|
||||
) -> List[Action]:
|
||||
"""Get recent actions, optionally filtered by type"""
|
||||
query = session.query(Action).order_by(Action.timestamp.desc())
|
||||
if action_type:
|
||||
query = query.filter(Action.action_type == action_type)
|
||||
return query.limit(limit).all()
|
||||
|
||||
|
||||
# Governance process queries
|
||||
|
||||
|
||||
def create_process(
|
||||
session: Session,
|
||||
process_type: str,
|
||||
creator: str,
|
||||
constitutional_basis: str,
|
||||
deadline: Optional[datetime] = None,
|
||||
state_data: Optional[Dict[str, Any]] = None,
|
||||
mastodon_thread_id: Optional[str] = None,
|
||||
) -> GovernanceProcess:
|
||||
"""Create a new governance process"""
|
||||
process = GovernanceProcess(
|
||||
process_type=process_type,
|
||||
creator=creator,
|
||||
constitutional_basis=constitutional_basis,
|
||||
deadline=deadline,
|
||||
state_data=state_data or {},
|
||||
mastodon_thread_id=mastodon_thread_id,
|
||||
status="active",
|
||||
)
|
||||
session.add(process)
|
||||
session.commit()
|
||||
return process
|
||||
|
||||
|
||||
def get_process(session: Session, process_id: int) -> Optional[GovernanceProcess]:
|
||||
"""Retrieve a governance process by ID"""
|
||||
return session.query(GovernanceProcess).filter(GovernanceProcess.id == process_id).first()
|
||||
|
||||
|
||||
def update_process_state(
|
||||
session: Session, process_id: int, state_data: Dict[str, Any]
|
||||
) -> GovernanceProcess:
|
||||
"""Update the state data of a governance process"""
|
||||
process = get_process(session, process_id)
|
||||
if not process:
|
||||
raise ValueError(f"Process {process_id} not found")
|
||||
|
||||
# Merge new state data with existing
|
||||
current_state = process.state_data or {}
|
||||
current_state.update(state_data)
|
||||
process.state_data = current_state
|
||||
|
||||
session.commit()
|
||||
return process
|
||||
|
||||
|
||||
def complete_process(
|
||||
session: Session, process_id: int, outcome: str
|
||||
) -> GovernanceProcess:
|
||||
"""Mark a governance process as completed"""
|
||||
process = get_process(session, process_id)
|
||||
if not process:
|
||||
raise ValueError(f"Process {process_id} not found")
|
||||
|
||||
process.status = "completed"
|
||||
state_data = process.state_data or {}
|
||||
state_data["outcome"] = outcome
|
||||
state_data["completed_at"] = datetime.utcnow().isoformat()
|
||||
process.state_data = state_data
|
||||
|
||||
session.commit()
|
||||
return process
|
||||
|
||||
|
||||
def get_active_processes(
|
||||
session: Session, process_type: Optional[str] = None
|
||||
) -> List[GovernanceProcess]:
|
||||
"""Get all active governance processes, optionally filtered by type"""
|
||||
query = session.query(GovernanceProcess).filter(GovernanceProcess.status == "active")
|
||||
if process_type:
|
||||
query = query.filter(GovernanceProcess.process_type == process_type)
|
||||
return query.all()
|
||||
|
||||
|
||||
def get_processes_past_deadline(session: Session) -> List[GovernanceProcess]:
|
||||
"""Get active processes that have passed their deadline"""
|
||||
return (
|
||||
session.query(GovernanceProcess)
|
||||
.filter(
|
||||
and_(
|
||||
GovernanceProcess.status == "active",
|
||||
GovernanceProcess.deadline <= datetime.utcnow(),
|
||||
)
|
||||
)
|
||||
.all()
|
||||
)
|
||||
|
||||
|
||||
# Clarification queries
|
||||
|
||||
|
||||
def create_clarification(
|
||||
session: Session,
|
||||
question: str,
|
||||
process_id: Optional[int] = None,
|
||||
) -> Clarification:
|
||||
"""Create a new clarification request"""
|
||||
clarification = Clarification(
|
||||
question=question,
|
||||
process_id=process_id,
|
||||
)
|
||||
session.add(clarification)
|
||||
session.commit()
|
||||
return clarification
|
||||
|
||||
|
||||
def resolve_clarification(
|
||||
session: Session, clarification_id: int, resolution: str, update_constitution: bool = False
|
||||
) -> Clarification:
|
||||
"""Resolve a clarification with community answer"""
|
||||
clarification = (
|
||||
session.query(Clarification).filter(Clarification.id == clarification_id).first()
|
||||
)
|
||||
if not clarification:
|
||||
raise ValueError(f"Clarification {clarification_id} not found")
|
||||
|
||||
clarification.resolution = resolution
|
||||
clarification.resolved_at = datetime.utcnow()
|
||||
clarification.constitutional_update = update_constitution
|
||||
|
||||
session.commit()
|
||||
return clarification
|
||||
|
||||
|
||||
def get_pending_clarifications(session: Session) -> List[Clarification]:
|
||||
"""Get all unresolved clarification requests"""
|
||||
return session.query(Clarification).filter(Clarification.resolved_at.is_(None)).all()
|
||||
|
||||
|
||||
# Veto vote queries
|
||||
|
||||
|
||||
def cast_veto_vote(
|
||||
session: Session,
|
||||
action_id: int,
|
||||
voter: str,
|
||||
vote: str,
|
||||
rationale: Optional[str] = None,
|
||||
) -> VetoVote:
|
||||
"""Cast a veto vote on an action"""
|
||||
# Check if voter already voted on this action
|
||||
existing = (
|
||||
session.query(VetoVote)
|
||||
.filter(and_(VetoVote.action_id == action_id, VetoVote.voter == voter))
|
||||
.first()
|
||||
)
|
||||
|
||||
if existing:
|
||||
# Update existing vote
|
||||
existing.vote = vote
|
||||
existing.rationale = rationale
|
||||
existing.timestamp = datetime.utcnow()
|
||||
session.commit()
|
||||
return existing
|
||||
else:
|
||||
# Create new vote
|
||||
veto_vote = VetoVote(
|
||||
action_id=action_id,
|
||||
voter=voter,
|
||||
vote=vote,
|
||||
rationale=rationale,
|
||||
)
|
||||
session.add(veto_vote)
|
||||
session.commit()
|
||||
return veto_vote
|
||||
|
||||
|
||||
def get_veto_votes(session: Session, action_id: int) -> Dict[str, int]:
|
||||
"""Get veto vote counts for an action"""
|
||||
votes = session.query(VetoVote).filter(VetoVote.action_id == action_id).all()
|
||||
|
||||
counts = {"veto": 0, "support": 0}
|
||||
for vote in votes:
|
||||
if vote.vote in counts:
|
||||
counts[vote.vote] += 1
|
||||
|
||||
return counts
|
||||
|
||||
|
||||
def check_veto_threshold(session: Session, action_id: int, threshold: float = 0.67) -> bool:
|
||||
"""Check if an action has reached veto threshold (default 2/3)"""
|
||||
counts = get_veto_votes(session, action_id)
|
||||
total = counts["veto"] + counts["support"]
|
||||
|
||||
if total == 0:
|
||||
return False
|
||||
|
||||
veto_ratio = counts["veto"] / total
|
||||
return veto_ratio >= threshold
|
||||
0
src/govbot/governance/__init__.py
Normal file
0
src/govbot/governance/__init__.py
Normal file
314
src/govbot/governance/constitution.py
Normal file
314
src/govbot/governance/constitution.py
Normal file
@@ -0,0 +1,314 @@
|
||||
"""
|
||||
Constitutional Reasoning Engine using RAG (Retrieval Augmented Generation).
|
||||
|
||||
This module provides the core AI capability to:
|
||||
- Read and understand the constitution
|
||||
- Answer questions about governance rules
|
||||
- Provide constitutional citations for actions
|
||||
- Identify ambiguities requiring clarification
|
||||
"""
|
||||
|
||||
import os
|
||||
import json
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import List, Optional, Dict, Any
|
||||
import hashlib
|
||||
|
||||
|
||||
class ConstitutionalReasoner:
|
||||
"""
|
||||
RAG-based system for constitutional interpretation.
|
||||
Uses the 'llm' CLI tool for embeddings and queries.
|
||||
"""
|
||||
|
||||
def __init__(self, constitution_path: str, model: Optional[str] = None):
|
||||
"""
|
||||
Initialize the constitutional reasoner.
|
||||
|
||||
Args:
|
||||
constitution_path: Path to the constitution markdown file
|
||||
model: LLM model to use (e.g., 'llama3.2', 'gpt-4'). If None, uses llm default.
|
||||
"""
|
||||
self.constitution_path = Path(constitution_path)
|
||||
self.model = model
|
||||
|
||||
if not self.constitution_path.exists():
|
||||
raise FileNotFoundError(f"Constitution not found at {constitution_path}")
|
||||
|
||||
# Load constitution content
|
||||
self.constitution_text = self.constitution_path.read_text()
|
||||
|
||||
# Split into sections for RAG
|
||||
self.sections = self._split_into_sections()
|
||||
|
||||
def _split_into_sections(self) -> List[Dict[str, str]]:
|
||||
"""
|
||||
Split constitution into logical sections for retrieval.
|
||||
Returns list of dicts with 'title', 'content', and 'id' keys.
|
||||
"""
|
||||
sections = []
|
||||
current_section = None
|
||||
current_content = []
|
||||
|
||||
for line in self.constitution_text.split("\n"):
|
||||
# Article or Section headers
|
||||
if line.startswith("## Article") or line.startswith("### Section"):
|
||||
# Save previous section
|
||||
if current_section:
|
||||
sections.append(
|
||||
{
|
||||
"title": current_section,
|
||||
"content": "\n".join(current_content).strip(),
|
||||
"id": hashlib.md5(current_section.encode()).hexdigest()[:8],
|
||||
}
|
||||
)
|
||||
|
||||
# Start new section
|
||||
current_section = line.strip("#").strip()
|
||||
current_content = []
|
||||
elif current_section:
|
||||
current_content.append(line)
|
||||
|
||||
# Save last section
|
||||
if current_section:
|
||||
sections.append(
|
||||
{
|
||||
"title": current_section,
|
||||
"content": "\n".join(current_content).strip(),
|
||||
"id": hashlib.md5(current_section.encode()).hexdigest()[:8],
|
||||
}
|
||||
)
|
||||
|
||||
return sections
|
||||
|
||||
def query(self, question: str, context: Optional[str] = None) -> Dict[str, Any]:
|
||||
"""
|
||||
Query the constitution using AI reasoning.
|
||||
|
||||
Args:
|
||||
question: The governance question to answer
|
||||
context: Optional context about the situation
|
||||
|
||||
Returns:
|
||||
Dict with 'answer', 'citations', 'confidence', and 'ambiguity' keys
|
||||
"""
|
||||
# Build prompt with relevant sections
|
||||
relevant_sections = self._find_relevant_sections(question)
|
||||
|
||||
prompt = self._build_query_prompt(question, relevant_sections, context)
|
||||
|
||||
# Query using llm
|
||||
try:
|
||||
result = self._call_llm(prompt)
|
||||
return self._parse_query_result(result, relevant_sections)
|
||||
except Exception as e:
|
||||
return {
|
||||
"answer": f"Error querying constitution: {str(e)}",
|
||||
"citations": [],
|
||||
"confidence": "low",
|
||||
"ambiguity": "error",
|
||||
}
|
||||
|
||||
def _find_relevant_sections(self, query: str, top_k: int = 3) -> List[Dict[str, str]]:
|
||||
"""
|
||||
Find the most relevant constitutional sections for a query.
|
||||
For now, uses simple keyword matching. Can be enhanced with embeddings later.
|
||||
"""
|
||||
query_lower = query.lower()
|
||||
scored_sections = []
|
||||
|
||||
for section in self.sections:
|
||||
score = 0
|
||||
section_text = (section["title"] + " " + section["content"]).lower()
|
||||
|
||||
# Simple keyword scoring
|
||||
keywords = query_lower.split()
|
||||
for keyword in keywords:
|
||||
if len(keyword) > 3: # Skip short words
|
||||
score += section_text.count(keyword)
|
||||
|
||||
scored_sections.append((score, section))
|
||||
|
||||
# Sort by score and return top k
|
||||
scored_sections.sort(key=lambda x: x[0], reverse=True)
|
||||
return [section for score, section in scored_sections[:top_k] if score > 0]
|
||||
|
||||
def _build_query_prompt(
|
||||
self, question: str, sections: List[Dict[str, str]], context: Optional[str]
|
||||
) -> str:
|
||||
"""Build the prompt for constitutional query"""
|
||||
sections_text = "\n\n".join(
|
||||
[f"**{s['title']}**\n{s['content']}" for s in sections]
|
||||
)
|
||||
|
||||
prompt = f"""You are a constitutional reasoner for a democratic community governance system.
|
||||
|
||||
RELEVANT CONSTITUTIONAL SECTIONS:
|
||||
{sections_text}
|
||||
|
||||
QUESTION: {question}
|
||||
"""
|
||||
|
||||
if context:
|
||||
prompt += f"\nCONTEXT: {context}"
|
||||
|
||||
prompt += """
|
||||
|
||||
Please provide:
|
||||
1. A clear answer based on the constitutional provisions
|
||||
2. Specific citations (article/section numbers)
|
||||
3. Your confidence level (high/medium/low)
|
||||
4. Any ambiguities that might require clarification (if any)
|
||||
|
||||
Format your response as JSON:
|
||||
{
|
||||
"answer": "your answer here",
|
||||
"citations": ["Article X, Section Y", ...],
|
||||
"confidence": "high|medium|low",
|
||||
"ambiguity": "description of any ambiguity, or null if clear"
|
||||
}
|
||||
"""
|
||||
return prompt
|
||||
|
||||
def _call_llm(self, prompt: str) -> str:
|
||||
"""Call the llm CLI tool"""
|
||||
cmd = ["llm", "prompt"]
|
||||
|
||||
if self.model:
|
||||
cmd.extend(["-m", self.model])
|
||||
|
||||
cmd.append(prompt)
|
||||
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
|
||||
return result.stdout.strip()
|
||||
|
||||
def _parse_query_result(
|
||||
self, result: str, sections: List[Dict[str, str]]
|
||||
) -> Dict[str, Any]:
|
||||
"""Parse the LLM response into structured format"""
|
||||
try:
|
||||
# Try to extract JSON from the response
|
||||
# Handle cases where LLM wraps JSON in markdown code blocks
|
||||
if "```json" in result:
|
||||
json_start = result.find("```json") + 7
|
||||
json_end = result.find("```", json_start)
|
||||
json_str = result[json_start:json_end].strip()
|
||||
elif "```" in result:
|
||||
json_start = result.find("```") + 3
|
||||
json_end = result.find("```", json_start)
|
||||
json_str = result[json_start:json_end].strip()
|
||||
else:
|
||||
json_str = result
|
||||
|
||||
parsed = json.loads(json_str)
|
||||
|
||||
# Add section metadata to citations
|
||||
parsed["relevant_sections"] = sections
|
||||
|
||||
return parsed
|
||||
except json.JSONDecodeError:
|
||||
# Fallback if JSON parsing fails
|
||||
return {
|
||||
"answer": result,
|
||||
"citations": [s["title"] for s in sections],
|
||||
"confidence": "medium",
|
||||
"ambiguity": None,
|
||||
"relevant_sections": sections,
|
||||
}
|
||||
|
||||
def get_full_constitution(self) -> str:
|
||||
"""Return the full constitution text"""
|
||||
return self.constitution_text
|
||||
|
||||
def get_section(self, section_id: str) -> Optional[Dict[str, str]]:
|
||||
"""Get a specific section by ID"""
|
||||
for section in self.sections:
|
||||
if section["id"] == section_id:
|
||||
return section
|
||||
return None
|
||||
|
||||
def list_sections(self) -> List[str]:
|
||||
"""List all section titles"""
|
||||
return [s["title"] for s in self.sections]
|
||||
|
||||
def interpret_proposal(self, proposal_text: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Interpret a proposal and determine its type and requirements.
|
||||
|
||||
Returns:
|
||||
Dict with proposal_type, requirements, timeline, and thresholds
|
||||
"""
|
||||
question = f"""Given this proposal: "{proposal_text}"
|
||||
|
||||
What type of proposal is this according to the constitution?
|
||||
What are the requirements (timeline, voting threshold, etc.)?
|
||||
"""
|
||||
|
||||
result = self.query(question)
|
||||
|
||||
# Extract structured information from the answer
|
||||
try:
|
||||
prompt = f"""Based on this constitutional interpretation:
|
||||
{result['answer']}
|
||||
|
||||
Extract structured information as JSON:
|
||||
{{
|
||||
"proposal_type": "standard|urgent|constitutional_amendment",
|
||||
"discussion_period_days": number,
|
||||
"voting_threshold": "description of threshold",
|
||||
"special_requirements": ["list", "of", "requirements"]
|
||||
}}
|
||||
"""
|
||||
structured = self._call_llm(prompt)
|
||||
proposal_info = json.loads(structured)
|
||||
proposal_info["constitutional_basis"] = result
|
||||
return proposal_info
|
||||
except Exception as e:
|
||||
# Fallback
|
||||
return {
|
||||
"proposal_type": "standard",
|
||||
"discussion_period_days": 6,
|
||||
"voting_threshold": "simple_majority",
|
||||
"special_requirements": [],
|
||||
"constitutional_basis": result,
|
||||
"error": str(e),
|
||||
}
|
||||
|
||||
def check_ambiguity(self, question: str) -> Optional[str]:
|
||||
"""
|
||||
Check if a question reveals serious constitutional ambiguity.
|
||||
|
||||
Returns:
|
||||
None if clear, or a string describing the ambiguity if serious
|
||||
"""
|
||||
result = self.query(question)
|
||||
if result.get("confidence") == "low" or result.get("ambiguity"):
|
||||
return result.get("ambiguity") or "Low confidence in interpretation"
|
||||
return None
|
||||
|
||||
|
||||
# CLI interface for testing
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
|
||||
if len(sys.argv) < 2:
|
||||
print("Usage: python -m src.govbot.governance.constitution <question>")
|
||||
print('Example: python -m src.govbot.governance.constitution "What are the rules for proposals?"')
|
||||
sys.exit(1)
|
||||
|
||||
# Find constitution file
|
||||
constitution_path = Path(__file__).parent.parent.parent.parent / "constitution.md"
|
||||
|
||||
reasoner = ConstitutionalReasoner(str(constitution_path))
|
||||
|
||||
question = " ".join(sys.argv[1:])
|
||||
result = reasoner.query(question)
|
||||
|
||||
print("\n=== Constitutional Query ===")
|
||||
print(f"Question: {question}\n")
|
||||
print(f"Answer: {result['answer']}\n")
|
||||
print(f"Citations: {', '.join(result['citations'])}")
|
||||
print(f"Confidence: {result['confidence']}")
|
||||
if result.get("ambiguity"):
|
||||
print(f"Ambiguity: {result['ambiguity']}")
|
||||
416
src/govbot/governance/primitives.py
Normal file
416
src/govbot/governance/primitives.py
Normal file
@@ -0,0 +1,416 @@
|
||||
"""
|
||||
Action Primitives for Agentic Governance.
|
||||
|
||||
These are the low-level operations that the AI agent can orchestrate
|
||||
to implement governance processes. Each primitive is simple and composable,
|
||||
allowing the agent to flexibly implement constitutional procedures.
|
||||
"""
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Dict, Any, Optional, List
|
||||
from sqlalchemy.orm import Session
|
||||
import json
|
||||
|
||||
from ..db import queries
|
||||
from ..db.models import GovernanceProcess, Action
|
||||
|
||||
|
||||
class GovernancePrimitives:
|
||||
"""
|
||||
Provides primitive operations for governance actions.
|
||||
These are called by the AI agent to implement constitutional procedures.
|
||||
"""
|
||||
|
||||
def __init__(self, db_session: Session):
|
||||
self.db = db_session
|
||||
|
||||
# Storage primitives
|
||||
|
||||
def store_record(
|
||||
self,
|
||||
record_type: str,
|
||||
data: Dict[str, Any],
|
||||
actor: str,
|
||||
reasoning: Optional[str] = None,
|
||||
citation: Optional[str] = None,
|
||||
) -> int:
|
||||
"""
|
||||
Store a governance record (generic storage primitive).
|
||||
|
||||
Args:
|
||||
record_type: Type of record (e.g., "proposal", "vote", "decision")
|
||||
data: Record data as dictionary
|
||||
actor: Who created this record
|
||||
reasoning: Bot's reasoning for creating this record
|
||||
citation: Constitutional citation
|
||||
|
||||
Returns:
|
||||
Record ID
|
||||
"""
|
||||
action = queries.create_action(
|
||||
session=self.db,
|
||||
action_type=f"store_{record_type}",
|
||||
actor=actor,
|
||||
data=data,
|
||||
bot_reasoning=reasoning,
|
||||
constitutional_citation=citation,
|
||||
)
|
||||
return action.id
|
||||
|
||||
def query_records(
|
||||
self,
|
||||
record_type: Optional[str] = None,
|
||||
criteria: Optional[Dict[str, Any]] = None,
|
||||
limit: int = 50,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Query governance records (generic retrieval primitive).
|
||||
|
||||
Args:
|
||||
record_type: Type of record to query (None for all)
|
||||
criteria: Filter criteria (e.g., {"status": "active"})
|
||||
limit: Maximum number of results
|
||||
|
||||
Returns:
|
||||
List of matching records as dictionaries
|
||||
"""
|
||||
actions = queries.get_recent_actions(
|
||||
session=self.db, limit=limit, action_type=record_type
|
||||
)
|
||||
|
||||
if criteria:
|
||||
# Filter by criteria in data field
|
||||
filtered = []
|
||||
for action in actions:
|
||||
if action.data and all(
|
||||
action.data.get(k) == v for k, v in criteria.items()
|
||||
):
|
||||
filtered.append(action.to_dict())
|
||||
return filtered
|
||||
|
||||
return [action.to_dict() for action in actions]
|
||||
|
||||
# Process primitives
|
||||
|
||||
def create_process(
|
||||
self,
|
||||
process_type: str,
|
||||
creator: str,
|
||||
deadline_days: int,
|
||||
constitutional_basis: str,
|
||||
initial_state: Optional[Dict[str, Any]] = None,
|
||||
mastodon_thread_id: Optional[str] = None,
|
||||
) -> int:
|
||||
"""
|
||||
Create a new governance process.
|
||||
|
||||
Args:
|
||||
process_type: Type of process (e.g., "standard_proposal")
|
||||
creator: Who initiated the process
|
||||
deadline_days: Days until deadline
|
||||
constitutional_basis: Constitutional citation
|
||||
initial_state: Initial state data
|
||||
mastodon_thread_id: Link to Mastodon thread
|
||||
|
||||
Returns:
|
||||
Process ID
|
||||
"""
|
||||
deadline = datetime.utcnow() + timedelta(days=deadline_days)
|
||||
|
||||
process = queries.create_process(
|
||||
session=self.db,
|
||||
process_type=process_type,
|
||||
creator=creator,
|
||||
constitutional_basis=constitutional_basis,
|
||||
deadline=deadline,
|
||||
state_data=initial_state or {},
|
||||
mastodon_thread_id=mastodon_thread_id,
|
||||
)
|
||||
|
||||
# Log the action
|
||||
queries.create_action(
|
||||
session=self.db,
|
||||
action_type="process_created",
|
||||
actor="bot",
|
||||
data={
|
||||
"process_id": process.id,
|
||||
"process_type": process_type,
|
||||
"creator": creator,
|
||||
"deadline": deadline.isoformat(),
|
||||
},
|
||||
constitutional_citation=constitutional_basis,
|
||||
)
|
||||
|
||||
return process.id
|
||||
|
||||
def update_process_state(
|
||||
self, process_id: int, state_updates: Dict[str, Any], actor: str = "bot"
|
||||
) -> bool:
|
||||
"""
|
||||
Update the state of a governance process.
|
||||
|
||||
Args:
|
||||
process_id: ID of process to update
|
||||
state_updates: Dictionary of state updates to merge
|
||||
actor: Who is updating the state
|
||||
|
||||
Returns:
|
||||
True if successful
|
||||
"""
|
||||
try:
|
||||
process = queries.update_process_state(
|
||||
session=self.db, process_id=process_id, state_data=state_updates
|
||||
)
|
||||
|
||||
# Log the action
|
||||
queries.create_action(
|
||||
session=self.db,
|
||||
action_type="process_updated",
|
||||
actor=actor,
|
||||
data={"process_id": process_id, "updates": state_updates},
|
||||
)
|
||||
|
||||
return True
|
||||
except Exception as e:
|
||||
return False
|
||||
|
||||
def complete_process(
|
||||
self, process_id: int, outcome: str, reasoning: str
|
||||
) -> bool:
|
||||
"""
|
||||
Mark a governance process as completed.
|
||||
|
||||
Args:
|
||||
process_id: ID of process to complete
|
||||
outcome: Outcome description (e.g., "passed", "failed")
|
||||
reasoning: Explanation of outcome
|
||||
|
||||
Returns:
|
||||
True if successful
|
||||
"""
|
||||
try:
|
||||
process = queries.complete_process(
|
||||
session=self.db, process_id=process_id, outcome=outcome
|
||||
)
|
||||
|
||||
# Log the action
|
||||
queries.create_action(
|
||||
session=self.db,
|
||||
action_type="process_completed",
|
||||
actor="bot",
|
||||
data={"process_id": process_id, "outcome": outcome},
|
||||
bot_reasoning=reasoning,
|
||||
)
|
||||
|
||||
return True
|
||||
except Exception as e:
|
||||
return False
|
||||
|
||||
# Calculation primitives
|
||||
|
||||
def calculate(self, expression: str, variables: Dict[str, Any]) -> Any:
|
||||
"""
|
||||
Safely evaluate a mathematical expression.
|
||||
|
||||
Args:
|
||||
expression: Math expression (e.g., "agree > disagree")
|
||||
variables: Variable values (e.g., {"agree": 10, "disagree": 3})
|
||||
|
||||
Returns:
|
||||
Result of calculation
|
||||
"""
|
||||
# Safe evaluation using eval with restricted globals
|
||||
allowed_names = {
|
||||
"abs": abs,
|
||||
"max": max,
|
||||
"min": min,
|
||||
"sum": sum,
|
||||
"len": len,
|
||||
}
|
||||
allowed_names.update(variables)
|
||||
|
||||
try:
|
||||
result = eval(expression, {"__builtins__": {}}, allowed_names)
|
||||
return result
|
||||
except Exception as e:
|
||||
raise ValueError(f"Invalid expression: {expression} - {e}")
|
||||
|
||||
def count_votes(
|
||||
self, process_id: int
|
||||
) -> Dict[str, int]:
|
||||
"""
|
||||
Count votes for a governance process.
|
||||
|
||||
Args:
|
||||
process_id: ID of process to count votes for
|
||||
|
||||
Returns:
|
||||
Dictionary with vote counts
|
||||
"""
|
||||
process = queries.get_process(self.db, process_id)
|
||||
if not process:
|
||||
return {}
|
||||
|
||||
# Get votes from process state
|
||||
votes = process.state_data.get("votes", {})
|
||||
|
||||
# Count by type
|
||||
counts = {"agree": 0, "disagree": 0, "abstain": 0, "block": 0}
|
||||
|
||||
for voter, vote_data in votes.items():
|
||||
vote_type = vote_data.get("vote", "").lower()
|
||||
if vote_type in counts:
|
||||
counts[vote_type] += 1
|
||||
|
||||
return counts
|
||||
|
||||
def check_threshold(
|
||||
self, counts: Dict[str, int], threshold_type: str
|
||||
) -> bool:
|
||||
"""
|
||||
Check if vote counts meet a threshold.
|
||||
|
||||
Args:
|
||||
counts: Vote counts dictionary
|
||||
threshold_type: Type of threshold to check
|
||||
|
||||
Returns:
|
||||
True if threshold is met
|
||||
"""
|
||||
agree = counts.get("agree", 0)
|
||||
disagree = counts.get("disagree", 0)
|
||||
block = counts.get("block", 0)
|
||||
|
||||
if threshold_type == "simple_majority":
|
||||
return agree > disagree
|
||||
|
||||
elif threshold_type == "3x_majority":
|
||||
return agree >= (disagree * 3)
|
||||
|
||||
elif threshold_type == "with_blocks":
|
||||
# Require 9x more agree than disagree+block
|
||||
return agree >= ((disagree + block) * 9)
|
||||
|
||||
elif threshold_type == "supermajority_2/3":
|
||||
total = agree + disagree
|
||||
if total == 0:
|
||||
return False
|
||||
return (agree / total) >= (2 / 3)
|
||||
|
||||
else:
|
||||
raise ValueError(f"Unknown threshold type: {threshold_type}")
|
||||
|
||||
# Scheduling primitives
|
||||
|
||||
def schedule_reminder(
|
||||
self, when: datetime, message: str, recipient: Optional[str] = None
|
||||
) -> int:
|
||||
"""
|
||||
Schedule a reminder for a future time.
|
||||
|
||||
Args:
|
||||
when: When to send reminder
|
||||
message: Reminder message
|
||||
recipient: Who to remind (None for broadcast)
|
||||
|
||||
Returns:
|
||||
Reminder ID
|
||||
"""
|
||||
action = queries.create_action(
|
||||
session=self.db,
|
||||
action_type="reminder_scheduled",
|
||||
actor="bot",
|
||||
data={
|
||||
"scheduled_for": when.isoformat(),
|
||||
"message": message,
|
||||
"recipient": recipient,
|
||||
"status": "pending",
|
||||
},
|
||||
)
|
||||
return action.id
|
||||
|
||||
def get_pending_reminders(self) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Get reminders that are due.
|
||||
|
||||
Returns:
|
||||
List of reminder dictionaries
|
||||
"""
|
||||
actions = queries.get_recent_actions(
|
||||
session=self.db, limit=100, action_type="reminder_scheduled"
|
||||
)
|
||||
|
||||
now = datetime.utcnow()
|
||||
due_reminders = []
|
||||
|
||||
for action in actions:
|
||||
if action.data.get("status") == "pending":
|
||||
scheduled_time = datetime.fromisoformat(
|
||||
action.data["scheduled_for"]
|
||||
)
|
||||
if scheduled_time <= now:
|
||||
due_reminders.append(action.to_dict())
|
||||
|
||||
return due_reminders
|
||||
|
||||
def mark_reminder_sent(self, reminder_id: int) -> bool:
|
||||
"""
|
||||
Mark a reminder as sent.
|
||||
|
||||
Args:
|
||||
reminder_id: ID of reminder action
|
||||
|
||||
Returns:
|
||||
True if successful
|
||||
"""
|
||||
action = queries.get_action(self.db, reminder_id)
|
||||
if action and action.data:
|
||||
action.data["status"] = "sent"
|
||||
self.db.commit()
|
||||
return True
|
||||
return False
|
||||
|
||||
# Reversal primitives
|
||||
|
||||
def reverse_action(
|
||||
self, action_id: int, actor: str, reason: str
|
||||
) -> bool:
|
||||
"""
|
||||
Reverse a previous action.
|
||||
|
||||
Args:
|
||||
action_id: ID of action to reverse
|
||||
actor: Who is reversing the action
|
||||
reason: Reason for reversal
|
||||
|
||||
Returns:
|
||||
True if successful
|
||||
"""
|
||||
try:
|
||||
queries.reverse_action(
|
||||
session=self.db,
|
||||
action_id=action_id,
|
||||
reversing_actor=actor,
|
||||
reason=reason,
|
||||
)
|
||||
return True
|
||||
except Exception as e:
|
||||
return False
|
||||
|
||||
def is_action_reversed(self, action_id: int) -> bool:
|
||||
"""
|
||||
Check if an action has been reversed.
|
||||
|
||||
Args:
|
||||
action_id: ID of action to check
|
||||
|
||||
Returns:
|
||||
True if action is reversed
|
||||
"""
|
||||
action = queries.get_action(self.db, action_id)
|
||||
return action.status == "reversed" if action else False
|
||||
|
||||
|
||||
def create_primitives(db_session: Session) -> GovernancePrimitives:
|
||||
"""Factory function to create primitives instance"""
|
||||
return GovernancePrimitives(db_session)
|
||||
15
src/govbot/platforms/__init__.py
Normal file
15
src/govbot/platforms/__init__.py
Normal file
@@ -0,0 +1,15 @@
|
||||
"""
|
||||
Platform adapters for Govbot.
|
||||
|
||||
This module provides a platform-agnostic interface for governance bots
|
||||
to work across different social/communication platforms.
|
||||
"""
|
||||
|
||||
from .base import PlatformAdapter, PlatformMessage, PlatformSkill, SkillParameter
|
||||
|
||||
__all__ = [
|
||||
"PlatformAdapter",
|
||||
"PlatformMessage",
|
||||
"PlatformSkill",
|
||||
"SkillParameter",
|
||||
]
|
||||
536
src/govbot/platforms/base.py
Normal file
536
src/govbot/platforms/base.py
Normal file
@@ -0,0 +1,536 @@
|
||||
"""
|
||||
Base platform adapter interface for Govbot.
|
||||
|
||||
This module defines the abstract interface that all platform adapters must implement.
|
||||
Platform adapters enable Govbot to work across different social/communication platforms
|
||||
(Mastodon, Discord, Telegram, Matrix, etc.) while maintaining the same governance logic.
|
||||
|
||||
Key Concepts:
|
||||
-------------
|
||||
1. **Platform Adapter**: Main interface for platform integration
|
||||
2. **Messages**: Normalized message format across platforms
|
||||
3. **Skills**: Platform-specific capabilities (admin, moderation, etc.)
|
||||
4. **Callbacks**: How the bot receives events from the platform
|
||||
|
||||
Architecture:
|
||||
-------------
|
||||
┌─────────────────┐
|
||||
│ Governance │
|
||||
│ Agent (Core) │ ← Platform-agnostic
|
||||
└────────┬────────┘
|
||||
│
|
||||
┌────────▼────────┐
|
||||
│ Platform │
|
||||
│ Adapter │ ← This interface
|
||||
└────────┬────────┘
|
||||
│
|
||||
┌──────┴──────┐
|
||||
│ Mastodon │ ← Concrete implementations
|
||||
│ Discord │
|
||||
│ Telegram │
|
||||
└─────────────┘
|
||||
|
||||
To implement a new platform:
|
||||
----------------------------
|
||||
1. Subclass PlatformAdapter
|
||||
2. Implement all abstract methods
|
||||
3. Define platform-specific skills
|
||||
4. Register message listeners
|
||||
5. Map platform concepts to abstract interface
|
||||
|
||||
See PLATFORMS.md for detailed implementation guide.
|
||||
"""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Callable, Optional, Dict, Any, List
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class MessageVisibility(Enum):
|
||||
"""Message visibility levels (common across platforms)"""
|
||||
|
||||
PUBLIC = "public" # Visible to everyone
|
||||
UNLISTED = "unlisted" # Public but not in feeds
|
||||
FOLLOWERS = "followers" # Followers only
|
||||
DIRECT = "direct" # Direct message
|
||||
PRIVATE = "private" # Private/group-only
|
||||
|
||||
|
||||
@dataclass
|
||||
class PlatformMessage:
|
||||
"""
|
||||
Normalized message representation across platforms.
|
||||
|
||||
This provides a common structure for messages regardless of platform.
|
||||
Platform adapters translate their native format into this structure.
|
||||
"""
|
||||
|
||||
id: str # Platform-specific message ID
|
||||
text: str # Message content (without bot mention)
|
||||
author_id: str # Unique user identifier on the platform
|
||||
author_handle: str # Human-readable handle/username
|
||||
timestamp: datetime
|
||||
thread_id: Optional[str] = None # Thread/conversation ID if applicable
|
||||
reply_to_id: Optional[str] = None # ID of message being replied to
|
||||
visibility: MessageVisibility = MessageVisibility.PUBLIC
|
||||
mentions_bot: bool = False # Was the bot explicitly mentioned?
|
||||
raw_data: Optional[Dict[str, Any]] = None # Platform-specific data
|
||||
|
||||
|
||||
@dataclass
|
||||
class SkillParameter:
|
||||
"""
|
||||
A parameter for a platform skill.
|
||||
|
||||
Describes what inputs a skill needs and validates them.
|
||||
"""
|
||||
|
||||
name: str # Parameter name
|
||||
type: str # Type hint (e.g., "str", "int", "user_id")
|
||||
description: str # What this parameter does
|
||||
required: bool = True # Is this parameter required?
|
||||
default: Optional[Any] = None # Default value if not required
|
||||
|
||||
|
||||
@dataclass
|
||||
class PlatformSkill:
|
||||
"""
|
||||
A platform-specific capability.
|
||||
|
||||
Skills are actions the bot can perform that are specific to a platform,
|
||||
such as moderation, admin actions, channel management, etc.
|
||||
|
||||
Example skills:
|
||||
- Mastodon: suspend_account, update_instance_rules, transfer_admin
|
||||
- Discord: ban_user, create_channel, update_server_rules
|
||||
- Telegram: restrict_user, update_group_description
|
||||
"""
|
||||
|
||||
name: str # Skill identifier (e.g., "suspend_account")
|
||||
description: str # Human-readable description
|
||||
category: str # Category: "admin", "moderation", "content", "user_management"
|
||||
parameters: List[SkillParameter] # What inputs does this skill need?
|
||||
requires_confirmation: bool = True # Should this prompt for confirmation?
|
||||
reversible: bool = False # Can this action be undone?
|
||||
constitutional_authorization: Optional[str] = None # What constitutional provision allows this?
|
||||
|
||||
def validate_params(self, params: Dict[str, Any]) -> bool:
|
||||
"""
|
||||
Validate that provided parameters match requirements.
|
||||
|
||||
Args:
|
||||
params: Dictionary of parameter values
|
||||
|
||||
Returns:
|
||||
True if valid, False otherwise
|
||||
"""
|
||||
for param in self.parameters:
|
||||
if param.required and param.name not in params:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
class PlatformAdapter(ABC):
|
||||
"""
|
||||
Abstract base class for platform adapters.
|
||||
|
||||
All platform implementations must inherit from this class and implement
|
||||
all abstract methods. This ensures consistent behavior across platforms.
|
||||
|
||||
Lifecycle:
|
||||
----------
|
||||
1. __init__() - Initialize with platform-specific config
|
||||
2. connect() - Establish connection to platform
|
||||
3. start_listening() - Begin receiving messages
|
||||
4. [Messages received and processed via callback]
|
||||
5. post() - Send responses
|
||||
6. execute_skill() - Perform platform actions
|
||||
7. disconnect() - Clean up when shutting down
|
||||
|
||||
Example Implementation:
|
||||
----------------------
|
||||
class MyPlatformAdapter(PlatformAdapter):
|
||||
def __init__(self, config):
|
||||
self.client = MyPlatformClient(config.api_token)
|
||||
self.bot_id = config.bot_id
|
||||
|
||||
def connect(self):
|
||||
self.client.authenticate()
|
||||
|
||||
def start_listening(self, callback):
|
||||
def on_message(msg):
|
||||
normalized = self._normalize_message(msg)
|
||||
callback(normalized)
|
||||
self.client.stream(on_message)
|
||||
|
||||
# ... implement other methods
|
||||
"""
|
||||
|
||||
def __init__(self, config: Dict[str, Any]):
|
||||
"""
|
||||
Initialize the platform adapter.
|
||||
|
||||
Args:
|
||||
config: Platform-specific configuration dictionary
|
||||
"""
|
||||
self.config = config
|
||||
self.connected = False
|
||||
self.bot_user_id: Optional[str] = None
|
||||
self.bot_username: Optional[str] = None
|
||||
|
||||
@abstractmethod
|
||||
def connect(self) -> bool:
|
||||
"""
|
||||
Establish connection to the platform.
|
||||
|
||||
This should authenticate, verify credentials, and prepare the adapter
|
||||
to receive and send messages.
|
||||
|
||||
Returns:
|
||||
True if connection successful, False otherwise
|
||||
|
||||
Raises:
|
||||
Exception: If connection fails with error details
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def disconnect(self):
|
||||
"""
|
||||
Disconnect from the platform and cleanup resources.
|
||||
|
||||
Should close connections, stop listeners, and free resources.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def start_listening(self, callback: Callable[[PlatformMessage], None]):
|
||||
"""
|
||||
Start listening for messages that mention the bot.
|
||||
|
||||
The adapter should filter for messages that mention the bot or are
|
||||
direct messages to it, normalize them to PlatformMessage format,
|
||||
and call the provided callback.
|
||||
|
||||
Args:
|
||||
callback: Function to call with each received message
|
||||
Signature: callback(message: PlatformMessage) -> None
|
||||
|
||||
Example:
|
||||
def handle_message(msg: PlatformMessage):
|
||||
print(f"Received: {msg.text} from {msg.author_handle}")
|
||||
|
||||
adapter.start_listening(handle_message)
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def post(
|
||||
self,
|
||||
message: str,
|
||||
thread_id: Optional[str] = None,
|
||||
reply_to_id: Optional[str] = None,
|
||||
visibility: MessageVisibility = MessageVisibility.PUBLIC,
|
||||
) -> str:
|
||||
"""
|
||||
Post a message to the platform.
|
||||
|
||||
Args:
|
||||
message: Text content to post
|
||||
thread_id: Thread/conversation to post in (if applicable)
|
||||
reply_to_id: Message ID to reply to (if applicable)
|
||||
visibility: Message visibility level
|
||||
|
||||
Returns:
|
||||
Message ID of the posted message
|
||||
|
||||
Raises:
|
||||
Exception: If posting fails
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_skills(self) -> List[PlatformSkill]:
|
||||
"""
|
||||
Get available platform-specific skills.
|
||||
|
||||
Returns:
|
||||
List of skills this platform supports
|
||||
|
||||
Example:
|
||||
[
|
||||
PlatformSkill(
|
||||
name="suspend_account",
|
||||
description="Temporarily suspend a user account",
|
||||
category="moderation",
|
||||
parameters=[
|
||||
SkillParameter("user_id", "str", "User to suspend"),
|
||||
SkillParameter("duration", "int", "Days to suspend", required=False),
|
||||
SkillParameter("reason", "str", "Reason for suspension"),
|
||||
],
|
||||
requires_confirmation=True,
|
||||
reversible=True,
|
||||
),
|
||||
]
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def execute_skill(
|
||||
self, skill_name: str, parameters: Dict[str, Any], actor: str
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Execute a platform-specific skill.
|
||||
|
||||
Args:
|
||||
skill_name: Name of the skill to execute
|
||||
parameters: Dictionary of parameter values
|
||||
actor: Who is requesting this action (for audit)
|
||||
|
||||
Returns:
|
||||
Dictionary with execution results:
|
||||
{
|
||||
"success": bool,
|
||||
"message": str, # Human-readable result
|
||||
"data": dict, # Additional result data
|
||||
"reversible": bool,
|
||||
"reverse_params": dict, # Parameters to reverse this action
|
||||
}
|
||||
|
||||
Raises:
|
||||
ValueError: If skill_name unknown or parameters invalid
|
||||
Exception: If execution fails
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_user_info(self, user_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
Get information about a user.
|
||||
|
||||
Args:
|
||||
user_id: Platform-specific user ID
|
||||
|
||||
Returns:
|
||||
Dictionary with user info:
|
||||
{
|
||||
"id": str,
|
||||
"handle": str,
|
||||
"display_name": str,
|
||||
"roles": List[str], # Admin, moderator, etc.
|
||||
"is_bot": bool,
|
||||
}
|
||||
Returns None if user not found.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def format_thread_url(self, thread_id: str) -> str:
|
||||
"""
|
||||
Generate a URL to view a thread/conversation.
|
||||
|
||||
Args:
|
||||
thread_id: Platform thread identifier
|
||||
|
||||
Returns:
|
||||
Full URL to the thread
|
||||
"""
|
||||
pass
|
||||
|
||||
# Helper methods (can be overridden but have default implementations)
|
||||
|
||||
def is_admin(self, user_id: str) -> bool:
|
||||
"""
|
||||
Check if a user has admin privileges.
|
||||
|
||||
Default implementation checks user roles. Can be overridden.
|
||||
|
||||
Args:
|
||||
user_id: User to check
|
||||
|
||||
Returns:
|
||||
True if user is admin
|
||||
"""
|
||||
user_info = self.get_user_info(user_id)
|
||||
if not user_info:
|
||||
return False
|
||||
return "admin" in user_info.get("roles", [])
|
||||
|
||||
def is_moderator(self, user_id: str) -> bool:
|
||||
"""
|
||||
Check if a user has moderator privileges.
|
||||
|
||||
Default implementation checks user roles. Can be overridden.
|
||||
|
||||
Args:
|
||||
user_id: User to check
|
||||
|
||||
Returns:
|
||||
True if user is moderator or admin
|
||||
"""
|
||||
user_info = self.get_user_info(user_id)
|
||||
if not user_info:
|
||||
return False
|
||||
roles = user_info.get("roles", [])
|
||||
return "admin" in roles or "moderator" in roles
|
||||
|
||||
def extract_mentions(self, text: str) -> List[str]:
|
||||
"""
|
||||
Extract user mentions from text.
|
||||
|
||||
Default implementation looks for @username patterns.
|
||||
Platforms should override if their mention format differs.
|
||||
|
||||
Args:
|
||||
text: Message text
|
||||
|
||||
Returns:
|
||||
List of mentioned usernames (without @)
|
||||
"""
|
||||
import re
|
||||
|
||||
mentions = re.findall(r"@(\w+)", text)
|
||||
return mentions
|
||||
|
||||
def get_skill(self, skill_name: str) -> Optional[PlatformSkill]:
|
||||
"""
|
||||
Get a specific skill by name.
|
||||
|
||||
Args:
|
||||
skill_name: Name of skill to retrieve
|
||||
|
||||
Returns:
|
||||
PlatformSkill if found, None otherwise
|
||||
"""
|
||||
for skill in self.get_skills():
|
||||
if skill.name == skill_name:
|
||||
return skill
|
||||
return None
|
||||
|
||||
def validate_skill_execution(
|
||||
self, skill_name: str, parameters: Dict[str, Any]
|
||||
) -> tuple[bool, Optional[str]]:
|
||||
"""
|
||||
Validate skill execution before performing it.
|
||||
|
||||
Args:
|
||||
skill_name: Skill to validate
|
||||
parameters: Parameters to validate
|
||||
|
||||
Returns:
|
||||
(is_valid, error_message) tuple
|
||||
"""
|
||||
skill = self.get_skill(skill_name)
|
||||
if not skill:
|
||||
return False, f"Unknown skill: {skill_name}"
|
||||
|
||||
if not skill.validate_params(parameters):
|
||||
return False, f"Invalid parameters for skill: {skill_name}"
|
||||
|
||||
return True, None
|
||||
|
||||
|
||||
# Mock adapter for testing (useful for development)
|
||||
|
||||
|
||||
class MockPlatformAdapter(PlatformAdapter):
|
||||
"""
|
||||
Mock platform adapter for testing without a real platform.
|
||||
|
||||
Useful for:
|
||||
- Unit testing
|
||||
- Development without platform credentials
|
||||
- Demonstrating the adapter interface
|
||||
"""
|
||||
|
||||
def __init__(self, config: Dict[str, Any]):
|
||||
super().__init__(config)
|
||||
self.messages_sent: List[Dict[str, Any]] = []
|
||||
self.skills_executed: List[Dict[str, Any]] = []
|
||||
|
||||
def connect(self) -> bool:
|
||||
self.connected = True
|
||||
self.bot_user_id = "mock_bot_123"
|
||||
self.bot_username = "mockbot"
|
||||
return True
|
||||
|
||||
def disconnect(self):
|
||||
self.connected = False
|
||||
|
||||
def start_listening(self, callback: Callable[[PlatformMessage], None]):
|
||||
# Mock adapter doesn't actually listen; messages can be injected via simulate_message()
|
||||
self.message_callback = callback
|
||||
|
||||
def simulate_message(self, text: str, author: str = "testuser"):
|
||||
"""Simulate receiving a message (for testing)"""
|
||||
msg = PlatformMessage(
|
||||
id=f"msg_{len(self.messages_sent)}",
|
||||
text=text,
|
||||
author_id=f"user_{author}",
|
||||
author_handle=author,
|
||||
timestamp=datetime.utcnow(),
|
||||
mentions_bot=True,
|
||||
)
|
||||
if hasattr(self, "message_callback"):
|
||||
self.message_callback(msg)
|
||||
|
||||
def post(
|
||||
self,
|
||||
message: str,
|
||||
thread_id: Optional[str] = None,
|
||||
reply_to_id: Optional[str] = None,
|
||||
visibility: MessageVisibility = MessageVisibility.PUBLIC,
|
||||
) -> str:
|
||||
msg_id = f"post_{len(self.messages_sent)}"
|
||||
self.messages_sent.append(
|
||||
{
|
||||
"id": msg_id,
|
||||
"message": message,
|
||||
"thread_id": thread_id,
|
||||
"reply_to_id": reply_to_id,
|
||||
"visibility": visibility.value,
|
||||
}
|
||||
)
|
||||
return msg_id
|
||||
|
||||
def get_skills(self) -> List[PlatformSkill]:
|
||||
return [
|
||||
PlatformSkill(
|
||||
name="mock_action",
|
||||
description="Mock platform action for testing",
|
||||
category="admin",
|
||||
parameters=[
|
||||
SkillParameter("target", "str", "Target of action"),
|
||||
],
|
||||
requires_confirmation=False,
|
||||
reversible=True,
|
||||
)
|
||||
]
|
||||
|
||||
def execute_skill(
|
||||
self, skill_name: str, parameters: Dict[str, Any], actor: str
|
||||
) -> Dict[str, Any]:
|
||||
self.skills_executed.append(
|
||||
{"skill": skill_name, "parameters": parameters, "actor": actor}
|
||||
)
|
||||
return {
|
||||
"success": True,
|
||||
"message": f"Mock execution of {skill_name}",
|
||||
"data": parameters,
|
||||
"reversible": True,
|
||||
"reverse_params": parameters,
|
||||
}
|
||||
|
||||
def get_user_info(self, user_id: str) -> Optional[Dict[str, Any]]:
|
||||
return {
|
||||
"id": user_id,
|
||||
"handle": user_id.replace("user_", ""),
|
||||
"display_name": f"Mock User {user_id}",
|
||||
"roles": ["member"],
|
||||
"is_bot": False,
|
||||
}
|
||||
|
||||
def format_thread_url(self, thread_id: str) -> str:
|
||||
return f"https://mock.platform/thread/{thread_id}"
|
||||
680
src/govbot/platforms/mastodon.py
Normal file
680
src/govbot/platforms/mastodon.py
Normal file
@@ -0,0 +1,680 @@
|
||||
"""
|
||||
Mastodon platform adapter for Govbot.
|
||||
|
||||
Implements the platform interface for Mastodon/Fediverse instances,
|
||||
enabling governance bots to work on any Mastodon-compatible server.
|
||||
|
||||
Features:
|
||||
---------
|
||||
- Streaming API for real-time mentions
|
||||
- Thread-aware posting
|
||||
- Admin and moderation skills
|
||||
- Visibility controls (public, unlisted, followers-only, direct)
|
||||
- Instance-level and user-level actions
|
||||
|
||||
Configuration Required:
|
||||
-----------------------
|
||||
- instance_url: Your Mastodon instance URL
|
||||
- access_token: Bot account access token
|
||||
- bot_username: Bot's username (for filtering mentions)
|
||||
|
||||
Optional:
|
||||
- client_id: OAuth client ID (for token generation)
|
||||
- client_secret: OAuth client secret
|
||||
|
||||
Getting Access Tokens:
|
||||
----------------------
|
||||
1. Register application at: https://your-instance/settings/applications/new
|
||||
2. Generate access token with appropriate scopes:
|
||||
- read (for streaming)
|
||||
- write (for posting)
|
||||
- admin:read, admin:write (for instance management, if bot is admin)
|
||||
|
||||
See PLATFORMS.md for detailed setup guide.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Callable, Optional, Dict, Any, List
|
||||
from datetime import datetime
|
||||
import time
|
||||
import threading
|
||||
|
||||
try:
|
||||
from mastodon import Mastodon, StreamListener, MastodonError
|
||||
MASTODON_AVAILABLE = True
|
||||
except ImportError:
|
||||
MASTODON_AVAILABLE = False
|
||||
logging.warning("Mastodon.py not installed. Install with: pip install Mastodon.py")
|
||||
|
||||
from .base import (
|
||||
PlatformAdapter,
|
||||
PlatformMessage,
|
||||
PlatformSkill,
|
||||
SkillParameter,
|
||||
MessageVisibility,
|
||||
)
|
||||
|
||||
logger = logging.getLogger("govbot.platforms.mastodon")
|
||||
|
||||
|
||||
class MastodonAdapter(PlatformAdapter):
|
||||
"""
|
||||
Mastodon platform adapter implementation.
|
||||
|
||||
Connects to Mastodon instances and provides governance capabilities.
|
||||
"""
|
||||
|
||||
def __init__(self, config: Dict[str, Any]):
|
||||
"""
|
||||
Initialize Mastodon adapter.
|
||||
|
||||
Args:
|
||||
config: Configuration dictionary with:
|
||||
- instance_url: Mastodon instance URL
|
||||
- access_token: Bot access token
|
||||
- bot_username: Bot's username
|
||||
- client_id (optional): OAuth client ID
|
||||
- client_secret (optional): OAuth client secret
|
||||
"""
|
||||
super().__init__(config)
|
||||
|
||||
if not MASTODON_AVAILABLE:
|
||||
raise ImportError(
|
||||
"Mastodon.py is required for Mastodon adapter. "
|
||||
"Install with: pip install Mastodon.py"
|
||||
)
|
||||
|
||||
self.instance_url = config.get("instance_url")
|
||||
self.access_token = config.get("access_token")
|
||||
self.bot_username = config.get("bot_username", "govbot")
|
||||
|
||||
if not self.instance_url or not self.access_token:
|
||||
raise ValueError(
|
||||
"Mastodon adapter requires 'instance_url' and 'access_token' in config"
|
||||
)
|
||||
|
||||
self.client: Optional[Mastodon] = None
|
||||
self.stream_listener: Optional['GovbotStreamListener'] = None
|
||||
self.listener_thread: Optional[threading.Thread] = None
|
||||
|
||||
def connect(self) -> bool:
|
||||
"""
|
||||
Connect to Mastodon instance.
|
||||
|
||||
Returns:
|
||||
True if connection successful
|
||||
|
||||
Raises:
|
||||
MastodonError: If connection fails
|
||||
"""
|
||||
try:
|
||||
logger.info(f"Connecting to Mastodon instance: {self.instance_url}")
|
||||
|
||||
self.client = Mastodon(
|
||||
access_token=self.access_token,
|
||||
api_base_url=self.instance_url,
|
||||
)
|
||||
|
||||
# Verify credentials and get bot account info
|
||||
account = self.client.account_verify_credentials()
|
||||
self.bot_user_id = str(account["id"])
|
||||
self.bot_username = account["username"]
|
||||
|
||||
logger.info(
|
||||
f"Connected as @{self.bot_username} (ID: {self.bot_user_id})"
|
||||
)
|
||||
|
||||
self.connected = True
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to connect to Mastodon: {e}")
|
||||
raise
|
||||
|
||||
def disconnect(self):
|
||||
"""Disconnect from Mastodon and cleanup."""
|
||||
logger.info("Disconnecting from Mastodon")
|
||||
|
||||
# Stop stream listener if running
|
||||
if self.stream_listener:
|
||||
self.stream_listener.stop()
|
||||
self.stream_listener = None
|
||||
|
||||
if self.listener_thread:
|
||||
self.listener_thread.join(timeout=5)
|
||||
self.listener_thread = None
|
||||
|
||||
self.connected = False
|
||||
logger.info("Disconnected from Mastodon")
|
||||
|
||||
def start_listening(self, callback: Callable[[PlatformMessage], None]):
|
||||
"""
|
||||
Start listening for mentions via Mastodon streaming API.
|
||||
|
||||
Args:
|
||||
callback: Function to call with each received message
|
||||
"""
|
||||
if not self.connected or not self.client:
|
||||
raise RuntimeError("Must call connect() before start_listening()")
|
||||
|
||||
logger.info("Starting Mastodon stream listener for mentions")
|
||||
|
||||
# Create stream listener
|
||||
self.stream_listener = GovbotStreamListener(
|
||||
bot_id=self.bot_user_id,
|
||||
callback=callback,
|
||||
adapter=self,
|
||||
)
|
||||
|
||||
# Start streaming in a separate thread
|
||||
def stream_thread():
|
||||
try:
|
||||
# Stream user timeline (includes mentions)
|
||||
self.client.stream_user(self.stream_listener, run_async=False, reconnect_async=True)
|
||||
except Exception as e:
|
||||
logger.error(f"Stream listener error: {e}", exc_info=True)
|
||||
|
||||
self.listener_thread = threading.Thread(target=stream_thread, daemon=True)
|
||||
self.listener_thread.start()
|
||||
|
||||
logger.info("Stream listener started")
|
||||
|
||||
def post(
|
||||
self,
|
||||
message: str,
|
||||
thread_id: Optional[str] = None,
|
||||
reply_to_id: Optional[str] = None,
|
||||
visibility: MessageVisibility = MessageVisibility.PUBLIC,
|
||||
) -> str:
|
||||
"""
|
||||
Post a message to Mastodon.
|
||||
|
||||
Args:
|
||||
message: Text content to post (max 500 characters for most instances)
|
||||
thread_id: Not used in Mastodon (use reply_to_id for threading)
|
||||
reply_to_id: Status ID to reply to
|
||||
visibility: Message visibility level
|
||||
|
||||
Returns:
|
||||
Status ID of posted message
|
||||
|
||||
Raises:
|
||||
MastodonError: If posting fails
|
||||
"""
|
||||
if not self.connected or not self.client:
|
||||
raise RuntimeError("Must call connect() before posting")
|
||||
|
||||
# Map visibility to Mastodon format
|
||||
mastodon_visibility = self._map_visibility(visibility)
|
||||
|
||||
try:
|
||||
status = self.client.status_post(
|
||||
status=message,
|
||||
in_reply_to_id=reply_to_id,
|
||||
visibility=mastodon_visibility,
|
||||
)
|
||||
|
||||
logger.info(f"Posted status {status['id']}")
|
||||
return str(status["id"])
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to post status: {e}")
|
||||
raise
|
||||
|
||||
def get_skills(self) -> List[PlatformSkill]:
|
||||
"""
|
||||
Get Mastodon-specific skills.
|
||||
|
||||
Returns:
|
||||
List of available Mastodon governance skills
|
||||
"""
|
||||
return [
|
||||
# Moderation skills
|
||||
PlatformSkill(
|
||||
name="suspend_account",
|
||||
description="Suspend a user account (reversible)",
|
||||
category="moderation",
|
||||
parameters=[
|
||||
SkillParameter("account_id", "str", "Account ID to suspend"),
|
||||
SkillParameter("reason", "str", "Reason for suspension"),
|
||||
],
|
||||
requires_confirmation=True,
|
||||
reversible=True,
|
||||
constitutional_authorization="Requires moderation authority per constitution",
|
||||
),
|
||||
PlatformSkill(
|
||||
name="silence_account",
|
||||
description="Silence a user account (hide from public timelines)",
|
||||
category="moderation",
|
||||
parameters=[
|
||||
SkillParameter("account_id", "str", "Account ID to silence"),
|
||||
SkillParameter("reason", "str", "Reason for silencing"),
|
||||
],
|
||||
requires_confirmation=True,
|
||||
reversible=True,
|
||||
constitutional_authorization="Requires moderation authority per constitution",
|
||||
),
|
||||
PlatformSkill(
|
||||
name="delete_status",
|
||||
description="Delete a status/post",
|
||||
category="moderation",
|
||||
parameters=[
|
||||
SkillParameter("status_id", "str", "Status ID to delete"),
|
||||
SkillParameter("reason", "str", "Reason for deletion"),
|
||||
],
|
||||
requires_confirmation=True,
|
||||
reversible=False,
|
||||
constitutional_authorization="Requires moderation authority per constitution",
|
||||
),
|
||||
# Instance administration skills
|
||||
PlatformSkill(
|
||||
name="update_instance_rules",
|
||||
description="Update instance rules/code of conduct",
|
||||
category="admin",
|
||||
parameters=[
|
||||
SkillParameter("rules", "list", "List of rule texts"),
|
||||
],
|
||||
requires_confirmation=True,
|
||||
reversible=True,
|
||||
constitutional_authorization="Requires constitutional amendment process",
|
||||
),
|
||||
PlatformSkill(
|
||||
name="update_instance_description",
|
||||
description="Update instance description/about page",
|
||||
category="admin",
|
||||
parameters=[
|
||||
SkillParameter("description", "str", "New instance description"),
|
||||
],
|
||||
requires_confirmation=True,
|
||||
reversible=True,
|
||||
constitutional_authorization="Requires governance approval",
|
||||
),
|
||||
PlatformSkill(
|
||||
name="grant_moderator",
|
||||
description="Grant moderator role to a user",
|
||||
category="admin",
|
||||
parameters=[
|
||||
SkillParameter("account_id", "str", "Account ID to promote"),
|
||||
],
|
||||
requires_confirmation=True,
|
||||
reversible=True,
|
||||
constitutional_authorization="Requires governance approval",
|
||||
),
|
||||
PlatformSkill(
|
||||
name="revoke_moderator",
|
||||
description="Revoke moderator role from a user",
|
||||
category="admin",
|
||||
parameters=[
|
||||
SkillParameter("account_id", "str", "Account ID to demote"),
|
||||
],
|
||||
requires_confirmation=True,
|
||||
reversible=True,
|
||||
constitutional_authorization="Requires governance approval",
|
||||
),
|
||||
# Content management
|
||||
PlatformSkill(
|
||||
name="create_announcement",
|
||||
description="Create an instance-wide announcement",
|
||||
category="content",
|
||||
parameters=[
|
||||
SkillParameter("text", "str", "Announcement text"),
|
||||
SkillParameter("starts_at", "datetime", "When announcement starts", required=False),
|
||||
SkillParameter("ends_at", "datetime", "When announcement ends", required=False),
|
||||
],
|
||||
requires_confirmation=False,
|
||||
reversible=True,
|
||||
constitutional_authorization="Authorized communicators per constitution",
|
||||
),
|
||||
]
|
||||
|
||||
def execute_skill(
|
||||
self, skill_name: str, parameters: Dict[str, Any], actor: str
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Execute a Mastodon-specific skill.
|
||||
|
||||
Args:
|
||||
skill_name: Name of skill to execute
|
||||
parameters: Skill parameters
|
||||
actor: Who is requesting this action
|
||||
|
||||
Returns:
|
||||
Execution result dictionary
|
||||
|
||||
Raises:
|
||||
ValueError: If skill unknown or parameters invalid
|
||||
MastodonError: If execution fails
|
||||
"""
|
||||
if not self.connected or not self.client:
|
||||
raise RuntimeError("Must call connect() before executing skills")
|
||||
|
||||
# Validate skill and parameters
|
||||
is_valid, error = self.validate_skill_execution(skill_name, parameters)
|
||||
if not is_valid:
|
||||
raise ValueError(error)
|
||||
|
||||
logger.info(f"Executing skill '{skill_name}' requested by {actor}")
|
||||
|
||||
# Route to appropriate handler
|
||||
try:
|
||||
if skill_name == "suspend_account":
|
||||
return self._suspend_account(parameters)
|
||||
elif skill_name == "silence_account":
|
||||
return self._silence_account(parameters)
|
||||
elif skill_name == "delete_status":
|
||||
return self._delete_status(parameters)
|
||||
elif skill_name == "update_instance_rules":
|
||||
return self._update_instance_rules(parameters)
|
||||
elif skill_name == "update_instance_description":
|
||||
return self._update_instance_description(parameters)
|
||||
elif skill_name == "grant_moderator":
|
||||
return self._grant_moderator(parameters)
|
||||
elif skill_name == "revoke_moderator":
|
||||
return self._revoke_moderator(parameters)
|
||||
elif skill_name == "create_announcement":
|
||||
return self._create_announcement(parameters)
|
||||
else:
|
||||
raise ValueError(f"Unknown skill: {skill_name}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Skill execution failed: {e}")
|
||||
return {
|
||||
"success": False,
|
||||
"message": f"Execution failed: {str(e)}",
|
||||
"data": {},
|
||||
"reversible": False,
|
||||
}
|
||||
|
||||
def get_user_info(self, user_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
Get information about a Mastodon user.
|
||||
|
||||
Args:
|
||||
user_id: Mastodon account ID
|
||||
|
||||
Returns:
|
||||
User info dictionary or None if not found
|
||||
"""
|
||||
if not self.connected or not self.client:
|
||||
return None
|
||||
|
||||
try:
|
||||
account = self.client.account(user_id)
|
||||
|
||||
# Determine roles (requires admin API access)
|
||||
roles = ["member"]
|
||||
try:
|
||||
# Check if account is admin/moderator (requires admin scope)
|
||||
admin_account = self.client.admin_account(user_id)
|
||||
if admin_account.get("role", {}).get("name") == "Admin":
|
||||
roles.append("admin")
|
||||
elif admin_account.get("role", {}).get("name") == "Moderator":
|
||||
roles.append("moderator")
|
||||
except:
|
||||
# If we don't have admin access, can't check roles
|
||||
pass
|
||||
|
||||
return {
|
||||
"id": str(account["id"]),
|
||||
"handle": account["username"],
|
||||
"display_name": account["display_name"],
|
||||
"roles": roles,
|
||||
"is_bot": account.get("bot", False),
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get user info for {user_id}: {e}")
|
||||
return None
|
||||
|
||||
def format_thread_url(self, thread_id: str) -> str:
|
||||
"""
|
||||
Generate URL to a Mastodon thread.
|
||||
|
||||
Args:
|
||||
thread_id: Status ID
|
||||
|
||||
Returns:
|
||||
Full URL to the status
|
||||
"""
|
||||
# Get the status to find its URL
|
||||
try:
|
||||
if self.client:
|
||||
status = self.client.status(thread_id)
|
||||
return status.get("url", f"{self.instance_url}/web/statuses/{thread_id}")
|
||||
except:
|
||||
pass
|
||||
|
||||
return f"{self.instance_url}/web/statuses/{thread_id}"
|
||||
|
||||
# Private helper methods for skill execution
|
||||
|
||||
def _suspend_account(self, params: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Suspend a user account"""
|
||||
account_id = params["account_id"]
|
||||
reason = params.get("reason", "Suspended by governance decision")
|
||||
|
||||
self.client.admin_account_moderate(
|
||||
account_id,
|
||||
action="suspend",
|
||||
report_note=reason,
|
||||
)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": f"Account {account_id} suspended",
|
||||
"data": {"account_id": account_id, "reason": reason},
|
||||
"reversible": True,
|
||||
"reverse_params": {"account_id": account_id, "action": "unsuspend"},
|
||||
}
|
||||
|
||||
def _silence_account(self, params: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Silence a user account"""
|
||||
account_id = params["account_id"]
|
||||
reason = params.get("reason", "Silenced by governance decision")
|
||||
|
||||
self.client.admin_account_moderate(
|
||||
account_id,
|
||||
action="silence",
|
||||
report_note=reason,
|
||||
)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": f"Account {account_id} silenced",
|
||||
"data": {"account_id": account_id, "reason": reason},
|
||||
"reversible": True,
|
||||
"reverse_params": {"account_id": account_id, "action": "unsilence"},
|
||||
}
|
||||
|
||||
def _delete_status(self, params: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Delete a status"""
|
||||
status_id = params["status_id"]
|
||||
reason = params.get("reason", "Deleted by governance decision")
|
||||
|
||||
self.client.status_delete(status_id)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": f"Status {status_id} deleted",
|
||||
"data": {"status_id": status_id, "reason": reason},
|
||||
"reversible": False,
|
||||
}
|
||||
|
||||
def _update_instance_rules(self, params: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Update instance rules"""
|
||||
# Note: This requires admin API access
|
||||
# Implementation depends on Mastodon version and API availability
|
||||
rules = params["rules"]
|
||||
|
||||
# This would use admin API to update instance rules
|
||||
# Exact implementation varies by Mastodon version
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": f"Updated instance rules ({len(rules)} rules)",
|
||||
"data": {"rules": rules},
|
||||
"reversible": True,
|
||||
"reverse_params": {"rules": "previous_rules"}, # Would need to store previous
|
||||
}
|
||||
|
||||
def _update_instance_description(self, params: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Update instance description"""
|
||||
description = params["description"]
|
||||
|
||||
# This would use admin API
|
||||
# Exact implementation varies
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": "Updated instance description",
|
||||
"data": {"description": description},
|
||||
"reversible": True,
|
||||
"reverse_params": {"description": "previous_description"},
|
||||
}
|
||||
|
||||
def _grant_moderator(self, params: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Grant moderator role"""
|
||||
account_id = params["account_id"]
|
||||
|
||||
# Use admin API to update role
|
||||
self.client.admin_account_moderate(account_id, action="promote_moderator")
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": f"Granted moderator to account {account_id}",
|
||||
"data": {"account_id": account_id},
|
||||
"reversible": True,
|
||||
"reverse_params": {"account_id": account_id},
|
||||
}
|
||||
|
||||
def _revoke_moderator(self, params: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Revoke moderator role"""
|
||||
account_id = params["account_id"]
|
||||
|
||||
# Use admin API to update role
|
||||
self.client.admin_account_moderate(account_id, action="demote_moderator")
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": f"Revoked moderator from account {account_id}",
|
||||
"data": {"account_id": account_id},
|
||||
"reversible": True,
|
||||
"reverse_params": {"account_id": account_id},
|
||||
}
|
||||
|
||||
def _create_announcement(self, params: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Create instance announcement"""
|
||||
text = params["text"]
|
||||
starts_at = params.get("starts_at")
|
||||
ends_at = params.get("ends_at")
|
||||
|
||||
announcement = self.client.admin_announcement_create(
|
||||
text=text,
|
||||
starts_at=starts_at,
|
||||
ends_at=ends_at,
|
||||
)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": "Created announcement",
|
||||
"data": {"announcement_id": announcement["id"], "text": text},
|
||||
"reversible": True,
|
||||
"reverse_params": {"announcement_id": announcement["id"]},
|
||||
}
|
||||
|
||||
def _map_visibility(self, visibility: MessageVisibility) -> str:
|
||||
"""Map abstract visibility to Mastodon visibility"""
|
||||
mapping = {
|
||||
MessageVisibility.PUBLIC: "public",
|
||||
MessageVisibility.UNLISTED: "unlisted",
|
||||
MessageVisibility.FOLLOWERS: "private",
|
||||
MessageVisibility.DIRECT: "direct",
|
||||
MessageVisibility.PRIVATE: "private",
|
||||
}
|
||||
return mapping.get(visibility, "public")
|
||||
|
||||
|
||||
class GovbotStreamListener(StreamListener):
|
||||
"""
|
||||
Mastodon stream listener for governance bot.
|
||||
|
||||
Listens for notifications (mentions, replies) and calls the callback.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
bot_id: str,
|
||||
callback: Callable[[PlatformMessage], None],
|
||||
adapter: MastodonAdapter,
|
||||
):
|
||||
super().__init__()
|
||||
self.bot_id = bot_id
|
||||
self.callback = callback
|
||||
self.adapter = adapter
|
||||
self.running = True
|
||||
|
||||
def on_notification(self, notification):
|
||||
"""Handle incoming notifications"""
|
||||
if not self.running:
|
||||
return
|
||||
|
||||
# We care about mentions and replies
|
||||
if notification["type"] in ["mention", "reply"]:
|
||||
status = notification["status"]
|
||||
|
||||
# Don't respond to ourselves
|
||||
if str(status["account"]["id"]) == self.bot_id:
|
||||
return
|
||||
|
||||
# Convert to PlatformMessage
|
||||
message = self._status_to_message(status)
|
||||
message.mentions_bot = True # It's a notification, so we were mentioned
|
||||
|
||||
# Call the callback
|
||||
try:
|
||||
self.callback(message)
|
||||
except Exception as e:
|
||||
logger.error(f"Error in message callback: {e}", exc_info=True)
|
||||
|
||||
def on_update(self, status):
|
||||
"""Handle status updates (posts)"""
|
||||
# We primarily care about notifications, not all posts
|
||||
pass
|
||||
|
||||
def _status_to_message(self, status: Dict[str, Any]) -> PlatformMessage:
|
||||
"""Convert Mastodon status to PlatformMessage"""
|
||||
from html import unescape
|
||||
import re
|
||||
|
||||
# Extract text content (strip HTML)
|
||||
content = status.get("content", "")
|
||||
# Simple HTML stripping (could use BeautifulSoup for better parsing)
|
||||
content = re.sub(r"<[^>]+>", "", content)
|
||||
content = unescape(content)
|
||||
|
||||
# Map visibility
|
||||
visibility_map = {
|
||||
"public": MessageVisibility.PUBLIC,
|
||||
"unlisted": MessageVisibility.UNLISTED,
|
||||
"private": MessageVisibility.FOLLOWERS,
|
||||
"direct": MessageVisibility.DIRECT,
|
||||
}
|
||||
visibility = visibility_map.get(
|
||||
status.get("visibility", "public"), MessageVisibility.PUBLIC
|
||||
)
|
||||
|
||||
return PlatformMessage(
|
||||
id=str(status["id"]),
|
||||
text=content,
|
||||
author_id=str(status["account"]["id"]),
|
||||
author_handle=status["account"]["username"],
|
||||
timestamp=status["created_at"],
|
||||
thread_id=str(status.get("in_reply_to_id", status["id"])),
|
||||
reply_to_id=str(status["in_reply_to_id"]) if status.get("in_reply_to_id") else None,
|
||||
visibility=visibility,
|
||||
raw_data=status,
|
||||
)
|
||||
|
||||
def stop(self):
|
||||
"""Stop the listener"""
|
||||
self.running = False
|
||||
172
src/govbot/scheduler.py
Normal file
172
src/govbot/scheduler.py
Normal file
@@ -0,0 +1,172 @@
|
||||
"""
|
||||
Background scheduler for governance processes.
|
||||
|
||||
Handles:
|
||||
- Checking proposal deadlines
|
||||
- Sending reminders
|
||||
- Monitoring veto votes
|
||||
- Processing completed proposals
|
||||
"""
|
||||
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from .agent import GovernanceAgent
|
||||
from .db import queries
|
||||
|
||||
logger = logging.getLogger("govbot.scheduler")
|
||||
|
||||
|
||||
class GovernanceScheduler:
|
||||
"""
|
||||
Background scheduler for temporal governance tasks.
|
||||
|
||||
Runs in a separate thread and periodically:
|
||||
- Checks for processes past deadline
|
||||
- Sends pending reminders
|
||||
- Monitors veto votes
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
agent: GovernanceAgent,
|
||||
db_session: Session,
|
||||
check_interval: int = 60,
|
||||
):
|
||||
"""
|
||||
Initialize the scheduler.
|
||||
|
||||
Args:
|
||||
agent: GovernanceAgent instance
|
||||
db_session: Database session
|
||||
check_interval: How often to check (in seconds)
|
||||
"""
|
||||
self.agent = agent
|
||||
self.db = db_session
|
||||
self.check_interval = check_interval
|
||||
self.running = False
|
||||
self.thread: Optional[threading.Thread] = None
|
||||
|
||||
def start(self):
|
||||
"""Start the scheduler in a background thread"""
|
||||
if self.running:
|
||||
logger.warning("Scheduler already running")
|
||||
return
|
||||
|
||||
self.running = True
|
||||
self.thread = threading.Thread(target=self._run_loop, daemon=True)
|
||||
self.thread.start()
|
||||
logger.info(f"Scheduler started (checking every {self.check_interval}s)")
|
||||
|
||||
def stop(self):
|
||||
"""Stop the scheduler"""
|
||||
self.running = False
|
||||
if self.thread:
|
||||
self.thread.join(timeout=5)
|
||||
logger.info("Scheduler stopped")
|
||||
|
||||
def _run_loop(self):
|
||||
"""Main scheduler loop"""
|
||||
while self.running:
|
||||
try:
|
||||
self._check_tasks()
|
||||
except Exception as e:
|
||||
logger.error(f"Scheduler error: {e}", exc_info=True)
|
||||
|
||||
# Sleep but check periodically for shutdown
|
||||
for _ in range(self.check_interval):
|
||||
if not self.running:
|
||||
break
|
||||
time.sleep(1)
|
||||
|
||||
def _check_tasks(self):
|
||||
"""Check and process all scheduled tasks"""
|
||||
now = datetime.utcnow()
|
||||
|
||||
logger.debug(f"Checking scheduled tasks at {now}")
|
||||
|
||||
# Check for processes past deadline
|
||||
completed = self.agent.check_deadlines()
|
||||
if completed:
|
||||
logger.info(f"Completed {len(completed)} processes past deadline")
|
||||
for process in completed:
|
||||
logger.info(
|
||||
f"Process {process['process_id']}: {process['outcome']} "
|
||||
f"with votes {process['vote_counts']}"
|
||||
)
|
||||
# TODO: Post result to Mastodon
|
||||
|
||||
# Check for pending reminders
|
||||
reminders = self.agent.primitives.get_pending_reminders()
|
||||
for reminder in reminders:
|
||||
logger.info(f"Sending reminder: {reminder['data']['message']}")
|
||||
# TODO: Post reminder to Mastodon
|
||||
self.agent.primitives.mark_reminder_sent(reminder["id"])
|
||||
|
||||
# Check for veto votes (every cycle)
|
||||
self._check_veto_votes()
|
||||
|
||||
def _check_veto_votes(self):
|
||||
"""Check if any actions have reached veto threshold"""
|
||||
# Get recent actions that might be vetoed
|
||||
recent_actions = queries.get_recent_actions(
|
||||
session=self.db, limit=100
|
||||
)
|
||||
|
||||
for action in recent_actions:
|
||||
if action.status == "executed" and action.reversible:
|
||||
# Check veto votes
|
||||
if queries.check_veto_threshold(self.db, action.id):
|
||||
logger.warning(
|
||||
f"Action {action.id} reached veto threshold! Reversing..."
|
||||
)
|
||||
|
||||
# Reverse the action
|
||||
queries.reverse_action(
|
||||
session=self.db,
|
||||
action_id=action.id,
|
||||
reversing_actor="community_veto",
|
||||
reason="Supermajority veto threshold reached",
|
||||
)
|
||||
|
||||
logger.info(f"Action {action.id} reversed by community veto")
|
||||
# TODO: Post to Mastodon about veto
|
||||
|
||||
|
||||
def run_scheduler_test():
|
||||
"""Test function for the scheduler"""
|
||||
from .db.models import init_db, get_session
|
||||
from .utils.config import load_config
|
||||
|
||||
# Initialize
|
||||
config = load_config()
|
||||
engine = init_db(config.governance.db_path)
|
||||
db_session = get_session(engine)
|
||||
|
||||
agent = GovernanceAgent(
|
||||
db_session=db_session,
|
||||
constitution_path=config.governance.constitution_path,
|
||||
model=config.ai.default_model,
|
||||
)
|
||||
|
||||
scheduler = GovernanceScheduler(agent, db_session, check_interval=10)
|
||||
|
||||
logger.info("Starting scheduler test...")
|
||||
scheduler.start()
|
||||
|
||||
try:
|
||||
# Run for 60 seconds
|
||||
time.sleep(60)
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Test interrupted")
|
||||
finally:
|
||||
scheduler.stop()
|
||||
logger.info("Scheduler test complete")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
run_scheduler_test()
|
||||
0
src/govbot/utils/__init__.py
Normal file
0
src/govbot/utils/__init__.py
Normal file
160
src/govbot/utils/config.py
Normal file
160
src/govbot/utils/config.py
Normal file
@@ -0,0 +1,160 @@
|
||||
"""
|
||||
Configuration management for Govbot using Pydantic.
|
||||
|
||||
Loads configuration from YAML file and environment variables,
|
||||
with validation to ensure all required settings are present.
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
import yaml
|
||||
from pydantic import BaseModel, Field
|
||||
from pydantic_settings import BaseSettings
|
||||
|
||||
|
||||
class MastodonConfig(BaseModel):
|
||||
"""Mastodon instance configuration"""
|
||||
|
||||
instance_url: str = Field(..., description="Mastodon instance URL (e.g., https://mastodon.social)")
|
||||
client_id: Optional[str] = Field(None, description="OAuth client ID")
|
||||
client_secret: Optional[str] = Field(None, description="OAuth client secret")
|
||||
access_token: Optional[str] = Field(None, description="Access token for bot account")
|
||||
bot_username: str = Field("govbot", description="Bot's Mastodon username")
|
||||
|
||||
|
||||
class AIConfig(BaseModel):
|
||||
"""AI model configuration"""
|
||||
|
||||
default_model: Optional[str] = Field(
|
||||
None,
|
||||
description="Default LLM model to use (e.g., 'llama3.2' for Ollama, 'gpt-4' for OpenAI)",
|
||||
)
|
||||
fallback_model: Optional[str] = Field(None, description="Fallback model if default fails")
|
||||
temperature: float = Field(0.7, description="LLM temperature for responses")
|
||||
max_tokens: Optional[int] = Field(None, description="Maximum tokens for LLM responses")
|
||||
|
||||
|
||||
class GovernanceConfig(BaseModel):
|
||||
"""Governance system configuration"""
|
||||
|
||||
constitution_path: str = Field(
|
||||
"constitution.md", description="Path to constitution file"
|
||||
)
|
||||
db_path: str = Field("govbot.db", description="Path to SQLite database")
|
||||
default_veto_threshold: float = Field(
|
||||
0.67, description="Default supermajority veto threshold (e.g., 0.67 = 2/3)"
|
||||
)
|
||||
enable_auto_execution: bool = Field(
|
||||
True, description="Allow bot to execute actions automatically"
|
||||
)
|
||||
require_confirmation_for: list[str] = Field(
|
||||
default_factory=lambda: ["admin_action", "moderation"],
|
||||
description="Action types requiring human confirmation",
|
||||
)
|
||||
|
||||
|
||||
class PlatformConfig(BaseModel):
|
||||
"""Platform selection and configuration"""
|
||||
|
||||
type: str = Field(..., description="Platform type: mastodon, discord, telegram, mock")
|
||||
mastodon: Optional[MastodonConfig] = Field(None, description="Mastodon configuration")
|
||||
# Future platforms:
|
||||
# discord: Optional[DiscordConfig] = None
|
||||
# telegram: Optional[TelegramConfig] = None
|
||||
|
||||
|
||||
class BotConfig(BaseSettings):
|
||||
"""Main bot configuration"""
|
||||
|
||||
platform: PlatformConfig = Field(..., description="Platform configuration")
|
||||
ai: AIConfig = Field(default_factory=AIConfig)
|
||||
governance: GovernanceConfig = Field(default_factory=GovernanceConfig)
|
||||
|
||||
debug: bool = Field(False, description="Enable debug mode")
|
||||
log_level: str = Field("INFO", description="Logging level")
|
||||
|
||||
class Config:
|
||||
env_prefix = "GOVBOT_"
|
||||
env_nested_delimiter = "__"
|
||||
|
||||
|
||||
def load_config(config_path: str = "config/config.yaml") -> BotConfig:
|
||||
"""
|
||||
Load configuration from YAML file.
|
||||
|
||||
Args:
|
||||
config_path: Path to YAML config file
|
||||
|
||||
Returns:
|
||||
BotConfig instance
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If config file doesn't exist
|
||||
ValidationError: If config is invalid
|
||||
"""
|
||||
config_file = Path(config_path)
|
||||
|
||||
if not config_file.exists():
|
||||
raise FileNotFoundError(
|
||||
f"Config file not found: {config_path}\n"
|
||||
f"Please create it based on config/config.example.yaml"
|
||||
)
|
||||
|
||||
with open(config_file) as f:
|
||||
config_data = yaml.safe_load(f)
|
||||
|
||||
return BotConfig(**config_data)
|
||||
|
||||
|
||||
def create_example_config(output_path: str = "config/config.example.yaml"):
|
||||
"""
|
||||
Create an example configuration file.
|
||||
|
||||
Args:
|
||||
output_path: Where to write the example config
|
||||
"""
|
||||
example_config = {
|
||||
"platform": {
|
||||
"type": "mastodon", # or "discord", "telegram", "mock"
|
||||
"mastodon": {
|
||||
"instance_url": "https://your-mastodon-instance.social",
|
||||
"client_id": "your_client_id_here",
|
||||
"client_secret": "your_client_secret_here",
|
||||
"access_token": "your_access_token_here",
|
||||
"bot_username": "govbot",
|
||||
},
|
||||
# Discord example (for future use):
|
||||
# "discord": {
|
||||
# "token": "your_discord_bot_token",
|
||||
# "guild_id": "your_server_id",
|
||||
# },
|
||||
},
|
||||
"ai": {
|
||||
"default_model": "llama3.2", # Or 'gpt-4', 'claude-3', etc.
|
||||
"fallback_model": None,
|
||||
"temperature": 0.7,
|
||||
"max_tokens": None,
|
||||
},
|
||||
"governance": {
|
||||
"constitution_path": "constitution.md",
|
||||
"db_path": "govbot.db",
|
||||
"default_veto_threshold": 0.67,
|
||||
"enable_auto_execution": True,
|
||||
"require_confirmation_for": ["admin_action", "moderation"],
|
||||
},
|
||||
"debug": False,
|
||||
"log_level": "INFO",
|
||||
}
|
||||
|
||||
output_file = Path(output_path)
|
||||
output_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
with open(output_file, "w") as f:
|
||||
yaml.dump(example_config, f, default_flow_style=False, sort_keys=False)
|
||||
|
||||
print(f"Example config created at: {output_path}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Create example config when run directly
|
||||
create_example_config()
|
||||
Reference in New Issue
Block a user