Python Programming Advanced

Advanced Python Techniques: From Decorators to Async Programming

By Dr. Sarah Kim

Advanced Python Techniques: From Decorators to Async Programming

Master advanced Python concepts including decorators, context managers, metaclasses, and asynchronous programming for building robust applications.

Python’s elegance lies in its simplicity, but beneath the surface are powerful features that can transform how you write code. This guide explores advanced Python techniques that will elevate your programming skills and help you write more efficient, maintainable code.

Advanced Decorators

Function Decorators with Arguments

import functools
import time
from typing import Callable, Any, TypeVar, ParamSpec

P = ParamSpec('P')
T = TypeVar('T')

def retry(max_attempts: int = 3, delay: float = 1.0, backoff: float = 2.0):
    """Decorator that retries a function call with exponential backoff."""
    def decorator(func: Callable[P, T]) -> Callable[P, T]:
        @functools.wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
            attempts = 0
            current_delay = delay
            
            while attempts < max_attempts:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    attempts += 1
                    if attempts == max_attempts:
                        raise e
                    
                    print(f"Attempt {attempts} failed: {e}. Retrying in {current_delay}s...")
                    time.sleep(current_delay)
                    current_delay *= backoff
            
            raise RuntimeError("This should never be reached")
        return wrapper
    return decorator

def cache_with_ttl(ttl_seconds: int = 300):
    """Build a decorator that caches results for ``ttl_seconds`` seconds.

    Each decorated function gets its own cache. Entries are keyed by the
    string form of the positional arguments plus the sorted keyword
    arguments, so arguments must have a stable, distinguishing ``str()``.
    The cache has no size limit.

    Args:
        ttl_seconds: How long a stored result stays valid, in seconds.

    Returns:
        A decorator that preserves the wrapped function's metadata.
    """
    def decorator(func: Callable[P, T]) -> Callable[P, T]:
        cache = {}

        @functools.wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
            # Create cache key from arguments
            key = str(args) + str(sorted(kwargs.items()))

            entry = cache.get(key)
            if entry is not None:
                result, stored_at = entry
                # Monotonic clock: wall-clock adjustments cannot extend or
                # shorten the lifetime of an entry.
                if time.monotonic() - stored_at < ttl_seconds:
                    return result
                # Expired: drop it so a failing refresh does not keep it alive.
                del cache[key]

            # Call function and cache result, timestamped once it is available
            result = func(*args, **kwargs)
            cache[key] = (result, time.monotonic())
            return result

        return wrapper
    return decorator

# Usage examples
@retry(max_attempts=3, delay=0.5, backoff=2.0)
@cache_with_ttl(ttl_seconds=60)
def fetch_data_from_api(url: str, timeout: float = 10.0) -> dict:
    """Fetch JSON from ``url`` with retry and caching.

    Args:
        url: Address to send the GET request to.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled server would block the call indefinitely.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the response status signals an error
            (raised by ``raise_for_status``).
    """
    import requests
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    return response.json()

Class Decorators

from dataclasses import dataclass
from typing import Type, TypeVar

T = TypeVar('T')

def singleton(cls: Type[T]) -> Callable[..., T]:
    """Decorator that makes a class a singleton.

    The decorated name is replaced by a factory function. The first call
    constructs the instance with the arguments given; every later call
    returns that same instance and ignores its arguments.

    Args:
        cls: The class to wrap.

    Returns:
        A factory that carries the class's ``__name__``, ``__qualname__``,
        ``__module__`` and ``__doc__``. The original class stays reachable
        as ``factory.__wrapped__`` (e.g. for ``isinstance`` checks).
    """
    instances = {}

    # updated=() keeps functools from copying the class __dict__ onto the
    # factory; only the descriptive metadata is transferred.
    @functools.wraps(cls, updated=())
    def get_instance(*args, **kwargs):
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]

    return get_instance

def add_repr(cls: Type[T]) -> Type[T]:
    """Decorator that adds a ``__repr__`` method to a class.

    The generated repr has the form ``ClassName(attr=value, ...)`` and lists
    the instance's ``__dict__`` entries in insertion order.

    Args:
        cls: The class to extend; it is modified in place.

    Returns:
        The same class object.
    """
    def __repr__(self):
        attrs = ', '.join(f'{k}={v!r}' for k, v in self.__dict__.items())
        # Use the runtime type so subclasses are reported under their own name
        # rather than the name of the decorated base class.
        return f'{type(self).__name__}({attrs})'

    cls.__repr__ = __repr__
    return cls

