effect-patterns-streams-sinks
Effect-TS patterns for Streams Sinks. Use when working with streams sinks in Effect-TS applications.
Install
mkdir -p .claude/skills/effect-patterns-streams-sinks && curl -L -o skill.zip "https://mcp.directory/api/skills/download/5620" && unzip -o skill.zip -d .claude/skills/effect-patterns-streams-sinks && rm skill.zip
Installs to .claude/skills/effect-patterns-streams-sinks
About this skill
Effect-TS Patterns: Streams Sinks
This skill provides 6 curated Effect-TS patterns for streams sinks. Use this skill when working on tasks related to:
- streams sinks
- Best practices in Effect-TS applications
- Real-world patterns and solutions
🟡 Intermediate Patterns
Sink Pattern 1: Batch Insert Stream Records into Database
Rule: Batch stream records before database operations to improve throughput and reduce transaction overhead.
Good Example:
This example demonstrates streaming user records from a paginated API and batching them for efficient database insertion.
import { Effect, Stream, Sink, Chunk, Option } from "effect";

interface User {
  readonly id: number;
  readonly name: string;
  readonly email: string;
}

interface PaginatedResponse {
  readonly users: User[];
  readonly nextPage: number | null;
}

// Mock API that returns paginated users
const fetchUserPage = (page: number): Effect.Effect<PaginatedResponse> =>
  Effect.succeed(
    page < 10
      ? {
          users: Array.from({ length: 50 }, (_, i) => ({
            id: page * 50 + i,
            name: `User ${page * 50 + i}`,
            email: `user${page * 50 + i}@example.com`,
          })),
          nextPage: page + 1,
        }
      : { users: [], nextPage: null }
  ).pipe(Effect.delay("10 millis"));

// Mock database insert that takes a batch of users
const insertUserBatch = (users: readonly User[]): Effect.Effect<number> =>
  Effect.sync(() => {
    console.log(`Inserting batch of ${users.length} users`);
    return users.length;
  }).pipe(Effect.delay("50 millis"));

// Create a stream of users from the paginated API
const userStream: Stream.Stream<User> = Stream.paginateChunkEffect(0, (page) =>
  fetchUserPage(page).pipe(
    Effect.map(
      (response) =>
        // A null nextPage signals that there are no more pages
        [
          Chunk.fromIterable(response.users),
          Option.fromNullable(response.nextPage),
        ] as const
    )
  )
);

// Sink that inserts each incoming batch and sums the inserted counts
const batchInsertSink = Sink.foldLeftEffect(
  0,
  (count: number, batch: Chunk.Chunk<User>) =>
    insertUserBatch(Chunk.toReadonlyArray(batch)).pipe(
      Effect.map((inserted) => count + inserted)
    )
);

// Run the stream: group users into batches of 100, then drain into the sink
const program = Effect.gen(function* () {
  const totalInserted = yield* userStream.pipe(
    Stream.grouped(100),
    Stream.run(batchInsertSink)
  );
  console.log(`Total users inserted: ${totalInserted}`);
});

Effect.runPromise(program);
This pattern:
- Creates a stream of users from a paginated API
- Groups users into batches of up to 100 with Stream.grouped
- Inserts each batch into the database in a single operation
- Tracks the total count of inserted records
The batching happens upstream of the sink: Stream.grouped collects elements until the batch size is reached (or the stream ends), then emits the complete chunk for the sink to process.
Rationale:
When consuming a stream of records to persist in a database, group them into batches and consume the batches with a Sink instead of inserting one record at a time. This reduces the number of database round-trips and the transaction overhead, significantly improving overall throughput.
Inserting records one-by-one is inefficient:
- Each insert is a separate database call (network latency, connection overhead)
- Each insert may be a separate transaction (ACID overhead)
- Resource contention and connection pool exhaustion at scale
Batching solves this by:
- Grouping N records into a single bulk insert operation
- Amortizing database overhead across multiple records
- Maintaining throughput even under backpressure
- Enabling efficient transaction semantics for the entire batch
For example, inserting 10,000 records one-by-one might take 100 seconds. Batching in groups of 100 might take just 2-3 seconds.
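For contrast, here is a minimal sketch of the one-by-one anti-pattern, reusing the mock insertUserBatch from the example above (our illustration, not part of the skill). Every record pays the full 50 ms round-trip:
// Anti-pattern sketch: one database call per record
const oneByOne = userStream.pipe(
  Stream.mapEffect((user) => insertUserBatch([user])),
  Stream.runDrain
);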
Sink Pattern 2: Write Stream Events to Event Log
Rule: Append stream events to an event log with metadata to maintain a complete, ordered record of what happened.
Good Example:
This example demonstrates an event sourcing pattern where a stream of user-account events is appended to an event log with metadata.
import { Effect, Stream, Sink, DateTime, Data } from "effect";

// Event types
type AccountEvent =
  | AccountCreated
  | MoneyDeposited
  | MoneyWithdrawn
  | AccountClosed;

class AccountCreated extends Data.TaggedClass("AccountCreated")<{
  readonly accountId: string;
  readonly owner: string;
  readonly initialBalance: number;
}> {}

class MoneyDeposited extends Data.TaggedClass("MoneyDeposited")<{
  readonly accountId: string;
  readonly amount: number;
}> {}

class MoneyWithdrawn extends Data.TaggedClass("MoneyWithdrawn")<{
  readonly accountId: string;
  readonly amount: number;
}> {}

class AccountClosed extends Data.TaggedClass("AccountClosed")<{
  readonly accountId: string;
}> {}

// Event envelope with metadata
interface StoredEvent {
  readonly eventId: string; // Unique identifier per event
  readonly eventType: string; // Type of event
  readonly aggregateId: string; // What this event is about
  readonly aggregateType: string; // What kind of thing (Account)
  readonly data: AccountEvent; // Event payload
  readonly metadata: {
    readonly timestamp: number;
    readonly version: number; // Position in log
    readonly causationId?: string; // What caused this
  };
}

// Mock event log that appends events
const eventLog: StoredEvent[] = [];
let eventVersion = 0;

const appendToEventLog = (
  event: AccountEvent,
  aggregateId: string
): Effect.Effect<StoredEvent> =>
  Effect.gen(function* () {
    const now = yield* DateTime.now;
    const storedEvent: StoredEvent = {
      eventId: `evt-${eventVersion}-${Date.now()}`,
      eventType: event._tag,
      aggregateId,
      aggregateType: "Account",
      data: event,
      metadata: {
        timestamp: DateTime.toEpochMillis(now),
        version: ++eventVersion,
      },
    };
    // Append to log (simulated)
    eventLog.push(storedEvent);
    console.log(
      `[v${storedEvent.metadata.version}] ${storedEvent.eventType}: ${aggregateId}`
    );
    return storedEvent;
  });

// Simulate a stream of events from various account operations
const accountEvents: Stream.Stream<readonly [string, AccountEvent]> =
  Stream.fromIterable<readonly [string, AccountEvent]>([
    [
      "acc-1",
      new AccountCreated({
        accountId: "acc-1",
        owner: "Alice",
        initialBalance: 1000,
      }),
    ],
    ["acc-1", new MoneyDeposited({ accountId: "acc-1", amount: 500 })],
    ["acc-1", new MoneyWithdrawn({ accountId: "acc-1", amount: 200 })],
    [
      "acc-2",
      new AccountCreated({
        accountId: "acc-2",
        owner: "Bob",
        initialBalance: 2000,
      }),
    ],
    ["acc-2", new MoneyDeposited({ accountId: "acc-2", amount: 1000 })],
    ["acc-1", new AccountClosed({ accountId: "acc-1" })],
  ]);

// Sink that appends each event to the log and counts appended events
const eventLogSink = Sink.foldLeftEffect(
  0,
  (count: number, [aggregateId, event]: readonly [string, AccountEvent]) =>
    appendToEventLog(event, aggregateId).pipe(Effect.map(() => count + 1))
);

// Run the stream and append all events
const program = Effect.gen(function* () {
  const totalEvents = yield* accountEvents.pipe(Stream.run(eventLogSink));
  console.log(`\nTotal events appended: ${totalEvents}`);
  console.log(`\nEvent log contents:`);
  eventLog.forEach((event) => {
    console.log(`  [v${event.metadata.version}] ${event.eventType}`);
  });
});

Effect.runPromise(program);
This pattern:
- Defines event types using tagged classes (AccountCreated, MoneyDeposited, etc.)
- Creates event envelopes with metadata (timestamp, version, causation)
- Streams events from various sources
- Appends to log with proper versioning and ordering
- Maintains history for reconstruction and audit
Rationale:
When consuming a stream of events that represent changes in your system, append each event to an event log using Sink. Event logs provide immutable, ordered records that enable event sourcing, audit trails, and temporal queries.
Event logs are foundational to many patterns:
- Event Sourcing: Instead of storing current state, store the sequence of events that led to it
- Audit Trails: Complete, tamper-proof record of who did what and when
- Temporal Queries: Reconstruct state at any point in time
- Consistency: Single source of truth for what happened
- Replay: Rebuild state or test changes by replaying events
Unlike batch inserts, which mutate existing state transactionally, event logs are append-only: each event is immutable once written. This simplicity enables:
- Fast appends (no updates, just sequential writes)
- Natural ordering (events in write order)
- Easy distribution (replicate the log)
- Strong consistency (events are facts that don't change)
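To make the replay point concrete, here is a minimal sketch (our own illustration, not part of the skill) that folds the eventLog from the example above back into an account balance:
// Rebuild an account's balance by replaying its events in log order
const replayBalance = (aggregateId: string): number =>
  eventLog
    .filter((stored) => stored.aggregateId === aggregateId)
    .reduce((balance, stored) => {
      switch (stored.data._tag) {
        case "AccountCreated":
          return stored.data.initialBalance;
        case "MoneyDeposited":
          return balance + stored.data.amount;
        case "MoneyWithdrawn":
          return balance - stored.data.amount;
        case "AccountClosed":
          return balance;
      }
    }, 0);

console.log(`acc-1 balance: ${replayBalance("acc-1")}`); // 1300 after replay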
Sink Pattern 4: Send Stream Records to Message Queue
Rule: Stream records to message queues with proper batching and acknowledgment for reliable distributed data flow.
Good Example:
This example demonstrates streaming sensor readings and publishing them to a message queue with topic-based partitioning.
import { Effect, Stream, Sink, Chunk } from "effect";

interface SensorReading {
  readonly sensorId: string;
  readonly location: string;
  readonly temperature: number;
  readonly humidity: number;
  readonly timestamp: number;
}

// Mock message queue publisher
interface QueuePublisher {
  readonly publish: (
    topic: string,
    partition: string,
    messages: readonly SensorReading[]
  ) => Effect.Effect<{ acknowledged: number; messageIds: string[] }>;
}

// Create a mock queue publisher
const createMockPublisher = (): QueuePublisher => {
  const publishedMessages: Record<string, SensorReadin
---
*Content truncated.*
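Since the original example is cut off above, here is a separate minimal sketch of the same rule under stated assumptions: it reuses the QueuePublisher interface defined before the truncation, batches readings with Stream.grouped, and counts acknowledgments in a sink. The publisher value and the topic/partition names are illustrative, not part of the skill:
// Hypothetical publisher instance; any QueuePublisher implementation works here
declare const publisher: QueuePublisher;

// Publish batches of readings and count how many the queue acknowledged
const publishSink = Sink.foldLeftEffect(
  0,
  (acked: number, batch: Chunk.Chunk<SensorReading>) =>
    publisher
      .publish("sensor-readings", "partition-0", Chunk.toReadonlyArray(batch))
      .pipe(Effect.map((result) => acked + result.acknowledged))
);

// Usage: group a stream of readings into batches of 50, then run the sink
const publishAll = (readings: Stream.Stream<SensorReading>) =>
  readings.pipe(Stream.grouped(50), Stream.run(publishSink));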