Como definir fluxos de trabalho de IA

O núcleo dos recursos de IA do app são as solicitações de modelo generativo, mas não é sempre que você pode receber a entrada do usuário, transmiti-la ao modelo e mostrar a saída do modelo de volta ao usuário. Geralmente, há etapas anterior e posterior ao processamento que precisam acompanhar a chamada do modelo. Exemplo:

  • Recuperar informações contextuais para enviar com a chamada de modelo.
  • Recuperar o histórico da sessão atual do usuário, por exemplo, em um app de chat.
  • Usar um modelo para reformatar a entrada do usuário de uma maneira adequada para transmitir a outro modelo.
  • Avaliar a "segurança" da saída de um modelo antes de apresentá-la ao usuário.
  • Combinação da saída de vários modelos.

Todas as etapas desse fluxo de trabalho precisam funcionar juntas para que qualquer tarefa relacionada à IA tenha sucesso.

No Genkit, você representa essa lógica estreitamente vinculada usando uma construção chamada fluxo. Os fluxos são escritos como funções, usando o código Go comum, mas adicionam recursos adicionais para facilitar o desenvolvimento de recursos de IA:

  • Segurança de tipos: esquemas de entrada e saída, que oferecem verificação de tipo estática e de execução.
  • Integração com a interface do desenvolvedor: depure fluxos independentemente do código do aplicativo usando a IU do desenvolvedor. Na IU do desenvolvedor, é possível executar fluxos e conferir os rastros de cada etapa.
  • Implantação simplificada: implante fluxos diretamente como endpoints da API da Web usando qualquer plataforma que possa hospedar um app da Web.

Os fluxos do Genkit são leves e discretos, e não forçam seu app a estar em conformidade com nenhuma abstração específica. Toda a lógica do fluxo é escrita em Go padrão, e o código dentro de um fluxo não precisa estar ciente do fluxo.

Como definir e chamar fluxos

Na forma mais simples, um fluxo envolve apenas uma função. O exemplo a seguir envolve uma função que chama GenerateData():

menuSuggestionFlow := genkit.DefineFlow(g, "menuSuggestionFlow",
    func(ctx context.Context, theme string) (string, error) {
        resp, err := genkit.GenerateData(ctx, g,
            ai.WithPrompt("Invent a menu item for a %s themed restaurant.", theme),
        )
        if err != nil {
            return "", err
        }

        return resp.Text(), nil
    })

Ao agrupar as chamadas genkit.Generate() dessa forma, você adiciona algumas funcionalidades: isso permite executar o fluxo da CLI do Genkit e da interface do desenvolvedor, e é um requisito para vários recursos do Genkit, incluindo implantação e observabilidade (seções posteriores discutem esses tópicos).

Esquemas de entrada e saída

Uma das vantagens mais importantes que os fluxos do Genkit têm em comparação à chamada direta a uma API de modelo é a segurança de tipo de entradas e saídas. Ao definir fluxos, é possível definir esquemas da mesma forma que o esquema de saída de uma chamada genkit.Generate(). No entanto, ao contrário de genkit.Generate(), também é possível especificar um esquema de entrada.

Confira um refinamento do último exemplo, que define um fluxo que recebe uma string como entrada e gera um objeto:

type MenuItem struct {
    Name        string `json:"name"`
    Description string `json:"description"`
}

menuSuggestionFlow := genkit.DefineFlow(g, "menuSuggestionFlow",
    func(ctx context.Context, theme string) (MenuItem, error) {
        return genkit.GenerateData[MenuItem](ctx, g,
            ai.WithPrompt("Invent a menu item for a %s themed restaurant.", theme),
        )
    })

O esquema de um fluxo não precisa necessariamente estar alinhado com o esquema das chamadas genkit.Generate() dentro do fluxo. Na verdade, um fluxo pode nem mesmo conter chamadas genkit.Generate(). Confira uma variação do exemplo que transmite um esquema para genkit.Generate(), mas usa a saída estruturada para formatar uma string simples, que o fluxo retorna.

