Accoda le funzioni con Cloud Tasks


Le funzioni di coda delle attività sfruttano Google Cloud Tasks per aiutare la tua app a eseguire attività che richiedono molto tempo, molte risorse o una larghezza di banda limitata in modo asincrono, al di fuori del flusso principale dell'applicazione.

Ad esempio, supponiamo di voler creare backup di un ampio insieme di file immagine attualmente ospitati su un'API con un limite di frequenza. Per essere un consumer responsabile di questa API, devi rispettare i limiti di frequenza. Inoltre, questo tipo di job a esecuzione prolungata potrebbe essere vulnerabile a errori dovuti a timeout e limiti di memoria.

Per ridurre questa complessità, puoi scrivere una funzione di coda delle attività che imposta le opzioni di base delle attività come scheduleTime e dispatchDeadline, quindi passa la funzione a una coda in Cloud Tasks. L'ambiente Cloud Tasks è progettato specificamente per garantire un controllo efficace della congestione e politiche di ripetizione per questi tipi di operazioni.

L'SDK Firebase per Cloud Functions for Firebase versione 3.20.1 e successive interagisce con Firebase Admin SDK versione 10.2.0 e successive per supportare le funzioni della coda di attività.

L'utilizzo delle funzioni di coda delle attività con Firebase può comportare addebiti per l'elaborazione di Cloud Tasks. Per ulteriori informazioni, consulta la pagina relativa ai prezzi di Cloud Tasks.

Crea funzioni di coda delle attività

Per utilizzare le funzioni della coda di attività, segui questo flusso di lavoro:

  1. Scrivi una funzione di coda di attività utilizzando l'SDK Firebase per Cloud Functions.
  2. Testa la funzione attivandola con una richiesta HTTP.
  3. Esegui il deployment della funzione con la CLI Firebase. Quando esegui il deployment della funzione di coda delle attività per la prima volta, la CLI crea una coda delle attività in Cloud Tasks con le opzioni (limitazione della frequenza e nuovi tentativi) specificate nel codice sorgente.
  4. Aggiungi attività alla coda di attività appena creata, passando i parametri per configurare una pianificazione di esecuzione, se necessario. Puoi farlo scrivendo il codice utilizzando Admin SDK e implementandolo in Cloud Functions for Firebase.

Scrivi funzioni di code di attività

Gli esempi di codice in questa sezione si basano su un'app che configura un servizio che esegue il backup di tutte le immagini della foto astronomica del giorno della NASA. Per iniziare, importa i moduli richiesti:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Utilizza onTaskDispatched o on_task_dispatched per le funzioni della coda di attività. Quando scrivi una funzione di coda delle attività, puoi impostare la configurazione dei tentativi e della limitazione della frequenza per coda.

Configura le funzioni della coda di attività

Le funzioni della coda di attività sono dotate di un potente insieme di impostazioni di configurazione per controllare con precisione i limiti di frequenza e il comportamento di nuovi tentativi di una coda di attività:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: ogni attività nella coda delle attività viene ritentata automaticamente fino a 5 volte. Ciò contribuisce a mitigare gli errori temporanei, come gli errori di rete o l'interruzione temporanea del servizio di un servizio esterno dipendente.

  • retryConfig.minBackoffSeconds=60: Ogni attività viene riprovata ad almeno 60 secondi di distanza da ogni tentativo. In questo modo, viene fornito un ampio buffer tra ogni tentativo per non esaurire troppo rapidamente i 5 tentativi.

  • rateLimits.maxConcurrentDispatch=6: vengono inviate al massimo 6 attività alla volta. Ciò contribuisce a garantire un flusso costante di richieste alla funzione sottostante e a ridurre il numero di istanze attive e avvii a freddo.

Testare le funzioni della coda di attività

Nella maggior parte dei casi, l'emulatore Cloud Functions è il modo migliore per testare le funzioni della coda delle attività. Consulta la documentazione di Emulator Suite per scoprire come strumentare la tua app per l'emulazione delle funzioni della coda di attività.

