Skip to main content
Sign In
Features

Queues & Run Loops

Use actor-local durable queues for serial run loops and request/response workflows.

What are queues?

  • Realtime: messages are delivered to a live actor as soon as possible.
  • Durable: messages are persisted and survive actor sleep/restart.
  • Request/response: clients can wait for a queue completion response.
  • Scalable: queues absorb large bursts and handle heavy backpressure safely.
  • Local per actor: each actor instance has its own queue storage (scoped by actor key/id).

Queues are commonly referred to as “mailboxes” in other actor frameworks.

What are queues good for?

  • Great for any task that changes actor state.
  • Helps avoid race conditions by handling work in order.
  • Makes complex behavior easier to organize.

Basic queue

This is the default pattern. Define queue names in queues, process them in run, and publish from the client with handle.send(...).

Completable messages

Use this when you want explicit completion/ack semantics but do not need to return data.

  • If processing fails before message.complete(), the message is not acknowledged.
  • Unacknowledged messages are retried, so mutation handlers should be idempotent.
  • status: "timedOut" means sender timeout elapsed before message.complete(...).

Request/reply pattern

Use this when the sender needs data back from queued work.

Queue messages from within an actor

Queueing is useful from inside actor logic too, not just from clients.

  • Use actions as entrypoints, then enqueue into the run loop to keep mutations serialized.
  • You can also call c.queue.send(...) from other parts of run when needed.
  • c.queue.send(...) confirms durable enqueue. It does not wait for processing to finish.

Defining queue schemas

You can define queue types with queue<TMessage, TComplete>() or with schema objects. Schema objects support Standard Schema validators, including Zod.

import { actor, queue, setup } from "rivetkit";
import { z } from "zod";

export const worker = actor({
  state: {},
  queues: {
    // Use generic queue typing when you want compile-time typing only.
    foo: queue<{ id: string }, { ok: true }>(),
    // Use schema objects when you want runtime validation for message and completion payloads.
    bar: {
      message: z.object({ id: z.string() }),
      complete: z.object({ ok: z.boolean() }),
    },
  },
});

export const registry = setup({ use: { worker } });

Pull messages with next

Use next when you want to wait for queue messages.

  • Waits until messages are available unless timeout is hit.
  • Omit timeout to wait indefinitely.
import { actor, queue, setup } from "rivetkit";

export const queueWorker = actor({
  state: {},
  queues: {
    jobs: queue<{ id: string }>(),
  },
  actions: {
    pull: async (c) => {
      const batch = await c.queue.next({
        count: 10,
        timeout: 1_000,
      });

      const oneWithoutTimeout = await c.queue.next({
        count: 1,
      });

      return {
        batchCount: batch.length,
        oneWithoutTimeoutCount: oneWithoutTimeout.length,
      };
    },
  },
});

export const registry = setup({ use: { queueWorker } });

Poll messages

Use tryNext when you need a non-blocking read.

  • Returns immediately and never waits.
import { actor, queue, setup } from "rivetkit";

export const queueWorker = actor({
  state: {},
  queues: {
    jobs: queue<{ id: string }>(),
  },
  actions: {
    poll: async (c) => {
      const immediate = await c.queue.tryNext({
        count: 10,
      });

      return {
        immediateCount: immediate.length,
      };
    },
  },
});

export const registry = setup({ use: { queueWorker } });

Abort signals

Use signal when your receive loop needs external cancellation semantics in addition to actor shutdown behavior.

Multiple queues

Multiple queues let you separate message flows by purpose. By default, receive calls race across all queues when names is not specified. In this pattern, prompt messages run through a streaming loop while stop messages act as control signals on a separate receive path.

Use iter({ names: ["prompt"] }) as the main stream and next({ names: ["stop"] }) as a stop signal.

Sleeping behavior

If an actor has a run handler, it does not sleep while that handler is actively doing work. It only can sleep when the run loop is blocked waiting for queue entries (for example inside iter(...) or next(...)).

This means you can run normal code in run without worrying about sleep interrupting it mid-call.

Debugging

  • GET /inspector/queue?limit=50 returns queue size and pending message metadata.
  • GET /inspector/summary includes queueSize for quick queue health checks.
  • POST /queue/:name with wait: true is useful to verify completable/request-response behavior.
  • In non-dev mode, inspector endpoints require authorization.

Recommendations

  • Actions are for getting data, queue entries are for mutating data.
  • Implement connection auth in onBeforeConnect. See Authentication.
  • Route most state changes through one queue loop so ordering stays predictable.
  • If you need more complex multi-step run loops, consider using workflows.
  • Use c.aborted and c.abortSignal for graceful shutdown and cancellation.
  • Add timeout when callers need bounded wait behavior.
  • Use wait: true only when the caller actually needs a response.