multiprocessing.JoinableQueue.empty() seems broken


[I am re-asking a question that was just closed. I have now included code that fails.]

Short version: I'm aware that multiprocessing.Queue.empty() is unreliable, and that makes sense to me. I understand that the actions of one process may take a moment to become visible to another. The documentation makes this clear:

After putting an object on an empty queue there may be an infinitesimal delay before the queue's empty() method returns False. . . .
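That documented delay is easy to observe in isolation. Here is a minimal sketch of my own (the 0.5-second grace period is arbitrary, just to give the feeder thread ample time):

```python
import multiprocessing
import time

# Right after put(), the item is handed to a background feeder thread,
# so empty() can briefly still report True.
q = multiprocessing.Queue()
q.put("item")
snapshot = q.empty()  # may be True or False, depending on timing
time.sleep(0.5)       # plenty of time for the feeder thread to flush
flushed = q.empty()   # False once the item has reached the pipe
```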

I can live with this. But in my code, I'm seeing queue.empty() returning True even when the queue has many elements.

This program simulates a recursive workload. The task queue contains integers indicating the current recursion depth.

For depth 0, we create 50 items of depth 1. For depths less than MAX_RECURSION_DEPTH, we create a random number of tasks of depth + 1; we use a Pareto distribution so that the numbers are mostly small and get truncated to 0, but occasional large numbers sneak in. For depth MAX_RECURSION_DEPTH, we do nothing.
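As a rough illustration of that distribution (my own side snippet, not part of the program below):

```python
import numpy as np

np.random.seed(0)
draws = [int(np.random.pareto(1)) for _ in range(1000)]
zero_fraction = draws.count(0) / len(draws)
# For a Pareto with shape parameter 1, P(X < 1) = 0.5, so about half the
# truncated draws are 0, while the heavy tail still produces occasional
# very large values.
```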

```python
import multiprocessing
import queue
from collections import deque

import numpy as np

START_DEPTH = 0
MAX_RECURSION_DEPTH = 5
MAX_LOCAL_QUEUE_SIZE = 1000
WORKER_COUNT = 4


def run_workers():
    task_queue = multiprocessing.JoinableQueue(MAX_LOCAL_QUEUE_SIZE)
    task_count = multiprocessing.Value("L")
    print_lock = multiprocessing.Lock()
    task_queue.put(START_DEPTH)
    task_count.value = 1
    workers = [Worker(task_queue, task_count, print_lock)
               for i in range(WORKER_COUNT)]
    for worker in workers:
        worker.start()
    task_queue.join()
    for worker in workers:
        worker.kill()


class Worker(multiprocessing.Process):
    # task_queue is a JoinableQueue.
    def __init__(self, task_queue, task_queue_count, print_lock):
        super().__init__()
        self.task_queue = task_queue
        self.task_queue_count = task_queue_count
        self.print_lock = print_lock

    def run(self):
        while True:
            depth = self.task_queue.get()
            # The following is for debugging only.
            with self.task_queue_count.get_lock():
                self.task_queue_count.value -= 1
            self.handle_one_task(depth)
            self.task_queue.task_done()

    def handle_one_task(self, depth):
        with self.print_lock:
            print(f'{self.name} {depth=} qs={self.task_queue_count.value}')
        local_queue = deque([depth])
        while local_queue:
            current_index = local_queue.popleft()
            if current_index == START_DEPTH:
                results_count = 50
            else:
                # Usually a small number, but it has a large tail.
                results_count = int(np.random.pareto(1))
            if current_index + 1 == MAX_RECURSION_DEPTH:
                # <write results to result_queue. Not needed here>
                continue
            # Print a message if task_queue.empty() is true, yet there appear
            # to be items in the task queue.
            if self.task_queue.empty():
                temp = self.task_queue_count.value
                if temp > 0:
                    with self.print_lock:
                        print(f"{self.name} Queue appears empty qs={temp}")
            task_queue_full = False  # I don't trust task_queue.full()
            write_count = 0
            for _ in range(results_count):
                # At the START_DEPTH, there is only a single task, so we want
                # to get other processes started. Otherwise, we prefer to
                # write to the local queue if it's not full.
                if current_index == START_DEPTH or (
                        len(local_queue) >= MAX_LOCAL_QUEUE_SIZE
                        and not task_queue_full):
                    try:
                        self.task_queue.put(current_index + 1, False)
                        with self.task_queue_count.get_lock():
                            self.task_queue_count.value += 1
                        write_count += 1
                        continue
                    except queue.Full:
                        task_queue_full = True
                        # Fall through to writing to the local queue.
                local_queue.append(current_index + 1)
            if write_count > 0:
                with self.print_lock:
                    print(f'{self.name} {depth=} {write_count=} '
                          f'qs={self.task_queue_count.value}')


if __name__ == '__main__':
    run_workers()
```

I currently prefer writing to the local queue. But if the task queue is empty (which I know it is when current_index == START_DEPTH and may also be at other times), then I'd like to give some work to the other processes. I was hoping to use self.task_queue.empty() for both cases.
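Since empty() can't be trusted, one workaround I can imagine is to promote the debugging counter to the source of truth and bundle it with the queue. A minimal sketch, assuming a hypothetical CountedQueue helper (my naming and API, not from any library):

```python
import multiprocessing


class CountedQueue:
    """Hypothetical wrapper: a JoinableQueue plus a shared counter that
    serves as an approximate size, since empty() is unreliable and
    qsize() is unavailable on macOS."""

    def __init__(self, maxsize=0):
        self._queue = multiprocessing.JoinableQueue(maxsize)
        self._count = multiprocessing.Value("L", 0)

    def put(self, item, block=True):
        # Bump the counter first, so a concurrent reader sees "non-empty"
        # no later than the item itself becomes visible.
        with self._count.get_lock():
            self._count.value += 1
        try:
            self._queue.put(item, block)
        except Exception:
            # e.g. queue.Full when block=False: undo the increment.
            with self._count.get_lock():
                self._count.value -= 1
            raise

    def get(self, block=True):
        item = self._queue.get(block)
        with self._count.get_lock():
            self._count.value -= 1
        return item

    def task_done(self):
        self._queue.task_done()

    def join(self):
        self._queue.join()

    def probably_empty(self):
        # Approximate: a single shared-Value read, no lock needed.
        return self._count.value == 0
```

The counter is still only an approximation under concurrency (it can transiently overstate the size between the increment and the actual put), but it errs on the side of reporting "non-empty", which is the safe direction for deciding whether to hand work to other processes.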

Unfortunately, when I run this code, I see output like:

```
Worker-2 Queue appears empty qs=14
Worker-4 Queue appears empty qs=14
Worker-2 Queue appears empty qs=14
Worker-1 depth=4 qs=16
Worker-4 Queue appears empty qs=19
Worker-2 Queue appears empty qs=24
Worker-4 Queue appears empty qs=30
```

This isn't an "infinitesimal delay".

Note that I'm running under macOS, where multiprocessing's qsize() is not implemented, so I can't use it to check the real queue size.
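For reference, that platform difference can be probed directly (a small check of my own; on macOS, qsize() raises NotImplementedError because the underlying sem_getvalue() is missing there):

```python
import multiprocessing
import time

q = multiprocessing.Queue()
q.put(1)
time.sleep(0.2)            # let the feeder thread flush the item
try:
    size = q.qsize()       # approximate size; works on Linux and Windows
except NotImplementedError:
    size = None            # macOS: sem_getvalue() is not implemented
```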
