Compare commits

..

2 Commits

Author SHA1 Message Date
d468509ec3 Configure synthea-alldiseases for WES execution
- Rewrite params.json to match WES tool registry format
- Update main.nf to use Harbor container image
- Add k8s profile to nextflow.config for WES/Kubernetes execution
- Use s3://omic/eureka paths for output
2026-03-25 12:31:34 +01:00
e10ae0cf81 Fixed patient generation with separate script approach 2025-03-27 11:26:33 -07:00
10 changed files with 588 additions and 160 deletions

78
generate_patients.sh Executable file
View File

@@ -0,0 +1,78 @@
#!/bin/bash
# Generate synthetic patients with Synthea using a module created by the pipeline.
#
# Usage: ./generate_patients.sh <module_name> <output_directory> [population_size]
#
# The requested population is split across two Synthea runs (-g M, then -g F).
# With an odd population the extra patient goes to the female run so the total
# matches what was asked for. Patient bundles are copied to
# <output_directory>/m and <output_directory>/f.
#
# Environment:
#   MODULES_DIR  directory that holds <module_name>.json
#                (default: /data/olamide/synthea-alldiseases/modules)

set -u -o pipefail

MODULE_NAME=${1:-}
OUTPUT_DIR=${2:-}
POPULATION=${3:-10}
MODULES_DIR=${MODULES_DIR:-/data/olamide/synthea-alldiseases/modules}

# Print a message on stderr and abort; the EXIT trap (once set) still runs.
die() {
  printf '%s\n' "$*" >&2
  exit 1
}

if [[ -z "$MODULE_NAME" || -z "$OUTPUT_DIR" ]]; then
  echo "Usage: $0 <module_name> <output_directory> [population_size]" >&2
  echo "Example: $0 diabetes /path/to/output 20" >&2
  exit 1
fi

