Compare commits

...

7 Commits

Author SHA1 Message Date
67bd6692b0 Clean up pipeline files
- Remove unused variable and redundant comments/echo statements in main.nf
- Remove obsolete files: simple.nf, test.nf, generate_patients.sh,
  test_synthea.sh, trace.txt, docker-compose.yml
  (all referenced local-only synthea-module-generator image)
2026-03-25 15:09:07 +01:00
f8df39d9af Fix script to write outputs to Nextflow work dir
- Save pwd before cd /app so outputs go to correct location
- Use set +e and PIPESTATUS to handle java exit code gracefully
- Exit 0 if FHIR files were generated successfully
2026-03-25 14:51:09 +01:00
02d93f9360 Fix image pull and gender arg for WES
- Push as Docker V2 manifest (no OCI index/attestation) so K8s can pull
- Tag as v3 to avoid cached image issues
- Fix gender: omit -g flag for 0.5 (both), Synthea only accepts M or F
2026-03-25 14:29:59 +01:00
f29323323b Use pre-built jar and v2 tag to bypass Gradle permission issue
- Call java -jar synthea-with-dependencies.jar directly instead of
  run_synthea which invokes Gradle at runtime
- Tag image as v2 to force K8s to pull fresh image
2026-03-25 13:30:24 +01:00
a871107728 Fix Dockerfile permissions for K8s execution
Make .gradle, output, build, and modules directories writable (chmod 777)
so the container works when K8s runs it as a non-root user.
2026-03-25 12:57:45 +01:00
d468509ec3 Configure synthea-alldiseases for WES execution
- Rewrite params.json to match WES tool registry format
- Update main.nf to use Harbor container image
- Add k8s profile to nextflow.config for WES/Kubernetes execution
- Use s3://omic/eureka paths for output
2026-03-25 12:31:34 +01:00
e10ae0cf81 Fixed patient generation with separate script approach 2025-03-27 11:26:33 -07:00
10 changed files with 350 additions and 302 deletions

View File

@@ -50,21 +50,12 @@ RUN mkdir -p /app/modules
# Test a simple module generation to ensure Synthea works # Test a simple module generation to ensure Synthea works
RUN ./run_synthea -p 1 -m hypertension RUN ./run_synthea -p 1 -m hypertension
# Set up a symlink from mounted modules to Synthea modules directory # Make directories writable for K8s (may run as non-root)
RUN echo '#!/bin/sh\n\ RUN chmod -R 777 /app/.gradle /app/output /app/build /app/modules \
# Update modules symlinks\n\ /app/src/main/resources/modules
# Load environment variables\n\
if [ -f /app/.env ]; then\n\
export $(grep -v "^#" /app/.env | xargs)\n\
fi\n\
\n\
exec "$@"' > /app/entrypoint.sh && chmod +x /app/entrypoint.sh
# Set PYTHONPATH to ensure modules can be found # Set PYTHONPATH to ensure modules can be found
ENV PYTHONPATH="/app" ENV PYTHONPATH="/app"
# Set entrypoint to use our script
ENTRYPOINT ["/app/entrypoint.sh"]
# Default command when container runs # Default command when container runs
CMD ["tail", "-f", "/dev/null"] CMD ["tail", "-f", "/dev/null"]

View File

@@ -1,47 +0,0 @@
version: '3.8'
services:
synthea:
build:
context: .
dockerfile: Dockerfile
volumes:
- ./modules:/app/modules:ro # Mount modules directory read-only
- ./output:/app/output # Mount output directory for patient data
- ./.env:/app/.env:ro # Mount environment variables file
environment:
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
working_dir: /app
command: tail -f /dev/null # Keep container running
healthcheck:
test: ["CMD", "/app/healthcheck.sh"]
interval: 30s
timeout: 10s
retries: 3
start_period: 5s
restart: unless-stopped
ports:
- "8080:8080" # Only needed if you want to access the Synthea web interface
module-generator:
build:
context: .
dockerfile: Dockerfile
volumes:
- ./modules:/app/modules # Mount modules directory for writing
- ./module_generator:/app/module_generator
- ./src:/app/src
- ./scripts:/app/scripts
- ./.env:/app/.env:ro # Mount environment variables file
environment:
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
working_dir: /app
command: python3 /app/module_generator/run_module_generator.py --batch-size 5 --max-modules 10 --prioritize
depends_on:
- synthea
profiles:
- generator # This service won't start by default, only when explicitly requested
volumes:
synthea-output:
driver: local

