
Today we’re releasing Rivet Workflows: a durable execution for TypeScript built in to Rivet Actors.
- Durable & resilient: Progress persists across crashes, deploys, and restarts. Failed steps retry automatically.
- Advanced control flow: Sleep, join, race, rollback, human-in-the-loop, and durable loops
- Durable agents: Build AI agents with tool use, human-in-the-loop, and automatic checkpointing using the AI SDK
- React integration: Stream workflow progress to your frontend in realtime with
useActor - Observable: Built-in workflow inspector for debugging every run
- Permissive open-source: Apache 2.0, runs anywhere: Node.js, Bun, Cloudflare Workers
Show Me The Code
Wrap any multi-step process with workflow() and each step is checkpointed automatically. Crashes, deploys, and restarts pick up where they left off.
import { actor } from "rivetkit";
import { workflow } from "rivetkit/workflow";
const onboarding = actor({
run: workflow(async (ctx) => {
const customerId = await ctx.step("create-customer", () => createCustomer());
await ctx.step("send-welcome-email", () => sendWelcomeEmail(customerId));
await ctx.step("track-signup", () => trackSignup(customerId));
}),
});
async function createCustomer() {
const res = await fetch("https://api.stripe.com/v1/customers", {
method: "POST",
headers: { Authorization: `Bearer ${process.env.STRIPE_KEY}` },
});
return ((await res.json()) as { id: string }).id;
}
async function sendWelcomeEmail(customerId: string) {
await fetch("https://api.sendgrid.com/v3/mail/send", {
method: "POST",
headers: { Authorization: `Bearer ${process.env.SENDGRID_KEY}` },
body: JSON.stringify({ to: "user@example.com", customerId }),
});
}
async function trackSignup(customerId: string) {
await fetch("https://api.posthog.com/capture", {
method: "POST",
body: JSON.stringify({ distinct_id: customerId, event: "signup" }),
});
}
// Pause indefinitely for human approval, then continue where you left off
import { actor, queue } from "rivetkit";
import { type WorkflowContextOf, workflow } from "rivetkit/workflow";
const order = actor({
state: { status: "pending" as string },
queues: {
approval: queue<{ approved: boolean }>(),
},
run: workflow(async (ctx) => {
await ctx.step("validate-order", async () => validateOrderStep(ctx, "order-123"));
// Pauses here until a human approves or rejects. Could be minutes or days.
const decision = await ctx.queue.next("wait-approval");
if (decision.body.approved) {
await ctx.step("fulfill-order", async () => fulfillOrderStep(ctx, "order-123"));
} else {
await ctx.step("cancel-order", async () => cancelOrderStep(ctx, "order-123"));
}
}),
actions: {
getState: (c) => c.state,
},
});
async function validateOrderStep(ctx: WorkflowContextOf<typeof order>, orderId: string) {
await validateOrder(orderId);
ctx.state.status = "awaiting_approval";
}
async function fulfillOrderStep(ctx: WorkflowContextOf<typeof order>, orderId: string) {
await fulfillOrder(orderId);
ctx.state.status = "fulfilled";
}
async function cancelOrderStep(ctx: WorkflowContextOf<typeof order>, orderId: string) {
await cancelOrder(orderId);
ctx.state.status = "cancelled";
}
async function validateOrder(orderId: string) {
const res = await fetch(`https://api.example.com/orders/${orderId}/validate`, {
method: "POST",
});
if (!res.ok) throw new Error("Order validation failed");
}
async function fulfillOrder(orderId: string) {
await fetch(`https://api.example.com/orders/${orderId}/fulfill`, {
method: "POST",
});
}
async function cancelOrder(orderId: string) {
await fetch(`https://api.example.com/orders/${orderId}/cancel`, {
method: "POST",
});
}
// Respond to callers synchronously using completable queues
import { actor, queue } from "rivetkit";
import { type WorkflowLoopContextOf, workflow } from "rivetkit/workflow";
const counter = actor({
state: { count: 0 },
queues: {
requests: queue<{ delta: number }, { count: number }>(),
},
run: workflow(async (ctx) => {
await ctx.loop("request-loop", async (loopCtx) => {
const message = await loopCtx.queue.next("wait-request", {
completable: true,
});
const newCount = await loopCtx.step("apply-delta", async () =>
applyDelta(loopCtx, message.body.delta),
);
// Respond to the caller with the new count
await message.complete({ count: newCount });
});
}),
actions: {
getState: (c) => c.state,
},
});
async function applyDelta(
loopCtx: WorkflowLoopContextOf<typeof counter>,
delta: number,
): Promise<number> {
loopCtx.state.count += delta;
return loopCtx.state.count;
}
// Use durable loops for long-lived workflows that process messages indefinitely
import { actor, queue } from "rivetkit";
import { type WorkflowLoopContextOf, workflow } from "rivetkit/workflow";
const worker = actor({
state: { processed: 0 },
queues: {
tasks: queue<{ url: string }>(),
},
run: workflow(async (ctx) => {
await ctx.loop("task-loop", async (loopCtx) => {
const message = await loopCtx.queue.next("wait-task");
await loopCtx.step("process-task", async () =>
processTaskStep(loopCtx, message.body.url),
);
});
}),
actions: {
getState: (c) => c.state,
},
});
async function processTaskStep(
loopCtx: WorkflowLoopContextOf<typeof worker>,
url: string,
): Promise<void> {
await processTask(url);
loopCtx.state.processed += 1;
}
async function processTask(url: string) {
const res = await fetch(url, { method: "POST" });
if (!res.ok) throw new Error(`Task failed: ${res.status}`);
}
// Sleep for arbitrary durations without consuming compute
import { actor } from "rivetkit";
import { type WorkflowContextOf, workflow } from "rivetkit/workflow";
const emailDrip = actor({
state: { emailsSent: 0 },
run: workflow(async (ctx) => {
await ctx.step("send-welcome", async () => sendEmailStep(ctx, "welcome"));
// Sleep for 3 days without consuming compute
await ctx.sleep("wait-3-days", 3 * 24 * 60 * 60 * 1000);
await ctx.step("send-tips", async () => sendEmailStep(ctx, "tips"));
// Sleep for 7 more days
await ctx.sleep("wait-7-days", 7 * 24 * 60 * 60 * 1000);
await ctx.step("send-offer", async () => sendEmailStep(ctx, "offer"));
}),
actions: {
getState: (c) => c.state,
},
});
async function sendEmailStep(
ctx: WorkflowContextOf<typeof emailDrip>,
template: string,
): Promise<void> {
await sendEmail(template, "user@example.com");
ctx.state.emailsSent += 1;
}
async function sendEmail(template: string, to: string) {
const res = await fetch("https://api.sendgrid.com/v3/mail/send", {
method: "POST",
headers: { Authorization: `Bearer ${process.env.SENDGRID_KEY}` },
body: JSON.stringify({ to, template }),
});
if (!res.ok) throw new Error(`Email send failed: ${res.status}`);
}
// Run independent work in parallel and wait for all branches to complete
import { actor } from "rivetkit";
import { workflow } from "rivetkit/workflow";
const migration = actor({
state: { completed: false },
run: workflow(async (ctx) => {
await ctx.join("update-all", {
users: {
run: async (branchCtx) => {
await branchCtx.step("update-users", () =>
migrateRecords("https://api.example.com/users/migrate"),
);
},
},
orders: {
run: async (branchCtx) => {
await branchCtx.step("update-orders", () =>
migrateRecords("https://api.example.com/orders/migrate"),
);
},
},
products: {
run: async (branchCtx) => {
await branchCtx.step("update-products", () =>
migrateRecords("https://api.example.com/products/migrate"),
);
},
},
});
await ctx.step("mark-complete", async () => {
await fetch("https://api.example.com/migration/complete", { method: "POST" });
ctx.state.completed = true;
});
}),
actions: {
getState: (c) => c.state,
},
});
async function migrateRecords(url: string) {
const res = await fetch(url, { method: "POST" });
if (!res.ok) throw new Error(`Migration failed: ${res.status}`);
}
// Race a queue message against a timeout
import { actor, queue } from "rivetkit";
import { type WorkflowLoopContextOf, workflow } from "rivetkit/workflow";
const auction = actor({
state: { result: null as "sold" | "expired" | null },
queues: {
bids: queue<{ amount: number }>(),
},
run: workflow(async (ctx) => {
await ctx.step("list-item", () => listItem("item-123"));
// First bid wins, or the auction expires after 24 hours
const { winner } = await ctx.race("bid-or-expire", [
{
name: "bid",
run: async (branchCtx) => {
const bid = await branchCtx.queue.next("wait-bid");
return bid.body.amount;
},
},
{
name: "expire",
run: async (branchCtx) => {
await branchCtx.sleep("auction-timeout", 24 * 60 * 60 * 1000);
return 0;
},
},
]);
await ctx.step("finalize", async () => {
await finalizeAuction("item-123", winner);
ctx.state.result = winner === "bid" ? "sold" : "expired";
});
}),
actions: {
getState: (c) => c.state,
},
});
async function listItem(itemId: string) {
await fetch(`https://api.example.com/auctions/${itemId}`, { method: "POST" });
}
async function finalizeAuction(itemId: string, outcome: string) {
await fetch(`https://api.example.com/auctions/${itemId}/finalize`, {
method: "POST",
body: JSON.stringify({ outcome }),
});
}
// Compensating actions run automatically when a later step fails
import { actor, queue } from "rivetkit";
import { workflow } from "rivetkit/workflow";
const checkout = actor({
state: { status: "pending" as string },
queues: {
orders: queue<{ orderId: string }>(),
},
run: workflow(async (ctx) => {
await ctx.loop("checkout-loop", async (loopCtx) => {
const message = await loopCtx.queue.next("wait-order");
await loopCtx.rollbackCheckpoint("checkout-checkpoint");
await loopCtx.step<string>({
name: "reserve-inventory",
run: () => reserveInventory(message.body.orderId),
rollback: releaseInventoryRollback,
});
await loopCtx.step<string>({
name: "charge-card",
run: () => chargeCard(message.body.orderId),
rollback: refundChargeRollback,
});
await loopCtx.step("confirm", async () => {
loopCtx.state.status = "confirmed";
});
});
}),
actions: {
getState: (c) => c.state,
},
});
async function releaseInventoryRollback(
_ctx: WorkflowLoopContextOf<typeof checkout>,
reservationId: string,
): Promise<void> {
await releaseInventory(reservationId);
}
async function refundChargeRollback(
_ctx: WorkflowLoopContextOf<typeof checkout>,
chargeId: string,
): Promise<void> {
await refundCharge(chargeId);
}
async function reserveInventory(orderId: string): Promise<string> {
const res = await fetch("https://api.example.com/inventory/reserve", {
method: "POST",
body: JSON.stringify({ orderId }),
});
return ((await res.json()) as { reservationId: string }).reservationId;
}
async function releaseInventory(reservationId: string) {
await fetch(`https://api.example.com/inventory/${reservationId}/release`, {
method: "POST",
});
}
async function chargeCard(orderId: string): Promise<string> {
const res = await fetch("https://api.stripe.com/v1/charges", {
method: "POST",
headers: { Authorization: `Bearer ${process.env.STRIPE_KEY}` },
body: JSON.stringify({ orderId }),
});
return ((await res.json()) as { id: string }).id;
}
async function refundCharge(chargeId: string) {
await fetch("https://api.stripe.com/v1/refunds", {
method: "POST",
headers: { Authorization: `Bearer ${process.env.STRIPE_KEY}` },
body: JSON.stringify({ charge: chargeId }),
});
}
Example: Durable Agents
Combine workflows with the AI SDK to build AI agents that survive crashes and pick up exactly where they left off. Each tool call is checkpointed, chat history is stored in SQLite, and the agent loops indefinitely waiting for new prompts via a queue.
import { actor, queue } from "rivetkit";
import { db } from "rivetkit/db";
import { type WorkflowLoopContextOf, workflow } from "rivetkit/workflow";
import { generateText, tool } from "ai";
import { openai } from "@ai-sdk/openai";
import { z } from "zod";
const agent = actor({
db: db({
onMigrate: async (db) => {
await db.execute(`
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
role TEXT NOT NULL,
content TEXT NOT NULL
)
`);
},
}),
queues: {
prompts: queue<{ content: string }>(),
},
run: workflow(async (ctx) => {
await ctx.loop("chat-loop", async (loopCtx) => {
// Wait for next user message
const message = await loopCtx.queue.next("wait-prompt");
// Save user message to SQLite
await loopCtx.step("save-user-message", async () =>
saveUserMessage(loopCtx, message.body.content),
);
// Load full chat history
const history = await loopCtx.step("load-history", async () => loadHistory(loopCtx));
// Generate AI response with tool use
const result = await loopCtx.step("generate", async () => generateReply(history));
// Save assistant response to SQLite
await loopCtx.step("save-response", async () =>
saveAssistantResponse(loopCtx, result.text),
);
});
}),
});
async function saveUserMessage(
loopCtx: WorkflowLoopContextOf<typeof agent>,
content: string,
): Promise<void> {
await loopCtx.db.execute(
"INSERT INTO messages (role, content) VALUES (?, ?)",
"user",
content,
);
}
async function loadHistory(
loopCtx: WorkflowLoopContextOf<typeof agent>,
): Promise<Array<{ role: string; content: string }>> {
return await loopCtx.db.execute<{ role: string; content: string }>(
"SELECT role, content FROM messages ORDER BY id",
);
}
async function generateReply(history: Array<{ role: string; content: string }>) {
return await generateText({
model: openai("gpt-5"),
messages: history.map((row) => ({
role: row.role as "user" | "assistant",
content: row.content,
})),
tools: {
getWeather: tool({
description: "Get the weather for a location",
parameters: z.object({ location: z.string() }),
execute: async ({ location }) => `72°F in ${location}`,
}),
},
maxSteps: 5,
});
}
async function saveAssistantResponse(
loopCtx: WorkflowLoopContextOf<typeof agent>,
content: string,
): Promise<void> {
await loopCtx.db.execute(
"INSERT INTO messages (role, content) VALUES (?, ?)",
"assistant",
content,
);
}
React Integration
Stream workflow progress to your frontend in realtime. Broadcast events on every step and render them with useActor.
import { useState, useEffect } from "react";
import { createRivetKit } from "@rivetkit/react";
import type { registry } from "./actors";
const { useActor } = createRivetKit<typeof registry>();
function OrderProgress() {
const { connection, connStatus } = useActor({
name: "orderProcessor",
key: ["order-123"],
});
const [status, setStatus] = useState("pending");
useEffect(() => {
if (!connection) return;
connection.on("statusUpdated", (s: string) => setStatus(s));
}, [connection]);
if (connStatus !== "connected") return <div>Connecting...</div>;
return <p>Order status: {status}</p>;
}
import { actor, event, setup } from "rivetkit";
import { type WorkflowContextOf, workflow } from "rivetkit/workflow";
export const orderProcessor = actor({
state: { status: "pending" as string },
events: {
statusUpdated: event<string>(),
},
run: workflow(async (ctx) => {
await ctx.step("validate", async () => validateAndBroadcast(ctx));
await ctx.step("charge-payment", async () => chargeAndBroadcast(ctx));
await ctx.step("fulfill", async () => fulfillAndBroadcast(ctx));
}),
actions: {
getState: (c) => c.state,
},
});
async function validateAndBroadcast(ctx: WorkflowContextOf<typeof orderProcessor>): Promise<void> {
await fetch("https://api.example.com/orders/validate", { method: "POST" });
ctx.state.status = "validated";
ctx.broadcast("statusUpdated", ctx.state.status);
}
async function chargeAndBroadcast(ctx: WorkflowContextOf<typeof orderProcessor>): Promise<void> {
await fetch("https://api.stripe.com/v1/charges", {
method: "POST",
headers: { Authorization: `Bearer ${process.env.STRIPE_KEY}` },
});
ctx.state.status = "charged";
ctx.broadcast("statusUpdated", ctx.state.status);
}
async function fulfillAndBroadcast(ctx: WorkflowContextOf<typeof orderProcessor>): Promise<void> {
await fetch("https://api.example.com/orders/ship", { method: "POST" });
ctx.state.status = "fulfilled";
ctx.broadcast("statusUpdated", ctx.state.status);
}
export const registry = setup({ use: { orderProcessor } });
No pub/sub service, no polling, no separate WebSocket server. The actor broadcasts events directly to connected clients. The React hook handles connection lifecycle automatically.
Inspector
Every workflow run is fully observable in the built-in inspector. See each step, its status, retries, timing, and output data in a visual DAG, updated in realtime as the workflow executes.

Rivet Actors at Its Core
Rivet Workflows is built directly in to Rivet Actors, a lightweight primitive for stateful workloads. Actors already provide persistent state, queues, and fault tolerance. Workflows add durable replay on top, so you get all of the Rivet Actor primitives for free:
- State with zero latency: No database round trips. Workflow state lives on the same machine as your compute.
- Realtime over WebSockets: Broadcast workflow progress to clients with
c.broadcast(). - SQLite per actor: Structured queries alongside workflow state.
- Scales to zero: Actors hibernate when idle. A workflow sleeping for a week costs nothing.
- Runs anywhere: Node.js, Bun, Cloudflare Workers, Vercel, or your own infrastructure.
Permissive Open-Source License
Rivet Workflows is licensed under Apache 2.0. Use it in production, self-host it, embed it in commercial products. No restrictions.
Other durable execution engines like Inngest (SSPL) and Restate (BSL 1.1) use restrictive licenses that limit self-hosting and commercial use. We believe durable execution should be infrastructure you own, not a dependency you rent.
Get Started
Rivet Workflows is available today in RivetKit.
npm install rivetkit
import { workflow } from "rivetkit/workflow";