projection-patterns

1
1
Source

Build read models and projections from event streams. Use when implementing CQRS read sides, building materialized views, or optimizing query performance in event-sourced systems.

Install

mkdir -p .claude/skills/projection-patterns && curl -L -o skill.zip "https://mcp.directory/api/skills/download/3714" && unzip -o skill.zip -d .claude/skills/projection-patterns && rm skill.zip

Installs to .claude/skills/projection-patterns

About this skill

Projection Patterns

Comprehensive guide to building projections and read models for event-sourced systems.

When to Use This Skill

  • Building CQRS read models
  • Creating materialized views from events
  • Optimizing query performance
  • Implementing real-time dashboards
  • Building search indexes from events
  • Aggregating data across streams

Core Concepts

1. Projection Architecture

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│ Event Store │────►│ Projector   │────►│ Read Model  │
│             │     │             │     │ (Database)  │
│ ┌─────────┐ │     │ ┌─────────┐ │     │ ┌─────────┐ │
│ │ Events  │ │     │ │ Handler │ │     │ │ Tables  │ │
│ └─────────┘ │     │ │ Logic   │ │     │ │ Views   │ │
│             │     │ └─────────┘ │     │ │ Cache   │ │
└─────────────┘     └─────────────┘     └─────────────┘

2. Projection Types

TypeDescriptionUse Case
LiveReal-time from subscriptionCurrent state queries
CatchupProcess historical eventsRebuilding read models
PersistentStores checkpointResume after restart
InlineSame transaction as writeStrong consistency

Templates

Template 1: Basic Projector

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, Any, Callable, List
import asyncpg

@dataclass
class Event:
    stream_id: str
    event_type: str
    data: dict
    version: int
    global_position: int


class Projection(ABC):
    """Base class for projections."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Unique projection name for checkpointing."""
        pass

    @abstractmethod
    def handles(self) -> List[str]:
        """List of event types this projection handles."""
        pass

    @abstractmethod
    async def apply(self, event: Event) -> None:
        """Apply event to the read model."""
        pass


class Projector:
    """Runs projections from event store."""

    def __init__(self, event_store, checkpoint_store):
        self.event_store = event_store
        self.checkpoint_store = checkpoint_store
        self.projections: List[Projection] = []

    def register(self, projection: Projection):
        self.projections.append(projection)

    async def run(self, batch_size: int = 100):
        """Run all projections continuously."""
        while True:
            for projection in self.projections:
                await self._run_projection(projection, batch_size)
            await asyncio.sleep(0.1)

    async def _run_projection(self, projection: Projection, batch_size: int):
        checkpoint = await self.checkpoint_store.get(projection.name)
        position = checkpoint or 0

        events = await self.event_store.read_all(position, batch_size)

        for event in events:
            if event.event_type in projection.handles():
                await projection.apply(event)

            await self.checkpoint_store.save(
                projection.name,
                event.global_position
            )

    async def rebuild(self, projection: Projection):
        """Rebuild a projection from scratch."""
        await self.checkpoint_store.delete(projection.name)
        # Optionally clear read model tables
        await self._run_projection(projection, batch_size=1000)

Template 2: Order Summary Projection

