
ETL pipelines, Apache Spark, data warehousing, and big data processing. Use when building data pipelines, processing large datasets, or setting up data infrastructure.

Install

mkdir -p .claude/skills/data-engineering && curl -L -o skill.zip "https://mcp.directory/api/skills/download/301" && unzip -o skill.zip -d .claude/skills/data-engineering && rm skill.zip

Installs to .claude/skills/data-engineering

About this skill

Data Engineering

Build scalable data pipelines and infrastructure for big data processing.

Quick Start with Apache Spark

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, sum, count

# Initialize Spark
spark = SparkSession.builder \
    .appName("DataProcessing") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

# Read data
df = spark.read.parquet("s3://bucket/data/")

# Transformations (lazy evaluation)
df_clean = df \
    .filter(col("value") > 0) \
    .groupBy("category") \
    .agg(
        sum("sales").alias("total_sales"),
        avg("price").alias("avg_price"),
        count("*").alias("count")
    ) \
    .orderBy(col("total_sales").desc())

# Write results (partition column must exist in the output DataFrame)
df_clean.write \
    .mode("overwrite") \
    .partitionBy("category") \
    .parquet("s3://bucket/output/")

ETL Pipeline with Apache Airflow

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'etl_pipeline',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
)

def extract(**context):
    # Extract data from source
    data = fetch_api_data()
    context['task_instance'].xcom_push(key='raw_data', value=data)

def transform(**context):
    # Transform data pulled from the extract task
    data = context['task_instance'].xcom_pull(task_ids='extract', key='raw_data')
    cleaned = clean_and_transform(data)
    context['task_instance'].xcom_push(key='clean_data', value=cleaned)

def load(**context):
    # Load to data warehouse
    data = context['task_instance'].xcom_pull(task_ids='transform', key='clean_data')
    load_to_warehouse(data)

extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract,
    dag=dag
)

transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform,
    dag=dag
)

load_task = PythonOperator(
    task_id='load',
    python_callable=load,
    dag=dag
)

extract_task >> transform_task >> load_task
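
XCom values live in Airflow's metadata database, so pushing full datasets between tasks (as above) is only practical for small payloads. A common alternative is to pass a reference instead of the data; a minimal sketch, where write_to_staging and read_from_staging are hypothetical helpers for your object store:

def extract(**context):
    data = fetch_api_data()
    path = write_to_staging(data)  # hypothetical helper returning e.g. an S3 key
    context['task_instance'].xcom_push(key='raw_path', value=path)

def transform(**context):
    path = context['task_instance'].xcom_pull(task_ids='extract', key='raw_path')
    cleaned = clean_and_transform(read_from_staging(path))
    context['task_instance'].xcom_push(key='clean_path', value=write_to_staging(cleaned))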

Data Warehousing

Star Schema Design

-- Fact Table
CREATE TABLE fact_sales (
    sale_id SERIAL PRIMARY KEY,
    date_key INT REFERENCES dim_date(date_key),
    product_key INT REFERENCES dim_product(product_key),
    customer_key INT REFERENCES dim_customer(customer_key),
    quantity INT,
    revenue DECIMAL(10,2),
    cost DECIMAL(10,2)
);

-- Dimension Table
CREATE TABLE dim_product (
    product_key INT PRIMARY KEY,
    product_id VARCHAR(50),
    product_name VARCHAR(200),
    category VARCHAR(100),
    brand VARCHAR(100)
);
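
Analytical queries then join the fact table to its dimensions on the surrogate keys. A sketch in Spark, assuming fact_sales and dim_product have been loaded as DataFrames (e.g. via spark.read.jdbc or Parquet extracts):

from pyspark.sql.functions import col, sum

revenue_by_category = fact_sales \
    .join(dim_product, "product_key") \
    .groupBy("category", "brand") \
    .agg(
        sum("revenue").alias("total_revenue"),
        sum("quantity").alias("units_sold")
    ) \
    .orderBy(col("total_revenue").desc())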

Snowflake Data Warehouse

-- Create warehouse
CREATE WAREHOUSE compute_wh
    WAREHOUSE_SIZE = 'MEDIUM'
    AUTO_SUSPEND = 300
    AUTO_RESUME = TRUE;

-- Load data from S3
COPY INTO sales_table
FROM 's3://bucket/data/'
FILE_FORMAT = (TYPE = 'PARQUET')
ON_ERROR = 'CONTINUE';

-- Clustering
ALTER TABLE sales_table CLUSTER BY (date, region);

-- Time travel
SELECT * FROM sales_table AT (OFFSET => -3600);  -- the table as it was 1 hour ago
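
The same statements can be issued from Python; a minimal sketch using the snowflake-connector-python package (all connection parameters below are placeholders):

import snowflake.connector

# Placeholder credentials; prefer key-pair auth or a secrets manager in practice
conn = snowflake.connector.connect(
    account="your_account",
    user="your_user",
    password="your_password",
    warehouse="compute_wh",
    database="your_database",
    schema="your_schema",
)

cur = conn.cursor()
cur.execute("""
    COPY INTO sales_table
    FROM 's3://bucket/data/'
    FILE_FORMAT = (TYPE = 'PARQUET')
    ON_ERROR = 'CONTINUE'
""")
cur.close()
conn.close()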

Big Data Processing

Spark SQL

# Register as temp view
df.createOrReplaceTempView("sales")

# SQL queries
result = spark.sql("""
    SELECT
        category,
        SUM(sales) as total_sales,
        AVG(price) as avg_price
    FROM sales
    WHERE date >= '2024-01-01'
    GROUP BY category
    HAVING SUM(sales) > 10000
    ORDER BY total_sales DESC
""")

result.show()

Spark Optimization

# Cache in memory (lazy; materialized on the next action)
df.cache()

# repartition() returns a new DataFrame, so reassign the result
df = df.repartition(200)

# Broadcast small tables to avoid shuffling the large side of the join
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "key")

# Persist with an explicit storage level (spills to disk when memory is full)
from pyspark.storagelevel import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)

Stream Processing with Kafka

from kafka import KafkaProducer, KafkaConsumer
import json

# Producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

producer.send('topic-name', {'key': 'value'})
producer.flush()  # send() is asynchronous; flush to ensure delivery before exiting

# Consumer
consumer = KafkaConsumer(
    'topic-name',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    group_id='my-group',
    auto_offset_reset='earliest'
)

for message in consumer:
    process_message(message.value)
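
The same topic can also be consumed with Spark Structured Streaming for higher-throughput processing; a sketch, assuming the spark-sql-kafka connector is on the classpath and the output/checkpoint paths are placeholders:

# Read the Kafka topic as a streaming DataFrame
stream_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topic-name") \
    .option("startingOffsets", "earliest") \
    .load()

# Kafka values arrive as bytes; cast to string before parsing
events = stream_df.selectExpr("CAST(value AS STRING) AS json_value")

# Write the stream with checkpointing for fault tolerance
query = events.writeStream.format("parquet") \
    .option("path", "s3://bucket/stream-output/") \
    .option("checkpointLocation", "s3://bucket/checkpoints/") \
    .start()

query.awaitTermination()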

Data Quality Validation

import great_expectations as ge

# Load data
df = ge.read_csv('data.csv')

# Define expectations
df.expect_column_values_to_not_be_null('user_id')
df.expect_column_values_to_be_unique('email')
df.expect_column_values_to_be_between('age', 0, 120)
df.expect_column_values_to_match_regex(
    'email',
    r'^[\w\.-]+@[\w\.-]+\.\w+$'
)

# Validate
results = df.validate()
print(results)

Delta Lake (Data Lakehouse)

from delta.tables import DeltaTable

# Write to Delta
df.write.format("delta") \
    .mode("overwrite") \
    .save("/path/to/delta-table")

# Read from Delta
df = spark.read.format("delta").load("/path/to/delta-table")

# ACID transactions
deltaTable = DeltaTable.forPath(spark, "/path/to/delta-table")

# Upsert (merge): 'updates' is a DataFrame of new or changed rows
deltaTable.alias("target") \
    .merge(
        updates.alias("source"),
        "target.id = source.id"
    ) \
    .whenMatchedUpdate(set={"value": "source.value"}) \
    .whenNotMatchedInsert(
        values={"id": "source.id", "value": "source.value"}
    ) \
    .execute()

# Time travel
df = spark.read.format("delta") \
    .option("versionAsOf", 10) \
    .load("/path/to/delta-table")

Best Practices

  1. Incremental processing: Process only new data (see the sketch after this list)
  2. Idempotency: Same input produces same output
  3. Data validation: Check quality at every stage
  4. Monitoring: Track pipeline health and performance
  5. Error handling: Retry logic, dead letter queues
  6. Partitioning: Partition large datasets by date/category
  7. Compression: Use Parquet, ORC for storage efficiency
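
A minimal sketch combining incremental processing, idempotency, and date partitioning: each run reads only one day's raw input and overwrites only that day's output partition, so re-running the same date yields the same result (paths and the run_date parameter are placeholders):

from pyspark.sql.functions import col, lit

def process_day(spark, run_date):
    # Incremental: read only the raw partition for this run's date
    daily = spark.read.parquet(f"s3://bucket/raw/date={run_date}/")

    cleaned = daily.filter(col("value") > 0).withColumn("date", lit(run_date))

    # Idempotent: dynamic partition overwrite replaces only this date's output
    cleaned.write \
        .mode("overwrite") \
        .option("partitionOverwriteMode", "dynamic") \
        .partitionBy("date") \
        .parquet("s3://bucket/clean/")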
