user3543300 - 3 months ago
Python Question

How to optimize multiprocessing in Python

EDIT 2: I haven't had the opportunity to test all the answers, but danny's seemed to work and offered an explanation of why my initial approach may not have worked. guillaume.deslandes's answer also seemed to work and used an approach similar to my initial code, but I'm not sure what "problem" in my initial code it corrects. I haven't tested either extensively, so I can't say for certain which works best, but the bounty deadline is approaching and I cannot split it, so I awarded the bounty to danny and marked guillaume.deslandes's answer as correct.

EDIT:
I've had questions about what the video stream is, so I will offer more clarity. The stream is a live video feed from my webcam, accessed via OpenCV. I get each frame as the camera reads it and send it to a separate process for processing. The process returns text based on computations done on the image. The text is then drawn onto the image. I need to display the stream in real time, and it is OK if there is a lag between the text and the video being shown (i.e. if the text applies to a previous frame, that's OK).


Perhaps an easier way to think of this is that I'm doing image recognition on what the webcam sees. I send one frame at a time to a separate process to do recognition analysis on the frame, and send the text back to be put as a caption on the live feed. Obviously the processing takes more time than simply grabbing frames from the webcam and showing them, so if there is a delay in what the caption is and what the webcam feed shows, that's acceptable and expected.

What's happening now is that the live video I'm displaying is lagging due to the other processes (when I don't send frames to the process for computing, there is no lag). I've also ensured that only one frame is enqueued at a time, to avoid overloading the queue and causing lag. I've updated the code below to reflect this detail.

I'm using the multiprocessing module in Python to help speed up my main program. However, I believe I might be doing something incorrectly, as I don't think the computations are actually happening in parallel.

I want my program to read in images from a video stream in the main process, and pass on the frames to two child processes that do computations on them and send text back (containing the results of the computations) to the main process.

However, the main process seems to lag when I use multiprocessing, running about half as fast as without it, leading me to believe that the processes aren't running completely in parallel.
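
To put a number on "about half as fast", the main loop's frame rate can be measured with and without the consumer running. This is a minimal sketch, not part of the original code; `RateMeter` is a hypothetical helper name, and the window of 30 samples is an arbitrary assumption.

```python
import time
from collections import deque


class RateMeter:
    """Rolling estimate of loop iterations per second (hypothetical helper)."""

    def __init__(self, window=30):
        # Keep only the most recent timestamps
        self.stamps = deque(maxlen=window)

    def tick(self, now=None):
        """Record one iteration; return the current rate (0.0 until two ticks)."""
        self.stamps.append(time.perf_counter() if now is None else now)
        if len(self.stamps) < 2:
            return 0.0
        span = self.stamps[-1] - self.stamps[0]
        # N timestamps cover N - 1 intervals
        return (len(self.stamps) - 1) / span if span > 0 else 0.0


if __name__ == "__main__":
    meter = RateMeter()
    for _ in range(5):
        time.sleep(0.01)  # stand-in for one pass of the capture loop
        rate = meter.tick()
    print("approx. iterations per second: %.1f" % rate)
```

Calling `meter.tick()` once per pass of the capture loop, first with the `tasks.put` call disabled and then enabled, would show whether the slowdown is really a factor of two.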

After doing some research, I surmised that the lag may have been due to communicating between the processes using a queue (passing an image from the main to the child, and passing back text from child to main).

However I commented out the computational step and just had the main process pass an image and the child return blank text, and in this case, the main process did not slow down at all. It ran at full speed.
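
One way to estimate the communication cost in isolation: `multiprocessing.Queue` pickles every object before writing it to a pipe, so timing a pickle round trip on a frame-sized buffer gives a rough lower bound on the per-frame overhead. The sketch below assumes a 640x480 frame with 3 bytes per pixel (the real resolution is not stated here) and uses a plain `bytes` buffer instead of a real frame; `FRAME_SHAPE` and `pickle_cost` are hypothetical names.

```python
import pickle
import time

# Assumed frame size: 480 rows, 640 columns, 3 bytes per pixel
FRAME_SHAPE = (480, 640, 3)


def pickle_cost(n_bytes, repeats=20):
    """Return (pickled size in bytes, mean seconds per dumps+loads round trip)."""
    payload = bytes(n_bytes)  # stand-in for raw frame data
    start = time.perf_counter()
    for _ in range(repeats):
        blob = pickle.dumps(payload, protocol=pickle.HIGHEST_PROTOCOL)
        pickle.loads(blob)
    elapsed = time.perf_counter() - start
    return len(blob), elapsed / repeats


if __name__ == "__main__":
    height, width, channels = FRAME_SHAPE
    size, seconds = pickle_cost(height * width * channels)
    print("pickled size: %d bytes, round trip: %.3f ms" % (size, seconds * 1e3))
```

If the round trip is small compared with the time between two camera frames, the queue itself is unlikely to be the bottleneck, which would match the observation that passing frames without doing any computation causes no slowdown.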

Thus I believe that either

1) I am not optimally using multiprocessing

OR

2) These processes cannot truly be run in parallel (I would understand a little lag, but it's slowing the main process down by half).

Here's an outline of my code. It shows only one consumer instead of two, but both consumers are nearly identical. If anyone could offer guidance, I would appreciate it.

import multiprocessing

import cv2


class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        # Other initialization stuff

    def run(self):
        while True:
            image = self.task_queue.get()
            # Do computations on image
            self.result_queue.put("text")


tasks = multiprocessing.Queue()
results = multiprocessing.Queue()
consumer = Consumer(tasks, results)
consumer.start()

# Creating window and starting video capturer from camera
cv2.namedWindow("preview")
vc = cv2.VideoCapture(0)
# Try to get the first frame
if vc.isOpened():
    rval, frame = vc.read()
else:
    rval = False

while rval:
    if tasks.empty():
        # Send the current frame to the consumer
        tasks.put(frame)
    else:
        # Read the text computed by the consumer
        text = results.get()
        # Add text to frame
        cv2.putText(frame, text, (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)

    # Showing the frame with all the applied modifications
    cv2.imshow("preview", frame)

    # Getting next frame from camera
    rval, frame = vc.read()

Answer

(Updated solution based on your last code sample)

It will get images from the stream, put one in the task queue as soon as it is available, and display the last image with the last text.

I added a busy loop to simulate processing that takes longer than the interval between two images. This means that the text displayed does not necessarily belong to the image being shown; it is the last one computed. If the processing is fast enough, the shift between image and text should be limited.

Note that I call get/put directly and wrap the calls in try/except instead of checking first. Per the documentation, empty() and full() are not reliable.
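
The non-blocking pattern described above can be isolated in two small helpers. This is a minimal sketch under stated assumptions: `try_put` and `latest_or` are hypothetical names, and draining the result queue down to its newest item is a variation that is not in the code below, which reads at most one result per frame. The demo uses `queue.Queue` only to stay deterministic; `multiprocessing.Queue` offers the same `put_nowait`/`get_nowait` calls and raises the same `queue.Full` and `queue.Empty` exceptions.

```python
import queue  # Python 3 name; the module is called Queue on Python 2


def try_put(q, item):
    """Put item without blocking; return False if the queue is full."""
    try:
        q.put_nowait(item)
        return True
    except queue.Full:
        return False


def latest_or(q, previous):
    """Drain q without blocking; return the newest item, or previous if none."""
    newest = previous
    while True:
        try:
            newest = q.get_nowait()
        except queue.Empty:
            return newest


if __name__ == "__main__":
    tasks = queue.Queue(maxsize=1)
    print(try_put(tasks, "frame 1"))  # True: the slot was free
    print(try_put(tasks, "frame 2"))  # False: one task is already pending

    results = queue.Queue()
    results.put("result 1")
    results.put("result 2")
    print(latest_or(results, None))        # result 2
    print(latest_or(results, "result 2"))  # result 2 (nothing new arrived)
```

In the display loop, `text = latest_or(results, text)` would keep showing the previous caption until a newer one arrives, and it never blocks the video.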

import multiprocessing
import random

import cv2

try:
    from queue import Empty, Full  # Python 3
except ImportError:
    from Queue import Empty, Full  # Python 2


class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        # Other initialization stuff

    def run(self):
        while True:
            frameNum, frameData = self.task_queue.get()
            # Do computations on image
            # Simulate processing that takes longer than fetching a frame
            m = random.randint(0, 1000000)
            while m >= 0:
                m -= 1
            # Put result in queue
            self.result_queue.put("result from image " + str(frameNum))


# Guard needed on platforms that spawn child processes (e.g. Windows)
if __name__ == "__main__":
    # No more than one pending task
    tasks = multiprocessing.Queue(1)
    results = multiprocessing.Queue()
    # Init and start consumer; daemon so it exits with the main process
    consumer = Consumer(tasks, results)
    consumer.daemon = True
    consumer.start()

    # Creating window and starting video capturer from camera
    cv2.namedWindow("preview")
    vc = cv2.VideoCapture(0)
    # Try to get the first frame
    if vc.isOpened():
        rval, frame = vc.read()
        frame = cv2.resize(frame, (0, 0), fx=0.5, fy=0.5)
    else:
        rval = False

    # Dummy int to represent frame number for display
    frameNum = 0
    # Last result received (empty until the first one arrives)
    text = ""

    font = cv2.FONT_HERSHEY_SIMPLEX

    # Process loop
    while rval:
        # Count the frame grabbed from the stream
        frameNum += 1
        # Put image in task queue if there is room
        try:
            tasks.put_nowait((frameNum, frame))
        except Full:
            pass
        # Get result if ready
        try:
            # Use this if processing is fast enough
            # text = results.get(timeout=0.4)
            # Use this to prefer smooth display over frame/text shift
            text = results.get_nowait()
        except Empty:
            pass

        # Add last available text to last image and display
        print("display:", frameNum, "|", text)
        # Showing the frame with all the applied modifications
        cv2.putText(frame, text, (10, 25), font, 1, (255, 0, 0), 2)
        cv2.imshow("preview", frame)
        # imshow only refreshes the window when waitKey is called
        cv2.waitKey(1)
        # Getting next frame from camera
        rval, frame = vc.read()
        # Optional image resize
        # frame = cv2.resize(frame, (0, 0), fx=0.5, fy=0.5)

Here is some sample output. You can see the delay between image and result, and the result catching up.

> ('display:', 493, '|', 'result from image 483')
> ('display:', 494, '|', 'result from image 483')
> ('display:', 495, '|', 'result from image 489')
> ('display:', 496, '|', 'result from image 490')
> ('display:', 497, '|', 'result from image 495')
> ('display:', 498, '|', 'result from image 496')