The Workflow class supports two optional lifecycle hooks. Exactly one of them is called, once, when the entire run reaches a terminal state: onWorkflowComplete on success, onWorkflowError on failure. They are useful for sending a completion notification, recording a summary, or alerting on failure.

export class Workflow {
  async start(data, headers, api) { ... }

  async onWorkflowComplete(api) {
    // Called once when all steps (including all fan-out branches) complete successfully
  }

  async onWorkflowError(err, api) {
    // Called once when a step throws an uncaught error that terminates the run
  }
}

Both hooks are optional. Omit either one if you do not need it.

Hooks receive a restricted api object. Only the following are available:

MethodAvailable
api.getOAuthToken()
api.google.*
console.log()
api.dedupe()
api.runStore.*
api.sendEmail()
api.getAttachment() / api.deleteAttachment()
api.scheduleNextStep()
api.sendToFlow()
api.waitForEvent()
api.csv.*

The restriction above applies to the api object only. Global fetch() is still available inside hooks, so you can call external APIs directly after obtaining any required token or secret.

onWorkflowComplete fires only when all branches finish

For fan-out workflows, the platform tracks a branch counter. onWorkflowComplete is called only when the counter reaches zero, meaning every parallel branch has finished. It does not fire per-branch.
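The counter behavior can be modeled in a few lines. This is a hypothetical sketch for intuition only, not the platform's implementation; `BranchCounter`, `open`, and `close` are invented names.

```javascript
// Hypothetical model of a fan-out branch counter (not the platform's real code).
class BranchCounter {
  constructor(onComplete) {
    this.pending = 0;
    this.onComplete = onComplete;
  }

  // A branch was scheduled.
  open() {
    this.pending += 1;
  }

  // A branch finished; fire the hook only when nothing is left.
  close() {
    this.pending -= 1;
    if (this.pending === 0) this.onComplete();
  }
}

let completions = 0;
const counter = new BranchCounter(() => { completions += 1; });

// Fan out three branches, then finish them one at a time.
counter.open();
counter.open();
counter.open();
counter.close(); // two branches still pending, hook not called
counter.close(); // one branch still pending, hook not called
counter.close(); // counter reaches zero, hook called once
```

The point of the model is that the hook is tied to the counter reaching zero, not to any individual branch returning.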

onWorkflowError is skipped if the failing step had already scheduled a next step

If a step calls api.scheduleNextStep() and then throws an uncaught error, onWorkflowError is not called. The scheduled step was already armed, so the run is considered still in progress, not failed.
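One practical consequence is that the order of operations inside a step matters. The sketch below assumes you want a failure to reach onWorkflowError, so it does the work that can throw first and schedules the next step last. `validateOrder` and the `'shipOrder'` action are hypothetical names, and the stub `api` exists only so the snippet runs outside the platform (which is also why the class is not exported here).

```javascript
// Hypothetical validation helper; throws on bad input.
function validateOrder(order) {
  if (!order || !order.id) throw new Error('order is missing an id');
}

class Workflow {
  async start(data, headers, api) {
    // Do the work that can throw BEFORE scheduling anything.
    validateOrder(data.order);

    // Only reached when validation passed, so an error above leaves
    // no scheduled step behind.
    await api.scheduleNextStep({
      delay: 10,
      action: 'shipOrder', // hypothetical step name
      payload: { orderId: data.order.id },
    });
  }
}

// Stub api that records scheduled steps, for local illustration only.
const scheduled = [];
const stubApi = {
  scheduleNextStep: async (step) => { scheduled.push(step); },
};
```

If the two statements in `start` were swapped, a validation error would be thrown after a step had already been scheduled, which is the case where onWorkflowError is skipped.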

onWorkflowError fires only for uncaught step failures

If a step catches an error internally and returns normally, the run does not fail and onWorkflowError is not called. This includes retry logic that catches an error, schedules a retry, and returns.

If a second fan-out branch fails after the run has already been marked failed, the hook is not called again.
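A step can use this distinction deliberately: absorb the errors it knows how to handle and rethrow the rest. The following is a minimal sketch under stated assumptions. `TransientError`, `doWork`, the `attempt` payload field, and the limit of three attempts are all hypothetical, and the stub `api` is only there to make the snippet runnable locally.

```javascript
// Hypothetical error type used to mark failures worth retrying.
class TransientError extends Error {}

// Hypothetical unit of work; replace with the real processing logic.
async function doWork(itemId) {
  if (itemId === 'flaky') throw new TransientError('temporary outage');
  if (itemId === 'bad') throw new Error('invalid item');
}

class Workflow {
  async processItem({ itemId, attempt = 1 }, headers, api) {
    try {
      await doWork(itemId);
    } catch (err) {
      if (err instanceof TransientError && attempt < 3) {
        // Caught and handled: the step returns normally, so the run
        // keeps going and onWorkflowError is not called.
        await api.scheduleNextStep({
          delay: 10,
          action: 'processItem',
          payload: { itemId, attempt: attempt + 1 },
        });
        return;
      }
      // Rethrown: the error leaves the step uncaught.
      throw err;
    }
  }
}

// Stub api that records scheduled retries, for local illustration only.
const retries = [];
const stubApi = {
  scheduleNextStep: async (step) => { retries.push(step); },
};
```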

A hook that throws does not affect the run status

If your hook implementation throws, the error is caught and logged. The run’s final status (completed or failed) is unaffected.
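Because a throwing hook only produces a log entry, it can help to catch the failure inside the hook and log a message that is easy to search for. A minimal sketch, assuming a hypothetical `notify` helper that stands in for whatever Slack or email call the hook makes:

```javascript
// Hypothetical notifier; here it always fails to show the fallback path.
async function notify(message) {
  throw new Error('notification service unreachable');
}

class Workflow {
  async onWorkflowError(err, api) {
    try {
      await notify(`Workflow failed: ${err.message}`);
    } catch (hookErr) {
      // The run status is unaffected either way; this only makes the
      // hook's own failure easier to find in the logs.
      console.log(`onWorkflowError could not notify: ${hookErr.message}`);
    }
  }
}
```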

Example: Slack notification on completion or failure

This example uses fetch() plus api.getOAuthToken() because the pattern is explicit and works across any OAuth-connected service:

export class Workflow {
  async start(data, headers, api) {
    // Fan out: schedule one processItem step per line item
    for (const item of data.line_items) {
      await api.scheduleNextStep({
        delay: 10,
        action: 'processItem',
        payload: { itemId: item.id },
      });
    }
  }

  async processItem({ itemId }, headers, api) {
    // ... do work
    // Count the item only after the work has succeeded
    await api.runStore.increment('processed');
  }

  async onWorkflowComplete(api) {
    const processed = (await api.runStore.get('processed')) ?? 0;
    const { token } = await api.getOAuthToken('my-slack');
    await fetch('https://slack.com/api/chat.postMessage', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${token}` },
      body: JSON.stringify({
        channel: '#ops',
        text: `Run complete: ${processed} items processed.`,
      }),
    });
  }

  async onWorkflowError(err, api) {
    const { token } = await api.getOAuthToken('my-slack');
    await fetch('https://slack.com/api/chat.postMessage', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${token}` },
      body: JSON.stringify({
        channel: '#ops',
        text: `Workflow failed: ${err.message}`,
      }),
    });
  }
}
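Both hooks above repeat the same request, and neither inspects the response. Slack's Web API reports most errors as `ok: false` in the JSON body rather than through the HTTP status, so a failed post would otherwise go unnoticed. One way to handle both points is a small helper; `postToSlack` and its `fetchImpl` parameter are hypothetical names introduced here, not part of the platform.

```javascript
// Posts a message to Slack and throws if Slack reports a failure.
// fetchImpl is injectable so the helper can be exercised without network access.
async function postToSlack(token, channel, text, fetchImpl = fetch) {
  const res = await fetchImpl('https://slack.com/api/chat.postMessage', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json; charset=utf-8',
      Authorization: `Bearer ${token}`,
    },
    body: JSON.stringify({ channel, text }),
  });
  const body = await res.json();
  // Slack signals most API errors in the body, not the HTTP status.
  if (!body.ok) throw new Error(`Slack error: ${body.error}`);
  return body;
}
```

Inside a hook this would be called as `await postToSlack(token, '#ops', 'Run complete')` after obtaining `token` from `api.getOAuthToken('my-slack')`. If the helper throws, the hook throws, which is logged without changing the run status.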

Using api.runStore to pass data into hooks

api.runStore persists for the lifetime of a run and is accessible inside both hooks. Use it to collect per-step data (counters, error lists) and compile a final report in the hook.

async processItem({ itemId }, headers, api) {
  try {
    // ... process
    await api.runStore.increment('succeeded');
  } catch (err) {
    // Record the failure instead of rethrowing, so the run continues
    await api.runStore.push('errors', { itemId, reason: err.message });
    await api.runStore.increment('failed');
  }
}

async onWorkflowComplete(api) {
  const succeeded = (await api.runStore.get('succeeded')) ?? 0;
  const failed = (await api.runStore.get('failed')) ?? 0;
  const errors = (await api.runStore.get('errors')) ?? [];
  console.log(`Done. ${succeeded} succeeded, ${failed} failed.`);
  // send report, write to sheet, etc.
}
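The collected values can then be turned into a report body. The following is a sketch only: `buildReport` is a hypothetical helper, and it assumes each entry in `errors` has the `{ itemId, reason }` shape pushed by processItem above.

```javascript
// Builds a plain-text report from values collected in api.runStore.
function buildReport(succeeded, failed, errors) {
  const lines = [`Done. ${succeeded} succeeded, ${failed} failed.`];
  for (const { itemId, reason } of errors) {
    lines.push(`- item ${itemId}: ${reason}`);
  }
  return lines.join('\n');
}

// Example with made-up values in place of the runStore reads.
const report = buildReport(2, 1, [{ itemId: 'a3', reason: 'timeout' }]);
console.log(report);
```

Keeping the formatting in a plain function means it can be checked outside the platform, with the hook reduced to reading the store and sending the result.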