@singleton
@add_repr
class DatabaseConnection:
    """Holds a host/port pair and a flag recording whether connect() ran."""

    def __init__(self, host: str, port: int):
        self.host, self.port = host, port
        self.connected = False

    def connect(self):
        """Announce the connection once; later calls do nothing."""
        if self.connected:
            return
        print(f"Connecting to {self.host}:{self.port}")
        self.connected = True

# Usage
db1 = DatabaseConnection("localhost", 5432)
db2 = DatabaseConnection("localhost", 5432)
print(db1 is db2)  # True - same instance
print(db1)  # DatabaseConnection(host='localhost', port=5432, connected=False)

Context Managers

Custom Context Managers

import contextlib
import sqlite3
import tempfile
import os
from typing import Generator, Optional
from pathlib import Path

class DatabaseTransaction:
    """Wrap a block of statements in a transaction on a sqlite3 connection.

    Entering issues ``BEGIN``; leaving commits when the block finished
    normally and rolls back when it raised. Exceptions are never suppressed.
    """

    def __init__(self, connection: sqlite3.Connection):
        self.transaction = None
        self.connection = connection

    def __enter__(self) -> sqlite3.Connection:
        conn = self.connection
        # Keep the cursor returned by BEGIN on the instance.
        self.transaction = conn.execute("BEGIN")
        return conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        failed = exc_type is not None
        if failed:
            self.connection.rollback()
            print(f"Transaction rolled back due to {exc_type.__name__}: {exc_val}")
        else:
            self.connection.commit()
            print("Transaction committed")
        # False lets any in-flight exception continue to the caller.
        return False

@contextlib.contextmanager
def temporary_file(suffix: str = "", content: str = "") -> Generator[Path, None, None]:
    """Yield the path of a fresh temporary file and delete it afterwards.

    Args:
        suffix: File name suffix handed to ``tempfile.mkstemp``.
        content: Text written to the file before it is yielded; an empty
            string leaves the file empty.

    Yields:
        A ``Path`` pointing at the temporary file.
    """
    handle, raw_path = tempfile.mkstemp(suffix=suffix)
    try:
        if not content:
            # Nothing to write: just release the descriptor mkstemp opened.
            os.close(handle)
        else:
            with os.fdopen(handle, 'w') as stream:
                stream.write(content)

        yield Path(raw_path)
    finally:
        # The caller may already have removed the file.
        if os.path.exists(raw_path):
            os.unlink(raw_path)

@contextlib.contextmanager
def timer(description: str = "Operation") -> Generator[None, None, None]:
    """Print how long the enclosed block took to run.

    The message is printed even when the block raises.

    Args:
        description: Label placed at the start of the printed message.
    """
    # perf_counter is monotonic and high-resolution, so the measurement is
    # unaffected by wall-clock adjustments made while the block runs.
    start_time = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start_time
        print(f"{description} took {elapsed:.4f} seconds")

# Usage examples
def example_usage():
    """Demonstrate DatabaseTransaction, temporary_file and timer in turn."""
    # Database transaction
    # closing() releases the in-memory database once the demo is done, even
    # if a statement raises.
    with contextlib.closing(sqlite3.connect(":memory:")) as conn:
        conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")

        try:
            with DatabaseTransaction(conn):
                conn.execute("INSERT INTO users VALUES (1, 'Alice')")
                conn.execute("INSERT INTO users VALUES (2, 'Bob')")
                # raise Exception("Something went wrong!")  # Uncomment to test rollback
        except Exception as e:
            print(f"Handled exception: {e}")

    # Temporary file
    with temporary_file(suffix=".txt", content="Hello, World!") as temp_path:
        print(f"Temporary file created: {temp_path}")
        with open(temp_path, 'r') as f:
            print(f"Content: {f.read()}")

    # Timer
    with timer("Heavy computation"):
        time.sleep(1)  # Simulate work

Metaclasses and Class Creation

Custom Metaclasses

