Using Ray for building agentic workflows

Friday, November 8, 2024

What is Ray and why did it become our favorite deployment solution for building agentic applications? 

Because MotleyCrew originally emerged as a multi-agent framework, its central concept was the crew: the orchestrating mechanism behind the team of agents. As we explored real-world agentic applications, we grew convinced that orchestrating agents is not fundamentally different from orchestrating any other kind of computation. Our task, task unit, and task graph abstractions are useful in many cases, yet there are many more where they are either inapplicable or redundant. Furthermore, in real applications, factors such as maintainability and scalability matter far more than ease of development and code prettiness. So we looked at the deployment solutions out there, and Ray instantly became one of our favorites.

Ray adds the power of distributed computing to Python applications. It is not a solution tailored for AI apps, but rather a general-purpose framework that's been around for years. It lets you run tasks in parallel across multiple processes or even machines (you can set up a distributed cluster with it!) and turn regular Python functions into tasks with a single decorator.

Ray also provides a way to declare computation graphs. This is done by binding the output of one task to the input of another:

import ray

@ray.remote
def first_task():
    ...

@ray.remote
def second_task(first_result):
    ...

# bind() only records the dependency; nothing runs yet
first_task_ref = first_task.bind()
# the first task's result will be passed to the second task
second_task_ref = second_task.bind(first_task_ref)

# execute() triggers the whole graph; ray.get() blocks until the result is ready
ray.get(second_task_ref.execute())

The execute() method is non-blocking and returns a future that can be awaited by ray.get(). When execute() is called, all the tasks needed to get the result are executed in topological order. In this case, first_task is executed first and its output is fed into second_task. The tasks are dispatched by Ray's scheduler according to the available resources.
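To make the lazy, dependency-ordered execution more concrete, here is a toy model written in plain Python. It is only a sketch of the idea, not how Ray implements it: the `LazyNode` class and the standalone `bind` helper are hypothetical names invented for this illustration, and unlike Ray's `execute()` this version runs synchronously in a single process.

```python
from typing import Any, Callable, Optional


class LazyNode:
    """Toy stand-in for a DAG node: stores a function and its upstream inputs."""

    def __init__(self, func: Callable[..., Any], *inputs: Any) -> None:
        self.func = func
        self.inputs = inputs

    def execute(self, _cache: Optional[dict] = None) -> Any:
        # Nothing has run until this point. Upstream nodes are resolved first
        # (depth-first), which gives a valid topological order for a DAG.
        cache = {} if _cache is None else _cache
        if self not in cache:
            args = [
                arg.execute(cache) if isinstance(arg, LazyNode) else arg
                for arg in self.inputs
            ]
            cache[self] = self.func(*args)  # each node runs at most once
        return cache[self]


def bind(func: Callable[..., Any], *inputs: Any) -> LazyNode:
    """Record a dependency without running anything (hypothetical helper)."""
    return LazyNode(func, *inputs)


call_order = []


def first_task() -> int:
    call_order.append("first_task")
    return 21


def second_task(value: int) -> int:
    call_order.append("second_task")
    return value * 2


first_node = bind(first_task)
second_node = bind(second_task, first_node)
assert call_order == []  # binding alone executes nothing

result = second_node.execute()
print(result, call_order)
```

Asking for the last node's result is enough to pull in everything it depends on, which is the property that makes this style convenient for multi-step agentic pipelines.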

The bind()/execute() pattern is similar to MotleyCrew's task DAG, which defines how tasks depend on each other and therefore the order in which they run, except that Ray runs them lazily instead of eagerly.

All this makes Ray's execution DAGs a great solution for running agentic workflows, naturally supporting parallel execution and batching.

To illustrate, let's replicate MotleyCrew's blog with images example using Ray. Here we have two tasks: one to write a blog post on a given subject and another to illustrate it.

Please check out the full code, or follow along with the reduced version.

The first task is to write the post body using a writer agent and a helper researcher agent that searches the web. The topic for the post is provided as an argument.

@ray.remote
def produce_blog_post(topic: str):
    researcher = CrewAIMotleyAgent(
        role="Senior Research Analyst",
        goal="Uncover cutting-edge developments in various fields, doing web search if necessary",
        tools=[DuckDuckGoSearchRun()]
    )

    writer = ReActToolCallingMotleyAgent(
        name="AI writer agent",
        tools=[researcher]
    )

    prompt = "<long prompt here about writing a post on the given topic>"

    blog_post = writer.invoke({"prompt": prompt})
    return blog_post

The second task is to illustrate the post.

@ray.remote
def create_illustrations(blog_post: str):
    image_generator_tool = DallEImageGeneratorTool(...)

    illustrator = ReActToolCallingMotleyAgent(
        name="Illustrator",
        description="Create beautiful and insightful illustrations for a blog post",
        tools=[image_generator_tool],
    )

    prompt = "<long prompt here about illustrating the given post>"

    illustrated_blog_post = illustrator.invoke({"prompt": prompt})
    return illustrated_blog_post

Now let's form the task DAG. Ray provides an abstraction called an input node: a placeholder for the value passed to execute() at run time, which feeds the tasks that have to run first. We'll pass the topic through it:

from ray.dag.input_node import InputNode

with InputNode() as topic:
    blog_post_ref = produce_blog_post.bind(topic=topic)  # Create writing node
    illustrated_blog_post_ref = create_illustrations.bind(
        blog_post=blog_post_ref
    )  # Create illustration node

To execute the graph, all we need to do is call the execute() method on the reference object that we're interested in (in our case, the illustrated blog post). The call is asynchronous, so we can dispatch a whole batch of topics without waiting for each one to finish.

topics = ["AI", "neuroscience", "astrophysics"]
blog_post_refs = [illustrated_blog_post_ref.execute(topic) for topic in topics]  # Does not block
print("Dispatched posts creation")

blog_posts = ray.get(blog_post_refs)  # Wait for all the posts

Now we have three blog posts, composed in parallel using the Ray computation graph!
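If the dispatch-then-gather shape is unfamiliar, the following sketch reproduces it with the standard library's `concurrent.futures` instead of Ray. It is an analogy only: `write_post`, `illustrate`, and `pipeline` are hypothetical placeholders for the agent calls, and a thread pool on one machine stands in for Ray's scheduler.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def write_post(topic: str) -> str:
    # Hypothetical placeholder for the writing agent
    time.sleep(0.1)
    return f"Post about {topic}"


def illustrate(post: str) -> str:
    # Hypothetical placeholder for the illustrator agent
    time.sleep(0.1)
    return f"{post} [illustrated]"


def pipeline(topic: str) -> str:
    # The two steps are chained, like the two nodes of the graph
    return illustrate(write_post(topic))


topics = ["AI", "neuroscience", "astrophysics"]

with ThreadPoolExecutor(max_workers=len(topics)) as pool:
    # submit() returns a future immediately, so all topics start right away
    futures = [pool.submit(pipeline, topic) for topic in topics]
    print("Dispatched posts creation")
    # result() blocks until the corresponding pipeline has finished
    posts = [future.result() for future in futures]

print(posts)
```

The results come back in the same order as the topics, regardless of which pipeline finishes first, just as ray.get() preserves the order of the references it is given.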

What's next

This post only scratched the surface of Ray's capabilities. Ray recently introduced a "workflows" feature, providing durability guarantees and conditional nesting. It even makes it possible to implement dynamically expanding workflows, such as our research agent. See the Ray docs on workflows for a more detailed description.

We'll dive into the workflows feature in one of our next posts. Stay tuned!
