Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.photon.codes/docs/llms.txt

Use this file to discover all available pages before exploring further.

The server keeps a durable event log for message, chat, group, and poll changes. Every event has an increasing sequence. Location updates are not in this log. They are delivered only through im.locations.watch(...). Updates missed while disconnected cannot be recovered with catchUp(...).

What You Can Do

NeedUse this when
Recover missed eventsYour process starts or reconnects and needs missed message, chat, group, or poll events
Store a checkpointYou need to resume from the last successfully processed event
Consume live streamsYou want to read SDK streams with for await or .on(...)
Derive streamsYou want to split streams with .filter(...), .map(...), or .take(...)

Recovery Flow

In production, use this order:
  1. Read lastHandledSequence from your database.
  2. Immediately open the live subscribeEvents(...) streams you need.
  3. At the same time, call im.events.catchUp(lastHandledSequence) to replay missed events.
  4. Feed live events and catch-up events into the same bounded-concurrency queue.
  5. Deduplicate by sequence, and advance the saved checkpoint only after all earlier sequences have completed successfully.
Do not wait for catchUp(...) to finish before opening live streams. Historical recovery and live consumption should run together so new events do not fall into a gap while you are catching up.

Concurrent Recovery

During recovery, put catch-up events and live events into one processing queue. The queue may process events concurrently, but checkpoint storage must advance in contiguous sequence order. This example opens message, chat, group, and poll live streams while catch-up is running:
const maxConcurrency = 8;
const catchup = im.events.catchUp(lastHandledSequence);
const realtimeStreams = [
  im.messages.subscribeEvents(),
  im.chats.subscribeEvents(),
  im.groups.subscribeEvents(),
  im.polls.subscribeEvents(),
];

let running = 0;
let savedSequence = lastHandledSequence ?? 0;
let checkpoint = Promise.resolve();
let processingError: unknown;
const queue: Array<() => Promise<void>> = [];
const seenSequences = new Set<number>();
const completedSequences = new Set<number>();

function scheduleEvent(event: { sequence: number }) {
  if (processingError) return;
  if (event.sequence <= savedSequence || seenSequences.has(event.sequence)) return;

  seenSequences.add(event.sequence);
  queue.push(async () => {
    await handleEvent(event);
    await markComplete(event.sequence);
  });

  void drainQueue();
}

async function markComplete(sequence: number) {
  completedSequences.add(sequence);

  checkpoint = checkpoint.then(async () => {
    while (completedSequences.has(savedSequence + 1)) {
      completedSequences.delete(savedSequence + 1);
      savedSequence += 1;
      await saveSequence(savedSequence);
    }
  });

  await checkpoint;
}

async function drainQueue() {
  while (running < maxConcurrency && queue.length > 0) {
    const task = queue.shift()!;
    running += 1;

    void task()
      .catch((error) => {
        processingError = error;
      })
      .finally(() => {
        running -= 1;
        void drainQueue();
      });
  }
}

for (const stream of realtimeStreams) {
  void (async () => {
    for await (const event of stream) {
      scheduleEvent(event);
    }
  })();
}

for await (const event of catchup) {
  if (processingError) throw processingError;
  if (event.type === "catchup.complete") break;
  scheduleEvent(event);
}
catchUp(...) returns TypedEventStream<CatchUpEvent>. If you omit lastHandledSequence, replay starts from the beginning of the log. lastHandledSequence must be a non-negative safe integer; invalid cursors throw ValidationError before the network call. The catch-up stream ends with catchup.complete:
{
  "type": "catchup.complete",  // Historical replay is complete
  "headSequence": 123          // Log head sequence when catch-up finished
}
Do not use catchup.complete to overwrite your checkpoint. It only means the historical replay is done. Your live streams are already open and continue receiving new events. seenSequences prevents processing the same event twice when it appears in both catch-up and live streams. completedSequences ensures the checkpoint only stores the largest sequence for which every earlier event has also completed.
The example uses maxConcurrency = 8. Your event handler must be safe to run concurrently. The checkpoint still advances only in contiguous sequence order. Do not save event.sequence before handleEvent(...) succeeds, and do not save a larger sequence just because it finished first. If processing fails, stop advancing the checkpoint so the process can recover from the original lastHandledSequence.
Events have the same shape in catch-up and live streams. Write method return values are still the authoritative result for the write your code just performed; event streams are for observing other changes and asynchronous state.

Stream Consumption

Every server-streaming SDK method returns TypedEventStream<T>. This includes subscribeEvents(...), catchUp(...), watch(...), and downloadStream(...). One stream instance has one consumer. Do not consume the same stream with both for await and .on(...). If you need multiple branches, derive them first with .filter(...), .map(...), or .take(...).

for await

try {
  for await (const event of im.messages.subscribeEvents({ chat: chat.guid })) {
    await handleEvent(event);
  }
} catch (error) {
  console.error("message stream failed:", error);
}
Breaking out of the for await loop closes the underlying stream.

await using

async function consume() {
  await using stream = im.messages.subscribeEvents({ chat: chat.guid });

  for await (const event of stream) {
    if (event.type === "message.received" && event.message.content.text === "stop") {
      return;
    }

    await handleEvent(event);
  }
}
await using closes the stream when the scope exits. Node.js 18.17 does not support await using natively; on the minimum supported runtime, use try / finally or break out of for await.

.on(...)

const stream = im.messages.subscribeEvents({ chat: chat.guid });

const stop = stream.on(
  async (event) => {
    await handleEvent(event);
  },
  (error) => {
    console.error(error);
  },
);

stop();
stop() cancels the loop and closes the stream. The callback may be async; the SDK waits for the current callback to finish before delivering the next event. If you omit onError, stream errors are thrown asynchronously and cannot be caught by an outer try / catch.

Derived Streams

.filter(...) keeps matching events:
const received = im.messages.subscribeEvents().filter((event) => event.type === "message.received");
.map(...) transforms events:
const messageGuids = im.messages
  .subscribeEvents()
  .filter((event) => event.type === "message.received")
  .map((event) => event.message.guid);
.take(n) emits the first n events, then closes the parent stream:
const firstReceived = im.messages
  .subscribeEvents({ chat: chat.guid })
  .filter((event) => event.type === "message.received")
  .take(1);

TypedEventStream

MemberPurpose
for await (const event of stream)Default consumer; claims exclusive consumption
.on(callback, onError?)Callback consumer; returns stop()
.filter(predicate)Derives a child stream with matching events
.map(transform)Derives a transformed child stream
.take(count)Derives the first count events and closes the parent when done
.close()Closes the underlying network request; safe to call more than once
await using stream = ...Closes the stream when the scope exits

Next Steps

  1. Messages — subscribe to message events
  2. Chats — subscribe to chat events
  3. Groups — subscribe to group events
  4. Polls — subscribe to poll events
  5. Error Handling — handle stream disconnects, retries, and idempotent writes