A Coding Guide to Build a Production-Ready Asynchronous Python SDK with Rate Limiting, In-Memory Caching, and Authentication
This post first appeared on MarkTechPost.

In this tutorial, we build a robust, production-ready asynchronous Python SDK. We start by installing and configuring the asynchronous HTTP libraries (aiohttp, nest-asyncio), then implement the core components: structured response objects, a rate limiter, in-memory caching with TTL, and a clean, dataclass-driven design. We then wrap these pieces in an AdvancedSDK class that supports async context management, automatic wait-on-rate-limit behavior, JSON and auth header injection, and convenient HTTP-verb methods. Along the way, a demo harness against JSONPlaceholder illustrates caching efficiency, batch fetching with rate limits, and error handling, and shows how to extend the SDK via a fluent “builder” pattern for custom configuration.
# Install third-party dependencies first (notebook shell command)
!pip install aiohttp nest-asyncio

import asyncio
import aiohttp
import time
import json
from typing import Dict, List, Optional, Any, Union
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
import hashlib
import logging
We import asyncio and aiohttp, alongside utilities for timing, JSON handling, dataclass modeling, caching (hashlib for cache keys, datetime for expiry), and logging. The !pip install aiohttp nest-asyncio line installs the two third-party packages; nest-asyncio lets asyncio code run inside an environment that already has a running event loop, such as a Colab notebook.
@dataclass
class APIResponse:
    """Structured response object"""
    data: Any
    status_code: int
    headers: Dict[str, str]
    timestamp: datetime

    def to_dict(self) -> Dict:
        return asdict(self)
The APIResponse dataclass bundles the details of an HTTP response (the payload in data, the status code, the headers, and the retrieval timestamp) into a single, typed object. The to_dict() helper converts the instance into a plain dictionary for logging or downstream processing. The timestamp stays a datetime object in that dictionary, so JSON serialization needs an extra conversion step.
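As a small illustration of to_dict(), the sketch below rebuilds the dataclass so it runs on its own and serializes one response to JSON. The sample payload and the fixed timestamp are made up for the example, and passing default=str to json.dumps is one possible way to handle the datetime field, not something the tutorial prescribes.

```python
import json
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Any, Dict


@dataclass
class APIResponse:
    """Same fields as the tutorial's response object."""
    data: Any
    status_code: int
    headers: Dict[str, str]
    timestamp: datetime

    def to_dict(self) -> Dict:
        return asdict(self)


# Hypothetical sample values, chosen only for illustration.
resp = APIResponse(
    data={"id": 1, "title": "hello"},
    status_code=200,
    headers={"Content-Type": "application/json"},
    timestamp=datetime(2024, 1, 1, 12, 0, 0),
)

as_dict = resp.to_dict()                       # timestamp is still a datetime here
serialized = json.dumps(as_dict, default=str)  # default=str turns it into text
print(serialized)
```

Without default=str, json.dumps raises a TypeError because datetime objects are not JSON serializable.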
class RateLimiter:
    """Sliding-window rate limiter (a simple alternative to a token bucket)"""

    def __init__(self, max_calls: int = 100, time_window: int = 60):
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = []  # timestamps of calls made inside the current window

    def can_proceed(self) -> bool:
        now = time.time()
        # Drop timestamps that have fallen out of the window
        self.calls = [call_time for call_time in self.calls if now - call_time < self.time_window]
        if len(self.calls) < self.max_calls:
            self.calls.append(now)  # record this call
            return True
        return False

    def wait_time(self) -> float:
        if not self.calls:
            return 0.0
        # Seconds until the oldest recorded call leaves the window
        return max(0.0, self.time_window - (time.time() - self.calls[0]))
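The RateLimiter class tracks the timestamps of recent calls and allows up to max_calls within a rolling time_window. Strictly speaking, this is a sliding-window log rather than a token bucket, since no tokens are refilled at a fixed rate. When the limit is reached, can_proceed() returns False, and wait_time() reports how many seconds remain until the oldest recorded call leaves the window. Because can_proceed() records a call whenever it returns True, it should be invoked once per request.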
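For comparison, a token bucket in the stricter sense holds a number of tokens that refill at a steady rate, and each request spends one. The sketch below is not part of the tutorial's SDK: the TokenBucket class, its parameter names, and the injectable clock argument are all assumptions made for illustration.

```python
import time
from typing import Callable


class TokenBucket:
    """Hypothetical token bucket: `capacity` tokens, refilled at `refill_rate` per second."""

    def __init__(self, capacity: int, refill_rate: float,
                 clock: Callable[[], float] = time.monotonic):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self._clock = clock            # injectable so the logic can be tested without sleeping
        self._tokens = float(capacity)
        self._last = clock()

    def _refill(self) -> None:
        # Add tokens for the time elapsed since the last refill, capped at capacity.
        now = self._clock()
        elapsed = now - self._last
        self._tokens = min(self.capacity, self._tokens + elapsed * self.refill_rate)
        self._last = now

    def try_acquire(self) -> bool:
        """Spend one token if available; return whether the call may proceed."""
        self._refill()
        if self._tokens >= 1:
            self._tokens -= 1
            return True
        return False

    def wait_time(self) -> float:
        """Seconds until at least one token is available."""
        self._refill()
        if self._tokens >= 1:
            return 0.0
        return (1 - self._tokens) / self.refill_rate


# Usage with a fake clock so the example is deterministic.
fake_now = [0.0]
bucket = TokenBucket(capacity=2, refill_rate=1.0, clock=lambda: fake_now[0])
first, second, third = bucket.try_acquire(), bucket.try_acquire(), bucket.try_acquire()
pause = bucket.wait_time()
fake_now[0] = 1.0                      # one second later, one token has been refilled
after_refill = bucket.try_acquire()
```

Unlike the timestamp log, this variant stores only two numbers regardless of request volume, and it allows short bursts up to capacity while holding the long-run rate to refill_rate.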
class Cache:
    """Simple in-memory cache with TTL"""

    def __init__(self, default_ttl: int = 300):
        self.cache = {}
        self.default_ttl = default_ttl

    def _generate_key(self, method: str, url: str, params: Dict = None) -> str:
        # sort_keys makes the key independent of parameter order
        key_data = f"{method}:{url}:{json.dumps(params or {}, sort_keys=True)}"
        return hashlib.md5(key_data.encode()).hexdigest()

    def get(self, method: str, url: str, params: Dict = None) -> Optional[APIResponse]:
        key = self._generate_key(method, url, params)
        if key in self.cache:
            response, expiry = self.cache[key]
            if datetime.now() < expiry:
                return response
            del self.cache[key]  # expired: evict on lookup
        return None

    def set(self, method: str, url: str, response: APIResponse, params: Dict = None, ttl: int = None):
        key = self._generate_key(method, url, params)
        # Use the default only when no TTL was given, so an explicit ttl=0 is respected
        seconds = self.default_ttl if ttl is None else ttl
        expiry = datetime.now() + timedelta(seconds=seconds)
        self.cache[key] = (response, expiry)
The Cache class provides a lightweight in-memory TTL cache for API responses by hashing the request signature (method, URL, params) into a key. It returns a cached APIResponse while the entry is still valid. Eviction is lazy: a stale entry is removed the next time it is looked up, not at the moment its time-to-live elapses, so entries that are never requested again stay in memory.
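The key derivation can be tried in isolation. The standalone function below mirrors the logic of _generate_key; the name cache_key and the sample endpoint and parameters are assumptions made for the example.

```python
import hashlib
import json
from typing import Dict, Optional


def cache_key(method: str, url: str, params: Optional[Dict] = None) -> str:
    """Hash the request signature the same way the Cache class does."""
    key_data = f"{method}:{url}:{json.dumps(params or {}, sort_keys=True)}"
    return hashlib.md5(key_data.encode()).hexdigest()


# The same parameters in a different order map to the same key...
key_a = cache_key("GET", "/posts", {"userId": 1, "page": 2})
key_b = cache_key("GET", "/posts", {"page": 2, "userId": 1})

# ...while a different method or endpoint produces a different key.
key_c = cache_key("POST", "/posts", {"userId": 1, "page": 2})
key_d = cache_key("GET", "/comments", {"userId": 1, "page": 2})

print(key_a == key_b, key_a == key_c, key_a == key_d)
```

MD5 serves here only as a compact fingerprint of the request, not as a security measure. Any stable hash would do the same job.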
class AdvancedSDK:
    """Advanced SDK with modern Python patterns"""

    def __init__(self, base_url: str, api_key: str = None, rate_limit: int = 100):
        self.base_url = base_url.rstrip('/')
        self.api_key = api_key
        self.session = None  # created in __aenter__
        self.rate_limiter = RateLimiter(max_calls=rate_limit)
        self.cache = Cache()
        self.logger = self._setup_logger()

    def _setup_logger(self) -> logging.Logger:
        logger = logging.getLogger(f"SDK-{id(self)}")
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            logger.addHandler(handler)
            logger.setLevel(logging.INFO)
        return logger

    async def __aenter__(self):
        """Async context manager entry"""
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit"""
        if self.session:
            await self.session.close()

    def _get_headers(self) -> Dict[str, str]:
        headers = {'Content-Type': 'application/json'}
        if self.api_key:
            headers['Authorization'] = f'Bearer {self.api_key}'
        return headers

    async def _make_request(self, method: str, endpoint: str, params: Dict = None,
                            data: Dict = None, use_cache: bool = True) -> APIResponse:
        """Core request method with rate limiting and caching"""
        # Serve GET requests from the cache when possible
        if use_cache and method.upper() == 'GET':
            cached = self.cache.get(method, endpoint, params)
            if cached:
                self.logger.info(f"Cache hit for {method} {endpoint}")
                return cached
        # Re-check after each wait so the request is recorded by the limiter
        while not self.rate_limiter.can_proceed():
            wait_time = self.rate_limiter.wait_time()
            self.logger.warning(f"Rate limit hit, waiting {wait_time:.2f}s")
            await asyncio.sleep(wait_time)
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        try:
            async with self.session.request(
                method=method.upper(),
                url=url,
                params=params,
                json=data,
                headers=self._get_headers()
            ) as resp:
                # Parse JSON bodies; fall back to raw text for anything else
                response_data = await resp.json() if resp.content_type == 'application/json' else await resp.text()
                api_response = APIResponse(
                    data=response_data,
                    status_code=resp.status,
                    headers=dict(resp.headers),
                    timestamp=datetime.now()
                )
                # Cache only successful GET responses
                if use_cache and method.upper() == 'GET' and 200 <= resp.status < 300:
                    self.cache.set(method, endpoint, api_response, params)
                self.logger.info(f"{method.upper()} {endpoint} - Status: {resp.status}")
                return api_response
        except Exception as e:
            self.logger.error(f"Request failed: {str(e)}")
            raise

    async def get(self, endpoint: str, params: Dict = None, use_cache: bool = True) -> APIResponse:
        return await self._make_request('GET', endpoint, params=params, use_cache=use_cache)

    async def post(self, endpoint: str, data: Dict = None) -> APIResponse:
        return await self._make_request('POST', endpoint, data=data, use_cache=False)

    async def put(self, endpoint: str, data: Dict = None) -> APIResponse:
        return await self._make_request('PUT', endpoint, data=data, use_cache=False)

    async def delete(self, endpoint: str) -> APIResponse:
        return await self._make_request('DELETE', endpoint, use_cache=False)
The AdvancedSDK class wraps everything together into a clean, async-first client: it manages an aiohttp session via async context managers, injects JSON and auth headers, and coordinates our RateLimiter and Cache under the hood. Its _make_request method centralizes GET/POST/PUT/DELETE logic, handling cache lookups, rate-limit waits, error logging, and response packing into APIResponse objects, while the get/post/put/delete helpers give us ergonomic, high-level calls.
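Two small pieces of that request path, URL joining and header injection, can be checked without any network access. The helper functions below restate the logic of _make_request and _get_headers as standalone functions; the names build_url and build_headers and the sample API key are assumptions made for this sketch.

```python
from typing import Dict, Optional


def build_url(base_url: str, endpoint: str) -> str:
    """Join base URL and endpoint with exactly one slash between them."""
    return f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}"


def build_headers(api_key: Optional[str] = None) -> Dict[str, str]:
    """JSON content type, plus a Bearer token when an API key is set."""
    headers = {"Content-Type": "application/json"}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"
    return headers


# Trailing and leading slashes are normalized either way.
url_1 = build_url("https://jsonplaceholder.typicode.com/", "/posts/1")
url_2 = build_url("https://jsonplaceholder.typicode.com", "posts/1")

anonymous = build_headers()
authenticated = build_headers("demo-key")  # placeholder key, not a real credential

print(url_1)
print(authenticated)
```

Stripping slashes on both sides means callers can write endpoints as "posts/1" or "/posts/1" interchangeably, and the Authorization header is omitted entirely when no key is configured.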
async def demo_sdk():
"""Demonstrate SDK capabilities"""
print("