
Queues 101 — At‑Least‑Once (and Why “Exactly‑Once” Is a Myth)

5 min read

Design for duplicates and reordering. Use idempotency, an inbox/outbox, and per‑key ordering to get effectively‑once processing in real systems.

TL;DR

  • Networks fail and consumers crash; assume at‑least‑once delivery → you will see duplicates.
  • Get effectively‑once by combining: idempotent handlers, a dedupe store (“inbox”), and producer outbox/idempotency.
  • Ordering isn’t global. You get ordering per partition/key at best. If you need global order, rethink the design—or encode sequence numbers and handle gaps.
  • Acknowledge/commit after durable work. Use DLQs, backoff + jitter, and poison‑pill handling.
  • “Exactly‑once” is marketing outside a closed system. Some platforms (e.g., Kafka EOS) can do atomic offset + write within the platform, but end‑to‑end side effects still require idempotency.

1) Delivery semantics in one picture

  • At‑most‑once: deliver 0–1 times. No retries → fast but you may drop work.
  • At‑least‑once (default in practice): deliver 1–N times. Requires dedupe.
  • Effectively‑once: users observe each business action once, despite duplicates, via idempotency + dedupe + transactions.

Treat “exactly‑once” as effectively‑once under assumptions. The assumptions rarely hold across networks + external systems.


2) Why “exactly‑once” breaks in the wild

  • Crashes between “side effect done” and “offset committed/ack sent.”
  • Retries by brokers/clients after timeouts.
  • At‑least‑once publishers upstream you don’t control.
  • External effects (email, payments, webhooks) that can’t participate in your transaction.
  • Clock/ordering drift: multiple producers write concurrently; global ordering is undefined.

Reality: we engineer idempotency instead of pretending duplicates won’t happen.


3) Producer patterns (publish once, or safely again)

A) Outbox pattern (transactional publish)

  • On the write path, within the same DB transaction that mutates business state, also write an outbox row.
  • A relay job reads unsent outbox rows, publishes them to the broker, and marks them sent. If the relay crashes, it resumes from the unsent rows on restart (see the relay sketch after the SQL below).
-- App TX
BEGIN;
UPDATE orders SET status='paid' WHERE id=$1;
INSERT INTO outbox(id, topic, key, payload) VALUES ($2, 'orders', $1, $3);
COMMIT;
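
The relay itself can be a small polling loop. Below is a minimal, hypothetical sketch assuming node-postgres (pg), a kafkajs producer, and a sent_at timestamptz column on the outbox table (not shown in the insert above):

// Outbox relay (sketch): poll unsent rows, publish, mark sent
import { Pool } from "pg";
import { Kafka } from "kafkajs";

const pool = new Pool(); // connection settings from env (assumption)
const producer = new Kafka({ clientId: "outbox-relay", brokers: ["localhost:9092"] }).producer();

async function relayOnce() {
  const client = await pool.connect();
  try {
    await client.query("BEGIN");
    // Lock a batch of unsent rows so concurrent relays don't double-publish.
    const { rows } = await client.query(
      `SELECT id, topic, key, payload FROM outbox
       WHERE sent_at IS NULL
       ORDER BY id
       LIMIT 100
       FOR UPDATE SKIP LOCKED`
    );
    for (const row of rows) {
      await producer.send({
        topic: row.topic,
        messages: [{ key: row.key, value: JSON.stringify(row.payload) }], // payload assumed jsonb
      });
      await client.query("UPDATE outbox SET sent_at = now() WHERE id = $1", [row.id]);
    }
    await client.query("COMMIT");
  } catch (err) {
    await client.query("ROLLBACK");
    throw err;
  } finally {
    client.release();
  }
}

async function main() {
  await producer.connect();
  for (;;) {
    await relayOnce();
    await new Promise(r => setTimeout(r, 500)); // small poll interval
  }
}
main().catch(console.error);

Note that the relay is itself at‑least‑once: a crash between the publish and the sent_at update re‑publishes the row, which is exactly why consumers still need the inbox dedupe in section 4.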

B) Idempotent producer keys

  • Kafka: enable the idempotent producer (see the kafkajs sketch below); use stable keys for partitioning (user/order id) to keep per‑key order.
  • SQS FIFO: set MessageGroupId for per‑key order + ContentBasedDeduplication (5‑minute window) or an explicit MessageDeduplicationId you control.
// SQS FIFO example (conceptual)
{
  "QueueName": "orders.fifo",
  "Attributes": {
    "FifoQueue": "true",
    "ContentBasedDeduplication": "true"
  }
}
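
For the Kafka bullet above, enabling the idempotent producer is a small config change. A minimal sketch assuming kafkajs; the broker address, topic, orderId, and event are placeholders:

// Kafka idempotent producer (kafkajs): broker dedupes internal retries per partition
import { Kafka } from "kafkajs";

const kafka = new Kafka({ clientId: "orders-service", brokers: ["localhost:9092"] });
const producer = kafka.producer({
  idempotent: true,        // requires acks=-1; retried sends can't duplicate a record
  maxInFlightRequests: 1,  // conservative setting used in the kafkajs examples
});

await producer.connect();
await producer.send({
  topic: "orders",
  messages: [{ key: orderId, value: JSON.stringify(event) }], // stable key → per‑key order
});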

Broker dedup windows are finite (SQS FIFO’s is five minutes); still implement consumer‑side idempotency.


4) Consumer patterns (dedupe & idempotency)

A) Inbox table (dedupe ledger)

Store a message_id (or business operation id) per processed message with a unique constraint. Process only if you can insert a new row.

CREATE TABLE inbox (
  msg_id text PRIMARY KEY,
  processed_at timestamptz NOT NULL DEFAULT now()
);
-- Inside a transaction with your domain changes
INSERT INTO inbox (msg_id) VALUES ($1) ON CONFLICT DO NOTHING;
-- Check rows affected: if 0 → we already processed; if 1 → proceed and commit work

B) Idempotent domain writes

Use UPSERTs or natural keys to make the action repeatable:

-- Set a payment as captured exactly once
INSERT INTO payments (id, order_id, amount_cents, status)
VALUES ($id, $order, $amount, 'captured')
ON CONFLICT (id) DO NOTHING; -- duplicates become no‑ops

C) Commit/ack after durable work

  • Kafka: write domain changes (+ inbox) → commit offsets after the DB txn commits.
  • SQS/RabbitMQ: process → ack only after persistence completes. If the consumer dies before ack, the message is redelivered (duplicate).
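
For SQS, “ack” means deleting the message, and it should happen only after your write has committed. A minimal sketch in the same AWS SDK v2 style as the other SQS examples in this post; persistToDb is a hypothetical helper that runs the inbox insert plus the domain upsert in one transaction:

// SQS consumer: delete (ack) only after durable work
const { Messages = [] } = await sqs.receiveMessage({
  QueueUrl,
  MaxNumberOfMessages: 10,
  WaitTimeSeconds: 20, // long polling
}).promise();

for (const m of Messages) {
  try {
    await persistToDb(m.MessageId!, JSON.parse(m.Body!)); // inbox + domain write, one txn
    // Only now is it safe to ack; a crash before this line means redelivery → a duplicate.
    await sqs.deleteMessage({ QueueUrl, ReceiptHandle: m.ReceiptHandle! }).promise();
  } catch (err) {
    // Do nothing: the visibility timeout expires and the message is redelivered.
    console.error("processing failed, will retry", m.MessageId, err);
  }
}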

5) Ordering: what you can rely on

  • Per‑partition order only. Kafka/Kinesis guarantee order within a partition; SQS FIFO guarantees order within a MessageGroupId.
  • To preserve order per entity, key messages by that entity (e.g., order_id).
  • If you need cross‑entity order, encode sequence numbers and handle the following (see the sketch after this list):
    • Gaps (wait up to N seconds, then proceed).
    • Reordering (buffer small windows).
    • Idempotent merge (latest‑wins updates, version checks).
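
A per‑key resequencer covers the gap and reordering cases. This is a hypothetical sketch: it buffers out‑of‑order messages per key and hands them to apply() in sequence order; the “wait up to N seconds, then proceed” timer is noted but omitted to keep it short:

// Per-key resequencer (sketch): deliver messages to apply() in sequence order
type SeqMsg = { key: string; seq: number; payload: unknown };

const nextSeq = new Map<string, number>();              // next expected seq per key
const pending = new Map<string, Map<number, SeqMsg>>(); // buffered out-of-order messages

async function onMessage(msg: SeqMsg, apply: (m: SeqMsg) => Promise<void>) {
  const expected = nextSeq.get(msg.key) ?? 0;
  if (msg.seq < expected) return;                       // old/duplicate → no-op

  const buf = pending.get(msg.key) ?? new Map<number, SeqMsg>();
  pending.set(msg.key, buf);
  buf.set(msg.seq, msg);

  // Apply everything that is now contiguous with what was already applied.
  let cursor = expected;
  while (buf.has(cursor)) {
    await apply(buf.get(cursor)!);                      // apply() must be idempotent
    buf.delete(cursor);
    cursor += 1;
  }
  nextSeq.set(msg.key, cursor);
  // If buf is non-empty here there is a gap: start a timer and, after the wait budget,
  // either proceed past the gap or alert.
}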

Shard your workers by key to avoid concurrent processing for the same entity:

// Pseudo: worker pool keyed by order_id
const shard = hash(orderId) % N;
queues[shard].enqueue(msg);

6) Example flows

A) Kafka consumer (TypeScript‑ish, DB transaction + inbox)

for await (const msg of consumer) {
  const msgId = msg.headers["id"] ?? `${msg.topic}:${msg.partition}:${msg.offset}`;
  await db.tx(async t => {
    const inserted = await t.oneOrNone(
      "INSERT INTO inbox (msg_id) VALUES ($1) ON CONFLICT DO NOTHING RETURNING 1",
      [msgId]
    );
    if (!inserted) return; // duplicate → no‑op (committing the offset below is still safe)

    // idempotent domain change (upsert/natural keys)
    await t.none(
      "INSERT INTO shipments (id, order_id, status) VALUES ($1,$2,'created') ON CONFLICT (id) DO NOTHING",
      [msg.headers["shipment_id"], msg.key]
    );
  });
  await consumer.commit(msg); // after DB commit
}

B) SQS FIFO handler (Node)

// Per-message MessageGroupId = order_id for per-key ordering
// Use MessageDeduplicationId as your operation id (e.g., shipment_id)
const params = {
  QueueUrl, MessageBody: JSON.stringify(payload),
  MessageGroupId: orderId, MessageDeduplicationId: shipmentId
};
await sqs.sendMessage(params).promise();

C) Inbox + Outbox together (DB‑centric)

  • Inbox dedupes incoming messages.
  • Outbox guarantees publishing of outgoing events tied to state changes.
  • Run a lightweight relay (poll outbox, publish, mark sent).

7) DLQs, retries, and poison pills

  • Transient failures: retry with exponential backoff + jitter.
  • Permanent/validation errors: don’t spin forever—dead‑letter after N attempts and alert.
  • Keep per‑message attempt count and last_error (header or store).
  • Build a replay tool to re‑enqueue DLQ messages after a fix.
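
A minimal retry policy with full jitter and a DLQ cut‑off might look like the sketch below; process, isTransient, and deadLetter are placeholders for your handler, error classifier, and DLQ publisher:

// Exponential backoff with full jitter, capped attempts, then dead-letter (sketch)
const BASE_MS = 200;
const CAP_MS = 30_000;
const MAX_ATTEMPTS = 8;

const backoff = (attempt: number) =>
  Math.random() * Math.min(CAP_MS, BASE_MS * 2 ** attempt); // "full jitter"

async function handleWithRetries(msg: unknown) {
  for (let attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
    try {
      return await process(msg);              // your idempotent handler
    } catch (err) {
      if (attempt === MAX_ATTEMPTS - 1 || !isTransient(err)) {
        await deadLetter(msg, err);           // publish to DLQ with attempt count + last_error
        return;
      }
      await new Promise(r => setTimeout(r, backoff(attempt)));
    }
  }
}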

8) Observability & ops

  • Metrics: processed_total, duplicates_total, dlq_total, processing_latency_ms, inflight, backlog_depth.
  • Logs: include msg_id, key, attempt, partition, offset.
  • Traces: one span per message with events for dedupe hit and retry.
  • Run compaction/TTL on the inbox (e.g., keep 7–30 days) so it doesn’t grow without bound; this is safe as long as msg_ids are never reused and redeliveries can’t arrive after the retention window (see the sketch below).
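
Compaction can be a one‑line scheduled job. A sketch assuming node-postgres and the inbox table from section 4:

// Daily inbox compaction (sketch): drop dedupe rows older than the retention window
import { Pool } from "pg";

const pool = new Pool();

export async function compactInbox(retentionDays = 30) {
  const res = await pool.query(
    "DELETE FROM inbox WHERE processed_at < now() - make_interval(days => $1)",
    [retentionDays]
  );
  console.log(`inbox compaction removed ${res.rowCount} rows`);
}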

9) Myth vs reality quick table

| Claim | Reality | What to do |
|---|---|---|
| “Our broker is exactly‑once.” | Maybe within the broker/processor; not across your DB, email, payments, etc. | Idempotency + inbox/outbox + atomic commits |
| “We need global ordering.” | Expensive and brittle; most needs are per‑entity order. | Key by entity; encode sequence if needed |
| “Dedup is expensive.” | It’s a single INSERT ... ON CONFLICT DO NOTHING. | Add inbox table + TTL/compaction |
| “FIFO queues fix everything.” | Help per group; dedup window limited. | Still implement consumer dedupe |


Pitfalls & fast fixes

| Pitfall | Why it hurts | Fix |
|---|---|---|
| Commit/ack before persistence | Lost work or dup effects | Commit/ack after DB txn |
| No dedupe | Double side effects (emails, charges) | Inbox table + idempotent writes |
| Global ordering assumption | Throughput cliff | Use per‑key ordering; shard workers |
| Long retry storms | Thundering herd | Backoff + jitter; cap attempts; DLQ |
| Using timestamps as IDs | Collisions/clock skew | Use UUIDv4/ULID or broker offsets |
| Infinite DLQ | Silent data loss | Alert, replay tooling, dashboards |


Quick checklist

  • [ ] Define your id strategy (UUID/ULID or broker offsets) for dedupe.
  • [ ] Add an inbox table with PRIMARY KEY (msg_id) + TTL/compaction.
  • [ ] Make handlers idempotent (UPSERT/natural keys).
  • [ ] Use outbox for publishing events with your DB writes.
  • [ ] Key streams for per‑entity ordering; shard workers by key.
  • [ ] DLQ + retry policy with backoff + jitter; instrument metrics.

One‑minute adoption plan

  1. Add an inbox table and wrap handlers in a DB transaction with INSERT ... ON CONFLICT DO NOTHING.
  2. Switch consumer commits/acks to after successful persistence.
  3. Introduce an outbox in your write path and a relay to publish it.
  4. Choose and enforce per‑key partitioning; cap concurrency per key.
  5. Ship DLQ with retry/backoff and basic dashboards for backlog + duplicates.