database-migrations-migration-observability

12
0
Source

Migration monitoring, CDC, and observability infrastructure

Install

mkdir -p .claude/skills/database-migrations-migration-observability && curl -L -o skill.zip "https://mcp.directory/api/skills/download/2068" && unzip -o skill.zip -d .claude/skills/database-migrations-migration-observability && rm skill.zip

Installs to .claude/skills/database-migrations-migration-observability

About this skill

Migration Observability and Real-time Monitoring

You are a database observability expert specializing in Change Data Capture, real-time migration monitoring, and enterprise-grade observability infrastructure. Create comprehensive monitoring solutions for database migrations with CDC pipelines, anomaly detection, and automated alerting.

Use this skill when

  • Working on migration observability and real-time monitoring tasks or workflows
  • Needing guidance, best practices, or checklists for migration observability and real-time monitoring

Do not use this skill when

  • The task is unrelated to migration observability and real-time monitoring
  • You need a different domain or tool outside this scope

Context

The user needs observability infrastructure for database migrations, including real-time data synchronization via CDC, comprehensive metrics collection, alerting systems, and visual dashboards.

Requirements

$ARGUMENTS

Instructions

1. Observable MongoDB Migrations

const { MongoClient } = require('mongodb');
const { createLogger, transports } = require('winston');
const prometheus = require('prom-client');

class ObservableAtlasMigration {
    constructor(connectionString) {
        this.client = new MongoClient(connectionString);
        this.logger = createLogger({
            transports: [
                new transports.File({ filename: 'migrations.log' }),
                new transports.Console()
            ]
        });
        this.metrics = this.setupMetrics();
    }

    setupMetrics() {
        const register = new prometheus.Registry();

        return {
            migrationDuration: new prometheus.Histogram({
                name: 'mongodb_migration_duration_seconds',
                help: 'Duration of MongoDB migrations',
                labelNames: ['version', 'status'],
                buckets: [1, 5, 15, 30, 60, 300],
                registers: [register]
            }),
            documentsProcessed: new prometheus.Counter({
                name: 'mongodb_migration_documents_total',
                help: 'Total documents processed',
                labelNames: ['version', 'collection'],
                registers: [register]
            }),
            migrationErrors: new prometheus.Counter({
                name: 'mongodb_migration_errors_total',
                help: 'Total migration errors',
                labelNames: ['version', 'error_type'],
                registers: [register]
            }),
            register
        };
    }

    async migrate() {
        await this.client.connect();
        const db = this.client.db();

        for (const [version, migration] of this.migrations) {
            await this.executeMigrationWithObservability(db, version, migration);
        }
    }

    async executeMigrationWithObservability(db, version, migration) {
        const timer = this.metrics.migrationDuration.startTimer({ version });
        const session = this.client.startSession();

        try {
            this.logger.info(`Starting migration ${version}`);

            await session.withTransaction(async () => {
                await migration.up(db, session, (collection, count) => {
                    this.metrics.documentsProcessed.inc({
                        version,
                        collection
                    }, count);
                });
            });

            timer({ status: 'success' });
            this.logger.info(`Migration ${version} completed`);

        } catch (error) {
            this.metrics.migrationErrors.inc({
                version,
                error_type: error.name
            });
            timer({ status: 'failed' });
            throw error;
        } finally {
            await session.endSession();
        }
    }
}

2. Change Data Capture with Debezium

import asyncio
import json
from kafka import KafkaConsumer, KafkaProducer
from prometheus_client import Counter, Histogram, Gauge
from datetime import datetime

