Python Async Await

Asynchronous programming with async/await lets a program start an operation and do other work while waiting for it to finish, instead of blocking. This is especially useful for applications that perform many I/O operations, such as HTTP requests, database access, or file reading.

Why Asynchronous?

In traditional synchronous programming, each call must finish before the next one starts:

# Synchronous - waiting one by one
result1 = fetch_data_from_api()      # Wait 2 seconds
result2 = fetch_data_from_database() # Wait 2 seconds
result3 = read_large_file()          # Wait 2 seconds
# Total: 6 seconds

With asynchronous:

# Asynchronous - running concurrently
result1, result2, result3 = await asyncio.gather(
    fetch_data_from_api(),
    fetch_data_from_database(),
    read_large_file()
)
# Total: ~2 seconds (the three waits overlap)
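
The two fragments above can be combined into one runnable comparison. This is only a sketch: the three coroutines are stand-ins that do nothing but sleep, and the 0.2-second DELAY is an assumed value chosen so the script finishes quickly instead of waiting 2 seconds per call.

```python
import asyncio
import time

DELAY = 0.2  # assumed stand-in for the 2-second waits in the fragments above

async def fetch_data_from_api() -> str:
    await asyncio.sleep(DELAY)  # placeholder for a real HTTP call
    return "api"

async def fetch_data_from_database() -> str:
    await asyncio.sleep(DELAY)  # placeholder for a real database query
    return "database"

async def read_large_file() -> str:
    await asyncio.sleep(DELAY)  # placeholder for real file I/O
    return "file"

async def run_sequentially() -> float:
    """Await each operation in turn and return the elapsed time."""
    start = time.perf_counter()
    await fetch_data_from_api()
    await fetch_data_from_database()
    await read_large_file()
    return time.perf_counter() - start

async def run_concurrently() -> float:
    """Run all three operations with gather and return the elapsed time."""
    start = time.perf_counter()
    await asyncio.gather(
        fetch_data_from_api(),
        fetch_data_from_database(),
        read_large_file(),
    )
    return time.perf_counter() - start

async def main() -> None:
    print(f"Sequential: {await run_sequentially():.2f}s")
    print(f"Concurrent: {await run_concurrently():.2f}s")

asyncio.run(main())
```

The sequential version should take roughly three times DELAY, while the concurrent version should take roughly one DELAY, because the waits overlap.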

Basic Concepts

Coroutine

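A function defined with async def is a coroutine function. Calling it returns a coroutine object, which runs only when it is awaited or handed to the event loop: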

import asyncio

# This is a coroutine function
async def greeting():
    print("Hello!")
    return "Done"

# Run the coroutine on a new event loop
asyncio.run(greeting())
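
A common surprise is that calling greeting() on its own prints nothing. The short sketch below separates the two steps, creating the coroutine object and actually running it. The variable names coro, is_coroutine and result are chosen here for illustration.

```python
import asyncio

async def greeting() -> str:
    print("Hello!")
    return "Done"

coro = greeting()                        # creates a coroutine object; nothing is printed yet
is_coroutine = asyncio.iscoroutine(coro)
print(is_coroutine)                      # True

result = asyncio.run(coro)               # now the body runs and prints "Hello!"
print(result)                            # Done
```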

await

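await suspends the current coroutine until the awaited coroutine or other async operation finishes, then evaluates to its result. While the coroutine is suspended, the event loop is free to run other tasks: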

import asyncio

async def long_process():
    print("Start process...")
    await asyncio.sleep(2)  # Simulate async operation
    print("Process finished!")
    return "Result"

async def main():
    result = await long_process()
    print(f"Got: {result}")

asyncio.run(main())

Running Coroutines

There are two common ways to run a coroutine:

import asyncio

async def hello():
    await asyncio.sleep(1)
    return "Hello!"

# Method 1: asyncio.run() - for a standalone script
if __name__ == "__main__":
    result = asyncio.run(hello())
    print(result)

# Method 2: await - from inside another coroutine
async def main():
    result = await hello()
    print(result)

Running Tasks Concurrently

asyncio.gather()

Executes multiple coroutines concurrently:

import asyncio

async def download_file(name: str, duration: int) -> str:
    print(f"Start download {name}...")
    await asyncio.sleep(duration)
    print(f"Finished download {name}")
    return f"{name} downloaded"

async def main():
    # Run all concurrently
    results = await asyncio.gather(
        download_file("file1.txt", 2),
        download_file("file2.txt", 3),
        download_file("file3.txt", 1),
    )
    print(f"All results: {results}")

asyncio.run(main())
# Output:
# Start download file1.txt...
# Start download file2.txt...
# Start download file3.txt...
# Finished download file3.txt (after 1 second)
# Finished download file1.txt (after 2 seconds)
# Finished download file2.txt (after 3 seconds)
# Total time: ~3 seconds (not 6 seconds)
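
Note that the downloads finish in a different order than they were started, yet asyncio.gather() returns results in the order the coroutines were passed in, not in completion order. The sketch below is a shortened variant of the example above, with assumed sub-second durations so it runs quickly.

```python
import asyncio

async def download_file(name: str, duration: float) -> str:
    await asyncio.sleep(duration)  # simulated download time
    return f"{name} downloaded"

async def main() -> list[str]:
    # file3.txt finishes first, but its result still comes last
    return await asyncio.gather(
        download_file("file1.txt", 0.2),
        download_file("file2.txt", 0.3),
        download_file("file3.txt", 0.1),
    )

results = asyncio.run(main())
print(results)
# ['file1.txt downloaded', 'file2.txt downloaded', 'file3.txt downloaded']
```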

asyncio.create_task()

Creates a task that runs in the background while the current coroutine continues:

import asyncio

async def background_task():
    while True:
        print("Background task running...")
        await asyncio.sleep(1)

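async def main():
    # Schedule the task; it starts running at the next await
    task = asyncio.create_task(background_task())
    
    # Do something else while the task runs
    await asyncio.sleep(3)
    
    # Request cancellation, then wait for the task to actually stop
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Task cancelled")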

asyncio.run(main())
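
A task is not only something to cancel. It can also be awaited later to collect its return value. The following sketch uses a hypothetical compute() coroutine, not part of the tutorial above, to show a task making progress while main() is busy with something else.

```python
import asyncio

async def compute(value: int) -> int:
    # Hypothetical helper: pretend to do some I/O, then return a value
    await asyncio.sleep(0.1)
    return value * 2

async def main() -> int:
    task = asyncio.create_task(compute(21))  # scheduled on the running event loop
    await asyncio.sleep(0.2)                 # other work; the task runs in the meantime
    print(task.done())                       # True: it finished during the sleep
    return await task                        # awaiting a finished task returns its result

answer = asyncio.run(main())
print(answer)  # 42
```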

Async Context Manager

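An async context manager acquires and releases a resource using awaitable setup and teardown steps (__aenter__ and __aexit__). It is used with async with: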

import asyncio

class AsyncDatabaseConnection:
    async def __aenter__(self):
        print("Opening database connection...")
        await asyncio.sleep(1)
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing database connection...")
        await asyncio.sleep(0.5)
    
    async def query(self, sql: str) -> list:
        await asyncio.sleep(0.5)
        return ["result1", "result2"]

async def main():
    async with AsyncDatabaseConnection() as db:
        result = await db.query("SELECT * FROM users")
        print(f"Query result: {result}")

asyncio.run(main())
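
For simple cases, the standard library's contextlib.asynccontextmanager decorator can express the same open/close pattern as a single generator function instead of a class. The sketch below is one possible rewrite. The database_connection name, the dictionary standing in for a connection, and the events list are assumptions made for illustration.

```python
import asyncio
from contextlib import asynccontextmanager

events: list[str] = []  # records what happened, in order

@asynccontextmanager
async def database_connection():
    events.append("open")
    await asyncio.sleep(0.1)               # simulated connection setup
    try:
        yield {"name": "demo-connection"}  # hypothetical connection object
    finally:
        events.append("close")             # runs even if the body raises

async def main() -> None:
    async with database_connection() as db:
        events.append(f"using {db['name']}")

asyncio.run(main())
print(events)  # ['open', 'using demo-connection', 'close']
```

The code before yield plays the role of __aenter__, and the finally block plays the role of __aexit__.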

Async Iterator

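An async iterator produces values that may each need to be awaited. It implements __aiter__ and __anext__ and is consumed with async for: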

import asyncio

class AsyncCounter:
    def __init__(self, max_count: int):
        self.max_count = max_count
        self.current = 0
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        if self.current >= self.max_count:
            raise StopAsyncIteration
        await asyncio.sleep(0.5)
        self.current += 1
        return self.current

async def main():
    async for num in AsyncCounter(5):
        print(f"Count: {num}")

asyncio.run(main())
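
The same counter can usually be written more compactly as an async generator, which is an async def function that contains yield. The sketch below assumes a shorter delay than the class above and collects the values with an async comprehension. The function name async_counter is chosen here for illustration.

```python
import asyncio
from typing import AsyncIterator

async def async_counter(max_count: int) -> AsyncIterator[int]:
    # Python supplies __aiter__ and __anext__ for async generators
    for number in range(1, max_count + 1):
        await asyncio.sleep(0.05)  # simulated wait before each value
        yield number

async def main() -> list[int]:
    return [num async for num in async_counter(5)]

numbers = asyncio.run(main())
print(numbers)  # [1, 2, 3, 4, 5]
```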

Practical Example: Async HTTP Requests

Using the aiohttp library for async HTTP requests:

import asyncio
import aiohttp

async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as response:
        return await response.json()

async def main():
    urls = [
        "https://api.github.com/users/python",
        "https://api.github.com/users/django",
        "https://api.github.com/users/fastapi",
    ]
    
    async with aiohttp.ClientSession() as session:
        # Fetch all URLs concurrently
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        
        for result in results:
            print(f"User: {result.get('login')}")

# Install first: pip install aiohttp
asyncio.run(main())

Here we use aiohttp because the standard requests library does not support async. We create a single ClientSession and reuse it for every request, which is more efficient than opening a new session per URL, then build a list of coroutines (tasks), one per URL. asyncio.gather(*tasks) runs all of those requests concurrently. If you had to fetch data from 100 URLs, this would be much faster than fetching them one by one.

Timeout and Error Handling

import asyncio

async def long_operation():
    await asyncio.sleep(10)
    return "Done"

async def main():
    try:
        # Give up if the operation takes longer than 2 seconds
        result = await asyncio.wait_for(long_operation(), timeout=2.0)
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out!")

asyncio.run(main())
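
When the timeout expires, asyncio.wait_for() does not just stop waiting. It cancels the operation, which receives asyncio.CancelledError at the point where it was suspended. The sketch below uses that to run cleanup code. The log list and the 0.1-second timeout are assumptions made to keep the example short.

```python
import asyncio

log: list[str] = []  # records the order of events

async def long_operation() -> str:
    try:
        await asyncio.sleep(10)
        return "Done"
    except asyncio.CancelledError:
        log.append("cancelled, cleaning up")
        raise  # re-raise so the cancellation is not swallowed

async def main() -> None:
    try:
        await asyncio.wait_for(long_operation(), timeout=0.1)
    except asyncio.TimeoutError:
        log.append("timed out")

asyncio.run(main())
print(log)  # ['cancelled, cleaning up', 'timed out']
```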

Semaphore for Rate Limiting

A semaphore limits how many operations run at the same time. It caps concurrency rather than requests per second:

import asyncio

async def download(semaphore: asyncio.Semaphore, url: str):
    async with semaphore:  # Only N concurrent requests
        print(f"Downloading {url}...")
        await asyncio.sleep(2)
        print(f"Finished {url}")
        return url

async def main():
    # Max 3 concurrent downloads
    semaphore = asyncio.Semaphore(3)
    
    urls = [f"file_{i}.txt" for i in range(10)]
    tasks = [download(semaphore, url) for url in urls]
    
    await asyncio.gather(*tasks)

asyncio.run(main())
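
To confirm that the limit is actually enforced, one option is to count how many downloads are inside the semaphore at any moment. The sketch below adds two module-level counters, active and peak, which are assumptions for illustration, and shortens the simulated download time.

```python
import asyncio

active = 0  # downloads currently holding the semaphore
peak = 0    # highest value of active seen so far

async def download(semaphore: asyncio.Semaphore, url: str) -> str:
    global active, peak
    async with semaphore:
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.05)  # simulated download
        active -= 1
        return url

async def main() -> list[str]:
    semaphore = asyncio.Semaphore(3)  # max 3 concurrent downloads
    urls = [f"file_{i}.txt" for i in range(10)]
    return await asyncio.gather(*(download(semaphore, url) for url in urls))

finished = asyncio.run(main())
print(f"{len(finished)} downloads, peak concurrency {peak}")
# 10 downloads, peak concurrency 3
```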

Best Practices

  1. Use async for I/O-bound operations - HTTP requests, database access, file I/O
  2. Do not use async for CPU-bound work - Use multiprocessing for heavy calculations
  3. Always await coroutines - Without await, the coroutine never runs
  4. Use asyncio.gather() for concurrency - Faster than awaiting independent operations one by one
  5. Handle exceptions explicitly - Use try/except inside coroutines
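
For the last point, try/except inside each coroutine is one approach. When several coroutines run under asyncio.gather(), another is to pass return_exceptions=True, so one failure does not discard the other results. The sketch below uses a hypothetical fetch() coroutine that fails on demand.

```python
import asyncio

async def fetch(name: str, fail: bool = False) -> str:
    # Hypothetical operation that either succeeds or raises
    await asyncio.sleep(0.05)
    if fail:
        raise ValueError(f"{name} failed")
    return f"{name} ok"

async def main() -> list:
    # Exceptions are returned in the result list instead of being raised
    return await asyncio.gather(
        fetch("a"),
        fetch("b", fail=True),
        fetch("c"),
        return_exceptions=True,
    )

results = asyncio.run(main())
for item in results:
    if isinstance(item, Exception):
        print(f"Error: {item}")
    else:
        print(item)
# a ok
# Error: b failed
# c ok
```

Without return_exceptions=True, the first exception would propagate out of the await and the successful results would not be returned.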

When to Use Async?

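Use async when the program spends most of its time waiting on I/O, such as HTTP requests, database access, or file reading, and those waits can overlap.

Don't use async when the work is CPU bound, such as heavy calculations; use multiprocessing for that instead.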

