3

I am trying to write python script for customized filtering for fastq file (size >3 GB). My proposed script is as follows:

def filtering(read):
    time.sleep(0.1)

    if len(read) >= 15 and \
       len(read) <= 30 and \
       np.mean(read.qual) >= 30 and \
       sum(i < 20 for i in read.qual) <=2:
           flag1 = True
           return  flag1

pool = multiprocessing.Pool(5)
pool.ncpus = 4
for file in os.listdir('/moRNA_data/trimmed_fastq'):
    if file.endswith('.fq'):
        fastq_file = HTSeq.FastqReader( os.path.join('/moRNA_data/trimmed_fastq',file))
        name = str(file.split('.')[0]) + '.fastq'
        file_out = open(os.path.join('/moRNA_data/filtered_fastq',name), "w" )
        for read in tqdm(fastq_file):    
            flag1 = pool.apply_async(filtering,read)
            if flag1:
                read.write_to_fastq_file( file_out )
pool.close()
pool.join()

The problem with this script is that it works well with small file like upto 100-200 MB but when it goes to directory having multiple sample each having size more than 3 GB, it works well for first file but after that it hangs so severely that I had to restart the server [ I can't even kill the process]. I need suggestions to improve this script to make fastq file processing faster. Platform details is as follows:

OS : Ubuntu-18.02
Architecture:        x86_64
CPU op-mode(s):      32-bit, 64-bit
Byte Order:          Little Endian
CPU(s):              12
On-line CPU(s) list: 0-11
Thread(s) per core:  2
Core(s) per socket:  6
Socket(s):           1
NUMA node(s):        1
Vendor ID:           GenuineIntel
CPU family:          6
Model:               158
Model name:          Intel(R) Core(TM) i7-8700 
CPU @ 3.20GHz
Stepping:            10
CPU MHz:             800.560
CPU max MHz:         4600.0000
CPU min MHz:         800.0000
BogoMIPS:            6384.00
Virtualization:      VT-x
L1d cache:           32K
L1i cache:           32K
L2 cache:            256K
L3 cache:            12288K
NUMA node0 CPU(s):   0-11

Thanks.

Chris_Rands
  • 3,948
  • 12
  • 31
Lot_to_learn
  • 530
  • 3
  • 14
  • Make sure that filtering() actually returns something, though that's likely not the root of the problem. – Devon Ryan Jun 12 '19 at 08:56
  • @DevonRyan just to clarify, tqdm only reports time in such cases it does not do a list conversion as you indicated- you can easily test with with an infinite generator https://pastebin.com/NJvwf5DT – Chris_Rands Jun 12 '19 at 08:58
  • Take out the time.sleep() call for starters. and I think you're missing an else: flag1 = False clause. If you still have issues, remove the multiprocessing code to assist your debugging – Chris_Rands Jun 12 '19 at 09:07
  • Did you try it on just one CPU or two just to test if the problem is with the threading and other applications on the server? Also how much RAM and memory is available? – llrs Jun 12 '19 at 09:49
  • @llrs : This codes works amazingly with small files. As I have mentioned in questions. I also tries with Pool(2) but same server hangs after processing of one file very severely. – Lot_to_learn Jun 12 '19 at 11:09
  • @Chris_Rands : My aim to ignore those reads which do not satisfy the given conditions. So I don't think it change something even if I write else statement. For tqdm approach, my only aim is to check how fast this script is checking sequences. When I run this script then for the first file, I saw that it is processing 8000-10000 reads per second (very fast) and process a fastq file of size 3 GB in few seconds. But after that it's block everything. I can't even kill the process. Thanks. – Lot_to_learn Jun 12 '19 at 11:11
  • @DevonRyan: I checked that too by printing it on small test file. But it prints only multiprocessing object details rather than its actual value. I also tried flag1.get but gets object description again. Again On small file, this script is OK. Many thanks for positive EDIT in question to make it more informative. – Lot_to_learn Jun 12 '19 at 11:16
  • you can ignore my comment about setting flag1 = False, it was an issue that the previous edit caused changing the indentation. Now (after my edit) the function returns None (which is falsly anyway) without an else-clause. you should still remove the sleep() call but i think your issue is due to the multiprocessing, strip that part out and see if it works – Chris_Rands Jun 12 '19 at 11:27
  • @Chris_Rands : Sorry if I am not able to understand your suggestions. Multiprocessing is the main part of program and my main goal is to split filtration of millions of reads in multiple process to make it faster. If I remove it then it will be a simple for loop script and definitely will work but extremely slowly. So what will you recommend in place of multiprocessing. – Lot_to_learn Jun 12 '19 at 11:38
  • it's unclear to me if you want to multiprocess per file or also within each file (per read). the per file is multiprocessing is much simpler. personally, i'm not a big fan of python's multiprocessing module so i would solve this (per file) via a pipelining wrapper like snakemake or ruffus – Chris_Rands Jun 12 '19 at 12:28
  • @Chris_Rands: It is not multiprocessing per file but it is within file. If you check the argument I am passing in pool.apply_async(filtering,read), I am passing a read sequence to a function. Read sequence I am getting using for loop. I may have little knowledge about per file application of multiprocessing. As far as I know, I can use it on very heavy file to make filtration task faster to split in many parallel processes. – Lot_to_learn Jun 12 '19 at 12:45
  • 1
    If you can't kill the process and your only option is to restart your server, it seems that you exceed your available RAM. So you could monitor the RAM usage at first and check how to clear the memory. – Mr_Z Jun 12 '19 at 13:41
  • @Lot_to_learn As detailed in the multiprocessing API docs, Pool.apply_async returns an AsyncResult object, not the result of the call directly, so I don't think your code is doing what you expect. – mbrig Jun 12 '19 at 16:06
  • If this is not just an exercise for you, I would suggest to use already existing tools. bdduk.sh from bbtools should have all the options you need. And btw: You cannot use np.mean to calculate the average quality. Because the phred scores behave logarithmic and not linear. – finswimmer Jun 12 '19 at 16:44
  • Note that str(file.split('.')[0]) is likely not doing what you want when a file name contains more than one dot. And it seems to me that str is unnecessary: you already have strings. You might be interested in os.path.splitext. – bli Jun 12 '19 at 20:33
  • @bli: That's I have taken care of. All files names are Sample_ID.fastq . So In all the files there is only one dot. For str you are right. I can remove that. – Lot_to_learn Jun 13 '19 at 04:05
  • @mbrig : I also realize the same that applying if condition to Pool.apply_async object may be the reason to hang system because this may set an infinite loop. – Lot_to_learn Jun 13 '19 at 04:07
  • @finswimmer : Thanks for suggesting bbduk. I have check. I have found quality trimming based on mean quality and based on length. There are still two consition for which I have to write script. Filter reads having more than 2 bases under quality score 20 and filter reads with unique sequence having counts less than 10 in fastq file. – Lot_to_learn Jun 13 '19 at 04:11

1 Answers1

6

Knowing a high-performance language will make a huge difference here. See the example below in C. I haven't tested, but it should be easy to modify for your purpose.

C++, Rust, Go, Nim and Julia can be as fast.

// Download https://raw.githubusercontent.com/lh3/minimap2/master/kseq.h
// Compile with: gcc -O2 -o myprog this-file.c -lz
#include <zlib.h>  
#include <stdio.h>  
#include "kseq.h"
// STEP 1: declare the type of file handler and the read() function  
KSEQ_INIT(gzFile, gzread)  

int main(int argc, char *argv[])  
{  
    gzFile fp;  
    kseq_t *seq;  
    if (argc == 1) {  
        fprintf(stderr, "Usage: %s <in.seq>\n", argv[0]);  
        return 1;  
    }  
    fp = gzopen(argv[1], "r"); // STEP 2: open the file handler  
    seq = kseq_init(fp); // STEP 3: initialize seq  
    while (kseq_read(seq) >= 0) { // STEP 4: read sequence  
        int i, sum, low;
        if (seq->qual.l == 0) continue; // no quality
        if (seq->seq.l < 15 || seq->seq.l > 30) continue;
        for (i = sum = low = 0; i < seq->seq.l; ++i) {
            int qual = seq->qual.s[i] - 33; // assuming Sanger quality
            sum += qual;
            if (qual < 20) ++low;
        }
        if (sum < seq->seq.l * 30.0 || low > 2) continue;
        printf("@%s\n", seq->name.s); // output can be made faster
        puts(seq->seq.s);
        printf("+\n");
        puts(seq->qual.s);
    }  
    kseq_destroy(seq); // STEP 5: destroy seq  
    gzclose(fp); // STEP 6: close the file handler  
    return 0;  
}  

To apply the program to all *.fq in a directory (again, not tested):

mkdir -p filtered
ls *.fq | xargs -i echo ./myprog {} \> filtered/{} | sh

You can also replace the final sh with parallel, but the above should be fast enough.

user172818
  • 6,515
  • 2
  • 13
  • 29
  • :Thanks for your response. Till now I have done all my analysis in either python or R. I have never touched C/C++. Although I studies those language but I left using this before 8 or more years. But still I think I should try your way [although it is very much time taking for me to start heavy text file processing in C/C++]. For time being, Do you have any python alternative solution for this problem? – Lot_to_learn Jun 14 '19 at 01:52
  • @Lot_to_learn The inner loop mimics your script. It won't take you much time to modify it for other purposes even if you don't know C well. For tasks like this, C will be times faster than python whatever you do. – user172818 Jun 14 '19 at 02:54