#!/usr/bin/env python3
"""
CCLE Data Preparation Pipeline for Metabolomics Analysis

This script prepares metabolomics and gene expression data for analysis with the corto algorithm.
It ensures compatibility with corto's requirements while providing optional additional preprocessing steps.

Basic Usage:
    python prepare_data.py --metabolomics_file data/metabolomics.csv --expression_file data/expression.txt

Advanced Usage with Additional Preprocessing:
    python prepare_data.py --metabolomics_file data/metabolomics.csv \
        --expression_file data/expression.txt \
        --cnv_file data/cnv.csv \
        --normalization standard \
        --outlier_detection zscore \
        --imputation knn

For detailed information about options, use the --help flag.
"""
|
import argparse
import logging
import warnings
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
from scipy import stats
from sklearn.impute import KNNImputer
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import RobustScaler, StandardScaler
|
@dataclass
|
|
class DataQualityMetrics:
|
|
"""Track data quality metrics through processing"""
|
|
initial_shape: Tuple[int, int]
|
|
final_shape: Tuple[int, int]
|
|
removed_features: List[str]
|
|
zero_var_features: List[str]
|
|
missing_value_counts: Dict[str, int]
|
|
extreme_value_counts: Dict[str, int]
|
|
sample_correlations: Optional[pd.Series]
|
|
processing_steps: List[str]
|
|
|
|
@dataclass
|
|
class PreprocessingConfig:
|
|
"""Configuration for preprocessing steps"""
|
|
# Corto-compatible preprocessing
|
|
remove_zero_variance: bool = True
|
|
min_variance: float = 1e-10
|
|
remove_duplicates: bool = True
|
|
cnv_correction: bool = True
|
|
|
|
# Centroid detection parameters
|
|
centroid_detection_threshold: float = 0.1 # Fraction of features to select as centroids (0.1 = top 10%)
|
|
|
|
# Additional preprocessing (disabled by default)
|
|
normalization: Optional[str] = None # ['standard', 'robust', 'log']
|
|
feature_selection: Optional[str] = None # ['variance', 'cv']
|
|
outlier_detection: Optional[str] = None # ['zscore', 'iqr']
|
|
imputation: Optional[str] = None # ['mean', 'median', 'knn']
|
|
|
|
# Processing options
|
|
save_intermediate: bool = False
|
|
dry_run: bool = False
|
|
n_jobs: int = 1
|
|
|
|
# Thresholds
|
|
min_samples_threshold: float = 0.5
|
|
outlier_threshold: float = 3.0
|
|
feature_selection_threshold: float = 0.5
|
|
|
|
class ModularDataPrep:
    """Main class for the CCLE data-preparation pipeline.

    Orchestrates format validation, conversion to numeric matrices,
    corto-compatible preprocessing (zero-variance removal, duplicate
    removal), optional extra preprocessing (normalization, outlier
    masking, imputation, feature selection) and optional CNV-based
    correction of the expression matrix.
    """

    def __init__(self, config: Optional[PreprocessingConfig] = None):
        """Initialize the pipeline with an optional configuration."""
        self.config = config or PreprocessingConfig()
        self.logger = logging.getLogger(__name__)
        # Quality metrics collected as processing proceeds, keyed by
        # "<matrix name>_<metric>".
        self.metrics: Dict[str, Any] = {}
        # Fitted scalers kept for potential inverse transforms, keyed by
        # matrix name.
        self.scalers: Dict[str, Any] = {}
        # In-memory snapshots of intermediate matrices (populated only
        # when config.save_intermediate is set).
        self.intermediate_data: Dict[str, pd.DataFrame] = {}

    def save_intermediate_step(self, df: pd.DataFrame, name: str, step: str) -> None:
        """Persist an intermediate matrix to CSV and keep it in memory
        when config.save_intermediate is enabled; otherwise do nothing."""
        if self.config.save_intermediate:
            output_file = f"intermediate_{name}_{step}.csv"
            df.to_csv(output_file)
            self.logger.info(f"Saved intermediate data to {output_file}")
            self.intermediate_data[f"{name}_{step}"] = df

    def validate_ccle_format(self, df: pd.DataFrame, data_type: str) -> None:
        """
        Validate expected CCLE data format.

        Args:
            df: Input dataframe
            data_type: Type of data ('metabolomics', 'expression', 'cnv')

        Raises:
            ValueError: If data format doesn't match CCLE requirements
        """
        if df.empty:
            raise ValueError(f"Empty dataframe provided for {data_type}")

        if df.isna().all().all():
            raise ValueError(f"All values are NA in {data_type} data")

        if data_type == 'metabolomics':
            if 'CCLE_ID' not in df.columns:
                raise ValueError("Metabolomics data must have CCLE_ID column")

        elif data_type == 'expression':
            # BUG FIX: the original used set.intersection(), which passed
            # when only ONE of the two required columns was present —
            # contradicting both the error message below and
            # preprocess_ccle_data, which indexes on both columns.
            if not {'gene_id', 'transcript_id'}.issubset(df.columns):
                raise ValueError("Expression data must have gene_id and transcript_id columns")

        # Check for numeric data after removing ID columns
        id_cols: List[str] = []
        if data_type == 'metabolomics':
            id_cols = ['CCLE_ID']
        elif data_type == 'expression':
            id_cols = ['gene_id', 'transcript_id']

        data_cols = df.drop(columns=[col for col in id_cols if col in df.columns])
        if not data_cols.select_dtypes(include=[np.number]).columns.any():
            raise ValueError(f"No numeric data columns found in {data_type} data")

    def preprocess_ccle_data(self, df: pd.DataFrame, data_type: str) -> pd.DataFrame:
        """
        Preprocess CCLE format data to get a purely numeric matrix.

        Args:
            df: Input dataframe
            data_type: Type of data ('metabolomics', 'expression', 'cnv')

        Returns:
            Numeric dataframe with ID columns moved into the index.

        Raises:
            ValueError: If the data cannot be processed into numeric form.
        """
        self.logger.info(f"Preprocessing {data_type} data")

        if data_type == 'metabolomics':
            # For metabolomics, set CCLE_ID as index and drop DepMap_ID.
            if 'CCLE_ID' in df.columns:
                columns_to_drop = ['DepMap_ID'] if 'DepMap_ID' in df.columns else []
                df = df.set_index('CCLE_ID').drop(columns=columns_to_drop)

                # Coerce every remaining column to numeric (bad cells -> NaN).
                numeric_df = df.apply(pd.to_numeric, errors='coerce')
                self.logger.info("Processed metabolomics data to numeric format")
                return numeric_df

        elif data_type == 'expression':
            # Consistent with validate_ccle_format: require BOTH id columns
            # (set_index() on both would raise KeyError with only one).
            if {'gene_id', 'transcript_id'}.issubset(df.columns):
                df = df.set_index(['gene_id', 'transcript_id'])
                numeric_df = df.apply(pd.to_numeric, errors='coerce')
                self.logger.info("Processed expression data to numeric format")
                return numeric_df

        elif data_type == 'cnv':
            # BUG FIX: the original had no 'cnv' branch and always fell
            # through to the error below, so every run with --cnv_file
            # failed here. CNV matrices carry no mandatory ID columns
            # (see validate_ccle_format); honour a gene_id column as the
            # index when present, then coerce to numeric.
            if 'gene_id' in df.columns:
                df = df.set_index('gene_id')
            numeric_df = df.apply(pd.to_numeric, errors='coerce')
            self.logger.info("Processed cnv data to numeric format")
            return numeric_df

        # If we reached here without returning, something went wrong
        raise ValueError(f"Could not process {data_type} data into numeric format")

    def remove_zero_variance_features(self, df: pd.DataFrame, name: str) -> pd.DataFrame:
        """Remove columns whose variance is at or below config.min_variance.

        NOTE(review): variance is computed per COLUMN; for the expression
        matrix (features on rows, samples on columns) this filters samples
        rather than features — confirm this orientation is intended.
        """
        variances = df.var()
        zero_var_features = variances[variances <= self.config.min_variance].index.tolist()
        if zero_var_features:
            self.logger.info(f"Removing {len(zero_var_features)} zero variance features from {name}")
            df = df.drop(columns=zero_var_features)
        self.metrics[f"{name}_zero_var_features"] = zero_var_features
        return df

    def normalize_data(self, df: pd.DataFrame, name: str) -> pd.DataFrame:
        """Apply the configured normalization ('standard', 'robust' or
        'log'); any other setting returns the input unchanged.

        Fitted scalers are stored in self.scalers[name] so the transform
        can later be inverted if needed.
        """
        if self.config.normalization == 'standard':
            scaler = StandardScaler()
        elif self.config.normalization == 'robust':
            scaler = RobustScaler()
        elif self.config.normalization == 'log':
            # log1p maps 0 -> 0 gracefully; negative inputs yield NaN.
            return np.log1p(df)
        else:
            return df

        self.scalers[name] = scaler
        return pd.DataFrame(
            scaler.fit_transform(df),
            index=df.index,
            columns=df.columns
        )

    def handle_outliers(self, df: pd.DataFrame, name: str) -> pd.DataFrame:
        """Mask outliers (replace with NaN) using the configured method.

        'zscore' flags values whose column z-score exceeds
        config.outlier_threshold; 'iqr' flags values outside
        [Q1 - 1.5*IQR, Q3 + 1.5*IQR]. Masked values are intended to be
        filled by a subsequent imputation step.
        """
        if self.config.outlier_detection == 'zscore':
            # BUG FIX: nan_policy='omit' — with the default ('propagate'),
            # a single pre-existing NaN turned a whole column's z-scores
            # into NaN, silently disabling detection for that column.
            z_scores = stats.zscore(df, nan_policy='omit')
            outlier_mask = np.abs(z_scores) > self.config.outlier_threshold
        elif self.config.outlier_detection == 'iqr':
            Q1 = df.quantile(0.25)
            Q3 = df.quantile(0.75)
            IQR = Q3 - Q1
            outlier_mask = ((df < (Q1 - 1.5 * IQR)) | (df > (Q3 + 1.5 * IQR)))
        else:
            return df

        # BUG FIX: operate on a copy so the caller's dataframe is not
        # mutated in place.
        df = df.copy()
        df[outlier_mask] = np.nan
        return df

    def impute_missing_values(self, df: pd.DataFrame, name: str) -> pd.DataFrame:
        """Impute missing values with the configured method ('mean',
        'median' or 'knn'); any other setting returns the input unchanged."""
        if self.config.imputation == 'mean':
            return df.fillna(df.mean())
        elif self.config.imputation == 'median':
            return df.fillna(df.median())
        elif self.config.imputation == 'knn':
            # KNN imputation based on the 5 nearest rows in column space.
            imputer = KNNImputer(n_neighbors=5)
            return pd.DataFrame(
                imputer.fit_transform(df),
                index=df.index,
                columns=df.columns
            )
        return df

    def detect_centroids(self, expression_data: pd.DataFrame) -> List[str]:
        """
        Auto-detect potential centroids from expression data based on network properties.

        Each feature is scored by its variance (higher variance = more
        informative) multiplied by its connectivity (total absolute
        correlation with all other features); the top fraction given by
        config.centroid_detection_threshold is returned.

        Args:
            expression_data: Expression matrix with features on the rows
                and samples on the columns (the orientation produced by
                preprocess_ccle_data).

        Returns:
            List of detected centroid feature names (row labels).

        Note:
            The centroid_detection_threshold parameter (default 0.1 = 10%)
            determines what fraction of features are selected as centroids.
        """
        # BUG FIX: the original scored COLUMNS (samples), so the returned
        # "centroids" were sample names and apply_cnv_correction treated
        # every feature as a target. Score rows (features) instead.
        variances = expression_data.var(axis=1)

        # Feature-vs-feature correlation (transpose so corr() pairs rows).
        # NOTE: O(n_features^2) — can be expensive on full transcriptomes.
        connectivity = expression_data.T.corr().abs().sum()

        # Combined informativeness score.
        scores = variances * connectivity

        # ROBUSTNESS: keep at least one centroid even for tiny matrices
        # (the original int() truncation could select zero).
        num_centroids = max(1, int(len(scores) * self.config.centroid_detection_threshold))
        centroids = scores.nlargest(num_centroids).index.tolist()

        self.logger.info(
            f"Detected {len(centroids)} potential centroids "
            f"(top {self.config.centroid_detection_threshold*100:.1f}% of features)"
        )
        return centroids

    def select_features(self, df: pd.DataFrame, name: str) -> pd.DataFrame:
        """Keep only columns passing the configured selection criterion.

        'variance' keeps columns whose variance is at/above the
        feature_selection_threshold percentile; 'cv' uses the coefficient
        of variation (std/mean) instead. Any other setting returns the
        input unchanged.
        """
        if self.config.feature_selection == 'variance':
            selector = df.var()
            threshold = np.percentile(selector, self.config.feature_selection_threshold * 100)
            selected = selector[selector >= threshold].index
        elif self.config.feature_selection == 'cv':
            # Coefficient of variation; zero-mean columns yield inf.
            cv = df.std() / df.mean()
            threshold = np.percentile(cv, self.config.feature_selection_threshold * 100)
            selected = cv[cv >= threshold].index
        else:
            return df

        return df[selected]

    def preprocess_matrix(self, df: pd.DataFrame, name: str) -> pd.DataFrame:
        """Run a single matrix through every configured preprocessing step.

        In dry-run mode only the planned steps are logged and the matrix
        is returned untouched. Otherwise the steps run in a fixed order
        (zero-variance removal, de-duplication, normalization, outlier
        masking, imputation, feature selection), each optionally
        snapshotted via save_intermediate_step.
        """
        if self.config.dry_run:
            self.logger.info(f"\nDry run: would preprocess {name} matrix with steps:")
            steps = []
            if self.config.remove_zero_variance:
                steps.append("- Remove zero variance features")
            if self.config.remove_duplicates:
                steps.append("- Remove duplicates")
            if self.config.normalization:
                steps.append(f"- Apply {self.config.normalization} normalization")
            if self.config.outlier_detection:
                steps.append(f"- Detect outliers using {self.config.outlier_detection}")
            if self.config.imputation:
                steps.append(f"- Impute missing values using {self.config.imputation}")
            if self.config.feature_selection:
                steps.append(f"- Select features using {self.config.feature_selection}")

            for step in steps:
                self.logger.info(step)
            return df

        self.logger.info(f"\nPreprocessing {name} matrix")
        processed = df.copy()
        steps = []

        # Corto-compatible preprocessing
        if self.config.remove_zero_variance:
            processed = self.remove_zero_variance_features(processed, name)
            steps.append('zero_variance_removal')
            self.save_intermediate_step(processed, name, 'zero_var_removed')

        if self.config.remove_duplicates:
            # Keep the first occurrence of any duplicated index label.
            processed = processed[~processed.index.duplicated(keep='first')]
            steps.append('duplicate_removal')
            self.save_intermediate_step(processed, name, 'duplicates_removed')

        # Additional (opt-in) preprocessing steps
        if self.config.normalization:
            processed = self.normalize_data(processed, name)
            steps.append(f'normalization_{self.config.normalization}')
            self.save_intermediate_step(processed, name, 'normalized')

        if self.config.outlier_detection:
            processed = self.handle_outliers(processed, name)
            steps.append(f'outlier_detection_{self.config.outlier_detection}')
            self.save_intermediate_step(processed, name, 'outliers_handled')

        if self.config.imputation:
            processed = self.impute_missing_values(processed, name)
            steps.append(f'imputation_{self.config.imputation}')
            self.save_intermediate_step(processed, name, 'imputed')

        if self.config.feature_selection:
            processed = self.select_features(processed, name)
            steps.append(f'feature_selection_{self.config.feature_selection}')
            self.save_intermediate_step(processed, name, 'features_selected')

        self.metrics[f"{name}_processing_steps"] = steps
        return processed

    def apply_cnv_correction(
        self,
        expression_data: pd.DataFrame,
        cnv_data: pd.DataFrame,
        centroids: List[str]
    ) -> pd.DataFrame:
        """
        Correct expression data based on CNV data, following corto's approach.

        For each non-centroid ("target") feature a linear model
        expression ~ cnv is fitted per feature and the expression values
        are replaced by the model residuals, removing copy-number-driven
        variation. Centroid features are left untouched.

        Args:
            expression_data: Expression matrix (features x samples)
            cnv_data: Copy number variation matrix (features x samples)
            centroids: Feature names to leave uncorrected

        Returns:
            Corrected expression matrix restricted to the features and
            samples shared by both inputs.

        Raises:
            ValueError: When fewer than two features or samples are shared.
        """
        self.logger.info("Applying CNV correction")

        # Get common features and samples
        common_features = list(set(expression_data.index) & set(cnv_data.index))
        common_samples = list(set(expression_data.columns) & set(cnv_data.columns))

        if len(common_features) <= 1:
            raise ValueError("One or fewer features in common between CNV and expression data")
        if len(common_samples) <= 1:
            raise ValueError("One or fewer samples in common between CNV and expression data")

        # Subset data to common elements
        expr = expression_data.loc[common_features, common_samples]
        cnv = cnv_data.loc[common_features, common_samples]

        # Targets are every shared feature that is not a centroid.
        targets = list(set(common_features) - set(centroids))

        target_expr = expr.loc[targets]
        target_cnv = cnv.loc[targets]

        self.logger.info(f"Calculating residuals for {len(targets)} target features")

        # BUG FIX: build the residual frame as float — the original
        # pd.DataFrame(index=..., columns=...) defaulted to object dtype,
        # which propagated into the corrected matrix.
        corrected_targets = pd.DataFrame(
            np.nan, index=target_expr.index, columns=target_expr.columns, dtype=float
        )
        for feature in targets:
            x = target_cnv.loc[feature].to_numpy(dtype=float)
            y = target_expr.loc[feature].to_numpy(dtype=float)

            # ROBUSTNESS: fit only on samples where both values are finite;
            # LinearRegression rejects NaN input outright.
            finite = np.isfinite(x) & np.isfinite(y)
            if finite.sum() < 2:
                # Too few clean points to fit — leave this feature as-is.
                corrected_targets.loc[feature] = y
                continue

            # Fit linear model: expression ~ cnv
            model = LinearRegression()
            model.fit(x[finite].reshape(-1, 1), y[finite])

            # Residuals where data was clean; NaN elsewhere.
            residuals = np.full_like(y, np.nan, dtype=float)
            residuals[finite] = y[finite] - model.predict(x[finite].reshape(-1, 1))
            corrected_targets.loc[feature] = residuals

        # Replace target values with residuals
        corrected_expr = expr.copy()
        corrected_expr.loc[targets] = corrected_targets

        self.logger.info("CNV correction complete")
        return corrected_expr

    def prepare_matrices(
        self,
        metabolomics_data: pd.DataFrame,
        expression_data: pd.DataFrame,
        centroids: Optional[List[str]] = None,
        cnv_data: Optional[pd.DataFrame] = None
    ) -> Dict[str, Any]:
        """
        Prepare metabolomics and expression matrices for corto analysis.

        Args:
            metabolomics_data: Raw metabolomics data
            expression_data: Raw expression data
            centroids: Optional list of centroid features
            cnv_data: Optional CNV data for correction

        Returns:
            Dictionary containing processed matrices and quality metrics
        """
        # Validate input formats up front so failures are early and clear.
        self.validate_ccle_format(metabolomics_data, 'metabolomics')
        self.validate_ccle_format(expression_data, 'expression')
        if cnv_data is not None:
            self.validate_ccle_format(cnv_data, 'cnv')

        # Convert raw CCLE frames into numeric matrices with ID indexes.
        metabolomics_data = self.preprocess_ccle_data(metabolomics_data, 'metabolomics')
        expression_data = self.preprocess_ccle_data(expression_data, 'expression')
        if cnv_data is not None:
            cnv_data = self.preprocess_ccle_data(cnv_data, 'cnv')

        # Process metabolomics data
        processed_met = self.preprocess_matrix(metabolomics_data, 'metabolomics')

        # Process expression data
        processed_exp = self.preprocess_matrix(expression_data, 'expression')

        # Apply CNV correction if data provided
        if cnv_data is not None and self.config.cnv_correction:
            self.logger.info("Applying CNV correction")

            # Use provided centroids or detect them
            if centroids is None:
                # BUG FIX: detect centroids on the PROCESSED expression
                # matrix (the one corrected below); the original used the
                # unprocessed matrix.
                centroids = self.detect_centroids(processed_exp)
                self.logger.info("Using auto-detected centroids")
            else:
                self.logger.info(f"Using {len(centroids)} provided centroids")

            # Apply CNV correction
            processed_exp = self.apply_cnv_correction(
                processed_exp,
                cnv_data,
                centroids
            )

        return {
            'metabolomics': processed_met,
            'expression': processed_exp,
            'quality_metrics': self.metrics
        }
