python-resource-management
Python resource management with context managers, cleanup patterns, and streaming. Use when managing connections, file handles, implementing cleanup logic, or building streaming responses with accumulated state.
Install
mkdir -p .claude/skills/python-resource-management && curl -L -o skill.zip "https://mcp.directory/api/skills/download/3427" && unzip -o skill.zip -d .claude/skills/python-resource-management && rm skill.zip
Installs to .claude/skills/python-resource-management
About this skill
Python Resource Management
Manage resources deterministically using context managers. Resources like database connections, file handles, and network sockets should be released reliably, even when exceptions occur.
When to Use This Skill
- Managing database connections and connection pools
- Working with file handles and I/O
- Implementing custom context managers
- Building streaming responses with state
- Handling nested resource cleanup
- Creating async context managers
Core Concepts
1. Context Managers
The with statement ensures resources are released automatically, even on exceptions.
2. Protocol Methods
__enter__/__exit__ for sync, __aenter__/__aexit__ for async resource management.
3. Unconditional Cleanup
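Once __enter__ has returned successfully, __exit__ always runs when the block exits, whether or not an exception occurred. If __enter__ itself raises, __exit__ is not called.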
4. Exception Handling
Return a truthy value from __exit__ to suppress the exception; return False or None to let it propagate.
Quick Start
from contextlib import contextmanager

@contextmanager
def managed_resource():
    resource = acquire_resource()
    try:
        yield resource
    finally:
        resource.cleanup()

with managed_resource() as r:
    r.do_work()
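The Quick Start relies on an `acquire_resource()` that is not defined here. A runnable sketch, assuming a hypothetical `FakeResource` whose work always fails, shows that the `finally` branch still performs cleanup:

```python
from contextlib import contextmanager


class FakeResource:
    """Hypothetical stand-in for a real resource."""

    def __init__(self) -> None:
        self.cleaned_up = False

    def do_work(self) -> None:
        raise RuntimeError("work failed")

    def cleanup(self) -> None:
        self.cleaned_up = True


@contextmanager
def managed_resource():
    resource = FakeResource()  # acquire
    try:
        yield resource
    finally:
        resource.cleanup()  # runs even though do_work() raises


error_message = None
try:
    with managed_resource() as r:
        r.do_work()
except RuntimeError as exc:
    error_message = str(exc)
```

The exception still propagates to the caller; the context manager only guarantees that cleanup happens first.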
Fundamental Patterns
Pattern 1: Class-Based Context Manager
Implement the context manager protocol for complex resources.
from types import TracebackType

import psycopg
from psycopg import Connection


class DatabaseConnection:
    """Database connection with automatic cleanup."""

    def __init__(self, dsn: str) -> None:
        self._dsn = dsn
        self._conn: Connection | None = None

    def connect(self) -> None:
        """Establish database connection."""
        self._conn = psycopg.connect(self._dsn)

    def close(self) -> None:
        """Close connection if open."""
        if self._conn is not None:
            self._conn.close()
            self._conn = None

    def execute(self, query: str, params: tuple | None = None):
        """Run a query on the open connection (thin wrapper so the usage below works)."""
        if self._conn is None:
            raise RuntimeError("Not connected")
        return self._conn.execute(query, params)

    def __enter__(self) -> "DatabaseConnection":
        """Enter context: connect and return self."""
        self.connect()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Exit context: always close connection."""
        self.close()

# Usage with context manager (preferred)
with DatabaseConnection(dsn) as db:
    result = db.execute(query)

# Manual management when needed
db = DatabaseConnection(dsn)
db.connect()
try:
    result = db.execute(query)
finally:
    db.close()
Pattern 2: Async Context Manager
For async resources, implement the async protocol.
from types import TracebackType

import asyncpg


class AsyncDatabasePool:
    """Async database connection pool."""

    def __init__(self, dsn: str, min_size: int = 1, max_size: int = 10) -> None:
        self._dsn = dsn
        self._min_size = min_size
        self._max_size = max_size
        self._pool: asyncpg.Pool | None = None

    async def __aenter__(self) -> "AsyncDatabasePool":
        """Create connection pool."""
        self._pool = await asyncpg.create_pool(
            self._dsn,
            min_size=self._min_size,
            max_size=self._max_size,
        )
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Close all connections in pool."""
        if self._pool is not None:
            await self._pool.close()
            self._pool = None

    async def execute(self, query: str, *args) -> list[asyncpg.Record]:
        """Execute query using pooled connection."""
        if self._pool is None:
            raise RuntimeError("Pool is not open; use 'async with'")
        async with self._pool.acquire() as conn:
            return await conn.fetch(query, *args)

# Usage
async with AsyncDatabasePool(dsn) as pool:
    users = await pool.execute("SELECT * FROM users WHERE active = $1", True)
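The async protocol can be exercised without a database. The sketch below assumes a hypothetical `FakePool` that only tracks whether it was closed, and checks that `__aexit__` is awaited even when the body raises.

```python
import asyncio


class FakePool:
    """Hypothetical pool that only tracks whether it is closed."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        await asyncio.sleep(0)  # stand-in for real network teardown
        self.closed = True


class ManagedPool:
    """Hypothetical async context manager wrapping FakePool."""

    async def __aenter__(self) -> FakePool:
        self.pool = FakePool()
        return self.pool

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        # Awaited on normal exit and on exceptions alike.
        await self.pool.close()


async def main() -> bool:
    manager = ManagedPool()
    try:
        async with manager:
            raise RuntimeError("query failed")
    except RuntimeError:
        pass
    return manager.pool.closed


pool_was_closed = asyncio.run(main())
```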
Pattern 3: Using @contextmanager Decorator
Simplify context managers with the decorator for straightforward cases.
from contextlib import contextmanager, asynccontextmanager
import time
import structlog

logger = structlog.get_logger()

@contextmanager
def timed_block(name: str):
    """Time a block of code."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        logger.info(f"{name} completed", duration_seconds=round(elapsed, 3))

# Usage
with timed_block("data_processing"):
    process_large_dataset()

# AsyncConnection stands for your driver's async connection type;
# it is not defined in this snippet.
@asynccontextmanager
async def database_transaction(conn: AsyncConnection):
    """Manage database transaction."""
    await conn.execute("BEGIN")
    try:
        yield conn
        await conn.execute("COMMIT")
    except Exception:
        await conn.execute("ROLLBACK")
        raise

# Usage
async with database_transaction(conn) as tx:
    await tx.execute("INSERT INTO users ...")
    await tx.execute("INSERT INTO audit_log ...")
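The commit-or-rollback shape of the transaction manager can be checked with a synchronous sketch on the standard-library `sqlite3` module. The `transaction` helper and the `users` table here are illustrative assumptions, not part of the skill.

```python
import sqlite3
from collections.abc import Iterator
from contextlib import contextmanager


@contextmanager
def transaction(conn: sqlite3.Connection) -> Iterator[sqlite3.Connection]:
    """Commit on success, roll back on any exception."""
    conn.execute("BEGIN")
    try:
        yield conn
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise


# isolation_level=None disables implicit transactions, so BEGIN/COMMIT are explicit.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE users (name TEXT)")

with transaction(conn):
    conn.execute("INSERT INTO users VALUES ('alice')")

try:
    with transaction(conn):
        conn.execute("INSERT INTO users VALUES ('bob')")
        raise ValueError("abort")  # triggers ROLLBACK
except ValueError:
    pass

names = [row[0] for row in conn.execute("SELECT name FROM users")]
conn.close()
```

Only the first insert survives, because the second block rolled back before re-raising.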
Pattern 4: Unconditional Resource Release
Always clean up resources in __exit__, regardless of exceptions.
from pathlib import Path
from types import TracebackType
from typing import IO


class FileProcessor:
    """Process file with guaranteed cleanup."""

    def __init__(self, path: str) -> None:
        self._path = path
        self._file: IO | None = None
        self._temp_files: list[Path] = []

    def __enter__(self) -> "FileProcessor":
        self._file = open(self._path, "r")
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Clean up all resources unconditionally."""
        try:
            # Close main file
            if self._file is not None:
                self._file.close()
        finally:
            # Clean up any temporary files, even if close() raised
            for temp_file in self._temp_files:
                try:
                    temp_file.unlink()
                except OSError:
                    pass  # Best effort cleanup
        # Return None/False to propagate any exception
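Best-effort cleanup can be demonstrated with real temporary files. In this sketch, `TempWorkspace` is a hypothetical helper; one file is deleted early on purpose so that the cleanup loop has to tolerate a missing path while an exception is in flight.

```python
import os
import tempfile
from pathlib import Path


class TempWorkspace:
    """Hypothetical helper that removes its scratch files on exit."""

    def __init__(self) -> None:
        self.temp_files: list[Path] = []

    def __enter__(self) -> "TempWorkspace":
        return self

    def new_file(self) -> Path:
        fd, name = tempfile.mkstemp()
        os.close(fd)  # only the path is needed here
        path = Path(name)
        self.temp_files.append(path)
        return path

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        for path in self.temp_files:
            try:
                path.unlink()
            except OSError:
                pass  # one failed removal must not block the others
        # Returning None lets any in-flight exception propagate.


created: list[Path] = []
failed = False
try:
    with TempWorkspace() as ws:
        created.append(ws.new_file())
        created.append(ws.new_file())
        created[0].unlink()  # already gone before cleanup runs
        raise RuntimeError("processing failed")
except RuntimeError:
    failed = True
```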
Advanced Patterns
Pattern 5: Selective Exception Suppression
Only suppress specific, documented exceptions.
from types import TracebackType


class StreamWriter:
    """Writer that handles broken pipe gracefully."""

    def __init__(self, stream) -> None:
        self._stream = stream

    def __enter__(self) -> "StreamWriter":
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool:
        """Clean up, suppressing BrokenPipeError on shutdown."""
        self._stream.close()
        # Suppress BrokenPipeError (client disconnected), including subclasses.
        # This is expected behavior, not an error
        if exc_type is not None and issubclass(exc_type, BrokenPipeError):
            return True  # Exception suppressed
        return False  # Propagate all other exceptions
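When no other cleanup is needed, the standard library's `contextlib.suppress` offers the same selective suppression without a custom class. A sketch, assuming a hypothetical `DisconnectingStream` whose client goes away after two writes:

```python
from contextlib import suppress


class DisconnectingStream:
    """Hypothetical stream whose client disconnects after two writes."""

    def __init__(self) -> None:
        self.received: list[str] = []

    def write(self, data: str) -> None:
        if len(self.received) >= 2:
            raise BrokenPipeError("client disconnected")
        self.received.append(data)


def write_all(stream, lines: list[str]) -> int:
    """Write lines until done or the client disconnects; return lines written."""
    written = 0
    # Only BrokenPipeError (and its subclasses) is swallowed here.
    with suppress(BrokenPipeError):
        for line in lines:
            stream.write(line)
            written += 1
    return written


stream = DisconnectingStream()
count = write_all(stream, ["a", "b", "c", "d"])
```

Any other exception type raised inside the `with suppress(...)` block still propagates.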
Pattern 6: Streaming with Accumulated State
Maintain both incremental chunks and accumulated state during streaming.
from collections.abc import Generator
from dataclasses import dataclass, field

@dataclass
class StreamingResult:
    """Accumulated streaming result."""

    chunks: list[str] = field(default_factory=list)
    _finalized: bool = False

    @property
    def content(self) -> str:
        """Get accumulated content."""
        return "".join(self.chunks)

    def add_chunk(self, chunk: str) -> None:
        """Add chunk to accumulator."""
        if self._finalized:
            raise RuntimeError("Cannot add to finalized result")
        self.chunks.append(chunk)

    def finalize(self) -> str:
        """Mark stream complete and return content."""
        self._finalized = True
        return self.content

def stream_with_accumulation(
    response: StreamingResponse,
) -> Generator[tuple[str, str], None, str]:
    """Stream response while accumulating content.

    Yields:
        Tuple of (accumulated_content, new_chunk) for each chunk.

    Returns:
        Final accumulated content.
    """
    result = StreamingResult()
    for chunk in response.iter_content():
        result.add_chunk(chunk)
        yield result.content, chunk
    return result.finalize()
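A plain `for` loop discards a generator's return value, so the final content returned by a generator like the one above needs to be captured explicitly. One possible approach, sketched with a hypothetical `ReturnCapture` wrapper and a toy `stream_chunks` generator, relies on `yield from` evaluating to the inner generator's return value:

```python
from collections.abc import Generator, Iterator


class ReturnCapture:
    """Hypothetical wrapper: iterate a generator and keep its return value."""

    def __init__(self, gen: Generator[str, None, str]) -> None:
        self._gen = gen
        self.value: str | None = None

    def __iter__(self) -> Iterator[str]:
        # `yield from` evaluates to the wrapped generator's return value.
        self.value = yield from self._gen


def stream_chunks() -> Generator[str, None, str]:
    """Toy generator: yields chunks, returns the joined content."""
    chunks: list[str] = []
    for chunk in ["Hel", "lo", "!"]:
        chunks.append(chunk)
        yield chunk
    return "".join(chunks)


capture = ReturnCapture(stream_chunks())
seen = [chunk for chunk in capture]
```

The alternative is to call `next()` manually and read `StopIteration.value`, which is more verbose at each call site.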
Pattern 7: Efficient String Accumulation
Avoid O(n²) string concatenation when accumulating.
def accumulate_stream(stream) -> str:
    """Efficiently accumulate stream content."""
    # BAD: O(n²) due to string immutability
    # content = ""
    # for chunk in stream:
    #     content += chunk  # Creates new string each time

    # GOOD: O(n) with list and join
    chunks: list[str] = []
    for chunk in stream:
        chunks.append(chunk)
    return "".join(chunks)  # Single allocation
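Two standard-library alternatives to the list-and-join approach are `io.StringIO` for text and `bytearray` for binary chunks. The function names below are hypothetical, and this is a sketch of the alternatives rather than a recommendation over `"".join`:

```python
import io
from collections.abc import Iterable


def accumulate_with_buffer(stream: Iterable[str]) -> str:
    """Accumulate text chunks in an in-memory text buffer."""
    buffer = io.StringIO()
    for chunk in stream:
        buffer.write(chunk)
    return buffer.getvalue()


def accumulate_bytes(stream: Iterable[bytes]) -> bytes:
    """Accumulate binary chunks in a mutable bytearray."""
    buffer = bytearray()
    for chunk in stream:
        buffer.extend(chunk)  # in-place append, no new object per chunk
    return bytes(buffer)


text = accumulate_with_buffer(iter(["a", "b", "c"]))
data = accumulate_bytes([b"ab", b"cd"])
```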
Pattern 8: Tracking Stream Metrics
Measure time-to-first-byte and total streaming time.
import time
from collections.abc import Generator

def stream_with_metrics(
    response: StreamingResponse,
) -> Generator[str, None, dict]:
    """Stream response while collecting metrics.

    Yields:
        Content chunks.

    Returns:
        Metrics dictionary.
    """
    start = time.perf_counter()
    first_chunk_time: float | None = None
    chunk_count = 0
    total_bytes = 0

    for chunk in response.iter_content():
        if first_chunk_time is None:
            first_chunk_time = time.perf_counter() - start
        chunk_count += 1
        total_bytes += len(chunk.encode())
        yield chunk

    total_time = time.perf_counter() - start
    return {
        "time_to_first_byte_ms": round((first_chunk_time or 0) * 1000, 2),
        "total_time_ms": round(total_time * 1000, 2),
        "chunk_count": chunk_count,
        "total_bytes": total_bytes,
    }
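A generator's return value is never produced if the consumer stops iterating early, so metrics returned this way can be lost on a client disconnect. One possible variation, sketched with a hypothetical `stream_with_metrics_sink` that writes into a caller-supplied dict from a `finally` block:

```python
import time
from collections.abc import Iterable, Iterator


def stream_with_metrics_sink(chunks: Iterable[str], metrics: dict) -> Iterator[str]:
    """Yield chunks; fill `metrics` even if the consumer stops early."""
    start = time.perf_counter()
    count = 0
    try:
        for chunk in chunks:
            count += 1
            yield chunk
    finally:
        # Runs on exhaustion and also when the generator is closed early.
        metrics["chunk_count"] = count
        metrics["total_time_ms"] = round((time.perf_counter() - start) * 1000, 2)


metrics: dict = {}
gen = stream_with_metrics_sink(["a", "b", "c"], metrics)
first = next(gen)
gen.close()  # consumer stops after one chunk; the finally block still runs
```

Calling `close()` raises `GeneratorExit` at the paused `yield`, which is what triggers the `finally` block.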
Pattern 9: Managing Multiple Resources with ExitStack
Handle a dynamic number of resources cleanly.
from contextlib import ExitStack, AsyncExitStack
from pathlib import Path

def process_files(paths: list[Path]) -> list[str]:
    """Process multiple files with automatic cleanup."""
    results = []
    with ExitStack() as stack:
        # Open all files - they'll all be closed when block exits
        files = [stack.enter_context(open(p)) for p
---
*Content truncated.*
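The listing above is cut off in the source, so the following is an independent sketch of the `ExitStack` technique rather than a reconstruction of it. The `read_first_lines` function and the temporary files are assumptions made for illustration.

```python
import tempfile
from contextlib import ExitStack
from pathlib import Path


def read_first_lines(paths: list[Path]) -> list[str]:
    """Open every file at once and return the first line of each."""
    with ExitStack() as stack:
        # Each file is registered with the stack as soon as it is opened,
        # so all of them are closed when the block exits, in reverse order.
        files = [stack.enter_context(open(p, encoding="utf-8")) for p in paths]
        return [f.readline().rstrip("\n") for f in files]


with tempfile.TemporaryDirectory() as tmp:
    paths = []
    for i in range(3):
        path = Path(tmp) / f"file{i}.txt"
        path.write_text(f"line {i}\nrest\n", encoding="utf-8")
        paths.append(path)
    first_lines = read_first_lines(paths)
```

If opening a later file fails, the files already entered into the stack are still closed.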