应用的 AI 功能核心是生成模型请求,但很少能仅仅通过获取用户输入、传递给模型并直接将模型输出返回给用户来完成任务。通常,模型调用还需要配合预处理和后处理步骤。例如:
- 检索要随模型调用一起发送的上下文信息。
- 检索用户当前会话的历史记录,例如在聊天应用中。
- 使用一个模型以适合传递给其他模型的方式重新格式化用户输入。
- 在向用户呈现模型输出结果之前,评估其“安全性”。
- 合并多个模型的输出。
此工作流的每个步骤都必须协同工作,才能成功完成任何与 AI 相关的任务。
在 Genkit 中,您可以使用称为 flow 的构造来表示这种紧密关联的逻辑。Flow 的编写方式与函数相同,使用普通的 Go 代码,但它们扩展了一些旨在简化 AI 功能开发的能力:
- 类型安全:输入和输出架构,可提供静态和运行时类型检查。
- 与开发者界面集成:使用开发者界面,独立于应用代码调试 flow。在开发者界面中,您可以运行 flow 并查看 flow 每个步骤的跟踪记录。
- 简化部署:使用任何可托管 Web 应用的平台,将 flow 直接部署为 Web API 端点。
Genkit 的 flow 轻量、无侵入性,不会强制您的应用遵循任何特定的抽象模型。Flow 的所有逻辑均使用标准 Go 编写,且其中的代码无需感知或特意处理 flow 的上下文。
定义和调用 flow
形式最简单的 flow 仅封装一个函数。以下示例封装了一个调用 GenerateData()
的函数:
menuSuggestionFlow := genkit.DefineFlow(g, "menuSuggestionFlow",
func(ctx context.Context, theme string) (string, error) {
resp, err := genkit.GenerateData(ctx, g,
ai.WithPrompt("Invent a menu item for a %s themed restaurant.", theme),
)
if err != nil {
return "", err
}
return resp.Text(), nil
})
只需像这样封装 genkit.Generate()
调用,您即可获得额外功能支持:这样一来,您就可以从 Genkit CLI 和开发者界面运行该 flow,并且这是 Genkit 的多项功能(包括部署和可观测性)的要求(后续部分将讨论这些主题)。
输入和输出架构
与直接调用模型 API 相比,Genkit flow 的一个重要优势是输入和输出均具有类型安全性。在定义 flow 时,您可以定义架构,方法与定义 genkit.Generate()
调用的输出架构非常类似;不过,与 genkit.Generate()
不同,您还可以指定输入架构。
下面是对上一个示例的细化,它定义了一个 flow,该 flow 以字符串作为输入并输出对象:
type MenuItem struct {
Name string `json:"name"`
Description string `json:"description"`
}
menuSuggestionFlow := genkit.DefineFlow(g, "menuSuggestionFlow",
func(ctx context.Context, theme string) (MenuItem, error) {
return genkit.GenerateData[MenuItem](ctx, g,
ai.WithPrompt("Invent a menu item for a %s themed restaurant.", theme),
)
})
请注意,flow 的架构不一定与 flow 中 genkit.Generate()
调用的架构完全一致(事实上,flow 甚至可以完全不包含 genkit.Generate()
调用)。下面是该示例的变体,它会将架构传递给 genkit.Generate()
,但使用结构化输出来设置 flow 返回的简单字符串的格式。
type MenuItem struct {
Name string `json:"name"`
Description string `json:"description"`
}
menuSuggestionMarkdownFlow := genkit.DefineFlow(g, "menuSuggestionMarkdownFlow",
func(ctx context.Context, theme string) (string, error) {
item, _, err := genkit.GenerateData[MenuItem](ctx, g,
ai.WithPrompt("Invent a menu item for a %s themed restaurant.", theme),
)
if err != nil {
return "", err
}
return fmt.Sprintf("**%s**: %s", item.Name, item.Description), nil
})
调用 flow
定义 flow 后,您可以从 Go 代码调用该 flow:
item, err := menuSuggestionFlow.Run(ctx, "bistro")
Flow 的参数必须符合输入架构。
如果您定义了输出架构,则 flow 回答将符合该架构。例如,如果您将输出架构设置为 MenuItem
,flow 输出将包含其属性:
item, err := menuSuggestionFlow.Run(ctx, "bistro")
if err != nil {
log.Fatal(err)
}
log.Println(item.DishName)
log.Println(item.Description)
流式传输 flow
Flow 支持使用类似于 genkit.Generate()
的流式传输接口进行流式传输。当 flow 生成大量输出时,流式传输非常有用,因为可以将生成的输出内容实时呈现给用户,从而提升应用的感知响应能力。一个熟悉的示例是,基于聊天的 LLM 接口通常会在生成回答时将其流式传输给用户。
下面是一个支持流式传输的 flow 示例:
type Menu struct {
Theme string `json:"theme"`
Items []MenuItem `json:"items"`
}
type MenuItem struct {
Name string `json:"name"`
Description string `json:"description"`
}
menuSuggestionFlow := genkit.DefineStreamingFlow(g, "menuSuggestionFlow",
func(ctx context.Context, theme string, callback core.StreamCallback[string]) (Menu, error) {
item, _, err := genkit.GenerateData[MenuItem](ctx, g,
ai.WithPrompt("Invent a menu item for a %s themed restaurant.", theme),
ai.WithStreaming(func(ctx context.Context, chunk *ai.ModelResponseChunk) error {
// Here, you could process the chunk in some way before sending it to
// the output stream using StreamCallback. In this example, we output
// the text of the chunk, unmodified.
return callback(ctx, chunk.Text())
}),
)
if err != nil {
return nil, err
}
return Menu{
Theme: theme,
Items: []MenuItem{item},
}, nil
})
StreamCallback[string]
中的 string
类型用于指定 flow 数据流的值类型。这不一定需要与返回值类型相同——返回值类型是 flow 的完整输出类型(本示例中为 Menu
)。
在此示例中,由 flow 流式传输的值直接与 flow 内 genkit.Generate()
调用流式传输的值相关联。虽然通常如此,但并非必须:您可以根据 flow 的需要,通过回调将值多次输出至数据流。
调用流式传输 flow
流式传输 flow 既可以像非流式传输 flow 一样使用 menuSuggestionFlow.Run(ctx, "bistro")
运行,也可以采用流式方式执行。
streamCh, err := menuSuggestionFlow.Stream(ctx, "bistro")
if err != nil {
log.Fatal(err)
}
for result := range streamCh {
if result.Err != nil {
log.Fatal("Stream error: %v", result.Err)
}
if result.Done {
log.Printf("Menu with %s theme:\n", result.Output.Theme)
for item := range result.Output.Items {
log.Println(" - %s: %s", item.Name, item.Description)
}
} else {
log.Println("Stream chunk:", result.Stream)
}
}
通过命令行运行 flow
您可以使用 Genkit CLI 工具从命令行运行 flow:
genkit flow:run menuSuggestionFlow '"French"'
对于流式传输 flow,您可以通过添加 -s
标志,将流式传输输出打印到控制台:
genkit flow:run menuSuggestionFlow '"French"' -s
通过命令行运行 flow 非常适合用于 flow 测试,或执行临时任务的 flow,例如运行 flow 以将文档注入向量数据库。
调试 flow
将 AI 逻辑封装在 flow 中的一个优势是,您可以使用 Genkit 开发者界面独立于应用之外测试和调试 flow。
如需启动开发者界面,请在项目目录中运行以下命令:
genkit start -- go run .
在开发者界面的运行标签页中,您可以运行项目中定义的任何 flow:
运行 flow 后,要检查 flow 调用的跟踪记录,您可以点击查看跟踪记录,或查看检查标签页。
部署 flow
您可以直接将 flow 部署为 Web API 端点,以便从应用客户端进行调用。有关部署的详细信息,请参阅其他页面;本部分仅简要介绍可用的部署选项。
net/http
服务器
如需使用任何 Go 托管平台(例如 Cloud Run)部署 Flow,请使用 DefineFlow()
定义您的 Flow,并使用提供的 Flow 处理程序启动 net/http
服务器:
import (
"context"
"log"
"net/http"
"github.com/firebase/genkit/go/genkit"
"github.com/firebase/genkit/go/plugins/googlegenai"
"github.com/firebase/genkit/go/plugins/server"
)
func main() {
ctx := context.Background()
g, err := genkit.Init(ctx, genkit.WithPlugins(&googlegenai.GoogleAI{}))
if err != nil {
log.Fatal(err)
}
menuSuggestionFlow := genkit.DefineFlow(g, "menuSuggestionFlow",
func(ctx context.Context, theme string) (MenuItem, error) {
// Flow implementation...
})
mux := http.NewServeMux()
mux.HandleFunc("POST /menuSuggestionFlow", genkit.Handler(menuSuggestionFlow))
log.Fatal(server.Start(ctx, "127.0.0.1:3400", mux))
}
server.Start()
是一个可选的辅助函数,用于启动服务器并管理其生命周期(包括处理中断信号,以便于本地开发),但您也可以采用自定义方式实现。
如需提供代码库中定义的所有 flow,您可以使用 ListFlows()
:
mux := http.NewServeMux()
for _, flow := range genkit.ListFlows(g) {
mux.HandleFunc("POST /"+flow.Name(), genkit.Handler(flow))
}
log.Fatal(server.Start(ctx, "127.0.0.1:3400", mux))
您可以使用 POST 请求调用 flow 端点,如下所示:
curl -X POST "http://localhost:3400/menuSuggestionFlow" \
-H "Content-Type: application/json" -d '{"data": "banana"}'
其他服务器框架
您还可以使用其他服务器框架来部署 flow。例如,您只需几行代码即可使用 Gin:
router := gin.Default()
for _, flow := range genkit.ListFlows(g) {
router.POST("/"+flow.Name(), func(c *gin.Context) {
genkit.Handler(flow)(c.Writer, c.Request)
})
}
log.Fatal(router.Run(":3400"))
如需了解如何部署到特定平台,请参阅使用 Cloud Run 的 Genkit。