w3resource

Setting timeouts for asynchronous operations in Python using asyncio.wait_for()


Write a Python program that implements a timeout for an asynchronous operation using asyncio.wait_for().

Sample Solution:

Code:

import asyncio
import time

async def time_consuming_task(duration):
    print(f'Starting long operation for {duration} seconds...')
    await asyncio.sleep(duration)
    return f'Long operation completed in {duration} seconds'
async def main():
    timeout =3
    try:
        result = await asyncio.wait_for(time_consuming_task(8), timeout)
        print(result)
    except asyncio.TimeoutError:
        print(f'Timeout occurred after waiting for {timeout} seconds')
asyncio.run(main())

Output:

Starting long operation for 8 seconds...
Timeout occurred after waiting for 3 seconds

Explanation:

In the above exercise -

  • asyncio.wait_for() takes an awaitable (coroutine) and a timeout in seconds.
  • It waits for the coroutine to complete or raises asyncio.TimeoutError if timeout is exceeded.
  • The program await the wait_for() call to perform the wait.
  • The try/except block catches the potential TimeoutError.
  • If timeout occurs, we handle it accordingly in the except block.
  • If coroutine completes within timeout, result is returned as normal.

Flowchart:

Flowchart: Setting timeouts for asynchronous operations in Python using asyncio.wait_for().

Previous: Handling task cancellation with asyncio.CancelledError in Python.
Next: Simulating a producer-consumer scenario with asyncio queues in Python.

What is the difficulty level of this exercise?

Test your Programming skills with w3resource's quiz.



Follow us on Facebook and Twitter for latest update.