Funktionen mit Cloud Tasks in die Warteschlange stellen


Aufgabenwarteschlangen nutzen Google Cloud Tasks, um zeitaufwendige, ressourcenintensive oder bandbreitenbegrenzte Aufgaben asynchron außerhalb des Hauptanwendungsablaufs auszuführen.

Angenommen, Sie möchten Sicherungen einer großen Anzahl von Bilddateien erstellen, die derzeit in einer API mit einem Ratelimit gehostet werden. Wenn Sie diese API verantwortungsvoll nutzen möchten, müssen Sie die Ratenlimits einhalten. Außerdem sind diese Arten von lang laufenden Jobs aufgrund von Zeitüberschreitungen und Arbeitsspeicherlimits anfällig für Fehler.

Um diese Komplexität zu verringern, können Sie eine Task-Queue-Funktion schreiben, die grundlegende Aufgabenoptionen wie scheduleTime und dispatchDeadline festlegt und die Funktion dann an eine Queue in Cloud Tasks weitergibt. Die Cloud Tasks-Umgebung wurde speziell für eine effektive Staukontrolle und Wiederholrichtlinien für diese Art von Vorgängen entwickelt.

Das Firebase SDK für Cloud Functions for Firebase Version 3.20.1 und höher ist mit Firebase Admin SDK Version 10.2.0 und höher kompatibel und unterstützt Aufgabenwarteschlangenfunktionen.

Die Verwendung von TaskQueue-Funktionen mit Firebase kann zu Kosten für die Cloud Tasks-Verarbeitung führen. Weitere Informationen finden Sie unter Cloud Tasks-Preise.

Funktionen für Aufgabenwarteschlangen erstellen

So verwenden Sie Funktionen für Aufgabenwarteschlangen:

  1. Eine Task-Warteschlangenfunktion mit dem Firebase SDK für Cloud Functions schreiben
  2. Testen Sie die Funktion, indem Sie sie mit einer HTTP-Anfrage auslösen.
  3. Stellen Sie die Funktion mit der Firebase-Befehlszeile bereit. Wenn Sie die Task-Queue-Funktion zum ersten Mal bereitstellen, wird in der Befehlszeile eine Task-Queue in Cloud Tasks mit den Optionen „Ratenbegrenzung“ und „Wiederholen“ erstellt, die in Ihrem Quellcode angegeben sind.
  4. Fügen Sie der neu erstellten Aufgabenwarteschlange Aufgaben hinzu und geben Sie bei Bedarf Parameter für die Einrichtung eines Ausführungszeitplans an. Sie können dies erreichen, indem Sie den Code mit Admin SDK schreiben und auf Cloud Functions for Firebase bereitstellen.

Aufgabenwarteschlangenfunktionen schreiben

Die Codebeispiele in diesem Abschnitt basieren auf einer App, mit der ein Dienst eingerichtet wird, der alle Bilder aus dem Astronomy Picture of the Day der NASA sichert. Importieren Sie zuerst die erforderlichen Module:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Verwenden Sie onTaskDispatched oder on_task_dispatched für Funktionen der Aufgabenwarteschlange. Beim Erstellen einer Task-Warteschlangenfunktion können Sie die Wiederholungs- und Ratenbegrenzung pro Warteschlange konfigurieren.

Funktionen der Aufgabenwarteschlange konfigurieren

Aufgabenwarteschlangenfunktionen bieten eine Reihe leistungsstarker Konfigurationseinstellungen, mit denen sich die Ratenbeschränkungen und das Wiederholungsverhalten einer Aufgabenwarteschlange genau steuern lassen:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: Jede Aufgabe in der Aufgabenwarteschlange wird automatisch bis zu fünfmal wiederholt. So lassen sich vorübergehende Fehler wie Netzwerkfehler oder vorübergehende Dienstunterbrechungen eines abhängigen externen Dienstes abmildern.

  • retryConfig.minBackoffSeconds=60: Bei jeder Aufgabe wird mindestens 60 Sekunden nach dem vorherigen Versuch ein neuer Versuch unternommen. So haben wir zwischen den einzelnen Versuchen einen großen Puffer, sodass wir die fünf Wiederholungsversuche nicht zu schnell aufbrauchen.

  • rateLimits.maxConcurrentDispatch=6: Es werden jeweils maximal 6 Aufgaben gesendet. So wird ein konstanter Fluss von Anfragen an die zugrunde liegende Funktion sichergestellt und die Anzahl der aktiven Instanzen und Kaltstarts reduziert.

Aufgabenwarteschlangenfunktionen testen

In den meisten Fällen eignet sich der Cloud Functions-Emulator am besten, um Funktionen für Aufgabenwarteschlangen zu testen. In der Emulator Suite-Dokumentation erfahren Sie, wie Sie Ihre App für die Emulation von Task-Queue-Funktionen instrumentieren.