class OrderSummaryProjection(Projection):
    """Projects order events to a summary read model."""

    def __init__(self, db_pool: asyncpg.Pool):
        self.pool = db_pool

    @property
    def name(self) -> str:
        return "order_summary"

    def handles(self) -> List[str]:
        return [
            "OrderCreated",
            "OrderItemAdded",
            "OrderItemRemoved",
            "OrderShipped",
            "OrderCompleted",
            "OrderCancelled"
        ]

    async def apply(self, event: Event) -> None:
        handlers = {
            "OrderCreated": self._handle_created,
            "OrderItemAdded": self._handle_item_added,
            "OrderItemRemoved": self._handle_item_removed,
            "OrderShipped": self._handle_shipped,
            "OrderCompleted": self._handle_completed,
            "OrderCancelled": self._handle_cancelled,
        }

        handler = handlers.get(event.event_type)
        if handler:
            await handler(event)

    async def _handle_created(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO order_summaries
                (order_id, customer_id, status, total_amount, item_count, created_at)
                VALUES ($1, $2, $3, $4, $5, $6)
                """,
                event.data['order_id'],
                event.data['customer_id'],
                'pending',
                0,
                0,
                event.data['created_at']
            )

    async def _handle_item_added(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET total_amount = total_amount + $2,
                    item_count = item_count + 1,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['price'] * event.data['quantity']
            )

    async def _handle_item_removed(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET total_amount = total_amount - $2,
                    item_count = item_count - 1,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['price'] * event.data['quantity']
            )

    async def _handle_shipped(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET status = 'shipped',
                    shipped_at = $2,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['shipped_at']
            )

    async def _handle_completed(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET status = 'completed',
                    completed_at = $2,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['completed_at']
            )

    async def _handle_cancelled(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET status = 'cancelled',
                    cancelled_at = $2,
                    cancellation_reason = $3,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['cancelled_at'],
                event.data.get('reason')
            )

Template 3: Elasticsearch Search Projection

from elasticsearch import AsyncElasticsearch

class ProductSearchProjection(Projection):
    """Projects product events to Elasticsearch for full-text search."""

    def __init__(self, es_client: AsyncElasticsearch):
        self.es = es_client
        self.index = "products"

    @property
    def name(self) -> str:
        return "product_search"

    def handles(self) -> List[str]:
        return [
            "ProductCreated",
            "ProductUpdated",
            "ProductPriceChanged",
            "ProductDeleted"
        ]

    async def apply(self, event: Event) -> None:
        if event.event_type == "ProductCreated":
            await self.es.index(
                index=self.index,
                id=event.data['product_id'],
                document={
                    'name': event.data['name'],
                    'description': event.data['description'],
                    'category': event.data['category'],
                    'price': event.data['price'],
                    'tags': event.data.get('tags', []),
                    'created_at': event.data['created_at']
                }
            )

        elif event.event_type == "ProductUpdated":
            await self.es.update(
                index=self.index,
                id=event.data['product_id'],
                doc={
                    'name': event.data['name'],
                    'description': event.data['description'],
                    'category': event.data['category'],
                    'tags': event.data.get('tags', []),
                    'updated_at': event.data['updated_at']
                }
            )

        elif event.event_type == "ProductPriceChanged":
            await self.es.update(
                index=self.index,
                id=event.data['product_id'],
                doc={
                    'price': event.data['new_price'],
                    'price_updated_at': event.data['changed_at']
                }
            )

        elif event.event_type == "ProductDeleted":
            await self.es.delete(
                index=self.index,
                id=event.data['product_id']
            )

Template 4: Aggregating Projection

class DailySalesProjection(Projection):
    """Aggregates sales data by day for reporting."""

    def __init__(self, d

---

*Content truncated.*

You might also like

flutter-development

aj-geddes

Build beautiful cross-platform mobile apps with Flutter and Dart. Covers widgets, state management with Provider/BLoC, navigation, API integration, and material design.

1,6851,430

ui-ux-pro-max

nextlevelbuilder

"UI/UX design intelligence. 50 styles, 21 palettes, 50 font pairings, 20 charts, 8 stacks (React, Next.js, Vue, Svelte, SwiftUI, React Native, Flutter, Tailwind). Actions: plan, build, create, design, implement, review, fix, improve, optimize, enhance, refactor, check UI/UX code. Projects: website, landing page, dashboard, admin panel, e-commerce, SaaS, portfolio, blog, mobile app, .html, .tsx, .vue, .svelte. Elements: button, modal, navbar, sidebar, card, table, form, chart. Styles: glassmorphism, claymorphism, minimalism, brutalism, neumorphism, bento grid, dark mode, responsive, skeuomorphism, flat design. Topics: color palette, accessibility, animation, layout, typography, font pairing, spacing, hover, shadow, gradient."

1,2701,335

drawio-diagrams-enhanced

jgtolentino

Create professional draw.io (diagrams.net) diagrams in XML format (.drawio files) with integrated PMP/PMBOK methodologies, extensive visual asset libraries, and industry-standard professional templates. Use this skill when users ask to create flowcharts, swimlane diagrams, cross-functional flowcharts, org charts, network diagrams, UML diagrams, BPMN, project management diagrams (WBS, Gantt, PERT, RACI), risk matrices, stakeholder maps, or any other visual diagram in draw.io format. This skill includes access to custom shape libraries for icons, clipart, and professional symbols.

1,5441,153

godot

bfollington

This skill should be used when working on Godot Engine projects. It provides specialized knowledge of Godot's file formats (.gd, .tscn, .tres), architecture patterns (component-based, signal-driven, resource-based), common pitfalls, validation tools, code templates, and CLI workflows. The `godot` command is available for running the game, validating scripts, importing resources, and exporting builds. Use this skill for tasks involving Godot game development, debugging scene/resource files, implementing game systems, or creating new Godot components.

1,359809

nano-banana-pro

garg-aayush

Generate and edit images using Google's Nano Banana Pro (Gemini 3 Pro Image) API. Use when the user asks to generate, create, edit, modify, change, alter, or update images. Also use when user references an existing image file and asks to modify it in any way (e.g., "modify this image", "change the background", "replace X with Y"). Supports both text-to-image generation and image-to-image editing with configurable resolution (1K default, 2K, or 4K for high resolution). DO NOT read the image file first - use this skill directly with the --input-image parameter.

1,265728

pdf-to-markdown

aliceisjustplaying

Convert entire PDF documents to clean, structured Markdown for full context loading. Use this skill when the user wants to extract ALL text from a PDF into context (not grep/search), when discussing or analyzing PDF content in full, when the user mentions "load the whole PDF", "bring the PDF into context", "read the entire PDF", or when partial extraction/grepping would miss important context. This is the preferred method for PDF text extraction over page-by-page or grep approaches.

1,493684