from typing import Dict, Any, Type

class SingletonMeta(type):
    """Metaclass whose classes hand out one shared instance each."""

    # One mapping for all classes using this metaclass: class -> its instance.
    _instances: Dict[Type, Any] = {}

    def __call__(cls, *args, **kwargs):
        registry = cls._instances
        if cls in registry:
            return registry[cls]

        # First call for this class: build the instance and remember it.
        created = super().__call__(*args, **kwargs)
        registry[cls] = created
        return registry[cls]

class ValidatedMeta(type):
    """Metaclass that validates class attributes when a class is created.

    Two checks run for every class built with this metaclass:

    * Required attributes: every name listed in a base class's
      ``_required_attrs`` must be defined by the new class, directly or
      through inheritance. A class's own ``_required_attrs`` list binds its
      subclasses, not the class itself, so a base can declare the contract
      with bare annotations such as ``path: str``.
    * Annotated types: a value assigned in the class body must be an
      instance of the type it is annotated with, in this class or in any
      ancestor. Annotations that ``isinstance`` cannot check (strings,
      parameterized generics) are skipped.

    Validation runs after the class object has been built, so
    ``__init_subclass__`` hooks have already executed when an error is raised.
    """

    def __new__(mcs, name, bases, namespace, **kwargs):
        # Forward class keyword arguments so __init_subclass__ receives them.
        cls = super().__new__(mcs, name, bases, namespace, **kwargs)

        # Add validation for required attributes declared by the bases
        required_attrs = []
        for base in bases:
            for attr in getattr(base, '_required_attrs', ()):
                if attr not in required_attrs:
                    required_attrs.append(attr)
        for attr in required_attrs:
            if not hasattr(cls, attr):
                raise ValueError(f"Class {name} must define attribute '{attr}'")

        # Collect annotations along the MRO; nearer classes override ancestors.
        annotations = {}
        for klass in reversed(cls.__mro__):
            annotations.update(getattr(klass, '__annotations__', None) or {})

        # Add type checking for annotated attributes assigned in this body
        for attr_name, attr_type in annotations.items():
            if attr_name not in namespace:
                continue
            value = namespace[attr_name]
            try:
                matches = isinstance(value, attr_type)
            except TypeError:
                # The annotation is not something isinstance can evaluate.
                continue
            if not matches:
                type_name = getattr(attr_type, '__name__', repr(attr_type))
                raise TypeError(
                    f"Attribute '{attr_name}' must be of type {type_name}, "
                    f"got {type(value).__name__}"
                )

        return cls

class APIEndpoint(metaclass=ValidatedMeta):
    """Base class for API endpoints, created through ValidatedMeta.

    ``path`` and ``method`` are listed in ``_required_attrs`` and declared
    here as annotations only, without values.
    """
    _required_attrs = ['path', 'method']

    path: str
    method: str

    def __init__(self):
        # Handlers registered on this instance, keyed by HTTP method.
        self.handlers = dict()

    def register_handler(self, handler):
        """Store ``handler`` under this endpoint's ``method``."""
        verb = self.method
        self.handlers[verb] = handler

class UserEndpoint(APIEndpoint):
    """Endpoint answering GET requests on /api/users."""
    path = "/api/users"
    method = "GET"

    def get_users(self):
        """Return a payload with an empty user list."""
        return dict(users=[])

# Example with __init_subclass__ (modern alternative to metaclasses)
class ConfigurableBase:
    """Base class using __init_subclass__ for configuration.

    Every subclass is checked for a ``config`` attribute (unless it opts out
    with ``config_required=False``) and then recorded in ``_registry`` under
    its class name.
    """

    # Name -> class mapping. Defined on the base so that all subclasses share
    # one registry and ``ConfigurableBase._registry`` is always available.
    _registry: Dict[str, type] = {}

    def __init_subclass__(cls, config_required=True, **kwargs):
        """Validate and register a newly defined subclass.

        Args:
            config_required: When true, the subclass must define or inherit
                a ``config`` attribute.
            **kwargs: Passed on to the next ``__init_subclass__`` in the MRO.

        Raises:
            ValueError: If ``config`` is required but missing.
        """
        super().__init_subclass__(**kwargs)

        if config_required and not hasattr(cls, 'config'):
            raise ValueError(f"Class {cls.__name__} must define 'config' attribute")

        # Auto-register subclasses. Attribute lookup resolves to the shared
        # registry unless a subclass deliberately defines its own.
        cls._registry[cls.__name__] = cls

