Added custom data-preparation and matrix-combination steps intended to replicate, and improve on, the functionality of the R code. Added a README detailing the code.

This commit is contained in:
2024-12-16 15:03:40 +00:00
parent bf380f2768
commit 21d77e3faa
3 changed files with 1150 additions and 70 deletions

656
corto-data-prep.py Normal file
View File

@@ -0,0 +1,656 @@
#!/usr/bin/env python3
"""
CCLE Data Preparation Pipeline for Metabolomics Analysis
This script prepares metabolomics and gene expression data for analysis with the corto algorithm.
It ensures compatibility with corto's requirements while providing optional additional preprocessing steps.
Basic Usage:
python prepare_data.py --metabolomics_file data/metabolomics.csv --expression_file data/expression.txt
Advanced Usage with Additional Preprocessing:
python prepare_data.py --metabolomics_file data/metabolomics.csv \
--expression_file data/expression.txt \
--cnv_file data/cnv.csv \
--normalization standard \
--outlier_detection zscore \
--imputation knn
For detailed information about options, use the --help flag.
"""
import pandas as pd
import numpy as np
from scipy import stats
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.impute import KNNImputer
import logging
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple, Any
import warnings
import argparse
@dataclass
class DataQualityMetrics:
    """Container for data-quality metrics collected while a matrix is processed."""
    initial_shape: Tuple[int, int]  # (rows, cols) of the raw input matrix
    final_shape: Tuple[int, int]  # (rows, cols) after all processing steps
    removed_features: List[str]  # names of features dropped for any reason
    zero_var_features: List[str]  # features dropped for (near-)zero variance
    missing_value_counts: Dict[str, int]  # presumably missing-value count per feature — confirm against producer
    extreme_value_counts: Dict[str, int]  # presumably extreme/outlier count per feature — confirm against producer
    sample_correlations: Optional[pd.Series]  # pairwise sample correlations, if computed
    processing_steps: List[str]  # ordered names of the steps that were applied
    # NOTE(review): this dataclass is never instantiated in this file;
    # ModularDataPrep records metrics in a plain dict instead — confirm intended use.
@dataclass
class PreprocessingConfig:
    """Configuration for preprocessing steps."""
    # --- Corto-compatible preprocessing (enabled by default) ---
    remove_zero_variance: bool = True  # drop features with variance <= min_variance
    min_variance: float = 1e-10  # variance cutoff used by remove_zero_variance_features
    remove_duplicates: bool = True  # drop duplicated index entries (keep='first')
    cnv_correction: bool = True  # apply CNV residual correction when CNV data is supplied
    # --- Centroid detection parameters ---
    centroid_detection_threshold: float = 0.1  # fraction of features to select as centroids (0.1 = top 10%)
    # --- Additional preprocessing (disabled by default) ---
    normalization: Optional[str] = None  # one of: 'standard', 'robust', 'log'
    feature_selection: Optional[str] = None  # one of: 'variance', 'cv'
    outlier_detection: Optional[str] = None  # one of: 'zscore', 'iqr'
    imputation: Optional[str] = None  # one of: 'mean', 'median', 'knn'
    # --- Processing options ---
    save_intermediate: bool = False  # write each intermediate matrix to CSV
    dry_run: bool = False  # log the planned steps without executing them
    n_jobs: int = 1  # declared parallelism; not read by any step in this file
    # --- Thresholds ---
    min_samples_threshold: float = 0.5  # declared but unused in this file
    outlier_threshold: float = 3.0  # |z| cutoff for 'zscore' outlier detection
    feature_selection_threshold: float = 0.5  # percentile cutoff for feature selection
class ModularDataPrep:
    """Main class for data preparation pipeline"""

    def __init__(self, config: Optional[PreprocessingConfig] = None):
        """Create the pipeline; falls back to default config when none is given."""
        # NOTE: `or` also replaces a falsy config object, not just None
        self.config = config or PreprocessingConfig()
        self.logger = logging.getLogger(__name__)
        self.metrics: Dict[str, Any] = {}  # quality metrics keyed by "<matrix>_<metric>"
        self.scalers: Dict[str, Any] = {}  # fitted scalers keyed by matrix name
        self.intermediate_data: Dict[str, pd.DataFrame] = {}  # snapshots keyed by "<name>_<step>"
def save_intermediate_step(self, df: pd.DataFrame, name: str, step: str) -> None:
    """Persist a snapshot of *df* after *step*, when intermediate saving is enabled.

    Writes the frame to ``intermediate_<name>_<step>.csv`` and keeps an
    in-memory copy under the same key. No-op when the config flag is off.
    """
    if not self.config.save_intermediate:
        return
    key = f"{name}_{step}"
    output_file = f"intermediate_{key}.csv"
    df.to_csv(output_file)
    self.logger.info(f"Saved intermediate data to {output_file}")
    self.intermediate_data[key] = df
def validate_ccle_format(self, df: pd.DataFrame, data_type: str) -> None:
    """
    Validate expected CCLE data format.

    Args:
        df: Input dataframe
        data_type: Type of data ('metabolomics', 'expression', 'cnv')

    Raises:
        ValueError: If data format doesn't match CCLE requirements
    """
    if df.empty:
        raise ValueError(f"Empty dataframe provided for {data_type}")
    if df.isna().all().all():
        raise ValueError(f"All values are NA in {data_type} data")
    if data_type == 'metabolomics':
        if 'CCLE_ID' not in df.columns:
            raise ValueError("Metabolomics data must have CCLE_ID column")
    elif data_type == 'expression':
        # BUG FIX: the original used .intersection(), which passes when only ONE
        # of the two columns is present; downstream set_index(['gene_id',
        # 'transcript_id']) requires BOTH, so enforce a subset check here.
        if not {'gene_id', 'transcript_id'}.issubset(df.columns):
            raise ValueError("Expression data must have gene_id and transcript_id columns")
    # Check for numeric data after removing ID columns
    id_cols: List[str] = []
    if data_type == 'metabolomics':
        id_cols = ['CCLE_ID']
    elif data_type == 'expression':
        id_cols = ['gene_id', 'transcript_id']
    data_cols = df.drop(columns=[col for col in id_cols if col in df.columns])
    if not data_cols.select_dtypes(include=[np.number]).columns.any():
        raise ValueError(f"No numeric data columns found in {data_type} data")
def preprocess_ccle_data(self, df: pd.DataFrame, data_type: str) -> pd.DataFrame:
    """
    Preprocess CCLE format data to get a numeric matrix.

    Args:
        df: Input dataframe
        data_type: Type of data ('metabolomics', 'expression', 'cnv')

    Returns:
        Preprocessed numeric dataframe

    Raises:
        ValueError: If the data cannot be coerced into numeric form
    """
    self.logger.info(f"Preprocessing {data_type} data")
    if data_type == 'metabolomics':
        # For metabolomics, set CCLE_ID as index and drop DepMap_ID
        if 'CCLE_ID' in df.columns:
            columns_to_drop = ['DepMap_ID'] if 'DepMap_ID' in df.columns else []
            df = df.set_index('CCLE_ID').drop(columns=columns_to_drop)
            numeric_df = df.apply(pd.to_numeric, errors='coerce')
            self.logger.info("Processed metabolomics data to numeric format")
            return numeric_df
    elif data_type == 'expression':
        # For expression data, set gene/transcript IDs as multi-index
        if {'gene_id', 'transcript_id'}.intersection(df.columns):
            df = df.set_index(['gene_id', 'transcript_id'])
            numeric_df = df.apply(pd.to_numeric, errors='coerce')
            self.logger.info("Processed expression data to numeric format")
            return numeric_df
    elif data_type == 'cnv':
        # BUG FIX: the original had no 'cnv' branch, so prepare_matrices()
        # always raised when CNV data was supplied despite the docstring.
        # Use the leading identifier column (if non-numeric) as the feature
        # index so rows align with the expression index, then coerce the rest.
        first_col = df.columns[0]
        if df[first_col].dtype == object:
            df = df.set_index(first_col)
        numeric_df = df.apply(pd.to_numeric, errors='coerce')
        self.logger.info("Processed cnv data to numeric format")
        return numeric_df
    # If we reached here without returning, something went wrong
    raise ValueError(f"Could not process {data_type} data into numeric format")
def remove_zero_variance_features(self, df: pd.DataFrame, name: str) -> pd.DataFrame:
    """Drop columns whose variance does not exceed the configured minimum.

    Records the dropped column names under ``<name>_zero_var_features`` in
    ``self.metrics`` and returns the (possibly reduced) frame.
    """
    flagged = [col for col, var in df.var().items() if var <= self.config.min_variance]
    if flagged:
        self.logger.info(f"Removing {len(flagged)} zero variance features from {name}")
        df = df.drop(columns=flagged)
    self.metrics[f"{name}_zero_var_features"] = flagged
    return df
def normalize_data(self, df: pd.DataFrame, name: str) -> pd.DataFrame:
    """Apply the configured normalization to *df* and return the result.

    'log' applies np.log1p element-wise; 'standard'/'robust' fit an sklearn
    scaler (kept in ``self.scalers[name]``); any other setting is a no-op.
    """
    method = self.config.normalization
    if method == 'log':
        return np.log1p(df)  # log1p handles zeros gracefully
    if method not in ('standard', 'robust'):
        return df
    scaler = StandardScaler() if method == 'standard' else RobustScaler()
    self.scalers[name] = scaler
    scaled = scaler.fit_transform(df)
    return pd.DataFrame(scaled, index=df.index, columns=df.columns)
def handle_outliers(self, df: pd.DataFrame, name: str) -> pd.DataFrame:
    """Mask outliers with NaN (for later imputation) using the configured method.

    'zscore' flags cells with |z| above ``outlier_threshold``; 'iqr' flags
    cells outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR]. Returns the input unchanged
    when no method is configured.
    """
    method = self.config.outlier_detection
    if method == 'zscore':
        # nan_policy='omit' so pre-existing NaNs don't turn whole columns to NaN
        z_scores = stats.zscore(df, nan_policy='omit')
        outlier_mask = np.abs(z_scores) > self.config.outlier_threshold
    elif method == 'iqr':
        q1 = df.quantile(0.25)
        q3 = df.quantile(0.75)
        iqr = q3 - q1
        outlier_mask = (df < (q1 - 1.5 * iqr)) | (df > (q3 + 1.5 * iqr))
    else:
        return df
    # BUG FIX: operate on a copy so the caller's DataFrame is not mutated in place
    df = df.copy()
    df[outlier_mask] = np.nan
    return df
def impute_missing_values(self, df: pd.DataFrame, name: str) -> pd.DataFrame:
    """Fill NaNs using the configured strategy ('mean', 'median' or 'knn').

    Returns the input unchanged when no imputation method is configured.
    """
    method = self.config.imputation
    if method == 'knn':
        imputer = KNNImputer(n_neighbors=5)
        filled = imputer.fit_transform(df)
        return pd.DataFrame(filled, index=df.index, columns=df.columns)
    if method in ('mean', 'median'):
        fill_values = df.mean() if method == 'mean' else df.median()
        return df.fillna(fill_values)
    return df
def detect_centroids(self, expression_data: pd.DataFrame) -> List[str]:
    """
    Auto-detect potential centroids from expression data based on network properties.

    Features are scored by (per-feature variance) x (summed |correlation| with
    all features), and the top fraction given by
    ``config.centroid_detection_threshold`` is returned.

    Args:
        expression_data: Expression matrix with features as ROWS and samples
            as columns (the layout produced by preprocess_ccle_data).

    Returns:
        List of detected centroid feature names (index labels).

    Note:
        The centroid_detection_threshold parameter (default 0.1 = 10%) determines
        what fraction of features are selected as centroids.
    """
    # BUG FIX: features are rows of the matrix, samples are columns. The
    # original called var()/corr() column-wise and therefore scored and
    # returned SAMPLE names, not feature names (contradicting the docstring
    # and making CNV correction treat every feature as a target).
    variances = expression_data.var(axis=1)
    # Feature-feature correlations are computed across samples, i.e. on the
    # transposed matrix. NOTE(review): this builds a features x features
    # matrix — may be very large for full transcriptomes.
    connectivity = expression_data.T.corr().abs().sum()
    # Score features by combined variance and connectivity
    scores = variances * connectivity
    # Select top N% as centroids
    num_centroids = int(len(scores) * self.config.centroid_detection_threshold)
    centroids = scores.nlargest(num_centroids).index.tolist()
    self.logger.info(
        f"Detected {len(centroids)} potential centroids "
        f"(top {self.config.centroid_detection_threshold*100:.1f}% of features)"
    )
    return centroids
def select_features(self, df: pd.DataFrame, name: str) -> pd.DataFrame:
    """Keep columns scoring at or above the configured percentile cutoff.

    'variance' scores columns by variance; 'cv' by coefficient of variation
    (std/mean). Returns the input unchanged when no method is configured.
    """
    method = self.config.feature_selection
    if method == 'variance':
        scores = df.var()
    elif method == 'cv':
        scores = df.std() / df.mean()
    else:
        return df
    cutoff = np.percentile(scores, self.config.feature_selection_threshold * 100)
    keep = scores[scores >= cutoff].index
    return df[keep]
def preprocess_matrix(self, df: pd.DataFrame, name: str) -> pd.DataFrame:
    """Run the configured preprocessing steps over one matrix.

    In dry-run mode only logs the plan and returns the input untouched.
    Otherwise applies each enabled step in order, snapshotting after each one
    and recording the applied step names in ``self.metrics``.
    """
    cfg = self.config
    if cfg.dry_run:
        self.logger.info(f"\nDry run: would preprocess {name} matrix with steps:")
        planned = []
        if cfg.remove_zero_variance:
            planned.append("- Remove zero variance features")
        if cfg.remove_duplicates:
            planned.append("- Remove duplicates")
        if cfg.normalization:
            planned.append(f"- Apply {cfg.normalization} normalization")
        if cfg.outlier_detection:
            planned.append(f"- Detect outliers using {cfg.outlier_detection}")
        if cfg.imputation:
            planned.append(f"- Impute missing values using {cfg.imputation}")
        if cfg.feature_selection:
            planned.append(f"- Select features using {cfg.feature_selection}")
        for line in planned:
            self.logger.info(line)
        return df
    self.logger.info(f"\nPreprocessing {name} matrix")
    processed = df.copy()
    applied = []
    # Corto-compatible preprocessing
    if cfg.remove_zero_variance:
        processed = self.remove_zero_variance_features(processed, name)
        applied.append('zero_variance_removal')
        self.save_intermediate_step(processed, name, 'zero_var_removed')
    if cfg.remove_duplicates:
        processed = processed[~processed.index.duplicated(keep='first')]
        applied.append('duplicate_removal')
        self.save_intermediate_step(processed, name, 'duplicates_removed')
    # Additional (optional) preprocessing
    if cfg.normalization:
        processed = self.normalize_data(processed, name)
        applied.append(f'normalization_{cfg.normalization}')
        self.save_intermediate_step(processed, name, 'normalized')
    if cfg.outlier_detection:
        processed = self.handle_outliers(processed, name)
        applied.append(f'outlier_detection_{cfg.outlier_detection}')
        self.save_intermediate_step(processed, name, 'outliers_handled')
    if cfg.imputation:
        processed = self.impute_missing_values(processed, name)
        applied.append(f'imputation_{cfg.imputation}')
        self.save_intermediate_step(processed, name, 'imputed')
    if cfg.feature_selection:
        processed = self.select_features(processed, name)
        applied.append(f'feature_selection_{cfg.feature_selection}')
        self.save_intermediate_step(processed, name, 'features_selected')
    self.metrics[f"{name}_processing_steps"] = applied
    return processed
def apply_cnv_correction(
    self,
    expression_data: pd.DataFrame,
    cnv_data: pd.DataFrame,
    centroids: List[str]
) -> pd.DataFrame:
    """
    Correct expression data based on CNV data, following corto's approach.

    For every non-centroid ("target") feature, expression is regressed on CNV
    (ordinary least squares, one predictor) and replaced by the residuals;
    centroid rows are left untouched.

    Args:
        expression_data: Expression matrix (features x samples)
        cnv_data: Copy number variation matrix (features x samples)
        centroids: List of centroid feature names

    Returns:
        Corrected expression matrix restricted to the common features/samples

    Raises:
        ValueError: If fewer than two features or samples are shared
    """
    self.logger.info("Applying CNV correction")
    # Get common features and samples
    common_features = list(set(expression_data.index) & set(cnv_data.index))
    common_samples = list(set(expression_data.columns) & set(cnv_data.columns))
    if len(common_features) <= 1:
        raise ValueError("One or fewer features in common between CNV and expression data")
    if len(common_samples) <= 1:
        raise ValueError("One or fewer samples in common between CNV and expression data")
    # Subset data to common elements
    expr = expression_data.loc[common_features, common_samples]
    cnv = cnv_data.loc[common_features, common_samples]
    # Targets are the non-centroid features
    targets = list(set(common_features) - set(centroids))
    self.logger.info(f"Calculating residuals for {len(targets)} target features")
    # BUG FIX: construct the residual frame with float dtype. The original
    # created it with no dtype (object), and assigning that block back could
    # upcast the corrected matrix to object, breaking downstream numerics.
    corrected_targets = pd.DataFrame(index=targets, columns=expr.columns, dtype=float)
    for feature in targets:
        x = cnv.loc[feature].to_numpy(dtype=float)
        y = expr.loc[feature].to_numpy(dtype=float)
        # Closed-form OLS (equivalent to sklearn LinearRegression with one
        # predictor, which also solves via least squares); lstsq is robust to
        # a constant CNV column where a naive slope formula would divide by 0.
        design = np.column_stack([x, np.ones_like(x)])
        coef, *_ = np.linalg.lstsq(design, y, rcond=None)
        corrected_targets.loc[feature] = y - design @ coef
    # Replace target rows with their residuals; centroids keep raw expression
    corrected_expr = expr.copy()
    corrected_expr.loc[targets] = corrected_targets
    self.logger.info("CNV correction complete")
    return corrected_expr
def prepare_matrices(
    self,
    metabolomics_data: pd.DataFrame,
    expression_data: pd.DataFrame,
    centroids: Optional[List[str]] = None,
    cnv_data: Optional[pd.DataFrame] = None
) -> Dict[str, Any]:
    """
    Prepare metabolomics and expression matrices for corto analysis.

    Args:
        metabolomics_data: Raw metabolomics data
        expression_data: Raw expression data
        centroids: Optional list of centroid features
        cnv_data: Optional CNV data for correction

    Returns:
        Dictionary containing processed matrices and quality metrics
    """
    # Validate every supplied matrix up front
    self.validate_ccle_format(metabolomics_data, 'metabolomics')
    self.validate_ccle_format(expression_data, 'expression')
    if cnv_data is not None:
        self.validate_ccle_format(cnv_data, 'cnv')
    # Coerce each matrix into its numeric, properly indexed form
    metabolomics_data = self.preprocess_ccle_data(metabolomics_data, 'metabolomics')
    expression_data = self.preprocess_ccle_data(expression_data, 'expression')
    if cnv_data is not None:
        cnv_data = self.preprocess_ccle_data(cnv_data, 'cnv')
    # Run the configured preprocessing pipeline on each matrix
    processed_met = self.preprocess_matrix(metabolomics_data, 'metabolomics')
    processed_exp = self.preprocess_matrix(expression_data, 'expression')
    # Optionally correct expression for copy-number effects
    if cnv_data is not None and self.config.cnv_correction:
        self.logger.info("Applying CNV correction")
        if centroids is None:
            centroids = self.detect_centroids(expression_data)
            self.logger.info("Using auto-detected centroids")
        else:
            self.logger.info(f"Using {len(centroids)} provided centroids")
        processed_exp = self.apply_cnv_correction(processed_exp, cnv_data, centroids)
    return {
        'metabolomics': processed_met,
        'expression': processed_exp,
        'quality_metrics': self.metrics
    }
