#!/usr/bin/env python3
"""Summarize FHIR patient JSON files into HTML, CSV and/or JSON reports.

The script scans a directory of ``*.json`` files, collects gender, age, race
and ethnicity figures from ``Patient`` resources, counts the coding displays
of ``Condition`` and ``MedicationRequest`` resources found in ``Bundle``
files, and writes ``<disease>_report.<ext>`` files into an output directory.
"""
import os
import sys
import json
import glob
import argparse
import csv
import html
from collections import Counter
from datetime import datetime

# Number of conditions / medications listed in each "Top ..." section.
TOP_N = 15

_NO_FILES_MESSAGE = "No patient files found to analyze."

# Files whose *name* contains one of these fragments are not patient records.
_EXCLUDED_NAME_PARTS = ("hospitalInformation", "practitionerInformation")


def _report_path(disease_name, output_dir, extension):
    """Return the path of the report file for *disease_name* and *extension*."""
    slug = disease_name.lower().replace(' ', '_')
    return os.path.join(output_dir, f"{slug}_report.{extension}")


def _percentage(count, total):
    """Return *count* as a percentage of *total* (0.0 when *total* is zero)."""
    return (count / total) * 100 if total else 0.0


def _top_entries(counts):
    """Return the ``TOP_N`` most frequent ``(name, count)`` pairs of *counts*."""
    return Counter(counts).most_common(TOP_N)


def _find_patient_files(input_dir):
    """Return the sorted candidate patient JSON files directly under *input_dir*."""
    pattern = os.path.join(glob.escape(input_dir), "*.json")
    # Test the file name only: matching against the full path would discard
    # every file when a parent directory happens to contain an excluded word.
    return sorted(
        path for path in glob.glob(pattern)
        if not any(part in os.path.basename(path) for part in _EXCLUDED_NAME_PARTS)
    )


def _write_empty_reports(disease_name, output_dir):
    """Write placeholder HTML, CSV and JSON reports when no input files exist."""
    with open(_report_path(disease_name, output_dir, "html"), 'w', encoding="utf-8") as f:
        f.write(
            "<html><body>\n"
            f"<h1>Analysis Report for {html.escape(disease_name)}</h1>\n"
            f"<p>{_NO_FILES_MESSAGE}</p>\n"
            "</body></html>\n"
        )
    with open(_report_path(disease_name, output_dir, "csv"), 'w', encoding="utf-8") as f:
        f.write(f"{_NO_FILES_MESSAGE}\n")
    with open(_report_path(disease_name, output_dir, "json"), 'w', encoding="utf-8") as f:
        f.write('{"error": "No patient files found to analyze."}\n')


def _record_patient(patient, demographics, current_year):
    """Add one Patient resource's gender, age, race and ethnicity to *demographics*."""
    if 'gender' in patient:
        demographics['gender'][patient['gender']] += 1

    if 'birthDate' in patient:
        # Age is approximated from the birth year only (first four characters).
        birth_year = int(patient['birthDate'][:4])
        demographics['age'].append(current_year - birth_year)

    for ext in patient.get('extension', []):
        if 'url' not in ext or 'extension' not in ext:
            continue
        if ext['url'].endswith('us-core-race'):
            target = demographics['race']
        elif ext['url'].endswith('us-core-ethnicity'):
            target = demographics['ethnicity']
        else:
            continue
        for sub_ext in ext['extension']:
            if 'valueCoding' in sub_ext:
                target[sub_ext['valueCoding'].get('display', 'Unknown')] += 1


def _count_coding_displays(resource, field, counter):
    """Increment *counter* for every coding ``display`` under ``resource[field]``."""
    for coding in resource.get(field, {}).get('coding', []):
        if 'display' in coding:
            counter[coding['display']] += 1


def _process_document(data, demographics, condition_counts, medication_counts, current_year):
    """Fold one parsed JSON document (a Patient or a Bundle) into the collectors."""
    if not isinstance(data, dict):
        return

    resource_type = data.get('resourceType')
    if resource_type == 'Patient':
        _record_patient(data, demographics, current_year)
    elif resource_type == 'Bundle':
        for entry in data.get('entry', []):
            if 'resource' not in entry:
                continue
            resource = entry['resource']
            entry_type = resource.get('resourceType')
            if entry_type == 'Patient':
                _record_patient(resource, demographics, current_year)
            elif entry_type == 'Condition':
                _count_coding_displays(resource, 'code', condition_counts)
            elif entry_type == 'MedicationRequest':
                _count_coding_displays(resource, 'medicationCodeableConcept', medication_counts)


