#!/usr/bin/env python3
"""
Multivariate Analysis Script for Protocol Bicorder Data
Performs comprehensive multivariate statistical analyses on protocol diagnostic data,
including clustering, dimensionality reduction, correlation analysis, and visualization.
Usage:
python3 multivariate_analysis.py diagnostic_output.csv [--analyses all]
python3 multivariate_analysis.py diagnostic_output.csv --analyses clustering pca
"""
import argparse
import sys
from pathlib import Path
import warnings
warnings.filterwarnings('ignore')
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from scipy.cluster.hierarchy import dendrogram, linkage, fcluster
from scipy.spatial.distance import pdist, squareform
import networkx as nx
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA, FactorAnalysis
from sklearn.manifold import TSNE
from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering
from sklearn.ensemble import RandomForestClassifier
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.metrics import silhouette_score, davies_bouldin_score
try:
import umap
UMAP_AVAILABLE = True
except ImportError:
UMAP_AVAILABLE = False
print("Note: UMAP not available. Install with: pip install umap-learn")
class ProtocolAnalyzer:
"""Main class for multivariate analysis of protocol data."""
def __init__(self, csv_path, output_dir='analysis_results'):
"""Initialize analyzer with data and output directory."""
self.csv_path = Path(csv_path)
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
# Create subdirectories
(self.output_dir / 'plots').mkdir(exist_ok=True)
(self.output_dir / 'data').mkdir(exist_ok=True)
(self.output_dir / 'reports').mkdir(exist_ok=True)
# Load and prepare data
self.df = None
self.dimension_cols = []
self.design_cols = []
self.entanglement_cols = []
self.experience_cols = []
self.scaled_data = None
self.scaler = None
self._load_data()
def _load_data(self):
"""Load CSV and identify dimension columns."""
print(f"Loading data from {self.csv_path}...")
self.df = pd.read_csv(self.csv_path)
# Identify dimension columns
all_cols = self.df.columns.tolist()
self.design_cols = [c for c in all_cols if c.startswith('Design_')]
self.entanglement_cols = [c for c in all_cols if c.startswith('Entanglement_')]
self.experience_cols = [c for c in all_cols if c.startswith('Experience_')]
self.dimension_cols = self.design_cols + self.entanglement_cols + self.experience_cols
print(f"Loaded {len(self.df)} protocols with {len(self.dimension_cols)} dimensions")
print(f" - Design: {len(self.design_cols)}")
print(f" - Entanglement: {len(self.entanglement_cols)}")
print(f" - Experience: {len(self.experience_cols)}")
# Check for missing values
missing_count = self.df[self.dimension_cols].isna().sum().sum()
rows_with_missing = self.df[self.dimension_cols].isna().any(axis=1).sum()
if missing_count > 0:
print(f"\nWarning: Found {missing_count} missing values in {rows_with_missing} rows")
print("Dropping rows with missing values...")
self.df = self.df.dropna(subset=self.dimension_cols).reset_index(drop=True)
print(f"Dataset now contains {len(self.df)} protocols")
# Standardize the dimension data
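# z-scoring puts every dimension on a comparable scale so no single dimension dominates distance-based analyses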
self.scaler = StandardScaler()
self.scaled_data = self.scaler.fit_transform(self.df[self.dimension_cols])
def save_results(self, data, filename, subdir='data', index=False):
"""Save results to a CSV file; pass index=True to keep meaningful row labels (e.g. dimension names)."""
output_path = self.output_dir / subdir / filename
if isinstance(data, pd.DataFrame):
data.to_csv(output_path, index=index)
else:
pd.DataFrame(data).to_csv(output_path)
print(f" Saved: {output_path}")
def save_plot(self, filename):
"""Save current matplotlib figure."""
output_path = self.output_dir / 'plots' / filename
plt.tight_layout()
plt.savefig(output_path, dpi=300, bbox_inches='tight')
print(f" Saved: {output_path}")
plt.close()
# ========== CLUSTERING ANALYSES ==========
def kmeans_clustering(self, n_clusters_range=(2, 10)):
"""Perform K-means clustering with elbow method."""
print("\n=== K-Means Clustering ===")
# Elbow method
inertias = []
silhouettes = []
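# Silhouette score ranges from -1 to 1; higher values mean tighter, better-separated clusters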
k_range = range(n_clusters_range[0], n_clusters_range[1] + 1)
for k in k_range:
kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
labels = kmeans.fit_predict(self.scaled_data)
inertias.append(kmeans.inertia_)
if k > 1:
silhouettes.append(silhouette_score(self.scaled_data, labels))
else:
silhouettes.append(0)
# Plot elbow curve
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
ax1.plot(k_range, inertias, 'bo-')
ax1.set_xlabel('Number of Clusters (k)')
ax1.set_ylabel('Inertia')
ax1.set_title('Elbow Method for Optimal k')
ax1.grid(True, alpha=0.3)
ax2.plot(k_range, silhouettes, 'ro-')
ax2.set_xlabel('Number of Clusters (k)')
ax2.set_ylabel('Silhouette Score')
ax2.set_title('Silhouette Score by k')
ax2.grid(True, alpha=0.3)
self.save_plot('kmeans_elbow.png')
# Use optimal k (highest silhouette)
optimal_k = k_range[np.argmax(silhouettes)]
print(f"Optimal k by silhouette score: {optimal_k}")
# Final clustering
kmeans = KMeans(n_clusters=optimal_k, random_state=42, n_init=10)
self.df['kmeans_cluster'] = kmeans.fit_predict(self.scaled_data)
# Save results
results = self.df[['Descriptor', 'kmeans_cluster']].copy()
results['cluster'] = results['kmeans_cluster'] + 1 # 1-indexed for readability
self.save_results(results[['Descriptor', 'cluster']], 'kmeans_clusters.csv')
# Cluster statistics
print(f"\nCluster sizes:")
print(self.df['kmeans_cluster'].value_counts().sort_index())
return self.df['kmeans_cluster']
def hierarchical_clustering(self, n_clusters=5, method='ward'):
"""Perform hierarchical clustering with dendrogram."""
print("\n=== Hierarchical Clustering ===")
# Compute linkage
Z = linkage(self.scaled_data, method=method)
# Plot dendrogram
plt.figure(figsize=(16, 8))
dendrogram(Z, labels=self.df['Descriptor'].values, leaf_font_size=8)
plt.title(f'Hierarchical Clustering Dendrogram ({method} linkage)')
plt.xlabel('Protocol')
plt.ylabel('Distance')
plt.xticks(rotation=90)
self.save_plot('hierarchical_dendrogram.png')
# Cut tree to get clusters
self.df['hierarchical_cluster'] = fcluster(Z, n_clusters, criterion='maxclust')
# Save results
results = self.df[['Descriptor', 'hierarchical_cluster']].copy()
results.columns = ['Descriptor', 'cluster']
self.save_results(results, 'hierarchical_clusters.csv')
print(f"\nCluster sizes:")
print(self.df['hierarchical_cluster'].value_counts().sort_index())
return self.df['hierarchical_cluster']
def dbscan_clustering(self, eps=3.0, min_samples=3):
"""Perform DBSCAN clustering to identify outliers."""
print("\n=== DBSCAN Clustering ===")
dbscan = DBSCAN(eps=eps, min_samples=min_samples)
self.df['dbscan_cluster'] = dbscan.fit_predict(self.scaled_data)
# Note: `-1 in series` checks the index, not the values, so test the values explicitly
n_clusters = len(set(self.df['dbscan_cluster'])) - (1 if (self.df['dbscan_cluster'] == -1).any() else 0)
n_outliers = (self.df['dbscan_cluster'] == -1).sum()
print(f"Found {n_clusters} clusters and {n_outliers} outliers")
# Save results
results = self.df[['Descriptor', 'dbscan_cluster']].copy()
results.columns = ['Descriptor', 'cluster']
self.save_results(results, 'dbscan_clusters.csv')
if n_outliers > 0:
outliers = self.df[self.df['dbscan_cluster'] == -1][['Descriptor']]
self.save_results(outliers, 'dbscan_outliers.csv')
print("\nOutlier protocols:")
for protocol in outliers['Descriptor']:
print(f" - {protocol}")
return self.df['dbscan_cluster']
# ========== DIMENSIONALITY REDUCTION ==========
def pca_analysis(self, n_components=None):
"""Perform PCA and visualize results."""
print("\n=== Principal Component Analysis ===")
# Fit PCA
if n_components is None:
pca = PCA()
else:
pca = PCA(n_components=n_components)
pca_coords = pca.fit_transform(self.scaled_data)
# Explained variance
explained_var = pca.explained_variance_ratio_
cumsum_var = np.cumsum(explained_var)
print(f"First 5 PCs explain {cumsum_var[4]*100:.1f}% of variance")
# Plot explained variance
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
n_show = min(15, len(explained_var))
ax1.bar(range(1, n_show + 1), explained_var[:n_show])
ax1.set_xlabel('Principal Component')
ax1.set_ylabel('Explained Variance Ratio')
ax1.set_title('Variance Explained by Each PC')
ax1.grid(True, alpha=0.3, axis='y')
ax2.plot(range(1, n_show + 1), cumsum_var[:n_show], 'o-')
ax2.axhline(y=0.8, color='r', linestyle='--', alpha=0.5, label='80% threshold')
ax2.set_xlabel('Number of Components')
ax2.set_ylabel('Cumulative Explained Variance')
ax2.set_title('Cumulative Variance Explained')
ax2.legend()
ax2.grid(True, alpha=0.3)
self.save_plot('pca_variance.png')
# 2D visualization
plt.figure(figsize=(12, 10))
plt.scatter(pca_coords[:, 0], pca_coords[:, 1], alpha=0.6, s=50)
# Annotate points
for i, protocol in enumerate(self.df['Descriptor']):
if i % 3 == 0: # Label every 3rd point to avoid clutter
plt.annotate(protocol, (pca_coords[i, 0], pca_coords[i, 1]),
fontsize=6, alpha=0.7)
plt.xlabel(f'PC1 ({explained_var[0]*100:.1f}% variance)')
plt.ylabel(f'PC2 ({explained_var[1]*100:.1f}% variance)')
plt.title('Protocols in PCA Space (First 2 Components)')
plt.grid(True, alpha=0.3)
self.save_plot('pca_2d.png')
# Save PCA coordinates
pca_df = pd.DataFrame(pca_coords[:, :5],
columns=[f'PC{i+1}' for i in range(min(5, pca_coords.shape[1]))])
pca_df.insert(0, 'Descriptor', self.df['Descriptor'])
self.save_results(pca_df, 'pca_coordinates.csv')
# Component loadings
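# Loadings give each original dimension's weight on a component, i.e. how strongly it drives that axis of variation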
loadings = pd.DataFrame(
pca.components_[:5, :].T,
columns=[f'PC{i+1}' for i in range(min(5, pca.components_.shape[0]))],
index=self.dimension_cols
)
self.save_results(loadings, 'pca_loadings.csv', index=True)  # keep dimension names as row labels
# Plot loadings heatmap
plt.figure(figsize=(10, 12))
sns.heatmap(loadings, cmap='RdBu_r', center=0, cbar_kws={'label': 'Loading'})
plt.title('PCA Component Loadings')
plt.tight_layout()
self.save_plot('pca_loadings_heatmap.png')
return pca_coords, pca
def tsne_analysis(self, perplexity=30, n_components=2):
"""Perform t-SNE analysis."""
print("\n=== t-SNE Analysis ===")
perplexity = min(perplexity, len(self.scaled_data) - 1)  # scikit-learn requires perplexity < n_samples
tsne = TSNE(n_components=n_components, perplexity=perplexity, random_state=42, max_iter=1000)
tsne_coords = tsne.fit_transform(self.scaled_data)
# Plot
plt.figure(figsize=(12, 10))
plt.scatter(tsne_coords[:, 0], tsne_coords[:, 1], alpha=0.6, s=50)
# Annotate some points
for i, protocol in enumerate(self.df['Descriptor']):
if i % 4 == 0: # Label every 4th point
plt.annotate(protocol, (tsne_coords[i, 0], tsne_coords[i, 1]),
fontsize=6, alpha=0.7)
plt.xlabel('t-SNE Dimension 1')
plt.ylabel('t-SNE Dimension 2')
plt.title(f't-SNE Projection (perplexity={perplexity})')
plt.grid(True, alpha=0.3)
self.save_plot('tsne_2d.png')
# Save coordinates
tsne_df = pd.DataFrame(tsne_coords, columns=['TSNE1', 'TSNE2'])
tsne_df.insert(0, 'Descriptor', self.df['Descriptor'])
self.save_results(tsne_df, 'tsne_coordinates.csv')
return tsne_coords
def umap_analysis(self, n_neighbors=15, min_dist=0.1, n_components=2):
"""Perform UMAP analysis if available."""
if not UMAP_AVAILABLE:
print("\n=== UMAP Analysis ===")
print("UMAP not available. Install with: pip install umap-learn")
return None
print("\n=== UMAP Analysis ===")
reducer = umap.UMAP(n_neighbors=n_neighbors, min_dist=min_dist,
n_components=n_components, random_state=42)
umap_coords = reducer.fit_transform(self.scaled_data)
# Plot
plt.figure(figsize=(12, 10))
plt.scatter(umap_coords[:, 0], umap_coords[:, 1], alpha=0.6, s=50)
# Annotate some points
for i, protocol in enumerate(self.df['Descriptor']):
if i % 4 == 0:
plt.annotate(protocol, (umap_coords[i, 0], umap_coords[i, 1]),
fontsize=6, alpha=0.7)
plt.xlabel('UMAP Dimension 1')
plt.ylabel('UMAP Dimension 2')
plt.title(f'UMAP Projection (n_neighbors={n_neighbors}, min_dist={min_dist})')
plt.grid(True, alpha=0.3)
self.save_plot('umap_2d.png')
# Save coordinates
umap_df = pd.DataFrame(umap_coords, columns=['UMAP1', 'UMAP2'])
umap_df.insert(0, 'Descriptor', self.df['Descriptor'])
self.save_results(umap_df, 'umap_coordinates.csv')
return umap_coords
def factor_analysis(self, n_factors=5):
"""Perform factor analysis."""
print("\n=== Factor Analysis ===")
fa = FactorAnalysis(n_components=n_factors, random_state=42)
fa_coords = fa.fit_transform(self.scaled_data)
# Factor loadings
loadings = pd.DataFrame(
fa.components_.T,
columns=[f'Factor{i+1}' for i in range(n_factors)],
index=self.dimension_cols
)
self.save_results(loadings, 'factor_loadings.csv', index=True)  # keep dimension names as row labels
# Plot loadings heatmap
plt.figure(figsize=(10, 12))
sns.heatmap(loadings, cmap='RdBu_r', center=0, cbar_kws={'label': 'Loading'})
plt.title('Factor Analysis Loadings')
plt.tight_layout()
self.save_plot('factor_loadings_heatmap.png')
# Save factor scores
fa_df = pd.DataFrame(fa_coords,
columns=[f'Factor{i+1}' for i in range(n_factors)])
fa_df.insert(0, 'Descriptor', self.df['Descriptor'])
self.save_results(fa_df, 'factor_scores.csv')
return fa_coords, fa
# ========== CORRELATION & STRUCTURE ==========
def correlation_analysis(self):
"""Compute and visualize correlation matrices."""
print("\n=== Correlation Analysis ===")
# Full correlation matrix
corr_matrix = self.df[self.dimension_cols].corr()
# Plot full correlation heatmap
plt.figure(figsize=(16, 14))
sns.heatmap(corr_matrix, cmap='RdBu_r', center=0,
square=True, linewidths=0.5, cbar_kws={'label': 'Correlation'})
plt.title('Correlation Matrix - All Dimensions')
plt.tight_layout()
self.save_plot('correlation_heatmap_full.png')
# Save correlation matrix
self.save_results(corr_matrix, 'correlation_matrix.csv', index=True)  # keep dimension names as row labels
# Find strongest correlations
corr_pairs = []
for i in range(len(corr_matrix.columns)):
for j in range(i+1, len(corr_matrix.columns)):
corr_pairs.append({
'Dimension1': corr_matrix.columns[i],
'Dimension2': corr_matrix.columns[j],
'Correlation': corr_matrix.iloc[i, j]
})
corr_df = pd.DataFrame(corr_pairs).sort_values('Correlation',
key=abs,
ascending=False)
self.save_results(corr_df.head(20), 'top_correlations.csv')
print("\nTop 5 positive correlations:")
for _, row in corr_df.head(5).iterrows():
print(f" {row['Dimension1']} <-> {row['Dimension2']}: {row['Correlation']:.3f}")
print("\nTop 5 negative correlations:")
for _, row in corr_df.tail(5).iterrows():
print(f" {row['Dimension1']} <-> {row['Dimension2']}: {row['Correlation']:.3f}")
# Within-category correlations
self._plot_category_correlation('Design', self.design_cols)
self._plot_category_correlation('Entanglement', self.entanglement_cols)
self._plot_category_correlation('Experience', self.experience_cols)
return corr_matrix
def _plot_category_correlation(self, category_name, columns):
"""Plot correlation heatmap for a specific category."""
corr = self.df[columns].corr()
plt.figure(figsize=(10, 8))
sns.heatmap(corr, annot=True, fmt='.2f', cmap='RdBu_r', center=0,
square=True, linewidths=0.5, cbar_kws={'label': 'Correlation'})
plt.title(f'{category_name} Dimensions - Correlation Matrix')
plt.tight_layout()
self.save_plot(f'correlation_heatmap_{category_name.lower()}.png')
def network_analysis(self, threshold=0.5):
"""Create network graph of protocol similarities."""
print("\n=== Network Analysis ===")
# Compute pairwise distances
distances = pdist(self.scaled_data, metric='euclidean')
dist_matrix = squareform(distances)
# Convert to similarity (inverse of distance, normalized)
max_dist = dist_matrix.max()
similarity_matrix = 1 - (dist_matrix / max_dist)
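# Similarity of 1.0 means identical dimension profiles; 0.0 corresponds to the most dissimilar pair in the dataset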
# Create network
G = nx.Graph()
# Add nodes
for i, protocol in enumerate(self.df['Descriptor']):
G.add_node(i, label=protocol)
# Add edges above threshold
edge_count = 0
for i in range(len(similarity_matrix)):
for j in range(i+1, len(similarity_matrix)):
if similarity_matrix[i, j] > threshold:
G.add_edge(i, j, weight=similarity_matrix[i, j])
edge_count += 1
print(f"Network with {G.number_of_nodes()} nodes and {edge_count} edges")
# Calculate network metrics
if G.number_of_edges() > 0:
degree_centrality = nx.degree_centrality(G)
betweenness = nx.betweenness_centrality(G)
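# Degree centrality: fraction of other protocols a node is linked to; betweenness: how often it lies on shortest paths between others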
metrics_df = pd.DataFrame({
'Descriptor': [self.df.iloc[i]['Descriptor'] for i in G.nodes()],
'Degree_Centrality': [degree_centrality[i] for i in G.nodes()],
'Betweenness_Centrality': [betweenness[i] for i in G.nodes()]
}).sort_values('Degree_Centrality', ascending=False)
self.save_results(metrics_df, 'network_metrics.csv')
print("\nTop 5 most central protocols:")
for _, row in metrics_df.head(5).iterrows():
print(f" {row['Descriptor']}: {row['Degree_Centrality']:.3f}")
# Plot network
plt.figure(figsize=(16, 16))
pos = nx.spring_layout(G, k=0.5, iterations=50, seed=42)
# Node sizes based on degree centrality
node_sizes = [degree_centrality[i] * 3000 + 100 for i in G.nodes()]
nx.draw_networkx_nodes(G, pos, node_size=node_sizes,
node_color='lightblue', alpha=0.7)
nx.draw_networkx_edges(G, pos, alpha=0.2)
# Labels for high-centrality nodes
high_centrality = {i: self.df.iloc[i]['Descriptor']
for i in G.nodes() if degree_centrality[i] > 0.1}
nx.draw_networkx_labels(G, pos, labels=high_centrality, font_size=8)
plt.title(f'Protocol Similarity Network (threshold={threshold})')
plt.axis('off')
plt.tight_layout()
self.save_plot('network_graph.png')
else:
print("No edges above threshold - try lowering the threshold")
return G
# ========== CLASSIFICATION & PREDICTION ==========
def category_discriminant_analysis(self):
"""Analyze how well dimension categories discriminate protocols."""
print("\n=== Category Discriminant Analysis ===")
results = []
for category_name, columns in [('Design', self.design_cols),
('Entanglement', self.entanglement_cols),
('Experience', self.experience_cols)]:
# Use this category's dimensions alone to predict the K-means clusters derived from all dimensions
X = self.df[columns].values
# Use kmeans clusters as target if available
if 'kmeans_cluster' in self.df.columns:
y = self.df['kmeans_cluster'].values
# LDA
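# Accuracy is computed on the training data, so it is an optimistic measure of how well this category separates the clusters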
try:
lda = LinearDiscriminantAnalysis()
lda.fit(X, y)
score = lda.score(X, y)
results.append({
'Category': category_name,
'Accuracy': score,
'N_Dimensions': len(columns)
})
print(f"{category_name} dimensions predict clusters with {score*100:.1f}% accuracy")
except Exception as e:
print(f"Could not perform LDA for {category_name}: {e}")
if results:
results_df = pd.DataFrame(results)
self.save_results(results_df, 'category_discriminant_results.csv')
return results
def feature_importance_analysis(self):
"""Analyze which dimensions are most important for clustering."""
print("\n=== Feature Importance Analysis ===")
if 'kmeans_cluster' not in self.df.columns:
print("Run clustering first to enable feature importance analysis")
return None
# Random Forest classifier
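# The forest is trained to reproduce the K-means labels; feature_importances_ (mean impurity decrease) then ranks the dimensions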
X = self.df[self.dimension_cols].values
y = self.df['kmeans_cluster'].values
rf = RandomForestClassifier(n_estimators=100, random_state=42)
rf.fit(X, y)
# Feature importances
importances = pd.DataFrame({
'Dimension': self.dimension_cols,
'Importance': rf.feature_importances_
}).sort_values('Importance', ascending=False)
self.save_results(importances, 'feature_importances.csv')
# Plot top 20
plt.figure(figsize=(10, 12))
top_20 = importances.head(20)
plt.barh(range(len(top_20)), top_20['Importance'])
plt.yticks(range(len(top_20)), top_20['Dimension'])
plt.xlabel('Importance')
plt.title('Top 20 Most Important Dimensions for Clustering')
plt.gca().invert_yaxis()
plt.tight_layout()
self.save_plot('feature_importances.png')
print("\nTop 10 most important dimensions:")
for _, row in importances.head(10).iterrows():
print(f" {row['Dimension']}: {row['Importance']:.4f}")
return importances
def analyst_comparison(self):
"""Compare ratings across different analysts."""
print("\n=== Analyst Comparison ===")
if 'analyst' not in self.df.columns:
print("No analyst column found")
return None
analysts = self.df['analyst'].unique()
print(f"Found {len(analysts)} unique analysts")
# Mean ratings by analyst for each dimension
analyst_means = self.df.groupby('analyst')[self.dimension_cols].mean()
self.save_results(analyst_means, 'analyst_mean_ratings.csv', index=True)  # keep analyst names as row labels
# Plot comparison
fig, axes = plt.subplots(3, 1, figsize=(14, 12))
for idx, (category_name, columns) in enumerate([
('Design', self.design_cols),
('Entanglement', self.entanglement_cols),
('Experience', self.experience_cols)
]):
analyst_means[columns].T.plot(ax=axes[idx], marker='o')
axes[idx].set_title(f'{category_name} Dimensions - Mean Ratings by Analyst')
axes[idx].set_ylabel('Mean Rating')
axes[idx].legend(title='Analyst', bbox_to_anchor=(1.05, 1), loc='upper left')
axes[idx].grid(True, alpha=0.3)
axes[idx].set_xticklabels(axes[idx].get_xticklabels(), rotation=45, ha='right')
plt.tight_layout()
self.save_plot('analyst_comparison.png')
return analyst_means
# ========== SUMMARY REPORT ==========
def generate_summary_report(self):
"""Generate a text summary of all analyses."""
print("\n=== Generating Summary Report ===")
report_lines = []
report_lines.append("=" * 80)
report_lines.append("MULTIVARIATE ANALYSIS SUMMARY REPORT")
report_lines.append("Protocol Bicorder Dataset")
report_lines.append("=" * 80)
report_lines.append("")
report_lines.append(f"Dataset: {self.csv_path}")
report_lines.append(f"Number of protocols: {len(self.df)}")
report_lines.append(f"Number of dimensions: {len(self.dimension_cols)}")
report_lines.append(f" - Design: {len(self.design_cols)}")
report_lines.append(f" - Entanglement: {len(self.entanglement_cols)}")
report_lines.append(f" - Experience: {len(self.experience_cols)}")
report_lines.append("")
report_lines.append("-" * 80)
report_lines.append("ANALYSES PERFORMED")
report_lines.append("-" * 80)
# Report which clustering analyses were run
report_lines.append("Clustering:")
if 'kmeans_cluster' in self.df.columns:
report_lines.append(f"- K-Means: {self.df['kmeans_cluster'].nunique()} clusters identified")
if 'hierarchical_cluster' in self.df.columns:
report_lines.append(f"- Hierarchical: {self.df['hierarchical_cluster'].nunique()} clusters")
if 'dbscan_cluster' in self.df.columns:
n_outliers = (self.df['dbscan_cluster'] == -1).sum()
report_lines.append(f"- DBSCAN: {n_outliers} outlier protocols identified")
report_lines.append("")
report_lines.append("Dimensionality Reduction:")
report_lines.append("- Principal Component Analysis (PCA)")
report_lines.append("- t-SNE Projection")
if UMAP_AVAILABLE:
report_lines.append("- UMAP Projection")
report_lines.append("- Factor Analysis")
report_lines.append("")
report_lines.append("Statistical Analyses:")
report_lines.append("- Correlation Analysis")
report_lines.append("- Network Analysis")
report_lines.append("- Feature Importance Analysis")
if 'analyst' in self.df.columns:
report_lines.append("- Analyst Comparison")
report_lines.append("")
report_lines.append("-" * 80)
report_lines.append("OUTPUT FILES")
report_lines.append("-" * 80)
report_lines.append(f"All results saved to: {self.output_dir}/")
report_lines.append(" - plots/ : All visualizations (PNG)")
report_lines.append(" - data/ : All numerical results (CSV)")
report_lines.append(" - reports/ : This summary report")
report_lines.append("")
report_lines.append("=" * 80)
report_lines.append("END OF REPORT")
report_lines.append("=" * 80)
report_text = "\n".join(report_lines)
# Save report
report_path = self.output_dir / 'reports' / 'analysis_summary.txt'
with open(report_path, 'w') as f:
f.write(report_text)
print(f" Saved: {report_path}")
print("\n" + report_text)
return report_text
def main():
"""Main execution function."""
parser = argparse.ArgumentParser(
description='Multivariate analysis of Protocol Bicorder data',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python3 multivariate_analysis.py diagnostic_output.csv
python3 multivariate_analysis.py diagnostic_output.csv --output results/
python3 multivariate_analysis.py diagnostic_output.csv --analyses clustering pca
"""
)
parser.add_argument('csv_file', help='Input CSV file with protocol diagnostics')
parser.add_argument('--output', '-o', default='analysis_results',
help='Output directory (default: analysis_results)')
parser.add_argument('--analyses', nargs='+',
choices=['clustering', 'pca', 'tsne', 'umap', 'factor',
'correlation', 'network', 'importance', 'analyst', 'all'],
default=['all'],
help='Which analyses to run (default: all)')
args = parser.parse_args()
# Check if file exists
if not Path(args.csv_file).exists():
print(f"Error: File not found: {args.csv_file}")
sys.exit(1)
# Initialize analyzer
print("=" * 80)
print("PROTOCOL BICORDER - MULTIVARIATE ANALYSIS")
print("=" * 80)
analyzer = ProtocolAnalyzer(args.csv_file, args.output)
# Determine which analyses to run
run_all = 'all' in args.analyses
# Run analyses
try:
# Clustering
if run_all or 'clustering' in args.analyses:
analyzer.kmeans_clustering()
analyzer.hierarchical_clustering()
analyzer.dbscan_clustering()
# Dimensionality reduction
if run_all or 'pca' in args.analyses:
analyzer.pca_analysis()
if run_all or 'tsne' in args.analyses:
analyzer.tsne_analysis()
if run_all or 'umap' in args.analyses:
analyzer.umap_analysis()
if run_all or 'factor' in args.analyses:
analyzer.factor_analysis()
# Correlation and structure
if run_all or 'correlation' in args.analyses:
analyzer.correlation_analysis()
if run_all or 'network' in args.analyses:
analyzer.network_analysis(threshold=0.6)
# Classification
if run_all or 'importance' in args.analyses:
analyzer.category_discriminant_analysis()
analyzer.feature_importance_analysis()
if run_all or 'analyst' in args.analyses:
analyzer.analyst_comparison()
# Generate summary
analyzer.generate_summary_report()
print("\n" + "=" * 80)
print("ANALYSIS COMPLETE!")
print("=" * 80)
print(f"\nAll results saved to: {analyzer.output_dir}/")
except Exception as e:
print(f"\nError during analysis: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == '__main__':
main()