Reorganize directory, add manual dataset and sync tooling

- Move all scripts to scripts/, web assets to web/, analysis results
  into self-contained data/readings/<type>_<YYYYMMDD>/ directories
- Add data/readings/manual_20260320/ with 32 JSON readings from
  git.medlab.host/ntnsndr/protocol-bicorder-data
- Add scripts/json_to_csv.py to convert bicorder JSON files to CSV
- Add scripts/sync_readings.sh for one-command sync + re-analysis of
  any dataset backed by a .sync_source config file
- Add scripts/classify_readings.py to apply the LDA classifier to all
  readings and save per-reading cluster assignments
- Add --min-coverage flag to multivariate_analysis.py for sparse/shortform
  datasets; also applies in lda_visualization.py
- Fix lda_visualization.py NaN handling and 0-d array annotation bug
- Update README.md and WORKFLOW.md to document datasets, sync workflow,
  shortform handling, and new scripts

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Nathan Schneider
2026-03-20 17:35:13 -06:00
parent 0c794dddae
commit 897c30406b
545 changed files with 10715 additions and 718 deletions

View File

@@ -0,0 +1,189 @@
#!/usr/bin/env python3
"""
Convert diagnostic_output.csv to individual JSON files following the bicorder.json spec.
Handles mapping between old CSV column names and current spec terminology.
"""
import csv
import json
import os
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any, Optional
import statistics
# Mapping from CSV columns to spec terms
# Format: (csv_column_suffix, set_name, term_left, term_right)
# The actual CSV header for each gradient is f"{set_name}_{csv_column_suffix}".
# Entries marked "Changed" record terminology renames between the legacy CSV
# headers and the current bicorder.json spec; term_left/term_right must match
# the spec's gradient terms exactly for the template lookup to succeed.
GRADIENT_MAPPINGS = [
# Design set
("explicit_vs_implicit", "Design", "explicit", "implicit"),
("precise_vs_interpretive", "Design", "precise", "interpretive"),
("elite_vs_vernacular", "Design", "institutional", "vernacular"), # Changed: elite → institutional
("documenting_vs_enabling", "Design", "documenting", "enabling"),
("static_vs_malleable", "Design", "static", "malleable"),
("technical_vs_social", "Design", "technical", "social"),
("universal_vs_particular", "Design", "universal", "particular"),
("durable_vs_ephemeral", "Design", "durable", "ephemeral"),
# Entanglement set
("macro_vs_micro", "Entanglement", "macro", "micro"),
("sovereign_vs_subsidiary", "Entanglement", "sovereign", "subsidiary"),
("self-enforcing_vs_enforced", "Entanglement", "self-enforcing", "enforced"),
("abstract_vs_embodied", "Entanglement", "abstract", "embodied"),
("obligatory_vs_voluntary", "Entanglement", "obligatory", "voluntary"),
("flocking_vs_swarming", "Entanglement", "flocking", "swarming"),
("defensible_vs_exposed", "Entanglement", "defensible", "exposed"),
("exclusive_vs_non-exclusive", "Entanglement", "monopolistic", "pluralistic"), # Changed: exclusive → monopolistic
# Experience set
("sufficient_vs_insufficient", "Experience", "sufficient", "limited"), # Changed: insufficient → limited
("crystallized_vs_contested", "Experience", "crystallized", "contested"),
("trust-evading_vs_trust-inducing", "Experience", "trust-evading", "trust-inducing"),
("predictable_vs_emergent", "Experience", "predictable", "emergent"),
("exclusion_vs_inclusion", "Experience", "exclusion", "inclusion"),
("Kafka_vs_Whitehead", "Experience", "restraining", "liberating"), # Changed: Kafka_vs_Whitehead → restraining_vs_liberating
("dead_vs_alive", "Experience", "dead", "alive"),
]
def load_spec_template(spec_path: str) -> Dict[str, Any]:
    """Read and parse the bicorder.json spec, returning it as a dict template."""
    return json.loads(Path(spec_path).read_text())
def calculate_hardness(gradient_values: List[Optional[int]]) -> Optional[int]:
    """
    Calculate hardness: mean of all gradient values, rounded to nearest integer.
    Returns None if there are no valid values.
    """
    total = 0
    count = 0
    for value in gradient_values:
        if value is not None:
            total += value
            count += 1
    # No recorded gradients → hardness is undefined.
    return round(total / count) if count else None
def calculate_polarization(gradient_values: List[Optional[int]]) -> Optional[int]:
    """
    Calculate polarization: degree to which values are extreme vs centered.
    If all values are 1 or 9 (max polarization), return 1.
    If all values are 5 (centered), return 9.
    Returns None if there are no valid values.
    """
    present = [v for v in gradient_values if v is not None]
    if not present:
        return None
    # Average distance from the gradient midpoint (5); max possible is 4.
    mean_distance = sum(abs(v - 5) for v in present) / len(present)
    # Linear map: distance 4 → 1 (fully polarized), distance 0 → 9 (fully centrist).
    return round(9 - (mean_distance / 4) * 8)
def create_json_from_row(row: Dict[str, str], template: Dict[str, Any]) -> Dict[str, Any]:
    """Convert a CSV row to a JSON object following the spec."""
    # JSON round-trip gives a deep copy, so the shared template is never mutated.
    reading = json.loads(json.dumps(template))

    meta = reading["metadata"]
    meta["protocol"] = row["Descriptor"]
    meta["description"] = row["Description"]
    meta["analyst"] = row["analyst"]
    meta["standpoint"] = row["standpoint"]
    meta["timestamp"] = None  # the CSV carries no timestamp column

    # Gradient values recorded for the automated analysis calculations below.
    recorded = []
    for suffix, set_name, left, right in GRADIENT_MAPPINGS:
        raw = row.get(f"{set_name}_{suffix}", "").strip()  # may be empty string
        value = int(raw) if raw else None
        if value is not None:
            recorded.append(value)
        # Locate the matching gradient in the template and store the value
        # (None included, so unanswered gradients are explicit in the output).
        for diagnostic_set in reading["diagnostic"]:
            if diagnostic_set["set_name"] != set_name:
                continue
            for gradient in diagnostic_set["gradients"]:
                if gradient["term_left"] == left and gradient["term_right"] == right:
                    gradient["value"] = value
                    break

    reading["analysis"][0]["value"] = calculate_hardness(recorded)      # hardness
    reading["analysis"][1]["value"] = calculate_polarization(recorded)  # polarized
    # analysis[2] is bureaucratic (LDA-based) - leave as null
    # analysis[3] is usefulness - leave as null (not automated)
    return reading
def main():
    """CLI entry point: convert each row of a readings CSV to a JSON file.

    Reads the bicorder.json spec as a template, then writes one
    NNN_<protocol>.json file per CSV row into the output directory.
    """
    import argparse
    parser = argparse.ArgumentParser(
        description='Convert diagnostic readings CSV to individual JSON files',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Example usage:
  python3 scripts/convert_csv_to_json.py data/readings/synthetic_20251116/readings.csv
  python3 scripts/convert_csv_to_json.py data/readings/manual_20260101/readings.csv --output-dir data/readings/manual_20260101/json
"""
    )
    parser.add_argument('input_csv', help='Diagnostic readings CSV (e.g. data/readings/synthetic_20251116/readings.csv)')
    parser.add_argument('--output-dir', default=None,
                        help='Output directory for JSON files (default: <dataset_dir>/json)')
    parser.add_argument('--bicorder', default='../bicorder.json',
                        help='Path to bicorder.json (default: ../bicorder.json)')
    args = parser.parse_args()

    csv_path = args.input_csv
    spec_path = args.bicorder
    output_dir = args.output_dir if args.output_dir else str(Path(args.input_csv).parent / 'json')

    # Create output directory
    os.makedirs(output_dir, exist_ok=True)

    # Load template
    template = load_spec_template(spec_path)

    # Process CSV
    count = 0
    with open(csv_path, 'r', encoding='utf-8') as csvfile:
        reader = csv.DictReader(csvfile)
        for i, row in enumerate(reader, start=1):
            json_obj = create_json_from_row(row, template)
            # Sanitize path separators so the protocol name is filesystem-safe.
            safe_name = row["Descriptor"].replace("/", "_").replace("\\", "_")
            # Bug fix: the sanitized name was previously discarded and every
            # file was written as NNN_(unknown).json, so files collided on
            # nothing but the row index and lost the protocol identity.
            filename = f"{i:03d}_{safe_name}.json"
            output_path = os.path.join(output_dir, filename)
            with open(output_path, 'w', encoding='utf-8') as jsonfile:
                json.dump(json_obj, jsonfile, indent=2)
            count += 1
            if count % 50 == 0:
                print(f"Processed {count} protocols...")
    print(f"\nConversion complete! Created {count} JSON files in {output_dir}/")
# Script entry point: run the conversion only when executed directly,
# not when imported as a module.
if __name__ == "__main__":
    main()