Aufgabenwarteschlangen nutzen Google Cloud Tasks, um zeitaufwendige, ressourcenintensive oder bandbreitenbegrenzte Aufgaben asynchron außerhalb des Hauptanwendungsablaufs auszuführen.
Angenommen, Sie möchten Sicherungen einer großen Anzahl von Bilddateien erstellen, die derzeit in einer API mit einem Ratelimit gehostet werden. Wenn Sie diese API verantwortungsvoll nutzen möchten, müssen Sie die Ratenlimits einhalten. Außerdem sind diese Arten von lang laufenden Jobs aufgrund von Zeitüberschreitungen und Arbeitsspeicherlimits anfällig für Fehler.
Um diese Komplexität zu verringern, können Sie eine Task-Queue-Funktion schreiben, die grundlegende Aufgabenoptionen wie scheduleTime
und dispatchDeadline
festlegt und die Funktion dann an eine Queue in Cloud Tasks weitergibt. Die Cloud Tasks-Umgebung wurde speziell für eine effektive Staukontrolle und Wiederholrichtlinien für diese Art von Vorgängen entwickelt.
Das Firebase SDK für Cloud Functions for Firebase Version 3.20.1 und höher ist mit Firebase Admin SDK Version 10.2.0 und höher kompatibel und unterstützt Aufgabenwarteschlangenfunktionen.
Die Verwendung von TaskQueue-Funktionen mit Firebase kann zu Kosten für die Cloud Tasks-Verarbeitung führen. Weitere Informationen finden Sie unter Cloud Tasks-Preise.
Funktionen für Aufgabenwarteschlangen erstellen
So verwenden Sie Funktionen für Aufgabenwarteschlangen:
- Eine Task-Warteschlangenfunktion mit dem Firebase SDK für Cloud Functions schreiben
- Testen Sie die Funktion, indem Sie sie mit einer HTTP-Anfrage auslösen.
- Stellen Sie die Funktion mit der Firebase-Befehlszeile bereit. Wenn Sie die Task-Queue-Funktion zum ersten Mal bereitstellen, wird in der Befehlszeile eine Task-Queue in Cloud Tasks mit den Optionen „Ratenbegrenzung“ und „Wiederholen“ erstellt, die in Ihrem Quellcode angegeben sind.
- Fügen Sie der neu erstellten Aufgabenwarteschlange Aufgaben hinzu und geben Sie bei Bedarf Parameter für die Einrichtung eines Ausführungszeitplans an. Sie können dies erreichen, indem Sie den Code mit Admin SDK schreiben und auf Cloud Functions for Firebase bereitstellen.
Aufgabenwarteschlangenfunktionen schreiben
Die Codebeispiele in diesem Abschnitt basieren auf einer App, mit der ein Dienst eingerichtet wird, der alle Bilder aus dem Astronomy Picture of the Day der NASA sichert. Importieren Sie zuerst die erforderlichen Module:
Node.js
// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");
// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");
Python
# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion
# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession
Verwenden Sie onTaskDispatched
oder on_task_dispatched
für Funktionen der Aufgabenwarteschlange. Beim Erstellen einer Task-Warteschlangenfunktion können Sie die Wiederholungs- und Ratenbegrenzung pro Warteschlange konfigurieren.
Funktionen der Aufgabenwarteschlange konfigurieren
Aufgabenwarteschlangenfunktionen bieten eine Reihe leistungsstarker Konfigurationseinstellungen, mit denen sich die Ratenbeschränkungen und das Wiederholungsverhalten einer Aufgabenwarteschlange genau steuern lassen:
Node.js
exports.backupapod = onTaskDispatched(
{
retryConfig: {
maxAttempts: 5,
minBackoffSeconds: 60,
},
rateLimits: {
maxConcurrentDispatches: 6,
},
}, async (req) => {
Python
@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
"""Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
retryConfig.maxAttempts=5
: Jede Aufgabe in der Aufgabenwarteschlange wird automatisch bis zu fünfmal wiederholt. So lassen sich vorübergehende Fehler wie Netzwerkfehler oder vorübergehende Dienstunterbrechungen eines abhängigen externen Dienstes abmildern.retryConfig.minBackoffSeconds=60
: Bei jeder Aufgabe wird mindestens 60 Sekunden nach dem vorherigen Versuch ein neuer Versuch unternommen. So haben wir zwischen den einzelnen Versuchen einen großen Puffer, sodass wir die fünf Wiederholungsversuche nicht zu schnell aufbrauchen.rateLimits.maxConcurrentDispatch=6
: Es werden jeweils maximal 6 Aufgaben gesendet. So wird ein konstanter Fluss von Anfragen an die zugrunde liegende Funktion sichergestellt und die Anzahl der aktiven Instanzen und Kaltstarts reduziert.
Aufgabenwarteschlangenfunktionen testen
In den meisten Fällen eignet sich der Cloud Functions-Emulator am besten, um Funktionen für Aufgabenwarteschlangen zu testen. In der Emulator Suite-Dokumentation erfahren Sie, wie Sie Ihre App für die Emulation von Task-Queue-Funktionen instrumentieren.
Darüber hinaus werden die Task-Queue-functions_sdk in der Firebase Local Emulator Suite als einfache HTTP-Funktionen bereitgestellt. Sie können eine emulierte Aufgabenfunktion testen, indem Sie eine HTTP-POST-Anfrage mit einer JSON-Datennutzlast senden:
# start the Local Emulator Suite
firebase emulators:start
# trigger the emulated task queue function
curl \
-X POST # An HTTP POST request...
-H "content-type: application/json" \ # ... with a JSON body
http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
-d '{"data": { ... some data .... }}' # ... with JSON encoded data
Aufgabenwarteschlangenfunktionen bereitstellen
Task-Queue-Funktion mit der Firebase-Befehlszeile bereitstellen:
$ firebase deploy --only functions:backupapod
Wenn Sie eine Task-Queue-Funktion zum ersten Mal bereitstellen, erstellt die Befehlszeile eine Task-Queue in Cloud Tasks mit den Optionen „Ratenbegrenzung“ und „Wiederholen“, die im Quellcode angegeben sind.
Wenn beim Bereitstellen von Funktionen Berechtigungsfehler auftreten, prüfen Sie, ob dem Nutzer, der die Bereitstellungsbefehle ausführt, die entsprechenden IAM-Rollen zugewiesen sind.
Aufgabenwarteschlangenfunktionen in die Warteschlange stellen
Aufgabenwarteschlangenfunktionen können in Cloud Tasks aus einer vertrauenswürdigen Serverumgebung wie Cloud Functions for Firebase mithilfe der Firebase Admin SDK für Node.js oder der Google Cloud-Bibliotheken für Python in die Warteschlange gestellt werden. Wenn Sie noch keine Erfahrung mit Admin SDKs haben, lesen Sie den Hilfeartikel Firebase einem Server hinzufügen.
Bei einem typischen Ablauf wird eine neue Aufgabe erstellt, in Cloud Tasks eingereiht und die Konfiguration für die Aufgabe festgelegt:
Node.js
exports.enqueuebackuptasks = onRequest(
async (_request, response) => {
const queue = getFunctions().taskQueue("backupapod");
const targetUri = await getFunctionUrl("backupapod");
const enqueues = [];
for (let i = 0; i <= BACKUP_COUNT; i += 1) {
const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
// Delay each batch by N * hour
const scheduleDelaySeconds = iteration * (60 * 60);
const backupDate = new Date(BACKUP_START_DATE);
backupDate.setDate(BACKUP_START_DATE.getDate() + i);
// Extract just the date portion (YYYY-MM-DD) as string.
const date = backupDate.toISOString().substring(0, 10);
enqueues.push(
queue.enqueue({date}, {
scheduleDelaySeconds,
dispatchDeadlineSeconds: 60 * 5, // 5 minutes
uri: targetUri,
}),
);
}
await Promise.all(enqueues);
response.sendStatus(200);
});
Python
@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
"""Adds backup tasks to a Cloud Tasks queue."""
task_queue = functions.task_queue("backupapod")
target_uri = get_function_url("backupapod")
for i in range(BACKUP_COUNT):
batch = i // HOURLY_BATCH_SIZE
# Delay each batch by N hours
schedule_delay = timedelta(hours=batch)
schedule_time = datetime.now() + schedule_delay
dispatch_deadline_seconds = 60 * 5 # 5 minutes
backup_date = BACKUP_START_DATE + timedelta(days=i)
body = {"data": {"date": backup_date.isoformat()[:10]}}
task_options = functions.TaskOptions(schedule_time=schedule_time,
dispatch_deadline_seconds=dispatch_deadline_seconds,
uri=target_uri)
task_queue.enqueue(body, task_options)
return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
Im Beispielcode wird versucht, die Ausführung von Aufgaben zu verteilen, indem der n-ten Aufgabe eine Verzögerung von n Minuten zugewiesen wird. Das entspricht etwa einer Aufgabe pro Minute. Sie können auch
scheduleTime
(Node.js) oderschedule_time
(Python) verwenden, wenn Cloud Tasks eine Aufgabe zu einer bestimmten Zeit auslösen soll.Im Beispielcode wird die maximale Zeit festgelegt, die Cloud Tasks auf die Ausführung einer Aufgabe warten soll. Bei Cloud Tasks wird die Aufgabe gemäß der Wiederholungskonfiguration der Warteschlange oder bis zu diesem Termin wiederholt. In diesem Beispiel ist die Warteschlange so konfiguriert, dass die Aufgabe bis zu fünfmal wiederholt wird. Die Aufgabe wird jedoch automatisch abgebrochen, wenn der gesamte Vorgang (einschließlich der Wiederholungsversuche) mehr als fünf Minuten dauert.
Ziel-URI abrufen und einfügen
Da Cloud Tasks Authentifizierungstokens zum Authentifizieren von Anfragen an die zugrunde liegenden Aufgabenwarteschlangenfunktionen erstellt, müssen Sie beim Einfügen von Aufgaben die Cloud Run-URL der Funktion angeben. Wir empfehlen, die URL für Ihre Funktion wie unten gezeigt programmatisch abzurufen:
Node.js
/**
* Get the URL of a given v2 cloud function.
*
* @param {string} name the function's name
* @param {string} location the function's location
* @return {Promise<string>} The URL of the function
*/
async function getFunctionUrl(name, location="us-central1") {
if (!auth) {
auth = new GoogleAuth({
scopes: "https://www.googleapis.com/auth/cloud-platform",
});
}
const projectId = await auth.getProjectId();
const url = "https://cloudfunctions.googleapis.com/v2beta/" +
`projects/${projectId}/locations/${location}/functions/${name}`;
const client = await auth.getClient();
const res = await client.request({url});
const uri = res.data?.serviceConfig?.uri;
if (!uri) {
throw new Error(`Unable to retreive uri for function at ${url}`);
}
return uri;
}
Python
def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
"""Get the URL of a given v2 cloud function.
Params:
name: the function's name
location: the function's location
Returns: The URL of the function
"""
credentials, project_id = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"])
authed_session = AuthorizedSession(credentials)
url = ("https://cloudfunctions.googleapis.com/v2beta/" +
f"projects/{project_id}/locations/{location}/functions/{name}")
response = authed_session.get(url)
data = response.json()
function_url = data["serviceConfig"]["uri"]
return function_url
Fehlerbehebung
Cloud Tasks-Protokollierung aktivieren
Logs von Cloud Tasks enthalten nützliche Diagnoseinformationen wie den Status der Anfrage, die mit einer Aufgabe verknüpft ist. Standardmäßig sind Logs von Cloud Tasks deaktiviert, da sie möglicherweise eine große Menge an Logs für Ihr Projekt generieren. Wir empfehlen, die Debug-Logs zu aktivieren, während Sie Ihre Task-Queue-Funktionen aktiv entwickeln und debuggen. Weitere Informationen finden Sie unter Logging aktivieren.
IAM-Berechtigungen
Möglicherweise werden PERMISSION DENIED
-Fehler angezeigt, wenn Aufgaben in die Warteschlange gestellt werden oder Cloud Tasks versucht, die Funktionen der Aufgabenwarteschlange aufzurufen. Achten Sie darauf, dass Ihr Projekt die folgenden IAM-Bindungen hat:
Die Identität, mit der Aufgaben in Cloud Tasks eingereiht werden, benötigt die IAM-Berechtigung
cloudtasks.tasks.create
.Im Beispiel ist das das App Engine-Standarddienstkonto.
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudtasks.enqueuer
Die Identität, die zum Einreihen von Aufgaben in Cloud Tasks verwendet wird, benötigt die Berechtigung zur Verwendung des Dienstkontos, das mit einer Aufgabe in Cloud Tasks verknüpft ist.
Im Beispiel ist das das App Engine-Standarddienstkonto.
Eine Anleitung zum Hinzufügen des Standarddienstkontos von App Engine als Nutzer des Standarddienstkontos von App Engine finden Sie in der Google Cloud IAM-Dokumentation.
Die Identität, die zum Auslösen der Task-Queue-Funktion verwendet wird, benötigt die Berechtigung
cloudfunctions.functions.invoke
.Im Beispiel ist das das App Engine-Standarddienstkonto.
gcloud functions add-iam-policy-binding $FUNCTION_NAME \
--region=us-central1 \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudfunctions.invoker