nextflow.enable.dsl = 2

/*
 * CIBERSORTx deconvolution processes for the EcoTyper pipeline.
 *
 * Flow: CONVERT_TO_TXT turns a CSV expression matrix into TSV;
 * CIBERSORTx_FRACTIONS estimates cell-type fractions from a signature
 * matrix; CIBERSORTx_HIRES produces high-resolution cell-type expression;
 * ADD_TISSUE_NAMES_TO_CIBERSORTX re-labels the HiRes rows with tissue
 * names parsed from the expression-matrix column headers.
 */

// Convert a CSV expression matrix into a tab-separated .txt file named
// after the input (basename with .csv stripped, .txt appended).
process CONVERT_TO_TXT {
    memory 1.GB
    container = "${params.container_ecotyper}"
    containerOptions = "${params.containerOptions}"
    publishDir "${params.ecotyper_outdir}/fractions", mode: 'copy'
    // debug true

    input:
    path input_expression

    output:
    path '*.txt', emit: input_expression_txt

    script:
    """
    #!/bin/bash
    name="\$(basename $input_expression .csv)"
    # CSV -> TSV: replace each (optionally quoted) field's trailing comma
    # with a tab, keeping the unquoted field contents (\\2). Commas inside
    # quoted fields are preserved because [^"]* cannot cross a quote.
    sed -E 's/("([^"]*)")?,/\\2\\t/g' $input_expression > \${name}.txt
    """
}

// Run CIBERSORTx Fractions inside the container to estimate cell-type
// fractions of the mixture against the signature matrix. Emits the
// CIBERSORTx_Results.txt fractions table.
process CIBERSORTx_FRACTIONS {
    memory 4.GB
    container = "${params.container_ecotyper}"
    containerOptions = "${params.containerOptions}"
    publishDir "${params.ecotyper_outdir}/fractions", mode: 'copy'
    // debug true

    input:
    path input_expression, stageAs: 'input.txt'
    path signature_matrix, stageAs: 'signature.txt'

    output:
    path "CIBERSORTx_Results.txt", emit: fractions_results

    script:
    """
    #!/bin/bash
    set -ex

    echo "Starting CIBERSORTx Fractions analysis..."
    mkdir -p /src/data /src/outdir || { echo "Failed to create directories"; exit 1; }

    # Prepare input files: strip quotes and carriage returns (Windows
    # line endings) so the CIBERSORTx binary parses them cleanly.
    echo "Preparing input files..."
    sed 's/"//g' "input.txt" | tr -d '\\r' > /src/data/mixture.txt || { echo "Failed to prepare mixture file"; exit 1; }
    sed 's/"//g' "signature.txt" | tr -d '\\r' > /src/data/signature.txt || { echo "Failed to prepare signature file"; exit 1; }

    echo "Running CIBERSORTx Fractions"
    /src/CIBERSORTxFractions \\
        --mixture /src/data/mixture.txt \\
        --sigmatrix /src/data/signature.txt \\
        --outdir /src/outdir \\
        --username "${params.cibersortx_username}" \\
        --token "${params.cibersortx_token}"

    # Copy the result into the task dir so Nextflow can collect it;
    # dump the outdir listing on failure to aid debugging.
    if [ -f /src/outdir/CIBERSORTx_Results.txt ]; then
        cp /src/outdir/CIBERSORTx_Results.txt .
    else
        echo "Error: CIBERSORTx_Results.txt not found"
        ls -la /src/outdir/
        exit 1
    fi
    """
}

