Simulating a producer-consumer scenario with asyncio queues in Python
Write a Python program that uses asyncio queues to simulate a producer-consumer scenario with multiple producers and a single consumer.
Sample Solution:
Code:
import asyncio
import random

async def producer(queue, id):
    for i in range(3):
        item = f"Item: {id}-{i}"
        await queue.put(item)
        print(f"Producer {id} produced-> {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))

async def consumer(queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"Consumer consumed {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    producers = [asyncio.create_task(producer(queue, i)) for i in range(3)]
    consumer_task = asyncio.create_task(consumer(queue))
    await asyncio.gather(*producers)
    await queue.join()
    await queue.put(None)  # Signal the consumer to stop
    await consumer_task

# Run the event loop
asyncio.run(main())
Output:
Producer 0 produced-> Item: 0-0
Producer 1 produced-> Item: 1-0
Producer 2 produced-> Item: 2-0
Consumer consumed Item: 0-0
Consumer consumed Item: 1-0
Consumer consumed Item: 2-0
Producer 1 produced-> Item: 1-1
Consumer consumed Item: 1-1
Producer 0 produced-> Item: 0-1
Consumer consumed Item: 0-1
Producer 2 produced-> Item: 2-1
Consumer consumed Item: 2-1
Producer 1 produced-> Item: 1-2
Consumer consumed Item: 1-2
Producer 0 produced-> Item: 0-2
Consumer consumed Item: 0-2
Producer 2 produced-> Item: 2-2
Consumer consumed Item: 2-2
Explanation:
In the above exercise -
- The "producer()" coroutine simulates a producer that adds items to the queue. Each producer produces 3 items with an associated ID.
- The "consumer()" coroutine simulates a single consumer that consumes items from the queue. It runs in an infinite loop, consuming items until a None value is encountered, indicating it should stop.
- The "main()" coroutine creates the queue, then schedules three producer tasks and one consumer task using asyncio.create_task().
- The 'producers' produce items and add them to the queue asynchronously. The "consumer()" consumes items from the queue as they become available.
- After all 'producers' have finished producing, the program uses await queue.join() to wait until every item that was put on the queue has been retrieved and marked as processed with queue.task_done().
- To signal the consumer to stop, the program sends 'None' to the queue, and then waits for the consumer to complete its task.
- The event loop is started using asyncio.run(main()).
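The steps listed above can be checked with a small variant that records consumed items in a list instead of printing them. This is only a sketch under a few assumptions: the names run_pipeline, consumed, num_producers and items_each are hypothetical and not part of the sample solution, and the random delay is replaced by asyncio.sleep(0) so the run finishes immediately.

```python
import asyncio


async def producer(queue, producer_id, count):
    # Put `count` labelled items on the queue, yielding control after each put.
    for i in range(count):
        await queue.put(f"Item: {producer_id}-{i}")
        await asyncio.sleep(0)


async def consumer(queue, consumed):
    # Record items until the None sentinel arrives.
    while True:
        item = await queue.get()
        if item is None:
            break
        consumed.append(item)
        queue.task_done()  # lets queue.join() know this item is finished


async def run_pipeline(num_producers=3, items_each=3):
    # Hypothetical helper: same flow as main(), but returns what was consumed.
    queue = asyncio.Queue()
    consumed = []
    consumer_task = asyncio.create_task(consumer(queue, consumed))
    await asyncio.gather(
        *(producer(queue, p, items_each) for p in range(num_producers))
    )
    await queue.join()     # returns once task_done() was called for every item
    await queue.put(None)  # sentinel: tell the consumer to stop
    await consumer_task
    return consumed


result = asyncio.run(run_pipeline())
print(len(result), "items consumed")
```

Because queue.join() returns only after every item has been marked done, the list is complete by the time the sentinel is sent.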
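When the sample solution runs, the producers add items to the queue concurrently while the consumer removes and prints them as they become available. Because each producer sleeps for a random interval, the exact order of the output lines varies from run to run.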
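Sending None is one way to stop the consumer. Another common option is to cancel the consumer task once queue.join() has returned. The following is a minimal sketch of that alternative, not the approach used in the sample solution; the list consumed and the integer items are assumptions made for illustration.

```python
import asyncio


async def consumer(queue, consumed):
    # Runs until the task is cancelled; no sentinel value is needed.
    try:
        while True:
            item = await queue.get()
            consumed.append(item)
            queue.task_done()
    except asyncio.CancelledError:
        # Any cleanup would go here; re-raise so the task ends as cancelled.
        raise


async def main():
    queue = asyncio.Queue()
    consumed = []
    consumer_task = asyncio.create_task(consumer(queue, consumed))

    for i in range(5):
        await queue.put(i)

    await queue.join()      # every item has been processed at this point
    consumer_task.cancel()  # request cancellation of the idle consumer

    # return_exceptions=True collects the CancelledError instead of raising it.
    await asyncio.gather(consumer_task, return_exceptions=True)
    return consumed, consumer_task.cancelled()


consumed, was_cancelled = asyncio.run(main())
print(consumed, was_cancelled)
```

Cancellation avoids reserving a special value such as None, which helps when None could be a legitimate item. With several consumers, each task would need to be cancelled.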