w3resource

Handling task cancellation with asyncio.CancelledError in Python


Write a Python program to create a coroutine that simulates a time-consuming task and use asyncio.CancelledError to handle task cancellation.

Sample Solution:

Code:

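import asyncio
import random


async def time_consuming_task():
    print('Time-consuming task started...')
    try:
        # Simulate five steps of work, each taking 1 to 5 seconds.
        for i in range(1, 6):
            await asyncio.sleep(random.randint(1, 5))
            print(f'Step {i} completed')
    except asyncio.CancelledError:
        # Raised at the pending await when the task is cancelled.
        print('Time consuming task was cancelled')
        raise  # Re-raise so the task is marked as cancelled.


async def main():
    # Schedule the coroutine to run concurrently as a task.
    task = asyncio.create_task(time_consuming_task())
    # Let the task run for 1 to 3 seconds, then request cancellation.
    await asyncio.sleep(random.randint(1, 3))
    task.cancel()
    try:
        # Awaiting a cancelled task raises CancelledError here as well.
        await task
    except asyncio.CancelledError:
        print('Main coroutine caught task cancellation!')


asyncio.run(main())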

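Sample Output:

Time-consuming task started...
Time consuming task was cancelled
Main coroutine caught task cancellation!

Note: The sleep durations are random, so the output varies between runs. If the task's first sleep is shorter than the delay in main(), 'Step N completed' lines appear before the cancellation message.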

Explanation:

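The above code defines a coroutine, `time_consuming_task`, that simulates work in five steps, sleeping for a random interval of 1 to 5 seconds in each step. When the task is cancelled, asyncio raises asyncio.CancelledError inside the coroutine at the await on which it is currently suspended. The try/except block catches that exception, prints a message, and re-raises it so that the task is marked as cancelled.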
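The catch-and-re-raise pattern is often combined with a finally block for cleanup. The following is a minimal sketch of that idea, not part of the sample solution: the names worker, run_demo, and log are hypothetical, and the short fixed sleeps are chosen only to make the run deterministic.

```python
import asyncio


async def worker(log):
    """Hypothetical worker that records what happens to it in `log`."""
    log.append("started")
    try:
        # A long sleep stands in for real work; cancellation arrives here.
        await asyncio.sleep(10)
        log.append("finished")
    except asyncio.CancelledError:
        log.append("cancelled")
        raise  # Re-raise so the task ends up in the cancelled state.
    finally:
        # Runs whether the worker finished, failed, or was cancelled.
        log.append("cleanup")


async def run_demo():
    log = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0.01)  # Let the worker start and reach its await.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return log, task.cancelled()


log, was_cancelled = asyncio.run(run_demo())
print(log)            # ['started', 'cancelled', 'cleanup']
print(was_cancelled)  # True
```

Because the except block re-raises, task.cancelled() reports True afterwards, and the finally block still runs on the way out.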

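The main() coroutine wraps time_consuming_task() in a task with asyncio.create_task(), waits a random 1 to 3 seconds, and calls task.cancel() to request cancellation. It then awaits the task. Because the task was cancelled, that await raises asyncio.CancelledError, which main() catches in its own try/except block.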
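To see what the caller can observe around task.cancel(), here is a small sketch under assumed names (slow, quick, and inspect_cancellation are hypothetical and not part of the sample solution). It suggests that cancel() only requests cancellation, and that the request has no effect on a task that has already finished.

```python
import asyncio


async def slow():
    await asyncio.sleep(10)
    return "slow result"


async def quick():
    return "quick result"


async def inspect_cancellation():
    slow_task = asyncio.create_task(slow())
    quick_task = asyncio.create_task(quick())
    # After this pause quick_task has finished; slow_task is still sleeping.
    await asyncio.sleep(0.01)

    results = {}
    # cancel() returns True if a cancellation was requested,
    # and False if the task was already done.
    results["slow_cancel_requested"] = slow_task.cancel()
    results["quick_cancel_requested"] = quick_task.cancel()
    # The request has not been processed yet, so this is still False.
    results["cancelled_right_after_request"] = slow_task.cancelled()

    try:
        await slow_task
    except asyncio.CancelledError:
        results["main_saw_cancelled_error"] = True

    results["slow_cancelled"] = slow_task.cancelled()
    results["quick_cancelled"] = quick_task.cancelled()
    results["quick_result"] = quick_task.result()
    return results


results = asyncio.run(inspect_cancellation())
for name, value in results.items():
    print(f"{name}: {value}")
```

In this sketch the slow task is reported as cancelled only after it has been awaited, while the finished task keeps its result.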

Flowchart:

Flowchart: Handling task cancellation with asyncio.CancelledError in Python.
