#!/usr/bin/env python3
"""
Convert diagnostic_output.csv to individual JSON files following the
bicorder.json spec. Handles mapping between old CSV column names and
current spec terminology.
"""

import csv
import json
import os
import statistics
from typing import Any, Dict, List, Optional

# Mapping from CSV columns to spec terms.
# Format: (csv_column_suffix, set_name, term_left, term_right)
GRADIENT_MAPPINGS = [
    # Design set
    ("explicit_vs_implicit", "Design", "explicit", "implicit"),
    ("precise_vs_interpretive", "Design", "precise", "interpretive"),
    ("elite_vs_vernacular", "Design", "institutional", "vernacular"),  # Changed: elite → institutional
    ("documenting_vs_enabling", "Design", "documenting", "enabling"),
    ("static_vs_malleable", "Design", "static", "malleable"),
    ("technical_vs_social", "Design", "technical", "social"),
    ("universal_vs_particular", "Design", "universal", "particular"),
    ("durable_vs_ephemeral", "Design", "durable", "ephemeral"),
    # Entanglement set
    ("macro_vs_micro", "Entanglement", "macro", "micro"),
    ("sovereign_vs_subsidiary", "Entanglement", "sovereign", "subsidiary"),
    ("self-enforcing_vs_enforced", "Entanglement", "self-enforcing", "enforced"),
    ("abstract_vs_embodied", "Entanglement", "abstract", "embodied"),
    ("obligatory_vs_voluntary", "Entanglement", "obligatory", "voluntary"),
    ("flocking_vs_swarming", "Entanglement", "flocking", "swarming"),
    ("defensible_vs_exposed", "Entanglement", "defensible", "exposed"),
    ("exclusive_vs_non-exclusive", "Entanglement", "monopolistic", "pluralistic"),  # Changed: exclusive → monopolistic
    # Experience set
    ("sufficient_vs_insufficient", "Experience", "sufficient", "limited"),  # Changed: insufficient → limited
    ("crystallized_vs_contested", "Experience", "crystallized", "contested"),
    ("trust-evading_vs_trust-inducing", "Experience", "trust-evading", "trust-inducing"),
    ("predictable_vs_emergent", "Experience", "predictable", "emergent"),
    ("exclusion_vs_inclusion", "Experience", "exclusion", "inclusion"),
    ("Kafka_vs_Whitehead", "Experience", "restraining", "liberating"),  # Changed: Kafka_vs_Whitehead → restraining_vs_liberating
    ("dead_vs_alive", "Experience", "dead", "alive"),
]


def load_spec_template(spec_path: str) -> Dict[str, Any]:
    """Load the bicorder.json spec as a template."""
    with open(spec_path, 'r') as f:
        return json.load(f)


def calculate_hardness(gradient_values: List[Optional[int]]) -> Optional[int]:
    """
    Calculate hardness: the mean of all gradient values, rounded to the
    nearest integer. Returns None if there are no valid values.
    """
    valid_values = [v for v in gradient_values if v is not None]
    if not valid_values:
        return None
    return round(statistics.mean(valid_values))


def calculate_polarization(gradient_values: List[Optional[int]]) -> Optional[int]:
    """
    Calculate polarization: the degree to which values are extreme rather
    than centered. If all values are 1 or 9 (max polarization), returns 1;
    if all values are 5 (centered), returns 9. Returns None if there are
    no valid values.
    """
    valid_values = [v for v in gradient_values if v is not None]
    if not valid_values:
        return None

    # Calculate average distance from the center of the 1-9 scale (5)
    distances = [abs(v - 5) for v in valid_values]
    avg_distance = statistics.mean(distances)

    # Max distance is 4 (from 1 or 9 to 5).
    # Convert to a scale where 4 → 1 (polarized) and 0 → 9 (centrist).
    # Linear mapping: polarization = 9 - (avg_distance / 4) * 8
    polarization = 9 - (avg_distance / 4) * 8
    return round(polarization)
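
# A quick worked example of the two metrics above, using hypothetical
# gradient values (not drawn from any real CSV row): a reading of
# [1, 9, 1, 9] has a mean of 5, so hardness rounds to 5, while every value
# sits at the maximum distance of 4 from the center, so polarization is
# 9 - (4 / 4) * 8 = 1 (fully polarized). A reading of [5, 5, 5] gives
# hardness 5 and polarization 9 (fully centrist).
#
#   >>> calculate_hardness([1, 9, 1, 9])
#   5
#   >>> calculate_polarization([1, 9, 1, 9])
#   1
#   >>> calculate_polarization([5, 5, 5])
#   9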
""" valid_values = [v for v in gradient_values if v is not None] if not valid_values: return None # Calculate average distance from center (5) distances = [abs(v - 5) for v in valid_values] avg_distance = statistics.mean(distances) # Max distance is 4 (from 1 or 9 to 5) # Convert to scale where 4 → 1 (polarized) and 0 → 9 (centrist) # Linear mapping: polarization = 9 - (avg_distance / 4) * 8 polarization = 9 - (avg_distance / 4) * 8 return round(polarization) def create_json_from_row(row: Dict[str, str], template: Dict[str, Any]) -> Dict[str, Any]: """Convert a CSV row to a JSON object following the spec.""" result = json.loads(json.dumps(template)) # Deep copy # Update metadata result["metadata"]["protocol"] = row["Descriptor"] result["metadata"]["description"] = row["Description"] result["metadata"]["analyst"] = row["analyst"] result["metadata"]["standpoint"] = row["standpoint"] result["metadata"]["timestamp"] = None # Not in CSV # Collect gradient values for analysis calculations gradient_values = [] # Map CSV values to gradient objects for csv_suffix, set_name, term_left, term_right in GRADIENT_MAPPINGS: csv_column = f"{set_name}_{csv_suffix}" # Get the value from CSV (may be empty string) csv_value = row.get(csv_column, "").strip() value = int(csv_value) if csv_value else None if value is not None: gradient_values.append(value) # Find the corresponding gradient in the template for diagnostic_set in result["diagnostic"]: if diagnostic_set["set_name"] == set_name: for gradient in diagnostic_set["gradients"]: if gradient["term_left"] == term_left and gradient["term_right"] == term_right: gradient["value"] = value break # Calculate automated analysis fields result["analysis"][0]["value"] = calculate_hardness(gradient_values) # hardness result["analysis"][1]["value"] = calculate_polarization(gradient_values) # polarized # analysis[2] is bureaucratic (LDA-based) - leave as null # analysis[3] is usefulness - leave as null (not automated) return result def main(): """Main conversion process.""" # Paths csv_path = "diagnostic_output.csv" spec_path = "../bicorder.json" output_dir = "synthetic_readings" # Create output directory os.makedirs(output_dir, exist_ok=True) # Load template template = load_spec_template(spec_path) # Process CSV with open(csv_path, 'r', encoding='utf-8') as csvfile: reader = csv.DictReader(csvfile) count = 0 for i, row in enumerate(reader, start=1): # Create JSON object json_obj = create_json_from_row(row, template) # Generate filename from protocol name protocol_name = row["Descriptor"] # Sanitize filename filename = protocol_name.replace("/", "_").replace("\\", "_") filename = f"{i:03d}_{filename}.json" # Write to file output_path = os.path.join(output_dir, filename) with open(output_path, 'w', encoding='utf-8') as jsonfile: json.dump(json_obj, jsonfile, indent=2) count += 1 if count % 50 == 0: print(f"Processed {count} protocols...") print(f"\nConversion complete! Created {count} JSON files in {output_dir}/") if __name__ == "__main__": main()