Python Concurrency in Practice 03: Concurrency in Action, Part 1
A quality-control workflow for 16S data: one sequencing lane contains many projects, each with its own contract number, and a project may include 16S data, ITS data, or both. A complete pipeline takes the demultiplexed upstream data and turns all of it into data that can go straight into analysis. This work used to be parallelized through SGE, that is, at the job-scheduling level; here the parallelism is implemented inside the program itself, so the pipeline can run without an SGE system.
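The heart of the script is a standard multiprocessing producer/consumer setup: a JoinableQueue carries the jobs, an ordinary Queue carries the results, and a pool of daemon worker processes keeps pulling jobs until jobs.join() returns in the main process. The self-contained sketch below is not part of the pipeline; it only shows that pattern in isolation, with squaring a number standing in for the real per-sample work.

import os
from multiprocessing import Process, JoinableQueue, Queue, cpu_count

def worker(jobs, results):
    while True:
        item = jobs.get()
        try:
            results.put((os.getpid(), item * item))   # stand-in for the real work
        finally:
            jobs.task_done()                           # always acknowledge the job

def run():
    jobs, results = JoinableQueue(), Queue()
    job_num = 20
    for i in range(job_num):
        jobs.put(i)
    for _ in range(cpu_count()):
        p = Process(target=worker, args=(jobs, results))
        p.daemon = True      # daemon workers die together with the main process
        p.start()
    jobs.join()              # block until every job has been task_done()
    for _ in range(job_num):
        print results.get()  # collect exactly as many results as jobs were queued

if __name__ == '__main__':
    run()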
import os
import sys
import re
import time
import collections
import threading
from multiprocessing import Process, JoinableQueue, Queue, cpu_count
from threading import Thread
from settings import primer, pandaseq_soft
from programs import *

# one finished sample: contract id (compact), sample name and its high-quality FASTQ
Result = collections.namedtuple("Result", "compact sample_name HQ_fq")

def parse_sam_barcode_file(sam_barcode_file):
    # each line: compact <tab> sample_name <tab> barcode_info <tab> data_type
    for line in open(sam_barcode_file):
        yield line.strip().split('\t')

def proc(compact, sample_name, work_path, lib_method, data_type):
    # run pandaseq assembly and quality filtering for a single sample
    split_path = '%s/Split' % work_path
    QC_path = '%s/QC' % work_path
    compact_path = '%s/%s' % (QC_path, compact)
    if not os.path.exists(compact_path):
        os.makedirs(compact_path)
    sample_path = '%s/%s' % (compact_path, sample_name)
    if not os.path.exists(sample_path):
        os.makedirs(sample_path)
    original_path = '%s/%s/%s' % (split_path, compact, sample_name)
    (read1, read2) = os.popen('ls %s/*' % original_path).read().strip().split('\n')
    pandaseq_fq = '%s/pandaseq.fq' % sample_path
    pandaseq_log = '%s/pandaseq.log' % sample_path
    pandaseq(pandaseq_soft, read1, read2, pandaseq_fq,
             primer[lib_method][data_type]['forward'],
             primer[lib_method][data_type]['reverse'], pandaseq_log)
    high_quality_fq = '%s/high_quality.fq' % sample_path
    high_quality_log = '%s/high_quality.stat' % sample_path
    QC(pandaseq_fq, high_quality_fq, high_quality_log, data_type)
    return Result(compact, sample_name, high_quality_fq)

def worker(work_path, jobs, results):
    # worker process: keep pulling jobs; a failed job is pushed back for another try
    while True:
        try:
            compact, sample_name, lib_method, data_type = jobs.get()
            try:
                result = proc(compact, sample_name, work_path, lib_method, data_type)
                sys.stderr.write('Process %s finished compact:%s sample_name:%s\n' % (os.getpid(), compact, sample_name))
                results.put(result)
            except:
                sys.stderr.write('Process %s FAILED !!! %s/%s may have some problem!\n' % (os.getpid(), compact, sample_name))
                jobs.put((compact, sample_name, lib_method, data_type))
                sys.stderr.write('The job is repushed into the queue, with compact:%s sample_name:%s\n' % (compact, sample_name))
        finally:
            jobs.task_done()

def add_jobs(work_path, sam_barcode_file_list, jobs):
    # queue one job per sample and remember which samples belong to each compact/data_type
    job_num = 0
    data_type_hash = {}
    for todo, sam_barcode_file in enumerate(sam_barcode_file_list):
        sam_barcode_file = sam_barcode_file.strip()
        if not os.path.isfile(sam_barcode_file):
            continue
        lib_method = get_lib_method(sam_barcode_file)
        if lib_method is None:
            continue
        print 'sam_barcode_file loading: %s ...... ok\n' % sam_barcode_file
        for compact, sample_name, barcode_info, data_type in parse_sam_barcode_file(sam_barcode_file):
            if not data_type_hash.has_key(compact):
                data_type_hash[compact] = {}
            if not data_type_hash[compact].has_key(data_type):
                data_type_hash[compact][data_type] = []
            data_type_hash[compact][data_type].append(sample_name)
            jobs.put((compact, sample_name, lib_method, data_type))
            job_num += 1
            sys.stderr.write('The job is pushed into the queue, with compact:%s sample_name:%s\n' % (compact, sample_name))
    sys.stderr.write('\n### All %s jobs have been pushed into the queue ###\n' % job_num)
    return data_type_hash

def create_processes(concurrency, jobs, work_path, results):
    print '\nBegin creating %s worker Processes...\n' % concurrency
    for _ in range(concurrency):
        process = Process(target=worker, args=(work_path, jobs, results))
        process.daemon = True   # daemon workers exit together with the main process
        process.start()

def main(work_path, sam_barcode_file_list):
    global concurrency
    split_path = '%s/Split' % work_path
    QC_path = '%s/QC' % work_path
    jobs = JoinableQueue()
    results = Queue()

    canceled = False
    data_type_hash = add_jobs(split_path, sam_barcode_file_list, jobs)
    create_processes(concurrency, jobs, work_path, results)
    try:
        jobs.join()             # wait until every queued job has been task_done()
    except KeyboardInterrupt:
        sys.stderr.write('cancelling ...\n')
        canceled = True
    finally:
        # drain the result queue and log what actually finished
        job_num = 0
        finished_hash = {}
        while not results.empty():
            result = results.get_nowait()
            job_num += 1
            if not finished_hash.has_key(result.compact):
                finished_hash[result.compact] = []
            finished_hash[result.compact].append(result.sample_name)
        sys.stderr.write('all %s work finished!\n\n' % job_num)
        log_out = open('%s/work.log' % QC_path, 'w')
        for compact, sample_list in finished_hash.iteritems():
            for sample_name in sample_list:
                log_out.write('%s\t%s has been finished\n' % (compact, sample_name))
        log_out.close()
        if canceled:
            return False

    # per-compact read statistics
    for compact in os.listdir(QC_path):
        compact_dir = '%s/%s' % (QC_path, compact)
        if not os.path.isdir(compact_dir):
            continue
        sys.stderr.write('Begin stat compact: %s\n' % compact)
        reads_stat(compact_dir)
    sys.stderr.write('All compact stat finished!\n\n')

    reads_stat_all(QC_path, split_path)

    # merge the per-sample high-quality reads of each compact/data_type in threads
    merge_threads = set()
    for compact, subitem in data_type_hash.iteritems():
        compact_dir = '%s/%s' % (QC_path, compact)
        for data_type, sample_list in subitem.iteritems():
            merged_file = '%s/%s/%s.together.fna' % (QC_path, compact, data_type)
            t = Thread(target=sampleMerge, args=(sample_list, data_type, compact_dir, merged_file))
            merge_threads.add(t)
            t.start()
            while True:             # simple throttle: keep fewer than `concurrency` live threads
                if threading.activeCount() < concurrency:
                    break
    for t in threading.enumerate():
        if t in merge_threads:
            t.join()

    sys.stderr.write('\n All pipeline is done ! \n')

if __name__ == '__main__':
    sys.argv.pop(0)
    if len(sys.argv) < 1:
        sys.stderr.write('Usage: python run_pipeline.py work_path [process_num] \n process_num default is cpu_count\n')
        sys.exit()
    work_path = sys.argv.pop(0)
    work_path = os.path.abspath(work_path)
    sys.stderr.write('Workdir is %s, pipeline begin\n' % work_path)
    sam_barcode_file_list = os.popen('ls %s/Split/sam_barcode.*' % work_path).read().strip().split('\n')
    if len(sys.argv) != 0:
        concurrency = int(sys.argv.pop(0))
    else:
        concurrency = cpu_count()

    main(work_path, sam_barcode_file_list)
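To run the pipeline (assuming the script above is saved as run_pipeline.py, the name used in its own usage message), the call is: python run_pipeline.py /path/to/work_dir 8. The optional second argument sets the number of worker processes and defaults to cpu_count(). The script expects the demultiplexed data under work_dir/Split: one tab-separated sam_barcode.* file per library (columns compact, sample_name, barcode_info, data_type) and one directory per compact/sample holding the paired read files; the cleaned data, statistics and work.log are written under work_dir/QC.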
Below are the helper routines (imported by the main script via from programs import *):
from __future__ import division
from threading import Thread, Lock
from multiprocessing import cpu_count
import threading
import sys
import os
import re
import types
from Bio import SeqIO
from Bio.SeqRecord import SeqRecord

def fq_reads_num(fq_file):
    # a FASTQ record is 4 lines, so reads = line count / 4
    wc_out = os.popen('wc -l %s' % fq_file).read().strip()
    result = int(re.search('^(\d+)', wc_out).group(1)) / 4
    return int(result)

def Q_ave(self):
    # mean Phred quality of one read; attached to SeqRecord as a method in QC()
    Q_sum = 0
    for qlist in self.letter_annotations.itervalues():
        for q in qlist:
            Q_sum += q
    Q_ave = Q_sum / len(self)
    return Q_ave

def QC(file, out_file, out_stat_file, data_type):
    # keep reads with mean quality >= 20 and length between 220 and 500 bp
    SeqRecord.Q_ave = Q_ave
    out_stat = open(out_stat_file, 'w')
    out = open(out_file, 'w')

    count = 0
    high_count = 0
    for record in SeqIO.parse(open(file), 'fastq'):
        count += 1
        if record.Q_ave() < 20:
            continue
        if len(record) < 220 or len(record) > 500:
            continue
        out.write(record.format('fastq'))
        high_count += 1
    high_ratio = high_count / count
    out_stat.write('%s\t%s\t%s\t%s\n' % (data_type, count, high_count, high_ratio))

class MyList(list):
    # a list that prints itself as tab-separated text
    def __str__(self):
        out_str = ''
        for item in self:
            out_str += item
            out_str += '\t'
        return out_str.strip()

def parse_stat(stat_file):
    tabs = os.popen('cat %s' % stat_file).read().strip().split('\t')
    yield tabs

def parse_stat_files(compact_path):
    for f in os.popen('ls %s/*/*.stat' % compact_path):
        stat_file = f.strip()
        sample_name = re.search('%s\/(\S+)\/high_quality\.stat' % compact_path, stat_file).group(1)
        yield stat_file, sample_name

def reads_stat(compact_path):
    # per-compact table: one row per sample with its QC statistics
    out = open('%s/reads_stat.xls' % compact_path, 'w')
    sample_reads = {}
    for stat_file, sample_name in parse_stat_files(compact_path):
        for tabs in parse_stat(stat_file):
            sample_reads[sample_name] = tabs

    out.write('sample_name\tsample_type\traw_reads\tHQ_reads\tHQ_ratio\n')
    for sample, tabs in sample_reads.iteritems():
        tabs = MyList(tabs)
        out.write('%s\t%s\n' % (sample, str(tabs)))
    out.close()

def raw_stat_thread(fq_file, lock, compact, sample_name, tabs, out):
    # counting raw reads is I/O bound, so it runs in a thread;
    # the lock protects both the shared counter and the output file
    global total_reads
    # sys.stderr.write('thread %s stat with %s %s\n' % (threading.currentThread().ident, compact, sample_name))
    raw_reads = fq_reads_num(fq_file)
    lock.acquire()
    total_reads += raw_reads
    data_type = tabs.pop(0)
    ratio = int(tabs[1]) / raw_reads * 100
    tabs = str(MyList(tabs))
    out.write('%s\t%s\t%s\t%s\t%s\t%2.2f%%\n' % (compact, sample_name, data_type, raw_reads, tabs, ratio))
    lock.release()
    # sys.stderr.write('thread %s finished doing with %s %s\n' % (threading.currentThread().ident, compact, sample_name))

total_reads = 0

def reads_stat_all(work_path, original_path):
    # lane-level table across all compacts, plus the unaligned reads at the end
    global total_reads
    sys.stderr.write('\nmerge stat is begin ... \n')
    out = open('%s/reads_stat.xls' % work_path, 'w')
    compact_hash = {}
    for f in os.listdir(work_path):
        compact = f.strip()
        compact_path = '%s/%s' % (work_path, compact)
        if not os.path.isdir(compact_path):
            continue
        if not compact_hash.has_key(compact):
            compact_hash[compact] = {}
        for stat_file, sample_name in parse_stat_files(compact_path):
            for tabs in parse_stat(stat_file):
                compact_hash[compact][sample_name] = tabs
    out.write('compact\tsample_name\tdata_type\traw_reads\tpandaseq_reads\tHQ_reads\tratio\n')
    lock = Lock()
    active_threads = set()
    for compact, sample in compact_hash.iteritems():
        sys.stderr.write('doing with %s stat\n' % compact)
        for sample_name, tabs in sample.iteritems():
            original_fq = os.popen('ls %s/%s/%s/*' % (original_path, compact, sample_name)).read().strip().split('\n').pop(0)
            t = Thread(target=raw_stat_thread, args=(original_fq, lock, compact, sample_name, tabs, out))
            active_threads.add(t)
            t.start()
            while True:         # throttle: keep the number of live threads below cpu_count()
                if threading.activeCount() < cpu_count():
                    break
    out.flush()
    for t in threading.enumerate():
        if t in active_threads:
            sys.stderr.write('thread %s is still alive, wait ...\n' % t.ident)
            t.join()
    sys.stderr.write('Unaligned stating ...\n')
    out.write('\n###\n')
    unalign_fq = os.popen('ls %s/Unalign/*' % original_path).read().strip().split('\n').pop(0)
    unalign_reads = fq_reads_num(unalign_fq)
    total_reads += unalign_reads
    ratio = unalign_reads / total_reads * 100
    out.write('Unalign\t%s\t%2.2f%%\n' % (unalign_reads, ratio))
    out.close()
    sys.stderr.write('merge stat is all finished!\n\n')

def pandaseq(pandaseq_soft, read1, read2, fa_out, f_primer, r_primer, log_out):
    # wrap the external pandaseq call: assemble read pairs, trim primers, keep 220-500 bp
    cmd = '%s -F -f %s -r %s -w %s -p %s -q %s -g %s -l 220 -L 500' % (pandaseq_soft, read1, read2, fa_out, f_primer, r_primer, log_out)
    os.system(cmd)

def sampleMerge(sample_list, data_type, file_path, outfile):
    # concatenate the high-quality reads of all samples of one data_type into a single FASTA file
    outhandle = open(outfile, 'w')
    # sys.stderr.write('Begin merge into %s\n' % file_path)
    reads_num = {}
    f_template = '%s/%s/high_quality.fq'
    for sample in sample_list:
        f = f_template % (file_path, sample)
        sample = re.sub('[-_]', '.', sample)
        sample = '%s%s' % (data_type, sample)
        if not reads_num.has_key(sample):
            reads_num[sample] = 0
        for record in SeqIO.parse(open(f), 'fastq'):
            reads_num[sample] += 1
            outhandle.write('>%s_%s\n%s\n' % (sample, reads_num[sample], str(record.seq)))
    outhandle.close()
    sys.stderr.write('merge file: %s is finished\n' % outfile)

def get_lib_method(file):
    # map the sam_barcode file name to the library construction method
    file = os.path.basename(file)
    if re.match('^sam_barcode.l$', file):
        lib_method = 'Self'
    elif re.match('^sam_barcode.s\d+$', file):
        lib_method = 'HXT'
    else:
        lib_method = None
    return lib_method
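One detail worth pulling out is Q_ave: it is monkey-patched onto Biopython's SeqRecord inside QC(), so every parsed read can report its mean Phred quality. Below is a minimal usage sketch, assuming the helper module above is saved as programs.py (which is what the main script's from programs import * suggests) and a small test file named example.fq (hypothetical).

from Bio import SeqIO
from Bio.SeqRecord import SeqRecord
from programs import Q_ave

SeqRecord.Q_ave = Q_ave    # attach the helper as a method, exactly as QC() does

for record in SeqIO.parse(open('example.fq'), 'fastq'):
    # letter_annotations['phred_quality'] holds one Phred score per base,
    # so Q_ave() is the per-read mean that QC() compares against 20
    print record.id, len(record), record.Q_ave()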
settings.py holds the primer sequences for the different library-construction methods.
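settings.py itself is not shown in the post. The sketch below is only a guess at its shape, inferred from how the pipeline indexes primer[lib_method][data_type]['forward'/'reverse'] and from the 'Self'/'HXT' values returned by get_lib_method; the binary path and the primer sequences are placeholders.

# settings.py (hypothetical sketch; path and primer sequences are placeholders)
pandaseq_soft = '/opt/bio/pandaseq/pandaseq'

primer = {
    'Self': {    # libraries described by sam_barcode.l
        '16S': {'forward': 'NNNNNNNNNNNNNNN', 'reverse': 'NNNNNNNNNNNNNNN'},
        'ITS': {'forward': 'NNNNNNNNNNNNNNN', 'reverse': 'NNNNNNNNNNNNNNN'},
    },
    'HXT': {     # libraries described by sam_barcode.s<number>
        '16S': {'forward': 'NNNNNNNNNNNNNNN', 'reverse': 'NNNNNNNNNNNNNNN'},
        'ITS': {'forward': 'NNNNNNNNNNNNNNN', 'reverse': 'NNNNNNNNNNNNNNN'},
    },
}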
This program is also something of a showcase of what I have learned over the past few days.