Initial commit: digital-patients pipeline (clean, no large files)
Large reference/model files are excluded from the repo — they will be staged to S3 or baked into Docker images.
This commit is contained in:
260
main_cibersortx.nf
Normal file
260
main_cibersortx.nf
Normal file
@@ -0,0 +1,260 @@
// Enable Nextflow DSL2 syntax (module-style process/workflow definitions).
nextflow.enable.dsl=2
// Convert a CSV expression matrix into a tab-separated .txt file, the format
// expected by the downstream EcoTyper/CIBERSORTx steps.
process CONVERT_TO_TXT {
    memory 1.GB

    container = "${params.container_ecotyper}"
    containerOptions = "${params.containerOptions}"
    publishDir "${params.ecotyper_outdir}/fractions", mode: 'copy'
    // debug true

    input:
    // CSV expression matrix; output name is derived from its basename.
    path input_expression

    output:
    path '*.txt', emit: input_expression_txt

    script:
    """
    #!/bin/bash
    # Output basename: input file name with the .csv extension dropped.
    name="\$(basename $input_expression .csv)"
    # Replace each comma-terminated field — optionally wrapped in double quotes —
    # with the unquoted field followed by a tab (handles commas inside quoted
    # fields). NOTE(review): a quoted FINAL column has no trailing comma and so
    # keeps its quotes here; downstream steps strip all quotes anyway — confirm.
    sed -E 's/("([^"]*)")?,/\\2\\t/g' $input_expression > \${name}.txt
    """
}
|
||||
|
||||
// Run CIBERSORTx Fractions: estimate per-sample cell-type fractions from a
// bulk mixture matrix and a signature matrix. Emits the tool's standard
// CIBERSORTx_Results.txt table.
process CIBERSORTx_FRACTIONS {
    memory 4.GB

    container = "${params.container_ecotyper}"
    containerOptions = "${params.containerOptions}"
    publishDir "${params.ecotyper_outdir}/fractions", mode: 'copy'
    // debug true

    input:
    // Inputs are staged under fixed names so the script can reference them directly.
    path input_expression, stageAs: 'input.txt'
    path signature_matrix, stageAs: 'signature.txt'

    output:
    path "CIBERSORTx_Results.txt", emit: fractions_results

    script:
    """
    #!/bin/bash
    # -e: abort on first failure; -x: trace commands for the task log.
    set -ex

    echo "Starting CIBERSORTx Fractions analysis..."
    # The CIBERSORTx binary inside the container reads/writes fixed /src paths.
    mkdir -p /src/data /src/outdir || { echo "Failed to create directories"; exit 1; }

    # Prepare input files: strip double quotes and CR characters (Windows line
    # endings) before handing the matrices to the tool.
    echo "Preparing input files..."
    sed 's/"//g' "input.txt" | tr -d '\\r' > /src/data/mixture.txt || { echo "Failed to prepare mixture file"; exit 1; }
    sed 's/"//g' "signature.txt" | tr -d '\\r' > /src/data/signature.txt || { echo "Failed to prepare signature file"; exit 1; }

    echo "Running CIBERSORTx Fractions"
    # NOTE(review): credentials are passed on the command line and will appear in
    # process listings and the -x trace above — confirm this is acceptable.
    /src/CIBERSORTxFractions \\
        --mixture /src/data/mixture.txt \\
        --sigmatrix /src/data/signature.txt \\
        --outdir /src/outdir \\
        --username "${params.cibersortx_username}" \\
        --token "${params.cibersortx_token}"

    # Copy the expected result into the task work dir so Nextflow can capture it;
    # otherwise list the output dir for diagnosis and fail the task.
    if [ -f /src/outdir/CIBERSORTx_Results.txt ]; then
        cp /src/outdir/CIBERSORTx_Results.txt .
    else
        echo "Error: CIBERSORTx_Results.txt not found"
        ls -la /src/outdir/
        exit 1
    fi
    """
}
|
||||
|
||||
// Run CIBERSORTx HiRes: infer high-resolution (cell-type-specific) expression
// matrices from the bulk mixture, the signature matrix, and the fraction
// results produced by CIBERSORTx_FRACTIONS. The script block is an inline
// Python 3 program executed with the container's conda interpreter.
process CIBERSORTx_HIRES {
    memory 1.GB

    container = "${params.container_ecotyper}"
    containerOptions = "${params.containerOptions}"
    publishDir "${params.ecotyper_outdir}/hires", mode: 'copy'
    // debug true

    input:
    // Staged under fixed names so the embedded Python can open them directly.
    path expression_matrix, stageAs: 'expression.txt'
    path fractions_results, stageAs: 'fractions.txt'
    path signature_matrix, stageAs: 'signature.txt'

    output:
    path "CIBERSORTx_HiRes_Matrix.txt", emit: hires_matrix

    script:
    """
    #!/opt/conda/envs/ecotyper/bin/python3
    import pandas as pd
    import numpy as np
    import os
    import glob
    import subprocess
    from pathlib import Path

    # Create the fixed working directories expected by the CIBERSORTx binary.
    print("Starting CIBERSORTx HiRes analysis...")
    for dir_name in ['/src/data', '/src/outdir', '/src/intermediate', '/src/temp']:
        Path(dir_name).mkdir(parents=True, exist_ok=True)

    # Read input files (tab-separated; first column is the gene identifier for
    # the mixture and signature matrices).
    print("=== READING INPUT FILES ===")
    mixture_df = pd.read_csv('expression.txt', sep='\t', index_col=0)
    signature_df = pd.read_csv('signature.txt', sep='\t', index_col=0)
    fractions_df = pd.read_csv('fractions.txt', sep='\t')

    # Data quality checks — diagnostics only, nothing is rejected here.
    print("=== DATA QUALITY CHECKS ===")
    def check_data_quality(df, name):
        # Print NaN/negative/zero counts plus summary statistics for a matrix.
        print(f"Checking {name}:")
        print(f" NaN values: {df.isna().sum().sum()}")
        print(f" Negative values: {(df < 0).sum().sum()}")
        print(f" Zero values: {(df == 0).sum().sum()}")
        print(f" Sample stats:")
        print(df.describe().round(3))

    check_data_quality(mixture_df, "Mixture")
    check_data_quality(signature_df, "Signature")

    # Clean and normalize gene names so mixture/signature symbols match
    # case-insensitively and ignoring surrounding whitespace.
    print("=== PROCESSING GENE NAMES ===")
    def normalize_gene_name(x):
        # Uppercased, whitespace-stripped string form of a gene identifier.
        return str(x).strip().upper()

    mixture_df.index = mixture_df.index.map(normalize_gene_name)
    signature_df.index = signature_df.index.map(normalize_gene_name)

    # Find common genes with exact matching (sorted for deterministic order).
    print("=== FINDING COMMON GENES ===")
    common_genes = sorted(set(mixture_df.index) & set(signature_df.index))
    print(f"Number of common genes found: {len(common_genes)}")

    if len(common_genes) == 0:
        raise ValueError("No common genes found between mixture and signature files")

    # Create aligned matrices with exact same gene order
    mixture_filtered = mixture_df.loc[common_genes].copy()
    signature_filtered = signature_df.loc[common_genes].copy()

    # Verify gene order matches exactly
    if not (mixture_filtered.index == signature_filtered.index).all():
        raise ValueError("Gene order mismatch after filtering")

    # Replace any NaN values with 0
    mixture_filtered = mixture_filtered.fillna(0)
    signature_filtered = signature_filtered.fillna(0)

    # Prepare output files
    print("=== PREPARING OUTPUT FILES ===")

    # Save mixture file.
    # NOTE(review): 'genesinput' appears to be the header token the tool expects
    # for the gene column — confirm against the CIBERSORTx input spec.
    mixture_filtered.index.name = 'genesinput'
    mixture_filtered.to_csv('/src/data/mixture.txt', sep='\t', float_format='%.6f')

    # Save signature file (no transpose)
    signature_filtered.index.name = 'genesinput'
    signature_filtered.to_csv('/src/data/signature.txt', sep='\t', float_format='%.6f')

    # Process and save the weights file: keep only the per-cell-type fraction
    # columns, dropping the CIBERSORTx metadata columns.
    meta_cols = ['Mixture', 'P-value', 'Correlation', 'RMSE']
    data_cols = [col for col in fractions_df.columns if col not in meta_cols]
    weights_df = fractions_df[data_cols].copy()
    weights_df.fillna(0, inplace=True)
    weights_df.to_csv('/src/outdir/CIBERSORTxGEP_NA_Weights.txt', sep='\t', index=False)

    # Print dimensions and verify
    print("=== MATRIX VALIDATION ===")
    print(f"Matrix dimensions:")
    print(f" Mixture: {mixture_filtered.shape[0]} genes x {mixture_filtered.shape[1]} samples")
    print(f" Signature: {signature_filtered.shape[0]} genes x {signature_filtered.shape[1]} samples")
    print(f" Weights: {len(data_cols)} cols")

    if len(data_cols) < 1:
        raise ValueError(f"Invalid weights columns count: {len(data_cols)}")

    # Additional validations: reject NaN/inf before invoking the binary.
    if not np.isfinite(mixture_filtered.values).all():
        raise ValueError("Non-finite values found in mixture matrix")
    if not np.isfinite(signature_filtered.values).all():
        raise ValueError("Non-finite values found in signature matrix")
    if not np.isfinite(weights_df.values).all():
        raise ValueError("Non-finite values found in weights matrix")

    # NOTE(review): the --window size is set to the number of cell-type columns;
    # confirm this is the intended heuristic for the HiRes smoothing window.
    window_size = len(data_cols)
    print(f"Using window size: {window_size}")

    # Run CIBERSORTx
    print("=== RUNNING CIBERSORTX HIRES ===")
    cmd = [
        '/src/CIBERSORTxHiRes',
        '--mixture', '/src/data/mixture.txt',
        '--sigmatrix', '/src/data/signature.txt',
        '--cibresults', '/src/outdir/CIBERSORTxGEP_NA_Weights.txt',
        '--outdir', '/src/outdir',
        '--window', str(window_size),
        '--username', '${params.cibersortx_username}',
        '--token', '${params.cibersortx_token}',
        '--QN', 'true'
    ]

    # check=True raises CalledProcessError (failing the task) on non-zero exit.
    result = subprocess.run(cmd, check=True)

    # List all files in output directory
    print("=== CHECKING OUTPUT FILES ===")
    output_dir = Path('/src/outdir')
    print("Files in output directory:")
    for f in output_dir.glob('*'):
        print(f" {f.name} ({f.stat().st_size} bytes)")

    # Locate the result file: prefer *GEP*.txt, then *Filtered*.txt, otherwise
    # fall back to the largest .txt file produced.
    gep_files = list(output_dir.glob('*GEP*.txt'))
    if not gep_files:
        filtered_files = list(output_dir.glob('*Filtered*.txt'))
        if filtered_files:
            result_file = filtered_files[0]
        else:
            all_txt_files = list(output_dir.glob('*.txt'))
            if all_txt_files:
                result_file = max(all_txt_files, key=lambda x: x.stat().st_size)
            else:
                raise FileNotFoundError("No suitable output files found")
    else:
        result_file = gep_files[0]

    print(f"Using result file: {result_file}")
    # Copy into the task work dir under the fixed name declared in 'output:'.
    subprocess.run(['cp', str(result_file), 'CIBERSORTx_HiRes_Matrix.txt'], check=True)
    """
}
|
||||
|
||||
// Attach tissue names (parsed from the expression-matrix column headers) to the
// CIBERSORTx HiRes output and publish it as '<patient>_immune_cells.csv'.
process ADD_TISSUE_NAMES_TO_CIBERSORTX{
    memory 1.GB

    container = "${params.container_ecotyper}"
    containerOptions = "${params.containerOptions}"
    publishDir "${params.ecotyper_outdir}/hires", mode: 'copy'
    // debug true

    input:
    path EXPRESSION_MATRIX
    path CIBERSORTx_HIRES

    output:
    path '*_immune_cells.csv'

    script:
    """
    #!/opt/conda/envs/ecotyper/bin/python3
    import pandas as pd

    cibersort_df = pd.read_csv('${CIBERSORTx_HIRES}', sep='\t')
    expression_df = pd.read_csv('${EXPRESSION_MATRIX}', sep='\t', index_col=0)

    # Patient name: the expression file name with the '_TPM.txt' suffix removed.
    pat_name = '${EXPRESSION_MATRIX}'.split('_TPM.txt')[0]
    # NOTE(review): assumes every expression column is formatted 'prefix:tissue';
    # a column without a colon raises IndexError — confirm upstream guarantees it.
    tissue_name = [i.split(':')[1] for i in expression_df.columns]
    # NOTE(review): HiRes row order is assumed to match the expression-matrix
    # column order, since rows are labelled purely by position here — verify.
    cibersort_df = cibersort_df.set_index(pd.Index(tissue_name, dtype='str'))
    cibersort_df.index.name = 'Tissues'
    cibersort_df.to_csv(pat_name + '_immune_cells.csv')
    """
}
|
||||
Reference in New Issue
Block a user