type MenuItem struct {
    Name        string `json:"name"`
    Description string `json:"description"`
}

menuSuggestionMarkdownFlow := genkit.DefineFlow(g, "menuSuggestionMarkdownFlow",
    func(ctx context.Context, theme string) (string, error) {
        item, _, err := genkit.GenerateData[MenuItem](ctx, g,
            ai.WithPrompt("Invent a menu item for a %s themed restaurant.", theme),
        )
        if err != nil {
            return "", err
        }

        return fmt.Sprintf("**%s**: %s", item.Name, item.Description), nil
    })

Chamar fluxos

Depois de definir um fluxo, você pode chamá-lo no código Go:

item, err := menuSuggestionFlow.Run(ctx, "bistro")

O argumento para o fluxo precisa estar em conformidade com o esquema de entrada.

Se você tiver definido um esquema de saída, a resposta do fluxo estará em conformidade com ele. Por exemplo, se você definir o esquema de saída como MenuItem, a saída do fluxo vai conter as propriedades dele:

item, err := menuSuggestionFlow.Run(ctx, "bistro")
if err != nil {
    log.Fatal(err)
}

log.Println(item.DishName)
log.Println(item.Description)

Fluxos de streaming

Os fluxos oferecem suporte para streaming usando uma interface semelhante à interface de streaming do genkit.Generate(). O streaming é útil quando o fluxo gera uma grande quantidade de saídas, porque você pode apresentar a saída ao usuário conforme ela é gerada, o que melhora a capacidade de resposta percebida do app. Um exemplo conhecido são as interfaces de LLM baseadas em chat, que geralmente transmitem as respostas ao usuário à medida que são geradas.

Confira um exemplo de fluxo compatível com streaming:

type Menu struct {
    Theme  string     `json:"theme"`
    Items  []MenuItem `json:"items"`
}

type MenuItem struct {
    Name        string `json:"name"`
    Description string `json:"description"`
}

menuSuggestionFlow := genkit.DefineStreamingFlow(g, "menuSuggestionFlow",
    func(ctx context.Context, theme string, callback core.StreamCallback[string]) (Menu, error) {
        item, _, err := genkit.GenerateData[MenuItem](ctx, g,
            ai.WithPrompt("Invent a menu item for a %s themed restaurant.", theme),
            ai.WithStreaming(func(ctx context.Context, chunk *ai.ModelResponseChunk) error {
                // Here, you could process the chunk in some way before sending it to
                // the output stream using StreamCallback. In this example, we output
                // the text of the chunk, unmodified.
                return callback(ctx, chunk.Text())
            }),
        )
        if err != nil {
            return nil, err
        }

        return Menu{
            Theme: theme,
            Items: []MenuItem{item},
        }, nil
    })

O tipo string em StreamCallback[string] especifica o tipo de valores dos fluxos de fluxo. Ele não precisa necessariamente ser do mesmo tipo que o tipo de retorno, que é o tipo da saída completa do fluxo (Menu neste exemplo).

Neste exemplo, os valores transmitidos pelo fluxo são acoplados diretamente aos valores transmitidos pela chamada genkit.Generate() dentro do fluxo. Embora isso seja comum, não precisa ser assim: você pode enviar valores para o stream usando o callback com a frequência que for útil para seu fluxo.

Como chamar fluxos de streaming

Os fluxos de streaming podem ser executados como fluxos sem streaming com menuSuggestionFlow.Run(ctx, "bistro") ou podem ser transmitidos:

streamCh, err := menuSuggestionFlow.Stream(ctx, "bistro")
if err != nil {
    log.Fatal(err)
}

for result := range streamCh {
    if result.Err != nil {
        log.Fatal("Stream error: %v", result.Err)
    }
    if result.Done {
        log.Printf("Menu with %s theme:\n", result.Output.Theme)
        for item := range result.Output.Items {
            log.Println(" - %s: %s", item.Name, item.Description)
        }
    } else {
        log.Println("Stream chunk:", result.Stream)
    }
}

Como executar fluxos na linha de comando

É possível executar fluxos na linha de comando usando a ferramenta CLI do Genkit:

