OS: Ubuntu
Python version: (e.g., Python 3.13.1)
Package version: 0.51b0
Aiokafka[lz4] version: 0.12.0
we have a memory leakage caused by asyncio when using AIOKafkaConsumer in our fastapi app, exactly as documented in aiokafka documentation.
we do think that using getone() within a while loop instead of anext solves that issue but we want to follow aiokafka best practices.
add the following code as part of the fastapi app startup lifespan:
from aiokafka import AIOKafkaConsumer import asyncio async def consume(): consumer = AIOKafkaConsumer( 'my_topic', 'my_other_topic', bootstrap_servers='localhost:9092', group_id="my-group") # Get cluster layout and join group `my-group` await consumer.start() try: # Consume messages async for msg in consumer: print("consumed: ", msg.topic, msg.partition, msg.offset, msg.key, msg.value, msg.timestamp) finally: # Will leave consumer group; perform autocommit if enabled. await consumer.stop() asyncio.run(consume())
then trigger the consume once with a message, and the memory will scale up exponentially.
you can check the heap using guppy3 and tracemalloc. the best way is just to measure the memory utilization of the process.
Expected Resultstable memory utilization when using aiokafka's best practices.
Actual Resultexponentially increasing memory utilization.
Additional contextNo response
Would you like to implement a fix?No
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4