// Run CIBERSORTx HiRes to infer cell-type-specific expression profiles.
// Aligns mixture and signature matrices on their shared genes, derives
// the weights table from the Fractions results, invokes the HiRes binary,
// then picks the best-matching output file as CIBERSORTx_HiRes_Matrix.txt.
process CIBERSORTx_HIRES {
    memory 1.GB
    container = "${params.container_ecotyper}"
    containerOptions = "${params.containerOptions}"
    publishDir "${params.ecotyper_outdir}/hires", mode: 'copy'
    // debug true

    input:
    path expression_matrix, stageAs: 'expression.txt'
    path fractions_results, stageAs: 'fractions.txt'
    path signature_matrix, stageAs: 'signature.txt'

    output:
    path "CIBERSORTx_HiRes_Matrix.txt", emit: hires_matrix

    script:
    """
    #!/opt/conda/envs/ecotyper/bin/python3
    import pandas as pd
    import numpy as np
    import os
    import glob
    import subprocess
    from pathlib import Path

    # Create working directories expected by the CIBERSORTx binary.
    print("Starting CIBERSORTx HiRes analysis...")
    for dir_name in ['/src/data', '/src/outdir', '/src/intermediate', '/src/temp']:
        Path(dir_name).mkdir(parents=True, exist_ok=True)

    # Read input files (tab-separated, genes as row index for the matrices).
    print("=== READING INPUT FILES ===")
    mixture_df = pd.read_csv('expression.txt', sep='\t', index_col=0)
    signature_df = pd.read_csv('signature.txt', sep='\t', index_col=0)
    fractions_df = pd.read_csv('fractions.txt', sep='\t')

    # Report NaN/negative/zero counts and summary stats for sanity checking.
    print("=== DATA QUALITY CHECKS ===")
    def check_data_quality(df, name):
        print(f"Checking {name}:")
        print(f"  NaN values: {df.isna().sum().sum()}")
        print(f"  Negative values: {(df < 0).sum().sum()}")
        print(f"  Zero values: {(df == 0).sum().sum()}")
        print(f"  Sample stats:")
        print(df.describe().round(3))

    check_data_quality(mixture_df, "Mixture")
    check_data_quality(signature_df, "Signature")

    # Normalize gene names (trim whitespace, uppercase) so the two
    # matrices match on the same identifiers.
    print("=== PROCESSING GENE NAMES ===")
    def normalize_gene_name(x):
        return str(x).strip().upper()

    mixture_df.index = mixture_df.index.map(normalize_gene_name)
    signature_df.index = signature_df.index.map(normalize_gene_name)

    # Restrict both matrices to the sorted intersection of genes.
    print("=== FINDING COMMON GENES ===")
    common_genes = sorted(set(mixture_df.index) & set(signature_df.index))
    print(f"Number of common genes found: {len(common_genes)}")
    if len(common_genes) == 0:
        raise ValueError("No common genes found between mixture and signature files")

    # Create aligned matrices with exact same gene order.
    mixture_filtered = mixture_df.loc[common_genes].copy()
    signature_filtered = signature_df.loc[common_genes].copy()

    # Verify gene order matches exactly.
    if not (mixture_filtered.index == signature_filtered.index).all():
        raise ValueError("Gene order mismatch after filtering")

    # Replace any NaN values with 0.
    mixture_filtered = mixture_filtered.fillna(0)
    signature_filtered = signature_filtered.fillna(0)

    # Write the aligned matrices where the binary expects them.
    print("=== PREPARING OUTPUT FILES ===")
    mixture_filtered.index.name = 'genesinput'
    mixture_filtered.to_csv('/src/data/mixture.txt', sep='\t', float_format='%.6f')

    # Save signature file (no transpose).
    signature_filtered.index.name = 'genesinput'
    signature_filtered.to_csv('/src/data/signature.txt', sep='\t', float_format='%.6f')

    # Build the weights table from the Fractions results by dropping the
    # metadata columns, keeping only the per-cell-type fraction columns.
    meta_cols = ['Mixture', 'P-value', 'Correlation', 'RMSE']
    data_cols = [col for col in fractions_df.columns if col not in meta_cols]
    weights_df = fractions_df[data_cols].copy()
    weights_df.fillna(0, inplace=True)
    weights_df.to_csv('/src/outdir/CIBERSORTxGEP_NA_Weights.txt', sep='\t', index=False)

    # Print dimensions and verify.
    print("=== MATRIX VALIDATION ===")
    print(f"Matrix dimensions:")
    print(f"  Mixture: {mixture_filtered.shape[0]} genes x {mixture_filtered.shape[1]} samples")
    print(f"  Signature: {signature_filtered.shape[0]} genes x {signature_filtered.shape[1]} samples")
    print(f"  Weights: {len(data_cols)} cols")
    if len(data_cols) < 1:
        raise ValueError(f"Invalid weights columns count: {len(data_cols)}")

    # All values must be finite before handing off to the binary.
    if not np.isfinite(mixture_filtered.values).all():
        raise ValueError("Non-finite values found in mixture matrix")
    if not np.isfinite(signature_filtered.values).all():
        raise ValueError("Non-finite values found in signature matrix")
    if not np.isfinite(weights_df.values).all():
        raise ValueError("Non-finite values found in weights matrix")

    # Window size is set to the number of cell-type columns.
    window_size = len(data_cols)
    print(f"Using window size: {window_size}")

    # Run CIBERSORTx HiRes. check=True raises CalledProcessError (failing
    # the task) on a non-zero exit code.
    # NOTE(review): --QN true enables quantile normalization — confirm this
    # is intended for this data type.
    print("=== RUNNING CIBERSORTX HIRES ===")
    cmd = [
        '/src/CIBERSORTxHiRes',
        '--mixture', '/src/data/mixture.txt',
        '--sigmatrix', '/src/data/signature.txt',
        '--cibresults', '/src/outdir/CIBERSORTxGEP_NA_Weights.txt',
        '--outdir', '/src/outdir',
        '--window', str(window_size),
        '--username', '${params.cibersortx_username}',
        '--token', '${params.cibersortx_token}',
        '--QN', 'true'
    ]
    subprocess.run(cmd, check=True)

    # List all files in output directory.
    print("=== CHECKING OUTPUT FILES ===")
    output_dir = Path('/src/outdir')
    print("Files in output directory:")
    for f in output_dir.glob('*'):
        print(f"  {f.name} ({f.stat().st_size} bytes)")

    # Locate the result file: prefer *GEP*.txt, then *Filtered*.txt, then
    # fall back to the largest .txt file in the output directory.
    gep_files = list(output_dir.glob('*GEP*.txt'))
    if not gep_files:
        filtered_files = list(output_dir.glob('*Filtered*.txt'))
        if filtered_files:
            result_file = filtered_files[0]
        else:
            all_txt_files = list(output_dir.glob('*.txt'))
            if all_txt_files:
                result_file = max(all_txt_files, key=lambda x: x.stat().st_size)
            else:
                raise FileNotFoundError("No suitable output files found")
    else:
        result_file = gep_files[0]

    print(f"Using result file: {result_file}")
    subprocess.run(['cp', str(result_file), 'CIBERSORTx_HiRes_Matrix.txt'], check=True)
    """
}

