Fix stage_data.nf: use synthea:cudf container, Python-based downloads with sshpass/expect fallback
This commit is contained in:
163
stage_data.nf
163
stage_data.nf
@@ -1,83 +1,104 @@
|
|||||||
nextflow.enable.dsl=2
|
nextflow.enable.dsl=2
|
||||||
|
|
||||||
process STAGE_DATA {
|
process STAGE_DATA {
|
||||||
container 'ubuntu:22.04'
|
container 'harbor.cluster.omic.ai/omic/digital-patients/synthea:cudf'
|
||||||
memory '4 GB'
|
memory '4 GB'
|
||||||
cpus 2
|
cpus 2
|
||||||
|
|
||||||
script:
|
script:
|
||||||
"""
|
"""
|
||||||
BASE="/omic/eureka/digital-patients"
|
#!/opt/conda/envs/synthea/bin/python3
|
||||||
|
import subprocess, os, sys
|
||||||
|
|
||||||
|
BASE = "/omic/eureka/digital-patients"
|
||||||
|
|
||||||
|
def run(cmd):
    # Run a shell command, echo what it printed, and return its exit status.
    #
    # cmd is a single shell command line; it is executed with shell=True, so
    # pipes, redirections and '||' / '&&' chains inside it behave as written.
    # Output is captured and echoed only after the command has finished.
    # A non-zero exit status is returned to the caller, never raised.
    #
    # NOTE(review): cmd is logged verbatim, so anything embedded in it
    # (including credentials) ends up in the task log.
    print(f"RUN: {cmd}", flush=True)
    r = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    if r.stdout:
        print(r.stdout, flush=True)
    # Diagnostics go to stderr (not stdout) so that error text from a failed
    # command stays separate from its normal output.
    if r.stderr:
        print(r.stderr, file=sys.stderr, flush=True)
    return r.returncode
|
||||||
|
|
||||||
|
# Step 1: delete every directory named '*_parts' found under BASE. Errors
# from find/rm are discarded and the command is forced to succeed.
print("=== Cleaning up old _parts directories ===", flush=True)
run(f"find {BASE} -name '*_parts' -type d -exec rm -rf {{}} + 2>/dev/null || true")

# Step 2: make sure each staging subdirectory exists; directories that are
# already present are left untouched.
print("=== Creating directory structure ===", flush=True)
for subdir in (
    "imputed",
    "healthy",
    "supporting-data/vcf",
    "supporting-data/ucsc-liftover",
    "supporting-data/genome",
    "output",
):
    os.makedirs(os.path.join(BASE, subdir), exist_ok=True)

# Step 3: list the top level of BASE in the log before anything is downloaded.
print("=== Current PVC state ===", flush=True)
run(f"ls -la {BASE}/")
|
||||||
|
|
||||||
|
print("=== Installing sshpass ===", flush=True)
# Try conda first, then apt, then pip, stopping at the first installer that
# succeeds. The apt step is wrapped in ( ... ) because the shell gives '||'
# and '&&' equal precedence and evaluates them left to right: without the
# grouping, 'apt-get install' would also run after a successful conda install.
# The exit status is deliberately ignored; availability is checked below.
run("conda install -y -c conda-forge sshpass 2>/dev/null || (apt-get update -qq && apt-get install -y -qq sshpass 2>/dev/null) || pip install sshpass 2>/dev/null")
# Log where sshpass was found, or a notice that it is missing. This command
# always exits 0, so its status carries no information and is not kept.
run("which sshpass || echo 'sshpass not found, trying expect'")
|
||||||
|
|
||||||
echo "=== Installing dependencies ==="
|
# If sshpass not available, use expect or python paramiko
|
||||||
apt-get update -qq && apt-get install -y -qq openssh-client sshpass curl > /dev/null 2>&1
|
has_sshpass = subprocess.run("which sshpass", shell=True, capture_output=True).returncode == 0
|
||||||
|
|
||||||
echo "=== Cleaning up old _parts directories ==="
|
def download(remote, local_path):
|
||||||
find \$BASE -name "*_parts" -type d -exec rm -rf {} + 2>/dev/null || true
|
if os.path.isfile(local_path):
|
||||||
echo "Cleanup done"
|
size = os.path.getsize(local_path)
|
||||||
|
print(f"SKIP (exists, {size} bytes): {local_path}", flush=True)
|
||||||
echo "=== Creating directory structure ==="
|
return True
|
||||||
mkdir -p \$BASE/imputed \$BASE/healthy \$BASE/supporting-data/vcf \$BASE/supporting-data/ucsc-liftover \$BASE/supporting-data/genome \$BASE/output
|
print(f"Downloading: {remote} -> {local_path}", flush=True)
|
||||||
|
if has_sshpass:
|
||||||
echo "=== Current state of PVC ==="
|
cmd = f"sshpass -p 'bl3rg3r5' scp -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -P 9100 'omic@nucleus.omic.ai:{remote}' '{local_path}'"
|
||||||
ls -la \$BASE/ 2>/dev/null
|
else:
|
||||||
|
# Fallback: use expect
|
||||||
download() {
|
cmd = f'''expect -c "
|
||||||
local remote="\$1"
|
set timeout 3600
|
||||||
local local_path="\$2"
|
spawn scp -o StrictHostKeyChecking=no -P 9100 omic@nucleus.omic.ai:{remote} {local_path}
|
||||||
if [ -f "\$local_path" ]; then
|
expect \\"password:\\"
|
||||||
echo "SKIP (exists): \$local_path"
|
send \\"bl3rg3r5\\r\\"
|
||||||
ls -lh "\$local_path"
|
expect eof
|
||||||
return
|
"'''
|
||||||
fi
|
rc = run(cmd)
|
||||||
echo "Downloading: \$remote -> \$local_path"
|
if rc == 0 and os.path.isfile(local_path):
|
||||||
sshpass -p 'bl3rg3r5' scp -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -P 9100 "omic@nucleus.omic.ai:\$remote" "\$local_path"
|
size = os.path.getsize(local_path)
|
||||||
ls -lh "\$local_path"
|
print(f" OK: {size} bytes", flush=True)
|
||||||
}
|
return True
|
||||||
|
else:
|
||||||
echo "=== Downloading imputed files ==="
|
print(f" FAILED (rc={rc})", flush=True)
|
||||||
download "/mnt/Avatar/imputed/ukbb/imputed/F5_SCHIZO.gwas.imputed_v3.both_sexes.tsv.bgz" "\$BASE/imputed/F5_SCHIZO.gwas.imputed_v3.both_sexes.tsv.bgz"
|
return False
|
||||||
download "/mnt/Avatar/imputed/ukbb/imputed/F5_SCHIZO.gwas.imputed_v3.female.tsv.bgz" "\$BASE/imputed/F5_SCHIZO.gwas.imputed_v3.female.tsv.bgz"
|
|
||||||
download "/mnt/Avatar/imputed/ukbb/imputed/F5_SCHIZO.gwas.imputed_v3.male.tsv.bgz" "\$BASE/imputed/F5_SCHIZO.gwas.imputed_v3.male.tsv.bgz"
|
files = [
|
||||||
|
("/mnt/Avatar/imputed/ukbb/imputed/F5_SCHIZO.gwas.imputed_v3.both_sexes.tsv.bgz", f"{BASE}/imputed/F5_SCHIZO.gwas.imputed_v3.both_sexes.tsv.bgz"),
|
||||||
echo "=== Downloading gnomad files ==="
|
("/mnt/Avatar/imputed/ukbb/imputed/F5_SCHIZO.gwas.imputed_v3.female.tsv.bgz", f"{BASE}/imputed/F5_SCHIZO.gwas.imputed_v3.female.tsv.bgz"),
|
||||||
download "/mnt/Avatar/digital_patient/gnomad.genomes.v4.1.sites.female.txt" "\$BASE/healthy/gnomad.genomes.v4.1.sites.female.txt"
|
("/mnt/Avatar/imputed/ukbb/imputed/F5_SCHIZO.gwas.imputed_v3.male.tsv.bgz", f"{BASE}/imputed/F5_SCHIZO.gwas.imputed_v3.male.tsv.bgz"),
|
||||||
download "/mnt/Avatar/digital_patient/gnomad.genomes.v4.1.sites.male.txt" "\$BASE/healthy/gnomad.genomes.v4.1.sites.male.txt"
|
("/mnt/Avatar/digital_patient/gnomad.genomes.v4.1.sites.female.txt", f"{BASE}/healthy/gnomad.genomes.v4.1.sites.female.txt"),
|
||||||
|
("/mnt/Avatar/digital_patient/gnomad.genomes.v4.1.sites.male.txt", f"{BASE}/healthy/gnomad.genomes.v4.1.sites.male.txt"),
|
||||||
echo "=== Downloading supporting-data files ==="
|
("/mnt/Avatar/dd/synthea/supporting-data/genome/hg38.fa", f"{BASE}/supporting-data/genome/hg38.fa"),
|
||||||
download "/mnt/Avatar/dd/synthea/supporting-data/genome/hg38.fa" "\$BASE/supporting-data/genome/hg38.fa"
|
("/mnt/Avatar/dd/synthea/supporting-data/genome/hg38.dict", f"{BASE}/supporting-data/genome/hg38.dict"),
|
||||||
download "/mnt/Avatar/dd/synthea/supporting-data/genome/hg38.dict" "\$BASE/supporting-data/genome/hg38.dict"
|
("/mnt/Avatar/dd/synthea/supporting-data/ucsc-liftover/hg19ToHg38.over.chain.gz", f"{BASE}/supporting-data/ucsc-liftover/hg19ToHg38.over.chain.gz"),
|
||||||
download "/mnt/Avatar/dd/synthea/supporting-data/ucsc-liftover/hg19ToHg38.over.chain.gz" "\$BASE/supporting-data/ucsc-liftover/hg19ToHg38.over.chain.gz"
|
("/mnt/Avatar/dd/synthea/supporting-data/vcf/vcf_template.vcf", f"{BASE}/supporting-data/vcf/vcf_template.vcf"),
|
||||||
download "/mnt/Avatar/dd/synthea/supporting-data/vcf/vcf_template.vcf" "\$BASE/supporting-data/vcf/vcf_template.vcf"
|
("/mnt/Avatar/imputed/ukbb/metadata/ukbb_phenotypes.csv", f"{BASE}/ukbb_phenotypes_filtered.csv"),
|
||||||
|
]
|
||||||
echo "=== Downloading phenotype file ==="
|
|
||||||
download "/mnt/Avatar/imputed/ukbb/metadata/ukbb_phenotypes.csv" "\$BASE/ukbb_phenotypes_filtered.csv"
|
print("=== Downloading files ===", flush=True)
|
||||||
|
ok = 0
|
||||||
echo "=== Copying small files from DRS (already on PVC via DRS upload) ==="
|
fail = 0
|
||||||
# These were uploaded via DRS and should already be on PVC
|
for remote, local in files:
|
||||||
# If not, they're in the git repo that WES cloned
|
if download(remote, local):
|
||||||
for f in MANE.GRCh38.v1.3.update.tsv regulon.rda LM22_sourceGEP_ensg.txt; do
|
ok += 1
|
||||||
if [ ! -f "\$BASE/\$f" ]; then
|
else:
|
||||||
echo "Small file missing on PVC, checking if available from WES workdir..."
|
fail += 1
|
||||||
# WES clones the repo, so the file might be in the current workdir's repo
|
|
||||||
else
|
print(f"\\n=== Download results: {ok} ok, {fail} failed ===", flush=True)
|
||||||
echo "SKIP (exists): \$BASE/\$f"
|
|
||||||
fi
|
print("\\n=== Final verification ===", flush=True)
|
||||||
done
|
run(f"echo 'Imputed:' && ls -lh {BASE}/imputed/*.bgz 2>/dev/null || echo ' NONE'")
|
||||||
|
run(f"echo 'Healthy:' && ls -lh {BASE}/healthy/*.txt 2>/dev/null || echo ' NONE'")
|
||||||
echo "=== Final verification ==="
|
run(f"echo 'Genome:' && ls -lh {BASE}/supporting-data/genome/* 2>/dev/null || echo ' NONE'")
|
||||||
echo "Imputed:"
|
run(f"echo 'Support:' && ls -lh {BASE}/supporting-data/vcf/* {BASE}/supporting-data/ucsc-liftover/* 2>/dev/null || echo ' NONE'")
|
||||||
ls -lh \$BASE/imputed/*.bgz 2>/dev/null || echo " NONE"
|
run(f"echo 'Small:' && ls -lh {BASE}/MANE* {BASE}/regulon* {BASE}/LM22* {BASE}/ukbb* 2>/dev/null || echo ' NONE'")
|
||||||
echo "Healthy:"
|
|
||||||
ls -lh \$BASE/healthy/*.txt 2>/dev/null || echo " NONE"
|
if fail > 0:
|
||||||
echo "Genome:"
|
print("STAGING FAILED - some files missing", flush=True)
|
||||||
ls -lh \$BASE/supporting-data/genome/* 2>/dev/null || echo " NONE"
|
sys.exit(1)
|
||||||
echo "Supporting-data:"
|
print("=== STAGING COMPLETE ===", flush=True)
|
||||||
ls -lh \$BASE/supporting-data/vcf/* \$BASE/supporting-data/ucsc-liftover/* 2>/dev/null || echo " NONE"
|
|
||||||
echo "Small files:"
|
|
||||||
ls -lh \$BASE/MANE* \$BASE/regulon* \$BASE/LM22* \$BASE/ukbb* 2>/dev/null || echo " NONE"
|
|
||||||
|
|
||||||
echo "=== STAGING COMPLETE ==="
|
|
||||||
"""
|
"""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user