""" Background scheduler for governance processes. Handles: - Checking proposal deadlines - Sending reminders - Monitoring veto votes - Processing completed proposals """ import logging import threading import time from datetime import datetime from typing import Optional from sqlalchemy.orm import Session from .agent import GovernanceAgent from .db import queries logger = logging.getLogger("govbot.scheduler") class GovernanceScheduler: """ Background scheduler for temporal governance tasks. Runs in a separate thread and periodically: - Checks for processes past deadline - Sends pending reminders - Monitors veto votes """ def __init__( self, agent: GovernanceAgent, db_session: Session, check_interval: int = 60, ): """ Initialize the scheduler. Args: agent: GovernanceAgent instance db_session: Database session check_interval: How often to check (in seconds) """ self.agent = agent self.db = db_session self.check_interval = check_interval self.running = False self.thread: Optional[threading.Thread] = None def start(self): """Start the scheduler in a background thread""" if self.running: logger.warning("Scheduler already running") return self.running = True self.thread = threading.Thread(target=self._run_loop, daemon=True) self.thread.start() logger.info(f"Scheduler started (checking every {self.check_interval}s)") def stop(self): """Stop the scheduler""" self.running = False if self.thread: self.thread.join(timeout=5) logger.info("Scheduler stopped") def _run_loop(self): """Main scheduler loop""" while self.running: try: self._check_tasks() except Exception as e: logger.error(f"Scheduler error: {e}", exc_info=True) # Sleep but check periodically for shutdown for _ in range(self.check_interval): if not self.running: break time.sleep(1) def _check_tasks(self): """Check and process all scheduled tasks""" now = datetime.utcnow() logger.debug(f"Checking scheduled tasks at {now}") # Check for processes past deadline completed = self.agent.check_deadlines() if completed: logger.info(f"Completed {len(completed)} processes past deadline") for process in completed: logger.info( f"Process {process['process_id']}: {process['outcome']} " f"with votes {process['vote_counts']}" ) # TODO: Post result to Mastodon # Check for pending reminders reminders = self.agent.primitives.get_pending_reminders() for reminder in reminders: logger.info(f"Sending reminder: {reminder['data']['message']}") # TODO: Post reminder to Mastodon self.agent.primitives.mark_reminder_sent(reminder["id"]) # Check for veto votes (every cycle) self._check_veto_votes() def _check_veto_votes(self): """Check if any actions have reached veto threshold""" # Get recent actions that might be vetoed recent_actions = queries.get_recent_actions( session=self.db, limit=100 ) for action in recent_actions: if action.status == "executed" and action.reversible: # Check veto votes if queries.check_veto_threshold(self.db, action.id): logger.warning( f"Action {action.id} reached veto threshold! Reversing..." ) # Reverse the action queries.reverse_action( session=self.db, action_id=action.id, reversing_actor="community_veto", reason="Supermajority veto threshold reached", ) logger.info(f"Action {action.id} reversed by community veto") # TODO: Post to Mastodon about veto def run_scheduler_test(): """Test function for the scheduler""" from .db.models import init_db, get_session from .utils.config import load_config # Initialize config = load_config() engine = init_db(config.governance.db_path) db_session = get_session(engine) agent = GovernanceAgent( db_session=db_session, constitution_path=config.governance.constitution_path, model=config.ai.default_model, ) scheduler = GovernanceScheduler(agent, db_session, check_interval=10) logger.info("Starting scheduler test...") scheduler.start() try: # Run for 60 seconds time.sleep(60) except KeyboardInterrupt: logger.info("Test interrupted") finally: scheduler.stop() logger.info("Scheduler test complete") if __name__ == "__main__": run_scheduler_test()