def analyze_patient_data(disease_name, input_dir, output_dir, format_type="html"):
    """Analyze the patient JSON files in *input_dir* and write reports.

    Args:
        disease_name: Label used in report titles and, lower-cased with spaces
            replaced by underscores, in the report file names.
        input_dir: Directory scanned (non-recursively) for ``*.json`` files.
        output_dir: Directory the reports are written to; created if missing.
        format_type: ``"html"``, ``"csv"``, ``"json"`` or ``"all"``
            (case-insensitive).

    When no input files are found, placeholder reports are written in all
    three formats regardless of *format_type*. Files that cannot be read or
    parsed are reported on stdout and skipped.

    Note: condition and medication percentages divide the number of coding
    occurrences by the patient count, so they can exceed 100%.
    """
    print(f"Analyzing patient data for {disease_name}...")

    # Create the output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    patients_files = _find_patient_files(input_dir)
    print(f"Found {len(patients_files)} patient records for analysis")

    if not patients_files:
        print(_NO_FILES_MESSAGE)
        _write_empty_reports(disease_name, output_dir)
        return

    # Initialize data collectors
    demographics = {'gender': Counter(), 'age': [], 'race': Counter(), 'ethnicity': Counter()}
    condition_counts = Counter()
    medication_counts = Counter()
    current_year = datetime.now().year

    for patient_file in patients_files:
        try:
            with open(patient_file, 'r', encoding="utf-8") as f:
                data = json.load(f)
            _process_document(data, demographics, condition_counts, medication_counts, current_year)
        except (OSError, ValueError, LookupError, TypeError, AttributeError) as e:
            # Best effort: one unreadable or malformed file must not abort the run.
            print(f"Error processing {patient_file}: {e}")

    # Patients are counted through their gender entries; fall back to the file
    # count (always >= 1 here) when no demographics were found.
    total_patients = sum(demographics['gender'].values())
    if total_patients == 0:
        print("Warning: No patient demographics found. Setting total_patients to file count.")
        total_patients = len(patients_files)

    print(f"Total patients found: {total_patients}")
    print(f"Gender distribution: {dict(demographics['gender'])}")

    requested = format_type.lower()
    if requested in ("html", "all"):
        create_html_report(disease_name, output_dir, demographics, condition_counts, medication_counts, total_patients)
    if requested in ("csv", "all"):
        create_csv_report(disease_name, output_dir, demographics, condition_counts, medication_counts, total_patients)
    if requested in ("json", "all"):
        create_json_report(disease_name, output_dir, demographics, condition_counts, medication_counts, total_patients)

    print(f"Analysis complete. Reports generated in {output_dir}")


def _html_table(headers, rows):
    """Return an HTML ``<table>`` with escaped header and body cells."""
    lines = ["<table>\n"]
    if headers:
        cells = "".join(f"<th>{html.escape(str(cell))}</th>" for cell in headers)
        lines.append(f"<tr>{cells}</tr>\n")
    for row in rows:
        cells = "".join(f"<td>{html.escape(str(cell))}</td>" for cell in row)
        lines.append(f"<tr>{cells}</tr>\n")
    lines.append("</table>\n")
    return "".join(lines)


def _ranked_rows(counts, total_patients):
    """Return ``(name, count, "x.y%")`` rows for the ``TOP_N`` entries of *counts*."""
    return [
        (name, count, f"{_percentage(count, total_patients):.1f}%")
        for name, count in _top_entries(counts)
    ]


def create_html_report(disease_name, output_dir, demographics, condition_counts, medication_counts, total_patients):
    """Write ``<disease>_report.html`` with gender, age, condition and medication tables.

    All data-derived text is HTML-escaped before being written.
    """
    # NOTE(review): the visible text (headings, column titles) is taken from
    # the original script, but the exact original markup could not be
    # recovered; the tags below are a minimal reconstruction -- confirm.
    title = html.escape(f"Synthea Patient Analysis - {disease_name}")

    gender_rows = [
        (gender, count, f"{_percentage(count, total_patients):.1f}%")
        for gender, count in demographics['gender'].items()
    ]

    ages = demographics['age']
    if ages:
        age_rows = [
            ("Minimum Age", min(ages)),
            ("Maximum Age", max(ages)),
            ("Average Age", f"{sum(ages) / len(ages):.1f}"),
        ]
    else:
        age_rows = [("No age data available",)]

    sections = [
        "<!DOCTYPE html>\n<html>\n<head>\n",
        '<meta charset="utf-8">\n',
        f"<title>{title}</title>\n",
        "</head>\n<body>\n",
        f"<h1>{title}</h1>\n",
        f"<p>Total patients analyzed: {total_patients}</p>\n",
        "<h2>Demographics</h2>\n",
        "<h3>Gender Distribution</h3>\n",
        _html_table(("Gender", "Count", "Percentage"), gender_rows),
        "<h3>Age Statistics</h3>\n",
        _html_table((), age_rows),
        "<h2>Top Conditions</h2>\n",
        _html_table(
            ("Condition", "Count", "Percentage of Patients"),
            _ranked_rows(condition_counts, total_patients),
        ),
        "<h2>Top Medications</h2>\n",
        _html_table(
            ("Medication", "Count", "Percentage of Patients"),
            _ranked_rows(medication_counts, total_patients),
        ),
        "</body>\n</html>\n",
    ]

    with open(_report_path(disease_name, output_dir, "html"), 'w', encoding="utf-8") as f:
        f.write("".join(sections))


