Accoda le funzioni con Cloud Tasks


Le funzioni di coda di attività sfruttano Google Cloud Tasks per aiutare la tua app a eseguire attività asincrone che richiedono molto tempo, richiedono molte risorse o hanno limitazioni di larghezza di banda al di fuori del flusso dell'applicazione principale.

Ad esempio, immagina di voler creare i backup di un ampio insieme di file di immagini attualmente ospitati su un'API con un limite di frequenza. Per essere un consumatore responsabile di questa API, devi rispettare i relativi limiti di frequenza. Inoltre, questo tipo di job a lungo termine potrebbe essere vulnerabile a errori a causa di timeout e limiti di memoria.

Per ridurre questa complessità, puoi scrivere una funzione di coda di attività che imposta opzioni di attività di base come scheduleTime e dispatchDeadline e poi la passa a una coda in Cloud Tasks. L'ambiente Cloud Tasks è progettato specificamente per garantire un controllo efficace della congestione e criteri di ripetizione per questo tipo di operazioni.

L'SDK Firebase per Cloud Functions for Firebase versione 3.20.1 e successive è interoperabile con Firebase Admin SDK versione 10.2.0 e successive per supportare le funzioni di coda di attività.

L'utilizzo delle funzioni di coda di lavoro con Firebase può comportare addebiti per l'elaborazioneCloud Tasks. Per ulteriori informazioni, consulta la pagina relativa ai prezzi di Cloud Tasks.

Creare funzioni per le code di attività

Per utilizzare le funzioni della coda di attività, segui questo flusso di lavoro:

  1. Scrivi una funzione di coda di attività utilizzando l'SDK Firebase per Cloud Functions.
  2. Testa la funzione attivandola con una richiesta HTTP.
  3. Esegui il deployment della funzione con l'interfaccia a riga di comando Firebase. Quando esegui il deployment della funzione di coda di attività per la prima volta, la CLI crea una coda di attività in Cloud Tasks con le opzioni (limitazione della frequenza e riprova) specificate nel codice sorgente.
  4. Aggiungi attività alla coda di attività appena creata, passando i parametri per impostare una pianificazione di esecuzione, se necessario. Puoi farlo scrivendo il codice utilizzando Admin SDK e eseguendo il deployment in Cloud Functions for Firebase.

Scrivere funzioni di coda di attività

Gli esempi di codice in questa sezione si basano su un'app che configura un servizio che esegue il backup di tutte le immagini dell'Astronomy Picture of the Day della NASA. Per iniziare, importa i moduli richiesti:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Utilizza onTaskDispatched o on_task_dispatched per le funzioni della coda di attività. Quando scrivi una funzione di coda di attività, puoi impostare la configurazione di ripetizione e limitazione della frequenza per coda.

Configurare le funzioni della coda di attività

Le funzioni di coda di attività sono dotate di un potente insieme di impostazioni di configurazione per controllare con precisione i limiti di frequenza e il comportamento di nuovo tentativo di una coda di attività:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: per ogni attività nella coda viene eseguito automaticamente un nuovo tentativo fino a 5 volte. In questo modo è possibile ridurre gli errori temporanei come quelli di rete o l'interruzione temporanea del servizio di un servizio esterno dipendente.

  • retryConfig.minBackoffSeconds=60: ogni attività viene riprovata almeno 60 secondi dopo ogni tentativo. In questo modo, viene fornito un ampio buffer tra ogni tentativo, in modo da non esaurire troppo rapidamente i 5 tentativi di nuovo invio.

  • rateLimits.maxConcurrentDispatch=6: vengono inviate al massimo 6 attività contemporaneamente. In questo modo, puoi garantire un flusso costante di richieste alla funzione di base e ridurre il numero di istanze attive e di avvii a freddo.

Testare le funzioni della coda di attività

