ARTICLE AD BOX
I am writing a Python consumer for RabbitMQ Streams using an asynchronous library. I want to save the message offset to the stream server when the script is stopped (via SIGINT/Ctrl+C).
However, the store_offset call never seems to complete. I see the "Saving" log, but never the "Saved" log.
async def consume(self, callback, subscriber_name=None, offset_type: OffsetType = OffsetType.FIRST): # ----------------------------------------- # WRAPPED CALLBACK # ----------------------------------------- async def wrapped(msg: AMQPMessage, context: MessageContext): if not msg: print(f"[WARNING] Empty message at offset {context.offset}") return start = msg.find(b"{") if start == -1: print(f"[WARNING] No JSON found at offset {context.offset}: {msg}") return raw_json = msg[start:] try: data = json.loads(raw_json.decode()) except Exception as e: print(f"[WARNING] Failed JSON decode at offset {context.offset}: {e}") return print(f"[Consumer] Received → {data}") print(f"Offset of the data in the stream: {context.offset}") # Store last offset for shutdown self.last_offset = context.offset if asyncio.iscoroutinefunction(callback): await callback(data, context) else: callback(data, context) # ----------------------------------------- # Convert offset # ----------------------------------------- await self.connect_consumer() stored_offset = None if offset_type == OffsetType.OFFSET: try: stored_offset = await self.consumer.query_offset( self.stream_name, subscriber_name=subscriber_name ) print(f"Last stored offset for subscriber {subscriber_name}: {stored_offset}") stored_offset+=1 #increase the offset to next value so that it will start from the new data except OffsetNotFound: offset_type = OffsetType.FIRST print("No stored offset found. 
Starting from beginning.") offset_spec = self._convert_offset(offset_type, stored_offset) await self.consumer.subscribe( stream=self.stream_name, subscriber_name=subscriber_name, offset_specification=offset_spec, callback=wrapped, ) print(f"[Consumer] Starting stream={self.stream_name}") print(f"[Consumer] Subscribed with the name: {subscriber_name}") # ----------------------------- # Register signal handlers # ----------------------------- loop = asyncio.get_running_loop() loop.add_signal_handler( signal.SIGINT, lambda: asyncio.create_task(self.stop_consume(subscriber_name)) ) loop.add_signal_handler( signal.SIGTERM, lambda: asyncio.create_task(self.stop_consume(subscriber_name)) ) await self.consumer.run() # ===================================================== # STOP CONSUMER # ===================================================== async def stop_consume(self, subscriber_name=None): print("\n[Shutdown] Stopping consumer...") try: await self.consumer.stop() except: pass if self.last_offset is not None: print(f"[Shutdown] Saving last offset: {self.last_offset}") await self.consumer.store_offset( stream=self.stream_name, subscriber_name=subscriber_name, offset=self.last_offset ) print("[Shutdown] Offset saved.") else: print("[Shutdown] No offset to save.") print("[Shutdown] Exiting now.") loop = asyncio.get_running_loop() loop.stop()What I've observed: It seems that when await self.consumer.stop() is called, the main await self.consumer.run() finishes. Because the main coroutine is done, the event loop shuts down before the background task created by the signal handler can finish the network I/O required for store_offset.
My Question: What is the best practice for keeping the event loop open until the cleanup tasks started by my signal handler have fully completed? Could you also explain how this flow works?
