定义 AI 工作流

应用的 AI 功能核心是生成模型请求,但很少能仅仅通过获取用户输入、传递给模型并直接将模型输出返回给用户来完成任务。通常,模型调用还需要配合预处理和后处理步骤。例如:

  • 检索要随模型调用一起发送的上下文信息。
  • 检索用户当前会话的历史记录,例如在聊天应用中。
  • 使用一个模型以适合传递给其他模型的方式重新格式化用户输入。
  • 在向用户呈现模型输出结果之前,评估其“安全性”。
  • 合并多个模型的输出。

此工作流的每个步骤都必须协同工作,才能成功完成任何与 AI 相关的任务。

在 Genkit 中,您可以使用称为 flow 的构造来表示这种紧密关联的逻辑。Flow 的编写方式与函数相同,使用普通的 Go 代码,但它们扩展了一些旨在简化 AI 功能开发的能力:

  • 类型安全:输入和输出架构,可提供静态和运行时类型检查。
  • 与开发者界面集成:使用开发者界面,独立于应用代码调试 flow。在开发者界面中,您可以运行 flow 并查看 flow 每个步骤的跟踪记录。
  • 简化部署:使用任何可托管 Web 应用的平台,将 flow 直接部署为 Web API 端点。

Genkit 的 flow 轻量、无侵入性,不会强制您的应用遵循任何特定的抽象模型。Flow 的所有逻辑均使用标准 Go 编写,且其中的代码无需感知或特意处理 flow 的上下文。

定义和调用 flow

形式最简单的 flow 仅封装一个函数。以下示例封装了一个调用 GenerateData() 的函数:

menuSuggestionFlow := genkit.DefineFlow(g, "menuSuggestionFlow",
    func(ctx context.Context, theme string) (string, error) {
        resp, err := genkit.GenerateData(ctx, g,
            ai.WithPrompt("Invent a menu item for a %s themed restaurant.", theme),
        )
        if err != nil {
            return "", err
        }

        return resp.Text(), nil
    })

只需像这样封装 genkit.Generate() 调用,您即可获得额外功能支持:这样一来,您就可以从 Genkit CLI 和开发者界面运行该 flow,并且这是 Genkit 的多项功能(包括部署和可观测性)的要求(后续部分将讨论这些主题)。

输入和输出架构

与直接调用模型 API 相比,Genkit flow 的一个重要优势是输入和输出均具有类型安全性。在定义 flow 时,您可以定义架构,方法与定义 genkit.Generate() 调用的输出架构非常类似;不过,与 genkit.Generate() 不同,您还可以指定输入架构。

下面是对上一个示例的细化,它定义了一个 flow,该 flow 以字符串作为输入并输出对象:

type MenuItem struct {
    Name        string `json:"name"`
    Description string `json:"description"`
}

menuSuggestionFlow := genkit.DefineFlow(g, "menuSuggestionFlow",
    func(ctx context.Context, theme string) (MenuItem, error) {
        return genkit.GenerateData[MenuItem](ctx, g,
            ai.WithPrompt("Invent a menu item for a %s themed restaurant.", theme),
        )
    })

请注意,flow 的架构不一定与 flow 中 genkit.Generate() 调用的架构完全一致(事实上,flow 甚至可以完全不包含 genkit.Generate() 调用)。下面是该示例的变体,它会将架构传递给 genkit.Generate(),但使用结构化输出来设置 flow 返回的简单字符串的格式。

type MenuItem struct {
    Name        string `json:"name"`
    Description string `json:"description"`
}

menuSuggestionMarkdownFlow := genkit.DefineFlow(g, "menuSuggestionMarkdownFlow",
    func(ctx context.Context, theme string) (string, error) {
        item, _, err := genkit.GenerateData[MenuItem](ctx, g,
            ai.WithPrompt("Invent a menu item for a %s themed restaurant.", theme),
        )
        if err != nil {
            return "", err
        }

        return fmt.Sprintf("**%s**: %s", item.Name, item.Description), nil
    })

调用 flow

定义 flow 后,您可以从 Go 代码调用该 flow:

item, err := menuSuggestionFlow.Run(ctx, "bistro")

Flow 的参数必须符合输入架构。

如果您定义了输出架构,则 flow 回答将符合该架构。例如,如果您将输出架构设置为 MenuItem,flow 输出将包含其属性:

item, err := menuSuggestionFlow.Run(ctx, "bistro")
if err != nil {
    log.Fatal(err)
}

log.Println(item.DishName)
log.Println(item.Description)

流式传输 flow

Flow 支持使用类似于 genkit.Generate() 的流式传输接口进行流式传输。当 flow 生成大量输出时,流式传输非常有用,因为可以将生成的输出内容实时呈现给用户,从而提升应用的感知响应能力。一个熟悉的示例是,基于聊天的 LLM 接口通常会在生成回答时将其流式传输给用户。

下面是一个支持流式传输的 flow 示例:

type Menu struct {
    Theme  string     `json:"theme"`
    Items  []MenuItem `json:"items"`
}

type MenuItem struct {
    Name        string `json:"name"`
    Description string `json:"description"`
}