genkit flow:run menuSuggestionFlow '"French"'

Para fluxos de streaming, é possível imprimir a saída de streaming no console adicionando a sinalização -s:

genkit flow:run menuSuggestionFlow '"French"' -s

Executar um fluxo na linha de comando é útil para testar um fluxo ou executar fluxos que realizam tarefas necessárias de forma ad hoc. Por exemplo, para executar um fluxo que ingere um documento no seu banco de dados vetorial.

Fluxos de depuração

Uma das vantagens de encapsular a lógica de IA em um fluxo é que você pode testar e depurar o fluxo independentemente do app usando a interface do desenvolvedor do Genkit.

A interface do desenvolvedor depende de o app Go continuar em execução, mesmo que a lógica tenha sido concluída. Se você está começando agora e o Genkit não faz parte de um app mais amplo, adicione select {} como a última linha de main() para evitar que o app seja encerrado e possa ser inspecionado na interface.

Para iniciar a interface do desenvolvedor, execute o seguinte comando no diretório do projeto:

genkit start -- go run .

Na guia Executar da interface do desenvolvedor, é possível executar qualquer um dos fluxos definidos no seu projeto:

Captura de tela do executor do Flow

Depois de executar um fluxo, é possível inspecionar um rastro da invocação do fluxo clicando em Visualizar rastros ou na guia Inspecionar.

Como implantar fluxos

É possível implantar seus fluxos diretamente como endpoints da API da Web, prontos para serem chamados pelos clientes do app. A implantação é discutida em detalhes em várias outras páginas, mas esta seção fornece uma visão geral das opções de implantação.

Servidor net/http

Para implantar um fluxo usando qualquer plataforma de hospedagem do Go, como o Cloud Run, defina seu fluxo usando DefineFlow() e inicie um servidor net/http com o gerenciador de fluxo fornecido:

import (
    "context"
    "log"
    "net/http"

    "github.com/firebase/genkit/go/genkit"
    "github.com/firebase/genkit/go/plugins/googlegenai"
    "github.com/firebase/genkit/go/plugins/server"
)

func main() {
    ctx := context.Background()

    g, err := genkit.Init(ctx, genkit.WithPlugins(&googlegenai.GoogleAI{}))
    if err != nil {
      log.Fatal(err)
    }

    menuSuggestionFlow := genkit.DefineFlow(g, "menuSuggestionFlow",
        func(ctx context.Context, theme string) (MenuItem, error) {
            // Flow implementation...
        })

    mux := http.NewServeMux()
    mux.HandleFunc("POST /menuSuggestionFlow", genkit.Handler(menuSuggestionFlow))
    log.Fatal(server.Start(ctx, "127.0.0.1:3400", mux))
}

server.Start() é uma função auxiliar opcional que inicia o servidor e gerencia o ciclo de vida dele, incluindo a captura de sinais de interrupção para facilitar o desenvolvimento local, mas você pode usar seu próprio método.

Para atender todos os fluxos definidos na base de código, use ListFlows():

mux := http.NewServeMux()
for _, flow := range genkit.ListFlows(g) {
    mux.HandleFunc("POST /"+flow.Name(), genkit.Handler(flow))
}
log.Fatal(server.Start(ctx, "127.0.0.1:3400", mux))

É possível chamar um endpoint de fluxo com uma solicitação POST da seguinte maneira:

curl -X POST "http://localhost:3400/menuSuggestionFlow" \
    -H "Content-Type: application/json" -d '{"data": "banana"}'

Outros frameworks de servidor

Também é possível usar outros frameworks de servidor para implantar seus fluxos. Por exemplo, é possível usar Gin com apenas algumas linhas:

router := gin.Default()
for _, flow := range genkit.ListFlows(g) {
    router.POST("/"+flow.Name(), func(c *gin.Context) {
        genkit.Handler(flow)(c.Writer, c.Request)
    })
}
log.Fatal(router.Run(":3400"))

Para informações sobre a implantação em plataformas específicas, consulte Genkit com o Cloud Run.