defparse_gzip_fastq(filename:str): with gzip.open(filename, 'rt') as f: whileTrue: try: lines = [next(f).strip("\n") for _ inrange(4)] except: break ifnotall(lines): break id = lines[0][1:] sequence = lines[1] quality = lines[3] yield FastqRecord(id, sequence, quality)
3. 读取Barcode序列
Barcode是任意两个序列的2V2组合,而且在read_2上,所以需要反向互补。
import itertools
defreverse_complement(seq: str) -> str: """Compute reverse complement of a sequence""" basecomplement = {'A': 'T', 'C': 'G', 'G': 'C', 'T': 'A'} letters = list(seq) letters = [basecomplement[base] for base in letters] return''.join(letters[::-1])
defread_barcode(barcode: str): """Read barcode from file""" codes = List[str]() withopen(barcode, 'r') as f: codes = [reverse_complement(line.strip("\n")) for line in f] return [cc for cc in itertools.product(codes, codes)]
最终拆分Barcode的Main函数如下
defsplit_barcodes(fq1:str,fq2:str, barcodes: List[Tuple[str,str]]): """make files for every barcode""" read1_files = {barcode:open(f"{barcode[0]}_{barcode[1]}_1.fastq","a+") for barcode in barcodes} read2_files = {barcode:open(f"{barcode[0]}_{barcode[1]}_2.fastq","a+") for barcode in barcodes} """read raw fastq files""" for record1, record2 inzip(parse_gzip_fastq(fq1), parse_gzip_fastq(fq2)): for barcode in barcodes: barcode_l, barcode_r = barcode if hamming_distance(record2.sequence[:len(barcode_l)], barcode_l) <= 1and hamming_distance(record2.sequence[len(barcode_r)+15:len(barcode_r)*2+15], barcode_r) <= 3: read1_files[barcode].write(f"{record1.id}\n") read1_files[barcode].write(record1.sequence+"\n") read1_files[barcode].write("+\n") read1_files[barcode].write(record1.quality+"\n") read2_files[barcode].write(f"{record2.id}\n") read2_files[barcode].write(record2.sequence+"\n") read2_files[barcode].write("+\n") read2_files[barcode].write(record2.quality+"\n") [f.close() for f in read1_files.values()] [f.close() for f in read2_files.values()]