projection-patterns
Build read models and projections from event streams. Use when implementing CQRS read sides, building materialized views, or optimizing query performance in event-sourced systems.
Install
mkdir -p .claude/skills/projection-patterns && curl -L -o skill.zip "https://mcp.directory/api/skills/download/3714" && unzip -o skill.zip -d .claude/skills/projection-patterns && rm skill.zip
Installs to .claude/skills/projection-patterns
About this skill
Projection Patterns
Comprehensive guide to building projections and read models for event-sourced systems.
When to Use This Skill
- Building CQRS read models
- Creating materialized views from events
- Optimizing query performance
- Implementing real-time dashboards
- Building search indexes from events
- Aggregating data across streams
Core Concepts
1. Projection Architecture
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│ Event Store │────►│  Projector  │────►│ Read Model  │
│             │     │             │     │ (Database)  │
│ ┌─────────┐ │     │ ┌─────────┐ │     │ ┌─────────┐ │
│ │ Events  │ │     │ │ Handler │ │     │ │ Tables  │ │
│ └─────────┘ │     │ │ Logic   │ │     │ │ Views   │ │
│             │     │ └─────────┘ │     │ │ Cache   │ │
└─────────────┘     └─────────────┘     └─────────────┘
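The flow in the diagram can be sketched with an in-memory stand-in for each box. This is a minimal illustration, not part of the skill's templates: `InMemoryEventStore` and `project_open_orders` are hypothetical names, and a plain dict plays the role of the read model database.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Event:
    stream_id: str
    event_type: str
    data: dict
    global_position: int


@dataclass
class InMemoryEventStore:
    """Hypothetical append-only store; the position is assigned on append."""
    events: List[Event] = field(default_factory=list)

    def append(self, stream_id: str, event_type: str, data: dict) -> Event:
        event = Event(stream_id, event_type, data, len(self.events) + 1)
        self.events.append(event)
        return event


def project_open_orders(events: List[Event]) -> Dict[str, str]:
    """Handler logic: fold events into a read model keyed by order id."""
    read_model: Dict[str, str] = {}
    for event in events:
        if event.event_type == "OrderCreated":
            read_model[event.stream_id] = "pending"
        elif event.event_type == "OrderShipped":
            read_model[event.stream_id] = "shipped"
        elif event.event_type == "OrderCancelled":
            # Cancelled orders drop out of the "open orders" view.
            read_model.pop(event.stream_id, None)
    return read_model


store = InMemoryEventStore()
store.append("order-1", "OrderCreated", {})
store.append("order-2", "OrderCreated", {})
store.append("order-1", "OrderShipped", {})
store.append("order-2", "OrderCancelled", {})

read_model = project_open_orders(store.events)
print(read_model)  # {'order-1': 'shipped'}
```

The read model holds only what the query side needs, and it can be thrown away and recomputed from the event list at any time.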
2. Projection Types
| Type | Description | Use Case |
|---|---|---|
| Live | Real-time from subscription | Current state queries |
| Catchup | Process historical events | Rebuilding read models |
| Persistent | Stores checkpoint | Resume after restart |
| Inline | Same transaction as write | Strong consistency |
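To make the table more concrete, the sketch below combines three of the types in one toy projection: it replays history (catch-up), accepts pushed events (live), and keeps a checkpoint so it can skip anything already applied (persistent). The class name and the tuple-based event shape are assumptions for illustration; a real checkpoint would be stored durably rather than in an attribute.

```python
from typing import Dict, List, Tuple

# Each event is (global_position, event_type); a deliberately tiny stand-in.
EventTuple = Tuple[int, str]


class EventCountProjection:
    """Counts events by type and remembers the last position it applied."""

    def __init__(self) -> None:
        self.counts: Dict[str, int] = {}
        self.checkpoint = 0  # persistent projections would store this durably

    def _apply(self, event: EventTuple) -> bool:
        position, event_type = event
        if position <= self.checkpoint:
            return False  # already applied; skip duplicates at the handover
        self.counts[event_type] = self.counts.get(event_type, 0) + 1
        self.checkpoint = position
        return True

    def catch_up(self, history: List[EventTuple]) -> int:
        """Catch-up mode: replay stored events past the checkpoint."""
        return sum(self._apply(event) for event in history)

    def on_live_event(self, event: EventTuple) -> bool:
        """Live mode: apply one event pushed by a subscription."""
        return self._apply(event)


history = [(1, "OrderCreated"), (2, "OrderShipped"), (3, "OrderCreated")]
projection = EventCountProjection()
applied = projection.catch_up(history)
duplicate = projection.on_live_event((3, "OrderCreated"))
fresh = projection.on_live_event((4, "OrderCancelled"))
print(applied, duplicate, fresh)  # 3 False True
```

The position check is what makes the switch from catch-up to live safe: an event delivered by both paths is applied once.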
Templates
Template 1: Basic Projector
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, Any, Callable, List

import asyncpg


@dataclass
class Event:
    stream_id: str
    event_type: str
    data: dict
    version: int
    global_position: int


class Projection(ABC):
    """Base class for projections."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Unique projection name for checkpointing."""
        pass

    @abstractmethod
    def handles(self) -> List[str]:
        """List of event types this projection handles."""
        pass

    @abstractmethod
    async def apply(self, event: Event) -> None:
        """Apply event to the read model."""
        pass
class Projector:
    """Runs projections from event store."""

    def __init__(self, event_store, checkpoint_store):
        self.event_store = event_store
        self.checkpoint_store = checkpoint_store
        self.projections: List[Projection] = []

    def register(self, projection: Projection):
        self.projections.append(projection)

    async def run(self, batch_size: int = 100):
        """Run all projections continuously."""
        while True:
            for projection in self.projections:
                await self._run_projection(projection, batch_size)
            # Short pause between polling rounds
            await asyncio.sleep(0.1)

    async def _run_projection(self, projection: Projection, batch_size: int) -> int:
        """Process one batch and return the number of events read."""
        checkpoint = await self.checkpoint_store.get(projection.name)
        position = checkpoint or 0

        # Assumes read_all returns events after `position`
        events = await self.event_store.read_all(position, batch_size)
        handled = set(projection.handles())

        for event in events:
            if event.event_type in handled:
                await projection.apply(event)

            # Advance the checkpoint for every event, handled or not,
            # so skipped events are not re-read in the next batch
            await self.checkpoint_store.save(
                projection.name,
                event.global_position
            )
        return len(events)

    async def rebuild(self, projection: Projection):
        """Rebuild a projection from scratch."""
        await self.checkpoint_store.delete(projection.name)
        # Optionally clear read model tables
        # Keep reading batches until the event store returns none;
        # a single batch would stop after the first 1000 events
        while await self._run_projection(projection, batch_size=1000):
            pass
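The `Projector` above calls `event_store.read_all(position, batch_size)` and `checkpoint_store.get/save/delete`, but the template does not show those collaborators. A minimal in-memory sketch of both follows, useful mainly for tests. The class names are hypothetical, and it assumes `read_all` returns events strictly after the given position; the `drain` helper only mirrors the batch loop so the block runs on its own.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Event:
    stream_id: str
    event_type: str
    data: dict
    version: int
    global_position: int


class InMemoryEventStore:
    """Hypothetical store: read_all returns events *after* `position`."""

    def __init__(self, events: List[Event]):
        self._events = sorted(events, key=lambda e: e.global_position)

    async def read_all(self, position: int, batch_size: int) -> List[Event]:
        newer = [e for e in self._events if e.global_position > position]
        return newer[:batch_size]


class InMemoryCheckpointStore:
    """Hypothetical store: maps projection name to last applied position."""

    def __init__(self) -> None:
        self._positions: Dict[str, int] = {}

    async def get(self, name: str) -> Optional[int]:
        return self._positions.get(name)

    async def save(self, name: str, position: int) -> None:
        self._positions[name] = position

    async def delete(self, name: str) -> None:
        self._positions.pop(name, None)


async def drain(event_store, checkpoint_store, name: str, batch_size: int) -> List[int]:
    """Read batches until none remain; return the positions seen, in order."""
    seen: List[int] = []
    while True:
        position = await checkpoint_store.get(name) or 0
        batch = await event_store.read_all(position, batch_size)
        if not batch:
            return seen
        for event in batch:
            seen.append(event.global_position)
            await checkpoint_store.save(name, event.global_position)


events = [Event("order-1", "OrderCreated", {}, pos, pos) for pos in range(1, 6)]
checkpoints = InMemoryCheckpointStore()
positions = asyncio.run(
    drain(InMemoryEventStore(events), checkpoints, "demo", batch_size=2)
)
print(positions)  # [1, 2, 3, 4, 5]
```

Because the checkpoint store returns `None` for an unknown name, deleting a checkpoint is enough to restart a projection from position 0.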
Template 2: Order Summary Projection
class OrderSummaryProjection(Projection):
    """Projects order events to a summary read model."""

    def __init__(self, db_pool: asyncpg.Pool):
        self.pool = db_pool

    @property
    def name(self) -> str:
        return "order_summary"

    def handles(self) -> List[str]:
        return [
            "OrderCreated",
            "OrderItemAdded",
            "OrderItemRemoved",
            "OrderShipped",
            "OrderCompleted",
            "OrderCancelled"
        ]

    async def apply(self, event: Event) -> None:
        handlers = {
            "OrderCreated": self._handle_created,
            "OrderItemAdded": self._handle_item_added,
            "OrderItemRemoved": self._handle_item_removed,
            "OrderShipped": self._handle_shipped,
            "OrderCompleted": self._handle_completed,
            "OrderCancelled": self._handle_cancelled,
        }

        handler = handlers.get(event.event_type)
        if handler:
            await handler(event)
    async def _handle_created(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO order_summaries
                (order_id, customer_id, status, total_amount, item_count, created_at)
                VALUES ($1, $2, $3, $4, $5, $6)
                """,
                event.data['order_id'],
                event.data['customer_id'],
                'pending',
                0,
                0,
                event.data['created_at']
            )

    async def _handle_item_added(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET total_amount = total_amount + $2,
                    item_count = item_count + 1,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['price'] * event.data['quantity']
            )

    async def _handle_item_removed(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET total_amount = total_amount - $2,
                    item_count = item_count - 1,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['price'] * event.data['quantity']
            )
    async def _handle_shipped(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET status = 'shipped',
                    shipped_at = $2,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['shipped_at']
            )

    async def _handle_completed(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET status = 'completed',
                    completed_at = $2,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['completed_at']
            )

    async def _handle_cancelled(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET status = 'cancelled',
                    cancelled_at = $2,
                    cancellation_reason = $3,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['cancelled_at'],
                event.data.get('reason')
            )
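The handlers above are not idempotent: if an event is applied twice (for example after a crash between the update and the checkpoint save), `total_amount` and `item_count` are incremented twice. One possible guard is sketched below. It uses the standard-library `sqlite3` module instead of asyncpg and PostgreSQL so that it runs anywhere, and the `last_position` column and function names are assumptions, not part of the original schema. In PostgreSQL, `ON CONFLICT DO NOTHING` plays the role of SQLite's `INSERT OR IGNORE`.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE order_summaries (
            order_id TEXT PRIMARY KEY,
            total_amount REAL NOT NULL DEFAULT 0,
            item_count INTEGER NOT NULL DEFAULT 0,
            last_position INTEGER NOT NULL DEFAULT 0  -- hypothetical guard column
        )
        """
    )
    return conn


def apply_created(conn: sqlite3.Connection, order_id: str, position: int) -> None:
    # A replayed OrderCreated hits the primary key and is ignored.
    conn.execute(
        "INSERT OR IGNORE INTO order_summaries (order_id, last_position) VALUES (?, ?)",
        (order_id, position),
    )


def apply_item_added(
    conn: sqlite3.Connection, order_id: str, amount: float, position: int
) -> bool:
    # The WHERE clause skips events at or below the last applied position.
    cursor = conn.execute(
        """
        UPDATE order_summaries
        SET total_amount = total_amount + ?,
            item_count = item_count + 1,
            last_position = ?
        WHERE order_id = ? AND last_position < ?
        """,
        (amount, position, order_id, position),
    )
    return cursor.rowcount == 1


conn = make_db()
apply_created(conn, "order-1", position=1)
first = apply_item_added(conn, "order-1", 20.0, position=2)
replay = apply_item_added(conn, "order-1", 20.0, position=2)
row = conn.execute(
    "SELECT total_amount, item_count FROM order_summaries"
).fetchone()
print(first, replay, row)  # True False (20.0, 1)
```

Storing the position in the same row as the data means the guard and the update commit together, so a retry after a partial failure cannot double-count.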
Template 3: Elasticsearch Search Projection
from elasticsearch import AsyncElasticsearch


class ProductSearchProjection(Projection):
    """Projects product events to Elasticsearch for full-text search."""

    def __init__(self, es_client: AsyncElasticsearch):
        self.es = es_client
        self.index = "products"

    @property
    def name(self) -> str:
        return "product_search"

    def handles(self) -> List[str]:
        return [
            "ProductCreated",
            "ProductUpdated",
            "ProductPriceChanged",
            "ProductDeleted"
        ]

    async def apply(self, event: Event) -> None:
        if event.event_type == "ProductCreated":
            await self.es.index(
                index=self.index,
                id=event.data['product_id'],
                document={
                    'name': event.data['name'],
                    'description': event.data['description'],
                    'category': event.data['category'],
                    'price': event.data['price'],
                    'tags': event.data.get('tags', []),
                    'created_at': event.data['created_at']
                }
            )

        elif event.event_type == "ProductUpdated":
            await self.es.update(
                index=self.index,
                id=event.data['product_id'],
                doc={
                    'name': event.data['name'],
                    'description': event.data['description'],
                    'category': event.data['category'],
                    'tags': event.data.get('tags', []),
                    'updated_at': event.data['updated_at']
                }
            )

        elif event.event_type == "ProductPriceChanged":
            await self.es.update(
                index=self.index,
                id=event.data['product_id'],
                doc={
                    'price': event.data['new_price'],
                    'price_updated_at': event.data['changed_at']
                }
            )

        elif event.event_type == "ProductDeleted":
            await self.es.delete(
                index=self.index,
                id=event.data['product_id']
            )
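On the query side, the documents written by this projection could be searched with a standard Elasticsearch bool query. The helper below is a sketch: `build_product_query` is a hypothetical name, the field boosts are arbitrary, and the `term` filter assumes `category` is mapped as a keyword field. It only builds the query body, so it runs without a cluster.

```python
from typing import Any, Dict, List, Optional


def build_product_query(
    text: str,
    category: Optional[str] = None,
    max_price: Optional[float] = None,
) -> Dict[str, Any]:
    """Build a bool query over the fields indexed by the projection."""
    filters: List[Dict[str, Any]] = []
    if category is not None:
        # Exact match; assumes `category` is a keyword field.
        filters.append({"term": {"category": category}})
    if max_price is not None:
        filters.append({"range": {"price": {"lte": max_price}}})

    return {
        "bool": {
            "must": [
                {
                    "multi_match": {
                        "query": text,
                        # Weight name matches above description and tags.
                        "fields": ["name^2", "description", "tags"],
                    }
                }
            ],
            "filter": filters,
        }
    }


query = build_product_query("wireless mouse", category="accessories", max_price=50)
print(query["bool"]["filter"])
```

With the 8.x Python client, the result would be passed as the `query` argument of `AsyncElasticsearch.search`, alongside `index="products"`.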
Template 4: Aggregating Projection
class DailySalesProjection(Projection):
    """Aggregates sales data by day for reporting."""

    def __init__(self, d
---
*Content truncated.*
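As an independent illustration of day-level aggregation (not a reconstruction of the truncated template above), the sketch below folds `OrderCompleted` events into per-day totals in memory. The function name, the dict-based event shape, and the `total_amount` field on the event are assumptions; `completed_at` is assumed to be an ISO 8601 timestamp without a timezone suffix.

```python
from collections import defaultdict
from datetime import datetime
from typing import Dict, List


def aggregate_daily_sales(events: List[dict]) -> Dict[str, Dict[str, float]]:
    """Group completed orders by calendar day and sum count and revenue."""
    totals = defaultdict(lambda: {"order_count": 0, "revenue": 0.0})
    for event in events:
        if event["event_type"] != "OrderCompleted":
            continue  # other event types do not affect sales totals
        completed_at = datetime.fromisoformat(event["data"]["completed_at"])
        day = completed_at.date().isoformat()
        totals[day]["order_count"] += 1
        totals[day]["revenue"] += event["data"]["total_amount"]
    return dict(totals)


sample_events = [
    {"event_type": "OrderCompleted",
     "data": {"completed_at": "2024-01-15T09:00:00", "total_amount": 10.0}},
    {"event_type": "OrderCompleted",
     "data": {"completed_at": "2024-01-15T17:30:00", "total_amount": 5.5}},
    {"event_type": "OrderCancelled",
     "data": {"cancelled_at": "2024-01-15T18:00:00"}},
    {"event_type": "OrderCompleted",
     "data": {"completed_at": "2024-01-16T08:15:00", "total_amount": 7.0}},
]

daily = aggregate_daily_sales(sample_events)
print(daily["2024-01-15"])  # {'order_count': 2, 'revenue': 15.5}
```

A database-backed version would typically do the same fold with one upsert per event, keyed by the day.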
More by wshobson