LangGraph AI代理:加入持久化同串流功能,逐步教學

Ai

創建基於AI代理的系統:增加持久性和流媒體(逐步指南)

在我們之前的教程中,我們構建了一個能夠通過瀏覽網絡回答查詢的AI代理。然而,在構建用於長時間運行任務的代理時,有兩個關鍵概念需要考慮:持久性和流媒體。持久性允許你在任何給定時刻保存代理的狀態,使你能夠在未來的交互中從該狀態恢復,這對於長時間運行的應用至關重要。另一方面,流媒體可以讓你隨時發出關於代理正在做什麼的實時信號,從而提供透明度和控制權。在這個教程中,我們將通過增加這些強大的功能來增強我們的代理。

設置代理

讓我們開始重新創建我們的代理。我們將加載必要的環境變量,安裝並導入所需的庫,設置Tavily搜索工具,定義代理狀態,最後構建代理。

“`python
pip install langgraph==0.2.53 langgraph-checkpoint==2.0.6 langgraph-sdk==0.1.36 langchain-groq langchain-community langgraph-checkpoint-sqlite==2.0.1
“`

“`python
import os
os.environ[‘TAVILY_API_KEY’] = “”
os.environ[‘GROQ_API_KEY’] = “”

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_groq import ChatGroq
from langchain_community.tools.tavily_search import TavilySearchResults

tool = TavilySearchResults(max_results=2)

class AgentState(TypedDict):
messages: Annotated[list[AnyMessage], operator.add]

class Agent:
def __init__(self, model, tools, system=””):
self.system = system
graph = StateGraph(AgentState)
graph.add_node(“llm”, self.call_openai)
graph.add_node(“action”, self.take_action)
graph.add_conditional_edges(“llm”, self.exists_action, {True: “action”, False: END})
graph.add_edge(“action”, “llm”)
graph.set_entry_point(“llm”)
self.graph = graph.compile()
self.tools = {t.name: t for t in tools}
self.model = model.bind_tools(tools)

def call_openai(self, state: AgentState):
messages = state[‘messages’]
if self.system:
messages = [SystemMessage(content=self.system)] + messages
message = self.model.invoke(messages)
return {‘messages’: [message]}

def exists_action(self, state: AgentState):
result = state[‘messages’][-1]
return len(result.tool_calls) > 0

def take_action(self, state: AgentState):
tool_calls = state[‘messages’][-1].tool_calls
results = []
for t in tool_calls:
print(f”Calling: {t}”)
result = self.tools[t[‘name’]].invoke(t[‘args’])
results.append(ToolMessage(tool_call_id=t[‘id’], name=t[‘name’], content=str(result)))
print(“Back to the model!”)
return {‘messages’: results}
“`

增加持久性

為了增加持久性,我們將使用LangGraph的檢查點功能。檢查點在每個節點之後和之間保存代理的狀態。對於本教程,我們將使用SqliteSaver,這是一個利用SQLite的簡單檢查點。雖然我們將使用內存數據庫以簡化操作,但你可以輕鬆地將其連接到外部數據庫或使用其他檢查點,如Redis或Postgres,以獲得更穩健的持久性。

“`python
from langgraph.checkpoint.sqlite import SqliteSaver
import sqlite3
sqlite_conn = sqlite3.connect(“checkpoints.sqlite”, check_same_thread=False)
memory = SqliteSaver(sqlite_conn)
“`

接下來,我們將修改我們的代理以接受檢查點:

“`python
class Agent:
def __init__(self, model, tools, checkpointer, system=””):
# 其餘部分保持不變
self.graph = graph.compile(checkpointer=checkpointer)
# 其餘部分保持不變
“`

現在,我們可以創建一個啟用持久性的代理:

“`python
prompt = “””你是一個智能研究助手。使用搜索引擎查找信息。
你可以進行多次調用(無論是一起還是依次)。
只有在確定你想要什麼時,才可以查找信息。
如果你需要查找一些信息以便提出後續問題,你可以這樣做!
“””
model = ChatGroq(model=”Llama-3.3-70b-Specdec”)
bot = Agent(model, [tool], system=prompt, checkpointer=memory)
“`

增加流媒體

流媒體對於實時更新至關重要。我們將重點關注兩種類型的流媒體:

1. 流媒體消息:發出中間消息,例如AI決策和工具結果。

2. 流媒體令牌:從LLM的響應中流式傳輸單個令牌。

讓我們從流媒體消息開始。我們將創建一條人類消息並使用流方法觀察代理的行為。

