> ## Documentation Index
> Fetch the complete documentation index at: https://docs.photon.codes/docs/llms.txt
> Use this file to discover all available pages before exploring further.

# Events

> Recover missed events after disconnects and consume SDK streams

The server keeps a durable event log for message, chat, group, and poll changes. Every event has an increasing `sequence`.

Location updates are not in this log. They are delivered only through [`im.locations.watch(...)`](/advanced-kits/imessage/locations). Updates missed while disconnected cannot be recovered with `catchUp(...)`.

## What You Can Do

| Need                  | Use this when                                                                           |
| --------------------- | --------------------------------------------------------------------------------------- |
| Recover missed events | Your process starts or reconnects and needs missed message, chat, group, or poll events |
| Store a checkpoint    | You need to resume from the last successfully processed event                           |
| Consume live streams  | You want to read SDK streams with `for await` or `.on(...)`                             |
| Derive streams        | You want to split streams with `.filter(...)`, `.map(...)`, or `.take(...)`             |

## Recovery Flow

In production, use this order:

1. Read `lastHandledSequence` from your database.
2. Immediately open the live `subscribeEvents(...)` streams you need.
3. At the same time, call `im.events.catchUp(lastHandledSequence)` to replay missed events.
4. Feed live events and catch-up events into the same bounded-concurrency queue.
5. Deduplicate by `sequence`, and advance the saved checkpoint only after all earlier sequences have completed successfully.

Do not wait for `catchUp(...)` to finish before opening live streams. Historical recovery and live consumption should run together so new events do not fall into a gap while you are catching up.

## Concurrent Recovery

During recovery, put catch-up events and live events into one processing queue. The queue may process events concurrently, but checkpoint storage must advance in contiguous sequence order.

This example opens message, chat, group, and poll live streams while catch-up is running:

```ts theme={null}
const maxConcurrency = 8;
const catchup = im.events.catchUp(lastHandledSequence);
const realtimeStreams = [
  im.messages.subscribeEvents(),
  im.chats.subscribeEvents(),
  im.groups.subscribeEvents(),
  im.polls.subscribeEvents(),
];

let running = 0;
let savedSequence = lastHandledSequence ?? 0;
let checkpoint = Promise.resolve();
let processingError: unknown;
const queue: Array<() => Promise<void>> = [];
const seenSequences = new Set<number>();
const completedSequences = new Set<number>();

function scheduleEvent(event: { sequence: number }) {
  if (processingError) return;
  if (event.sequence <= savedSequence || seenSequences.has(event.sequence)) return;

  seenSequences.add(event.sequence);
  queue.push(async () => {
    await handleEvent(event);
    await markComplete(event.sequence);
  });

  void drainQueue();
}

async function markComplete(sequence: number) {
  completedSequences.add(sequence);

  checkpoint = checkpoint.then(async () => {
    while (completedSequences.has(savedSequence + 1)) {
      completedSequences.delete(savedSequence + 1);
      savedSequence += 1;
      await saveSequence(savedSequence);
    }
  });

  await checkpoint;
}

async function drainQueue() {
  while (running < maxConcurrency && queue.length > 0) {
    const task = queue.shift()!;
    running += 1;

    void task()
      .catch((error) => {
        processingError = error;
      })
      .finally(() => {
        running -= 1;
        void drainQueue();
      });
  }
}

for (const stream of realtimeStreams) {
  void (async () => {
    for await (const event of stream) {
      scheduleEvent(event);
    }
  })();
}

for await (const event of catchup) {
  if (processingError) throw processingError;
  if (event.type === "catchup.complete") break;
  scheduleEvent(event);
}
```

`catchUp(...)` returns `TypedEventStream<CatchUpEvent>`. If you omit `lastHandledSequence`, replay starts from the beginning of the log. `lastHandledSequence` must be a non-negative safe integer; invalid cursors throw `ValidationError` before the network call.

The catch-up stream ends with `catchup.complete`:

```jsonc theme={null}
{
  "type": "catchup.complete",  // Historical replay is complete
  "headSequence": 123          // Log head sequence when catch-up finished
}
```