Nella maggior parte dei casi, l'emulatore Cloud Functions è il modo migliore per testare le funzioni di coda delle attività. Consulta la documentazione di Emulator Suite per scoprire come strumentare l'app per l'emulazione delle funzioni di coda di lavoro.

Inoltre, le funzioni_sdk della coda di attività vengono esposte come semplici funzioni HTTP nel Firebase Local Emulator Suite. Puoi testare una funzione di task emulata inviando una richiesta POST HTTP con un payload di dati JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Esegui il deployment delle funzioni di coda di attività

Esegui il deployment della funzione di coda di attività utilizzando l'interfaccia a riga di comando Firebase:

$ firebase deploy --only functions:backupapod

Quando esegui il deployment di una funzione di coda di attività per la prima volta, la CLI crea una coda di attività in Cloud Tasks con le opzioni (limitazione della frequenza e nuovo tentativo) specificate nel codice sorgente.

Se riscontri errori di autorizzazione durante il deployment delle funzioni, assicurati che i ruoli IAM appropriati siano assegnati all'utente che esegue i comandi di deployment.

Inserire in coda le funzioni della coda di attività

Le funzioni della coda di attività possono essere messe in coda in Cloud Tasks da un ambiente server attendibile come Cloud Functions for Firebase utilizzando Firebase Admin SDK per Node.js o le librerie Google Cloud per Python. Se non hai mai utilizzato i Admin SDK, consulta Aggiungere Firebase a un server per iniziare.

Un flusso di lavoro tipico crea una nuova attività, la mette in coda in Cloud Tasks e imposta la configurazione per l'attività:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • Il codice di esempio tenta di distribuire l'esecuzione delle attività associando un ritardo di N minuti per la N-esima attività. Ciò equivale a attivare circa 1 attività al minuto. Tieni presente che puoi anche utilizzare scheduleTime (Node.js) o schedule_time (Python) se vuoi Cloud Tasks attivare un'attività in un momento specifico.

  • Il codice di esempio imposta il tempo massimo che Cloud Tasks attenderà per il completamento di un'attività. Cloud Tasks proverà nuovamente a eseguire l'attività in base alla configurazione dei nuovi tentativi della coda o fino al raggiungimento di questa scadenza. Nel sample, la coda è configurata per riprovare l'attività fino a 5 volte, ma l'attività viene annullata automaticamente se l'intera procedura (inclusi i tentativi di ripetizione) richiede più di 5 minuti.

Recupera e includi l'URI di destinazione

A causa del modo in cui Cloud Tasks crea i token di autenticazione per autenticare le richieste alle funzioni di coda di attività sottostanti, devi specificare l'URL Cloud Run della funzione quando inserisci le attività in coda. Ti consigliamo di recuperare l'URL della funzione in modo programmatico, come mostrato di seguito:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

Risoluzione dei problemi

Attivare il logging di Cloud Tasks

I log di Cloud Tasks contengono informazioni diagnostiche utili, come lo stato della richiesta associata a un'attività. Per impostazione predefinita, i log di Cloud Tasks sono disattivati a causa dell'elevato volume di log che possono potenzialmente generare nel tuo progetto. Ti consigliamo di attivare i log di debug durante lo sviluppo e il debug delle funzioni della coda di attività. Consulta Attivare il logging.

Autorizzazioni IAM

Potresti visualizzare errori PERMISSION DENIED durante l'inserimento in coda delle attività o quando Cloud Tasks tenta di richiamare le funzioni della coda di attività. Assicurati che il progetto abbia le seguenti associazioni IAM:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • L'identità utilizzata per mettere in coda le attività in Cloud Tasks deve disporre dell'autorizzazione per utilizzare l'account di servizio associato a un'attività in Cloud Tasks.

    Nel campione, si tratta dell'account di servizio predefinito App Engine.

Consulta la documentazione di Google Cloud IAM per istruzioni su come aggiungere l'account di servizio predefinito App Engine come utente dell'account di servizio predefinito App Engine.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker