- Move all scripts to scripts/, web assets to web/, analysis results into self-contained data/readings/<type>_<YYYYMMDD>/ directories - Add data/readings/manual_20260320/ with 32 JSON readings from git.medlab.host/ntnsndr/protocol-bicorder-data - Add scripts/json_to_csv.py to convert bicorder JSON files to CSV - Add scripts/sync_readings.sh for one-command sync + re-analysis of any dataset backed by a .sync_source config file - Add scripts/classify_readings.py to apply the LDA classifier to all readings and save per-reading cluster assignments - Add --min-coverage flag to multivariate_analysis.py for sparse/shortform datasets; also applies in lda_visualization.py - Fix lda_visualization.py NaN handling and 0-d array annotation bug - Update README.md and WORKFLOW.md to document datasets, sync workflow, shortform handling, and new scripts Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
165 lines
6.2 KiB
Python
165 lines
6.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Convert a directory of individual bicorder JSON reading files into a diagnostic CSV.
|
|
|
|
This is the reverse of convert_csv_to_json.py. Each JSON file becomes one row.
|
|
Handles readings across bicorder versions by matching on term_left/term_right pairs
|
|
rather than column names.
|
|
|
|
Null gradient values (e.g., shortform readings that skip non-key dimensions) are
|
|
written as empty cells so downstream analysis can treat them as NaN.
|
|
|
|
Usage:
|
|
python3 scripts/json_to_csv.py data/readings/manual_20260320/json/
|
|
python3 scripts/json_to_csv.py data/readings/manual_20260320/json/ -o data/readings/manual_20260320/readings.csv
|
|
python3 scripts/json_to_csv.py data/readings/manual_20260320/json/ --shortform-only
|
|
"""
|
|
|
|
import argparse
|
|
import csv
|
|
import json
|
|
from pathlib import Path
|
|
|
|
|
|
# Map old term pairs to current column names (matches COLUMN_RENAMES in other scripts).
# Keys are (term_left, term_right) as found in older JSON files; values are the
# (term_left, term_right) pair used by the current bicorder.json.
TERM_RENAMES = {
    ('elite', 'vernacular'): ('institutional', 'vernacular'),
    ('exclusive', 'non-exclusive'): ('monopolistic', 'pluralistic'),
    ('insufficient', 'sufficient'): ('sufficient', 'limited'),  # note: order swapped in old versions
    ('Kafka', 'Whitehead'): ('restraining', 'liberating'),
}
|
|
|
|
|
|
def load_bicorder_columns(bicorder_path):
    """Read ordered column definitions from bicorder.json.

    Returns a tuple ``(columns, key_columns)``: *columns* is the full list of
    gradient column names in file order, and *key_columns* is the set of those
    flagged as shortform/key dimensions.
    """
    with open(bicorder_path) as fh:
        spec = json.load(fh)

    ordered = []
    shortform_cols = set()
    for section in spec['diagnostic']:
        prefix = section['set_name']
        for grad in section['gradients']:
            name = f"{prefix}_{grad['term_left']}_vs_{grad['term_right']}"
            ordered.append(name)
            # Gradients marked shortform are the "key" dimensions.
            if grad.get('shortform', False):
                shortform_cols.add(name)
    return ordered, shortform_cols
|
|
|
|
|
|
def normalize_terms(term_left, term_right):
    """Apply renames to match current bicorder.json terminology.

    Looks the pair up directly first, then tries the reversed pair (some old
    files had swapped left/right) and un-swaps the result. Unknown pairs pass
    through unchanged.
    """
    direct = TERM_RENAMES.get((term_left, term_right))
    if direct is not None:
        return direct

    flipped = TERM_RENAMES.get((term_right, term_left))
    if flipped is not None:
        # Matched under the swapped key: swap the renamed terms back.
        new_left, new_right = flipped
        return new_right, new_left

    return term_left, term_right
|
|
|
|
|
|
def json_to_row(json_path, all_columns):
    """Convert a single JSON reading file to a CSV row dict.

    Metadata fields come first; each requested column in *all_columns* is then
    filled from the reading's gradients (matched on normalized term pairs),
    with missing or null values written as '' so downstream analysis treats
    them as NaN.
    """
    with open(json_path) as fh:
        reading = json.load(fh)

    meta = reading.get('metadata', {})
    row = {
        'Descriptor': meta.get('protocol', ''),
        'Description': '',  # not stored in individual reading files
        'analyst': meta.get('analyst', ''),
        'standpoint': meta.get('standpoint', ''),
        'timestamp': meta.get('timestamp', ''),
        'shortform': str(meta.get('shortform', '')),
        'version': reading.get('version', ''),
    }

    # Build lookup keyed by normalized column name; null gradient values -> ''.
    values = {}
    for section in reading.get('diagnostic', []):
        prefix = section['set_name']
        for grad in section.get('gradients', []):
            left, right = normalize_terms(grad['term_left'], grad['term_right'])
            raw = grad.get('value')
            values[f"{prefix}_{left}_vs_{right}"] = '' if raw is None else str(raw)

    # Columns absent from this reading (e.g. older bicorder versions) stay empty.
    row.update((col, values.get(col, '')) for col in all_columns)
    return row
|
|
|
|
|
|
def main():
    """CLI entry point: convert a directory of bicorder JSON readings to one CSV.

    Reads column definitions from bicorder.json, converts every ``*.json`` file
    in the given directory to a row (skipping unparseable files with a warning),
    writes the CSV, and prints per-dimension coverage statistics.

    Exits with status 1 if the input directory contains no JSON files.
    """
    parser = argparse.ArgumentParser(
        description='Convert directory of bicorder JSON files to a diagnostic CSV',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Example usage:
  python3 scripts/json_to_csv.py data/readings/manual_20260320/json/
  python3 scripts/json_to_csv.py data/readings/manual_20260320/json/ --shortform-only
  python3 scripts/json_to_csv.py data/readings/manual_20260320/json/ -o data/readings/manual_20260320/readings.csv
"""
    )
    parser.add_argument('json_dir', help='Directory containing bicorder JSON reading files')
    parser.add_argument('-o', '--output', default=None,
                        help='Output CSV path (default: <dataset_dir>/readings.csv)')
    parser.add_argument('-b', '--bicorder', default='../bicorder.json',
                        help='Path to bicorder.json (default: ../bicorder.json)')
    parser.add_argument('--shortform-only', action='store_true',
                        help='Include only the key shortform dimensions (useful when most readings are shortform)')
    args = parser.parse_args()

    json_dir = Path(args.json_dir)
    # Default output goes next to the json/ subdirectory, at the dataset root.
    dataset_dir = json_dir.parent
    output_path = Path(args.output) if args.output else dataset_dir / 'readings.csv'

    all_columns, key_columns = load_bicorder_columns(args.bicorder)

    if args.shortform_only:
        # Preserve bicorder.json ordering while restricting to key dimensions.
        columns = [c for c in all_columns if c in key_columns]
        print(f"Shortform mode: using {len(columns)} key dimensions")
    else:
        columns = all_columns

    json_files = sorted(json_dir.glob('*.json'))
    if not json_files:
        print(f"Error: no JSON files found in {json_dir}")
        # Fix: exit non-zero so shell automation (e.g. sync_readings.sh) can
        # detect the failure; previously this returned and exited with status 0.
        raise SystemExit(1)

    print(f"Converting {len(json_files)} JSON files → {output_path}")

    fieldnames = ['Descriptor', 'Description', 'analyst', 'standpoint',
                  'timestamp', 'shortform', 'version'] + columns

    rows = []
    for json_path in json_files:
        try:
            rows.append(json_to_row(json_path, columns))
        except Exception as e:
            # Best-effort: one malformed reading shouldn't abort the whole run.
            print(f"  Warning: skipping {json_path.name}: {e}")

    with open(output_path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)

    # Summary stats: count readings that carry a (non-empty) value per dimension.
    filled = {col: sum(1 for r in rows if r.get(col)) for col in columns}
    print(f"Done. {len(rows)} rows written.")
    print(f"\nDimension coverage (readings with a value):")
    for col, count in filled.items():
        pct = count / len(rows) * 100 if rows else 0
        marker = '* ' if col in key_columns else ' '
        print(f"  {marker}{col}: {count}/{len(rows)} ({pct:.0f}%)")
    print(f"\n(* = shortform/key dimension)")
|
# Script entry point: only run the converter when executed directly,
# not when imported by another script.
if __name__ == '__main__':
    main()