Files
digital-patients/main_filter_outputs.nf
Olamide Isreal 9e6a16c19b Initial commit: digital-patients pipeline (clean, no large files)
Large reference/model files excluded from repo - to be staged to S3 or baked into Docker images.
2026-03-26 15:15:23 +01:00

426 lines
13 KiB
Plaintext

nextflow.enable.dsl=2
// ==================== FILTER VARIANTS ====================
process FILTER_VARIANTS {
    memory 16.GB
    container "${params.container_synthea}"
    containerOptions "${params.containerOptions}"
    publishDir "${params.outdir}/${params.project_name}/filtered", mode: 'copy'

    input:
    path vcf

    output:
    path "*_filtered_variants.vcf", emit: filtered_vcf
    path "*_filtered_variants_summary.csv", emit: variants_summary

    script:
    """
    #!/opt/conda/envs/synthea/bin/python3
    # Rank variants by QUAL score (proxy for clinical significance), keep the
    # top params.top_n_variants, and emit a filtered VCF plus a CSV summary.
    # Fix vs. previous version: the VCF is read in a single pass (header,
    # variant count, and data chunks collected together) instead of twice.
    import pandas as pd
    import numpy as np
    import sys

    patient_name = "${vcf}".replace('.vcf', '')
    print(f"Processing VCF file: ${vcf}", file=sys.stderr)

    vcf_columns = ['CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'FILTER', 'INFO', 'FORMAT', 'SAMPLE']
    keep_columns = ['CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'QUAL_numeric', 'FILTER', 'INFO', 'FORMAT', 'SAMPLE']
    chunk_size = 100000

    def _to_frame(rows):
        # Turn a list of split VCF rows into a DataFrame with a numeric QUAL
        # column ('.' and other non-numeric QUAL values become NaN and thus
        # sort last when ranking descending).
        frame = pd.DataFrame(rows, columns=vcf_columns)
        frame['QUAL_numeric'] = pd.to_numeric(frame['QUAL'], errors='coerce')
        return frame[keep_columns]

    # Single pass: header lines, variant count, and chunked data frames.
    header = []
    df_list = []
    chunk = []
    total_variants = 0
    with open("${vcf}") as f:
        for line in f:
            if line.startswith('#'):
                header.append(line)
                continue
            total_variants += 1
            chunk.append(line.strip().split('\\t'))
            if len(chunk) >= chunk_size:
                df_list.append(_to_frame(chunk))
                chunk = []
                print(f"Processed {total_variants} variants", file=sys.stderr)
    if chunk:
        df_list.append(_to_frame(chunk))

    print(f"Header has {len(header)} lines", file=sys.stderr)
    print(f"Total variants: {total_variants}", file=sys.stderr)

    if total_variants == 0:
        # Still emit both declared outputs so downstream channels are fed.
        print("No variants found, creating empty output", file=sys.stderr)
        with open(f"{patient_name}_filtered_variants.vcf", 'w') as f:
            f.writelines(header)
        pd.DataFrame(columns=['CHROM', 'POS', 'REF', 'ALT', 'QUAL']).to_csv(
            f"{patient_name}_filtered_variants_summary.csv", index=False
        )
    else:
        print(f"Concatenating {len(df_list)} chunks", file=sys.stderr)
        df = pd.concat(df_list, ignore_index=True)

        # Sort by QUAL score (proxy for clinical significance), keep top N.
        print(f"Sorting by QUAL score", file=sys.stderr)
        df_sorted = df.sort_values('QUAL_numeric', ascending=False)
        top_n = min(${params.top_n_variants}, len(df_sorted))
        df_filtered = df_sorted.head(top_n)
        print(f"Selected top {top_n} variants", file=sys.stderr)

        # Write filtered VCF: original header plus the original VCF columns
        # only (QUAL_numeric is a helper and is not written).
        print(f"Writing filtered VCF", file=sys.stderr)
        with open(f"{patient_name}_filtered_variants.vcf", 'w') as f:
            f.writelines(header)
            for _, row in df_filtered.iterrows():
                f.write('\\t'.join(str(row[col]) for col in vcf_columns) + '\\n')

        # Summary CSV exposes the numeric QUAL value under the name 'QUAL'.
        print(f"Writing summary CSV", file=sys.stderr)
        summary = df_filtered[['CHROM', 'POS', 'REF', 'ALT', 'QUAL_numeric']].copy()
        summary.columns = ['CHROM', 'POS', 'REF', 'ALT', 'QUAL']
        summary.to_csv(f"{patient_name}_filtered_variants_summary.csv", index=False)
        print("Done!", file=sys.stderr)
    """
}
// ==================== FILTER TRANSCRIPTOME ====================
process FILTER_TRANSCRIPTOME {
    memory 2.GB
    container "${params.container_borzoi}"
    containerOptions "${params.containerOptions}"
    publishDir "${params.outdir}/${params.project_name}/filtered", mode: 'copy'

    input:
    path tpm_file

    output:
    path "*_filtered_TPM.csv", emit: filtered_tpm

    script:
    """
    #!/opt/conda/envs/borzoi/bin/python
    # Keep the top-N genes whose mean expression deviates most (by |log2FC|)
    # from the per-gene median across tissues.
    import pandas as pd
    import numpy as np

    patient_name = "${tpm_file}".replace('_TPM.csv', '')
    tpm = pd.read_csv("${tpm_file}", index_col=0)

    # Mean across tissues; the per-gene median serves as a pseudo-reference.
    # Note: In production, you'd need proper control samples for statistical testing
    tpm['mean_expression'] = tpm.mean(axis=1)
    reference = tpm.drop('mean_expression', axis=1).median(axis=1)

    # Pseudo-count of 1 guards against log/division by zero.
    tpm['log2FC'] = np.log2((tpm['mean_expression'] + 1) / (reference + 1))
    tpm['abs_log2FC'] = abs(tpm['log2FC'])

    # Threshold on |log2FC|, then rank and truncate to the top N genes.
    selected = tpm[tpm['abs_log2FC'] > ${params.transcriptome_log2fc_threshold}]
    selected = selected.sort_values('abs_log2FC', ascending=False).head(${params.top_n_genes})

    # Strip the helper columns before writing the filtered table.
    selected.drop(['mean_expression', 'log2FC', 'abs_log2FC'], axis=1).to_csv(
        f"{patient_name}_filtered_TPM.csv"
    )
    """
}
// ==================== FILTER PROTEOME ====================
process FILTER_PROTEOME {
    memory 2.GB
    container "${params.container_rna2protexpression}"
    containerOptions "${params.containerOptions}"
    publishDir "${params.outdir}/${params.project_name}/filtered", mode: 'copy'

    input:
    path protein_file

    output:
    path "*_filtered_Protein_Expression.csv", emit: filtered_proteins

    script:
    """
    #!/opt/conda/envs/rna2protexpresson/bin/python3
    # Retain the most highly expressed proteins: those above the 75th
    # percentile of mean cross-tissue expression, capped at top N.
    import pandas as pd
    import numpy as np

    patient_name = "${protein_file}".replace('_Protein_Expression_log2.csv', '')
    expr = pd.read_csv("${protein_file}", index_col=0)

    # Mean expression per protein across all tissue columns.
    expr['mean_expression'] = expr.mean(axis=1)

    # Keep only proteins strictly above the 75th-percentile mean.
    cutoff = expr['mean_expression'].quantile(0.75)
    top = expr[expr['mean_expression'] > cutoff]

    # Highest expression first, truncated to the configured count.
    top = top.sort_values('mean_expression', ascending=False).head(${params.top_n_proteins})

    # Write out without the helper column.
    top.drop('mean_expression', axis=1).to_csv(f"{patient_name}_filtered_Protein_Expression.csv")
    """
}
// ==================== FILTER IMMUNE CELLS ====================
process FILTER_IMMUNE_CELLS {
    memory 1.GB
    container "${params.container_ecotyper}"
    containerOptions "${params.containerOptions}"
    publishDir "${params.outdir}/${params.project_name}/filtered", mode: 'copy'

    input:
    path immune_file

    output:
    path "*_filtered_immune_cells.csv", emit: filtered_immune

    script:
    """
    #!/opt/conda/envs/ecotyper/bin/python3
    # Pass-through stage: no filtering is currently applied to the immune
    # cell fractions; the table is re-emitted under the "filtered" name so
    # downstream processes see a uniform channel layout.
    import pandas as pd

    patient_name = "${immune_file}".replace('_immune_cells.csv', '')
    table = pd.read_csv("${immune_file}", index_col=0)
    table.to_csv(f"{patient_name}_filtered_immune_cells.csv")
    """
}
// ==================== FILTER METABOLOME ====================
process FILTER_METABOLOME {
    memory 1.GB
    container "${params.container_corto}"
    containerOptions "${params.containerOptions}"
    publishDir "${params.outdir}/${params.project_name}/filtered", mode: 'copy'

    input:
    path metabolome_file

    output:
    path "*_filtered_metabolome.csv", emit: filtered_metabolome

    script:
    """
    #!/usr/bin/Rscript
    # Keep metabolic pathways with significant p-values (when a p-value
    # column is present), ranked by |activity score|, truncated to top N.
    library(data.table)

    # fixed = TRUE: match the suffix literally. Without it the unescaped
    # "." in "_metabolome.csv" is a regex wildcard matching any character.
    patient_name <- gsub("_metabolome.csv", "", "${metabolome_file}", fixed = TRUE)

    df <- fread("${metabolome_file}")

    # Assuming the data has columns: pathway, activity_score, p_value.
    # Adjust based on actual CORTO output format.
    # Filter on the p-value column if one exists (either capitalisation);
    # otherwise pass everything through.
    if ("P_value" %in% colnames(df)) {
        df_filtered <- df[df\$P_value < ${params.metabolome_pvalue_threshold}, ]
    } else if ("p_value" %in% colnames(df)) {
        df_filtered <- df[df\$p_value < ${params.metabolome_pvalue_threshold}, ]
    } else {
        df_filtered <- df
    }

    # Sort by activity score, largest absolute value first.
    if ("Activity_Score" %in% colnames(df_filtered)) {
        df_filtered <- df_filtered[order(-abs(df_filtered\$Activity_Score)), ]
    } else if (ncol(df_filtered) > 1) {
        # Column names differ: fall back to the second column as the score.
        df_filtered <- df_filtered[order(-abs(df_filtered[[2]])), ]
    }

    # Take top N pathways and write the result.
    df_filtered <- head(df_filtered, ${params.top_n_metabolites})
    fwrite(df_filtered, paste0(patient_name, "_filtered_metabolome.csv"))
    """
}
// ==================== FILTER MUTATED PROTEINS ====================
process FILTER_MUTATED_PROTEINS {
    memory 2.GB
    container "${params.container_vcf2prot}"
    containerOptions "${params.containerOptions}"
    publishDir "${params.outdir}/${params.project_name}/filtered", mode: 'copy'

    input:
    path fasta_file
    path filtered_vcf

    output:
    path "*_filtered_mutations.fasta", emit: filtered_fasta

    script:
    """
    #!/opt/conda/envs/vcf2prot/bin/python3
    # Truncate the mutated-protein FASTA to the first top_n_variants records.
    # NOTE(review): `re` is imported but never used, and `filtered_positions`
    # is built from the filtered VCF but never consulted when selecting
    # sequences — the output is simply the first N records of the input
    # FASTA in file order. Presumably the intent was to keep only sequences
    # whose header matches a retained CHROM:POS; confirm against the
    # vcf2prot FASTA header format before changing.
    import re

    patient_name = "${fasta_file}".replace('_transcript_id_mutations.fasta', '')

    # Collect "CHROM:POS" keys for every variant kept by FILTER_VARIANTS.
    # (Currently unused — see NOTE above.)
    filtered_positions = set()
    with open("${filtered_vcf}") as f:
        for line in f:
            if not line.startswith('#'):
                parts = line.strip().split('\\t')
                if len(parts) >= 2:
                    filtered_positions.add(f"{parts[0]}:{parts[1]}")

    # Parse the FASTA into {header: sequence}; dict preserves file order.
    sequences = {}
    current_header = None
    current_seq = []
    with open("${fasta_file}") as f:
        for line in f:
            if line.startswith('>'):
                if current_header:
                    sequences[current_header] = ''.join(current_seq)
                current_header = line.strip()
                current_seq = []
            else:
                current_seq.append(line.strip())
    # Flush the final record.
    if current_header:
        sequences[current_header] = ''.join(current_seq)

    # Write the first top_n_variants records, wrapping sequences at 60 chars.
    with open(f"{patient_name}_filtered_mutations.fasta", 'w') as f:
        count = 0
        for header, seq in sequences.items():
            if count >= ${params.top_n_variants}:
                break
            f.write(header + '\\n')
            # Write sequence in 60 character lines
            for i in range(0, len(seq), 60):
                f.write(seq[i:i+60] + '\\n')
            count += 1
    """
}
// ==================== CREATE SUMMARY REPORT ====================
process CREATE_SUMMARY_REPORT {
    memory 1.GB
    container "${params.container_synthea}"
    containerOptions "${params.containerOptions}"
    publishDir "${params.outdir}/${params.project_name}/filtered", mode: 'copy'

    input:
    path filtered_tpm
    path filtered_proteins
    path filtered_immune
    path filtered_metabolome
    path filtered_vcf

    output:
    path "*_summary_report.json"

    script:
    """
    #!/opt/conda/envs/synthea/bin/python3
    # Aggregate per-omics counts of the filtered outputs into a single JSON
    # summary report for the patient.
    import pandas as pd
    import json
    import sys

    patient_name = "${filtered_tpm}".replace('_filtered_TPM.csv', '')

    def safe_count(counter, label):
        # Best-effort counting: a missing or unreadable input records 0
        # instead of failing the whole report. Narrowed from a bare
        # `except:` so SystemExit/KeyboardInterrupt still propagate, and
        # the failure is logged instead of silently swallowed.
        try:
            return counter()
        except Exception as exc:
            print(f"Could not count {label}: {exc}", file=sys.stderr)
            return 0

    def count_variants():
        # Variants = non-header lines of the filtered VCF.
        with open("${filtered_vcf}") as f:
            return sum(1 for line in f if not line.startswith('#'))

    summary = {
        'patient_id': patient_name,
        'filtering_params': {
            'top_n_genes': ${params.top_n_genes},
            'top_n_proteins': ${params.top_n_proteins},
            'top_n_immune_cells': ${params.top_n_immune_cells},
            'top_n_metabolites': ${params.top_n_metabolites},
            'top_n_variants': ${params.top_n_variants}
        },
        'filtered_counts': {}
    }

    # Count filtered entries per omics layer (rows, except immune cells
    # where the cell types are columns).
    summary['filtered_counts']['genes'] = safe_count(
        lambda: len(pd.read_csv("${filtered_tpm}", index_col=0)), 'genes')
    summary['filtered_counts']['proteins'] = safe_count(
        lambda: len(pd.read_csv("${filtered_proteins}", index_col=0)), 'proteins')
    summary['filtered_counts']['immune_cell_types'] = safe_count(
        lambda: len(pd.read_csv("${filtered_immune}", index_col=0).columns), 'immune_cell_types')
    summary['filtered_counts']['metabolic_pathways'] = safe_count(
        lambda: len(pd.read_csv("${filtered_metabolome}")), 'metabolic_pathways')
    summary['filtered_counts']['variants'] = safe_count(count_variants, 'variants')

    # Save summary
    with open(f"{patient_name}_summary_report.json", 'w') as f:
        json.dump(summary, f, indent=2)
    """
}