Large reference/model files excluded from repo - to be staged to S3 or baked into Docker images.
261 lines
8.5 KiB
Plaintext
261 lines
8.5 KiB
Plaintext
nextflow.enable.dsl=2
|
|
|
|
process CONVERT_TO_TXT {
|
|
memory 1.GB
|
|
|
|
container = "${params.container_ecotyper}"
|
|
containerOptions = "${params.containerOptions}"
|
|
publishDir "${params.ecotyper_outdir}/fractions", mode: 'copy'
|
|
// debug true
|
|
|
|
input:
|
|
path input_expression
|
|
|
|
output:
|
|
path '*.txt', emit: input_expression_txt
|
|
|
|
script:
|
|
"""
|
|
#!/bin/bash
|
|
name="\$(basename $input_expression .csv)"
|
|
sed -E 's/("([^"]*)")?,/\\2\\t/g' $input_expression > \${name}.txt
|
|
"""
|
|
}
|
|
|
|
process CIBERSORTx_FRACTIONS {
|
|
memory 4.GB
|
|
|
|
container = "${params.container_ecotyper}"
|
|
containerOptions = "${params.containerOptions}"
|
|
publishDir "${params.ecotyper_outdir}/fractions", mode: 'copy'
|
|
// debug true
|
|
|
|
input:
|
|
path input_expression, stageAs: 'input.txt'
|
|
path signature_matrix, stageAs: 'signature.txt'
|
|
|
|
output:
|
|
path "CIBERSORTx_Results.txt", emit: fractions_results
|
|
|
|
script:
|
|
"""
|
|
#!/bin/bash
|
|
set -ex
|
|
|
|
echo "Starting CIBERSORTx Fractions analysis..."
|
|
mkdir -p /src/data /src/outdir || { echo "Failed to create directories"; exit 1; }
|
|
|
|
# Prepare input files
|
|
echo "Preparing input files..."
|
|
sed 's/"//g' "input.txt" | tr -d '\\r' > /src/data/mixture.txt || { echo "Failed to prepare mixture file"; exit 1; }
|
|
sed 's/"//g' "signature.txt" | tr -d '\\r' > /src/data/signature.txt || { echo "Failed to prepare signature file"; exit 1; }
|
|
|
|
echo "Running CIBERSORTx Fractions"
|
|
/src/CIBERSORTxFractions \\
|
|
--mixture /src/data/mixture.txt \\
|
|
--sigmatrix /src/data/signature.txt \\
|
|
--outdir /src/outdir \\
|
|
--username "${params.cibersortx_username}" \\
|
|
--token "${params.cibersortx_token}"
|
|
|
|
if [ -f /src/outdir/CIBERSORTx_Results.txt ]; then
|
|
cp /src/outdir/CIBERSORTx_Results.txt .
|
|
else
|
|
echo "Error: CIBERSORTx_Results.txt not found"
|
|
ls -la /src/outdir/
|
|
exit 1
|
|
fi
|
|
"""
|
|
}
|
|
|
|
process CIBERSORTx_HIRES {
|
|
memory 1.GB
|
|
|
|
container = "${params.container_ecotyper}"
|
|
containerOptions = "${params.containerOptions}"
|
|
publishDir "${params.ecotyper_outdir}/hires", mode: 'copy'
|
|
// debug true
|
|
|
|
input:
|
|
path expression_matrix, stageAs: 'expression.txt'
|
|
path fractions_results, stageAs: 'fractions.txt'
|
|
path signature_matrix, stageAs: 'signature.txt'
|
|
|
|
output:
|
|
path "CIBERSORTx_HiRes_Matrix.txt", emit: hires_matrix
|
|
|
|
script:
|
|
"""
|
|
#!/opt/conda/envs/ecotyper/bin/python3
|
|
import pandas as pd
|
|
import numpy as np
|
|
import os
|
|
import glob
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
# Create directories
|
|
print("Starting CIBERSORTx HiRes analysis...")
|
|
for dir_name in ['/src/data', '/src/outdir', '/src/intermediate', '/src/temp']:
|
|
Path(dir_name).mkdir(parents=True, exist_ok=True)
|
|
|
|
# Read input files
|
|
print("=== READING INPUT FILES ===")
|
|
mixture_df = pd.read_csv('expression.txt', sep='\t', index_col=0)
|
|
signature_df = pd.read_csv('signature.txt', sep='\t', index_col=0)
|
|
fractions_df = pd.read_csv('fractions.txt', sep='\t')
|
|
|
|
# Data quality checks
|
|
print("=== DATA QUALITY CHECKS ===")
|
|
def check_data_quality(df, name):
|
|
print(f"Checking {name}:")
|
|
print(f" NaN values: {df.isna().sum().sum()}")
|
|
print(f" Negative values: {(df < 0).sum().sum()}")
|
|
print(f" Zero values: {(df == 0).sum().sum()}")
|
|
print(f" Sample stats:")
|
|
print(df.describe().round(3))
|
|
|
|
check_data_quality(mixture_df, "Mixture")
|
|
check_data_quality(signature_df, "Signature")
|
|
|
|
# Clean and normalize gene names
|
|
print("=== PROCESSING GENE NAMES ===")
|
|
def normalize_gene_name(x):
|
|
return str(x).strip().upper()
|
|
|
|
mixture_df.index = mixture_df.index.map(normalize_gene_name)
|
|
signature_df.index = signature_df.index.map(normalize_gene_name)
|
|
|
|
# Find common genes with exact matching
|
|
print("=== FINDING COMMON GENES ===")
|
|
common_genes = sorted(set(mixture_df.index) & set(signature_df.index))
|
|
print(f"Number of common genes found: {len(common_genes)}")
|
|
|
|
if len(common_genes) == 0:
|
|
raise ValueError("No common genes found between mixture and signature files")
|
|
|
|
# Create aligned matrices with exact same gene order
|
|
mixture_filtered = mixture_df.loc[common_genes].copy()
|
|
signature_filtered = signature_df.loc[common_genes].copy()
|
|
|
|
# Verify gene order matches exactly
|
|
if not (mixture_filtered.index == signature_filtered.index).all():
|
|
raise ValueError("Gene order mismatch after filtering")
|
|
|
|
# Replace any NaN values with 0
|
|
mixture_filtered = mixture_filtered.fillna(0)
|
|
signature_filtered = signature_filtered.fillna(0)
|
|
|
|
# Prepare output files
|
|
print("=== PREPARING OUTPUT FILES ===")
|
|
|
|
# Save mixture file
|
|
mixture_filtered.index.name = 'genesinput'
|
|
mixture_filtered.to_csv('/src/data/mixture.txt', sep='\t', float_format='%.6f')
|
|
|
|
# Save signature file (no transpose)
|
|
signature_filtered.index.name = 'genesinput'
|
|
signature_filtered.to_csv('/src/data/signature.txt', sep='\t', float_format='%.6f')
|
|
|
|
# Process and save weights file
|
|
meta_cols = ['Mixture', 'P-value', 'Correlation', 'RMSE']
|
|
data_cols = [col for col in fractions_df.columns if col not in meta_cols]
|
|
weights_df = fractions_df[data_cols].copy()
|
|
weights_df.fillna(0, inplace=True)
|
|
weights_df.to_csv('/src/outdir/CIBERSORTxGEP_NA_Weights.txt', sep='\t', index=False)
|
|
|
|
# Print dimensions and verify
|
|
print("=== MATRIX VALIDATION ===")
|
|
print(f"Matrix dimensions:")
|
|
print(f" Mixture: {mixture_filtered.shape[0]} genes x {mixture_filtered.shape[1]} samples")
|
|
print(f" Signature: {signature_filtered.shape[0]} genes x {signature_filtered.shape[1]} samples")
|
|
print(f" Weights: {len(data_cols)} cols")
|
|
|
|
if len(data_cols) < 1:
|
|
raise ValueError(f"Invalid weights columns count: {len(data_cols)}")
|
|
|
|
# Additional validations
|
|
if not np.isfinite(mixture_filtered.values).all():
|
|
raise ValueError("Non-finite values found in mixture matrix")
|
|
if not np.isfinite(signature_filtered.values).all():
|
|
raise ValueError("Non-finite values found in signature matrix")
|
|
if not np.isfinite(weights_df.values).all():
|
|
raise ValueError("Non-finite values found in weights matrix")
|
|
|
|
window_size = len(data_cols)
|
|
print(f"Using window size: {window_size}")
|
|
|
|
# Run CIBERSORTx
|
|
print("=== RUNNING CIBERSORTX HIRES ===")
|
|
cmd = [
|
|
'/src/CIBERSORTxHiRes',
|
|
'--mixture', '/src/data/mixture.txt',
|
|
'--sigmatrix', '/src/data/signature.txt',
|
|
'--cibresults', '/src/outdir/CIBERSORTxGEP_NA_Weights.txt',
|
|
'--outdir', '/src/outdir',
|
|
'--window', str(window_size),
|
|
'--username', '${params.cibersortx_username}',
|
|
'--token', '${params.cibersortx_token}',
|
|
'--QN', 'true'
|
|
]
|
|
|
|
result = subprocess.run(cmd, check=True)
|
|
|
|
# List all files in output directory
|
|
print("=== CHECKING OUTPUT FILES ===")
|
|
output_dir = Path('/src/outdir')
|
|
print("Files in output directory:")
|
|
for f in output_dir.glob('*'):
|
|
print(f" {f.name} ({f.stat().st_size} bytes)")
|
|
|
|
# Try to find any file that might be our result
|
|
gep_files = list(output_dir.glob('*GEP*.txt'))
|
|
if not gep_files:
|
|
filtered_files = list(output_dir.glob('*Filtered*.txt'))
|
|
if filtered_files:
|
|
result_file = filtered_files[0]
|
|
else:
|
|
all_txt_files = list(output_dir.glob('*.txt'))
|
|
if all_txt_files:
|
|
result_file = max(all_txt_files, key=lambda x: x.stat().st_size)
|
|
else:
|
|
raise FileNotFoundError("No suitable output files found")
|
|
else:
|
|
result_file = gep_files[0]
|
|
|
|
print(f"Using result file: {result_file}")
|
|
subprocess.run(['cp', str(result_file), 'CIBERSORTx_HiRes_Matrix.txt'], check=True)
|
|
"""
|
|
}
|
|
|
|
process ADD_TISSUE_NAMES_TO_CIBERSORTX{
|
|
memory 1.GB
|
|
|
|
container = "${params.container_ecotyper}"
|
|
containerOptions = "${params.containerOptions}"
|
|
publishDir "${params.ecotyper_outdir}/hires", mode: 'copy'
|
|
// debug true
|
|
|
|
input:
|
|
path EXPRESSION_MATRIX
|
|
path CIBERSORTx_HIRES
|
|
|
|
output:
|
|
path '*_immune_cells.csv'
|
|
|
|
script:
|
|
"""
|
|
#!/opt/conda/envs/ecotyper/bin/python3
|
|
import pandas as pd
|
|
|
|
cibersort_df = pd.read_csv('${CIBERSORTx_HIRES}', sep='\t')
|
|
expression_df = pd.read_csv('${EXPRESSION_MATRIX}', sep='\t', index_col=0)
|
|
|
|
pat_name = '${EXPRESSION_MATRIX}'.split('_TPM.txt')[0]
|
|
tissue_name = [i.split(':')[1] for i in expression_df.columns]
|
|
cibersort_df = cibersort_df.set_index(pd.Index(tissue_name, dtype='str'))
|
|
cibersort_df.index.name = 'Tissues'
|
|
cibersort_df.to_csv(pat_name + '_immune_cells.csv')
|
|
"""
|
|
}
|