class DatabaseModel(ConfigurableBase, config_required=True):
    """Configurable base for database-backed models."""
    # Table settings for the base model.
    config = dict(
        table_name='base_model',
        primary_key='id',
    )

class User(DatabaseModel):
    """Database model whose config names the users table and its fields."""
    config = dict(
        table_name='users',
        primary_key='id',
        fields=['name', 'email', 'created_at'],
    )

Asynchronous Programming

Advanced Async Patterns

import asyncio
import aiohttp
import aiofiles
from typing import List, Dict, Any, Optional, Callable, Awaitable
from dataclasses import dataclass
from contextlib import asynccontextmanager

@dataclass
class TaskResult:
    """Outcome of one task run by AsyncTaskManager."""
    task_id: str
    result: Any
    # Set when the task failed; result is None in that case.
    error: Optional[BaseException] = None
    # Seconds spent awaiting the task, measured on the event loop clock.
    duration: float = 0.0

class AsyncTaskManager:
    """Advanced async task manager with concurrency control."""

    def __init__(self, max_concurrent_tasks: int = 10):
        """Create a manager that runs at most ``max_concurrent_tasks`` at once."""
        self.max_concurrent_tasks = max_concurrent_tasks
        self.semaphore = asyncio.Semaphore(max_concurrent_tasks)
        # Accumulates the results of every batch executed by this manager.
        self.results: List[TaskResult] = []

    async def execute_task(self, task_id: str, coro: Awaitable[Any]) -> TaskResult:
        """Execute a single task with timing and error handling.

        Args:
            task_id: Identifier copied into the returned result.
            coro: The awaitable to run once a semaphore slot is free.

        Returns:
            A TaskResult holding either the awaited value or the exception
            raised, plus the time spent awaiting.
        """
        async with self.semaphore:
            # get_running_loop is the supported way to reach the loop from
            # inside a coroutine; its clock is monotonic.
            clock = asyncio.get_running_loop().time
            start_time = clock()
            result = None
            error = None
            try:
                result = await coro
            except Exception as e:
                error = e
            return TaskResult(
                task_id=task_id,
                result=result,
                error=error,
                duration=clock() - start_time,
            )

    async def execute_batch(self, tasks: Dict[str, Awaitable[Any]]) -> List[TaskResult]:
        """Execute multiple tasks concurrently.

        Args:
            tasks: Mapping of task id to the awaitable to run.

        Returns:
            One TaskResult per task, in the mapping's iteration order. The
            same results are appended to ``self.results``.
        """
        task_ids = list(tasks)
        outcomes = await asyncio.gather(
            *(self.execute_task(task_id, tasks[task_id]) for task_id in task_ids),
            return_exceptions=True,
        )

        # gather may hand back a raw exception (e.g. CancelledError, which
        # execute_task does not catch); wrap it so every element is a
        # TaskResult and callers can rely on .error / .result.
        results = [
            outcome if isinstance(outcome, TaskResult)
            else TaskResult(task_id=task_id, result=None, error=outcome)
            for task_id, outcome in zip(task_ids, outcomes)
        ]
        self.results.extend(results)
        return results

