Implement working Mastodon bot with proposal system

Major Features:
- Mastodon integration with polling-based listener (streaming unreliable)
- Claude AI integration via llm CLI with API key support
- Public proposal announcements with voting
- Markdown stripping for Mastodon plain text
- Thread-aware voting system

Configuration:
- Added requirements.txt with all dependencies
- API key configuration in config.yaml (not streamed keys)
- Support for multiple Claude models via llm-anthropic

Platform Adapter (Mastodon):
- Polling notifications every 5 seconds (more reliable than streaming)
- Notification ID tracking to prevent re-processing on restart
- Markdown stripping for clean plain text output
- Vote thread matching via announcement IDs

Agent & Governance:
- Conversational tone (direct, concise, not legalistic)
- Proposal creation with AI-generated titles and descriptions
- Public announcements for proposals with all details
- Vote casting with automatic proposal detection from threads
- Constitutional reasoning for governance decisions

Bot Features:
- Long message splitting into threaded posts
- Public proposal announcements separate from user replies
- Announcement includes: title, proposer, description, deadline, voting instructions
- Vote tracking linked to proposal announcement threads

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
Nathan Schneider
2026-02-06 22:26:42 -07:00
parent b636a805f9
commit 5fe22060e1
6 changed files with 351 additions and 39 deletions

View File

