`multiprocessing.JoinableQueue.empty()` seems badly broken

Short version: I'm aware that `multiprocessing.Queue.empty()` is unreliable. That makes sense to me; I understand that the actions of one process may take a moment to become visible to another. The documentation makes this clear:

> After putting an object on an empty queue there may be an infinitesimal delay before the queue's empty() method returns False. . . .
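
For concreteness, here is a minimal sketch of the kind of transient window the documentation is describing (the queue name and payload here are illustrative, not taken from my code): a child process calls `put()`, and the parent, checking `empty()` at nearly the same moment, may briefly still see `True` before the item becomes visible.

```python
import multiprocessing

def producer(q):
    # Put a single item; the parent, polling at almost the same moment,
    # may briefly still see empty() == True.
    q.put("hello")

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=producer, args=(q,))
    p.start()
    # Poll until the put() becomes visible on this side.
    while q.empty():
        pass
    print(q.get())
    p.join()
```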

I can live with this. But in my code, I'm seeing `queue.empty()` return True even when the queue has several hundred elements in it.

To prove my case, I added a shared variable `task_count` to my code, incremented it (under a lock) on every `put()`, and decremented it on every `get()`. Its value is zero at the beginning and end of a run, so I believe it is implemented correctly. I'm on macOS, so `queue.qsize()` doesn't work. I can plainly see that there are multiple tasks on my queue even when it claims to be empty.
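
The counter is along these lines (a sketch only, assuming a `multiprocessing.Value` shared integer; the helper names are illustrative):

```python
import multiprocessing

# Process-safe integer used purely as a debugging cross-check on queue size.
task_queue_count = multiprocessing.Value("i", 0)

def tracked_put(task_queue, item, counter):
    task_queue.put(item)
    with counter.get_lock():   # get_lock() guards the read-modify-write
        counter.value += 1

def tracked_get(task_queue, counter):
    item = task_queue.get()
    with counter.get_lock():
        counter.value -= 1
    return item
```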

My code is an implementation of a recursive algorithm that uses the task queue to keep track of pending work. Initially there is one task on the task queue, but that task creates additional tasks.

```python
import multiprocessing
from collections import deque

# MAX_RECURSION_DEPTH and MAX_LOCAL_QUEUE_SIZE are module-level constants (not shown).

class Worker(multiprocessing.Process):
    # task_queue is a JoinableQueue.
    def __init__(self, task_queue, result_queue, task_queue_count):
        super().__init__()
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.task_queue_count = task_queue_count

    def run(self):
        while True:
            info = self.task_queue.get()
            # Following is for debugging only
            with self.task_queue_count.get_lock():
                self.task_queue_count.value -= 1
            if info is None:  # Poison pill for shutdown
                self.task_queue.task_done()
                break
            self.handle_one_task(info)
            self.task_queue.task_done()

    def handle_one_task(self, info):
        local_queue = deque([info])
        while local_queue:
            current_index, task = local_queue.popleft()
            results = code_not_shown_here(current_index, task)
            if not results:
                continue
            if current_index + 1 == MAX_RECURSION_DEPTH:
                # <write results to result_queue>
                continue
            for result in results:
                if len(local_queue) < MAX_LOCAL_QUEUE_SIZE:
                    local_queue.append((current_index + 1, result))
                else:
                    self.task_queue.put((current_index + 1, result))
                    # Following is for debugging only
                    with self.task_queue_count.get_lock():
                        self.task_queue_count.value += 1
```
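
The surrounding driver is not shown; a minimal sketch of the kind of setup this worker expects (the worker count, the seed task, and the variable names here are assumptions) would be: create the queues and the shared counter, seed the single initial task, start the workers, wait for the queue to drain, then send one poison pill per worker.

```python
import multiprocessing

if __name__ == "__main__":
    task_queue = multiprocessing.JoinableQueue()
    result_queue = multiprocessing.Queue()
    task_queue_count = multiprocessing.Value("i", 0)

    initial_task = "root"  # placeholder; the real seed task is not shown here

    # Seed the single initial task as (recursion depth, payload).
    task_queue.put((0, initial_task))
    with task_queue_count.get_lock():
        task_queue_count.value += 1

    workers = [Worker(task_queue, result_queue, task_queue_count)
               for _ in range(multiprocessing.cpu_count())]
    for w in workers:
        w.start()

    task_queue.join()          # returns once every task has been task_done()'d

    for _ in workers:          # one poison pill per worker
        task_queue.put(None)
        with task_queue_count.get_lock():
            task_queue_count.value += 1
    for w in workers:
        w.join()
```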

The code uses a local queue to avoid excessive movement of work to and from the task queue; when the result set is small, it effectively emulates tail recursion. However, when I actually checked `self.task_queue.empty()`, I would get True even while `self.task_queue_count.value` was large.

I was trying to code it so that if the task queue was empty, the final loop would put all of its results on the task queue rather than handle them locally, since other workers may be sitting idle, waiting for work to do.
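
In other words, the intent was for the inner loop of `handle_one_task` to look something like the sketch below (intent only; given the behavior above, `empty()` clearly can't be trusted as the test):

```python
for result in results:
    if self.task_queue.empty() or len(local_queue) >= MAX_LOCAL_QUEUE_SIZE:
        # The shared queue looks starved (or the local queue is full),
        # so publish this work for other workers to pick up.
        self.task_queue.put((current_index + 1, result))
        with self.task_queue_count.get_lock():
            self.task_queue_count.value += 1
    else:
        # Otherwise keep it local and avoid the IPC round trip.
        local_queue.append((current_index + 1, result))
```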