def create_csv_report(disease_name, output_dir, demographics, condition_counts, medication_counts, total_patients):
    """Write ``<disease>_report.csv`` with the same sections as the HTML report.

    Rows go through ``csv.writer`` so names containing commas or quotes are
    quoted instead of spilling into extra columns.
    """
    with open(_report_path(disease_name, output_dir, "csv"), 'w', encoding="utf-8", newline="") as f:
        writer = csv.writer(f, lineterminator="\n")

        # Header
        writer.writerow([f"Synthea Patient Analysis - {disease_name}"])
        writer.writerow(["Total patients analyzed", total_patients])
        writer.writerow([])

        # Gender distribution
        writer.writerow(["Gender Distribution"])
        writer.writerow(["Gender", "Count", "Percentage"])
        for gender, count in demographics['gender'].items():
            writer.writerow([gender, count, f"{_percentage(count, total_patients):.1f}%"])
        writer.writerow([])

        # Age statistics
        writer.writerow(["Age Statistics"])
        ages = demographics['age']
        if ages:
            writer.writerow(["Minimum Age", min(ages)])
            writer.writerow(["Maximum Age", max(ages)])
            writer.writerow(["Average Age", f"{sum(ages) / len(ages):.1f}"])
        else:
            writer.writerow(["No age data available"])
        writer.writerow([])

        # Top conditions
        writer.writerow(["Top Conditions"])
        writer.writerow(["Condition", "Count", "Percentage of Patients"])
        writer.writerows(_ranked_rows(condition_counts, total_patients))
        writer.writerow([])

        # Top medications
        writer.writerow(["Top Medications"])
        writer.writerow(["Medication", "Count", "Percentage of Patients"])
        writer.writerows(_ranked_rows(medication_counts, total_patients))


def create_json_report(disease_name, output_dir, demographics, condition_counts, medication_counts, total_patients):
    """Write ``<disease>_report.json`` with demographics, age statistics and top lists.

    ``age_statistics`` is an empty object when no ages were collected.
    """
    ages = demographics['age']
    age_statistics = {}
    if ages:
        age_statistics = {
            "min_age": min(ages),
            "max_age": max(ages),
            "avg_age": sum(ages) / len(ages),
        }

    report_data = {
        "disease": disease_name,
        "total_patients": total_patients,
        "demographics": {
            "gender": dict(demographics['gender']),
            "race": dict(demographics['race']),
            "ethnicity": dict(demographics['ethnicity']),
        },
        "age_statistics": age_statistics,
        "top_conditions": [
            {"name": name, "count": count, "percentage": _percentage(count, total_patients)}
            for name, count in _top_entries(condition_counts)
        ],
        "top_medications": [
            {"name": name, "count": count, "percentage": _percentage(count, total_patients)}
            for name, count in _top_entries(medication_counts)
        ],
    }

    with open(_report_path(disease_name, output_dir, "json"), 'w', encoding="utf-8") as f:
        json.dump(report_data, f, indent=2)


def main():
    """Parse the command-line arguments and run the analysis."""
    parser = argparse.ArgumentParser(description="Analyze Synthea patient data")
    parser.add_argument("--disease", required=True, help="Disease name")
    parser.add_argument("--input_dir", required=True, help="Input directory with FHIR files")
    parser.add_argument("--output_dir", default=".", help="Output directory for reports")
    parser.add_argument("--format", default="html", choices=["html", "csv", "json", "all"],
                        help="Output format (html, csv, json, or all)")

    args = parser.parse_args()
    analyze_patient_data(args.disease, args.input_dir, args.output_dir, args.format)


if __name__ == "__main__":
    main()