Files
protocol-bicorder/analysis/scripts/json_to_csv.py
Nathan Schneider 60e83783ec Flatten data/readings/ → data/
Remove the intermediate readings/ subdirectory level — dataset naming
(synthetic_YYYYMMDD, manual_YYYYMMDD) already encodes what the data is.
Update all path references across scripts and docs accordingly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-20 17:46:23 -06:00

165 lines
6.2 KiB
Python

#!/usr/bin/env python3
"""
Convert a directory of individual bicorder JSON reading files into a diagnostic CSV.
This is the reverse of convert_csv_to_json.py. Each JSON file becomes one row.
Handles readings across bicorder versions by matching on term_left/term_right pairs
rather than column names.
Null gradient values (e.g., shortform readings that skip non-key dimensions) are
written as empty cells so downstream analysis can treat them as NaN.
Usage:
python3 scripts/json_to_csv.py data/manual_20260320/json/
python3 scripts/json_to_csv.py data/manual_20260320/json/ -o data/manual_20260320/readings.csv
python3 scripts/json_to_csv.py data/manual_20260320/json/ --shortform-only
"""
import argparse
import csv
import json
from pathlib import Path
# Map old term pairs to current column names (matches COLUMN_RENAMES in other scripts).
# Keys are (term_left, term_right) tuples exactly as they appear in older JSON files;
# values are the (term_left, term_right) pair used by the current bicorder.json.
TERM_RENAMES = {
    ('elite', 'vernacular'): ('institutional', 'vernacular'),
    ('exclusive', 'non-exclusive'): ('monopolistic', 'pluralistic'),
    # note: left/right order was swapped in old versions of this dimension
    ('insufficient', 'sufficient'): ('sufficient', 'limited'),
    ('Kafka', 'Whitehead'): ('restraining', 'liberating'),
}
def load_bicorder_columns(bicorder_path):
    """Read ordered column definitions from bicorder.json.

    Args:
        bicorder_path: Path to the bicorder.json instrument definition.

    Returns:
        (columns, key_columns): ``columns`` is the ordered list of CSV column
        names ("<set_name>_<term_left>_vs_<term_right>"), one per gradient;
        ``key_columns`` is the subset (as a set) whose gradients are marked
        ``"shortform": true`` in the instrument.
    """
    # Explicit UTF-8: matches the encoding used when writing the CSV and
    # avoids locale-dependent decode failures (e.g. cp1252 on Windows).
    with open(bicorder_path, encoding='utf-8') as f:
        data = json.load(f)
    columns = []
    key_columns = set()
    for category in data['diagnostic']:
        set_name = category['set_name']
        for gradient in category['gradients']:
            col = f"{set_name}_{gradient['term_left']}_vs_{gradient['term_right']}"
            columns.append(col)
            if gradient.get('shortform', False):
                key_columns.add(col)
    return columns, key_columns
def normalize_terms(term_left, term_right):
    """Apply renames so a term pair matches current bicorder.json terminology."""
    forward = TERM_RENAMES.get((term_left, term_right))
    if forward is not None:
        return forward
    # Some old files stored the pair with left/right swapped: look up the
    # flipped pair, then flip the rename result back into the caller's order.
    backward = TERM_RENAMES.get((term_right, term_left))
    if backward is not None:
        return backward[1], backward[0]
    return term_left, term_right
