#!/usr/bin/env python3
"""
Convert diagnostic_output.csv to individual JSON files following the bicorder.json spec.

Handles mapping between old CSV column names and current spec terminology.
"""

import csv
import json
import os
import statistics
from typing import Any, Dict, List, Optional


# Mapping from CSV columns to spec terms
# Format: (csv_column_suffix, set_name, term_left, term_right)
GRADIENT_MAPPINGS = [
    # Design set
    ("explicit_vs_implicit", "Design", "explicit", "implicit"),
    ("precise_vs_interpretive", "Design", "precise", "interpretive"),
    ("elite_vs_vernacular", "Design", "institutional", "vernacular"),  # Changed: elite → institutional
    ("documenting_vs_enabling", "Design", "documenting", "enabling"),
    ("static_vs_malleable", "Design", "static", "malleable"),
    ("technical_vs_social", "Design", "technical", "social"),
    ("universal_vs_particular", "Design", "universal", "particular"),
    ("durable_vs_ephemeral", "Design", "durable", "ephemeral"),

    # Entanglement set
    ("macro_vs_micro", "Entanglement", "macro", "micro"),
    ("sovereign_vs_subsidiary", "Entanglement", "sovereign", "subsidiary"),
    ("self-enforcing_vs_enforced", "Entanglement", "self-enforcing", "enforced"),
    ("abstract_vs_embodied", "Entanglement", "abstract", "embodied"),
    ("obligatory_vs_voluntary", "Entanglement", "obligatory", "voluntary"),
    ("flocking_vs_swarming", "Entanglement", "flocking", "swarming"),
    ("defensible_vs_exposed", "Entanglement", "defensible", "exposed"),
    ("exclusive_vs_non-exclusive", "Entanglement", "monopolistic", "pluralistic"),  # Changed: exclusive → monopolistic

    # Experience set
    ("sufficient_vs_insufficient", "Experience", "sufficient", "limited"),  # Changed: insufficient → limited
    ("crystallized_vs_contested", "Experience", "crystallized", "contested"),
    ("trust-evading_vs_trust-inducing", "Experience", "trust-evading", "trust-inducing"),
    ("predictable_vs_emergent", "Experience", "predictable", "emergent"),
    ("exclusion_vs_inclusion", "Experience", "exclusion", "inclusion"),
    ("Kafka_vs_Whitehead", "Experience", "restraining", "liberating"),  # Changed: Kafka_vs_Whitehead → restraining_vs_liberating
    ("dead_vs_alive", "Experience", "dead", "alive"),
]
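
# For example, create_json_from_row() below joins set name and suffix to form
# the CSV column "Design_explicit_vs_implicit" for the first Design tuple; a
# cell of "3" in that column becomes the integer 3 on the template gradient
# whose term_left/term_right are "explicit"/"implicit", and an empty cell
# becomes None.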


def load_spec_template(spec_path: str) -> Dict[str, Any]:
    """Load the bicorder.json spec as a template."""
    with open(spec_path, 'r') as f:
        return json.load(f)


def calculate_hardness(gradient_values: List[Optional[int]]) -> Optional[int]:
    """
    Calculate hardness: mean of all gradient values, rounded to nearest integer.

    Returns None if there are no valid values.
    """
    valid_values = [v for v in gradient_values if v is not None]
    if not valid_values:
        return None
    return round(statistics.mean(valid_values))
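
# Example: calculate_hardness([2, 5, None, 8]) ignores the None and returns
# round(mean([2, 5, 8])) = round(5.0) = 5; calculate_hardness([None]) returns None.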


def calculate_polarization(gradient_values: List[Optional[int]]) -> Optional[int]:
    """
    Calculate polarization: degree to which values are extreme vs centered.

    If all values are 1 or 9 (max polarization), return 1.
    If all values are 5 (centered), return 9.
    Returns None if there are no valid values.
    """
    valid_values = [v for v in gradient_values if v is not None]
    if not valid_values:
        return None

    # Calculate average distance from center (5)
    distances = [abs(v - 5) for v in valid_values]
    avg_distance = statistics.mean(distances)

    # Max distance is 4 (from 1 or 9 to 5).
    # Convert to a scale where 4 → 1 (polarized) and 0 → 9 (centrist).
    # Linear mapping: polarization = 9 - (avg_distance / 4) * 8
    polarization = 9 - (avg_distance / 4) * 8

    return round(polarization)
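
# Example: calculate_polarization([1, 9, 5]) has distances [4, 4, 0] from the
# center, so avg_distance = 8/3 ≈ 2.67 and 9 - (2.67 / 4) * 8 ≈ 3.67, which
# rounds to 4. All-extreme input [1, 9] yields 1; all-centered [5, 5] yields 9.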


def create_json_from_row(row: Dict[str, str], template: Dict[str, Any]) -> Dict[str, Any]:
    """Convert a CSV row to a JSON object following the spec."""
    result = json.loads(json.dumps(template))  # Deep copy

    # Update metadata
    result["metadata"]["protocol"] = row["Descriptor"]
    result["metadata"]["description"] = row["Description"]
    result["metadata"]["analyst"] = row["analyst"]
    result["metadata"]["standpoint"] = row["standpoint"]
    result["metadata"]["timestamp"] = None  # Not in CSV

    # Collect gradient values for analysis calculations
    gradient_values = []

    # Map CSV values to gradient objects
    for csv_suffix, set_name, term_left, term_right in GRADIENT_MAPPINGS:
        csv_column = f"{set_name}_{csv_suffix}"

        # Get the value from CSV (may be an empty string)
        csv_value = row.get(csv_column, "").strip()
        value = int(csv_value) if csv_value else None

        if value is not None:
            gradient_values.append(value)

        # Find the corresponding gradient in the template
        for diagnostic_set in result["diagnostic"]:
            if diagnostic_set["set_name"] == set_name:
                for gradient in diagnostic_set["gradients"]:
                    if gradient["term_left"] == term_left and gradient["term_right"] == term_right:
                        gradient["value"] = value
                        break

    # Calculate automated analysis fields
    result["analysis"][0]["value"] = calculate_hardness(gradient_values)      # hardness
    result["analysis"][1]["value"] = calculate_polarization(gradient_values)  # polarized
    # analysis[2] is bureaucratic (LDA-based) - leave as null
    # analysis[3] is usefulness - leave as null (not automated)

    return result
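
# Shape of the result (abridged; the values shown are hypothetical):
# {
#   "metadata": {"protocol": "...", "description": "...", "analyst": "...",
#                "standpoint": "...", "timestamp": null},
#   "diagnostic": [{"set_name": "Design",
#                   "gradients": [{"term_left": "explicit",
#                                  "term_right": "implicit",
#                                  "value": 3}, ...]}, ...],
#   "analysis": [{"value": 5}, {"value": 4}, {"value": null}, {"value": null}]
# }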


def main():
    """Main conversion process."""
    # Paths
    csv_path = "diagnostic_output.csv"
    spec_path = "../bicorder.json"
    output_dir = "synthetic_readings"

    # Create output directory
    os.makedirs(output_dir, exist_ok=True)

    # Load template
    template = load_spec_template(spec_path)

    # Process CSV
    with open(csv_path, 'r', encoding='utf-8') as csvfile:
        reader = csv.DictReader(csvfile)

        count = 0
        for i, row in enumerate(reader, start=1):
            # Create JSON object
            json_obj = create_json_from_row(row, template)

            # Generate filename from protocol name
            protocol_name = row["Descriptor"]
            # Sanitize filename
            filename = protocol_name.replace("/", "_").replace("\\", "_")
            filename = f"{i:03d}_{filename}.json"

            # Write to file
            output_path = os.path.join(output_dir, filename)
            with open(output_path, 'w', encoding='utf-8') as jsonfile:
                json.dump(json_obj, jsonfile, indent=2)

            count += 1
            if count % 50 == 0:
                print(f"Processed {count} protocols...")

    print(f"\nConversion complete! Created {count} JSON files in {output_dir}/")


if __name__ == "__main__":
    main()
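
# Usage sketch (the script filename is illustrative; run from a directory that
# contains diagnostic_output.csv, with bicorder.json one level up):
#   python3 convert_csv_to_json.py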