Do not use `catchup.complete` to overwrite your checkpoint. It only means the historical replay is done. Your live streams are already open and continue receiving new events.

`seenSequences` prevents processing the same event twice when it appears in both catch-up and live streams. `completedSequences` ensures the checkpoint only stores the largest `sequence` for which every earlier event has also completed.

<Warning>
  The example uses `maxConcurrency = 8`. Your event handler must be safe to run concurrently. The checkpoint still advances only in contiguous sequence order. Do not save `event.sequence` before `handleEvent(...)` succeeds, and do not save a larger sequence just because it finished first. If processing fails, stop advancing the checkpoint so the process can recover from the original `lastHandledSequence`.
</Warning>

Events have the same shape in catch-up and live streams. Write method return values are still the authoritative result for the write your code just performed; event streams are for observing other changes and asynchronous state.

## Stream Consumption

Every server-streaming SDK method returns `TypedEventStream<T>`. This includes `subscribeEvents(...)`, `catchUp(...)`, `watch(...)`, and `downloadStream(...)`.

One stream instance has one consumer. Do not consume the same `stream` with both `for await` and `.on(...)`. If you need multiple branches, derive them first with `.filter(...)`, `.map(...)`, or `.take(...)`.

### `for await`

```ts theme={null}
try {
  for await (const event of im.messages.subscribeEvents({ chat: chat.guid })) {
    await handleEvent(event);
  }
} catch (error) {
  console.error("message stream failed:", error);
}
```

Breaking out of the `for await` loop closes the underlying stream.

### `await using`

```ts theme={null}
async function consume() {
  await using stream = im.messages.subscribeEvents({ chat: chat.guid });

  for await (const event of stream) {
    if (event.type === "message.received" && event.message.content.text === "stop") {
      return;
    }

    await handleEvent(event);
  }
}
```

`await using` closes the stream when the scope exits. Node.js 18.17 does not support `await using` natively; on the minimum supported runtime, use `try` / `finally` or break out of `for await`.

### `.on(...)`

```ts theme={null}
const stream = im.messages.subscribeEvents({ chat: chat.guid });

const stop = stream.on(
  async (event) => {
    await handleEvent(event);
  },
  (error) => {
    console.error(error);
  },
);

stop();
```

`stop()` cancels the loop and closes the stream. The callback may be async; the SDK waits for the current callback to finish before delivering the next event. If you omit `onError`, stream errors are thrown asynchronously and cannot be caught by an outer `try` / `catch`.

### Derived Streams

`.filter(...)` keeps matching events:

```ts theme={null}
const received = im.messages.subscribeEvents().filter((event) => event.type === "message.received");
```

`.map(...)` transforms events:

```ts theme={null}
const messageGuids = im.messages
  .subscribeEvents()
  .filter((event) => event.type === "message.received")
  .map((event) => event.message.guid);
```

`.take(n)` emits the first `n` events, then closes the parent stream:

```ts theme={null}
const firstReceived = im.messages
  .subscribeEvents({ chat: chat.guid })
  .filter((event) => event.type === "message.received")
  .take(1);
```

## TypedEventStream

| Member                              | Purpose                                                            |
| ----------------------------------- | ------------------------------------------------------------------ |
| `for await (const event of stream)` | Default consumer; claims exclusive consumption                     |
| `.on(callback, onError?)`           | Callback consumer; returns `stop()`                                |
| `.filter(predicate)`                | Derives a child stream with matching events                        |
| `.map(transform)`                   | Derives a transformed child stream                                 |
| `.take(count)`                      | Derives the first `count` events and closes the parent when done   |
| `.close()`                          | Closes the underlying network request; safe to call more than once |
| `await using stream = ...`          | Closes the stream when the scope exits                             |

## Next Steps

1. [Messages](/advanced-kits/imessage/messages) — subscribe to message events
2. [Chats](/advanced-kits/imessage/chats) — subscribe to chat events
3. [Groups](/advanced-kits/imessage/groups) — subscribe to group events
4. [Polls](/advanced-kits/imessage/polls) — subscribe to poll events
5. [Error Handling](/advanced-kits/imessage/error-handling) — handle stream disconnects, retries, and idempotent writes