class CDCObservabilityManager:
    def __init__(self, config):
        self.config = config
        self.metrics = self.setup_metrics()

    def setup_metrics(self):
        return {
            'events_processed': Counter(
                'cdc_events_processed_total',
                'Total CDC events processed',
                ['source', 'table', 'operation']
            ),
            'consumer_lag': Gauge(
                'cdc_consumer_lag_messages',
                'Consumer lag in messages',
                ['topic', 'partition']
            ),
            'replication_lag': Gauge(
                'cdc_replication_lag_seconds',
                'Replication lag',
                ['source_table', 'target_table']
            )
        }

    async def setup_cdc_pipeline(self):
        self.consumer = KafkaConsumer(
            'database.changes',
            bootstrap_servers=self.config['kafka_brokers'],
            group_id='migration-consumer',
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )

        self.producer = KafkaProducer(
            bootstrap_servers=self.config['kafka_brokers'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

    async def process_cdc_events(self):
        for message in self.consumer:
            event = self.parse_cdc_event(message.value)

            self.metrics['events_processed'].labels(
                source=event.source_db,
                table=event.table,
                operation=event.operation
            ).inc()

            await self.apply_to_target(
                event.table,
                event.operation,
                event.data,
                event.timestamp
            )

    async def setup_debezium_connector(self, source_config):
        connector_config = {
            "name": f"migration-connector-{source_config['name']}",
            "config": {
                "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
                "database.hostname": source_config['host'],
                "database.port": source_config['port'],
                "database.dbname": source_config['database'],
                "plugin.name": "pgoutput",
                "heartbeat.interval.ms": "10000"
            }
        }

        response = requests.post(
            f"{self.config['kafka_connect_url']}/connectors",
            json=connector_config
        )

3. Enterprise Monitoring and Alerting

from prometheus_client import Counter, Gauge, Histogram, Summary
import numpy as np

class EnterpriseMigrationMonitor:
    def __init__(self, config):
        self.config = config
        self.registry = prometheus.CollectorRegistry()
        self.metrics = self.setup_metrics()
        self.alerting = AlertingSystem(config.get('alerts', {}))

    def setup_metrics(self):
        return {
            'migration_duration': Histogram(
                'migration_duration_seconds',
                'Migration duration',
                ['migration_id'],
                buckets=[60, 300, 600, 1800, 3600],
                registry=self.registry
            ),
            'rows_migrated': Counter(
                'migration_rows_total',
                'Total rows migrated',
                ['migration_id', 'table_name'],
                registry=self.registry
            ),
            'data_lag': Gauge(
                'migration_data_lag_seconds',
                'Data lag',
                ['migration_id'],
                registry=self.registry
            )
        }

    async def track_migration_progress(self, migration_id):
        while migration.status == 'running':
            stats = await self.calculate_progress_stats(migration)

            self.metrics['rows_migrated'].labels(
                migration_id=migration_id,
                table_name=migration.table
            ).inc(stats.rows_processed)

            anomalies = await self.detect_anomalies(migration_id, stats)
            if anomalies:
                await self.handle_anomalies(migration_id, anomalies)

            await asyncio.sleep(30)

    async def detect_anomalies(self, migration_id, stats):
        anomalies = []

        if stats.rows_per_second < stats.expected_rows_per_second * 0.5:
            anomalies.append({
                'type': 'low_throughput',
                'severity': 'warning',
                'message': f'Throughput below expected'
            })

        if stats.error_rate > 0.01:
            anomalies.append({
                'type': 'high_error_rate',
                'severity': 'critical',
                'message': f'Error rate exceeds threshold'
            })

        return anomalies

    async def setup_migration_dashboard(self):
        dashboard_config = {
            "dashboard": {
                "title": "Database Migration Monitoring",
                "panels": [
                    {
                        "title": "Migration Progress",
                        "targets": [{
                            "expr": "rate(migration_rows_total[5m])"
                        }]
                    },
                    {
                        "title": "Data Lag",
                        "targets": [{
                            "expr": "migration_data_lag_seconds"
                        }]
                    }
                ]
            }
        }

        response = requests.post(
            f"{self.config['grafana_url']}/api/dashboards/db",
            json=dashboard_config,
            headers={'Authorization': f"Bearer {self.config['grafana_token']}"}
        )

class AlertingSystem:
    def __init__(self, config):
        self.config = config

    async def send_alert(self, title, message, severity, **kwargs):
        if 'slack' in self.config:
            await self.send_slack_alert(title, message, severity)

        if 'email' in self.config:
            await self.send_email_alert(title, message, 

---

*Content truncated.*

mobile-design

sickn33

Mobile-first design and engineering doctrine for iOS and Android apps. Covers touch interaction, performance, platform conventions, offline behavior, and mobile-specific decision-making. Teaches principles and constraints, not fixed layouts. Use for React Native, Flutter, or native mobile apps.

6338

unity-developer

sickn33

Build Unity games with optimized C# scripts, efficient rendering, and proper asset management. Masters Unity 6 LTS, URP/HDRP pipelines, and cross-platform deployment. Handles gameplay systems, UI implementation, and platform optimization. Use PROACTIVELY for Unity performance issues, game mechanics, or cross-platform builds.

9037

frontend-slides

sickn33

Create stunning, animation-rich HTML presentations from scratch or by converting PowerPoint files. Use when the user wants to build a presentation, convert a PPT/PPTX to web, or create slides for a talk/pitch. Helps non-designers discover their aesthetic through visual exploration rather than abstract choices.

8733

fastapi-pro

sickn33

Build high-performance async APIs with FastAPI, SQLAlchemy 2.0, and Pydantic V2. Master microservices, WebSockets, and modern Python async patterns. Use PROACTIVELY for FastAPI development, async optimization, or API architecture.

7131

flutter-expert

sickn33

Master Flutter development with Dart 3, advanced widgets, and multi-platform deployment. Handles state management, animations, testing, and performance optimization for mobile, web, desktop, and embedded platforms. Use PROACTIVELY for Flutter architecture, UI implementation, or cross-platform features.

7030

threejs-skills

sickn33

Three.js skills for creating 3D elements and interactive experiences

8224

You might also like

flutter-development

aj-geddes

Build beautiful cross-platform mobile apps with Flutter and Dart. Covers widgets, state management with Provider/BLoC, navigation, API integration, and material design.

643969

drawio-diagrams-enhanced

jgtolentino

Create professional draw.io (diagrams.net) diagrams in XML format (.drawio files) with integrated PMP/PMBOK methodologies, extensive visual asset libraries, and industry-standard professional templates. Use this skill when users ask to create flowcharts, swimlane diagrams, cross-functional flowcharts, org charts, network diagrams, UML diagrams, BPMN, project management diagrams (WBS, Gantt, PERT, RACI), risk matrices, stakeholder maps, or any other visual diagram in draw.io format. This skill includes access to custom shape libraries for icons, clipart, and professional symbols.

591705

ui-ux-pro-max

nextlevelbuilder

"UI/UX design intelligence. 50 styles, 21 palettes, 50 font pairings, 20 charts, 8 stacks (React, Next.js, Vue, Svelte, SwiftUI, React Native, Flutter, Tailwind). Actions: plan, build, create, design, implement, review, fix, improve, optimize, enhance, refactor, check UI/UX code. Projects: website, landing page, dashboard, admin panel, e-commerce, SaaS, portfolio, blog, mobile app, .html, .tsx, .vue, .svelte. Elements: button, modal, navbar, sidebar, card, table, form, chart. Styles: glassmorphism, claymorphism, minimalism, brutalism, neumorphism, bento grid, dark mode, responsive, skeuomorphism, flat design. Topics: color palette, accessibility, animation, layout, typography, font pairing, spacing, hover, shadow, gradient."

318398

godot

bfollington

This skill should be used when working on Godot Engine projects. It provides specialized knowledge of Godot's file formats (.gd, .tscn, .tres), architecture patterns (component-based, signal-driven, resource-based), common pitfalls, validation tools, code templates, and CLI workflows. The `godot` command is available for running the game, validating scripts, importing resources, and exporting builds. Use this skill for tasks involving Godot game development, debugging scene/resource files, implementing game systems, or creating new Godot components.

339397

nano-banana-pro

garg-aayush

Generate and edit images using Google's Nano Banana Pro (Gemini 3 Pro Image) API. Use when the user asks to generate, create, edit, modify, change, alter, or update images. Also use when user references an existing image file and asks to modify it in any way (e.g., "modify this image", "change the background", "replace X with Y"). Supports both text-to-image generation and image-to-image editing with configurable resolution (1K default, 2K, or 4K for high resolution). DO NOT read the image file first - use this skill directly with the --input-image parameter.

451339

fastapi-templates

wshobson

Create production-ready FastAPI projects with async patterns, dependency injection, and comprehensive error handling. Use when building new FastAPI applications or setting up backend API projects.

304231

Stay ahead of the MCP ecosystem

Get weekly updates on new skills and servers.