// Re-index the HiRes matrix with tissue names parsed from the expression
// matrix column headers (text after the first ':'), and write the result
// as <patient>_immune_cells.csv.
process ADD_TISSUE_NAMES_TO_CIBERSORTX {
    memory 1.GB
    container = "${params.container_ecotyper}"
    containerOptions = "${params.containerOptions}"
    publishDir "${params.ecotyper_outdir}/hires", mode: 'copy'
    // debug true

    input:
    path EXPRESSION_MATRIX
    path CIBERSORTx_HIRES

    output:
    path '*_immune_cells.csv'

    script:
    """
    #!/opt/conda/envs/ecotyper/bin/python3
    import pandas as pd

    cibersort_df = pd.read_csv('${CIBERSORTx_HIRES}', sep='\t')
    expression_df = pd.read_csv('${EXPRESSION_MATRIX}', sep='\t', index_col=0)

    # Patient name = expression file name up to the '_TPM.txt' suffix.
    pat_name = '${EXPRESSION_MATRIX}'.split('_TPM.txt')[0]

    # Tissue name is the part after ':' in each column header; fall back
    # to the full column name when no ':' is present (avoids IndexError).
    tissue_name = [i.split(':')[1] if ':' in i else i for i in expression_df.columns]

    cibersort_df = cibersort_df.set_index(pd.Index(tissue_name, dtype='str'))
    cibersort_df.index.name = 'Tissues'
    cibersort_df.to_csv(pat_name + '_immune_cells.csv')
    """
}