Files
synthea-alldiseases/scripts/generate_batch.py

175 lines
6.4 KiB
Python
Executable File

#!/usr/bin/env python3
import os
import sys
import csv
import time
import argparse
import subprocess
import concurrent.futures
import re
# Global variables
# Parsed command-line options. Starts as None, is assigned the result of
# parser.parse_args() inside main(), and is read by process_disease()
# (the `force` and `auto_fallback` attributes).
ARGS = None
def normalize_disease_name(name):
    """Convert a disease name to a normalized, filename-safe identifier.

    The name is lower-cased, every run of characters outside ``a-z0-9`` is
    collapsed into a single underscore, and leading/trailing underscores are
    removed.

    Args:
        name: Human-readable disease name. ``None`` is accepted because
            ``csv.DictReader`` fills fields missing from a short row with
            ``None``.

    Returns:
        The normalized name, or ``"unknown_disease"`` when the input is
        ``None``, empty, or contains no ASCII letters or digits.
    """
    if name is None:
        return "unknown_disease"
    # Lower-case first so the character class only has to list a-z.
    slug = re.sub(r'[^a-z0-9]+', '_', str(name).lower()).strip('_')
    # Never hand back an empty file stem.
    return slug or "unknown_disease"
def process_disease(disease_entry):
    """Generate the module for a single disease row from the CSV.

    Runs ``generate_module.py`` as a child process unless the target module
    file already exists and ``--force`` was not given.

    Args:
        disease_entry: Mapping for one CSV row. The keys read are
            ``disease_name``, ``id`` (forwarded to the generator as
            ``--icd10``) and ``disease_category``. Missing or ``None``
            values are treated as empty strings.

    Returns:
        A dict with ``name``, ``status`` (``"success"``, ``"skipped"`` or
        ``"error"``) and ``path``; error results also carry an ``error``
        message and have ``path`` set to ``None``.
    """
    # csv.DictReader fills fields missing from a short row with None, so
    # `.get(key, "")` alone does not guarantee a string.
    disease_name = disease_entry.get("disease_name") or ""
    icd10 = disease_entry.get("id") or ""
    category = disease_entry.get("disease_category") or ""
    normalized_name = normalize_disease_name(disease_name)

    # ARGS is still None when this function is called without main() having
    # run in the current process; fall back to the flags' defaults then.
    force = bool(getattr(ARGS, "force", False))
    auto_fallback = bool(getattr(ARGS, "auto_fallback", False))

    print(f"\n{'='*80}")
    print(f"Processing disease: {disease_name}")
    print(f"ICD-10 code: {icd10}")
    print(f"Category: {category}")

    # A row without a name cannot be generated; report it rather than
    # invoking the generator with an empty --disease argument.
    if not disease_name.strip():
        message = "Row has no disease_name"
        print(f"❌ Failed to generate module for {disease_name}")
        print(f"Error: {message}")
        return {"name": disease_name, "status": "error", "error": message, "path": None}

    # Skip if module already exists (unless --force flag is used)
    module_path = f"src/main/resources/modules/{normalized_name}.json"
    if os.path.exists(module_path) and not force:
        print(f"✅ Module already exists at {module_path}, skipping")
        return {"name": disease_name, "status": "skipped", "path": module_path}

    # Create the command. sys.executable keeps the child on the same
    # interpreter (and virtualenv) as this script.
    cmd = [sys.executable or "python3", "generate_module.py",
           "--disease", disease_name, "--no-interactive"]
    if icd10:
        cmd.extend(["--icd10", icd10])
    if category:
        cmd.extend(["--category", category])
    # Add auto-fallback option if requested
    if auto_fallback:
        cmd.append("--auto-fallback")

    # Run the generator
    try:
        print(f"Executing: {' '.join(cmd)}")
        process = subprocess.run(cmd, check=True, text=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        # The generator ran but exited with a non-zero status.
        print(f"❌ Failed to generate module for {disease_name}")
        print(f"Error: {str(e)}")
        print(f"STDOUT: {e.stdout}")
        print(f"STDERR: {e.stderr}")
        return {"name": disease_name, "status": "error", "error": str(e), "path": None}
    except OSError as e:
        # The child could not be started at all (e.g. interpreter missing);
        # record the failure instead of aborting the whole batch.
        print(f"❌ Failed to generate module for {disease_name}")
        print(f"Error: {str(e)}")
        return {"name": disease_name, "status": "error", "error": str(e), "path": None}

    print(f"✅ Successfully generated module for {disease_name}")
    print(process.stdout.strip())
    return {"name": disease_name, "status": "success", "path": module_path}
def main():
    """Main function to process diseases from the CSV.

    Parses the command line, stores the options in the module-level ``ARGS``,
    loads the disease list, applies the ``--category`` / ``--disease`` /
    ``--limit`` filters, runs ``process_disease`` for every remaining row
    (concurrently when ``--parallel`` is greater than 1) and prints a summary.

    Exits with status 1 when the CSV is missing or unreadable, when
    ``--disease`` matches nothing, or when any disease failed.
    """
    global ARGS
    parser = argparse.ArgumentParser(description='Generate Synthea modules for diseases')
    parser.add_argument('--category', help='Only process diseases in this category')
    parser.add_argument('--disease', help='Only process a specific disease (by name)')
    parser.add_argument('--limit', type=int, help='Limit number of diseases to process')
    parser.add_argument('--parallel', type=int, default=1, help='Number of parallel processes')
    # NOTE: --skip-existing is accepted but never read; existing modules are
    # already skipped by default unless --force is given.
    parser.add_argument('--skip-existing', action='store_true', help='Skip diseases that already have modules')
    parser.add_argument('--csv-path', default='src/main/resources/disease_list.csv', help='Path to disease list CSV')
    parser.add_argument('--force', action='store_true', help='Force generation even if module already exists')
    parser.add_argument('--auto-fallback', action='store_true', help='Enable auto-fallback option')
    args = parser.parse_args()
    ARGS = args

    # Read the disease list CSV
    if not os.path.exists(args.csv_path):
        print(f"Error: Disease list CSV not found at {args.csv_path}")
        sys.exit(1)
    print(f"Reading disease list from {args.csv_path}")
    try:
        # newline='' lets the csv module handle embedded newlines itself;
        # utf-8-sig reads plain UTF-8 and also drops a leading BOM that would
        # otherwise become part of the first column name.
        with open(args.csv_path, 'r', newline='', encoding='utf-8-sig') as f:
            diseases = list(csv.DictReader(f))
    except (OSError, UnicodeError, csv.Error) as e:
        print(f"Error reading CSV: {str(e)}")
        sys.exit(1)
    print(f"Found {len(diseases)} diseases in the CSV")

    def lowered(row, key):
        """Return row[key] lower-cased; a missing or None field counts as ''."""
        return (row.get(key) or "").lower()

    # Filter diseases
    if args.category:
        wanted_category = args.category.lower()
        diseases = [d for d in diseases if lowered(d, 'disease_category') == wanted_category]
        print(f"Filtered to {len(diseases)} diseases in category '{args.category}'")
    if args.disease:
        # Try to find an exact match first
        disease_name_lower = args.disease.lower()
        exact_match = [d for d in diseases if lowered(d, "disease_name") == disease_name_lower]
        if exact_match:
            diseases = exact_match
        else:
            # Try to find a disease that contains the specified name
            partial_matches = [d for d in diseases if disease_name_lower in lowered(d, "disease_name")]
            if partial_matches:
                diseases = partial_matches
                print(f"Found {len(diseases)} partial matches for '{args.disease}'")
            else:
                print(f"No matches found for disease '{args.disease}'")
                sys.exit(1)
    if args.limit and args.limit > 0:
        diseases = diseases[:args.limit]
        print(f"Limited to {args.limit} diseases")

    # Process the diseases
    total = len(diseases)
    print(f"\nProcessing {total} diseases with {args.parallel} parallel workers")
    start_time = time.time()
    if args.parallel > 1:
        # Each task only waits on a child process, so threads give the same
        # concurrency as a process pool. Unlike pool processes started with
        # the spawn/forkserver methods, threads also see the ARGS global
        # assigned above.
        with concurrent.futures.ThreadPoolExecutor(max_workers=args.parallel) as executor:
            results = list(executor.map(process_disease, diseases))
    else:
        results = [process_disease(disease) for disease in diseases]
    elapsed = time.time() - start_time

    # Summarize results
    success_count = sum(1 for r in results if r["status"] == "success")
    skipped_count = sum(1 for r in results if r["status"] == "skipped")
    error_count = sum(1 for r in results if r["status"] == "error")
    print("\n" + "="*80)
    print(f"SUMMARY: Processed {total} diseases in {elapsed:.2f} seconds")
    print(f"- Successfully generated: {success_count}")
    print(f"- Skipped (already exist): {skipped_count}")
    print(f"- Failed: {error_count}")

    # List errors if any
    if error_count > 0:
        print("\nFAILED DISEASES:")
        for result in results:
            if result["status"] == "error":
                print(f"- {result['name']}: {result.get('error', 'Unknown error')}")
        # Exit with error code if any failures
        sys.exit(1)
    print("\nAll done! 🎉")
# Script entry point: run the batch only when executed directly, not on import.
if __name__ == "__main__":
    main()