def json_to_row(json_path, all_columns):
    """Convert a single JSON reading file to a CSV row dict.

    Args:
        json_path: Path to one bicorder reading JSON file.
        all_columns: Ordered CSV column names to populate; dimensions the
            reading does not contain come out as empty strings.

    Returns:
        Dict with the fixed metadata fields plus one key per entry of
        ``all_columns``. Null gradient values (e.g. shortform readings that
        skip non-key dimensions) become '' so downstream analysis reads NaN.
    """
    # Explicit UTF-8 for the same reason as the CSV writer: reading files
    # must not depend on the platform's locale encoding.
    with open(json_path, encoding='utf-8') as f:
        data = json.load(f)
    meta = data.get('metadata', {})
    row = {
        'Descriptor': meta.get('protocol', ''),
        'Description': '',  # not stored in individual reading files
        'analyst': meta.get('analyst', ''),
        'standpoint': meta.get('standpoint', ''),
        'timestamp': meta.get('timestamp', ''),
        'shortform': str(meta.get('shortform', '')),
        'version': data.get('version', ''),
    }
    # Build lookup: (normalized_term_left, normalized_term_right) -> value,
    # so readings taken under older terminology land in the current columns.
    gradient_values = {}
    for category in data.get('diagnostic', []):
        set_name = category['set_name']
        for gradient in category.get('gradients', []):
            tl = gradient['term_left']
            tr = gradient['term_right']
            tl_norm, tr_norm = normalize_terms(tl, tr)
            col = f"{set_name}_{tl_norm}_vs_{tr_norm}"
            value = gradient.get('value')
            gradient_values[col] = '' if value is None else str(value)
    for col in all_columns:
        row[col] = gradient_values.get(col, '')
    return row
def main():
    """Command-line entry point.

    Reads every *.json file in the given directory, converts each to one CSV
    row, writes the combined CSV, and prints a per-dimension coverage summary.
    Exits with status 1 when the input directory yields no JSON files (the
    previous behavior fell through with status 0, hiding the failure from
    shell pipelines).
    """
    parser = argparse.ArgumentParser(
        description='Convert directory of bicorder JSON files to a diagnostic CSV',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Example usage:
python3 scripts/json_to_csv.py data/manual_20260320/json/
python3 scripts/json_to_csv.py data/manual_20260320/json/ --shortform-only
python3 scripts/json_to_csv.py data/manual_20260320/json/ -o data/manual_20260320/readings.csv
"""
    )
    parser.add_argument('json_dir', help='Directory containing bicorder JSON reading files')
    parser.add_argument('-o', '--output', default=None,
                        help='Output CSV path (default: <dataset_dir>/readings.csv)')
    parser.add_argument('-b', '--bicorder', default='../bicorder.json',
                        help='Path to bicorder.json (default: ../bicorder.json)')
    parser.add_argument('--shortform-only', action='store_true',
                        help='Include only the key shortform dimensions (useful when most readings are shortform)')
    args = parser.parse_args()

    json_dir = Path(args.json_dir)
    # Default output lives beside the json/ directory, inside the dataset dir.
    dataset_dir = json_dir.parent
    output_path = Path(args.output) if args.output else dataset_dir / 'readings.csv'

    all_columns, key_columns = load_bicorder_columns(args.bicorder)
    if args.shortform_only:
        columns = [c for c in all_columns if c in key_columns]
        print(f"Shortform mode: using {len(columns)} key dimensions")
    else:
        columns = all_columns

    json_files = sorted(json_dir.glob('*.json'))
    if not json_files:
        print(f"Error: no JSON files found in {json_dir}")
        # Non-zero exit status so callers/scripts can detect the failure.
        raise SystemExit(1)
    print(f"Converting {len(json_files)} JSON files → {output_path}")

    fieldnames = ['Descriptor', 'Description', 'analyst', 'standpoint',
                  'timestamp', 'shortform', 'version'] + columns
    rows = []
    for json_path in json_files:
        try:
            rows.append(json_to_row(json_path, columns))
        except Exception as e:
            # Best-effort: one malformed reading should not abort the batch.
            print(f"  Warning: skipping {json_path.name}: {e}")

    with open(output_path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)

    # Summary stats: how many readings carry a value for each dimension.
    filled = {col: sum(1 for r in rows if r.get(col)) for col in columns}
    print(f"Done. {len(rows)} rows written.")
    print(f"\nDimension coverage (readings with a value):")
    for col, count in filled.items():
        pct = count / len(rows) * 100 if rows else 0
        marker = '* ' if col in key_columns else '  '
        print(f"  {marker}{col}: {count}/{len(rows)} ({pct:.0f}%)")
    print(f"\n(* = shortform/key dimension)")
if __name__ == '__main__':
main()