databricks-core-workflow-a


Execute Databricks primary workflow: Delta Lake ETL pipelines. Use when building data ingestion pipelines, implementing medallion architecture, or creating Delta Lake transformations. Trigger with phrases like "databricks ETL", "delta lake pipeline", "medallion architecture", "databricks data pipeline", "bronze silver gold".

Install

mkdir -p .claude/skills/databricks-core-workflow-a && curl -L -o skill.zip "https://mcp.directory/api/skills/download/8873" && unzip -o skill.zip -d .claude/skills/databricks-core-workflow-a && rm skill.zip

Installs to .claude/skills/databricks-core-workflow-a

About this skill

Databricks Core Workflow A: Delta Lake ETL

Overview

Build production Delta Lake ETL pipelines using the medallion architecture (Bronze → Silver → Gold). The pipeline uses Auto Loader (cloudFiles) for incremental ingestion, MERGE INTO for upserts, and optionally Delta Live Tables for declarative pipelines.

Prerequisites

  • Completed databricks-install-auth setup
  • Unity Catalog enabled with catalogs/schemas created (see the setup sketch after this list)
  • Access to cloud storage for raw data (S3, ADLS, GCS)
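
If the catalog and schemas do not already exist, a minimal one-time setup sketch follows; the prod_catalog name and bronze/silver/gold schemas mirror the examples in this skill, so adjust them to your workspace:

# One-time setup; catalog and schema names are the ones used throughout this skill
spark.sql("CREATE CATALOG IF NOT EXISTS prod_catalog")
for schema in ("bronze", "silver", "gold"):
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS prod_catalog.{schema}")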

Architecture

Raw Sources (S3/ADLS/GCS)
    │  Auto Loader (cloudFiles)
    ▼
Bronze (raw + metadata)
    │  Cleanse, deduplicate, type-cast
    ▼
Silver (conformed)
    │  Aggregate, join, feature engineer
    ▼
Gold (analytics-ready)

Instructions

Step 1: Bronze Layer — Raw Ingestion with Auto Loader

Auto Loader (the cloudFiles format) incrementally processes new files as they arrive. It handles schema inference and evolution, and scales to millions of files.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, lit

spark = SparkSession.builder.getOrCreate()

# Streaming ingestion with Auto Loader
bronze_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/checkpoints/bronze/orders/schema")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("s3://data-lake/raw/orders/")
)

# Add ingestion metadata
bronze_with_meta = (
    bronze_stream
    .withColumn("_ingested_at", current_timestamp())
    .withColumn("_source_file", input_file_name())
    .withColumn("_source_system", lit("orders-api"))
)

# Write to bronze Delta table
(bronze_with_meta.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/bronze/orders/data")
    .option("mergeSchema", "true")
    .toTable("prod_catalog.bronze.raw_orders"))

Step 2: Silver Layer — Cleansing and Deduplication

Read from Bronze, apply business logic, and MERGE INTO Silver with upsert semantics.

from pyspark.sql.functions import col, trim, lower, to_timestamp, sha2, concat_ws
from delta.tables import DeltaTable

# Read from bronze (reads the full table; filter on _ingested_at for incremental scheduled runs)
bronze_df = spark.table("prod_catalog.bronze.raw_orders")

# Apply transformations
silver_df = (
    bronze_df
    .withColumn("order_id", col("order_id").cast("string"))
    .withColumn("customer_email", lower(trim(col("customer_email"))))
    .withColumn("order_date", to_timestamp(col("order_date"), "yyyy-MM-dd'T'HH:mm:ss"))
    .withColumn("amount", col("amount").cast("decimal(12,2)"))
    .withColumn("email_hash", sha2(col("customer_email"), 256))
    .filter(col("order_id").isNotNull())
    .dropDuplicates(["order_id"])
)

# Upsert into silver with MERGE
if spark.catalog.tableExists("prod_catalog.silver.orders"):
    target = DeltaTable.forName(spark, "prod_catalog.silver.orders")
    (target.alias("t")
        .merge(silver_df.alias("s"), "t.order_id = s.order_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())
else:
    silver_df.write.format("delta").saveAsTable("prod_catalog.silver.orders")

Step 3: Gold Layer — Business Aggregations

Aggregate Silver data into analytics-ready tables. Use partition-level overwrites for efficient updates.

from datetime import date, timedelta
from pyspark.sql.functions import sum as _sum, count, avg, date_trunc

# Recompute window; drives the replaceWhere overwrite below
target_date = (date.today() - timedelta(days=1)).isoformat()

# Daily order metrics, restricted to the window so written rows satisfy replaceWhere
gold_metrics = (
    spark.table("prod_catalog.silver.orders")
    .withColumn("order_day", date_trunc("day", col("order_date")))
    .filter(col("order_day") >= target_date)
    .groupBy("order_day", "region")
    .agg(
        count("order_id").alias("total_orders"),
        _sum("amount").alias("total_revenue"),
        avg("amount").alias("avg_order_value"),
    )
)

# Overwrite only changed partitions
(gold_metrics.write
    .format("delta")
    .mode("overwrite")
    .option("replaceWhere", f"order_day >= '{target_date}'")
    .saveAsTable("prod_catalog.gold.daily_order_metrics"))

Step 4: Delta Table Maintenance

-- Compact small files (bin-packing)
OPTIMIZE prod_catalog.silver.orders;

-- Z-order for query performance on frequently filtered columns
OPTIMIZE prod_catalog.silver.orders ZORDER BY (order_date, region);

-- Or use Liquid Clustering (DBR 13.3+) — replaces partitioning + Z-order
ALTER TABLE prod_catalog.silver.orders CLUSTER BY (order_date, region);
OPTIMIZE prod_catalog.silver.orders;

-- Clean up old file versions (default: 7 days)
VACUUM prod_catalog.silver.orders RETAIN 168 HOURS;

-- Compute statistics for query optimizer
ANALYZE TABLE prod_catalog.silver.orders COMPUTE STATISTICS;
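
These commands can be wrapped in a scheduled maintenance job. A minimal sketch, assuming a SparkSession is available in the job and using the table names from this skill:

# Nightly maintenance loop; the table list is illustrative
tables = [
    "prod_catalog.silver.orders",
    "prod_catalog.gold.daily_order_metrics",
]
for t in tables:
    spark.sql(f"OPTIMIZE {t}")
    spark.sql(f"VACUUM {t} RETAIN 168 HOURS")
    spark.sql(f"ANALYZE TABLE {t} COMPUTE STATISTICS")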

Step 5: Delta Live Tables (Declarative Pipeline)

DLT manages orchestration, data quality, lineage, and error handling automatically.

import dlt
from pyspark.sql.functions import col, current_timestamp

@dlt.table(
    comment="Raw orders from Auto Loader",
    table_properties={"quality": "bronze"},
)
def bronze_orders():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("s3://data-lake/raw/orders/")
        .withColumn("_ingested_at", current_timestamp())
    )

@dlt.table(comment="Cleansed orders")
@dlt.expect_or_drop("valid_order_id", "order_id IS NOT NULL")
@dlt.expect_or_drop("valid_amount", "amount > 0")
def silver_orders():
    return (
        dlt.read_stream("bronze_orders")
        .withColumn("amount", col("amount").cast("decimal(12,2)"))
        .dropDuplicates(["order_id"])
    )

@dlt.table(comment="Daily revenue metrics")
def gold_daily_revenue():
    return (
        dlt.read("silver_orders")
        .groupBy("region", "order_date")
        .agg({"amount": "sum", "order_id": "count"})
    )
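
These @dlt definitions run inside a DLT pipeline rather than as a plain notebook. A minimal sketch for creating the pipeline with the SDK; the pipeline name and notebook path are illustrative placeholders:

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import NotebookLibrary, PipelineLibrary

w = WorkspaceClient()

# Hypothetical pipeline wrapping the notebook that holds the @dlt definitions above
pipeline = w.pipelines.create(
    name="orders-dlt",
    catalog="prod_catalog",
    target="silver",
    libraries=[PipelineLibrary(notebook=NotebookLibrary(path="/Repos/team/pipelines/dlt_orders"))],
    continuous=False,
)
print(f"Created pipeline: {pipeline.pipeline_id}")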

Step 6: Schedule the Pipeline

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import (
    Task, NotebookTask, JobCluster, CronSchedule, TaskDependency,
)
from databricks.sdk.service.compute import ClusterSpec, AutoScale

w = WorkspaceClient()

job = w.jobs.create(
    name="daily-orders-etl",
    tasks=[
        Task(task_key="bronze", job_cluster_key="etl",
             notebook_task=NotebookTask(notebook_path="/Repos/team/pipelines/bronze")),
        Task(task_key="silver", job_cluster_key="etl",
             notebook_task=NotebookTask(notebook_path="/Repos/team/pipelines/silver"),
             depends_on=[{"task_key": "bronze"}]),
        Task(task_key="gold", job_cluster_key="etl",
             notebook_task=NotebookTask(notebook_path="/Repos/team/pipelines/gold"),
             depends_on=[{"task_key": "silver"}]),
    ],
    job_clusters=[JobCluster(
        job_cluster_key="etl",
        new_cluster=ClusterSpec(
            spark_version="14.3.x-scala2.12",
            node_type_id="i3.xlarge",
            autoscale=AutoScale(min_workers=1, max_workers=4),
        ),
    )],
    schedule=CronSchedule(quartz_cron_expression="0 0 6 * * ?", timezone_id="UTC"),
    max_concurrent_runs=1,
)
print(f"Created job: {job.job_id}")

Output

  • Bronze layer with raw data, Auto Loader schema evolution, and ingestion metadata
  • Silver layer with cleansed, deduplicated, type-cast data via MERGE upserts
  • Gold layer with business-ready aggregations
  • Table maintenance schedule (OPTIMIZE, VACUUM, ANALYZE)
  • Optional DLT pipeline with built-in data quality expectations

Error Handling

  • AnalysisException mentioning mergeSchema. Cause: the source schema changed. Fix: Auto Loader handles this for streaming; for batch writes add .option("mergeSchema", "true").
  • ConcurrentAppendException. Cause: multiple jobs writing to the same table. Fix: use MERGE with retry logic (see the sketch after this list) or serialize writes via max_concurrent_runs=1.
  • Null primary key. Cause: bad source data. Fix: add @dlt.expect_or_drop or .filter(col("pk").isNotNull()).
  • java.lang.OutOfMemoryError. Cause: the driver collecting large results. Fix: never call .collect() on large data; write results out to keep processing distributed.
  • VACUUM fails below the retention threshold. Cause: retention set under 7 days. Fix: keep delta.deletedFileRetentionDuration at '168 hours' or more.
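
For the ConcurrentAppendException case, a minimal retry sketch; the attempt count and backoff are illustrative, and run_merge stands in for the MERGE from Step 2:

import time
from delta.exceptions import ConcurrentAppendException

# Hypothetical wrapper: retry the MERGE a few times with linear backoff
def merge_with_retry(run_merge, max_attempts=3, base_delay_s=10):
    for attempt in range(1, max_attempts + 1):
        try:
            return run_merge()
        except ConcurrentAppendException:
            if attempt == max_attempts:
                raise
            time.sleep(base_delay_s * attempt)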

Examples

Quick Pipeline Validation

-- Verify row counts flow through medallion layers
SELECT 'bronze' AS layer, COUNT(*) AS row_count FROM prod_catalog.bronze.raw_orders
UNION ALL SELECT 'silver', COUNT(*) FROM prod_catalog.silver.orders
UNION ALL SELECT 'gold', COUNT(*) FROM prod_catalog.gold.daily_order_metrics;

Next Steps

For ML workflows, see databricks-core-workflow-b.
