#!/usr/bin/env python3
"""
Script to map FASTQ files to the yeast genome using STAR aligner.
Takes a directory of FASTQ files and aligns them to a specified yeast genome reference.
"""
import os
import subprocess
import logging
from datetime import datetime
import sys
import re
import gzip
import sys
import tempfile
from pathlib import Path
import shutil
[docs]
def setup_logger(log_dir):
"""Set up the logger to write to both console and file"""
if not os.path.exists(log_dir):
os.makedirs(log_dir)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
log_file = os.path.join(log_dir, f"yeast_mapping_{timestamp}.log")
# Create logger
logger = logging.getLogger("yeast_mapper")
logger.setLevel(logging.INFO)
# Create handlers
file_handler = logging.FileHandler(log_file)
console_handler = logging.StreamHandler(sys.stdout)
# Create formatter
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# Add handlers to logger
logger.addHandler(file_handler)
logger.addHandler(console_handler)
return logger
[docs]
def find_fastq_files(fastq_dir, suffix=None):
"""Find all FASTQ files in the given directory"""
fastq_files = []
extensions = [suffix] if suffix else [
".fastq", ".fq", ".fastq.gz", ".fq.gz"]
for ext in extensions:
fastq_files.extend(Path(fastq_dir).rglob(f"*{ext}"))
return sorted(fastq_files)
[docs]
def get_sample_name(fastq_file):
"""Extract sample name from FASTQ file path"""
base_name = os.path.basename(fastq_file)
# Remove extensions
sample_name = re.sub(r'\.fastq(\.gz)?$|\.fq(\.gz)?$', '', base_name)
# Remove common sequencing-related suffixes
sample_name = re.sub(r'_R[12]$|_R[12]_001$|_[12]$', '', sample_name)
return sample_name
[docs]
def find_paired_files(fastq_files):
"""Group FASTQ files into pairs if they appear to be paired-end reads"""
paired_files = {}
for fastq_file in fastq_files:
base_name = os.path.basename(fastq_file)
# Look for common paired-end indicators
r1_match = re.search(r'_(R1|1)(_001)?\.f(ast)?q(\.gz)?$', base_name)
r2_match = re.search(r'_(R2|2)(_001)?\.f(ast)?q(\.gz)?$', base_name)
if r1_match:
sample_name = get_sample_name(fastq_file)
if sample_name not in paired_files:
paired_files[sample_name] = {'R1': None, 'R2': None}
paired_files[sample_name]['R1'] = fastq_file
elif r2_match:
sample_name = get_sample_name(fastq_file)
if sample_name not in paired_files:
paired_files[sample_name] = {'R1': None, 'R2': None}
paired_files[sample_name]['R2'] = fastq_file
else:
# Treat as single-end
sample_name = get_sample_name(fastq_file)
if sample_name not in paired_files:
paired_files[sample_name] = {'R1': fastq_file, 'R2': None}
return paired_files
[docs]
def run_star_alignment(fastq_file1, fastq_file2, output_dir, genome_dir, sample_name, threads, logger,
limit_bam_sort_ram=2_000_000_000):
"""Run STAR alignment for single or paired-end reads"""
# Coerce path-like args to str so they can be passed to STAR and joined for logging
fastq_file1 = str(fastq_file1)
if fastq_file2 is not None:
fastq_file2 = str(fastq_file2)
# Create sample output directory
sample_output_dir = os.path.join(output_dir, sample_name)
if not os.path.exists(sample_output_dir):
os.makedirs(sample_output_dir)
# Construct STAR command
star_cmd = [
"STAR",
"--runThreadN", str(threads),
"--genomeDir", genome_dir,
"--outSAMtype", "BAM", "SortedByCoordinate",
"--outFileNamePrefix", os.path.join(sample_output_dir,
f"{sample_name}_"),
"--limitBAMsortRAM", str(limit_bam_sort_ram),
"--readFilesIn", fastq_file1
]
# Add second read file if paired-end
if fastq_file2:
star_cmd.append(fastq_file2)
# If input is gzipped, add appropriate parameter
if str(fastq_file1).endswith('.gz'):
star_cmd.extend(["--readFilesCommand", "zcat"])
# Log the command
logger.info(
f"Running STAR alignment for {sample_name}: {' '.join(star_cmd)}")
# Run STAR
try:
subprocess.run(star_cmd, check=True)
logger.info(f"STAR alignment completed successfully for {sample_name}")
# Create a convenience symlink to the final BAM (overwrite if re-running)
final_bam = os.path.join(
sample_output_dir, f"{sample_name}_Aligned.sortedByCoord.out.bam")
renamed_bam = os.path.join(output_dir, f"{sample_name}.bam")
if os.path.lexists(renamed_bam):
logger.info(f"Replacing existing symlink/file: {renamed_bam}")
os.remove(renamed_bam)
os.symlink(final_bam, renamed_bam)
logger.info(f"Created symlink: {renamed_bam}")
return renamed_bam
except subprocess.CalledProcessError as e:
logger.error(f"STAR alignment failed for {sample_name}: {e}")
return None
[docs]
def check_star_installed():
"""Check if STAR is installed and available"""
try:
subprocess.run(["STAR", "--version"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return True
except FileNotFoundError:
return False
[docs]
def create_genome_index(genome_fasta, genome_dir, threads, logger):
logger.info(f"Creating STAR genome index in {genome_dir}")
# Coerce path-like args to str for string ops and subprocess/logging
genome_fasta = str(genome_fasta)
genome_dir = str(genome_dir)
# Create genome directory if it doesn't exist
if not os.path.exists(genome_dir):
logger.error("Genome directory does not exist")
sys.exit(1)
is_gzipped = genome_fasta.endswith('.gz')
tmp_fasta = None
star_success = False
try:
if is_gzipped:
logger.info(f"Decompressing {genome_fasta} to a temporary file")
fd, tmp_fasta = tempfile.mkstemp(suffix=".fa")
with gzip.open(genome_fasta, 'rb') as f_in, os.fdopen(fd, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
fasta_for_star = tmp_fasta
else:
fasta_for_star = genome_fasta
# For yeast genome, these parameters work well
# Adjusted for smaller genome size
star_cmd = [
"STAR",
"--runMode", "genomeGenerate",
"--runThreadN", str(threads),
"--genomeDir", genome_dir,
"--genomeFastaFiles", fasta_for_star, # Use the appropriate file
# Reduced for smaller genome (default: 14)
"--genomeSAindexNbases", "10",
"--limitGenomeGenerateRAM", "16000000000" # 16GB limit
]
cmd_str = ' '.join(star_cmd)
logger.info(f"Running STAR genome generation: {cmd_str}")
# Run STAR command and capture output for logging
process = subprocess.run(
star_cmd,
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
# Log some of the output for debugging purposes
logger.info("STAR completed with output:")
# Log last 10 lines of stdout
for line in process.stdout.splitlines()[-10:]:
logger.info(f"STAR stdout: {line}")
# Check specifically for successful index creation
if not verify_star_index(genome_dir, logger):
logger.error("STAR completed but did not create a valid index")
star_success = False
else:
logger.info("STAR genome index creation completed successfully")
star_success = True
except subprocess.CalledProcessError as e:
logger.error(
f"STAR genome index creation failed with code {e.returncode}")
if e.stdout:
logger.error(f"STAR stdout: {e.stdout}")
if e.stderr:
logger.error(f"STAR stderr: {e.stderr}")
star_success = False
except Exception as e:
logger.error(f"Error during genome index creation: {e}")
star_success = False
finally:
if tmp_fasta and os.path.exists(tmp_fasta):
try:
os.remove(tmp_fasta)
logger.info(f"Removed temporary file {tmp_fasta}")
except Exception as cleanup_error:
logger.error(
f"Failed to remove temporary file: {cleanup_error}")
return star_success
[docs]
def verify_star_index(genome_dir, logger):
"""
Verify that STAR actually created a valid index.
Args:
genome_dir (str): Directory containing the genome index
logger: Logger object for logging messages
Returns:
bool: True if valid index exists, False otherwise
"""
import os
required_files = [
"Genome",
"SA",
"SAindex",
"chrLength.txt",
"chrNameLength.txt",
"chrName.txt",
"chrStart.txt",
"genomeParameters.txt"
]
missing_files = []
for file in required_files:
file_path = os.path.join(genome_dir, file)
if not os.path.exists(file_path):
missing_files.append(file)
if missing_files:
logger.info(
f"Missing required index files: {', '.join(missing_files)}")
return False
logger.info("Verified STAR index was created successfully")
return True
[docs]
def map_fastq_to_bam(fastq_dir, output_dir, genome_fasta, threads,
suffix=[], single_end=True,
limit_bam_sort_ram=2_000_000_000):
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# Set up logger
logger = setup_logger(output_dir)
# Check if STAR is installed
if not check_star_installed():
logger.error(
"STAR aligner not found. Please install STAR and add it to your PATH.")
sys.exit(1)
# Verify genome index
genome_dir = Path(genome_fasta).parent
if not verify_star_index(genome_dir, logger):
if Path(genome_fasta).is_file():
logger.info(
f"STAR genome index not found or incomplete in {genome_dir}")
logger.info(
f"Will create index using FASTA file: {genome_fasta}")
index_done = create_genome_index(
genome_fasta, genome_dir, threads, logger)
if not index_done:
logger.error("Failed to create genome index. Exiting.")
sys.exit(1)
else:
logger.error(
f"Genome FASTA file not found: {genome_fasta}")
sys.exit(1)
logger.info(f"START genome index found in {genome_dir}")
# Find FASTQ files
logger.info(f"Searching for FASTQ files in {fastq_dir} with rglob")
fastq_files = find_fastq_files(fastq_dir, suffix=suffix)
if not fastq_files:
logger.error(f"No FASTQ files found in {fastq_dir}")
sys.exit(1)
logger.info(f"Found {len(fastq_files)} FASTQ files")
# Process FASTQ files based on pairing mode
if single_end:
# Force single-end mode
logger.info("Processing all files as single-end reads")
for fastq_file in fastq_files:
sample_name = get_sample_name(fastq_file)
run_star_alignment(str(fastq_file), None, str(output_dir),
str(genome_dir), sample_name, threads, logger,
limit_bam_sort_ram=limit_bam_sort_ram)
else:
logger.error('Paired end not implemented')
# Auto-detect or use paired mode
# paired_files = find_paired_files(fastq_files)
# logger.info(f"Grouped files into {len(paired_files)} sample(s)")
# for sample_name, files in paired_files.items():
# if args.paired and (files['R1'] is None or files['R2'] is None):
# logger.warning(
# f"Skipping {sample_name} - missing R1 or R2 file for paired-end mode")
# continue
# if files['R2'] is not None:
# logger.info(f"Processing {sample_name} as paired-end")
# run_star_alignment(files['R1'], files['R2'], args.output_dir,
# args.genome_dir, sample_name, args.threads, logger)
# else:
# logger.info(f"Processing {sample_name} as single-end")
# run_star_alignment(files['R1'], None, args.output_dir,
# args.genome_dir, sample_name, args.threads, logger)
logger.info("STAR alignment process completed.")