Inoltre, le funzioni di task queue functions_sdk sono esposte come semplici funzioni HTTP in Firebase Local Emulator Suite. Puoi testare una funzione di attività emulata inviando una richiesta HTTP POST con un payload di dati JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Esegui il deployment delle funzioni della coda di attività

Esegui il deployment della funzione di coda di attività utilizzando la CLI Firebase:

$ firebase deploy --only functions:backupapod

Quando esegui il deployment di una funzione di coda delle attività per la prima volta, la CLI crea una coda delle attività in Cloud Tasks con le opzioni (limitazione della frequenza e nuovi tentativi) specificate nel codice sorgente.

Se si verificano errori di autorizzazione durante il deployment delle funzioni, assicurati che all'utente che esegue i comandi di deployment siano assegnati i ruoli IAM appropriati.

Mettere in coda le funzioni della coda di attività

Le funzioni di coda delle attività possono essere accodate in Cloud Tasks da un ambiente server attendibile come Cloud Functions for Firebase utilizzando Firebase Admin SDK per Node.js o le librerie Google Cloud per Python. Se non hai mai utilizzato Admin SDK, consulta Aggiungere Firebase a un server per iniziare.

Un flusso tipico crea una nuova attività, la mette in coda in Cloud Tasks e imposta la configurazione per l'attività:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • Il codice di esempio tenta di distribuire l'esecuzione delle attività associando un ritardo di N minuti per l'N-esima attività. Ciò si traduce nell'attivazione di circa 1 attività al minuto. Tieni presente che puoi anche utilizzare scheduleTime (Node.js) o schedule_time (Python) se vuoi Cloud Tasks per attivare un'attività a un'ora specifica.

  • Il codice di esempio imposta la quantità massima di tempo Cloud Tasks attenderà il completamento di un'attività. Cloud Tasks riproverà l'attività in base alla configurazione dei nuovi tentativi della coda o fino al raggiungimento di questa scadenza. Nell'esempio, la coda è configurata per riprovare l'attività fino a 5 volte, ma l'attività viene annullata automaticamente se l'intero processo (inclusi i tentativi) richiede più di 5 minuti.

Recuperare e includere l'URI di destinazione

A causa del modo in cui Cloud Tasks crea token di autenticazione per autenticare le richieste alle funzioni della coda di attività sottostante, devi specificare l'URL Cloud Run della funzione quando metti in coda le attività. Ti consigliamo di recuperare l'URL della funzione in modo programmatico come mostrato di seguito:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

Risoluzione dei problemi

Attivare la registrazione dei log di Cloud Tasks

I log di Cloud Tasks contengono informazioni diagnostiche utili, come lo stato della richiesta associata a un'attività. Per impostazione predefinita, i log di Cloud Tasks sono disattivati a causa del volume elevato di log che possono potenzialmente generare nel tuo progetto. Ti consigliamo di attivare i log di debug mentre sviluppi e esegui il debug delle funzioni della coda di attività. Vedi Attivazione della registrazione.

Autorizzazioni IAM

Potresti visualizzare errori PERMISSION DENIED durante l'accodamento delle attività o quando Cloud Tasks tenta di richiamare le funzioni della coda di attività. Assicurati che il tuo progetto disponga dei seguenti binding IAM:

  • L'identità utilizzata per accodare le attività a Cloud Tasks deve disporre dell'autorizzazione IAM cloudtasks.tasks.create.

    Nell'esempio, questo è il service account predefinito App Engine

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • L'identità utilizzata per accodare le attività a Cloud Tasks deve disporre dell'autorizzazione per utilizzare il service account associato a un'attività in Cloud Tasks.

    Nell'esempio, questo è il service account predefinito App Engine.

Consulta la documentazione di Google Cloud IAM per istruzioni su come aggiungere l'account di servizio predefinito App Engine come utente dell'account di servizio predefinito App Engine.

  • L'identità utilizzata per attivare la funzione di coda delle attività richiede l'autorizzazione cloudfunctions.functions.invoke.

    Nell'esempio, questo è il service account predefinito App Engine

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker