You can subscribe to many different events using our Stripe integration.
We’re going to run through the steps required to create the onPriceCreated event.
Here’s an example Job that uses it. Create Jobs like this while you’re developing so you can test your trigger. See the integration testing guide for more info.
import { Stripe } from "@trigger.dev/stripe";

const stripe = new Stripe({
  id: "stripe",
  apiKey: process.env["STRIPE_API_KEY"]!,
});

client.defineJob({
  id: "stripe-on-price",
  name: "Stripe On Price",
  version: "0.1.0",
  //this is what we're going to add in this guide
  trigger: stripe.onPriceCreated(),
  run: async (payload, io, ctx) => {
    //do stuff with the payload
    await io.logger.info("price created!", { current: payload.currency });
  },
});
Creating Tasks that list/create/delete/update webhooks
We need to create Tasks that will be used to register webhooks. These will be used in the ExternalSource.
These are created just like normal Tasks, which we’ve already covered. In this case we’re creating four tasks:
io.stripe.webhookEndpoints.create
io.stripe.webhookEndpoints.update
io.stripe.webhookEndpoints.delete
io.stripe.webhookEndpoints.list
//...
import { WebhookEndpoints } from "./webhookEndpoints";

export class Stripe implements TriggerIntegration {
  //...

  get webhookEndpoints() {
    return new WebhookEndpoints(this.runTask.bind(this));
  }

  //...
}
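As a rough illustration of what a class like WebhookEndpoints might contain, here is a minimal, self-contained sketch. Only the four task names (create, update, delete, list) come from this guide. The `RunTask` type, the `FakeStripeClient` interface, and the method bodies are simplified stand-ins invented for this example; they are assumptions, not the real @trigger.dev/sdk or Stripe SDK signatures.

```typescript
// Hypothetical, simplified stand-ins. The real SDK types are richer than this.
type WebhookEndpoint = { id: string; url: string; enabled_events: string[] };

interface FakeStripeClient {
  webhookEndpoints: {
    create(params: { url: string; enabled_events: string[] }): Promise<WebhookEndpoint>;
    update(
      id: string,
      params: { url?: string; enabled_events?: string[] }
    ): Promise<WebhookEndpoint>;
    del(id: string): Promise<{ id: string; deleted: boolean }>;
    list(params: { limit?: number }): Promise<{ data: WebhookEndpoint[] }>;
  };
}

// Assumed shape: a task key, a callback that receives the API client, and display options.
type RunTask = <T>(
  key: string,
  callback: (client: FakeStripeClient) => Promise<T>,
  options: { name: string }
) => Promise<T>;

class WebhookEndpoints {
  private runTask: RunTask;

  constructor(runTask: RunTask) {
    this.runTask = runTask;
  }

  // Each method wraps a single API call in a Task, identified by the key.
  create(key: string, params: { url: string; enabled_events: string[] }) {
    return this.runTask(key, (client) => client.webhookEndpoints.create(params), {
      name: "Create Webhook Endpoint",
    });
  }

  update(key: string, params: { id: string; url?: string; enabled_events?: string[] }) {
    const { id, ...rest } = params;
    return this.runTask(key, (client) => client.webhookEndpoints.update(id, rest), {
      name: "Update Webhook Endpoint",
    });
  }

  delete(key: string, params: { id: string }) {
    return this.runTask(key, (client) => client.webhookEndpoints.del(params.id), {
      name: "Delete Webhook Endpoint",
    });
  }

  list(key: string, params: { limit?: number }) {
    return this.runTask(key, (client) => client.webhookEndpoints.list(params), {
      name: "List Webhook Endpoints",
    });
  }
}
```

Passing `runTask` into the constructor keeps the class independent of the integration itself, which is why the getter above binds `this.runTask` before handing it over.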
An event is something that happens in the external system. In this case, it’s a price being created. We use EventSpecifications to define events.
We have many events exported from this file, but here’s the one we care about:
integrations/stripe/events.ts
export const onPriceCreated: EventSpecification<OnPriceEvent> = {
  //this name matches the name of the event in the Stripe API
  name: "price.created",
  //used for display in the Dashboard UI
  title: "On Price Created",
  source: "stripe.com",
  icon: "stripe",
  //examples appear in the Testing UI in the Dashboard. They're optional but they make the DX really nice :)
  examples: [
    {
      id: "recurring",
      name: "Recurring Price",
      icon: "stripe",
      payload: {
        id: "price_1NYV6vI0XSgju2urKsSmI53v",
        object: "price",
        //...
      },
    },
  ],
  //you can just cast the payload to the type you want here
  parsePayload: (payload) => payload as OnPriceEvent,
  //these properties are displayed in the Dashboard UI
  runProperties: (payload) => [{ label: "Price ID", text: payload.id }],
};
We import all the events from the file above, and then we can add a trigger for the event we want.
integrations/stripe/index.ts
//... other imports
//this imports all the events from the file above, as an Object
import * as events from "./events";

export class Stripe implements TriggerIntegration {
  //...

  //the source is used to register webhooks and to process the data when received
  get source() {
    return createWebhookEventSource(this);
  }

  //the actual trigger. This one has some configuration options (the optional params)
  onPriceCreated(params?: { connect?: boolean; filter?: EventFilter }) {
    return createTrigger(this.source, events.onPriceCreated, params ?? { connect: false });
  }
}

//this creates a type that is the union of all the events
type StripeEvents = (typeof events)[keyof typeof events];

//this is the type of the trigger we're creating
type CreateTriggersResult<TEventSpecification extends StripeEvents> = ExternalSourceTrigger<
  TEventSpecification,
  ReturnType<typeof createWebhookEventSource>
>;

function createTrigger<TEventSpecification extends StripeEvents>(
  source: ReturnType<typeof createWebhookEventSource>,
  event: TEventSpecification,
  params: TriggerParams
): CreateTriggersResult<TEventSpecification> {
  return new ExternalSourceTrigger({
    event,
    params,
    source,
    options: {},
  });
}
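The `(typeof events)[keyof typeof events]` pattern used for StripeEvents can be hard to read at first, so here is a small standalone sketch of how it behaves. The `events` object and the `eventName` helper below are hypothetical stand-ins for this example only; the real specifications have more fields.

```typescript
// Hypothetical event specs standing in for the exports of events.ts.
const events = {
  onPriceCreated: { name: "price.created", title: "On Price Created" },
  onPriceUpdated: { name: "price.updated", title: "On Price Updated" },
} as const;

// keyof typeof events is "onPriceCreated" | "onPriceUpdated".
// Indexing the object type with that union yields the union of all value types.
type AnyEvent = (typeof events)[keyof typeof events];

// A generic constrained to the union accepts any spec,
// while the caller keeps the specific type of the one passed in.
function eventName<T extends AnyEvent>(event: T): T["name"] {
  return event.name;
}

// The inferred type here is the literal "price.created", not string.
const createdName = eventName(events.onPriceCreated);
console.log(createdName);
```

This is the same reason createTrigger is generic: the returned trigger keeps the payload type of the specific event that was passed in.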
The ExternalSource is responsible for registering webhooks and processing the data when it’s received.
integrations/stripe/index.ts
//...everything we've already covered

//this defines the shape of the data that Stripe returns when we create/update a webhook
const WebhookDataSchema = z.object({
  id: z.string(),
  object: z.literal("webhook_endpoint"),
  api_version: z.string().nullable(),
  application: z.string().nullable(),
  created: z.number(),
  description: z.string().nullable(),
  enabled_events: z.array(z.string()),
  livemode: z.boolean(),
  metadata: z.record(z.string()),
  status: z.enum(["enabled", "disabled"]),
  url: z.string(),
});

function createWebhookEventSource(
  //the integration is used to register the webhook
  integration: Stripe
  // { connect?: boolean } comes through from the params in the ExternalSourceTrigger we defined above
): ExternalSource<Stripe, { connect?: boolean }, "HTTP", {}> {
  return new ExternalSource("HTTP", {
    //this needs to be unique in Trigger.dev
    //there's only one stripe webhook endpoint (for all events), so we use "stripe.webhook"
    id: "stripe.webhook",
    //this is the schema for the params that come through from the ExternalSourceTrigger
    schema: z.object({ connect: z.boolean().optional() }),
    version: "0.1.0",
    integration,
    //if the key is the same then a webhook will be updated (with any new events added)
    //if the key is different then a new webhook will be created
    //in Stripe's case we can have webhooks with multiple events BUT not shared between connect and non-connect
    key: (params) => `stripe.webhook${params.connect ? ".connect" : ""}`,
    //the webhookHandler is called when the webhook is received, this is the last step we'll cover
    handler: webhookHandler,
    //this function is called when the webhook is registered
    register: async (event, io, ctx) => {
      const { params, source: httpSource, options } = event;

      //httpSource.data is the stored data about the existing webhook that has the same key
      //httpSource.data will be undefined if no webhook has been registered yet with the same key
      const webhookData = WebhookDataSchema.safeParse(httpSource.data);

      //this is the full list of events that we want to register (when we add more than just onPriceCreated)
      const allEvents = Array.from(new Set([...options.event.desired, ...options.event.missing]));
      const registeredOptions = {
        event: allEvents,
      };

      //if there is already an active source (i.e. a webhook has been registered)
      //and httpSource.data was parsed successfully
      if (httpSource.active && webhookData.success) {
        //there are no missing events, so we don't need to update the webhook
        if (options.event.missing.length === 0) return;

        //we want to update the existing webhook with the new events
        //this uses the Task we created above
        const updatedWebhook = await io.integration.webhookEndpoints.update("update-webhook", {
          id: webhookData.data.id,
          url: httpSource.url,
          enabled_events: allEvents as unknown as WebhookEvents[],
        });

        //when registering new events, we need to return the data and the options
        return {
          data: WebhookDataSchema.parse(updatedWebhook),
          options: registeredOptions,
        };
      }

      //if there is no active source, or httpSource.data wasn't parsed successfully,
      //but we might be able to add events to an existing webhook
      const listResponse = await io.integration.webhookEndpoints.list("list-webhooks", {
        limit: 100,
      });

      //if one of these webhooks has the URL we want, we can update it
      const existingWebhook = listResponse.data.find((w) => w.url === httpSource.url);
      if (existingWebhook) {
        //add all the events to the webhook
        const updatedWebhook = await io.integration.webhookEndpoints.update(
          "update-found-webhook",
          {
            id: existingWebhook.id,
            url: httpSource.url,
            enabled_events: allEvents as unknown as WebhookEvents[],
            disabled: false,
          }
        );

        //return the data and the registered options
        return {
          data: WebhookDataSchema.parse(updatedWebhook),
          options: registeredOptions,
        };
      }

      //there are no matching webhooks, so we need to create a new one
      const webhook = await io.integration.webhookEndpoints.create("create-webhook", {
        url: httpSource.url,
        enabled_events: allEvents as unknown as WebhookEvents[],
        connect: params.connect,
      });

      //when creating a new webhook, we need to also return the secret that Stripe sends us
      //the secret is used to validate the webhook payloads we receive
      return {
        data: WebhookDataSchema.parse(webhook),
        secret: webhook.secret,
        options: registeredOptions,
      };
    },
  });
}
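The register function above makes a three-way decision: do nothing, update an existing webhook, or create a new one. The sketch below restates that decision as a pure function so the branches are easier to follow and test. The `RegisterState` and `RegisterAction` types and the function names are hypothetical, introduced only for this illustration; the event merging and key logic mirror the code above.

```typescript
// Hypothetical types for illustration; they are not part of the Trigger.dev SDK.
type RegisterAction =
  | { type: "noop" }
  | { type: "update"; webhookId: string; events: string[] }
  | { type: "create"; events: string[] };

interface RegisterState {
  active: boolean; // is there already an active source for this key?
  storedWebhookId?: string; // id from the stored webhook data, if it parsed
  desired: string[]; // every event we want registered
  missing: string[]; // events that are not registered yet
  url: string; // the URL that should receive the webhooks
  existing: { id: string; url: string }[]; // webhooks already present in the account
}

// Same de-duplication as in register: a Set removes repeated event names.
function mergeEvents(desired: string[], missing: string[]): string[] {
  return Array.from(new Set([...desired, ...missing]));
}

// Same key logic as above: connect and non-connect webhooks are kept separate.
function webhookKey(params: { connect?: boolean }): string {
  return `stripe.webhook${params.connect ? ".connect" : ""}`;
}

function planRegistration(state: RegisterState): RegisterAction {
  const events = mergeEvents(state.desired, state.missing);

  // An active source with valid stored data: update only if something is missing.
  if (state.active && state.storedWebhookId !== undefined) {
    if (state.missing.length === 0) return { type: "noop" };
    return { type: "update", webhookId: state.storedWebhookId, events };
  }

  // Otherwise, reuse a webhook that already points at our URL.
  const found = state.existing.find((w) => w.url === state.url);
  if (found) return { type: "update", webhookId: found.id, events };

  // Nothing to reuse, so a new webhook is needed.
  return { type: "create", events };
}
```

Separating the decision from the API calls is only a design choice for this sketch; in the real register function the Tasks are called inline.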
When a webhook is received, we need to validate the signature, parse the payload, and return the events. Each event will turn into a payload that can trigger a Job run.
integrations/stripe/index.ts
//...everything we've already covered

//this function is called when a webhook is received
async function webhookHandler(event: HandlerEvent<"HTTP">, logger: Logger) {
  logger.debug("[@trigger.dev/stripe] Handling webhook payload");

  //The rawEvent is a Request, from this you can get the body, headers, etc.
  //The source has info on the ExternalSource that received the webhook
  const { rawEvent: request, source } = event;

  //no body means no events
  if (!request.body) {
    logger.debug("[@trigger.dev/stripe] No body found");
    return { events: [] };
  }

  //in Stripe's case, we need to get the text from the request, and we'll use their SDK to get an event
  const rawBody = await request.text();

  //it's important to verify webhook payloads because anyone can send data to a URL
  const signature = request.headers.get("stripe-signature");

  if (signature) {
    //The Stripe SDK is used to validate the signature
    const stripeClient = new StripeClient("", { apiVersion: "2022-11-15" });

    try {
      //this will throw an error if the signature is invalid
      const event = stripeClient.webhooks.constructEvent(rawBody, signature, source.secret);

      return {
        //more than one event can be returned, but for most APIs it will just be one
        events: [
          {
            id: event.id,
            payload: event.data.object,
            source: "stripe.com",
            //this name should match the EventSpecification name
            name: event.type,
            timestamp: new Date(event.created * 1000),
            //the context can be any format, it will be passed to the Job run's context.source
            context: {
              apiVersion: event.api_version,
              livemode: event.livemode,
              request: event.request,
              previousAttributes: event.data.previous_attributes,
            },
          },
        ],
        //you can also return a response, which will be sent back to the webhook sender
        //but this is optional, we return a 200 which is normally the requirement
        //response: {
        //  status: 200,
        //  body: "ok",
        //  headers: {
        //    "Content-Type": "text/plain",
        //  },
        //},
        //metadata can be any format, is stored and can be used in the next webhookHandler call
        // metadata: {
        //   requestId: event.request,
        // }
      };
    } catch (error) {
      if (error instanceof Error) {
        logger.error("[@trigger.dev/stripe] Error while validating webhook signature", {
          error: { name: error.name, message: error.message },
        });
      } else {
        logger.error("[@trigger.dev/stripe] Unknown Error while validating webhook signature");
      }

      return { events: [] };
    }
  }

  //if there's no signature, we can't validate the payload
  return {
    events: [],
  };
}
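The handler above delegates signature validation to the Stripe SDK. For an API without an SDK helper you would need to do this check yourself, so here is a minimal sketch of the same idea using Node's crypto module. It assumes Stripe's documented scheme: a `t=<timestamp>,v1=<signature>` header, where the signature is an HMAC-SHA256 of `<timestamp>.<raw body>` keyed with the endpoint secret. The function names and the 300 second tolerance default are choices made for this example, and in the Stripe integration itself constructEvent remains the thing to use.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Splits a header like "t=1700000000,v1=abc,v1=def" into its parts.
function parseSignatureHeader(header: string): { timestamp: string | null; signatures: string[] } {
  let timestamp: string | null = null;
  const signatures: string[] = [];
  for (const part of header.split(",")) {
    const index = part.indexOf("=");
    if (index === -1) continue;
    const key = part.slice(0, index).trim();
    const value = part.slice(index + 1).trim();
    if (key === "t") timestamp = value;
    if (key === "v1") signatures.push(value);
  }
  return { timestamp, signatures };
}

// HMAC-SHA256 over "<timestamp>.<rawBody>", hex encoded.
function computeSignature(secret: string, timestamp: string, rawBody: string): string {
  return createHmac("sha256", secret).update(`${timestamp}.${rawBody}`, "utf8").digest("hex");
}

// Returns true only if a v1 signature matches and the timestamp is recent enough.
function verifySignature(
  rawBody: string,
  header: string,
  secret: string,
  toleranceSeconds = 300,
  nowSeconds = Math.floor(Date.now() / 1000)
): boolean {
  const { timestamp, signatures } = parseSignatureHeader(header);
  if (timestamp === null || signatures.length === 0) return false;

  // Reject old or future-dated payloads to limit replay attacks.
  const sentAt = Number(timestamp);
  if (!Number.isFinite(sentAt) || Math.abs(nowSeconds - sentAt) > toleranceSeconds) return false;

  const expected = Buffer.from(computeSignature(secret, timestamp, rawBody), "utf8");
  return signatures.some((signature) => {
    const candidate = Buffer.from(signature, "utf8");
    // timingSafeEqual throws on length mismatch, so compare lengths first.
    return candidate.length === expected.length && timingSafeEqual(candidate, expected);
  });
}
```

Note that the check runs on the raw request text, which is why the handler reads `request.text()` rather than parsing JSON first: re-serialising the body could change the bytes and invalidate the signature.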