w3resource

Simulating a producer-consumer scenario with asyncio queues in Python


Write a Python program that uses asyncio queues to simulate a producer-consumer scenario with multiple producers and a single consumer.

Sample Solution:

Code:

import asyncio
import random
async def producer(queue, id):
    for i in range(3):
        item = f"Item: {id}-{i}"
        await queue.put(item)
        print(f"Producer {id} produced-> {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))
async def consumer(queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"Consumer consumed {item}")
        queue.task_done()
async def main():
    queue = asyncio.Queue()
    producers = [asyncio.create_task(producer(queue, i)) for i in range(3)]
    consumer_task = asyncio.create_task(consumer(queue))
    await asyncio.gather(*producers)
    await queue.join()
    await queue.put(None)  # Signal the consumer to stop
    await consumer_task
# Run the event loop
asyncio.run(main())

Output:

Producer 0 produced-> Item: 0-0
Producer 1 produced-> Item: 1-0
Producer 2 produced-> Item: 2-0
Consumer consumed Item: 0-0
Consumer consumed Item: 1-0
Consumer consumed Item: 2-0
Producer 1 produced-> Item: 1-1
Consumer consumed Item: 1-1
Producer 0 produced-> Item: 0-1
Consumer consumed Item: 0-1
Producer 2 produced-> Item: 2-1
Consumer consumed Item: 2-1
Producer 1 produced-> Item: 1-2
Consumer consumed Item: 1-2
Producer 0 produced-> Item: 0-2
Consumer consumed Item: 0-2
Producer 2 produced-> Item: 2-2
Consumer consumed Item: 2-2

Explanation:

In the above exercise -

  • The "producer()" coroutine simulates a producer that adds items to the queue. Each producer produces 3 items with an associated ID.
  • The "consumer()" coroutine simulates a single consumer that consumes items from the queue. It runs in an infinite loop, consuming items until a None value is encountered, indicating it should stop.
  • The "main()" coroutine sets up the event loop and the queue. It creates three producer tasks and one consumer task using asyncio.create_task().
  • The 'producers' produce items and add them to the queue asynchronously. The "consumer()" consumes items from the queue as they become available.
  • After all 'producers' have finished producing, the program uses await queue.join() to wait until all items in the queue are processed.
  • To signal the consumer to stop, the program sends 'None' to the queue, and then waits for the consumer to complete its task.
  • The event loop is started using asyncio.run(main()).

When we run this program, the producers adding items to the queue and the 'consumer' consuming them.

Flowchart:

Flowchart: Simulating a producer-consumer scenario with asyncio queues in Python.

Previous: Handling task cancellation with asyncio.CancelledError in Python.

What is the difficulty level of this exercise?

Test your Programming skills with w3resource's quiz.



Follow us on Facebook and Twitter for latest update.