|
|
|
|
def parse_arguments() -> argparse.Namespace:
    """Parse command line arguments.

    Returns:
        argparse.Namespace with all pipeline options; input paths default
        to the standard CCLE release file names.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter
    )

    # Required inputs (with CCLE-release defaults)
    parser.add_argument(
        '--metabolomics_file',
        default='CCLE_metabolomics_20190502.csv',
        help='Path to metabolomics data CSV file'
    )

    parser.add_argument(
        '--expression_file',
        default='CCLE_RNAseq_rsem_transcripts_tpm_20180929.txt',
        help='Path to gene expression data file'
    )

    # Optional input/output
    parser.add_argument(
        '--cnv_file',
        help='Path to copy number variation data file (optional)'
    )

    parser.add_argument(
        '--output_prefix',
        default='prepared',
        help='Prefix for output files (default: prepared)'
    )

    # Additional preprocessing options
    parser.add_argument(
        '--normalization',
        choices=['standard', 'robust', 'log'],
        help='Normalization method (optional)'
    )

    parser.add_argument(
        '--outlier_detection',
        choices=['zscore', 'iqr'],
        help='Outlier detection method (optional)'
    )

    parser.add_argument(
        '--centroids',
        required=False,
        help='Optional: Comma-separated list of centroid feature names. If not provided, centroids will be auto-detected.'
    )

    parser.add_argument(
        '--centroid_threshold',
        type=float,
        default=0.1,
        # BUG FIX: argparse %-formats help strings, so the bare '%' in
        # 'top 10%' raised ValueError whenever --help was used; it must
        # be escaped as '%%'.
        help='Fraction of features to select as centroids when auto-detecting (default: 0.1 = top 10%%)'
    )

    parser.add_argument(
        '--imputation',
        choices=['mean', 'median', 'knn'],
        help='Missing value imputation method (optional)'
    )

    parser.add_argument(
        '--feature_selection',
        choices=['variance', 'cv'],
        help='Feature selection method (optional)'
    )

    # Processing options
    parser.add_argument(
        '--save_intermediate',
        action='store_true',
        help='Save intermediate data after each processing step'
    )

    parser.add_argument(
        '--dry_run',
        action='store_true',
        help='Preview preprocessing steps without executing'
    )

    parser.add_argument(
        '--n_jobs',
        type=int,
        default=1,
        help='Number of parallel jobs for applicable operations (default: 1)'
    )

    # Logging options
    parser.add_argument(
        '--verbose',
        action='store_true',
        help='Enable verbose logging'
    )

    parser.add_argument(
        '--log_file',
        help='Path to log file (optional, default: console output)'
    )

    return parser.parse_args()
|
|
|
|
def main() -> Dict[str, Any]:
    """Run the full preprocessing pipeline from the command line.

    Reads the input files named by the CLI arguments, prepares the
    matrices, writes the processed CSVs plus a quality-metrics report,
    and returns the prepared data dictionary.
    """
    args = parse_arguments()

    # Configure logging before anything else emits messages.
    log_settings: Dict[str, Any] = {
        'level': logging.INFO if args.verbose else logging.WARNING,
        'format': '%(asctime)s - %(levelname)s - %(message)s',
    }
    if args.log_file:
        log_settings['filename'] = args.log_file
    logging.basicConfig(**log_settings)

    # Translate CLI flags into a preprocessing configuration.
    config = PreprocessingConfig(
        normalization=args.normalization,
        outlier_detection=args.outlier_detection,
        imputation=args.imputation,
        feature_selection=args.feature_selection,
        save_intermediate=args.save_intermediate,
        dry_run=args.dry_run,
        n_jobs=args.n_jobs,
        centroid_detection_threshold=args.centroid_threshold,
    )

    def write_metrics_report(path: str, metrics: Dict[str, Any]) -> None:
        # Scalars get one line each; lists/dicts are expanded item by item.
        with open(path, 'w') as handle:
            handle.write("Data Preparation Metrics\n")
            handle.write("=======================\n")
            for metric_name, metric_value in metrics.items():
                if isinstance(metric_value, list):
                    handle.write(f"\n{metric_name}:\n")
                    for item in metric_value:
                        handle.write(f" - {item}\n")
                elif isinstance(metric_value, dict):
                    handle.write(f"\n{metric_name}:\n")
                    for k, v in metric_value.items():
                        handle.write(f" {k}: {v}\n")
                else:
                    handle.write(f"{metric_name}: {metric_value}\n")

    try:
        prep = ModularDataPrep(config)

        # Load the input matrices.
        logging.info(f"Reading metabolomics data from {args.metabolomics_file}")
        met_df = pd.read_csv(args.metabolomics_file)

        logging.info(f"Reading expression data from {args.expression_file}")
        exp_df = pd.read_csv(args.expression_file, sep='\t')

        cnv_df = None
        if args.cnv_file:
            logging.info(f"Reading CNV data from {args.cnv_file}")
            cnv_df = pd.read_csv(args.cnv_file)

        # Centroids are optional; auto-detected downstream when absent.
        requested_centroids = args.centroids.split(',') if args.centroids else None
        prepared_data = prep.prepare_matrices(
            met_df,
            exp_df,
            centroids=requested_centroids,
            cnv_data=cnv_df,
        )

        # Write the processed matrices and the metrics report.
        metabolomics_out = f"{args.output_prefix}_metabolomics.csv"
        expression_out = f"{args.output_prefix}_expression.csv"
        metrics_out = f"{args.output_prefix}_metrics.txt"

        prepared_data['metabolomics'].to_csv(metabolomics_out)
        prepared_data['expression'].to_csv(expression_out)
        write_metrics_report(metrics_out, prepared_data['quality_metrics'])

        logging.info(f"Processed data saved to {metabolomics_out} and {expression_out}")
        logging.info(f"Quality metrics saved to {metrics_out}")

        return prepared_data

    except Exception as e:
        logging.error(f"Error in preprocessing pipeline: {str(e)}")
        raise
|
|
|
|
# Script entry point: run the pipeline only when executed directly,
# not when imported as a module.
if __name__ == "__main__":
    main()