Aufgabenwarteschlangenfunktionen nutzen Google Cloud Tasks, um zeitaufwendige, ressourcenintensive oder bandbreitenbeschränkte Aufgaben asynchron außerhalb des Hauptanwendungsablaufs auszuführen.
Angenommen, Sie möchten Sicherungen einer großen Anzahl von Bilddateien erstellen, die derzeit auf einer API mit Ratenbegrenzung gehostet werden. Als verantwortungsbewusster Nutzer dieser API müssen Sie die Ratenbegrenzungen einhalten. Außerdem kann es bei solchen Jobs, die lange ausgeführt werden, zu Fehlern aufgrund von Zeitüberschreitungen und Arbeitsspeicherlimits kommen.
Um diese Komplexität zu verringern, können Sie eine Task-Queue-Funktion schreiben, die grundlegende Task-Optionen wie scheduleTime
und dispatchDeadline
festlegt und die Funktion dann an eine Warteschlange in Cloud Tasks übergibt. Die Cloud Tasks-Umgebung wurde speziell entwickelt, um eine effektive Überlastungssteuerung und Wiederholungsrichtlinien für diese Art von Vorgängen zu gewährleisten.
Das Firebase SDK für Cloud Functions for Firebase v3.20.1 und höher ist mit Firebase Admin SDK v10.2.0 und höher kompatibel, um Funktionen für Aufgabenwarteschlangen zu unterstützen.
Die Verwendung von TaskQueue-Funktionen mit Firebase kann zu Gebühren für die Cloud Tasks-Verarbeitung führen. Weitere Informationen finden Sie unter Cloud Tasks-Preise.
Funktionen für Aufgabenwarteschlangen erstellen
So verwenden Sie Task-Queue-Funktionen:
- Schreiben Sie eine Task-Queue-Funktion mit dem Firebase SDK für Cloud Functions.
- Testen Sie Ihre Funktion, indem Sie sie mit einer HTTP-Anfrage auslösen.
- Stellen Sie Ihre Funktion mit der Firebase-CLI bereit. Wenn Sie Ihre Task-Queue-Funktion zum ersten Mal bereitstellen, wird mit der CLI eine Task-Queue in Cloud Tasks mit den in Ihrem Quellcode angegebenen Optionen (Ratenbegrenzung und Wiederholung) erstellt.
- Fügen Sie der neu erstellten Aufgabenwarteschlange Aufgaben hinzu und übergeben Sie bei Bedarf Parameter, um einen Ausführungszeitplan festzulegen. Sie können dies erreichen, indem Sie den Code mit Admin SDK schreiben und in Cloud Functions for Firebase bereitstellen.
Aufgabenwarteschlangenfunktionen schreiben
Die Codebeispiele in diesem Abschnitt basieren auf einer App, die einen Dienst einrichtet, mit dem alle Bilder von Astronomy Picture of the Day der NASA gesichert werden. Importieren Sie zuerst die erforderlichen Module:
Node.js
// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");
// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");
Python
# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion
# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession
Verwenden Sie onTaskDispatched
oder on_task_dispatched
für Task-Queue-Funktionen. Wenn Sie eine Aufgabenwarteschlangenfunktion schreiben, können Sie die Konfiguration für Wiederholungen und Ratenbegrenzung für jede Warteschlange festlegen.
Funktionen der Aufgabenwarteschlange konfigurieren
Aufgabenwarteschlangenfunktionen bieten eine Reihe leistungsstarker Konfigurationseinstellungen, mit denen Sie Ratenbegrenzungen und das Wiederholungsverhalten einer Aufgabenwarteschlange genau steuern können:
Node.js
exports.backupapod = onTaskDispatched(
{
retryConfig: {
maxAttempts: 5,
minBackoffSeconds: 60,
},
rateLimits: {
maxConcurrentDispatches: 6,
},
}, async (req) => {
Python
@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
"""Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
retryConfig.maxAttempts=5
: Jede Aufgabe in der Aufgabenwarteschlange wird automatisch bis zu fünfmal wiederholt. So lassen sich vorübergehende Fehler wie Netzwerkfehler oder vorübergehende Dienstunterbrechungen eines abhängigen, externen Dienstes vermeiden.retryConfig.minBackoffSeconds=60
: Jede Aufgabe wird mindestens 60 Sekunden nach jedem Versuch wiederholt. So wird ein großer Puffer zwischen den einzelnen Versuchen geschaffen, damit die fünf Wiederholungsversuche nicht zu schnell aufgebraucht werden.rateLimits.maxConcurrentDispatch=6
: Es werden maximal 6 Aufgaben gleichzeitig gesendet. So wird ein stetiger Strom von Anfragen an die zugrunde liegende Funktion sichergestellt und die Anzahl der aktiven Instanzen und Kaltstarts reduziert.
Aufgabenwarteschlangen-Funktionen testen
In den meisten Fällen ist der Cloud Functions-Emulator die beste Möglichkeit, Task-Queue-Funktionen zu testen. In der Emulator Suite-Dokumentation erfahren Sie, wie Sie Ihre App für die Emulation von Task Queue-Funktionen instrumentieren.
Außerdem werden Task-Queue-Funktionen des SDK als einfache HTTP-Funktionen in Firebase Local Emulator Suite bereitgestellt. Sie können eine emulierte Aufgabenfunktion testen, indem Sie eine HTTP-POST-Anfrage mit einer JSON-Datennutzlast senden:
# start the Local Emulator Suite
firebase emulators:start
# trigger the emulated task queue function
curl \
-X POST # An HTTP POST request...
-H "content-type: application/json" \ # ... with a JSON body
http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
-d '{"data": { ... some data .... }}' # ... with JSON encoded data
Aufgabenwarteschlangenfunktionen bereitstellen
Stellen Sie die Task-Queue-Funktion mit der Firebase-CLI bereit:
$ firebase deploy --only functions:backupapod
Wenn Sie eine Task-Queue-Funktion zum ersten Mal bereitstellen, wird mit der CLI eine Task-Queue in Cloud Tasks mit Optionen (Ratenbegrenzung und Wiederholung) erstellt, die in Ihrem Quellcode angegeben sind.
Wenn beim Bereitstellen von Funktionen Berechtigungsfehler auftreten, prüfen Sie, ob dem Nutzer, der die Bereitstellungsbefehle ausführt, die entsprechenden IAM-Rollen zugewiesen sind.
Funktionen für Aufgabenwarteschlangen in die Warteschlange stellen
Aufgabenwarteschlangenfunktionen können in Cloud Tasks aus einer vertrauenswürdigen Serverumgebung wie Cloud Functions for Firebase mit Firebase Admin SDK für Node.js oder Google Cloud-Bibliotheken für Python in die Warteschlange gestellt werden. Wenn Sie noch nicht mit Admin SDK vertraut sind, lesen Sie den Artikel Firebase einem Server hinzufügen.
Bei einem typischen Ablauf wird eine neue Aufgabe erstellt, in Cloud Tasks in die Warteschlange gestellt und die Konfiguration für die Aufgabe festgelegt:
Node.js
exports.enqueuebackuptasks = onRequest(
async (_request, response) => {
const queue = getFunctions().taskQueue("backupapod");
const targetUri = await getFunctionUrl("backupapod");
const enqueues = [];
for (let i = 0; i <= BACKUP_COUNT; i += 1) {
const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
// Delay each batch by N * hour
const scheduleDelaySeconds = iteration * (60 * 60);
const backupDate = new Date(BACKUP_START_DATE);
backupDate.setDate(BACKUP_START_DATE.getDate() + i);
// Extract just the date portion (YYYY-MM-DD) as string.
const date = backupDate.toISOString().substring(0, 10);
enqueues.push(
queue.enqueue({date}, {
scheduleDelaySeconds,
dispatchDeadlineSeconds: 60 * 5, // 5 minutes
uri: targetUri,
}),
);
}
await Promise.all(enqueues);
response.sendStatus(200);
});
Python
@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
"""Adds backup tasks to a Cloud Tasks queue."""
task_queue = functions.task_queue("backupapod")
target_uri = get_function_url("backupapod")
for i in range(BACKUP_COUNT):
batch = i // HOURLY_BATCH_SIZE
# Delay each batch by N hours
schedule_delay = timedelta(hours=batch)
schedule_time = datetime.now() + schedule_delay
dispatch_deadline_seconds = 60 * 5 # 5 minutes
backup_date = BACKUP_START_DATE + timedelta(days=i)
body = {"data": {"date": backup_date.isoformat()[:10]}}
task_options = functions.TaskOptions(schedule_time=schedule_time,
dispatch_deadline_seconds=dispatch_deadline_seconds,
uri=target_uri)
task_queue.enqueue(body, task_options)
return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
Im Beispielcode wird versucht, die Ausführung von Aufgaben zu verteilen, indem der N-ten Aufgabe eine Verzögerung von N Minuten zugewiesen wird. Das entspricht etwa einer Aufgabe pro Minute. Sie können auch
scheduleTime
(Node.js) oderschedule_time
(Python) verwenden, wenn Sie möchten, dass Cloud Tasks eine Aufgabe zu einem bestimmten Zeitpunkt auslöst.Im Beispielcode wird die maximale Zeit festgelegt, die Cloud Tasks auf den Abschluss einer Aufgabe wartet. Cloud Tasks wird die Aufgabe gemäß der Konfiguration für Wiederholungsversuche der Warteschlange oder bis zum Erreichen dieser Frist wiederholen. Im Beispiel wird die Warteschlange so konfiguriert, dass die Aufgabe bis zu fünfmal wiederholt wird. Die Aufgabe wird jedoch automatisch abgebrochen, wenn der gesamte Prozess (einschließlich der Wiederholungsversuche) länger als fünf Minuten dauert.
Ziel-URI abrufen und einfügen
Da Cloud Tasks Authentifizierungstokens zum Authentifizieren von Anfragen an die zugrunde liegenden Funktionen der Aufgabenwarteschlange erstellt, müssen Sie beim Einreihen von Aufgaben die Cloud Run-URL der Funktion angeben. Wir empfehlen, die URL für Ihre Funktion programmatisch abzurufen, wie unten gezeigt:
Node.js
/**
* Get the URL of a given v2 cloud function.
*
* @param {string} name the function's name
* @param {string} location the function's location
* @return {Promise<string>} The URL of the function
*/
async function getFunctionUrl(name, location="us-central1") {
if (!auth) {
auth = new GoogleAuth({
scopes: "https://www.googleapis.com/auth/cloud-platform",
});
}
const projectId = await auth.getProjectId();
const url = "https://cloudfunctions.googleapis.com/v2beta/" +
`projects/${projectId}/locations/${location}/functions/${name}`;
const client = await auth.getClient();
const res = await client.request({url});
const uri = res.data?.serviceConfig?.uri;
if (!uri) {
throw new Error(`Unable to retreive uri for function at ${url}`);
}
return uri;
}
Python
def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
"""Get the URL of a given v2 cloud function.
Params:
name: the function's name
location: the function's location
Returns: The URL of the function
"""
credentials, project_id = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"])
authed_session = AuthorizedSession(credentials)
url = ("https://cloudfunctions.googleapis.com/v2beta/" +
f"projects/{project_id}/locations/{location}/functions/{name}")
response = authed_session.get(url)
data = response.json()
function_url = data["serviceConfig"]["uri"]
return function_url
Fehlerbehebung
Cloud Tasks-Protokollierung aktivieren
Logs aus Cloud Tasks enthalten nützliche Diagnoseinformationen wie den Status der Anfrage, die mit einer Aufgabe verknüpft ist. Standardmäßig sind Logs von Cloud Tasks deaktiviert, da sie in Ihrem Projekt ein großes Logvolumen generieren können. Wir empfehlen, die Debug-Logs zu aktivieren, während Sie Ihre Task-Queue-Funktionen aktiv entwickeln und debuggen. Weitere Informationen finden Sie unter Logging aktivieren.
IAM-Berechtigungen
PERMISSION DENIED
-Fehler können beim Einreihen von Aufgaben oder beim Versuch von Cloud Tasks, Ihre Aufgabenwarteschlangenfunktionen aufzurufen, auftreten. Prüfen Sie, ob Ihr Projekt die folgenden IAM-Bindungen hat:
Die Identität, die zum Einreihen von Aufgaben in die Warteschlange für Cloud Tasks verwendet wird, benötigt die IAM-Berechtigung
cloudtasks.tasks.create
.Im Beispiel ist das das Standarddienstkonto App Engine.
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudtasks.enqueuer
Die Identität, die zum Einreihen von Aufgaben in Cloud Tasks verwendet wird, benötigt die Berechtigung zur Verwendung des Dienstkontos, das einer Aufgabe in Cloud Tasks zugeordnet ist.
Im Beispiel ist das das Standarddienstkonto App Engine.
Eine Anleitung zum Hinzufügen des App Engine-Standarddienstkontos als Nutzer des App Engine-Standarddienstkontos finden Sie in der Google Cloud IAM-Dokumentation.
Die Identität, mit der die Task Queue-Funktion ausgelöst wird, benötigt die Berechtigung
cloudfunctions.functions.invoke
.Im Beispiel ist das das Standarddienstkonto App Engine.
gcloud functions add-iam-policy-binding $FUNCTION_NAME \
--region=us-central1 \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudfunctions.invoker