
I have been doing a lot of Python programming lately, even though my core competency remains in Java. Since many of these programs would benefit from some form of parallelism, I spent some time analyzing what Python offers. I used an old computer for this project since I wanted processing to be slow.
I am running on a Dell Latitude E6530 on which I installed Ubuntu (20.04.2 LTS). Let me start with the output of the lscpu command on this computer.
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
CPU(s): 8
On-line CPU(s) list: 0-7
Thread(s) per core: 2
Core(s) per socket: 4
Socket(s): 1
Vendor ID: GenuineIntel
CPU family: 6
Model: 58
CPU MHz: 1306.125
CPU max MHz: 3700.0000
CPU min MHz: 1200.0000
This is not a high-end PC. It has one socket holding a quad-core CPU, and each core can run two hardware threads. So the logical CPU count for this machine is 1x4x2=8, which is not high by any measure.
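The arithmetic above can be checked with a small sketch. The helper name logical_cpus is made up for illustration; os.cpu_count() is the standard library call that reports the logical CPUs the operating system sees, and its value will differ from machine to machine.

```python
import os


def logical_cpus(sockets: int, cores_per_socket: int, threads_per_core: int) -> int:
    """Multiply the three lscpu figures to get the logical CPU count."""
    return sockets * cores_per_socket * threads_per_core


# Values taken from the lscpu output above.
expected = logical_cpus(sockets=1, cores_per_socket=4, threads_per_core=2)
print("From lscpu figures:", expected)

# What the operating system reports on the machine running this script
# (os.cpu_count() may return None if the count cannot be determined).
print("From os.cpu_count():", os.cpu_count())
```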
Parallel Processing
Before we go into the details of parallel processing, we will briefly touch on I/O bound vs CPU bound work. An execution is considered CPU bound if its performance is limited by the speed of the CPU. On the other hand, an execution is considered I/O bound if its performance is limited by an I/O subsystem such as the disk or the network. Graphics-intensive tasks and number crunching are normally CPU bound, whereas heavy disk reads and writes by a database, or network operations, are I/O bound. There are multiple approaches to running a specific task in parallel.
Let us take the example of copying a file from location A to location B. One of the tricks to speed up this process is to split the file into chunks and copy the chunks in parallel instead of handling the entire file in one thread. Since file operations are inherently I/O intensive, a multithreaded approach will normally help to reduce the processing time.
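The chunked copy described above could be sketched roughly as follows. This is a minimal illustration and not a tuned implementation: the function names, the 1 MiB default chunk size and the worker count are all assumptions, and whether it beats a plain copy depends on the storage device.

```python
import os
from concurrent.futures import ThreadPoolExecutor


def copy_chunk(src: str, dst: str, offset: int, size: int) -> int:
    """Copy up to `size` bytes at `offset` from src to the same offset in dst."""
    # Each call opens its own handles so threads never share a file position.
    with open(src, "rb") as fin, open(dst, "r+b") as fout:
        fin.seek(offset)
        fout.seek(offset)
        data = fin.read(size)
        fout.write(data)
        return len(data)


def threaded_copy(src: str, dst: str, chunk_size: int = 1024 * 1024, workers: int = 4) -> int:
    """Copy src to dst in chunks using a thread pool; returns bytes copied."""
    total = os.path.getsize(src)
    # Pre-size the destination so every thread can write at its own offset.
    with open(dst, "wb") as fout:
        fout.truncate(total)
    offsets = range(0, total, chunk_size)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        copied = pool.map(lambda off: copy_chunk(src, dst, off, chunk_size), offsets)
        return sum(copied)
```

Calling threaded_copy("a.bin", "b.bin") would copy the hypothetical file a.bin using four threads, each working on one chunk at a time.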
Multithreading vs Multiprocessing

Multithreading is a technique in which a program is executed concurrently by breaking it into smaller pieces that are processed individually. The threads run within the boundaries of the same process and share its memory. One of the biggest advantages is that when one thread is blocked, for example while waiting on I/O, the other threads are not blocked and continue on their own. On a uniprocessor, the operating system schedules the threads by time slicing between them. There is always some overhead for thread synchronization.
Multiprocessing, on the other hand, relies on the availability of more than one physical or logical CPU. Logical CPUs come from simultaneous multithreading, which Intel calls Hyper-Threading and introduced in its Pentium 4 era processors; it lets one physical core present itself as two. In multiprocessing, the task is broken into smaller chunks that are handed to separate processes, which the operating system can schedule on different CPU cores.
Both of these techniques have their own advantages and disadvantages, and I will not go into full detail for each. Next I will jump into my Python IDE and try to implement each of these methods. One quick caveat: I created the programs by following various manuals and internet articles. They work, but they may not be the best approach to the problem. I created them for my own reference, so that I can use them in future projects.
Problem Statement
For this project I created a collection in MongoDB holding 4 million records. Assume that I am keeping data for a logistics company and storing the trip plan data for each freight movement. I used the Python Faker library to generate fake data. Given below is a sample record.
{
  "_id" : ObjectId("60881c9a52361982ebb198bd"),
  "driverId" : 59673,
  "driverName" : "Hannah Lucas",
  "driverLicense" : "b620-284813-13",
  "truckType" : "Class 8",
  "trailerCount" : 2,
  "trailerType" : [ "Automotive Hauler", "Livestock" ],
  "origin" : "Herman-Cox",
  "destination" : "Ray, West and Cardenas",
  "contract" : {
    "billOfLading" : "msS8-14871205",
    "agreementFormId" : "C5-297581-037",
    "customsFormId" : "Y962"
  },
  "tripPlans" : [
    {
      "stopType" : "PICKUP",
      "estimatedArrival" : ISODate("2011-11-05T07:30:45Z"),
      "estimatedDeparture" : ISODate("2011-11-05T08:30:45Z"),
      "location" : { "address" : "63485 Stephanie Mill Suite 960", "city" : "South Erica", "country" : "US", "postal" : "36344", "name" : "Herman-Cox" }
    },
    {
      "stopType" : "STOP",
      "estimatedArrival" : ISODate("2011-11-05T18:30:45Z"),
      "estimatedDeparture" : ISODate("2011-11-05T21:30:45Z"),
      "location" : { "address" : "52374 Jenna Walks Suite 532", "city" : "Andersonland", "country" : "US", "postal" : "47736", "name" : "Martin and Sons" }
    },
    {
      "stopType" : "DROPOFF",
      "estimatedArrival" : ISODate("2011-11-07T08:30:45Z"),
      "estimatedDeparture" : ISODate("2011-11-07T09:30:45Z"),
      "location" : { "address" : "79884 Ward Brooks Suite 262", "city" : "Mariabury", "country" : "US", "postal" : "36361", "name" : "Ray, West and Cardenas" }
    }
  ]
}
For this project, I will go through the entire database starting from 2011 and find how many trailers were moved every month. For the trip date I use the pickup date (estimatedArrival in the first tripPlans record). The trailer count is present in the base object, so the only mapping needed is to add this value to the total for the pickup month.
A subset of a sample response is shown below.
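To make the mapping concrete, here is a minimal sketch that applies it to a few in-memory records instead of the database. The function name and the three records are made up for illustration; only the field names come from the sample document above.

```python
from collections import defaultdict
from datetime import datetime


def monthly_trailer_totals(records):
    """Sum trailerCount per pickup month, keyed as 'YYYYMM'."""
    totals = defaultdict(int)
    for rec in records:
        # The pickup date is the estimatedArrival of the first trip plan.
        pickup = rec["tripPlans"][0]["estimatedArrival"]
        totals[pickup.strftime("%Y%m")] += rec["trailerCount"]
    return dict(totals)


# Three made-up records shaped like the sample document.
sample = [
    {"trailerCount": 2, "tripPlans": [{"estimatedArrival": datetime(2011, 11, 5, 7, 30, 45)}]},
    {"trailerCount": 1, "tripPlans": [{"estimatedArrival": datetime(2011, 11, 20, 9, 0, 0)}]},
    {"trailerCount": 3, "tripPlans": [{"estimatedArrival": datetime(2011, 12, 1, 8, 0, 0)}]},
]
print(monthly_trailer_totals(sample))  # {'201111': 3, '201112': 3}
```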
[{'201111': 4747, '201107': 5144, '201110': 5065, '201106': 4992, '201108': 5128, '201105': 5130, '201112': 5027, '201109': 4923, '201104': 397}, {'201801': 5292, '201808': 5172, '201807': 4968, '201906': 5043}, {'202008': 5099, '202007': 5222, '202006': 4902, '202001': 5129, '202004': 4991, '202012': 5239, '202005': 4985, '202003': 5084, '202011': 4986, '202002': 4734, '202010': 5186, '202009': 4912}]
Single Threaded
We will start with the basic solution: a single threaded version of the program that extracts the above report. We open a connection to the database, iterate over all records within the specified dates and add up the trailers. The MongoDB database is named truckies and the collection is workitems.
# MongoCommon is a helper class from this project (not shown here).
def fetch_data(self, start_date, end_date):
    dct = {}
    client = None
    try:
        client, conn = MongoCommon.get_db_connection('mongodb://localhost:27017', 'truckies')
        col = conn['workitems']
        # Select trips whose pickup (first trip plan) falls in [start_date, end_date).
        for x in col.find({
            'tripPlans.0.estimatedArrival': {
                '$gte': start_date,
                '$lt': end_date
            }}):
            # Build a YYYYMM key from the pickup date.
            idx = '%04d%02d' % ((x['tripPlans'][0]['estimatedArrival']).year,
                                (x['tripPlans'][0]['estimatedArrival']).month)
            cnt_trailers = x['trailerCount']
            if idx in dct:
                dct[idx] = dct[idx] + cnt_trailers
            else:
                dct[idx] = cnt_trailers
    except Exception as e:
        print(e)
    if client:
        MongoCommon.close_connection(client)
    return dct
The logic is very simple. We iterate through all the records within the date range and add each record's trailer count to the total for its month. The function returns a dictionary of the counts per month.
Asynchronous Call
Sometimes it helps to make asynchronous calls. I used asyncio in one of the implementations to see how the response time changes. The database has records from 2011 all the way to 2019, so I split the date range into blocks of one year. If I send the start and end dates as 2011-01-01T00:00:00.000Z and 2020-01-01T00:00:00.000Z respectively, I get back a list of date ranges, each covering one year.
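The code for DateUtils.split_date is not shown in this post, so the following is only a guess at what such a helper could look like. The name split_by_year and the choice of half-open (start, end) pairs are assumptions; the half-open form matches the '$gte' / '$lt' query used in the listings.

```python
from datetime import datetime


def split_by_year(start: datetime, end: datetime):
    """Return (start, end) pairs, each covering at most one calendar year."""
    ranges = []
    cursor = start
    while cursor < end:
        # Start of the next calendar year, capped at the overall end date.
        next_year = datetime(cursor.year + 1, 1, 1, tzinfo=cursor.tzinfo)
        upper = min(next_year, end)
        ranges.append((cursor, upper))
        cursor = upper
    return ranges


blocks = split_by_year(datetime(2011, 1, 1), datetime(2020, 1, 1))
print(len(blocks))  # 9
```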
For asynchronous calls, there are different ways of gathering the responses, and each equates to a separate design pattern. One of the most common is to await the result of each call. The other is to have the tasks publish their results to a queue. Normally we use queues when the message producers are discrete: they may not know about each other and may be started in different places.
Here is an example of a plain asynchronous call.
import asyncio

# MongoCommon and DateUtils are helper classes from this project (not shown here).
async def fetch_data(self, start_date, end_date, num):
    dct = {}
    client = None
    try:
        print('Starting task [%d]' % num)
        client, conn = MongoCommon.get_db_connection('mongodb://localhost:27017', 'truckies')
        col = conn['workitems']
        for x in col.find({
            'tripPlans.0.estimatedArrival': {
                '$gte': start_date,
                '$lt': end_date
            }}):
            idx = '%04d%02d' % (
                (x['tripPlans'][0]['estimatedArrival']).year,
                (x['tripPlans'][0]['estimatedArrival']).month)
            cnt_trailers = x['trailerCount']
            if idx in dct:
                dct[idx] = dct[idx] + cnt_trailers
            else:
                dct[idx] = cnt_trailers
    except Exception as e:
        print(e)
    if client:
        MongoCommon.close_connection(client)
    return dct

async def fetch_data_as(self, start_date, end_date):
    processes = []
    dtarr = DateUtils.split_date(start_date, end_date)
    cnt = 0
    # Schedule one task per date range.
    for arr in dtarr:
        cnt += 1
        p = asyncio.create_task(self.fetch_data(arr[0], arr[1], cnt))
        processes.append(p)
    # Wait for every task and collect the per-range dictionaries in order.
    res = await asyncio.gather(*processes)
    return res
The main loop still looks like the single threaded option. fetch_data_as creates one task per date range with asyncio.create_task, and asyncio.gather waits for all tasks to complete and returns their results as a list. One caveat: fetch_data contains no await expression, and the for loop over col.find is an ordinary blocking loop, so each task runs to completion before the event loop can start the next one. The ranges are therefore processed one after another rather than concurrently.
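The difference between a coroutine that blocks and one that hands its blocking call to a thread can be seen in a small sketch. Everything here is hypothetical: blocking_query just sleeps as a stand-in for a synchronous database call, and run_in_executor with the default thread pool is only one of several ways to keep the event loop free.

```python
import asyncio
import time


def blocking_query(num: int, delay: float = 0.2) -> int:
    """Stand-in for a blocking database call (sleeps instead of querying)."""
    time.sleep(delay)
    return num


async def run_blocking_inline(count: int):
    async def task(n):
        # No await here: the event loop is blocked until the call returns,
        # so the tasks effectively run one after another.
        return blocking_query(n)
    return await asyncio.gather(*(task(n) for n in range(count)))


async def run_blocking_in_threads(count: int):
    loop = asyncio.get_running_loop()
    # run_in_executor(None, ...) hands each call to the loop's default
    # thread pool, so the waits overlap instead of queuing up.
    futures = [loop.run_in_executor(None, blocking_query, n) for n in range(count)]
    return await asyncio.gather(*futures)


if __name__ == "__main__":
    for runner in (run_blocking_inline, run_blocking_in_threads):
        started = time.perf_counter()
        asyncio.run(runner(4))
        print(runner.__name__, round(time.perf_counter() - started, 2), "s")
```

The sleep releases the interpreter lock while it waits, which is why the threaded variant overlaps so well here. Work that spends its time in Python code would likely not see the same gain.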
Asynchronous Call – Queue
For the next sample, I am going to use the same code as above. However, instead of returning the responses and waiting for them, each task will put its result on a queue.
import asyncio

# MongoCommon and DateUtils are helper classes from this project (not shown here).
async def fetch_data(self, start_date, end_date, que, num):
    dct = {}
    client = None
    try:
        print('Starting task [%d]' % num)
        client, conn = MongoCommon.get_db_connection('mongodb://localhost:27017', 'truckies')
        col = conn['workitems']
        for x in col.find({
            'tripPlans.0.estimatedArrival': {
                '$gte': start_date,
                '$lt': end_date
            }}):
            idx = '%04d%02d' % (
                (x['tripPlans'][0]['estimatedArrival']).year,
                (x['tripPlans'][0]['estimatedArrival']).month)
            cnt_trailers = x['trailerCount']
            if idx in dct:
                dct[idx] = dct[idx] + cnt_trailers
            else:
                dct[idx] = cnt_trailers
        # Publish this range's totals on the shared queue.
        await que.put(dct)
        # task_done() is normally called by a consumer after get(); here it
        # balances the put() so that queue.join() in the caller does not block.
        que.task_done()
    except Exception as e:
        print(e)
    if client:
        MongoCommon.close_connection(client)

async def fetch_data_as(self, start_date, end_date):
    processes = []
    queue = asyncio.Queue()
    dtarr = DateUtils.split_date(start_date, end_date)
    cnt = 0
    for arr in dtarr:
        cnt += 1
        p = asyncio.create_task(self.fetch_data(arr[0], arr[1], queue, cnt))
        processes.append(p)
    await asyncio.gather(*processes)
    await queue.join()
    # Drain the queue into the response list.
    cfind = []
    while not queue.empty():
        cfind.append(await queue.get())
    return cfind
The queue is created in fetch_data_as with asyncio.Queue() and accumulates the results. Each task puts its dictionary on the queue. After asyncio.gather returns, queue.join() waits until every queued item has been marked as done. Finally, we drain the queue and add each item to our response. The result looks the same as in the previous implementation.
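For comparison, a more conventional use of asyncio.Queue has a separate consumer that calls task_done() after each get(). The sketch below is a generic producer/consumer illustration with made-up names and payloads, not a rewrite of the listing above.

```python
import asyncio


async def producer(queue: asyncio.Queue, num: int) -> None:
    await asyncio.sleep(0)  # yield once, standing in for real awaited I/O
    await queue.put({"task": num})


async def consumer(queue: asyncio.Queue, results: list) -> None:
    while True:
        item = await queue.get()
        results.append(item)
        queue.task_done()  # marks the item fetched by get() as processed


async def collect(count: int) -> list:
    queue = asyncio.Queue()
    results = []
    worker = asyncio.create_task(consumer(queue, results))
    await asyncio.gather(*(producer(queue, n) for n in range(count)))
    await queue.join()  # returns once every queued item has been task_done()
    worker.cancel()     # the consumer loops forever, so stop it explicitly
    await asyncio.gather(worker, return_exceptions=True)
    return results


if __name__ == "__main__":
    print(asyncio.run(collect(3)))
```

With this split, the producers do not need to know who consumes their results, which is the situation where a queue tends to pay off.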
Multi Threading
Next we discuss the multithreading approach. In this case we create the threads and wait for them to complete. In our example, we use a ThreadPoolExecutor, which lets us control how many threads run at any time. We create a pool of threads and assign tasks to them. When all threads are busy, additional submitted tasks wait for earlier tasks to complete.
from concurrent.futures import ThreadPoolExecutor, as_completed

# MongoCommon and DateUtils are helper classes from this project (not shown here).
def fetch_data(self, start_date, end_date, num):
    dct = {}
    client = None
    try:
        print('Starting thread [%d]' % num)
        client, conn = MongoCommon.get_db_connection('mongodb://localhost:27017', 'truckies')
        col = conn['workitems']
        for x in col.find({
            'tripPlans.0.estimatedArrival': {
                '$gte': start_date,
                '$lt': end_date
            }}):
            idx = '%04d%02d' % (
                (x['tripPlans'][0]['estimatedArrival']).year,
                (x['tripPlans'][0]['estimatedArrival']).month)
            cnt_trailers = x['trailerCount']
            if idx in dct:
                dct[idx] = dct[idx] + cnt_trailers
            else:
                dct[idx] = cnt_trailers
    except Exception as e:
        print(e)
    if client:
        MongoCommon.close_connection(client)
    return dct

def fetch_data_mt(self, start_date, end_date):
    cfind = []
    dtarr = DateUtils.split_date(start_date, end_date)
    # At most five date ranges are processed at the same time.
    with ThreadPoolExecutor(max_workers=5) as etor:
        futures = []
        cnt = 0
        for arr in dtarr:
            cnt += 1
            futures.append(etor.submit(self.fetch_data, start_date=arr[0], end_date=arr[1], num=cnt))
        # Collect results in the order in which the threads finish.
        for future in as_completed(futures):
            cfind.append(future.result())
    return cfind
The multithreaded program looks very much like the asynchronous version. We create a ThreadPoolExecutor with five workers and submit one task per date range. Finally, as_completed yields each future as its thread finishes, and we gather all the responses.
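The pool's limit on simultaneous work can be observed with a small sketch. The task here is hypothetical (it sleeps instead of querying a database), and the bookkeeping dictionary exists only to record how many tasks were running at once.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_pool(task_count: int, max_workers: int):
    """Run task_count dummy tasks and report the peak number running at once."""
    lock = threading.Lock()
    state = {"active": 0, "peak": 0}

    def work(num: int) -> int:
        with lock:
            state["active"] += 1
            state["peak"] = max(state["peak"], state["active"])
        time.sleep(0.05)  # stand-in for a blocking query
        with lock:
            state["active"] -= 1
        return num

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(work, n) for n in range(task_count)]
        # as_completed yields futures in finishing order, not submission order.
        results = [f.result() for f in as_completed(futures)]
    return results, state["peak"]


if __name__ == "__main__":
    results, peak = run_pool(task_count=9, max_workers=5)
    print("tasks finished:", len(results), "peak concurrency:", peak)
```

With nine tasks and five workers, the peak never exceeds five; the remaining tasks wait in the executor's internal queue until a worker is free.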
Multi Processing
Multiprocessing depends on the availability of multiple processors. Python provides the multiprocessing library for spreading work across them. Provided below is an equivalent program that uses multiple processes.
from multiprocessing import Process, Queue

# MongoCommon and DateUtils are helper classes from this project (not shown here).
def fetch_data(self, start_date, end_date, que, num):
    dct = {}
    client = None
    try:
        print('Starting process [%d]' % num)
        client, conn = MongoCommon.get_db_connection('mongodb://localhost:27017', 'truckies')
        col = conn['workitems']
        for x in col.find({
            'tripPlans.0.estimatedArrival': {
                '$gte': start_date,
                '$lt': end_date
            }}):
            idx = '%04d%02d' % (
                (x['tripPlans'][0]['estimatedArrival']).year,
                (x['tripPlans'][0]['estimatedArrival']).month)
            cnt_trailers = x['trailerCount']
            if idx in dct:
                dct[idx] = dct[idx] + cnt_trailers
            else:
                dct[idx] = cnt_trailers
        # Send this range's totals back to the parent process.
        que.put(dct)
    except Exception as e:
        print(e)
    if client:
        MongoCommon.close_connection(client)

def fetch_data_mp(self, start_date, end_date):
    processes = []
    queue = Queue()
    dtarr = DateUtils.split_date(start_date, end_date)
    cnt = 0
    # Start one process per date range.
    for arr in dtarr:
        cnt += 1
        p = Process(target=self.fetch_data, args=(arr[0], arr[1], queue, cnt))
        processes.append(p)
        p.start()
    # Joining before draining works here because each result is small; the
    # multiprocessing docs advise draining the queue first for larger payloads.
    for proc in processes:
        proc.join()
    cfind = []
    while not queue.empty():
        cfind.append(queue.get())
    return cfind
In this case, we create a multiprocessing Queue in fetch_data_mp to hold all responses. We then create one independent Process per date range and start it. All processes publish their results to the queue. Finally, after joining the processes, we drain the queue to accumulate all results.
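A variation on the Process and Queue pattern reads one result per process before joining, which avoids relying on queue.empty(). The sketch below is a stand-alone illustration with a made-up worker that sums integers. It assumes the "fork" start method, which is available on Linux (the platform used in this post) but not on Windows.

```python
import multiprocessing as mp


def worker(lo: int, hi: int, queue) -> None:
    """Hypothetical CPU-bound task: sum the integers in [lo, hi)."""
    queue.put((lo, sum(range(lo, hi))))


def run_processes(ranges):
    ctx = mp.get_context("fork")  # assumption: a platform that supports fork
    queue = ctx.Queue()
    procs = [ctx.Process(target=worker, args=(lo, hi, queue)) for lo, hi in ranges]
    for p in procs:
        p.start()
    # Read exactly one result per process before joining. A child that still
    # has data buffered for the queue may not exit until it is consumed.
    results = dict(queue.get() for _ in procs)
    for p in procs:
        p.join()
    return results


if __name__ == "__main__":
    print(run_processes([(0, 10), (10, 20), (20, 30)]))
```

One trade-off of this sketch: if a worker fails before calling put, the parent waits forever on get, so real code would want a timeout or an error marker on the queue.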
Summary
All of the methods above are ways of trying to get better performance. Which one or more of them you end up using depends on the requirement. This would not be complete without the performance data, so here are the timings from my test Linux box:
Single Threaded: 18.82s
Asynchronous: 29.24s
Asynchronous (Queued): 29.95s
Multi Threaded: 27.67s
Multi Processing: 7.65s
It may seem odd that the multithreaded program took more time than the single threaded one. Threads need additional management by the operating system. A likely further contributor is CPython's global interpreter lock (GIL), which lets only one thread execute Python bytecode at a time, so work dominated by Python-level processing of each record gains little from threads. So a multithreaded program may not always be faster. For this sample test, multiprocessing took less than half the time needed by any of the other methods.
Hope this reference helps you. Ciao for now!