Workflows
A Workflow in LlamaIndexTS is an event-driven abstraction used to chain together several events. Workflows are made up of steps, with each step responsible for handling certain event types and emitting new events.
Workflows in LlamaIndexTS work by defining step functions that handle specific event types and emit new events.
When a step function is added to a workflow, you specify its input and, optionally, its output event types (used for validation). Specifying the input events ensures each step only runs when an accepted event is ready.
You can create a Workflow to do anything! Build an agent, a RAG flow, an extraction flow, or anything else you want.
Getting Started
As an illustrative example, let's consider a naive workflow where a joke is generated and then critiqued.
import {
  StartEvent,
  StopEvent,
  Workflow,
  WorkflowEvent,
} from "@llamaindex/workflow";
import { OpenAI } from "llamaindex";

// Create LLM instance
const llm = new OpenAI();

// Create a custom event type
export class JokeEvent extends WorkflowEvent<{ joke: string }> {}

const generateJoke = async (_: unknown, ev: StartEvent<string>) => {
  const prompt = `Write your best joke about ${ev.data}.`;
  const response = await llm.complete({ prompt });
  return new JokeEvent({ joke: response.text });
};

const critiqueJoke = async (_: unknown, ev: JokeEvent) => {
  const prompt = `Give a thorough critique of the following joke: ${ev.data.joke}`;
  const response = await llm.complete({ prompt });
  return new StopEvent(response.text);
};

const jokeFlow = new Workflow<unknown, string, string>();

jokeFlow.addStep(
  {
    inputs: [StartEvent<string>],
    outputs: [JokeEvent],
  },
  generateJoke,
);

jokeFlow.addStep(
  {
    inputs: [JokeEvent],
    outputs: [StopEvent<string>],
  },
  critiqueJoke,
);

// Usage
async function main() {
  const result = await jokeFlow.run("pirates");
  console.log(result.data);
}

main().catch(console.error);
There are a few moving pieces here, so let's go through it piece by piece.
Defining Workflow Events
export class JokeEvent extends WorkflowEvent<{ joke: string }> {}
Events are user-defined classes that extend WorkflowEvent and carry arbitrary data provided as a type argument. In this case, our workflow relies on a single user-defined event, the JokeEvent, which carries a joke attribute of type string.
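To make the data shape concrete, here is a small hedged sketch (the CritiqueEvent class and logCritique step below are hypothetical, not part of the example above) of an event carrying more than one field, and a step reading that data through ev.data:
// Hypothetical event with a multi-field payload.
export class CritiqueEvent extends WorkflowEvent<{
  joke: string;
  critique: string;
}> {}

// A step receiving it reads the payload via ev.data.
const logCritique = async (_: unknown, ev: CritiqueEvent) => {
  console.log(`Joke: ${ev.data.joke}`);
  console.log(`Critique: ${ev.data.critique}`);
  return new StopEvent("done");
};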
Setting up the Workflow Class
const llm = new OpenAI();
...
const jokeFlow = new Workflow<unknown, string, string>();
Our workflow is implemented by instantiating the Workflow class. For simplicity, we created an OpenAI LLM instance.
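If you need a specific model or settings, the OpenAI class accepts options at construction time; the values below are only illustrative, not requirements of the workflow:
// Optional: configure the LLM when constructing it.
// The model name and temperature here are example values.
const llm = new OpenAI({ model: "gpt-4o-mini", temperature: 0.7 });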
Workflow Entry Points
const generateJoke = async (_: unknown, ev: StartEvent<string>) => {
  const prompt = `Write your best joke about ${ev.data}.`;
  const response = await llm.complete({ prompt });
  return new JokeEvent({ joke: response.text });
};
Here, we come to the entry point of our workflow. While events are user-defined, there are two special-case events, the StartEvent and the StopEvent. The StartEvent signifies where to send the initial workflow input; it is a bit special because it carries that input as its data. In this example, we accessed the topic with ev.data.
At this point, you may have noticed that we haven't explicitly told the workflow which events are handled by which steps. To do so, we use the addStep method, which adds a step to the workflow. The first argument declares the input (and optionally output) event types the step handles, and the second argument is the previously defined step function:
jokeFlow.addStep(
  {
    inputs: [StartEvent<string>],
    outputs: [JokeEvent],
  },
  generateJoke,
);
Workflow Exit Points
const critiqueJoke = async (_: unknown, ev: JokeEvent) => {
  const prompt = `Give a thorough critique of the following joke: ${ev.data.joke}`;
  const response = await llm.complete({ prompt });
  return new StopEvent(response.text);
};
Here, we have our second, and last, step in the workflow. We know it's the last step because the special StopEvent is returned. When a step returns a StopEvent, the workflow immediately stops and returns whatever result it carries. In this case, the result is a string, but it could be a map, an array, or any other object.
Don't forget to add the step to the workflow:
jokeFlow.addStep(
  {
    inputs: [JokeEvent],
    outputs: [StopEvent<string>],
  },
  critiqueJoke,
);
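As noted above, the result does not have to be a string. A hedged variant of the critique step (hypothetical, and assuming the workflow's stop type parameter is changed to match) could return a structured object instead:
// Hypothetical variant returning an object as the final result.
// Assumes the workflow is typed as
// Workflow<unknown, string, { joke: string; critique: string }>.
const critiqueJokeStructured = async (_: unknown, ev: JokeEvent) => {
  const response = await llm.complete({
    prompt: `Give a thorough critique of the following joke: ${ev.data.joke}`,
  });
  return new StopEvent({ joke: ev.data.joke, critique: response.text });
};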
Running the Workflow
const result = await jokeFlow.run("pirates");
console.log(result.data);
Lastly, we run the workflow. The .run() method is async, so we use await here to wait for the result.
Validating Workflows
To tell the workflow what events are produced by each step, you can optionally specify the outputs when adding each step, as we did above:
jokeFlow.addStep(
  { inputs: [StartEvent<string>], outputs: [JokeEvent] },
  generateJoke,
);
jokeFlow.addStep(
  { inputs: [JokeEvent], outputs: [StopEvent<string>] },
  critiqueJoke,
);
To validate a workflow, you need to call the validate method:
jokeFlow.validate();
To automatically validate a workflow when you run it, you can set the validate flag to true at initialization:
const jokeFlow = new Workflow({ verbose: true, validate: true });
Working with Global Context/State
Optionally, you can choose to use global context between steps. For example, maybe multiple steps access the original query input from the user. You can store this in global context so that every step has access.
import { Context } from "@llamaindex/core/workflow";

const query = async (context: Context, ev: MyEvent) => {
  // get the query from the context
  const query = context.get("query");
  // do something with context and event
  const val = ...
  const result = ...
  // store in context
  context.set("key", val);
  return new StopEvent({ result });
};
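For completeness, here is a hedged sketch of how an earlier step might populate that context; the QueryStoredEvent class, the saveQuery step, and the "query" key are assumptions made for illustration:
// Hypothetical event emitted after storing the query.
class QueryStoredEvent extends WorkflowEvent<{}> {}

const saveQuery = async (context: Context, ev: StartEvent<string>) => {
  // Store the original input under a well-known key so any later step
  // can retrieve it with context.get("query").
  context.set("query", ev.data);
  return new QueryStoredEvent({});
};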
Waiting for Multiple Events
The context does more than just hold data; it also provides utilities to buffer and wait for multiple events.
For example, you might have a step that waits for a query and retrieved nodes before synthesizing a response:
const synthesize = async (context: Context, ev: QueryEvent | RetrieveEvent) => {
  // Wait until both a QueryEvent and a RetrieveEvent have arrived.
  const events = context.collectEvents(ev, [QueryEvent, RetrieveEvent]);
  if (!events) {
    return;
  }
  const prompt = events
    .map((event) => {
      if (event instanceof QueryEvent) {
        return `Answer this query using the context provided: ${event.data.query}`;
      } else if (event instanceof RetrieveEvent) {
        return `Context: ${event.data.context}`;
      }
      return "";
    })
    .join("\n");
  const response = await llm.complete({ prompt });
  return new StopEvent({ result: response.text });
};
Using context.collectEvents() we can buffer and wait for ALL expected events to arrive. This function will only return the events (in the requested order) once all of them have arrived.
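The QueryEvent and RetrieveEvent referenced above are not defined elsewhere in this guide. Consistent with how their data is accessed in the snippet, they could be declared as follows (an assumption made for illustration):
// Assumed shapes, matching event.data.query and event.data.context above.
export class QueryEvent extends WorkflowEvent<{ query: string }> {}
export class RetrieveEvent extends WorkflowEvent<{ context: string }> {}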
Manually Triggering Events
Normally, events are triggered by returning another event during a step. However, events can also be manually dispatched using the context.sendEvent(event) method within a workflow.
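As a hedged sketch (the FollowUpEvent class and the critiqueAndNotify step are hypothetical, and Context is the type used in the examples above), a step that dispatches an extra event manually might look like this:
// Hypothetical event dispatched manually, in addition to the returned one.
class FollowUpEvent extends WorkflowEvent<{ note: string }> {}

const critiqueAndNotify = async (context: Context, ev: JokeEvent) => {
  // Manually dispatch an event instead of (or in addition to) returning it.
  context.sendEvent(new FollowUpEvent({ note: "joke critiqued" }));
  return new StopEvent({ result: ev.data.joke });
};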
Examples
You can find many useful examples of using workflows in the examples folder.