#!/usr/bin/env python3
"""Analyze Synthea-generated patient JSON files and write summary reports.

The script walks a directory of patient JSON files, tallies demographics,
conditions and medications, writes the aggregate to ``patient_stats.json`` and
then renders the same numbers as either an HTML page or a set of CSV files.
"""

import csv
import glob
import json
import os
import sys
from collections import Counter
from datetime import datetime
from html import escape

import pandas as pd

# Ordered age buckets as (label, inclusive upper bound); None marks the
# open-ended last bucket.
_AGE_BUCKETS = (('0-18', 18), ('19-44', 44), ('45-64', 64), ('65+', None))


def _percentage(count, total):
    """Return ``count`` as a percentage of ``total`` (0 when ``total`` <= 0)."""
    return (count / total) * 100 if total > 0 else 0


def _age_bucket(age):
    """Return the label of the age bucket that ``age`` falls into."""
    for label, upper in _AGE_BUCKETS:
        if upper is None or age <= upper:
            return label
    return _AGE_BUCKETS[-1][0]  # unreachable: the last bucket is open-ended


def _demographic_sources(data):
    """Return the dicts in one parsed file that may carry patient demographics.

    The root object is always included. When the root has an ``entry`` list,
    every entry resource whose ``resourceType`` is ``Patient`` is included as
    well: conditions and medications are read from ``entry[].resource``, so
    the patient's own fields have to be looked up in the same place.
    """
    sources = [data]
    for entry in data.get('entry', []):
        if not isinstance(entry, dict):
            continue
        resource = entry.get('resource')
        if isinstance(resource, dict) and resource.get('resourceType') == 'Patient':
            sources.append(resource)
    return sources


def _collect_demographics(resource, demographics, current_year):
    """Add gender, age, race and ethnicity found on ``resource`` to ``demographics``."""
    if 'gender' in resource:
        demographics['gender'][resource['gender']] += 1

    if 'birthDate' in resource:
        # Age is approximated from the birth year only (first four characters).
        demographics['age'].append(current_year - int(resource['birthDate'][:4]))

    for ext in resource.get('extension', []):
        if 'url' not in ext or 'extension' not in ext:
            continue
        if ext['url'].endswith('us-core-race'):
            target = demographics['race']
        elif ext['url'].endswith('us-core-ethnicity'):
            target = demographics['ethnicity']
        else:
            continue
        # Only sub-extensions carrying a valueCoding are counted.
        for sub_ext in ext['extension']:
            if 'valueCoding' in sub_ext:
                target[sub_ext['valueCoding'].get('display', 'Unknown')] += 1


def _collect_clinical(data, condition_counts, medication_counts):
    """Count Condition and MedicationRequest display names in ``data['entry']``."""
    for entry in data.get('entry', []):
        if not isinstance(entry, dict) or 'resource' not in entry:
            continue
        resource = entry['resource']
        resource_type = resource.get('resourceType')
        if resource_type == 'Condition':
            codeable, counter = resource.get('code'), condition_counts
        elif resource_type == 'MedicationRequest':
            codeable, counter = resource.get('medicationCodeableConcept'), medication_counts
        else:
            continue
        if not codeable or 'coding' not in codeable:
            continue
        for code in codeable['coding']:
            if 'display' in code:
                counter[code['display']] += 1