# Reject anything that is not a positive decimal integer before it reaches $(( )).
if [[ ! "$POPULATION" =~ ^[0-9]+$ ]] || (( 10#$POPULATION < 1 )); then
  die "population_size must be a positive integer, got: $POPULATION"
fi
POPULATION=$((10#$POPULATION)) # force base 10 so "08" is not read as octal

MALE_TARGET=$((POPULATION / 2))
FEMALE_TARGET=$((POPULATION - MALE_TARGET))

# Create output directory
mkdir -p -- "$OUTPUT_DIR/m" "$OUTPUT_DIR/f" \
  || die "Could not create output directories under $OUTPUT_DIR"

# Location of module file
MODULE_PATH="${MODULES_DIR}/${MODULE_NAME}.json"
if [[ ! -f "$MODULE_PATH" ]]; then
  die "Module file not found: $MODULE_PATH"
fi

# Create a temporary directory for the container output and make sure it is
# removed on every exit path, including failures.
TEMP_DIR=$(mktemp -d) || die "Could not create temporary directory"
trap 'rm -rf -- "$TEMP_DIR"' EXIT
echo "Created temporary directory: $TEMP_DIR"

#######################################
# Run Synthea for one gender and copy the resulting patient bundles.
# Globals:   MODULE_NAME, MODULE_PATH, TEMP_DIR (read)
# Arguments: $1 - Synthea gender flag (M or F)
#            $2 - number of patients to request
#            $3 - destination directory for patient files
#            $4 - label used in log messages (male / female)
# Returns:   0 on success; aborts the script if the container run fails
#######################################
generate_for_gender() {
  local gender=$1
  local count=$2
  local dest=$3
  local label=$4

  if (( count == 0 )); then
    echo "Skipping ${label} patients (none requested)"
    return 0
  fi

  echo "Generating ${label} patients..."
  # Values are passed as positional parameters to the in-container shell rather
  # than being spliced into the command string.
  docker run --rm \
    -v "$MODULE_PATH:/app/modules/${MODULE_NAME}.json" \
    -v "$TEMP_DIR:/app/output" \
    synthea-module-generator \
    bash -c 'cd /app && ./run_synthea -p "$1" -g "$2" -m "$3" Massachusetts' \
    _ "$count" "$gender" "$MODULE_NAME" \
    || die "Synthea run failed for ${label} patients"

  echo "Copying ${label} patient files..."
  if [[ -d "$TEMP_DIR/fhir" ]]; then
    # A file counts as a patient bundle if it is not a hospital/practitioner
    # export and contains a "gender" field.
    find "$TEMP_DIR/fhir" -type f -name "*.json" \
      ! -name "*hospital*" ! -name "*practitioner*" \
      -exec grep -q '"gender"' {} \; \
      -exec cp -- {} "$dest/" \;

    # Clear the FHIR output so the next run starts from an empty directory.
    rm -rf -- "${TEMP_DIR:?}/fhir"/*
  else
    echo "Warning: no FHIR output found for ${label} patients" >&2
  fi
}

generate_for_gender M "$MALE_TARGET" "$OUTPUT_DIR/m" "male"
generate_for_gender F "$FEMALE_TARGET" "$OUTPUT_DIR/f" "female"

# Count the results
male_count=$(find "$OUTPUT_DIR/m" -type f -name "*.json" | wc -l)
female_count=$(find "$OUTPUT_DIR/f" -type f -name "*.json" | wc -l)

# Report results
echo "Patient generation complete. Results saved to $OUTPUT_DIR"
echo "Male patients: $male_count"
echo "Female patients: $female_count"

# The temporary directory is removed by the EXIT trap.

148
main.nf
View File

@@ -2,127 +2,69 @@
nextflow.enable.dsl=2 nextflow.enable.dsl=2
/*
* Synthea Disease Module Generator Pipeline
*
* A Nextflow pipeline to generate and manage Synthea disease modules
*/
// Load API key from .env file if it exists
def envFile = file('.env')
if (envFile.exists()) {
envFile.eachLine { line ->
def (key, value) = line.tokenize('=')
if (key && value && key.trim() == 'ANTHROPIC_API_KEY') {
params.anthropic_api_key = value.trim()
}
}
}
// Default parameters // Default parameters
params.disease_name = null // Disease name to generate patients for params.disease_name = null
params.output_dir = "output" // Output directory params.outdir = null
params.modules_dir = "src/main/resources/modules" // Directory for module files params.population = 10
params.population = 100 // Number of patients to generate params.gender = 0.5
params.gender = 0.5 // Decimal representing proportion female (0.0-1.0) params.min_age = 0
params.min_age = 0 // Minimum age of generated patients params.max_age = 90
params.max_age = 90 // Maximum age of generated patients params.seed = null
params.seed = null // Random seed for reproducibility
params.help = false // Show help message
// Show help message
if (params.help) {
log.info """
Synthea Patient Generator
========================
Usage: nextflow run main.nf --disease_name "Disease Name"
Required Arguments:
--disease_name Disease name to generate patients for
Optional Arguments:
--modules_dir Module directory (default: modules)
--output_dir Output directory (default: output)
--population Number of patients (default: 100)
--gender Gender ratio - female proportion 0.0-1.0 (default: 0.5)
--min_age Minimum age (default: 0)
--max_age Maximum age (default: 90)
--seed Random seed (default: random)
"""
exit 0
}
// Validate required parameters // Validate required parameters
if (!params.disease_name && !params.help) { if (!params.disease_name) {
error "Disease name is required. Please specify with --disease_name" error "Disease name is required. Please specify with --disease_name"
} }
// Process to check if module exists and generate it if needed if (!params.outdir) {
process checkAndGetModule { error "Output directory is required. Please specify with --outdir"
container 'synthea-module-generator'
publishDir "${params.modules_dir}", mode: 'copy'
input:
val diseaseName
output:
path "*.json", emit: module_file
script:
def moduleFilename = diseaseName.toLowerCase().replaceAll(' ', '_') + '.json'
def fullPath = "/app/src/main/resources/modules/${moduleFilename}"
"""
echo "Looking for module at ${fullPath}"
if [ -f "${fullPath}" ]; then
echo "Module exists, copying..."
cp "${fullPath}" .
else
echo "Module not found, generating..."
python3 /app/module_generator/module_generator.py --disease "${diseaseName}" --output "${moduleFilename}"
if [ -f "${moduleFilename}" ]; then
echo "Successfully generated module"
else
echo "Error: Failed to generate module"
exit 1
fi
fi
"""
} }
// Process to generate synthetic patients // Process to generate synthetic patients
process generatePatients { process generatePatients {
container 'synthea-module-generator' container 'harbor.cluster.omic.ai/omic/synthea-alldiseases:latest'
publishDir "${params.output_dir}/${diseaseName.toLowerCase().replaceAll(' ', '_')}", mode: 'copy' publishDir params.outdir, mode: 'copy'
input: input:
val diseaseName val diseaseName
path moduleFile
output: output:
path "m/*", optional: true path "fhir/*.json", optional: true, emit: fhir_output
path "f/*", optional: true path "run.log", emit: log_file
script: script:
def moduleBasename = diseaseName.toLowerCase().replaceAll(' ', '_') def moduleBasename = diseaseName.toLowerCase().replaceAll(' ', '_')
def genderArg = params.gender < 0.5 ? "M" : (params.gender > 0.5 ? "F" : "B") def genderArg = params.gender < 0.5 ? "M" : (params.gender > 0.5 ? "F" : "B")
def seedValue = params.seed ?: new Random().nextInt(1000000) def seedArg = params.seed ? "-s ${params.seed}" : ""
""" """
# Copy module and run Synthea # Check if a custom module exists, otherwise use built-in Synthea modules
cp "${moduleFile}" /app/modules/ MODULE_FILE="/app/src/main/resources/modules/${moduleBasename}.json"
cd /app && ./run_synthea -p ${params.population} -g ${genderArg} -m ${moduleBasename} -a ${params.min_age}-${params.max_age} -s ${seedValue} if [ -f "\${MODULE_FILE}" ]; then
echo "Found custom module: \${MODULE_FILE}" | tee run.log
# Organize output by gender else
mkdir -p m f echo "Using built-in Synthea modules for: ${diseaseName}" | tee run.log
find /app/output/fhir -type f -name "*.json" ! -name "*hospital*" ! -name "*practitioner*" | xargs grep -l '"gender":"male"' | xargs -I{} cp {} m/ fi
find /app/output/fhir -type f -name "*.json" ! -name "*hospital*" ! -name "*practitioner*" | xargs grep -l '"gender":"female"' | xargs -I{} cp {} f/
# Run Synthea patient generation
cd /app && ./run_synthea \
-p ${params.population} \
-g ${genderArg} \
-a ${params.min_age}-${params.max_age} \
${seedArg} \
-- ${diseaseName} 2>&1 | tee -a run.log
# Collect FHIR output
mkdir -p fhir
if [ -d /app/output/fhir ]; then
cp /app/output/fhir/*.json fhir/ 2>/dev/null || true
echo "Copied \$(ls fhir/*.json 2>/dev/null | wc -l) FHIR bundles" | tee -a run.log
else
echo "Warning: No FHIR output generated" | tee -a run.log
fi
""" """
} }
// Define workflow // Workflow
workflow { workflow {
// First check if the module exists generatePatients(params.disease_name)
checkAndGetModule(params.disease_name) }
// Then generate patients
generatePatients(params.disease_name, checkAndGetModule.out.module_file)
}

View File

@@ -0,0 +1,23 @@
"""
Patch for Anthropic client to fix 'proxies' parameter issue
Place this file in the same directory as module_generator.py
"""
import functools
import inspect  # not used by the patch; retained from the original module

import anthropic

# Store the original __init__ method so the patched version can delegate to it
original_init = anthropic.Client.__init__


# Define a new __init__ method that filters out problematic parameters.
# functools.wraps keeps the original name, docstring and signature visible to
# introspection instead of exposing a bare (*args, **kwargs) wrapper.
@functools.wraps(original_init)
def patched_init(self, *args, **kwargs):
    """Initialise the client exactly like the original, ignoring any 'proxies' keyword."""
    # Remove 'proxies' from kwargs if present
    kwargs.pop('proxies', None)
    # Call the original __init__ with filtered kwargs
    original_init(self, *args, **kwargs)


# Replace the original __init__ with our patched version (runs at import time)
anthropic.Client.__init__ = patched_init
print("Applied patch to fix Anthropic client proxies parameter issue")

View File

@@ -0,0 +1,122 @@
#!/usr/bin/env python3
"""
Simple Module Generator for Synthea Nextflow Pipeline
Using Direct HTTP Requests to avoid client library issues
"""
import os
import json
import argparse
import requests
from dotenv import load_dotenv
def generate_module(disease_name, output_file, timeout=300):
    """Generate a Synthea module for the specified disease.

    Sends one request to the Anthropic Messages API and writes the returned
    JSON to ``output_file``. If the reply is not valid JSON, the raw text is
    saved to ``<output_file>.raw`` instead.

    Args:
        disease_name: Disease to build the module for; inserted into the prompt.
        output_file: Path the validated module JSON is written to.
        timeout: Seconds to wait for the HTTP request before giving up, so a
            stalled connection cannot hang the caller forever.

    Returns:
        True if a valid JSON module was written, False on any request,
        response-shape or JSON-validation failure.

    Raises:
        ValueError: If ANTHROPIC_API_KEY is not set in the environment/.env.
    """
    # Load API key from environment
    load_dotenv()
    api_key = os.getenv("ANTHROPIC_API_KEY")
    if not api_key:
        raise ValueError("ANTHROPIC_API_KEY environment variable not set")

    print(f"Generating module for {disease_name}...")

    # Current Anthropic API endpoint and format (as of 2024)
    url = "https://api.anthropic.com/v1/messages"
    headers = {
        "Content-Type": "application/json",
        "x-api-key": api_key,
        "anthropic-version": "2023-06-01"
    }
    system_prompt = """
You are an expert in medical informatics and Synthea module creation.
Generate a complete, valid JSON module for the specified disease.
The module must follow Synthea's format conventions.
"""
    user_prompt = f"""
Create a complete Synthea module for {disease_name}.
The module should include:
- Initial states for disease onset and progression
- Diagnostic procedures and criteria
- Treatment options and medication regimens
- Complications and their management
- Follow-up care protocols
Return ONLY valid JSON that can be directly used in Synthea without any explanation or markdown.
"""
    data = {
        "model": "claude-3-opus-20240229",
        "system": system_prompt,
        "messages": [
            {
                "role": "user",
                "content": user_prompt
            }
        ],
        "max_tokens": 4000,
        "temperature": 0.2
    }

    try:
        # Make direct API request
        response = requests.post(url, headers=headers, json=data, timeout=timeout)

        # Check for errors
        if response.status_code != 200:
            print(f"API request failed with status code {response.status_code}: {response.text}")
            raise RuntimeError(f"API request failed with status code {response.status_code}")

        # Parse response; a missing key/index is reported by the handler below
        result = response.json()
        module_content = result["content"][0]["text"]

        # Extract the JSON part if wrapped in markdown
        if "```json" in module_content:
            module_content = module_content.split("```json")[1].split("```")[0].strip()
        elif "```" in module_content:
            module_content = module_content.split("```")[1].strip()

        # Validate JSON
        try:
            module_json = json.loads(module_content)
        except json.JSONDecodeError as e:
            print(f"Generated content is not valid JSON: {e}")
            # Keep the raw reply for debugging; explicit UTF-8 so non-ASCII
            # text does not depend on the platform's default encoding.
            with open(f"{output_file}.raw", 'w', encoding='utf-8') as f:
                f.write(module_content)
            print(f"Raw content saved to {output_file}.raw")
            return False

        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(module_json, f, indent=2)
        print(f"Successfully generated module and saved to {output_file}")
        return True
    except Exception as e:
        print(f"Error generating module: {e}")
        if isinstance(e, requests.exceptions.RequestException):
            # RequestException always has a .response attribute, but it is None
            # when no response was received (timeouts, connection errors).
            failed_response = getattr(e, 'response', None)
            details = failed_response.text if failed_response is not None else 'No response details'
            print(f"Request error details: {details}")
        return False
if __name__ == "__main__":
    # Command-line entry point: --disease and --output are both mandatory.
    parser = argparse.ArgumentParser(description='Generate Synthea disease module')
    parser.add_argument('--disease', required=True, help='Disease name to generate module for')
    parser.add_argument('--output', required=True, help='Output filename for the module')
    args = parser.parse_args()

    # No runtime "pip install requests" fallback here: requests is imported at
    # the top of this file, so by this point it is already available (or the
    # script has already failed on that import).
    success = generate_module(args.disease, args.output)
    if not success:
        # SystemExit works even when the interactive exit() helper is absent.
        raise SystemExit(1)

View File

@@ -1,51 +1,44 @@
// Nextflow configuration file
manifest { manifest {
description = 'Synthea Module Generator Pipeline' name = 'synthea-alldiseases'
description = 'Synthea synthetic patient generator pipeline'
mainScript = 'main.nf'
version = '1.0.0'
} }
// Load parameters from params.json
def paramsJson = new File("$baseDir/params.json").text
def paramsData = new groovy.json.JsonSlurper().parseText(paramsJson)
// Merge with defaults
params { params {
disease_name = paramsData.disease_name disease_name = null
modules_dir = paramsData.modules_dir ?: "/Users/richman/workspace/synthea-alldiseases/modules" outdir = null
output_dir = paramsData.output_dir ?: "output" population = 10
population = paramsData.population ?: 100 gender = 0.5
gender = paramsData.gender ?: 0.5 min_age = 0
min_age = paramsData.min_age ?: 0 max_age = 90
max_age = paramsData.max_age ?: 90 seed = null
seed = paramsData.seed
generate_patients = paramsData.generate_patients ?: true
publish_dir = paramsData.publish_dir ?: "published_output"
// Additional params
max_cost = 5.0
timeout = 300
anthropic_api_key = null
batch_size = 1
help = false
}
docker {
enabled = true
runOptions = "-v $baseDir/$params.modules_dir:/app/src/main/resources/modules -v $baseDir/src/main/python:/app/src/main/python -v $baseDir/src/main/resources:/app/src/main/resources -v $baseDir/.env:/app/.env"
}
process {
container = 'synthea-module-generator'
containerOptions = "-e MODULES_DIR=/app/src/main/resources/modules -e PYTHONPATH=/app -e ANTHROPIC_API_KEY=${params.anthropic_api_key}"
}
trace {
enabled = true
overwrite = true
file = "$baseDir/trace.txt"
} }
profiles { profiles {
standard { standard {
process.executor = 'local' docker {
} enabled = true
temp = 'auto'
}
}
k8s {
process {
executor = 'k8s'
container = 'harbor.cluster.omic.ai/omic/synthea-alldiseases:latest'
}
docker {
enabled = true
}
k8s {
storageClaimName = 'eureka-pvc'
storageMountPath = '/omic/eureka'
}
}
}
process {
cpus = 2
memory = '4 GB'
} }

View File

@@ -1,14 +1,117 @@
{ {
"disease_name": "Parkinson's Disease", "params": {
"modules_dir": "modules", "disease_name": {
"output_dir": "output", "type": "string",
"generate_patients": true, "description": "Disease name to generate synthetic patients for",
"population": 10, "default": "Diabetes",
"gender": 0.5, "required": true,
"min_age": 0, "pipeline_io": "parameter",
"max_age": 90, "var_name": "params.disease_name",
"analyze_patient_data": false, "examples": [
"report_format": "html", "Diabetes",
"force_generate": false, "Hypertension",
"publish_dir": "published_output" "Lung Cancer"
],
"pattern": ".*",
"enum": [],
"validation": {},
"notes": "The disease name used to find or generate a Synthea disease module. Case-insensitive."
},
"population": {
"type": "integer",
"description": "Number of synthetic patients to generate",
"default": 10,
"required": false,
"pipeline_io": "parameter",
"var_name": "params.population",
"examples": [
10,
100,
1000
],
"pattern": "^\\d+$",
"enum": [],
"validation": {
"min": 1,
"max": 10000
},
"notes": "Higher numbers take longer to generate."
},
"gender": {
"type": "number",
"description": "Proportion of female patients (0.0 = all male, 1.0 = all female)",
"default": 0.5,
"required": false,
"pipeline_io": "parameter",
"var_name": "params.gender",
"examples": [
0.5,
0.0,
1.0
],
"pattern": "^[01]\\.?\\d*$",
"enum": [],
"validation": {
"min": 0.0,
"max": 1.0
},
"notes": "Decimal between 0.0 and 1.0 representing the proportion of female patients."
},
"min_age": {
"type": "integer",
"description": "Minimum age of generated patients",
"default": 0,
"required": false,
"pipeline_io": "parameter",
"var_name": "params.min_age",
"examples": [
0,
18,
40
],
"pattern": "^\\d+$",
"enum": [],
"validation": {
"min": 0,
"max": 140
},
"notes": "Minimum patient age in years."
},
"max_age": {
"type": "integer",
"description": "Maximum age of generated patients",
"default": 90,
"required": false,
"pipeline_io": "parameter",
"var_name": "params.max_age",
"examples": [
90,
65,
100
],
"pattern": "^\\d+$",
"enum": [],
"validation": {
"min": 1,
"max": 140
},
"notes": "Maximum patient age in years."
},
"outdir": {
"type": "folder",
"description": "Output directory for generated patient data",
"default": "s3://omic/eureka/synthea-alldiseases/output",
"required": true,
"pipeline_io": "output",
"var_name": "params.outdir",
"examples": [
"s3://omic/eureka/synthea-alldiseases/output",
"s3://omic/eureka/synthea-alldiseases/results"
],
"pattern": ".*",
"enum": [],
"validation": {},
"notes": "Directory where generated FHIR patient bundles will be stored."
}
}
} }

View File

@@ -4,7 +4,7 @@ nextflow.enable.dsl=2
// Default parameters // Default parameters
params.disease_name = "Diabetes" // Default disease name params.disease_name = "Diabetes" // Default disease name
params.output_dir = "output" // Output directory params.output_dir = "/mnt/OmicNAS/private/old/olamide/synthea/output/new" // Output directory
params.modules_dir = "modules" // Directory for module files params.modules_dir = "modules" // Directory for module files
// Process to generate synthetic patients // Process to generate synthetic patients
@@ -74,4 +74,4 @@ workflow {
// Generate patients // Generate patients
generatePatients(moduleChannel) generatePatients(moduleChannel)
} }

View File

@@ -0,0 +1,5 @@
# Synthea overrides used for patient generation.
# Enable FHIR export of generated records.
exporter.fhir.export = true
# Also enable the hospital and practitioner FHIR exports. generate_patients.sh
# skips files named *hospital* / *practitioner* when collecting patient bundles.
exporter.hospital.fhir.export = true
exporter.practitioner.fhir.export = true
# NOTE(review): upstream Synthea's default properties appear to express
# generate.timestep in milliseconds (604800000 = 7 days). Confirm that 7 is the
# intended value and unit here.
generate.timestep = 7
# Presumably appends digits to generated person names - confirm against Synthea docs.
generate.append_numbers_to_person_names = true

141
test.nf Normal file
View File

@@ -0,0 +1,141 @@
#!/usr/bin/env nextflow
nextflow.enable.dsl=2
/*
* Synthea Disease Module Generator Pipeline
*
* A Nextflow pipeline to generate and manage Synthea disease modules
*/
// Load API key from .env file if it exists.
// Only the ANTHROPIC_API_KEY entry is read; every other line is ignored.
def envFile = file('.env')
if (envFile.exists()) {
    envFile.eachLine { line ->
        def entry = line.trim()
        // Skip blank lines and comments ('return' acts as 'continue' in a closure)
        if (!entry || entry.startsWith('#')) {
            return
        }
        // Split on the first '=' only, so a value that itself contains '='
        // is kept whole instead of being cut at the second '='.
        def parts = entry.split('=', 2)
        if (parts.size() < 2) {
            return
        }
        def key = parts[0].trim()
        def value = parts[1].trim()
        if (key == 'ANTHROPIC_API_KEY' && value) {
            params.anthropic_api_key = value
        }
    }
}
// Default parameters (each can be overridden on the command line with --<name>)
params.disease_name = null // Disease name to generate patients for
params.output_dir = "/mnt/OmicNAS/private/old/olamide/synthea/output/new" // Output directory
params.modules_dir = "src/main/resources/modules" // Directory for module files
params.population = 100 // Number of patients to generate
params.gender = 0.5 // Decimal representing proportion female (0.0-1.0)
params.min_age = 0 // Minimum age of generated patients
params.max_age = 90 // Maximum age of generated patients
params.seed = null // Random seed for reproducibility
params.help = false // Show help message

// Show help message and stop.
// The defaults printed here mirror the assignments above; keep them in sync.
if (params.help) {
    log.info """
    Synthea Patient Generator
    ========================
    Usage: nextflow run main.nf --disease_name "Disease Name"
    Required Arguments:
      --disease_name  Disease name to generate patients for
    Optional Arguments:
      --modules_dir   Module directory (default: src/main/resources/modules)
      --output_dir    Output directory (default: /mnt/OmicNAS/private/old/olamide/synthea/output/new)
      --population    Number of patients (default: 100)
      --gender        Gender ratio - female proportion 0.0-1.0 (default: 0.5)
      --min_age       Minimum age (default: 0)
      --max_age       Maximum age (default: 90)
      --seed          Random seed (default: random)
    """.stripIndent()
    exit 0
}

// Validate required parameters
if (!params.disease_name && !params.help) {
    error "Disease name is required. Please specify with --disease_name"
}
// Process to check if module exists and generate it if needed
//
// Input:  diseaseName - disease name as given by the caller (value input)
// Output: module_file - any *.json file present in the task directory when
//                       the script finishes
// Emitted files are also copied into params.modules_dir by publishDir.
process checkAndGetModule {
container 'synthea-module-generator'
publishDir "${params.modules_dir}", mode: 'copy'
input:
val diseaseName
output:
path "*.json", emit: module_file
script:
// Use sanitized disease name for filenames - replace spaces with underscores and remove special chars
def moduleFilename = diseaseName.toLowerCase().replaceAll(' ', '_').replaceAll('[^a-z0-9_]', '') + '.json'
// Path inside the container where an existing module is looked up
def fullPath = "/app/src/main/resources/modules/${moduleFilename}"
// The script copies the module if it already exists at fullPath. Otherwise it
// calls simple_module_generator.py with the unsanitized disease name and the
// sanitized output filename, and fails the task (exit 1) when no file appears.
"""
echo "Looking for module at ${fullPath}"
if [ -f "${fullPath}" ]; then
echo "Module exists, copying..."
cp "${fullPath}" .
else
echo "Module not found, generating..."
# Use the simple generator script instead
python3 /app/module_generator/simple_module_generator.py --disease "${diseaseName}" --output "${moduleFilename}"
if [ -f "${moduleFilename}" ]; then
echo "Successfully generated module"
else
echo "Error: Failed to generate module"
exit 1
fi
fi
"""
}
// Process to generate synthetic patients
//
// NOTE: as written, this process does not invoke Synthea. It stages the
// module as module.json, creates m/ and f/ directories holding only .keep
// marker files, and writes a README.txt telling the user to run
// generate_patients.sh manually.
//
// Inputs:  diseaseName - disease name (value input)
//          moduleFile  - module JSON file to copy
// Outputs: m, f, module.json, README.txt (all declared optional)
process generatePatients {
container 'synthea-module-generator'
// Results are published under <output_dir>/<sanitized disease name>;
// failOnError: false is set on the publish step.
publishDir "${params.output_dir}/${diseaseName.toLowerCase().replaceAll(' ', '_').replaceAll('[^a-z0-9_]', '')}", mode: 'copy', failOnError: false
input:
val diseaseName
path moduleFile
output:
path "m", optional: true
path "f", optional: true
path "module.json", optional: true
path "README.txt", optional: true
script:
// Lower-cased disease name with spaces turned into underscores and every
// character outside [a-z0-9_] removed
def moduleBasename = diseaseName.toLowerCase().replaceAll(' ', '_').replaceAll('[^a-z0-9_]', '')
// The ${...} placeholders below are filled in by Groovy before the shell
// runs; the heredoc delimiter is unquoted.
"""
# Create directories
mkdir -p m f
# Copy the module file for reference
cp "${moduleFile}" module.json
# Create a README file with instructions
cat > README.txt << EOF
This directory contains the module for ${diseaseName}.
To generate patients, run:
./generate_patients.sh ${moduleBasename} ${params.output_dir}/${moduleBasename}/patients 20
EOF
# Create marker files
touch m/.keep
touch f/.keep
# Always exit successfully
exit 0
"""
}
// Entry workflow: resolve the disease module, then pass it to patient generation
workflow {
    def disease = params.disease_name

    // Step 1 - look up the module, generating it when it does not exist yet
    checkAndGetModule(disease)

    // Step 2 - run patient generation with the module emitted by step 1
    generatePatients(disease, checkAndGetModule.out.module_file)
}

21
test_synthea.sh Executable file
View File

@@ -0,0 +1,21 @@
#!/bin/bash
# Smoke test: run Synthea in the synthea-module-generator container against
# modules/diabetes.json from the current directory, then list what it wrote to
# ./test_output. Exits with the status of the Synthea run.

set -u -o pipefail

# Set up environment and variables
MODULE_NAME="diabetes"
JSON_PATH="$(pwd)/modules/${MODULE_NAME}.json"
OUTPUT_PATH="$(pwd)/test_output"

# Make sure we have the module file
if [[ ! -f "$JSON_PATH" ]]; then
  echo "Module file not found: $JSON_PATH" >&2
  exit 1
fi

# Create the output directory up front. If the bind-mount source is missing,
# the Docker daemon creates it itself, typically owned by root.
mkdir -p -- "$OUTPUT_PATH" || exit 1

# Run Synthea directly in a container.
# pipefail inside the container keeps a run_synthea failure from being hidden
# by the exit status of tee.
status=0
docker run --rm -v "${JSON_PATH}:/app/modules/${MODULE_NAME}.json" \
  -v "${OUTPUT_PATH}:/app/output" \
  synthea-module-generator \
  bash -c "set -o pipefail; cd /app && ./run_synthea -p 10 -g B -m ${MODULE_NAME} -a 0-90 -s 12345 | tee /app/output/synthea_run.log" \
  || status=$?

# Check the output (listed even when the run failed, to help debugging)
echo "Checking output directory:"
find test_output -type f | sort

if (( status != 0 )); then
  echo "Synthea run failed with exit status $status" >&2
fi
exit "$status"