Darüber hinaus werden die Task-Queue-functions_sdk in der Firebase Local Emulator Suite als einfache HTTP-Funktionen bereitgestellt. Sie können eine emulierte Aufgabenfunktion testen, indem Sie eine HTTP-POST-Anfrage mit einer JSON-Datennutzlast senden:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Aufgabenwarteschlangenfunktionen bereitstellen

Task-Queue-Funktion mit der Firebase-Befehlszeile bereitstellen:

$ firebase deploy --only functions:backupapod

Wenn Sie eine Task-Queue-Funktion zum ersten Mal bereitstellen, erstellt die Befehlszeile eine Task-Queue in Cloud Tasks mit den Optionen „Ratenbegrenzung“ und „Wiederholen“, die im Quellcode angegeben sind.

Wenn beim Bereitstellen von Funktionen Berechtigungsfehler auftreten, prüfen Sie, ob dem Nutzer, der die Bereitstellungsbefehle ausführt, die entsprechenden IAM-Rollen zugewiesen sind.

Aufgabenwarteschlangenfunktionen in die Warteschlange stellen

Aufgabenwarteschlangenfunktionen können in Cloud Tasks aus einer vertrauenswürdigen Serverumgebung wie Cloud Functions for Firebase mithilfe der Firebase Admin SDK für Node.js oder der Google Cloud-Bibliotheken für Python in die Warteschlange gestellt werden. Wenn Sie noch keine Erfahrung mit Admin SDKs haben, lesen Sie den Hilfeartikel Firebase einem Server hinzufügen.

Bei einem typischen Ablauf wird eine neue Aufgabe erstellt, in Cloud Tasks eingereiht und die Konfiguration für die Aufgabe festgelegt:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • Im Beispielcode wird versucht, die Ausführung von Aufgaben zu verteilen, indem der n-ten Aufgabe eine Verzögerung von n Minuten zugewiesen wird. Das entspricht etwa einer Aufgabe pro Minute. Sie können auch scheduleTime (Node.js) oder schedule_time (Python) verwenden, wenn Cloud Tasks eine Aufgabe zu einer bestimmten Zeit auslösen soll.

  • Im Beispielcode wird die maximale Zeit festgelegt, die Cloud Tasks auf die Ausführung einer Aufgabe warten soll. Bei Cloud Tasks wird die Aufgabe gemäß der Wiederholungskonfiguration der Warteschlange oder bis zu diesem Termin wiederholt. In diesem Beispiel ist die Warteschlange so konfiguriert, dass die Aufgabe bis zu fünfmal wiederholt wird. Die Aufgabe wird jedoch automatisch abgebrochen, wenn der gesamte Vorgang (einschließlich der Wiederholungsversuche) mehr als fünf Minuten dauert.

Ziel-URI abrufen und einfügen

Da Cloud Tasks Authentifizierungstokens zum Authentifizieren von Anfragen an die zugrunde liegenden Aufgabenwarteschlangenfunktionen erstellt, müssen Sie beim Einfügen von Aufgaben die Cloud Run-URL der Funktion angeben. Wir empfehlen, die URL für Ihre Funktion wie unten gezeigt programmatisch abzurufen:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

Fehlerbehebung

Cloud Tasks-Protokollierung aktivieren

Logs von Cloud Tasks enthalten nützliche Diagnoseinformationen wie den Status der Anfrage, die mit einer Aufgabe verknüpft ist. Standardmäßig sind Logs von Cloud Tasks deaktiviert, da sie möglicherweise eine große Menge an Logs für Ihr Projekt generieren. Wir empfehlen, die Debug-Logs zu aktivieren, während Sie Ihre Task-Queue-Funktionen aktiv entwickeln und debuggen. Weitere Informationen finden Sie unter Logging aktivieren.

IAM-Berechtigungen

Möglicherweise werden PERMISSION DENIED-Fehler angezeigt, wenn Aufgaben in die Warteschlange gestellt werden oder Cloud Tasks versucht, die Funktionen der Aufgabenwarteschlange aufzurufen. Achten Sie darauf, dass Ihr Projekt die folgenden IAM-Bindungen hat:

  • Die Identität, mit der Aufgaben in Cloud Tasks eingereiht werden, benötigt die IAM-Berechtigung cloudtasks.tasks.create.

    Im Beispiel ist das das App Engine-Standarddienstkonto.

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • Die Identität, die zum Einreihen von Aufgaben in Cloud Tasks verwendet wird, benötigt die Berechtigung zur Verwendung des Dienstkontos, das mit einer Aufgabe in Cloud Tasks verknüpft ist.

    Im Beispiel ist das das App Engine-Standarddienstkonto.

Eine Anleitung zum Hinzufügen des Standarddienstkontos von App Engine als Nutzer des Standarddienstkontos von App Engine finden Sie in der Google Cloud IAM-Dokumentation.

  • Die Identität, die zum Auslösen der Task-Queue-Funktion verwendet wird, benötigt die Berechtigung cloudfunctions.functions.invoke.

    Im Beispiel ist das das App Engine-Standarddienstkonto.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker