Source code for satay.process_satay_bams

import os
import subprocess
from pathlib import Path
import argparse
import logging
import pandas as pd
from datetime import datetime
# import pyranges as pr
from typing import Tuple, Union, List
import shutil


[docs] def setup_logging(output_dir): os.makedirs(output_dir, exist_ok=True) logging.basicConfig( filename=Path(output_dir)/"process_samples.log", level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" )
[docs] def validate_environment(): # Validate bedtools installation if not shutil.which("bedtools"): raise RuntimeError("bedtools is not installed or not in PATH") if not shutil.which("samtools"): raise RuntimeError("samtools is not installed or not in PATH")
# def get_samples(directory): # """Get unique sample identifiers from the directory.""" # try: # sample_dirs = [d for d in os.listdir(directory) if os.path.isdir(os.path.join(directory, d))] # samples = sorted(set("_".join(d.split("_")[:2]) for d in sample_dirs if d.startswith("20"))) # logging.info(f"Identified samples: {samples}") # return samples # except Exception as e: # logging.error(f"Error getting samples: {e}") # raise
[docs] def run_command(cmd): """Run a subprocess command with error handling.""" try: subprocess.run(cmd, check=True) logging.info(f"Command succeeded: {' '.join(cmd)}") except subprocess.CalledProcessError as e: logging.error(f"Command failed: {' '.join(cmd)}\nError: {e}") raise
[docs] def merge_bams(sample, output_dir, bam_dir, bam_suffix="Aligned.sortedByCoord.out.bam"): """Merge BAM files for each sample.""" bam_files = [str(bf) for bf in Path(bam_dir).rglob( f"*{sample}*/*{bam_suffix}")] if bam_files: logging.info(f"Merging {', '.join(bam_files)} with samtools.") output_bam = Path(output_dir)/f"{sample}.bam" cmd = ["samtools", "merge", "-f", "-o", str(output_bam)] + \ [str(bam) for bam in bam_files] run_command(cmd) return output_bam else: msg = (f"No bam files found for sample '{sample}' in {bam_dir} " f"matching '*{sample}*/*{bam_suffix}'") logging.error(msg) raise FileNotFoundError(msg)
[docs] def filter_bam(output_bam): """Filter BAM files and convert to BED format. Filter out q < 10 """ output_bam = output_bam bed_file = output_bam.with_suffix(".bed") cmd = ["samtools", "view", "-h", "-F", "2304", "-q", "10", str(output_bam)] try: with subprocess.Popen(cmd, stdout=subprocess.PIPE) as proc: with open(bed_file, "w") as bed_out: subprocess.run(["bedtools", "bamtobed"], stdin=proc.stdout, stdout=bed_out, check=True) logging.info(f"Filtered BAM to BED: {output_bam} -> {bed_file}") if bed_file.stat().st_size == 0: raise ValueError( f"No reads passed filtering (primary alignments, MAPQ >= 10) for " f"{output_bam}; the resulting BED is empty. Verify the BAM contains " f"mapped reads, e.g. 'samtools view -c {output_bam}'. A failed or empty " f"alignment step is the usual cause." ) return bed_file except Exception as e: logging.error(f"Error filtering BAM {output_bam}: {e}") raise
[docs] def process_orientation(filtered_bam_bed): """Process BED files based on orientation.""" insertions_file = Path(filtered_bam_bed).with_suffix(".bed.insertions") # cmd = [script_path, str(filtered_bam_bed)] try: # Read the file assuming whitespace-delimited columns without a header. df = pd.read_table(filtered_bam_bed, header=None) except Exception as e: logging.error(f"Error reading file: {e}") raise # Ensure there are at least 6 columns in the input. if df.shape[1] < 6: logging.error("Error: The input file must contain at least 6 columns.") raise ValueError( "DataFrame must have at least 6 columns, but got {} columns.".format(df.shape[1])) # Use only the first six columns and assign column names. df = df.iloc[:, :6] df.columns = ["col1", "col2", "col3", "col4", "col5", "col6"] # Convert columns used for arithmetic to numeric (col2 and col3) df["col2"] = pd.to_numeric(df["col2"], errors="coerce") df["col3"] = pd.to_numeric(df["col3"], errors="coerce") # Compute the insertion site: if strand is "+" use col2, else use col3. df["insertion_site"] = df.apply( lambda row: row["col2"] if row["col6"] == "+" else row["col3"], axis=1 ) # Compute insertion_site + 1. df["insertion_site_plus_1"] = df["insertion_site"] + 1 # Select the desired columns: # col1, insertion_site, insertion_site+1, col4, col5, col6 result = df[["col1", "insertion_site", "insertion_site_plus_1", "col4", "col5", "col6"]] # Output the result as tab-separated values to stdout. result.to_csv(insertions_file, sep="\t", index=False, header=False) return insertions_file
[docs] def sort_files(insertions_file): """Sort insertion files.""" sorted_file = Path(insertions_file).with_suffix(".insertions.sorted") cmd = ["sort", "-k1,1", "-k2n,2", str(insertions_file)] try: with open(sorted_file, "w") as out_file: subprocess.run(cmd, stdout=out_file, check=True) logging.info(f"Sorted file: {insertions_file} -> {sorted_file}") return sorted_file except Exception as e: logging.error(f"Error sorting file {insertions_file}: {e}") raise
[docs] def merge_files(sorted_insertions_file): """Merge intervals mapping to same location using sorted files.""" merged_file = sorted_insertions_file.with_suffix(".sorted.merged") cmd = ["bedtools", "merge", "-i", str(sorted_insertions_file), "-s", "-c", "1,6", "-o", "count,distinct"] try: with open(merged_file, "w") as out_file: subprocess.run(cmd, stdout=out_file, check=True) logging.info(f"Merged file: {sorted_insertions_file} -> {merged_file}") return merged_file except Exception as e: logging.error(f"Error merging file {sorted_insertions_file}: {e}") raise
[docs] def filter_insertions(merged_file): """Remove insertions supported by only 1 read.""" filtered_file = merged_file.with_suffix(".merged.filtered") try: df = pd.read_table(merged_file, header=None) df[df[3] > 1].to_csv(filtered_file, index=False, header=False, sep='\t') logging.info( f"Filtered insertions: {merged_file} -> {filtered_file}") return filtered_file except Exception as e: logging.error(f"Error filtering insertions for {merged_file}: {e}") raise
[docs] def count_insertions_over_intervals( interval_files: List[Union[str, Path]], filtered_insertions_file: Union[str, Path] ) -> None: """ Count insertions over specified genomic intervals using bedtools map. Parameters ---------- interval_files : List[Union[str, Path]] List of paths to interval files (BED/GFF format) filtered_insertions_file : Union[str, Path] Path to the filtered insertions file Notes ----- For each interval file, creates an output file with the format: {filtered_insertions_stem}_{interval_file_stem}.cnts containing the count of unique insertions and sum of insertions per interval. """ # Convert to Path objects filtered_path = Path(filtered_insertions_file) # Validate filtered insertions file if not filtered_path.exists(): raise FileNotFoundError( f"Filtered insertions file not found: {filtered_path}") if filtered_path.stat().st_size == 0: raise ValueError(f"Filtered insertions file is empty: {filtered_path}") # Process each interval file for interval_file in interval_files: interval_path = Path(interval_file) # Validate interval file if not interval_path.exists(): raise FileNotFoundError( f"Interval file not found: {interval_path}") if interval_path.stat().st_size == 0: logging.warning(f"Interval file is empty: {interval_path}") continue # Prepare output file path file_name = interval_path.stem output_file = filtered_path.parent / \ f"{filtered_path.name}_{file_name}.cnts" # Prepare bedtools command cmd = [ "bedtools", "map", "-a", str(interval_path), "-b", str(filtered_path), "-c", "1,4", # count column 1 and sum column 4 "-o", "count,sum" ] logging.info(f"Processing interval file: {interval_path}") try: # Run bedtools command with open(output_file, "w") as out_file: process = subprocess.run( cmd, stdout=out_file, stderr=subprocess.PIPE, check=True, text=True ) # Check if output file was created and has content if not output_file.exists() or output_file.stat().st_size == 0: raise RuntimeError( f"Output file is empty or was not created: {output_file}") logging.info( f"Successfully mapped intervals: {interval_path.name} with " f"{filtered_path.name} -> {output_file.name}" ) except subprocess.CalledProcessError as e: error_msg = e.stderr.strip() if e.stderr else str(e) logging.error( f"bedtools mapping failed for {interval_path.name}: {error_msg}" ) raise RuntimeError(f"bedtools mapping failed: {error_msg}") from e except Exception as e: logging.error( f"Error processing {interval_path.name}: {str(e)}" ) raise
[docs] def map_sample(sample, bam_dir, output_dir, interval_files): """Process a sample""" try: setup_logging(output_dir) logging.info(f"Starting processing sample {sample}") logging.info("Merging bams") output_bam = merge_bams(sample, output_dir, bam_dir) logging.info("Filtering bams") bed_file = filter_bam(output_bam) logging.info("Processing orientation") insertions_file = process_orientation(bed_file) logging.info("Sorting") sorted_file = sort_files(insertions_file) logging.info("Merging sorted") merged_file = merge_files(sorted_file) logging.info("Filtering insertions") filtered_file = filter_insertions(merged_file) logging.info("Counting insertions") count_insertions_over_intervals(interval_files, filtered_file) logging.info(f"Sample {sample} processing completed successfully") except Exception as e: logging.error(f"Sample processing failed: {e}") raise
# This would be to merge output from multiple samples
[docs] def merge_cnts_files( interval_file: Union[str, Path], counts_dir: Union[str, Path], name: str = "", format: str = 'gff' ) -> Tuple[pd.DataFrame, pd.DataFrame]: """ Merge count files from multiple samples into consolidated transposon and read count matrices. Parameters ---------- interval_file : str or Path Path to the interval file (GFF or BED format) counts_dir : str or Path Directory containing the count files to merge name : str, optional Prefix for output files, by default "" format : str, optional Format of the interval file ('gff' or 'bed'), by default 'gff' Returns ------- Tuple[pd.DataFrame, pd.DataFrame] Two DataFrames containing merged transposon counts and read counts respectively """ # Input validation if format not in ['gff', 'bed']: raise ValueError("Format must be either 'gff' or 'bed'") interval_path = Path(interval_file) counts_path = Path(counts_dir) if not interval_path.exists(): raise FileNotFoundError(f"Interval file not found: {interval_path}") if not counts_path.exists(): raise FileNotFoundError(f"Counts directory not found: {counts_path}") # Find count files file_name = interval_path.stem cnt_files = list(counts_path.rglob(f"*_{file_name}*.cnts")) if not cnt_files: raise ValueError( f"No count files found matching pattern '*_{file_name}*.cnts' in {counts_dir}") logging.info(f"Found {len(cnt_files)} count files to process") # Initialize lists for DataFrames tn_dfs: List[pd.DataFrame] = [] read_dfs: List[pd.DataFrame] = [] def process_id_column(df: pd.DataFrame, file_format: str) -> pd.Series: """Helper function to process ID column based on file format""" if file_format == 'bed': return df[[0, 1, 2]].astype(str).agg('_'.join, axis=1) else: # gff format id_col = df[df.columns[8]] try: return (id_col.str.split("locus_tag=", expand=True)[1] .str.split(";", expand=True)[0]) except (IndexError, KeyError): raise ValueError( "Invalid GFF format: missing or malformed locus_tag field") # Process each count file for file in cnt_files: try: sample_id = file.stem.split(".bed")[0].replace(".bam", '') logging.info(f"Processing sample: {sample_id}") # Read and validate input file ndf = pd.read_table(file, header=None) required_cols = [ndf.shape[1] - 2, ndf.shape[1] - 1] # Last two columns if any(col < 0 for col in required_cols): raise ValueError( f"File {file} does not have the expected number of columns") # Rename columns ndf = ndf.rename(columns={ ndf.columns[required_cols[0]]: "tn_per_gene", ndf.columns[required_cols[1]]: "read_per_gene" }) # Process ID column ndf['ID'] = process_id_column(ndf, format) # Clean and transform data ndf = ndf[["ID", "tn_per_gene", "read_per_gene"]].set_index('ID') ndf["read_per_gene"] = pd.to_numeric( ndf["read_per_gene"].replace({".": "0"}), errors='coerce' ).fillna(0) # Append transformed data tn_dfs.append(ndf[["tn_per_gene"]].rename( columns={"tn_per_gene": sample_id}).T) read_dfs.append( ndf[["read_per_gene"]].rename( columns={"read_per_gene": sample_id}).T ) except Exception as e: logging.error(f"Error processing file {file}: {str(e)}") raise # Merge and convert data tn_df = pd.concat(tn_dfs).fillna(0).astype(int).sort_index() read_df = pd.concat(read_dfs).fillna(0).astype(int).sort_index() # Save output files today_str = datetime.today().strftime("%Y-%m-%d") output_prefix = f"{today_str}_{name}" if name else today_str print(output_prefix) output_tn = counts_path / f"{output_prefix}_transposon_counts.csv" output_reads = counts_path / f"{output_prefix}_read_counts.csv" tn_df.T.to_csv(output_tn) read_df.T.to_csv(output_reads) logging.info(f"Saved transposon counts to: {output_tn}") logging.info(f"Saved read counts to: {output_reads}") return tn_df, read_df