class AsyncHTTPClient:
    """Async HTTP client with connection pooling and retry logic.

    Use it as an async context manager; the underlying aiohttp session only
    exists between ``__aenter__`` and ``__aexit__``.
    """

    def __init__(self, timeout: int = 30, max_retries: int = 3):
        """Configure the client.

        Args:
            timeout: Total timeout per request, in seconds.
            max_retries: Number of attempts per URL (>= 1).

        Raises:
            ValueError: If ``max_retries`` is smaller than 1.
        """
        if max_retries < 1:
            raise ValueError(f"max_retries must be at least 1, got {max_retries}")
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.max_retries = max_retries
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        connector = aiohttp.TCPConnector(limit=100, limit_per_host=30)
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=self.timeout
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
            # Drop the closed session so later use fails with a clear error.
            self.session = None

    async def fetch_with_retry(self, url: str, **kwargs) -> Dict[str, Any]:
        """Fetch URL with exponential backoff retry.

        Args:
            url: Address to request with GET.
            **kwargs: Extra arguments passed to ``session.get``.

        Returns:
            The decoded JSON body of the first successful response.

        Raises:
            RuntimeError: If called outside ``async with``.
            aiohttp.ClientError, asyncio.TimeoutError: The error of the last
                attempt once all attempts have failed.
        """
        if self.session is None:
            raise RuntimeError(
                "AsyncHTTPClient has no open session; use it inside 'async with'"
            )

        last_attempt = self.max_retries - 1
        for attempt in range(self.max_retries):
            try:
                async with self.session.get(url, **kwargs) as response:
                    response.raise_for_status()
                    return await response.json()
            except (aiohttp.ClientError, asyncio.TimeoutError):
                if attempt == last_attempt:
                    # Bare raise keeps the original traceback intact.
                    raise

                # Wait 1s, 2s, 4s, ... between attempts.
                wait_time = 2 ** attempt
                await asyncio.sleep(wait_time)

    async def fetch_multiple(self, urls: List[str]) -> List[Dict[str, Any]]:
        """Fetch multiple URLs concurrently.

        Failed requests are dropped, so the returned list may be shorter
        than ``urls`` and its positions do not map back to the input.
        """
        tasks = {f"url_{i}": self.fetch_with_retry(url) for i, url in enumerate(urls)}

        task_manager = AsyncTaskManager(max_concurrent_tasks=10)
        results = await task_manager.execute_batch(tasks)

        return [result.result for result in results if result.error is None]

# Async context manager for file operations
@asynccontextmanager
async def async_file_processor(file_path: str, chunk_size: int = 8192):
    """Async context manager for processing large files.

    Opens ``file_path`` in binary mode and yields the aiofiles handle, or
    yields ``None`` (after printing a message) when the file does not exist.

    Args:
        file_path: Path of the file to open.
        chunk_size: Currently unused; kept so existing callers keep working.
    """
    # Only the open call is guarded. Keeping the yield outside this handler
    # means a FileNotFoundError raised by the caller's own block propagates
    # normally instead of triggering a second yield.
    try:
        file = await aiofiles.open(file_path, 'rb')
    except FileNotFoundError:
        print(f"File {file_path} not found")
        file = None

    if file is None:
        yield None
        return

    try:
        yield file
    finally:
        await file.close()

async def process_large_file(file_path: str) -> Dict[str, Any]:
    """Count bytes, lines and words of a file read asynchronously.

    Returns:
        A dict with ``total_bytes``, ``line_count`` and ``word_count``; all
        zero when the file could not be opened.
    """
    stats = dict.fromkeys(('total_bytes', 'line_count', 'word_count'), 0)

    async with async_file_processor(file_path) as handle:
        if handle is not None:
            async for raw_line in handle:
                decoded = raw_line.decode('utf-8')
                stats['total_bytes'] += len(raw_line)
                stats['line_count'] += 1
                stats['word_count'] += len(decoded.split())

                # Every 1000 lines, give other tasks a chance to run.
                if not stats['line_count'] % 1000:
                    await asyncio.sleep(0)

    return stats

# Example usage
async def main():
    """Run the HTTP client demo followed by the task manager demo."""
    # HTTP client example
    base_url = "https://jsonplaceholder.typicode.com/posts/"
    urls = [base_url + str(post_id) for post_id in (1, 2, 3)]

    async with AsyncHTTPClient() as client:
        fetched = await client.fetch_multiple(urls)
        print(f"Fetched {len(fetched)} URLs successfully")

    # Task manager example
    async def sample_task(delay: float) -> str:
        await asyncio.sleep(delay)
        return f"Task completed after {delay}s"

    task_manager = AsyncTaskManager(max_concurrent_tasks=3)
    tasks = {}
    for i in range(1, 6):
        tasks[f"task_{i}"] = sample_task(i * 0.5)

    outcomes = await task_manager.execute_batch(tasks)
    for outcome in outcomes:
        if not outcome.error:
            print(f"Task {outcome.task_id}: {outcome.result} (took {outcome.duration:.2f}s)")
            continue
        print(f"Task {outcome.task_id} failed: {outcome.error}")

# Run the example
if __name__ == "__main__":
    asyncio.run(main())

Advanced Data Structures and Algorithms

Custom Collections

from collections.abc import MutableMapping, Iterator
from typing import Generic, TypeVar, Optional, Tuple, List
import bisect
import weakref

K = TypeVar('K')
V = TypeVar('V')

class LRUCache(Generic[K, V]):
    """Least Recently Used cache implementation.

    Entries live in a dict for lookup and in a doubly linked list for
    recency order: the node after ``head`` is the most recently used, the
    node before ``tail`` the least. ``head`` and ``tail`` are sentinels that
    never hold data.
    """

    class Node(Generic[K, V]):
        """Linked-list node holding one key/value pair."""

        def __init__(self, key: Optional[K], value: Optional[V]):
            self.key = key
            self.value = value
            self.prev: Optional['LRUCache.Node[K, V]'] = None
            self.next: Optional['LRUCache.Node[K, V]'] = None

    def __init__(self, capacity: int):
        """Create an empty cache.

        Args:
            capacity: Maximum number of entries kept (>= 1).

        Raises:
            ValueError: If ``capacity`` is smaller than 1. Such a cache
                could hold nothing, and eviction would unlink a sentinel.
        """
        if capacity < 1:
            raise ValueError(f"capacity must be at least 1, got {capacity}")
        self.capacity = capacity
        self.cache: Dict[K, 'Node[K, V]'] = {}
        self.head = self.Node(None, None)
        self.tail = self.Node(None, None)
        self.head.next = self.tail
        self.tail.prev = self.head

    def _add_node(self, node: 'Node[K, V]') -> None:
        """Add node right after head."""
        node.prev = self.head
        node.next = self.head.next
        self.head.next.prev = node
        self.head.next = node

    def _remove_node(self, node: 'Node[K, V]') -> None:
        """Remove an existing node."""
        prev_node = node.prev
        next_node = node.next
        prev_node.next = next_node
        next_node.prev = prev_node

    def _move_to_head(self, node: 'Node[K, V]') -> None:
        """Move node to head."""
        self._remove_node(node)
        self._add_node(node)

    def _pop_tail(self) -> 'Node[K, V]':
        """Pop the last node."""
        last_node = self.tail.prev
        self._remove_node(last_node)
        return last_node

    def get(self, key: K) -> Optional[V]:
        """Get value by key, or None when the key is not cached."""
        node = self.cache.get(key)
        if node is None:
            return None

        # Move to head (mark as recently used)
        self._move_to_head(node)
        return node.value

    def put(self, key: K, value: V) -> None:
        """Put key-value pair, evicting the least recently used entry if full."""
        node = self.cache.get(key)

        if node is not None:
            # Update existing node
            node.value = value
            self._move_to_head(node)
            return

        if len(self.cache) >= self.capacity:
            # Remove least recently used
            evicted = self._pop_tail()
            del self.cache[evicted.key]

        new_node = self.Node(key, value)
        self.cache[key] = new_node
        self._add_node(new_node)

class SortedDict(MutableMapping[K, V]):
    """Dictionary that maintains keys in sorted order.

    Values are stored in a plain dict; a parallel sorted list of the keys
    drives iteration order and range queries. Keys must be mutually
    comparable.
    """

    def __init__(self):
        self._keys: List[K] = []
        self._values: Dict[K, V] = {}

    def __getitem__(self, key: K) -> V:
        return self._values[key]

    def __setitem__(self, key: K, value: V) -> None:
        # Only new keys need a slot in the sorted key list.
        if key not in self._values:
            bisect.insort(self._keys, key)
        self._values[key] = value

    def __delitem__(self, key: K) -> None:
        # Delete from the dict first so a missing key raises KeyError, as
        # the mapping protocol requires, and the key list stays untouched.
        del self._values[key]
        # The key is present exactly once, so bisect_left lands on it.
        del self._keys[bisect.bisect_left(self._keys, key)]

    def __iter__(self) -> Iterator[K]:
        return iter(self._keys)

    def __len__(self) -> int:
        return len(self._keys)

    def items_in_range(self, start: K, end: K) -> List[Tuple[K, V]]:
        """Get items with keys in the specified range.

        Args:
            start: Lower bound, inclusive.
            end: Upper bound, inclusive.

        Returns:
            ``(key, value)`` pairs in ascending key order.
        """
        start_idx = bisect.bisect_left(self._keys, start)
        end_idx = bisect.bisect_right(self._keys, end)

        return [
            (key, self._values[key])
            for key in self._keys[start_idx:end_idx]
        ]

