Advanced Python Techniques: From Decorators to Async Programming
Dr. Sarah Kim

Master advanced Python concepts including decorators, context managers, metaclasses, and asynchronous programming for building robust applications.
Python’s elegance lies in its simplicity, but beneath the surface are powerful features that can transform how you write code. This guide explores advanced Python techniques that will elevate your programming skills and help you write more efficient, maintainable code.
Advanced Decorators
Function Decorators with Arguments
import functools
import time
from typing import Callable, Any, TypeVar, ParamSpec
P = ParamSpec('P')
T = TypeVar('T')
def retry(max_attempts: int = 3, delay: float = 1.0, backoff: float = 2.0):
"""Decorator that retries a function call with exponential backoff."""
def decorator(func: Callable[P, T]) -> Callable[P, T]:
@functools.wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
attempts = 0
current_delay = delay
while attempts < max_attempts:
try:
return func(*args, **kwargs)
except Exception as e:
attempts += 1
if attempts == max_attempts:
raise e
print(f"Attempt {attempts} failed: {e}. Retrying in {current_delay}s...")
time.sleep(current_delay)
current_delay *= backoff
raise RuntimeError("This should never be reached")
return wrapper
return decorator
def cache_with_ttl(ttl_seconds: int = 300):
"""Decorator that caches function results with TTL."""
def decorator(func: Callable[P, T]) -> Callable[P, T]:
cache = {}
@functools.wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
# Create cache key from arguments
key = str(args) + str(sorted(kwargs.items()))
current_time = time.time()
# Check if cached result exists and is still valid
if key in cache:
result, timestamp = cache[key]
if current_time - timestamp < ttl_seconds:
return result
# Call function and cache result
result = func(*args, **kwargs)
cache[key] = (result, current_time)
return result
return wrapper
return decorator
# Usage examples
@retry(max_attempts=3, delay=0.5, backoff=2.0)
@cache_with_ttl(ttl_seconds=60)
def fetch_data_from_api(url: str) -> dict:
"""Fetch data from API with retry and caching."""
import requests
response = requests.get(url)
response.raise_for_status()
return response.json()
Class Decorators
from dataclasses import dataclass
from typing import Type, TypeVar
T = TypeVar('T')
def singleton(cls: Type[T]) -> Type[T]:
"""Decorator that makes a class a singleton."""
instances = {}
def get_instance(*args, **kwargs):
if cls not in instances:
instances[cls] = cls(*args, **kwargs)
return instances[cls]
return get_instance
def add_repr(cls: Type[T]) -> Type[T]:
"""Decorator that adds a __repr__ method to a class."""
def __repr__(self):
attrs = ', '.join(f'{k}={v!r}' for k, v in self.__dict__.items())
return f'{cls.__name__}({attrs})'
cls.__repr__ = __repr__
return cls
@singleton
@add_repr
class DatabaseConnection:
def __init__(self, host: str, port: int):
self.host = host
self.port = port
self.connected = False
def connect(self):
if not self.connected:
print(f"Connecting to {self.host}:{self.port}")
self.connected = True
# Usage
db1 = DatabaseConnection("localhost", 5432)
db2 = DatabaseConnection("localhost", 5432)
print(db1 is db2) # True - same instance
print(db1) # DatabaseConnection(host='localhost', port=5432, connected=False)
Context Managers
Custom Context Managers
import contextlib
import sqlite3
import tempfile
import os
from typing import Generator, Optional
from pathlib import Path
class DatabaseTransaction:
"""Context manager for database transactions with automatic rollback."""
def __init__(self, connection: sqlite3.Connection):
self.connection = connection
self.transaction = None
def __enter__(self) -> sqlite3.Connection:
self.transaction = self.connection.execute("BEGIN")
return self.connection
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is None:
self.connection.commit()
print("Transaction committed")
else:
self.connection.rollback()
print(f"Transaction rolled back due to {exc_type.__name__}: {exc_val}")
return False # Don't suppress exceptions
@contextlib.contextmanager
def temporary_file(suffix: str = "", content: str = "") -> Generator[Path, None, None]:
"""Context manager for creating temporary files."""
fd, path = tempfile.mkstemp(suffix=suffix)
try:
if content:
with os.fdopen(fd, 'w') as f:
f.write(content)
else:
os.close(fd)
yield Path(path)
finally:
if os.path.exists(path):
os.unlink(path)
@contextlib.contextmanager
def timer(description: str = "Operation") -> Generator[None, None, None]:
"""Context manager for timing operations."""
start_time = time.time()
try:
yield
finally:
elapsed = time.time() - start_time
print(f"{description} took {elapsed:.4f} seconds")
# Usage examples
def example_usage():
# Database transaction
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
try:
with DatabaseTransaction(conn):
conn.execute("INSERT INTO users VALUES (1, 'Alice')")
conn.execute("INSERT INTO users VALUES (2, 'Bob')")
# raise Exception("Something went wrong!") # Uncomment to test rollback
except Exception as e:
print(f"Handled exception: {e}")
# Temporary file
with temporary_file(suffix=".txt", content="Hello, World!") as temp_path:
print(f"Temporary file created: {temp_path}")
with open(temp_path, 'r') as f:
print(f"Content: {f.read()}")
# Timer
with timer("Heavy computation"):
time.sleep(1) # Simulate work
Metaclasses and Class Creation
Custom Metaclasses
from typing import Dict, Any, Type
class SingletonMeta(type):
"""Metaclass that creates singleton instances."""
_instances: Dict[Type, Any] = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super().__call__(*args, **kwargs)
return cls._instances[cls]
class ValidatedMeta(type):
"""Metaclass that adds validation to class attributes."""
def __new__(mcs, name, bases, namespace, **kwargs):
# Add validation for required attributes
required_attrs = namespace.get('_required_attrs', [])
for attr in required_attrs:
if attr not in namespace:
raise ValueError(f"Class {name} must define attribute '{attr}'")
# Add type checking for annotated attributes
annotations = namespace.get('__annotations__', {})
for attr_name, attr_type in annotations.items():
if attr_name in namespace:
value = namespace[attr_name]
if not isinstance(value, attr_type):
raise TypeError(
f"Attribute '{attr_name}' must be of type {attr_type.__name__}, "
f"got {type(value).__name__}"
)
return super().__new__(mcs, name, bases, namespace)
class APIEndpoint(metaclass=ValidatedMeta):
"""Base class for API endpoints with validation."""
_required_attrs = ['path', 'method']
path: str
method: str
def __init__(self):
self.handlers = {}
def register_handler(self, handler):
self.handlers[self.method] = handler
class UserEndpoint(APIEndpoint):
path = "/api/users"
method = "GET"
def get_users(self):
return {"users": []}
# Example with __init_subclass__ (modern alternative to metaclasses)
class ConfigurableBase:
"""Base class using __init_subclass__ for configuration."""
def __init_subclass__(cls, config_required=True, **kwargs):
super().__init_subclass__(**kwargs)
if config_required and not hasattr(cls, 'config'):
raise ValueError(f"Class {cls.__name__} must define 'config' attribute")
# Auto-register subclasses
if not hasattr(cls, '_registry'):
cls._registry = {}
cls._registry[cls.__name__] = cls
class DatabaseModel(ConfigurableBase, config_required=True):
config = {
'table_name': 'base_model',
'primary_key': 'id'
}
class User(DatabaseModel):
config = {
'table_name': 'users',
'primary_key': 'id',
'fields': ['name', 'email', 'created_at']
}
Asynchronous Programming
Advanced Async Patterns
import asyncio
import aiohttp
import aiofiles
from typing import List, Dict, Any, Optional, Callable, Awaitable
from dataclasses import dataclass
from contextlib import asynccontextmanager
@dataclass
class TaskResult:
task_id: str
result: Any
error: Optional[Exception] = None
duration: float = 0.0
class AsyncTaskManager:
"""Advanced async task manager with concurrency control."""
def __init__(self, max_concurrent_tasks: int = 10):
self.max_concurrent_tasks = max_concurrent_tasks
self.semaphore = asyncio.Semaphore(max_concurrent_tasks)
self.results: List[TaskResult] = []
async def execute_task(self, task_id: str, coro: Awaitable[Any]) -> TaskResult:
"""Execute a single task with timing and error handling."""
async with self.semaphore:
start_time = asyncio.get_event_loop().time()
try:
result = await coro
duration = asyncio.get_event_loop().time() - start_time
return TaskResult(task_id=task_id, result=result, duration=duration)
except Exception as e:
duration = asyncio.get_event_loop().time() - start_time
return TaskResult(task_id=task_id, result=None, error=e, duration=duration)
async def execute_batch(self, tasks: Dict[str, Awaitable[Any]]) -> List[TaskResult]:
"""Execute multiple tasks concurrently."""
task_coroutines = [
self.execute_task(task_id, coro)
for task_id, coro in tasks.items()
]
results = await asyncio.gather(*task_coroutines, return_exceptions=True)
self.results.extend(results)
return results
class AsyncHTTPClient:
"""Async HTTP client with connection pooling and retry logic."""
def __init__(self, timeout: int = 30, max_retries: int = 3):
self.timeout = aiohttp.ClientTimeout(total=timeout)
self.max_retries = max_retries
self.session: Optional[aiohttp.ClientSession] = None
async def __aenter__(self):
connector = aiohttp.TCPConnector(limit=100, limit_per_host=30)
self.session = aiohttp.ClientSession(
connector=connector,
timeout=self.timeout
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
async def fetch_with_retry(self, url: str, **kwargs) -> Dict[str, Any]:
"""Fetch URL with exponential backoff retry."""
for attempt in range(self.max_retries):
try:
async with self.session.get(url, **kwargs) as response:
response.raise_for_status()
return await response.json()
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
if attempt == self.max_retries - 1:
raise e
wait_time = 2 ** attempt
await asyncio.sleep(wait_time)
async def fetch_multiple(self, urls: List[str]) -> List[Dict[str, Any]]:
"""Fetch multiple URLs concurrently."""
tasks = {f"url_{i}": self.fetch_with_retry(url) for i, url in enumerate(urls)}
task_manager = AsyncTaskManager(max_concurrent_tasks=10)
results = await task_manager.execute_batch(tasks)
return [result.result for result in results if result.error is None]
# Async context manager for file operations
@asynccontextmanager
async def async_file_processor(file_path: str, chunk_size: int = 8192):
"""Async context manager for processing large files."""
try:
async with aiofiles.open(file_path, 'rb') as file:
yield file
except FileNotFoundError:
print(f"File {file_path} not found")
yield None
async def process_large_file(file_path: str) -> Dict[str, Any]:
"""Process a large file asynchronously."""
stats = {
'total_bytes': 0,
'line_count': 0,
'word_count': 0
}
async with async_file_processor(file_path) as file:
if file is None:
return stats
async for line in file:
line_text = line.decode('utf-8')
stats['total_bytes'] += len(line)
stats['line_count'] += 1
stats['word_count'] += len(line_text.split())
# Yield control to allow other tasks to run
if stats['line_count'] % 1000 == 0:
await asyncio.sleep(0)
return stats
# Example usage
async def main():
# HTTP client example
urls = [
"https://jsonplaceholder.typicode.com/posts/1",
"https://jsonplaceholder.typicode.com/posts/2",
"https://jsonplaceholder.typicode.com/posts/3"
]
async with AsyncHTTPClient() as client:
results = await client.fetch_multiple(urls)
print(f"Fetched {len(results)} URLs successfully")
# Task manager example
async def sample_task(delay: float) -> str:
await asyncio.sleep(delay)
return f"Task completed after {delay}s"
task_manager = AsyncTaskManager(max_concurrent_tasks=3)
tasks = {
f"task_{i}": sample_task(i * 0.5)
for i in range(1, 6)
}
results = await task_manager.execute_batch(tasks)
for result in results:
if result.error:
print(f"Task {result.task_id} failed: {result.error}")
else:
print(f"Task {result.task_id}: {result.result} (took {result.duration:.2f}s)")
# Run the example
if __name__ == "__main__":
asyncio.run(main())
Advanced Data Structures and Algorithms
Custom Collections
from collections.abc import MutableMapping, Iterator
from typing import Generic, TypeVar, Optional, Tuple, List
import bisect
import weakref
K = TypeVar('K')
V = TypeVar('V')
class LRUCache(Generic[K, V]):
"""Least Recently Used cache implementation."""
def __init__(self, capacity: int):
self.capacity = capacity
self.cache: Dict[K, 'Node[K, V]'] = {}
self.head = self.Node(None, None)
self.tail = self.Node(None, None)
self.head.next = self.tail
self.tail.prev = self.head
class Node(Generic[K, V]):
def __init__(self, key: Optional[K], value: Optional[V]):
self.key = key
self.value = value
self.prev: Optional['LRUCache.Node[K, V]'] = None
self.next: Optional['LRUCache.Node[K, V]'] = None
def _add_node(self, node: 'Node[K, V]') -> None:
"""Add node right after head."""
node.prev = self.head
node.next = self.head.next
self.head.next.prev = node
self.head.next = node
def _remove_node(self, node: 'Node[K, V]') -> None:
"""Remove an existing node."""
prev_node = node.prev
next_node = node.next
prev_node.next = next_node
next_node.prev = prev_node
def _move_to_head(self, node: 'Node[K, V]') -> None:
"""Move node to head."""
self._remove_node(node)
self._add_node(node)
def _pop_tail(self) -> 'Node[K, V]':
"""Pop the last node."""
last_node = self.tail.prev
self._remove_node(last_node)
return last_node
def get(self, key: K) -> Optional[V]:
"""Get value by key."""
node = self.cache.get(key)
if not node:
return None
# Move to head (mark as recently used)
self._move_to_head(node)
return node.value
def put(self, key: K, value: V) -> None:
"""Put key-value pair."""
node = self.cache.get(key)
if not node:
new_node = self.Node(key, value)
if len(self.cache) >= self.capacity:
# Remove least recently used
tail = self._pop_tail()
del self.cache[tail.key]
self.cache[key] = new_node
self._add_node(new_node)
else:
# Update existing node
node.value = value
self._move_to_head(node)
class SortedDict(MutableMapping[K, V]):
"""Dictionary that maintains keys in sorted order."""
def __init__(self):
self._keys: List[K] = []
self._values: Dict[K, V] = {}
def __getitem__(self, key: K) -> V:
return self._values[key]
def __setitem__(self, key: K, value: V) -> None:
if key not in self._values:
bisect.insort(self._keys, key)
self._values[key] = value
def __delitem__(self, key: K) -> None:
self._keys.remove(key)
del self._values[key]
def __iter__(self) -> Iterator[K]:
return iter(self._keys)
def __len__(self) -> int:
return len(self._keys)
def items_in_range(self, start: K, end: K) -> List[Tuple[K, V]]:
"""Get items with keys in the specified range."""
start_idx = bisect.bisect_left(self._keys, start)
end_idx = bisect.bisect_right(self._keys, end)
return [
(key, self._values[key])
for key in self._keys[start_idx:end_idx]
]
class ObservableDict(dict):
"""Dictionary that notifies observers of changes."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._observers: List[weakref.ReferenceType] = []
def add_observer(self, observer: Callable[[str, K, V], None]) -> None:
"""Add an observer for dictionary changes."""
self._observers.append(weakref.ref(observer))
def _notify_observers(self, action: str, key: K, value: V) -> None:
"""Notify all observers of a change."""
# Clean up dead references
self._observers = [ref for ref in self._observers if ref() is not None]
for observer_ref in self._observers:
observer = observer_ref()
if observer:
observer(action, key, value)
def __setitem__(self, key: K, value: V) -> None:
action = "update" if key in self else "insert"
super().__setitem__(key, value)
self._notify_observers(action, key, value)
def __delitem__(self, key: K) -> None:
value = self[key]
super().__delitem__(key)
self._notify_observers("delete", key, value)
# Usage examples
def example_usage():
# LRU Cache
cache = LRUCache[str, int](capacity=3)
cache.put("a", 1)
cache.put("b", 2)
cache.put("c", 3)
print(cache.get("a")) # 1
cache.put("d", 4) # Evicts "b"
print(cache.get("b")) # None
# Sorted Dictionary
sorted_dict = SortedDict[int, str]()
sorted_dict[3] = "three"
sorted_dict[1] = "one"
sorted_dict[2] = "two"
print(list(sorted_dict.keys())) # [1, 2, 3]
print(sorted_dict.items_in_range(1, 2)) # [(1, 'one'), (2, 'two')]
# Observable Dictionary
def on_change(action: str, key: str, value: int):
print(f"Dictionary {action}: {key} = {value}")
obs_dict = ObservableDict[str, int]()
obs_dict.add_observer(on_change)
obs_dict["x"] = 10 # Prints: Dictionary insert: x = 10
obs_dict["x"] = 20 # Prints: Dictionary update: x = 20
del obs_dict["x"] # Prints: Dictionary delete: x = 20
Conclusion
These advanced Python techniques provide powerful tools for building sophisticated applications:
- Advanced decorators enable clean separation of concerns and reusable functionality
- Context managers ensure proper resource management and cleanup
- Metaclasses and
__init_subclass__
provide powerful class customization capabilities - Asynchronous programming enables high-performance concurrent applications
- Custom data structures solve specific problems efficiently
The key to mastering these techniques is understanding when and how to apply them appropriately. Start with simpler patterns and gradually incorporate more advanced concepts as your applications grow in complexity.
Remember that with great power comes great responsibility – use these advanced features judiciously and always prioritize code readability and maintainability over cleverness.