@@ -30,6 +30,7 @@ class GovernanceAgent:
db_session: Session,
constitution_path: str,
model: Optional[str] = None,
api_keys: Optional[Dict[str, str]] = None,
):
"""
Initialize the governance agent.
@@ -38,11 +39,13 @@ class GovernanceAgent:
db_session: Database session
constitution_path: Path to constitution file
model: LLM model to use (None for default)
api_keys: Dict with 'openai' and/or 'anthropic' API keys
"""
self.db = db_session
self.constitution = ConstitutionalReasoner(constitution_path, model)
self.constitution = ConstitutionalReasoner(constitution_path, model, api_keys)
self.primitives = GovernancePrimitives(db_session)
self.model = model
self.api_keys = api_keys or {}
def process_request(
self, request: str, actor: str, context: Optional[Dict[str, Any]] = None
@@ -203,6 +206,8 @@ Respond with JSON:
"constitutional_basis": str(constitutional_guidance.get("citations")),
"initial_state": {
"proposal_text": proposal_text,
"title": proposal_info.get("title", proposal_text[:100]),
"description": proposal_info.get("description", proposal_text),
"proposal_type": proposal_info["proposal_type"],
"voting_threshold": proposal_info.get("voting_threshold"),
"votes": {},
@@ -247,9 +252,27 @@ Process ID: {{process_id}}
vote_type = params.get("vote_type", "agree").lower()
process_id = params.get("process_id")
# If no process_id in params, try to find it from thread context
if not process_id and context:
# Get the status ID being replied to
reply_to_id = context.get("reply_to_id")
if reply_to_id:
# Query for active processes and check if any match this thread
active_processes = queries.get_active_processes(self.db)
for proc in active_processes:
if proc.state_data:
announcement_id = proc.state_data.get("announcement_thread_id")
if announcement_id and str(announcement_id) == str(reply_to_id):
process_id = proc.id
break
# If still not found, try the most recent active proposal
if not process_id and active_processes:
process_id = active_processes[0].id
if not process_id:
return {
"error": "Could not identify which proposal to vote on. Please reply to a proposal thread."
"error": "Could not identify which proposal to vote on. Please reply to a proposal announcement or specify the process ID."
}
plan = {
@@ -365,8 +388,12 @@ Plan the actions as JSON:
"actions": [
{{"primitive": "name", "args": {{...}}}}
],
"response_template": "Message to send user"
"response_template": "Message to send user (can use Markdown formatting)"
}}
TONE: Be direct, concise, and clear. Use short paragraphs with line breaks.
Avoid formal/legalistic language AND casual interjections (no "Hey!").
Professional but approachable. Get to the point quickly.
"""
try:
@@ -477,11 +504,11 @@ Plan the actions as JSON:
question=f"Ambiguity in request '{request}': {ambiguity}",
)
response = f"""I encountered constitutional ambiguity in processing your request.
response = f"""I found something unclear in the constitution regarding your request.
Question: {ambiguity}
Issue: {ambiguity}
This requires community clarification. Members can discuss and provide guidance.
This needs community clarification. Discussion welcome.
Clarification ID: {clarification.id}
"""
@@ -537,12 +564,21 @@ Clarification ID: {clarification.id}
def _call_llm(self, prompt: str) -> str:
"""Call the LLM via llm CLI"""
cmd = ["llm", "prompt"]
import os
cmd = ["llm"]
if self.model:
cmd.extend(["-m", self.model])
cmd.append(prompt)
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
# Set up environment with API keys
env = os.environ.copy()
if self.api_keys.get('openai'):
env['OPENAI_API_KEY'] = self.api_keys['openai']
if self.api_keys.get('anthropic'):
env['ANTHROPIC_API_KEY'] = self.api_keys['anthropic']
result = subprocess.run(cmd, capture_output=True, text=True, check=True, env=env)
return result.stdout.strip()
def _extract_json(self, text: str) -> Dict[str, Any]:

View File

@@ -82,10 +82,19 @@ class Govbot:
# Initialize AI agent
logger.info("Initializing AI agent...")
# Prepare API keys for agent
api_keys = {}
if self.config.ai.openai_api_key:
api_keys['openai'] = self.config.ai.openai_api_key
if self.config.ai.anthropic_api_key:
api_keys['anthropic'] = self.config.ai.anthropic_api_key
self.agent = GovernanceAgent(
db_session=self.db_session,
constitution_path=self.config.governance.constitution_path,
model=self.config.ai.default_model,
api_keys=api_keys if api_keys else None,
)
# Initialize scheduler for background tasks
@@ -197,17 +206,140 @@ class Govbot:
# Post response
response = result.get("response", "Sorry, I couldn't process that request.")
# Handle long responses by splitting into thread
try:
self.platform.post(
message=response,
self._post_response(
response=response,
reply_to_id=message.id,
thread_id=message.thread_id,
reply_to_id=message.id, # Reply to the message that mentioned us
visibility=message.visibility,
)
logger.info("Posted response")
except Exception as e:
logger.error(f"Failed to post response: {e}")
# If a proposal was created, post a public announcement
if result.get("success") and result.get("process_id"):
try:
self._announce_proposal(result.get("process_id"), message.visibility)
except Exception as e:
logger.error(f"Failed to post proposal announcement: {e}")
def _announce_proposal(self, process_id: int, visibility=None):
"""
Create a public announcement for a new proposal.
Args:
process_id: ID of the proposal process
visibility: Message visibility (defaults to PUBLIC for announcements)
"""
from .db import queries
from .platforms.base import MessageVisibility
# Get proposal details
process = queries.get_process(self.db_session, process_id)
if not process:
logger.warning(f"Could not find process {process_id} for announcement")
return
state = process.state_data
# Format announcement
announcement = f"""🗳️ NEW PROPOSAL
{state.get('title', 'Untitled Proposal')}
Proposed by: {process.creator}
{state.get('description', 'No description provided.')}
Voting deadline: {process.deadline.strftime('%Y-%m-%d %H:%M UTC') if process.deadline else 'TBD'}
To vote, reply with: agree, disagree, abstain, or block
Process ID: {process_id}
"""
# Post as public announcement (not a reply)
announcement_id = self.platform.post(
message=announcement,
visibility=visibility or MessageVisibility.PUBLIC,
)
logger.info(f"Posted public announcement for process {process_id}")
# Store the announcement thread ID in the process state for vote matching
from .db import queries
queries.update_process_state(
self.db_session,
process_id,
{"announcement_thread_id": announcement_id},
"system"
)
def _post_response(
self,
response: str,
reply_to_id: str,
thread_id: Optional[str] = None,
visibility=None,
max_length: int = 450,
):
"""
Post a response, splitting into thread if necessary.
Args:
response: Response text to post (plain text only)
reply_to_id: ID to reply to
thread_id: Thread context
visibility: Message visibility
max_length: Maximum characters per post (leaving room for indicators)
"""
# No conversion - Mastodon is plain text only
# Let the AI format with line breaks and simple text
# If response fits in one post, just send it
if len(response) <= max_length:
self.platform.post(
message=response,
thread_id=thread_id,
reply_to_id=reply_to_id,
visibility=visibility,
)
return
# Split into chunks
words = response.split()
chunks = []
current_chunk = []
current_length = 0
for word in words:
word_len = len(word) + 1 # +1 for space
if current_length + word_len > max_length and current_chunk:
chunks.append(" ".join(current_chunk))
current_chunk = [word]
current_length = word_len
else:
current_chunk.append(word)
current_length += word_len
if current_chunk:
chunks.append(" ".join(current_chunk))
# Post chunks as a thread
last_id = reply_to_id
for i, chunk in enumerate(chunks, 1):
# Add thread indicator
if len(chunks) > 1:
chunk = f"{chunk}\n\n[{i}/{len(chunks)}]"
last_id = self.platform.post(
message=chunk,
thread_id=thread_id,
reply_to_id=last_id,
visibility=visibility,
)
def process_mention(self, mention_text: str, author: str, thread_id: Optional[str] = None):
"""
Process a mention of the bot.

View File

@@ -22,16 +22,18 @@ class ConstitutionalReasoner:
Uses the 'llm' CLI tool for embeddings and queries.
"""
def __init__(self, constitution_path: str, model: Optional[str] = None):
def __init__(self, constitution_path: str, model: Optional[str] = None, api_keys: Optional[Dict[str, str]] = None):
"""
Initialize the constitutional reasoner.
Args:
constitution_path: Path to the constitution markdown file
model: LLM model to use (e.g., 'llama3.2', 'gpt-4'). If None, uses llm default.
api_keys: Dict with 'openai' and/or 'anthropic' API keys
"""
self.constitution_path = Path(constitution_path)
self.model = model
self.api_keys = api_keys or {}
if not self.constitution_path.exists():
raise FileNotFoundError(f"Constitution not found at {constitution_path}")
@@ -142,7 +144,7 @@ class ConstitutionalReasoner:
[f"**{s['title']}**\n{s['content']}" for s in sections]
)
prompt = f"""You are a constitutional reasoner for a democratic community governance system.
prompt = f"""You're helping with governance questions based on our community's constitution.
RELEVANT CONSTITUTIONAL SECTIONS:
{sections_text}
@@ -155,15 +157,19 @@ QUESTION: {question}
prompt += """
Please provide:
1. A clear answer based on the constitutional provisions
2. Specific citations (article/section numbers)
3. Your confidence level (high/medium/low)
4. Any ambiguities that might require clarification (if any)
Provide:
1. A clear answer based on the constitution
2. Which specific sections support this
3. Confidence level (high/medium/low)
4. Any unclear areas needing clarification (if any)
TONE: Direct, concise, and clear. Use short paragraphs with line breaks.
Avoid both formal/legalistic language AND casual interjections.
Professional but approachable.
Format your response as JSON:
{
"answer": "your answer here",
"answer": "clear, concise answer with short paragraphs",
"citations": ["Article X, Section Y", ...],
"confidence": "high|medium|low",
"ambiguity": "description of any ambiguity, or null if clear"
@@ -173,14 +179,23 @@ Format your response as JSON:
def _call_llm(self, prompt: str) -> str:
"""Call the llm CLI tool"""
cmd = ["llm", "prompt"]
import os
cmd = ["llm"]
if self.model:
cmd.extend(["-m", self.model])
cmd.append(prompt)
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
# Set up environment with API keys
env = os.environ.copy()
if self.api_keys.get('openai'):
env['OPENAI_API_KEY'] = self.api_keys['openai']
if self.api_keys.get('anthropic'):
env['ANTHROPIC_API_KEY'] = self.api_keys['anthropic']
result = subprocess.run(cmd, capture_output=True, text=True, check=True, env=env)
return result.stdout.strip()
def _parse_query_result(
@@ -254,6 +269,8 @@ What are the requirements (timeline, voting threshold, etc.)?
Extract structured information as JSON:
{{
"title": "concise title for the proposal (max 80 chars)",
"description": "clear 1-2 sentence description of what the proposal does",
"proposal_type": "standard|urgent|constitutional_amendment",
"discussion_period_days": number,
"voting_threshold": "description of threshold",

View File

@@ -135,6 +135,9 @@ class MastodonAdapter(PlatformAdapter):
"""Disconnect from Mastodon and cleanup."""
logger.info("Disconnecting from Mastodon")
# Stop polling
self.polling = False
# Stop stream listener if running
if self.stream_listener:
self.stream_listener.stop()
@@ -149,7 +152,7 @@ class MastodonAdapter(PlatformAdapter):
def start_listening(self, callback: Callable[[PlatformMessage], None]):
"""
Start listening for mentions via Mastodon streaming API.
Start listening for mentions via polling (streaming API can be unreliable).
Args:
callback: Function to call with each received message
@@ -157,27 +160,71 @@ class MastodonAdapter(PlatformAdapter):
if not self.connected or not self.client:
raise RuntimeError("Must call connect() before start_listening()")
logger.info("Starting Mastodon stream listener for mentions")
logger.info("Starting Mastodon notification poller")
# Create stream listener
self.stream_listener = GovbotStreamListener(
bot_id=self.bot_user_id,
callback=callback,
adapter=self,
)
# Track last seen notification ID to avoid duplicates
self.last_notification_id = None
self.callback = callback
self.polling = True
# Start streaming in a separate thread
def stream_thread():
# Get the current latest notification to set baseline
# This prevents re-processing old notifications on restart
try:
latest = self.client.notifications(limit=1)
if latest:
self.last_notification_id = latest[0]['id']
logger.info(f"Starting from notification ID: {self.last_notification_id}")
except Exception as e:
logger.warning(f"Could not get latest notification: {e}")
# Start polling in a separate thread
def poll_thread():
try:
# Stream user timeline (includes mentions)
self.client.stream_user(self.stream_listener, run_async=False, reconnect_async=True)
except Exception as e:
logger.error(f"Stream listener error: {e}", exc_info=True)
while self.polling:
try:
# Get recent notifications
notifications = self.client.notifications(
limit=20,
since_id=self.last_notification_id
)
self.listener_thread = threading.Thread(target=stream_thread, daemon=True)
# Process new notifications in reverse order (oldest first)
for notif in reversed(notifications):
# Only process mentions
if notif['type'] in ['mention']:
status = notif.get('status')
if not status:
continue
# Don't respond to ourselves
if str(status['account']['id']) == self.bot_user_id:
continue
# Update last seen ID
self.last_notification_id = notif['id']
# Convert to PlatformMessage
message = self._status_to_message(status)
message.mentions_bot = True
# Call the callback
logger.info(f"Processing mention from @{message.author_handle}")
self.callback(message)
# Sleep before next poll
time.sleep(5) # Poll every 5 seconds
except Exception as e:
logger.error(f"Error polling notifications: {e}", exc_info=True)
time.sleep(10) # Wait longer on error
except Exception as e:
logger.error(f"Poll thread error: {e}", exc_info=True)
self.listener_thread = threading.Thread(target=poll_thread, daemon=True)
self.listener_thread.start()
logger.info("Stream listener started")
logger.info("Notification poller started")
def post(
self,
@@ -190,7 +237,7 @@ class MastodonAdapter(PlatformAdapter):
Post a message to Mastodon.
Args:
message: Text content to post (max 500 characters for most instances)
message: Text content to post (Markdown will be stripped for plain text)
thread_id: Not used in Mastodon (use reply_to_id for threading)
reply_to_id: Status ID to reply to
visibility: Message visibility level
@@ -204,6 +251,9 @@ class MastodonAdapter(PlatformAdapter):
if not self.connected or not self.client:
raise RuntimeError("Must call connect() before posting")
# Strip Markdown formatting for Mastodon (plain text only)
message = self._strip_markdown(message)
# Map visibility to Mastodon format
mastodon_visibility = self._map_visibility(visibility)
@@ -446,6 +496,42 @@ class MastodonAdapter(PlatformAdapter):
return f"{self.instance_url}/web/statuses/{thread_id}"
# Private helper methods
def _status_to_message(self, status: Dict[str, Any]) -> PlatformMessage:
"""Convert Mastodon status to PlatformMessage"""
from html import unescape
import re
# Extract text content (strip HTML)
content = status.get("content", "")
# Simple HTML stripping
content = re.sub(r"<[^>]+>", "", content)
content = unescape(content)
# Map visibility
visibility_map = {
"public": MessageVisibility.PUBLIC,
"unlisted": MessageVisibility.UNLISTED,
"private": MessageVisibility.FOLLOWERS,
"direct": MessageVisibility.DIRECT,
}
visibility = visibility_map.get(
status.get("visibility", "public"), MessageVisibility.PUBLIC
)
return PlatformMessage(
id=str(status["id"]),
text=content,
author_id=str(status["account"]["id"]),
author_handle=status["account"]["username"],
timestamp=status["created_at"],
thread_id=str(status.get("in_reply_to_id", status["id"])),
reply_to_id=str(status["in_reply_to_id"]) if status.get("in_reply_to_id") else None,
visibility=visibility,
raw_data=status,
)
# Private helper methods for skill execution
def _suspend_account(self, params: Dict[str, Any]) -> Dict[str, Any]:
@@ -582,6 +668,28 @@ class MastodonAdapter(PlatformAdapter):
"reverse_params": {"announcement_id": announcement["id"]},
}
def _strip_markdown(self, text: str) -> str:
"""Strip Markdown formatting for plain text display on Mastodon"""
import re
# Remove bold/italic markers
text = re.sub(r'\*\*([^\*]+)\*\*', r'\1', text) # **bold** -> bold
text = re.sub(r'\*([^\*]+)\*', r'\1', text) # *italic* -> italic
text = re.sub(r'__([^_]+)__', r'\1', text) # __bold__ -> bold
text = re.sub(r'_([^_]+)_', r'\1', text) # _italic_ -> italic
text = re.sub(r'`([^`]+)`', r'\1', text) # `code` -> code
# Remove headers but keep the text
text = re.sub(r'^#{1,6}\s+', '', text, flags=re.MULTILINE)
# Convert Markdown lists to simple text with bullets
text = re.sub(r'^\s*[-*+]\s+', '', text, flags=re.MULTILINE)
# Remove link formatting but keep URLs: [text](url) -> text (url)
text = re.sub(r'\[([^\]]+)\]\(([^\)]+)\)', r'\1 (\2)', text)
return text
def _map_visibility(self, visibility: MessageVisibility) -> str:
"""Map abstract visibility to Mastodon visibility"""
mapping = {

View File

@@ -27,11 +27,13 @@ class AIConfig(BaseModel):
default_model: Optional[str] = Field(
None,
description="Default LLM model to use (e.g., 'llama3.2' for Ollama, 'gpt-4' for OpenAI)",
description="Default LLM model to use (e.g., 'gpt-4o-mini', 'claude-opus-4-6')",
)
fallback_model: Optional[str] = Field(None, description="Fallback model if default fails")
temperature: float = Field(0.7, description="LLM temperature for responses")
max_tokens: Optional[int] = Field(None, description="Maximum tokens for LLM responses")
openai_api_key: Optional[str] = Field(None, description="OpenAI API key (for GPT models)")
anthropic_api_key: Optional[str] = Field(None, description="Anthropic API key (for Claude models)")
class GovernanceConfig(BaseModel):