def analyze_patient_data(input_dir, output_dir, report_format='html', disease_name=None):
    """
    Analyze Synthea-generated patient data and create reports.

    Every ``*.json`` file found recursively under ``input_dir`` is counted as
    one patient record. A file that cannot be read or parsed is reported on
    stdout and skipped, but still counts towards ``total_patients``.

    Args:
        input_dir: Directory containing patient JSON files
        output_dir: Directory to save analysis outputs (created if missing)
        report_format: Format for the report (html or csv); any other value
            produces only ``patient_stats.json``
        disease_name: Optional name of the disease being simulated

    Returns:
        None. Results are written to ``output_dir``. When no JSON files are
        found, ``patient_stats.json`` holds a single ``error`` key instead.
    """
    # Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)
    stats_path = os.path.join(output_dir, 'patient_stats.json')

    # Find all patient JSON files
    patients_files = glob.glob(f"{input_dir}/**/*.json", recursive=True)
    print(f"Found {len(patients_files)} patient records for analysis")

    if not patients_files:
        print("No patient files found to analyze.")
        with open(stats_path, 'w', encoding='utf-8') as f:
            json.dump({"error": "No patient files found to analyze"}, f)
        return

    # Initialize data collectors
    condition_counts = Counter()
    medication_counts = Counter()
    demographics = {'gender': Counter(), 'age': [], 'race': Counter(), 'ethnicity': Counter()}
    current_year = datetime.now().year

    # Process each patient file
    for patient_file in patients_files:
        try:
            with open(patient_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            if not isinstance(data, dict):
                # Nothing to extract from a non-object JSON document.
                continue
            for source in _demographic_sources(data):
                _collect_demographics(source, demographics, current_year)
            _collect_clinical(data, condition_counts, medication_counts)
        except Exception as e:
            # Best-effort batch run: one malformed file must not abort the rest.
            print(f"Error processing {patient_file}: {e}")

    ages = demographics['age']
    age_groups = {label: 0 for label, _ in _AGE_BUCKETS}
    for age in ages:
        age_groups[_age_bucket(age)] += 1

    # Prepare statistics
    stats = {
        'total_patients': len(patients_files),
        'disease_name': disease_name,
        'demographics': {
            'gender_distribution': dict(demographics['gender']),
            'age_distribution': {
                'min': min(ages) if ages else None,
                'max': max(ages) if ages else None,
                'average': sum(ages) / len(ages) if ages else None,
                'distribution': age_groups,
            },
            'race_distribution': dict(demographics['race']),
            'ethnicity_distribution': dict(demographics['ethnicity']),
        },
        'disease_stats': {
            'top_conditions': dict(condition_counts.most_common(15)),
            'top_medications': dict(medication_counts.most_common(15)),
        },
    }

    # Save statistics to file
    with open(stats_path, 'w', encoding='utf-8') as f:
        json.dump(stats, f, indent=2)

    # Generate report in requested format
    if report_format == 'html':
        generate_html_report(stats, output_dir)
    elif report_format == 'csv':
        generate_csv_reports(stats, output_dir)
    else:
        print(f"Unsupported report format: {report_format}. Only stats JSON file created.")

    print(f"Analysis complete. Reports generated in {os.path.abspath(output_dir)}")


def _html_table(headers, rows):
    """Render ``headers`` and ``rows`` as an HTML table, escaping every cell."""
    lines = [
        '<table>',
        '<tr>' + ''.join(f'<th>{escape(str(cell))}</th>' for cell in headers) + '</tr>',
    ]
    for row in rows:
        lines.append('<tr>' + ''.join(f'<td>{escape(str(cell))}</td>' for cell in row) + '</tr>')
    lines.append('</table>')
    return '\n'.join(lines)


def generate_html_report(stats, output_dir):
    """Generate an HTML report from the patient statistics.

    Args:
        stats: Statistics dict with the layout built by ``analyze_patient_data``
            (``total_patients``, ``disease_name``, ``demographics``,
            ``disease_stats``).
        output_dir: Existing directory that receives ``patient_analysis.html``.

    All values taken from ``stats`` are HTML-escaped because they originate
    from the input files and the command line. Age figures that are ``None``
    (no patient had a birth date) are shown as ``N/A``.
    """
    total = stats['total_patients']
    demo = stats['demographics']
    age_stats = demo['age_distribution']
    disease = stats['disease_stats']

    disease_title = f" - {stats['disease_name']}" if stats['disease_name'] else ""
    title = escape(f"Synthea Patient Analysis{disease_title}")

    def section(heading, first_header, counts, last_header='Percentage'):
        # One titled table: label, raw count, share of all patient records.
        rows = [
            (label, count, f"{_percentage(count, total):.1f}%")
            for label, count in counts.items()
        ]
        table = _html_table((first_header, 'Count', last_header), rows)
        return f"<h3>{escape(heading)}</h3>\n{table}"

    def age_text(value, spec=''):
        return 'N/A' if value is None else format(value, spec)

    parts = [
        '<!DOCTYPE html>',
        '<html>',
        '<head>',
        '<meta charset="utf-8">',
        f'<title>{title}</title>',
        '</head>',
        '<body>',
        f'<h1>{title}</h1>',
        f'<p>Total patients: {escape(str(total))}</p>',
        '<h2>Demographics</h2>',
        section('Gender Distribution', 'Gender', demo['gender_distribution']),
        section('Age Distribution', 'Age Group', age_stats['distribution']),
        f"<p>Min Age: {age_text(age_stats['min'])}</p>",
        f"<p>Max Age: {age_text(age_stats['max'])}</p>",
        f"<p>Average Age: {age_text(age_stats['average'], '.1f')}</p>",
        section('Race Distribution', 'Race', demo['race_distribution']),
        section('Ethnicity Distribution', 'Ethnicity', demo['ethnicity_distribution']),
        '<h2>Disease Statistics</h2>',
        section('Top Conditions', 'Condition', disease['top_conditions']),
        section('Top Medications', 'Medication', disease['top_medications'],
                'Percentage of Patients'),
        '</body>',
        '</html>',
    ]

    with open(os.path.join(output_dir, 'patient_analysis.html'), 'w', encoding='utf-8') as f:
        f.write('\n'.join(parts) + '\n')


def generate_csv_reports(stats, output_dir):
    """Generate CSV reports from the patient statistics.

    Writes three files into ``output_dir``: ``demographics.csv`` (gender, age
    group, race and ethnicity rows), ``conditions.csv`` and
    ``medications.csv``. Percentages are relative to ``stats['total_patients']``.

    Args:
        stats: Statistics dict with the layout built by ``analyze_patient_data``.
        output_dir: Existing directory that receives the CSV files.
    """
    total = stats['total_patients']
    demo = stats['demographics']

    def percent_text(count):
        return f"{_percentage(count, total):.1f}%"

    # Demographics CSV: one section per category, in a fixed order.
    demographic_sections = (
        ('Gender', demo['gender_distribution']),
        ('Age', demo['age_distribution']['distribution']),
        ('Race', demo['race_distribution']),
        ('Ethnicity', demo['ethnicity_distribution']),
    )
    with open(os.path.join(output_dir, 'demographics.csv'), 'w', newline='',
              encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(['Category', 'Type', 'Count', 'Percentage'])
        for category, counts in demographic_sections:
            for label, count in counts.items():
                writer.writerow([category, label, count, percent_text(count)])

    # Conditions CSV and Medications CSV share one layout.
    clinical_files = (
        ('conditions.csv', 'Condition', stats['disease_stats']['top_conditions']),
        ('medications.csv', 'Medication', stats['disease_stats']['top_medications']),
    )
    for filename, header, counts in clinical_files:
        with open(os.path.join(output_dir, filename), 'w', newline='',
                  encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow([header, 'Count', 'Percentage'])
            for label, count in counts.items():
                writer.writerow([label, count, percent_text(count)])


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Analyze Synthea patient data and generate reports")
    parser.add_argument("--input_dir", required=True, help="Directory containing patient JSON files")
    parser.add_argument("--output_dir", default="analysis_output", help="Directory to save analysis outputs")
    parser.add_argument("--report_format", default="html", choices=["html", "csv"], help="Format for the reports")
    parser.add_argument("--disease_name", help="Name of the disease being simulated")

    args = parser.parse_args()

    analyze_patient_data(args.input_dir, args.output_dir, args.report_format, args.disease_name)