def parse_arguments() -> argparse.Namespace:
    """Parse command line arguments.

    Returns:
        argparse.Namespace with the parsed options.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    # Required inputs (with CCLE defaults)
    parser.add_argument(
        '--metabolomics_file',
        default='CCLE_metabolomics_20190502.csv',
        help='Path to metabolomics data CSV file'
    )
    parser.add_argument(
        '--expression_file',
        default='CCLE_RNAseq_rsem_transcripts_tpm_20180929.txt',
        help='Path to gene expression data file'
    )
    # Optional input/output
    parser.add_argument(
        '--cnv_file',
        help='Path to copy number variation data file (optional)'
    )
    parser.add_argument(
        '--output_prefix',
        default='prepared',
        help='Prefix for output files (default: prepared)'
    )
    # Additional preprocessing options
    parser.add_argument(
        '--normalization',
        choices=['standard', 'robust', 'log'],
        help='Normalization method (optional)'
    )
    parser.add_argument(
        '--outlier_detection',
        choices=['zscore', 'iqr'],
        help='Outlier detection method (optional)'
    )
    parser.add_argument(
        '--centroids',
        required=False,
        help='Optional: Comma-separated list of centroid feature names. If not provided, centroids will be auto-detected.'
    )
    parser.add_argument(
        '--centroid_threshold',
        type=float,
        default=0.1,
        # BUG FIX: argparse %-interpolates help strings, so a bare '%' made
        # `--help` crash with "unsupported format character"; escape as '%%'.
        help='Fraction of features to select as centroids when auto-detecting (default: 0.1 = top 10%%)'
    )
    parser.add_argument(
        '--imputation',
        choices=['mean', 'median', 'knn'],
        help='Missing value imputation method (optional)'
    )
    parser.add_argument(
        '--feature_selection',
        choices=['variance', 'cv'],
        help='Feature selection method (optional)'
    )
    # Processing options
    parser.add_argument(
        '--save_intermediate',
        action='store_true',
        help='Save intermediate data after each processing step'
    )
    parser.add_argument(
        '--dry_run',
        action='store_true',
        help='Preview preprocessing steps without executing'
    )
    parser.add_argument(
        '--n_jobs',
        type=int,
        default=1,
        help='Number of parallel jobs for applicable operations (default: 1)'
    )
    # Logging options
    parser.add_argument(
        '--verbose',
        action='store_true',
        help='Enable verbose logging'
    )
    parser.add_argument(
        '--log_file',
        help='Path to log file (optional, default: console output)'
    )
    return parser.parse_args()
def main() -> Dict[str, Any]:
    """Run the full data-preparation pipeline from the command line.

    Reads the input matrices, prepares them via ModularDataPrep, writes the
    processed matrices and a quality-metrics report, and returns the prepared
    data dictionary. Re-raises any pipeline error after logging it.
    """
    args = parse_arguments()
    # Configure logging before any work so every step is captured
    log_settings = {
        'level': logging.INFO if args.verbose else logging.WARNING,
        'format': '%(asctime)s - %(levelname)s - %(message)s',
    }
    if args.log_file:
        log_settings['filename'] = args.log_file
    logging.basicConfig(**log_settings)
    # Translate CLI flags into a preprocessing configuration
    config = PreprocessingConfig(
        normalization=args.normalization,
        outlier_detection=args.outlier_detection,
        imputation=args.imputation,
        feature_selection=args.feature_selection,
        save_intermediate=args.save_intermediate,
        dry_run=args.dry_run,
        n_jobs=args.n_jobs,
        centroid_detection_threshold=args.centroid_threshold,
    )
    try:
        prep = ModularDataPrep(config)
        # Load input matrices (expression is tab-separated)
        logging.info(f"Reading metabolomics data from {args.metabolomics_file}")
        met_df = pd.read_csv(args.metabolomics_file)
        logging.info(f"Reading expression data from {args.expression_file}")
        exp_df = pd.read_csv(args.expression_file, sep='\t')
        cnv_df = None
        if args.cnv_file:
            logging.info(f"Reading CNV data from {args.cnv_file}")
            cnv_df = pd.read_csv(args.cnv_file)
        # Centroids are optional; None triggers auto-detection downstream
        centroid_list = args.centroids.split(',') if args.centroids else None
        prepared_data = prep.prepare_matrices(
            met_df,
            exp_df,
            centroids=centroid_list,
            cnv_data=cnv_df,
        )
        # Persist the processed matrices
        metabolomics_out = f"{args.output_prefix}_metabolomics.csv"
        expression_out = f"{args.output_prefix}_expression.csv"
        metrics_out = f"{args.output_prefix}_metrics.txt"
        prepared_data['metabolomics'].to_csv(metabolomics_out)
        prepared_data['expression'].to_csv(expression_out)
        # Write a human-readable quality-metrics report
        with open(metrics_out, 'w') as handle:
            handle.write("Data Preparation Metrics\n")
            handle.write("=======================\n")
            for metric_name, metric_value in prepared_data['quality_metrics'].items():
                if isinstance(metric_value, list):
                    handle.write(f"\n{metric_name}:\n")
                    for item in metric_value:
                        handle.write(f" - {item}\n")
                elif isinstance(metric_value, dict):
                    handle.write(f"\n{metric_name}:\n")
                    for k, v in metric_value.items():
                        handle.write(f" {k}: {v}\n")
                else:
                    handle.write(f"{metric_name}: {metric_value}\n")
        logging.info(f"Processed data saved to {metabolomics_out} and {expression_out}")
        logging.info(f"Quality metrics saved to {metrics_out}")
        return prepared_data
    except Exception as e:
        logging.error(f"Error in preprocessing pipeline: {str(e)}")
        raise


if __name__ == "__main__":
    main()