Key Features
This example demonstrates how to implement custom retry logic when using third-party services in your Upstash Workflow.
We’ll use OpenAI as an example of such a third-party service. Our retry logic uses response status codes and headers to decide when to retry, when to sleep, and when to store the third-party API response.
Code Example
The following code:
- Attempts to make an API call up to 10 times.
- Dynamically adjusts request delays based on response headers or status.
- Stores successful responses asynchronously.
import { serve } from "@upstash/workflow/nextjs"
import { convertToSeconds, storeResponse } from "@/lib/utils"
const OPENAI_CONFIG = {
  url: "https://api.openai.com/v1/chat/completions",
  model: "gpt-4",
  maxTokens: 150,
  maxRetries: 10,
  baseDelay: 10,
}

const createSystemMessage = () => ({
  role: "system",
  content: "You are an AI assistant providing a brief summary and key insights for any given data.",
})

const createUserMessage = (data: string) => ({
  role: "user",
  content: `Analyze this data chunk: ${data}`,
})
export const { POST } = serve<{ userData: string }>(async (workflow) => {
  const { userData } = workflow.requestPayload

  for (let attempt = 0; attempt < OPENAI_CONFIG.maxRetries; attempt++) {
    const response = await workflow.call("call-openai", {
      url: OPENAI_CONFIG.url,
      method: "POST",
      headers: {
        authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
      },
      body: {
        model: OPENAI_CONFIG.model,
        messages: [createSystemMessage(), createUserMessage(userData)],
        max_tokens: OPENAI_CONFIG.maxTokens,
      },
    })

    // Success: persist the response and stop retrying
    if (response.status < 300) {
      await workflow.run("store-response-in-db", () => storeResponse(response.body))
      return
    }

    // Rate limited: wait until the limit resets, then try again
    if (response.status === 429) {
      const resetTime =
        response.header["x-ratelimit-reset-tokens"]?.[0] ||
        response.header["x-ratelimit-reset-requests"]?.[0]
      // OpenAI reports reset times as duration strings (e.g. "6m0s"),
      // so convert them to seconds; fall back to the base delay if absent
      await workflow.sleep(
        "sleep-until-retry",
        resetTime ? convertToSeconds(resetTime) : OPENAI_CONFIG.baseDelay
      )
      continue
    }

    // Any other error: pause briefly before the next attempt
    await workflow.sleep("pause-to-avoid-spam", 5)
  }
})
Code Breakdown
1. Setting up our Workflow
This POST endpoint serves our workflow. We create a loop that attempts the API call (which we'll write next) up to 10 times (OPENAI_CONFIG.maxRetries).
export const { POST } = serve<{ userData: string }>(async (workflow) => {
  for (let attempt = 0; attempt < OPENAI_CONFIG.maxRetries; attempt++) {
    // ... attempt the call (below)
  }
})
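A workflow run starts whenever this endpoint receives a plain POST request with the payload. As a quick sketch (the URL and route path below are placeholders for wherever you deploy and serve this handler), triggering it could look like this:

// Kick off a workflow run by POSTing the payload to the route serving it.
// The URL below is a placeholder for your own deployment.
await fetch("https://your-app.example.com/api/analyze-data", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ userData: "Q3 sales: 1,203 units, up 12% from Q2..." }),
})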
2. Making a Third-Party API Call
We use workflow.call to send a request to any third-party API, for example OpenAI.

Using workflow.call to request data from an API is one of the most powerful Upstash Workflow features. Your request can take much longer than any function timeout would normally allow, completely bypassing any platform-specific timeout limits.

Our request to OpenAI includes an auth header, model parameters, and the data to be processed by the AI. The response from this function call (response) is used to determine our retry logic based on its status code and headers.
const response = await workflow.call("call-openai", {
  url: OPENAI_CONFIG.url,
  method: "POST",
  headers: {
    authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
  },
  body: {
    model: OPENAI_CONFIG.model,
    messages: [createSystemMessage(), createUserMessage(userData)],
    max_tokens: OPENAI_CONFIG.maxTokens,
  },
})
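By default, response.body is untyped. If your SDK version supports a type parameter on workflow.call, you can describe just the fields you read and get typed access to the completion. A sketch; the OpenAIChatResponse interface below is our own minimal assumption about the response shape, not an official type:

// Our own minimal description of the OpenAI fields we read (not an official type)
interface OpenAIChatResponse {
  choices: { message: { role: string; content: string } }[]
}

const response = await workflow.call<OpenAIChatResponse>("call-openai", {
  url: OPENAI_CONFIG.url,
  method: "POST",
  headers: { authorization: `Bearer ${process.env.OPENAI_API_KEY}` },
  body: {
    model: OPENAI_CONFIG.model,
    messages: [createSystemMessage(), createUserMessage(userData)],
    max_tokens: OPENAI_CONFIG.maxTokens,
  },
})

// response.body.choices[0]?.message.content is now typed as a string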
3. Processing a Successful Response (Status Code < 300)
If the OpenAI response is successful (status code under 300), we store the response in our database. We create a new workflow task (workflow.run) to do this for maximum reliability.
if (response.status < 300) {
  await workflow.run("store-response-in-db", () => storeResponse(response.body))
  return
}
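storeResponse is imported from @/lib/utils and isn't shown in this example; any persistence call works here. Purely as an illustration (using Upstash Redis is our own choice, not part of the example), it might look like:

// lib/utils.ts (illustrative sketch)
import { Redis } from "@upstash/redis"

const redis = Redis.fromEnv()

// Persist the raw OpenAI response so it can be inspected or reused later
export const storeResponse = async (body: unknown) => {
  await redis.lpush("openai:responses", JSON.stringify(body))
}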
4. Handling Rate Limits (Status Code 429)
If the API response indicates a rate limit error (status code 429), we read the rate limit reset values from the response headers. OpenAI reports these as duration strings, so we convert them to seconds with convertToSeconds and pause execution (workflow.sleep) for that duration, falling back to a base delay when the headers are missing.
if (response.status === 429) {
  const resetTime =
    response.header["x-ratelimit-reset-tokens"]?.[0] ||
    response.header["x-ratelimit-reset-requests"]?.[0]
  await workflow.sleep(
    "sleep-until-retry",
    resetTime ? convertToSeconds(resetTime) : OPENAI_CONFIG.baseDelay
  )
  continue
}
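convertToSeconds is the other helper imported from @/lib/utils. OpenAI reports reset times as duration strings such as "1s", "6m0s", or "120ms", so the helper needs to parse those into a number of seconds the workflow can sleep for. A possible implementation (our own sketch, assuming that header format):

// lib/utils.ts (illustrative sketch)
// Parses OpenAI-style durations like "6m0s", "1.5s" or "120ms" into whole seconds
export const convertToSeconds = (duration: string): number => {
  const hours = Number(/(\d+(?:\.\d+)?)h/.exec(duration)?.[1] ?? 0)
  const minutes = Number(/(\d+(?:\.\d+)?)m(?!s)/.exec(duration)?.[1] ?? 0)
  const seconds = Number(/(\d+(?:\.\d+)?)s/.exec(duration)?.[1] ?? 0)
  const millis = Number(/(\d+(?:\.\d+)?)ms/.exec(duration)?.[1] ?? 0)
  return Math.ceil(hours * 3600 + minutes * 60 + seconds + millis / 1000)
}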
5. Waiting Before the Next Retry Attempt
To avoid making too many requests in a short period and possibly overloading the OpenAI API, we pause our workflow for 5 seconds before the next retry attempt, regardless of rate limits.
await workflow.sleep("pause-to-avoid-spam", 5)
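Note that if every attempt fails, the loop simply finishes and the workflow ends without storing anything. If you need to react to that case, you could add one more step after the loop; in this sketch, notifyFailure is a hypothetical helper of your own, not part of this example:

// Runs only if the loop above exhausted all retries (notifyFailure is hypothetical)
await workflow.run("record-failure", () =>
  notifyFailure(`OpenAI call failed after ${OPENAI_CONFIG.maxRetries} attempts`)
)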