menuSuggestionFlow := genkit.DefineStreamingFlow(g, "menuSuggestionFlow",
    func(ctx context.Context, theme string, callback core.StreamCallback[string]) (Menu, error) {
        item, _, err := genkit.GenerateData[MenuItem](ctx, g,
            ai.WithPrompt("Invent a menu item for a %s themed restaurant.", theme),
            ai.WithStreaming(func(ctx context.Context, chunk *ai.ModelResponseChunk) error {
                // Here, you could process the chunk in some way before sending it to
                // the output stream using StreamCallback. In this example, we output
                // the text of the chunk, unmodified.
                return callback(ctx, chunk.Text())
            }),
        )
        if err != nil {
            return nil, err
        }

        return Menu{
            Theme: theme,
            Items: []MenuItem{item},
        }, nil
    })

StreamCallback[string] 中的 string 类型用于指定 flow 数据流的值类型。这不一定需要与返回值类型相同——返回值类型是 flow 的完整输出类型(本示例中为 Menu)。

在此示例中,由 flow 流式传输的值直接与 flow 内 genkit.Generate() 调用流式传输的值相关联。虽然通常如此,但并非必须:您可以根据 flow 的需要,通过回调将值多次输出至数据流。

调用流式传输 flow

流式传输 flow 既可以像非流式传输 flow 一样使用 menuSuggestionFlow.Run(ctx, "bistro") 运行,也可以采用流式方式执行。

streamCh, err := menuSuggestionFlow.Stream(ctx, "bistro")
if err != nil {
    log.Fatal(err)
}

for result := range streamCh {
    if result.Err != nil {
        log.Fatal("Stream error: %v", result.Err)
    }
    if result.Done {
        log.Printf("Menu with %s theme:\n", result.Output.Theme)
        for item := range result.Output.Items {
            log.Println(" - %s: %s", item.Name, item.Description)
        }
    } else {
        log.Println("Stream chunk:", result.Stream)
    }
}

通过命令行运行 flow

您可以使用 Genkit CLI 工具从命令行运行 flow:

genkit flow:run menuSuggestionFlow '"French"'

对于流式传输 flow,您可以通过添加 -s 标志,将流式传输输出打印到控制台:

genkit flow:run menuSuggestionFlow '"French"' -s

通过命令行运行 flow 非常适合用于 flow 测试,或执行临时任务的 flow,例如运行 flow 以将文档注入向量数据库。

调试 flow

将 AI 逻辑封装在 flow 中的一个优势是,您可以使用 Genkit 开发者界面独立于应用之外测试和调试 flow。

如需启动开发者界面,请在项目目录中运行以下命令:

genkit start -- go run .

在开发者界面的运行标签页中,您可以运行项目中定义的任何 flow:

Flow 运行器的屏幕截图

运行 flow 后,要检查 flow 调用的跟踪记录,您可以点击查看跟踪记录,或查看检查标签页。

部署 flow

您可以直接将 flow 部署为 Web API 端点,以便从应用客户端进行调用。有关部署的详细信息,请参阅其他页面;本部分仅简要介绍可用的部署选项。

net/http 服务器

如需使用任何 Go 托管平台(例如 Cloud Run)部署 Flow,请使用 DefineFlow() 定义您的 Flow,并使用提供的 Flow 处理程序启动 net/http 服务器:

import (
    "context"
    "log"
    "net/http"

    "github.com/firebase/genkit/go/genkit"
    "github.com/firebase/genkit/go/plugins/googlegenai"
    "github.com/firebase/genkit/go/plugins/server"
)

func main() {
    ctx := context.Background()

    g, err := genkit.Init(ctx, genkit.WithPlugins(&googlegenai.GoogleAI{}))
    if err != nil {
      log.Fatal(err)
    }

    menuSuggestionFlow := genkit.DefineFlow(g, "menuSuggestionFlow",
        func(ctx context.Context, theme string) (MenuItem, error) {
            // Flow implementation...
        })

    mux := http.NewServeMux()
    mux.HandleFunc("POST /menuSuggestionFlow", genkit.Handler(menuSuggestionFlow))
    log.Fatal(server.Start(ctx, "127.0.0.1:3400", mux))
}

server.Start() 是一个可选的辅助函数,用于启动服务器并管理其生命周期(包括处理中断信号,以便于本地开发),但您也可以采用自定义方式实现。

如需提供代码库中定义的所有 flow,您可以使用 ListFlows()

mux := http.NewServeMux()
for _, flow := range genkit.ListFlows(g) {
    mux.HandleFunc("POST /"+flow.Name(), genkit.Handler(flow))
}
log.Fatal(server.Start(ctx, "127.0.0.1:3400", mux))

您可以使用 POST 请求调用 flow 端点,如下所示:

curl -X POST "http://localhost:3400/menuSuggestionFlow" \
    -H "Content-Type: application/json" -d '{"data": "banana"}'

其他服务器框架

您还可以使用其他服务器框架来部署 flow。例如,您只需几行代码即可使用 Gin

router := gin.Default()
for _, flow := range genkit.ListFlows(g) {
    router.POST("/"+flow.Name(), func(c *gin.Context) {
        genkit.Handler(flow)(c.Writer, c.Request)
    })
}
log.Fatal(router.Run(":3400"))

如需了解如何部署到特定平台,请参阅使用 Cloud Run 的 Genkit