class ObservableDict(dict):
    """Dictionary that notifies observers of changes.

    Observers are called as ``observer(action, key, value)`` with action
    ``"insert"``, ``"update"`` or ``"delete"``. They are held weakly, so the
    caller must keep each observer alive for as long as it should fire.

    NOTE: only item assignment and ``del`` are intercepted here. In CPython,
    inherited bulk mutators such as ``update()``, ``pop()`` or ``clear()`` do
    not go through these methods and therefore do not notify.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._observers: List[weakref.ReferenceType] = []

    def add_observer(self, observer: Callable[[str, K, V], None]) -> None:
        """Add an observer for dictionary changes.

        Args:
            observer: Callable taking ``(action, key, value)``.
        """
        try:
            # A bound method is a short-lived object; a plain weakref to it
            # would be dead immediately. WeakMethod tracks the instance.
            reference = weakref.WeakMethod(observer)
        except TypeError:
            # Not a bound method: a regular weak reference is enough.
            reference = weakref.ref(observer)
        self._observers.append(reference)

    def _notify_observers(self, action: str, key: K, value: V) -> None:
        """Notify all observers of a change."""
        # Resolve each reference once; the strong references collected here
        # keep the observers alive for the duration of the notification.
        alive = []
        for observer_ref in self._observers:
            observer = observer_ref()
            if observer is not None:
                alive.append((observer_ref, observer))

        # Clean up dead references
        self._observers = [observer_ref for observer_ref, _ in alive]

        for _, observer in alive:
            observer(action, key, value)

    def __setitem__(self, key: K, value: V) -> None:
        action = "update" if key in self else "insert"
        super().__setitem__(key, value)
        self._notify_observers(action, key, value)

    def __delitem__(self, key: K) -> None:
        # Read the value first so observers learn what was removed; a
        # missing key raises KeyError here and nothing is notified.
        value = self[key]
        super().__delitem__(key)
        self._notify_observers("delete", key, value)

# Usage examples
def example_usage():
    """Walk through LRUCache, SortedDict and ObservableDict."""
    # LRU Cache
    lru = LRUCache[str, int](capacity=3)
    for letter, number in (("a", 1), ("b", 2), ("c", 3)):
        lru.put(letter, number)
    print(lru.get("a"))  # 1
    lru.put("d", 4)  # Evicts "b"
    print(lru.get("b"))  # None

    # Sorted Dictionary
    ordered = SortedDict[int, str]()
    ordered[3] = "three"
    ordered[1] = "one"
    ordered[2] = "two"
    print(list(ordered.keys()))  # [1, 2, 3]
    print(ordered.items_in_range(1, 2))  # [(1, 'one'), (2, 'two')]

    # Observable Dictionary
    def on_change(action: str, key: str, value: int):
        print(f"Dictionary {action}: {key} = {value}")

    watched = ObservableDict[str, int]()
    watched.add_observer(on_change)
    watched["x"] = 10  # Prints: Dictionary insert: x = 10
    watched["x"] = 20  # Prints: Dictionary update: x = 20
    del watched["x"]   # Prints: Dictionary delete: x = 20

Conclusion

These advanced Python techniques provide powerful tools for building sophisticated applications:

  1. Advanced decorators enable clean separation of concerns and reusable functionality
  2. Context managers ensure proper resource management and cleanup
  3. Metaclasses and __init_subclass__ provide powerful class customization capabilities
  4. Asynchronous programming enables high-performance concurrent applications
  5. Custom data structures solve specific problems efficiently

The key to mastering these techniques is understanding when and how to apply them appropriately. Start with simpler patterns and gradually incorporate more advanced concepts as your applications grow in complexity.

Remember that with great power comes great responsibility – use these advanced features judiciously and always prioritize code readability and maintainability over cleverness.