Reflow Documentation

Durable workflow execution for TypeScript. Define multi-step workflows with full type safety, automatic retries, and crash recovery - powered by SQLite, no infrastructure required.

import { createWorkflow, createEngine } from 'reflow-ts'
import { SQLiteStorage } from 'reflow-ts/sqlite-node'
import { z } from 'zod' // or valibot, arktype, etc.

const pipeline = createWorkflow({
  name: 'process-content',
  input: z.object({ url: z.string() }),
})
  .step('scrape', async ({ input }) => {
    const page = await fetchPage(input.url)
    return { content: page.text }
  })
  .step('summarize', {
    retry: { maxAttempts: 3, backoff: 'exponential' },
    handler: async ({ prev }) => {
      const summary = await llm(prev.content)
      return { summary }
    },
  })
  .step('store', async ({ input, prev }) => {
    await db.insert({ url: input.url, ...prev })
  })

const storage = new SQLiteStorage('./reflow.db')
const engine = createEngine({ storage, workflows: [pipeline] })
await engine.start()

await engine.enqueue('process-content', {
  url: 'https://example.com/article',
})

Install

# Bun (uses built-in bun:sqlite - no native dependencies)
bun add reflow-ts

# Node.js (requires better-sqlite3)
npm install reflow-ts better-sqlite3

Then pick a storage adapter based on your runtime:

// Bun - zero native deps
import { SQLiteStorage } from 'reflow-ts/sqlite-bun'
const storage = new SQLiteStorage('./reflow.db')

// Node.js - uses better-sqlite3
import { SQLiteStorage } from 'reflow-ts/sqlite-node'
const storage = new SQLiteStorage('./reflow.db')

Reflow uses Standard Schema for input validation, so you can bring any compatible library:

bun add zod        # or
bun add valibot    # or
bun add arktype    # or any Standard Schema-compatible library
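If you're curious what "Standard Schema-compatible" means in practice, the contract is small: a schema is any object exposing a `~standard` property with a `validate` function that returns either a `value` or a list of `issues`. Here is a hand-rolled schema for illustration only, based on the published Standard Schema v1 spec - in real code you would pass a Zod/Valibot/ArkType schema straight to `createWorkflow`:

```typescript
// Hand-rolled Standard Schema v1 object - illustration only. In practice,
// Zod/Valibot/ArkType schemas already implement this interface for you.
interface UrlInput {
  url: string
}

const urlSchema = {
  '~standard': {
    version: 1 as const,
    vendor: 'example',
    validate(value: unknown) {
      const v = value as { url?: unknown } | null
      if (v && typeof v === 'object' && typeof v.url === 'string') {
        return { value: { url: v.url } as UrlInput }
      }
      return { issues: [{ message: 'expected { url: string }' }] }
    },
  },
}

const ok = urlSchema['~standard'].validate({ url: 'https://example.com' })
const bad = urlSchema['~standard'].validate({ url: 42 })
// ok has a `value` property; bad has an `issues` array
```

This is why Reflow doesn't depend on any one validation library: it only calls the `~standard` interface.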

The Problem

You have a multi-step operation - a signup, an import, an AI pipeline. You write it as a normal async function:

app.post('/signup', async (req, res) => {
  await createAccount(req.body)     // done
  await chargeStripe(req.body)      // done
  // process crashes, deploy happens, laptop sleeps
  await sendWelcomeEmail(req.body)  // never runs
})

Now the user is charged but never got their welcome email. Worse - you don't know which steps completed. Do you re-run everything? Then they get double-charged.

The usual fix is to build manual checkpoint logic: state columns, retry loops, deduplication. That's 200 lines of infrastructure code that's hard to test and easy to get wrong.

Reflow makes each step durable. If the process crashes after step 2 of 5, a new engine instance can reclaim the stale run after its lease expires and pick up at step 3. Active workers heartbeat their lease while they run, completed steps are never re-executed, and each step's output is persisted in SQLite - no external services required.

Who Is This For

Solo devs and small teams who need reliable multi-step workflows but don't want to run Temporal clusters or pay for cloud workflow services.

  • AI pipelines - LLM calls that cost money. Don't re-run a $0.12 call because the next step failed
  • SaaS apps - Background jobs that must complete: signup flows, billing, provisioning
  • CLI tools - Long-running imports or migrations that should resume after interruption

                 Reflow                                Temporal                    Inngest
Infrastructure   None (SQLite file)                    Temporal Server + DB        Cloud service
Type safety      Full end-to-end                       Partial                     Partial
Setup            bun add reflow-ts                     Cluster deployment          Account + SDK
Best for         Single-process apps, CLIs, AI agents  Large distributed systems   Serverless

Don't use Reflow when:

  • You need distributed execution across multiple machines
  • You need sub-second latency on workflow dispatch
  • You're already running Temporal or similar

Workflows

A workflow is a named sequence of steps with a validated input schema. Any Standard Schema-compatible library works (Zod, Valibot, ArkType, etc.).

const workflow = createWorkflow({
  name: 'send-welcome',
  input: z.object({ userId: z.string(), email: z.email() }),
})
  .step('create-account', async ({ input }) => {
    // input is typed as { userId: string, email: string }
    return { accountId: await createAccount(input.userId) }
  })
  .step('send-email', async ({ prev, input, signal }) => {
    // prev is typed as { accountId: string }
    // input is still available
    // signal is aborted on cancellation / timeout
    await sendEmail(input.email, 'Welcome!', { signal })
  })

Each .step() receives:

  • input - the validated workflow input (same for every step)
  • prev - the return value of the previous step (undefined for the first step)
  • signal - an AbortSignal that is aborted when the run is cancelled, its lease is lost, or the step times out

The builder is immutable - each .step() returns a new workflow instance, so you can safely branch:

const base = createWorkflow({ name: 'base', input: z.object({}) })
const withLogging = base.step('log', async () => { })
const withMetrics = base.step('metric', async () => { })
// base, withLogging, and withMetrics are all independent

Engine

The engine connects workflows to storage and handles execution.

const storage = new SQLiteStorage('./workflows.db')
const engine = createEngine({
  storage,
  workflows: [orderWorkflow, emailWorkflow],
})

// Start polling (processes runs continuously)
// start() auto-initializes storage — no need to call storage.initialize()
await engine.start(1000) // poll every 1000ms (default)

// Enqueue runs as work arrives
const run = await engine.enqueue('order-fulfillment', {
  orderId: 'ORD_1',
  amount: 100,
})

// Or process one run manually (call storage.initialize() first if not using start())
await engine.tick()

// Stop polling (waits for in-flight work to finish)
await engine.stop()

By default, claimed runs use a 30_000ms lease. If a worker crashes and stops updating a run, a later tick() can reclaim it after that lease expires:

const engine = createEngine({
  storage,
  workflows: [orderWorkflow],
  runLeaseDurationMs: 30_000,
  heartbeatIntervalMs: 10_000,
})

The engine heartbeats active runs while they execute so long-running steps do not get reclaimed before they finish.

enqueue() is fully type-safe - it only accepts registered workflow names and their corresponding input types:

engine.enqueue('order-fulfillment', { orderId: 'x', amount: 1 }) // OK
engine.enqueue('order-fulfillment', { wrong: 'shape' })          // Type error
engine.enqueue('nonexistent', {})                                 // Type error

If callers may retry enqueue(), give the run an idempotency key:

const run = await engine.enqueue(
  'order-fulfillment',
  { orderId: 'ORD_1', amount: 100 },
  { idempotencyKey: 'checkout:ORD_1' },
)

Reusing the same idempotency key for the same workflow returns the existing run instead of creating a duplicate. Reusing it with different input throws.
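A practical convention is to derive the key from stable business data, so any retried request maps onto the same run. `checkoutKey` below is a hypothetical helper, not part of Reflow:

```typescript
// Hypothetical helper (not part of Reflow): build the idempotency key from
// stable business data so client retries resolve to the same run.
const checkoutKey = (orderId: string) => `checkout:${orderId}`

// Passed as the third argument to engine.enqueue(...)
const options = { idempotencyKey: checkoutKey('ORD_1') }
// options.idempotencyKey === 'checkout:ORD_1'
```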

Retry

Steps can be configured with automatic retry and backoff:

.step('call-api', {
  retry: {
    maxAttempts: 5,
    backoff: 'exponential', // or 'linear'
    initialDelayMs: 200,    // 200ms, 400ms, 800ms, 1600ms...
  },
  handler: async ({ input }) => {
    const response = await fetch('https://api.example.com')
    if (!response.ok) throw new Error('API error')
    return await response.json()
  },
})

Without retry config, a failing step immediately fails the entire workflow run.
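The delay schedule this config produces can be sketched as a small pure function. This is an illustration of the documented exponential schedule; the linear formula is an assumption, not Reflow's actual implementation:

```typescript
// Sketch of the retry delay schedule - illustration only, not Reflow's
// internals. `attempt` is 1-based: the delay taken after attempt N fails,
// before attempt N + 1 starts.
const retryDelayMs = (
  attempt: number,
  backoff: 'linear' | 'exponential',
  initialDelayMs = 1000,
): number =>
  backoff === 'exponential'
    ? initialDelayMs * 2 ** (attempt - 1) // 200, 400, 800, 1600...
    : initialDelayMs * attempt            // 200, 400, 600... (assumed)

retryDelayMs(1, 'exponential', 200) // 200
retryDelayMs(4, 'exponential', 200) // 1600
```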

Failure Handling

Attach an onFailure handler for compensation logic (saga pattern):

const workflow = createWorkflow({
  name: 'transfer',
  input: schema,
})
  .step('debit', async ({ input }) => {
    return await debitAccount(input.from, input.amount)
  })
  .step('credit', async ({ input }) => {
    return await creditAccount(input.to, input.amount)
  })
  .onFailure(async ({ error, stepName, input }) => {
    if (stepName === 'credit') {
      // Debit succeeded but credit failed - reverse the debit
      await creditAccount(input.from, input.amount)
    }
    await notifyOps('Transfer failed: ' + error.message)
  })

Error Handling

There are three ways to handle errors in Reflow, and they serve different purposes:

Recover inside the step

Use a try/catch when you can handle the error and continue the workflow. Return a fallback value so the next step still runs.

.step('fetch', async ({ input }) => {
  try {
    return await callAPI(input.url)
  } catch {
    return { data: null, failed: true }
  }
})

Retry transient failures

For rate limits, network blips, and flaky APIs - let the error throw and configure retry. If it exhausts all attempts, the run fails.

.step('charge', {
  retry: { maxAttempts: 3, backoff: 'exponential' },
  handler: async ({ input }) => {
    return await stripe.charge(input.amount)
  },
})

Clean up with onFailure

Use onFailure for compensation after the workflow has already failed. It does not recover the run - it cleans up side effects (saga pattern).

.onFailure(async ({ error, stepName, input }) => {
  if (stepName === 'charge') {
    await refundAccount(input.userId)
  }
})

Most workflows combine all three: try/catch for expected errors, retry for transient failures, and onFailure for rollback.
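One way to wire the first two together is a small classifier inside the step: rethrow errors you consider transient so the retry config sees them, and return a fallback for everything else. `HttpError`, `isTransient`, and the status list are assumptions for illustration, not part of Reflow:

```typescript
// Hypothetical error classifier - not part of Reflow.
class HttpError extends Error {
  constructor(readonly status: number) {
    super(`HTTP ${status}`)
  }
}

// Status codes worth retrying: timeouts, rate limits, server errors.
const TRANSIENT = new Set([408, 429, 500, 502, 503, 504])

const isTransient = (err: unknown): boolean =>
  err instanceof HttpError && TRANSIENT.has(err.status)

// Inside a step handler that has retry configured:
//   try {
//     return await callAPI(input.url)
//   } catch (err) {
//     if (isTransient(err)) throw err       // let retry handle it
//     return { data: null, failed: true }   // expected failure: fall back
//   }
```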

Crash Recovery

Reflow automatically resumes workflows from the last completed step. If your process crashes after step 2 of 5, a later engine instance can reclaim the stale run after runLeaseDurationMs and continue at step 3 - completed steps are never re-executed.

// Process crashes here after 'charge' completed but before 'fulfill'
// On restart, the engine claims the run and skips 'charge'
await engine.start()

This is especially valuable for AI pipelines where each LLM call costs money. If the store step fails after two successful LLM calls, Reflow resumes from the store step - the LLM calls are not repeated.

Concurrency

By default, the engine processes one run at a time. Set concurrency to process multiple runs in parallel per tick:

const engine = createEngine({
  storage,
  workflows: [orderWorkflow],
  concurrency: 5, // Process up to 5 runs in parallel per tick
})

With concurrency: 5, each tick claims up to 5 pending runs and executes them concurrently. Steps within a single run still execute sequentially.

Cancellation

Cancel pending or running workflows:

const run = await engine.enqueue('order-fulfillment', {
  orderId: 'ORD_1',
  amount: 100,
})

const cancelled = await engine.cancel(run.id)
// true if cancelled, false if already completed/failed/cancelled

Cancellation aborts the current step's AbortSignal immediately and prevents later steps from starting. If your step handler cooperates with the signal, cancellation stops it cleanly:

.step('fetch-profile', async ({ input, signal }) => {
  const response = await fetch(
    'https://api.example.com/users/' + input.userId,
    { signal },
  )
  return await response.json()
})

To see which steps completed before cancellation, use getRunStatus():

await engine.cancel(run.id)

const info = await engine.getRunStatus(run.id)
// info.run.status === 'cancelled'
// info.steps contains all completed step results and their outputs

Scheduled Workflows

Enqueue workflows on a recurring interval:

// Enqueue a cleanup workflow every hour
const scheduleId = engine.schedule(
  'cleanup',
  { olderThanDays: 30 },
  60 * 60 * 1000,
)

// Stop the schedule
engine.unschedule(scheduleId)

// await engine.stop() also clears all schedules

Step Timeouts

Prevent steps from hanging indefinitely:

.step('call-external-api', {
  timeoutMs: 5000, // Fail after 5 seconds
  handler: async ({ input }) => {
    return await fetch('https://slow-api.example.com')
  },
})

Timeouts can also be set via the retry config:

.step('flaky-service', {
  retry: {
    maxAttempts: 3,
    backoff: 'exponential',
    initialDelayMs: 500,
    timeoutMs: 10000, // Each attempt times out after 10s
  },
  handler: async ({ input }) => { },
})

Step-level timeoutMs takes precedence over retry.timeoutMs.
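A step config that sets both looks like this (step name and URL are placeholders):

```typescript
.step('slow-call', {
  timeoutMs: 3_000, // step-level wins: each attempt gets 3 seconds
  retry: {
    maxAttempts: 3,
    backoff: 'exponential',
    timeoutMs: 10_000, // ignored while step-level timeoutMs is set
  },
  handler: async ({ signal }) => {
    const res = await fetch('https://slow-api.example.com', { signal })
    return await res.json()
  },
})
```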

Hooks

Add observability with lifecycle hooks:

const engine = createEngine({
  storage,
  workflows: [orderWorkflow],
  hooks: {
    onStepComplete: ({ runId, stepName, output, attempts }) => {
      console.log(stepName + ' completed in ' + attempts + ' attempt(s)')
    },
    onRunComplete: ({ runId, workflow }) => {
      metrics.increment('workflow.completed', { workflow })
    },
    onRunFailed: ({ runId, workflow, stepName, error }) => {
      alerting.notify(workflow + ' failed at ' + stepName)
    },
    onError: (error) => {
      // Fires on background failures (scheduled enqueues, poll cycles)
      console.error('Engine error:', error)
    },
  },
})

Storage

Reflow ships with three storage adapters:

SQLiteStorage - for Bun runtime. Uses the built-in bun:sqlite module with zero native dependencies.

import { SQLiteStorage } from 'reflow-ts/sqlite-bun'

const storage = new SQLiteStorage('./workflows.db')

SQLiteStorage - for Node.js. Uses better-sqlite3 (native addon). Persists to disk, uses WAL mode.

import { SQLiteStorage } from 'reflow-ts/sqlite-node'

const storage = new SQLiteStorage('./workflows.db')

MemoryStorage - used internally by the test helper. For custom use, import from reflow-ts/test.

You can implement your own adapter by conforming to the StorageAdapter interface:

interface StorageAdapter {
  initialize(): Promise<void>
  createRun(run: WorkflowRun): Promise<CreateRunResult>
  claimNextRun(
    workflowNames: readonly string[],
    staleBefore?: number,
  ): Promise<ClaimedRun | null>
  heartbeatRun(runId: string, leaseId: string): Promise<boolean>
  getRun(runId: string): Promise<WorkflowRun | null>
  getStepResults(runId: string): Promise<StepResult[]>
  saveStepResult(
    result: StepResult,
    leaseId?: string,
  ): Promise<boolean>
  updateRunStatus(
    runId: string,
    status: RunStatus,
  ): Promise<boolean>
  updateClaimedRunStatus(
    runId: string,
    leaseId: string,
    status: RunStatus,
  ): Promise<boolean>
}

Persisted workflow input and step output must be JSON-compatible data: plain objects, arrays, strings, numbers, booleans, null, and undefined.
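Because results persist through JSON, richer objects silently degrade on the way back out of storage. A quick illustration of the round-trip (plain JSON behavior, nothing Reflow-specific):

```typescript
// Step outputs are persisted as JSON, so non-JSON types degrade after a
// round-trip through storage.
const raw = { when: new Date('2024-01-01T00:00:00Z'), tags: new Map([['a', 1]]) }
const revived = JSON.parse(JSON.stringify(raw))
// revived.when is the ISO string '2024-01-01T00:00:00.000Z'
// revived.tags is {} - the Map's entries are gone

// Prefer returning JSON-native shapes from steps:
const safe = { when: '2024-01-01T00:00:00.000Z', tags: [['a', 1]] }
```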

Deployment

The SQLite file is Reflow's source of truth - not a cache. Each step's result is written to disk before moving to the next step. When your process restarts, the new engine reads the file and resumes incomplete runs.

This works out of the box on any deployment with a persistent filesystem: a VPS (DigitalOcean, Hetzner, EC2), a dedicated server, or your local machine. The SQLite file survives process restarts.

Ephemeral containers (vanilla Docker, Fly Machines, Railway, Lambda) get a fresh filesystem on each deploy. Mount a persistent volume so the SQLite file survives:

# Docker
docker run -v /data:/app/data myapp

# fly.toml
[mounts]
  source = "reflow_data"
  destination = "/data"

// Point Reflow at the mounted volume (same for both adapters)
const storage = new SQLiteStorage('/data/workflows.db')

This is the same requirement any SQLite-based app has. It's one config line, not infrastructure.

Testing

Reflow includes a test helper that runs workflows synchronously and returns typed results:

import { testEngine } from 'reflow-ts/test'

const te = testEngine({ workflows: [orderWorkflow] })

const result = await te.run('order-fulfillment', {
  orderId: 'test',
  amount: 100,
})

result.status              // 'completed' | 'failed'
result.steps.charge.output // { chargeId: string } - fully typed
result.steps.charge.status // 'completed' | 'failed'
result.steps.charge.error  // string | null

Use it in your test suite:

import { describe, it, expect } from 'vitest'
import { testEngine } from 'reflow-ts/test'

describe('order workflow', () => {
  it('charges and fulfills', async () => {
    const te = testEngine({ workflows: [orderWorkflow] })
    const result = await te.run('order-fulfillment', {
      orderId: 'ORD_1',
      amount: 100,
    })

    expect(result.status).toBe('completed')
    expect(result.steps.charge.output.chargeId).toBeTruthy()
  })
})

Type Safety

Reflow tracks types through the entire workflow chain:

  • Workflow name is a string literal type ('order-fulfillment', not string)
  • Input is validated by your schema library and inferred at the type level
  • Step chaining - each step's prev is typed as the return value of the previous step
  • Engine - enqueue() only accepts registered workflow names with matching input
  • Test engine - run() returns typed step results keyed by step name

// These are all compile-time errors, not runtime surprises:
engine.enqueue('typo', {})                    // not a registered workflow
engine.enqueue('order-fulfillment', {})       // missing required fields
workflow.step('x', async ({ prev }) => {
  prev.nonexistent                            // property doesn't exist
})

API Reference

function

createWorkflow(config)

Creates a new workflow builder.

createWorkflow({ name: 'my-workflow', input: schema })
name string Unique workflow name (becomes a literal type)
input StandardSchemaV1 Any Standard Schema-compatible schema (Zod, Valibot, ArkType, etc.)
Returns Workflow with .step(), .onFailure(), and .parseInput() methods.
method

workflow.step(name, handler | config)

Adds a step to the workflow. Accepts either a handler function or a config object.

// Simple handler
.step('name', async ({ input, prev, signal }) => {
  return { result: 'value' }
})

// Config with retry
.step('name', {
  retry: { maxAttempts: 3, backoff: 'exponential' },
  timeoutMs: 5000,
  handler: async ({ input, prev, signal }) => {
    return { result: 'value' }
  },
})

Handler context

input TInput The validated workflow input (same for every step)
prev TPrev Return value of the previous step (undefined for first step)
signal AbortSignal Aborted on cancellation, lease loss, or timeout

Config options

handler (ctx) => Promise<T> Step handler function
retry RetryConfig Optional. See retry config below
timeoutMs number Optional. Timeout per attempt in milliseconds

RetryConfig

maxAttempts number Maximum number of attempts. Default: 1 (no retry)
backoff 'linear' | 'exponential' Backoff strategy between retries
initialDelayMs number Base delay in ms. Default: 1000
timeoutMs number Timeout per attempt. Step-level timeoutMs takes precedence
Returns a new Workflow with the step appended. The builder is immutable.
method

workflow.onFailure(handler)

Attaches a failure handler for compensation logic (saga pattern). Called when a step fails after exhausting all retry attempts.

.onFailure(async ({ error, stepName, input }) => {
  // compensate or alert
})
error Error The error that caused the failure
stepName string Which step failed
input TInput The original workflow input
function

createEngine(config)

Creates an engine that connects workflows to storage and handles execution.

const engine = createEngine({
  storage,
  workflows: [pipeline],
  concurrency: 5,
  runLeaseDurationMs: 30_000,
})
storage StorageAdapter Storage backend. Required
workflows Workflow[] Workflows to register. Required
hooks EngineHooks Lifecycle hooks: onStepComplete, onRunComplete, onRunFailed, onError
concurrency number Runs to process in parallel per tick. Default: 1
runLeaseDurationMs number How long a run stays claimed before another engine may reclaim it. Default: 30000
heartbeatIntervalMs number How often the worker renews its lease. Default: runLeaseDurationMs / 3
Returns an Engine with the methods below.
async

engine.start(pollIntervalMs?)

Initializes storage and starts the polling loop. Call this once at startup, then enqueue() work as it arrives.

pollIntervalMs number Polling interval in ms. Default: 1000
Returns Promise<void>
async

engine.stop()

Stops the polling loop, clears all schedules, and waits for any in-flight tick to finish.

Returns Promise<void>
async

engine.tick()

Claims up to concurrency pending or stale runs and executes them in parallel. Useful for CLI tools or tests where you want explicit control instead of polling.

Returns Promise<void>
async

engine.enqueue(name, input, options?)

Submits a workflow run. Type-safe - only accepts registered workflow names with their corresponding input types.

const run = await engine.enqueue('process-content', {
  url: 'https://example.com/article',
})

Options

idempotencyKey string Prevents duplicate runs. Same key + same input returns existing run. Same key + different input throws
Returns Promise<WorkflowRun> with run.id as the unique identifier.
async

engine.cancel(runId)

Cancels a pending or running workflow. Aborts the current step's AbortSignal immediately and prevents later steps from starting.

Returns Promise<boolean> - true if cancelled, false if already completed/failed/cancelled.
sync

engine.schedule(name, input, intervalMs)

Enqueues a workflow run on a recurring interval.

const scheduleId = engine.schedule('cleanup', { days: 30 }, 60 * 60 * 1000)
engine.unschedule(scheduleId)
Returns string schedule ID. Cancel with engine.unschedule(scheduleId). Cleared automatically by engine.stop().
async

engine.getRunStatus(runId)

Query the status of any run and its step results.

const info = await engine.getRunStatus(run.id)
// info.run.status: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled'
// info.steps: StepResult[] with output, error, and attempt count
Returns Promise<RunInfo | null>
function

testEngine(config)

Creates a test engine with in-memory storage for synchronous workflow execution. Same config as createEngine minus storage.

import { testEngine } from 'reflow-ts/test'

const te = testEngine({ workflows: [pipeline] })
const result = await te.run('process-content', { url: '...' })
result.status              // 'completed' | 'failed'
result.steps.scrape.output // fully typed
Returns a test engine with a .run() method that returns typed step results keyed by step name.
class

SQLiteStorage(path)

SQLite storage adapter for Bun runtime. Uses the built-in bun:sqlite module - no native dependencies required.

import { SQLiteStorage } from 'reflow-ts/sqlite-bun'

const storage = new SQLiteStorage('./workflows.db')
await storage.initialize()
class

SQLiteStorage(path)

SQLite storage adapter for Node.js. Uses better-sqlite3 (native addon). WAL mode and transactional claiming.

import { SQLiteStorage } from 'reflow-ts/sqlite-node'

const storage = new SQLiteStorage('./workflows.db')
await storage.initialize()
interface

StorageAdapter

Interface for implementing custom storage backends. See the Storage section for the full interface definition and method signatures.