143
main.nf
View File

@@ -2,127 +2,64 @@
nextflow.enable.dsl=2 nextflow.enable.dsl=2
/* params.disease_name = null
* Synthea Disease Module Generator Pipeline params.outdir = null
* params.population = 10
* A Nextflow pipeline to generate and manage Synthea disease modules params.gender = 0.5
*/ params.min_age = 0
params.max_age = 90
params.seed = null
// Load API key from .env file if it exists if (!params.disease_name) {
def envFile = file('.env')
if (envFile.exists()) {
envFile.eachLine { line ->
def (key, value) = line.tokenize('=')
if (key && value && key.trim() == 'ANTHROPIC_API_KEY') {
params.anthropic_api_key = value.trim()
}
}
}
// Default parameters
params.disease_name = null // Disease name to generate patients for
params.output_dir = "output" // Output directory
params.modules_dir = "src/main/resources/modules" // Directory for module files
params.population = 100 // Number of patients to generate
params.gender = 0.5 // Decimal representing proportion female (0.0-1.0)
params.min_age = 0 // Minimum age of generated patients
params.max_age = 90 // Maximum age of generated patients
params.seed = null // Random seed for reproducibility
params.help = false // Show help message
// Show help message
if (params.help) {
log.info """
Synthea Patient Generator
========================
Usage: nextflow run main.nf --disease_name "Disease Name"
Required Arguments:
--disease_name Disease name to generate patients for
Optional Arguments:
--modules_dir Module directory (default: modules)
--output_dir Output directory (default: output)
--population Number of patients (default: 100)
--gender Gender ratio - female proportion 0.0-1.0 (default: 0.5)
--min_age Minimum age (default: 0)
--max_age Maximum age (default: 90)
--seed Random seed (default: random)
"""
exit 0
}
// Validate required parameters
if (!params.disease_name && !params.help) {
error "Disease name is required. Please specify with --disease_name" error "Disease name is required. Please specify with --disease_name"
} }
// Process to check if module exists and generate it if needed if (!params.outdir) {
process checkAndGetModule { error "Output directory is required. Please specify with --outdir"
container 'synthea-module-generator'
publishDir "${params.modules_dir}", mode: 'copy'
input:
val diseaseName
output:
path "*.json", emit: module_file
script:
def moduleFilename = diseaseName.toLowerCase().replaceAll(' ', '_') + '.json'
def fullPath = "/app/src/main/resources/modules/${moduleFilename}"
"""
echo "Looking for module at ${fullPath}"
if [ -f "${fullPath}" ]; then
echo "Module exists, copying..."
cp "${fullPath}" .
else
echo "Module not found, generating..."
python3 /app/module_generator/module_generator.py --disease "${diseaseName}" --output "${moduleFilename}"
if [ -f "${moduleFilename}" ]; then
echo "Successfully generated module"
else
echo "Error: Failed to generate module"
exit 1
fi
fi
"""
} }
// Process to generate synthetic patients
process generatePatients { process generatePatients {
container 'synthea-module-generator' container 'harbor.cluster.omic.ai/omic/synthea-alldiseases:v3'
publishDir "${params.output_dir}/${diseaseName.toLowerCase().replaceAll(' ', '_')}", mode: 'copy' publishDir params.outdir, mode: 'copy'
input: input:
val diseaseName val diseaseName
path moduleFile
output: output:
path "m/*", optional: true path "fhir/*.json", optional: true, emit: fhir_output
path "f/*", optional: true path "run.log", emit: log_file
script: script:
def moduleBasename = diseaseName.toLowerCase().replaceAll(' ', '_') def genderArg = params.gender < 0.5 ? "-g M" : (params.gender > 0.5 ? "-g F" : "")
def genderArg = params.gender < 0.5 ? "M" : (params.gender > 0.5 ? "F" : "B") def seedArg = params.seed ? "-s ${params.seed}" : ""
def seedValue = params.seed ?: new Random().nextInt(1000000)
""" """
# Copy module and run Synthea set +e
cp "${moduleFile}" /app/modules/ WORKDIR=\$(pwd)
cd /app && ./run_synthea -p ${params.population} -g ${genderArg} -m ${moduleBasename} -a ${params.min_age}-${params.max_age} -s ${seedValue}
# Organize output by gender # Run Synthea via pre-built jar (Gradle is not writable in K8s)
mkdir -p m f cd /app
find /app/output/fhir -type f -name "*.json" ! -name "*hospital*" ! -name "*practitioner*" | xargs grep -l '"gender":"male"' | xargs -I{} cp {} m/ java -jar /app/build/libs/synthea-with-dependencies.jar \
find /app/output/fhir -type f -name "*.json" ! -name "*hospital*" ! -name "*practitioner*" | xargs grep -l '"gender":"female"' | xargs -I{} cp {} f/ -p ${params.population} \
${genderArg} \
-a ${params.min_age}-${params.max_age} \
${seedArg} 2>&1 | tee \${WORKDIR}/run.log
JAVA_EXIT=\${PIPESTATUS[0]}
cd \${WORKDIR}
mkdir -p fhir
if [ -d /app/output/fhir ]; then
cp /app/output/fhir/*.json fhir/ 2>/dev/null || true
fi
# Succeed if FHIR output was produced
if [ -n "\$(ls fhir/*.json 2>/dev/null)" ]; then
exit 0
else
exit \${JAVA_EXIT}
fi
""" """
} }
// Define workflow
workflow { workflow {
// First check if the module exists generatePatients(params.disease_name)
checkAndGetModule(params.disease_name)
// Then generate patients
generatePatients(params.disease_name, checkAndGetModule.out.module_file)
} }

View File

@@ -0,0 +1,23 @@
"""
Patch for Anthropic client to fix 'proxies' parameter issue
Place this file in the same directory as module_generator.py
"""
import anthropic
import inspect
# Store the original __init__ method
original_init = anthropic.Client.__init__
# Define a new __init__ method that filters out problematic parameters
def patched_init(self, *args, **kwargs):
    """Replacement for ``anthropic.Client.__init__`` that ignores ``proxies``.

    Drops the ``proxies`` keyword argument when it is supplied and forwards
    every remaining positional and keyword argument, unchanged, to the
    original constructor saved in ``original_init``.
    """
    # pop() with a default is a no-op when the caller never passed 'proxies'.
    kwargs.pop('proxies', None)
    # Delegate to the untouched constructor with the filtered arguments.
    original_init(self, *args, **kwargs)
# Replace the original __init__ with our patched version
anthropic.Client.__init__ = patched_init
print("Applied patch to fix Anthropic client proxies parameter issue")

View File

@@ -0,0 +1,122 @@
#!/usr/bin/env python3
"""
Simple Module Generator for Synthea Nextflow Pipeline
Using Direct HTTP Requests to avoid client library issues
"""
import os
import json
import argparse
import requests
from dotenv import load_dotenv
def generate_module(disease_name, output_file, timeout=300):
    """Generate a Synthea module for the specified disease.

    Sends one request to the Anthropic Messages API asking for a Synthea
    module, strips an optional Markdown code fence from the reply, checks that
    the remainder parses as JSON and writes it to ``output_file``.

    Args:
        disease_name: Disease to build the module for; interpolated into the
            user prompt.
        output_file: Path the validated module JSON is written to. When the
            reply is not valid JSON, the raw text is written to
            ``<output_file>.raw`` instead.
        timeout: Seconds to wait for the HTTP request (forwarded to
            ``requests.post``). Without a timeout a stalled connection would
            block the calling pipeline task indefinitely.

    Returns:
        True when a valid JSON module was written, False on any request,
        response-shape or JSON-validation failure.

    Raises:
        ValueError: If ``ANTHROPIC_API_KEY`` is not set in the environment
            (after loading ``.env``).
    """
    # Load API key from environment
    load_dotenv()
    api_key = os.getenv("ANTHROPIC_API_KEY")
    if not api_key:
        raise ValueError("ANTHROPIC_API_KEY environment variable not set")

    print(f"Generating module for {disease_name}...")

    # Anthropic Messages API endpoint and required headers
    url = "https://api.anthropic.com/v1/messages"
    headers = {
        "Content-Type": "application/json",
        "x-api-key": api_key,
        "anthropic-version": "2023-06-01"
    }

    system_prompt = """
You are an expert in medical informatics and Synthea module creation.
Generate a complete, valid JSON module for the specified disease.
The module must follow Synthea's format conventions.
"""
    user_prompt = f"""
Create a complete Synthea module for {disease_name}.
The module should include:
- Initial states for disease onset and progression
- Diagnostic procedures and criteria
- Treatment options and medication regimens
- Complications and their management
- Follow-up care protocols
Return ONLY valid JSON that can be directly used in Synthea without any explanation or markdown.
"""

    data = {
        "model": "claude-3-opus-20240229",
        "system": system_prompt,
        "messages": [
            {
                "role": "user",
                "content": user_prompt
            }
        ],
        "max_tokens": 4000,
        "temperature": 0.2
    }

    try:
        # Make direct API request; the timeout bounds how long we can block.
        response = requests.post(url, headers=headers, json=data, timeout=timeout)

        # Check for errors
        if response.status_code != 200:
            print(f"API request failed with status code {response.status_code}: {response.text}")
            raise Exception(f"API request failed with status code {response.status_code}")

        # Parse response: the generated text is the first content item.
        result = response.json()
        module_content = result["content"][0]["text"]

        # Extract the JSON part if wrapped in markdown
        if "```json" in module_content:
            module_content = module_content.split("```json")[1].split("```")[0].strip()
        elif "```" in module_content:
            module_content = module_content.split("```")[1].split("```")[0].strip()

        # Validate JSON
        try:
            module_json = json.loads(module_content)
        except json.JSONDecodeError as e:
            print(f"Generated content is not valid JSON: {e}")
            # Keep the raw reply for debugging; explicit UTF-8 so non-ASCII
            # model output cannot fail on a non-UTF-8 default locale.
            with open(f"{output_file}.raw", 'w', encoding="utf-8") as f:
                f.write(module_content)
            print(f"Raw content saved to {output_file}.raw")
            return False

        with open(output_file, 'w', encoding="utf-8") as f:
            json.dump(module_json, f, indent=2)
        print(f"Successfully generated module and saved to {output_file}")
        return True
    except Exception as e:
        print(f"Error generating module: {e}")
        if isinstance(e, requests.exceptions.RequestException):
            # RequestException always has a `response` attribute, but it is
            # None for connection errors and timeouts, so test the value
            # rather than the attribute's existence.
            failed_response = getattr(e, 'response', None)
            details = failed_response.text if failed_response is not None else 'No response details'
            print(f"Request error details: {details}")
        return False
if __name__ == "__main__":
    # Command-line entry point: --disease and --output are both mandatory.
    parser = argparse.ArgumentParser(description='Generate Synthea disease module')
    parser.add_argument('--disease', required=True, help='Disease name to generate module for')
    parser.add_argument('--output', required=True, help='Output filename for the module')
    args = parser.parse_args()

    # No runtime "pip install requests" fallback here: this module imports
    # requests at the top, so by the time this block runs the import has
    # already succeeded (or the script has already failed).
    success = generate_module(args.disease, args.output)
    if not success:
        # SystemExit works even when the site-provided exit() helper is
        # unavailable (e.g. python -S); exit status 1 signals failure.
        raise SystemExit(1)

View File

@@ -1,51 +1,44 @@
// Nextflow configuration file
manifest { manifest {
description = 'Synthea Module Generator Pipeline' name = 'synthea-alldiseases'
description = 'Synthea synthetic patient generator pipeline'
mainScript = 'main.nf'
version = '1.0.0'
} }
// Load parameters from params.json
def paramsJson = new File("$baseDir/params.json").text
def paramsData = new groovy.json.JsonSlurper().parseText(paramsJson)
// Merge with defaults
params { params {
disease_name = paramsData.disease_name disease_name = null
modules_dir = paramsData.modules_dir ?: "/Users/richman/workspace/synthea-alldiseases/modules" outdir = null
output_dir = paramsData.output_dir ?: "output" population = 10
population = paramsData.population ?: 100 gender = 0.5
gender = paramsData.gender ?: 0.5 min_age = 0
min_age = paramsData.min_age ?: 0 max_age = 90
max_age = paramsData.max_age ?: 90 seed = null
seed = paramsData.seed
generate_patients = paramsData.generate_patients ?: true
publish_dir = paramsData.publish_dir ?: "published_output"
// Additional params
max_cost = 5.0
timeout = 300
anthropic_api_key = null
batch_size = 1
help = false
}
docker {
enabled = true
runOptions = "-v $baseDir/$params.modules_dir:/app/src/main/resources/modules -v $baseDir/src/main/python:/app/src/main/python -v $baseDir/src/main/resources:/app/src/main/resources -v $baseDir/.env:/app/.env"
}
process {
container = 'synthea-module-generator'
containerOptions = "-e MODULES_DIR=/app/src/main/resources/modules -e PYTHONPATH=/app -e ANTHROPIC_API_KEY=${params.anthropic_api_key}"
}
trace {
enabled = true
overwrite = true
file = "$baseDir/trace.txt"
} }
profiles { profiles {
standard { standard {
process.executor = 'local' docker {
enabled = true
temp = 'auto'
}
}
k8s {
process {
executor = 'k8s'
container = 'harbor.cluster.omic.ai/omic/synthea-alldiseases:v3'
}
docker {
enabled = true
}
k8s {
storageClaimName = 'eureka-pvc'
storageMountPath = '/omic/eureka'
}
} }
} }
process {
cpus = 2
memory = '4 GB'
}

View File

@@ -1,14 +1,117 @@
{ {
"disease_name": "Parkinson's Disease", "params": {
"modules_dir": "modules", "disease_name": {
"output_dir": "output", "type": "string",
"generate_patients": true, "description": "Disease name to generate synthetic patients for",
"population": 10, "default": "Diabetes",
"gender": 0.5, "required": true,
"min_age": 0, "pipeline_io": "parameter",
"max_age": 90, "var_name": "params.disease_name",
"analyze_patient_data": false, "examples": [
"report_format": "html", "Diabetes",
"force_generate": false, "Hypertension",
"publish_dir": "published_output" "Lung Cancer"
],
"pattern": ".*",
"enum": [],
"validation": {},
"notes": "The disease name used to find or generate a Synthea disease module. Case-insensitive."
},
"population": {
"type": "integer",
"description": "Number of synthetic patients to generate",
"default": 10,
"required": false,
"pipeline_io": "parameter",
"var_name": "params.population",
"examples": [
10,
100,
1000
],
"pattern": "^\\d+$",
"enum": [],
"validation": {
"min": 1,
"max": 10000
},
"notes": "Higher numbers take longer to generate."
},
"gender": {
"type": "number",
"description": "Proportion of female patients (0.0 = all male, 1.0 = all female)",
"default": 0.5,
"required": false,
"pipeline_io": "parameter",
"var_name": "params.gender",
"examples": [
0.5,
0.0,
1.0
],
"pattern": "^[01]\\.?\\d*$",
"enum": [],
"validation": {
"min": 0.0,
"max": 1.0
},
"notes": "Decimal between 0.0 and 1.0 representing the proportion of female patients."
},
"min_age": {
"type": "integer",
"description": "Minimum age of generated patients",
"default": 0,
"required": false,
"pipeline_io": "parameter",
"var_name": "params.min_age",
"examples": [
0,
18,
40
],
"pattern": "^\\d+$",
"enum": [],
"validation": {
"min": 0,
"max": 140
},
"notes": "Minimum patient age in years."
},
"max_age": {
"type": "integer",
"description": "Maximum age of generated patients",
"default": 90,
"required": false,
"pipeline_io": "parameter",
"var_name": "params.max_age",
"examples": [
90,
65,
100
],
"pattern": "^\\d+$",
"enum": [],
"validation": {
"min": 1,
"max": 140
},
"notes": "Maximum patient age in years."
},
"outdir": {
"type": "folder",
"description": "Output directory for generated patient data",
"default": "s3://omic/eureka/synthea-alldiseases/output",
"required": true,
"pipeline_io": "output",
"var_name": "params.outdir",
"examples": [
"s3://omic/eureka/synthea-alldiseases/output",
"s3://omic/eureka/synthea-alldiseases/results"
],
"pattern": ".*",
"enum": [],
"validation": {},
"notes": "Directory where generated FHIR patient bundles will be stored."
}
}
} }

View File

@@ -1,77 +0,0 @@
#!/usr/bin/env nextflow
nextflow.enable.dsl=2
// Default parameters
params.disease_name = "Diabetes" // Default disease name
params.output_dir = "output" // Output directory
params.modules_dir = "modules" // Directory for module files
// Process to generate synthetic patients
process generatePatients {
    // Everything the task leaves in its work dir is published under
    // <output_dir>/<disease name lower-cased, spaces replaced by underscores>.
    publishDir "${params.output_dir}/${params.disease_name.toLowerCase().replaceAll(' ', '_')}", mode: 'copy'

    input:
    path moduleFile

    output:
    path "**"

    script:
    // Module name handed to run_synthea: same slug rule as the publish path.
    def moduleName = params.disease_name.toLowerCase().replaceAll(' ', '_')
    """
    echo "Module file: ${moduleFile}"
    echo "Disease: ${params.disease_name}"

    # Check if Docker is available
    if command -v docker &>/dev/null; then
        echo "Docker is available, looking for Synthea container..."

        # Find the Synthea container
        container_id=\$(docker ps --format '{{.ID}}' --filter "name=synthea" | head -1)

        if [ -n "\$container_id" ]; then
            echo "Using Synthea container \$container_id"

            # Copy module to container
            docker exec \$container_id mkdir -p /app/modules
            docker cp "${moduleFile}" \$container_id:/app/modules/

            # Run Synthea with minimal parameters.
            # The module name is passed as a positional argument (\$1) instead of
            # being spliced into the bash -c string, so a name containing an
            # apostrophe (e.g. "parkinson's_disease") is not re-parsed by the
            # inner shell as an unterminated quote.
            docker exec \$container_id bash -c 'cd /app && ./run_synthea -p 1 -m "\$1"' _ "${moduleName}"

            # Copy output from container
            docker cp \$container_id:/app/output/fhir ./ || mkdir -p ./fhir
            docker cp \$container_id:/app/output/metadata ./ || mkdir -p ./metadata

            echo "Completed patient generation"
        else
            echo "No Synthea container found, creating mock output for testing"
            mkdir -p ./fhir ./metadata
            echo "Mock FHIR data for ${params.disease_name}" > ./fhir/mock_patient.json
            echo "Mock metadata for ${params.disease_name}" > ./metadata/mock_stats.json
        fi
    else
        echo "Docker not available, creating mock output for testing"
        mkdir -p ./fhir ./metadata
        echo "Mock FHIR data for ${params.disease_name}" > ./fhir/mock_patient.json
        echo "Mock metadata for ${params.disease_name}" > ./metadata/mock_stats.json
    fi
    """
}
// Define workflow
workflow {
    // The module is expected at <modules_dir>/<slug>.json, where the slug is
    // the disease name lower-cased with spaces turned into underscores.
    def slug = params.disease_name.toLowerCase().replaceAll(' ', '_')
    def modulePath = file("${params.modules_dir}/${slug}.json")

    // Stop before any task is scheduled when the module file is absent.
    if (!modulePath.exists()) {
        error "Module file not found: ${modulePath}"
    }

    // Feed the module file to the patient-generation process via a channel.
    generatePatients(Channel.fromPath(modulePath))
}

View File

@@ -0,0 +1,5 @@
exporter.fhir.export = true
exporter.hospital.fhir.export = true
exporter.practitioner.fhir.export = true
generate.timestep = 7
generate.append_numbers_to_person_names = true

View File

@@ -1,2 +0,0 @@
task_id hash native_id name status exit submit duration realtime %cpu peak_rss peak_vmem rchar wchar
1 48/6b3902 74409 checkAndGetModule FAILED 1 2025-03-23 11:46:23.178 1.1s 995ms - - - - -