Agently Workflow

强大又易用的工作流编程方案

精心设计的工作流编程开发语法,让复杂的AI任务处理变得简单直观

"Talk is cheap, show me the code!"

—— 2024年7月,AI Agent工作流年

(Dr. Andrew Ng) 在他的AI Agentic Workflow中特别提到了Translation-Agent

Workflow 核心特性

相比 LangChain 和 LangGraph,Agently Workflow 更加直观易用

直观的工作流设计

精心设计的工作流编程语法,保障思路与代码完全一致

  • 链式语法设计,代码即流程图
  • 支持复杂的工作流逻辑设计
  • 可视化流程图自动生成
条件分支控制

灵活的条件判断和分支控制,替代复杂的 conditional_edges

  • if/elif/else 直观条件语法
  • 支持复杂的条件判断逻辑
  • 动态路由和分支选择
循环和并行处理

支持 .loop_with() 循环控制和并行分支处理

  • 简单易用的循环语法
  • 并行任务处理能力
  • 状态同步和数据合并
状态管理

完善的数据状态管理,支持 inputs、storage、public_storage

  • 多层次的状态存储机制
  • 跨工作流的数据传递
  • 状态持久化和恢复

与 LangGraph 对比

在能力毫不逊色的基础上,提供更直观的开发体验

功能特性LangGraphAgently Workflow优势
条件分支conditional_edgesif/elif/else更直观的语法
数据传递Send + conditional_edges.loop_with()更简洁的实现
状态管理statestorage更灵活的存储
全局状态Statepublic_storage跨流程共享

工作流示例

构建一个智能对话工作流

Python - Agently Workflow
import Agently

# 创建工作流实例
workflow = Agently.Workflow()

# 定义工作流节点
@workflow.chunk()
def user_input(inputs, storage):
    return input("[User]: ")

@workflow.chunk()
def assistant_reply(inputs, storage):
    chat_history = storage.get("chat_history") or []
    reply = (
        agent
        .chat_history(chat_history)
        .input(inputs["default"])
        .start()
    )
    print("[Assistant]: ", reply)
    return reply

@workflow.chunk()
def update_chat_history(inputs, storage):
    chat_history = storage.get("chat_history", [])
    chat_history.append({"role": "user", "content": inputs["user_input"]})
    chat_history.append({"role": "assistant", "content": inputs["assistant_reply"]})
    storage.set("chat_history", chat_history)
    return

# 连接工作流
workflow.connect_to("user_input")
(
    workflow.chunks["user_input"]
    .if_condition(lambda return_value, storage: return_value == "#exit")
        .connect_to("goodbye")
        .connect_to("end")
    .else_condition()
        .connect_to("assistant_reply")
)

workflow.chunks["user_input"].connect_to("update_chat_history.user_input")
workflow.chunks["assistant_reply"].connect_to("update_chat_history.assistant_reply")
workflow.chunks["update_chat_history"].connect_to("user_input")

# 启动工作流
result = workflow.start()

为什么选择 Agently Workflow?

90%

代码减少

相比传统工作流框架

5x

开发效率

更快的开发速度

100%

代码可读性

直观的流程表达

开始使用 Agently Workflow