Handling task cancellation with asyncio.CancelledError in Python
Write a Python program to create a coroutine that simulates a time-consuming task and use asyncio.CancelledError to handle task cancellation.
Sample Solution:
Code:
import asyncio
import random

async def time_consuming_task():
    print('Time-consuming task started...')
    try:
        for i in range(1, 6):
            await asyncio.sleep(random.randint(1, 5))
            print(f'Step {i} completed')
    except asyncio.CancelledError:
        print('Time consuming task was cancelled')
        raise

async def main():
    task = asyncio.create_task(time_consuming_task())
    await asyncio.sleep(random.randint(1, 3))
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print('Main coroutine caught task cancellation!')

asyncio.run(main())
Output:
Time-consuming task started...
Time consuming task was cancelled
Main coroutine caught task cancellation!
Explanation:
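The code above defines a coroutine, `time_consuming_task`, that simulates work in five steps, sleeping for a random interval of 1 to 5 seconds before each one. A try/except block around the loop catches asyncio.CancelledError when the task is cancelled, prints a message, and re-raises the exception so that the task is actually marked as cancelled.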
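The catch-and-re-raise pattern combines naturally with a finally clause for cleanup. The following is a minimal sketch, not part of the original solution: it assumes fixed short delays in place of the random ones, and the names `worker`, `run_demo`, and the `events` list are hypothetical, introduced only to record what happens.

```python
import asyncio

async def worker(events):
    # `events` is a hypothetical list used only to record what happened.
    try:
        for i in range(1, 6):
            await asyncio.sleep(0.1)      # fixed delay instead of a random one
            events.append(f"step {i}")
    except asyncio.CancelledError:
        events.append("cancelled")
        raise                             # let the cancellation propagate
    finally:
        events.append("cleanup")          # runs whether or not the task was cancelled

async def run_demo():
    events = []
    task = asyncio.create_task(worker(events))
    await asyncio.sleep(0.25)             # give the worker time for a few steps
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        events.append("main saw cancellation")
    return events

events = asyncio.run(run_demo())
print(events)
```

Whatever steps finish before the cancel request, the list should end with the "cancelled", "cleanup", and "main saw cancellation" entries, in that order.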
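The main() coroutine creates the task, waits 1 to 3 seconds, and calls task.cancel(). It then awaits the task inside its own try/except block, which catches the re-raised asyncio.CancelledError. Because the delays are random, some runs may print one or more "Step N completed" lines before the cancellation message.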
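To see why the re-raise matters for the caller, the sketch below compares a coroutine that re-raises asyncio.CancelledError with one that swallows it. It is an illustration under stated assumptions: `reraises`, `swallows`, `cancel_and_report`, and `compare` are hypothetical names that do not appear in the original solution.

```python
import asyncio

async def reraises():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        raise                        # cancellation propagates to the awaiter

async def swallows():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        return "ignored cancel"      # suppresses the cancellation (usually discouraged)

async def cancel_and_report(coro_func):
    task = asyncio.create_task(coro_func())
    await asyncio.sleep(0)           # yield once so the task enters its try block
    task.cancel()
    try:
        result = await task
    except asyncio.CancelledError:
        result = None
    # task.cancelled() is True only if the task ended by cancellation
    return task.cancelled(), result

async def compare():
    return {
        "reraises": await cancel_and_report(reraises),
        "swallows": await cancel_and_report(swallows),
    }

report = asyncio.run(compare())
print(report)
```

When the coroutine swallows the exception, the task finishes normally with a return value and task.cancelled() reports False, so the caller cannot tell that a cancellation was requested.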