“`python
messages = [HumanMessage(content=”德克薩斯州的天氣如何?”)]
thread = {“configurable”: {“thread_id”: “1”}}
for event in bot.graph.stream({“messages”: messages}, thread):
for v in event.values():
print(v[‘messages’])
“`

最終輸出:德克薩斯州的當前天氣是晴天,溫度為19.4°C(66.9°F),風速為4.3 mph(6.8 kph)…..

當你運行這個時,你會看到一系列結果。首先是一條AI消息,指示代理調用Tavily,然後是帶有搜索結果的工具消息,最後是一條回答問題的AI消息。

理解線程ID

thread_id是線程配置的重要部分。它允許代理與不同用戶或上下文保持獨立對話。通過為每個對話分配唯一的thread_id,代理可以同時跟踪多個交互而不會混淆。

例如,讓我們繼續對話,問:“洛杉磯的天氣怎麼樣?”使用相同的thread_id:

“`python
messages = [HumanMessage(content=”洛杉磯的天氣怎麼樣?”)]
thread = {“configurable”: {“thread_id”: “1”}}
for event in bot.graph.stream({“messages”: messages}, thread):
for v in event.values():
print(v)
“`

最終輸出:洛杉磯當前的天氣是晴天,溫度為17.2°C(63.0°F),風速為2.2 mph(3.6 kph) ….

代理推斷我們在詢問天氣,這得益於持久性。為了驗證,我們來問:“哪個更熱?”:

“`python
messages = [HumanMessage(content=”哪個更熱?”)]
thread = {“configurable”: {“thread_id”: “1”}}
for event in bot.graph.stream({“messages”: messages}, thread):
for v in event.values():
print(v)
“`

最終輸出:德克薩斯州比洛杉磯更熱。德克薩斯州的當前溫度為19.4°C(66.9°F),而洛杉磯的當前溫度為17.2°C(63.0°F)。

代理正確比較了德克薩斯州和洛杉磯的天氣。為了測試持久性是否保持對話的獨立性,讓我們用不同的thread_id問同樣的問題:

“`python
messages = [HumanMessage(content=”哪個更熱?”)]
thread = {“configurable”: {“thread_id”: “2”}}
for event in bot.graph.stream({“messages”: messages}, thread):
for v in event.values():
print(v)
“`

輸出:我需要更多信息來回答這個問題。你能提供更多的背景信息或具體說明你在比較哪兩樣東西嗎?

這次,代理感到困惑,因為它無法訪問之前對話的歷史記錄。

流媒體令牌

要流式傳輸令牌,我們將使用astream_events方法,這是異步的。我們還將切換到異步檢查點。

“`python
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

async with AsyncSqliteSaver.from_conn_string(“:memory:”) as checkpointer:
abot = Agent(model, [tool], system=prompt, checkpointer=checkpointer)
messages = [HumanMessage(content=”舊金山的天氣如何?”)]
thread = {“configurable”: {“thread_id”: “4”}}
async for event in abot.graph.astream_events({“messages”: messages}, thread, version=”v1″):
kind = event[“event”]
if kind == “on_chat_model_stream”:
content = event[“data”][“chunk”].content
if content:
print(content, end=”|”)
“`

這將實時流式傳輸令牌,讓你即時了解代理的思考過程。

結論

通過增加持久性和流媒體,我們顯著增強了我們的AI代理的能力。持久性允許代理在交互中保持上下文,而流媒體則提供了對其行為的實時洞察。這些功能對於構建生產就緒的應用至關重要,特別是涉及多用戶或人機協作的交互。

在下一個教程中,我們將深入探討人機協作的互動,其中持久性在促進人類和AI代理之間的無縫協作中發揮著至關重要的作用。敬請期待!

這篇文章展示了AI代理在處理長期任務時的潛力,特別是在持久性和流媒體方面的應用。隨著技術的進步,這些功能將成為未來AI系統不可或缺的一部分。對於開發者而言,理解這些概念不僅有助於提升AI的實用性,更能在設計上考慮用戶體驗的流暢性。持久性不僅能夠保留上下文,還能促進對話的連貫性,而流媒體則使得用戶能夠實時獲取信息,這在許多應用中都是至關重要的。未來,這樣的技術將可能在教育、客戶服務等多個領域發揮更大的作用。

以上文章由特價GPT API KEY所翻譯及撰寫。而圖片則由FLUX根據內容自動生成。

Chat Icon