Les fonctions de file d'attente de tâches tirent parti de Cloud Tasks de Google pour aider votre application à exécuter des tâches chronophages, gourmandes en ressources ou à bande passante limitée de manière asynchrone, en dehors du flux principal de votre application.
Par exemple, imaginons que vous souhaitiez créer des sauvegardes d'un grand ensemble de fichiers image actuellement hébergés sur une API avec une limite de débit. Pour être un consommateur responsable de cette API, vous devez respecter ses limites de débit. De plus, ce type de job de longue durée peut échouer en raison de délais d'attente et de limites de mémoire.
Pour atténuer cette complexité, vous pouvez écrire une fonction de file d'attente de tâches qui définit des options de tâches de base telles que scheduleTime
et dispatchDeadline
, puis transmet la fonction à une file d'attente dans Cloud Tasks. L'environnement Cloud Tasks est conçu spécifiquement pour assurer un contrôle efficace de la congestion et des règles de réessai pour ces types d'opérations.
Le SDK Firebase pour Cloud Functions for Firebase version 3.20.1 et ultérieure interagit avec Firebase Admin SDK version 10.2.0 et ultérieure pour prendre en charge les fonctions de file d'attente des tâches.
L'utilisation de fonctions de file d'attente de tâches avec Firebase peut entraîner des frais de traitement Cloud Tasks. Pour en savoir plus, consultez la page sur les tarifs de Cloud Tasks.
Créer des fonctions de file d'attente de tâches
Pour utiliser les fonctions de file d'attente des tâches, suivez ce workflow :
- Écrivez une fonction de file d'attente des tâches à l'aide du SDK Firebase pour Cloud Functions.
- Testez votre fonction en la déclenchant avec une requête HTTP.
- Déployez votre fonction avec la CLI Firebase. Lorsque vous déployez votre fonction de file d'attente des tâches pour la première fois, la CLI crée une file d'attente des tâches dans Cloud Tasks avec les options (limitation du débit et nouvelle tentative) spécifiées dans votre code source.
- Ajoutez des tâches à la file d'attente de tâches que vous venez de créer, en transmettant des paramètres pour configurer un calendrier d'exécution si nécessaire. Pour ce faire, écrivez le code à l'aide de Admin SDK et déployez-le sur Cloud Functions for Firebase.
Écrire des fonctions de file d'attente des tâches
Les exemples de code de cette section sont basés sur une application qui configure un service permettant de sauvegarder toutes les images de la photo d'astronomie du jour de la NASA. Pour commencer, importez les modules requis :
Node.js
// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");
// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");
Python
# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion
# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession
Utilisez onTaskDispatched
ou on_task_dispatched
pour les fonctions de file d'attente des tâches. Lorsque vous écrivez une fonction de file d'attente de tâches, vous pouvez définir une configuration de nouvelles tentatives et de limitation du débit par file d'attente.
Configurer les fonctions de file d'attente des tâches
Les fonctions de file d'attente de tâches sont fournies avec un ensemble puissant de paramètres de configuration permettant de contrôler précisément les limites de débit et le comportement de nouvelle tentative d'une file d'attente de tâches :
Node.js
exports.backupapod = onTaskDispatched(
{
retryConfig: {
maxAttempts: 5,
minBackoffSeconds: 60,
},
rateLimits: {
maxConcurrentDispatches: 6,
},
}, async (req) => {
Python
@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
"""Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
retryConfig.maxAttempts=5
: chaque tâche de la file d'attente des tâches fait automatiquement l'objet de cinq tentatives au maximum. Cela permet d'atténuer les erreurs temporaires telles que les erreurs réseau ou les interruptions temporaires de service d'un service externe dépendant.retryConfig.minBackoffSeconds=60
: chaque tâche est relancée au moins 60 secondes après chaque tentative. Cela offre une grande marge entre chaque tentative afin de ne pas épuiser trop rapidement les cinq tentatives.rateLimits.maxConcurrentDispatch=6
: Au maximum six tâches sont distribuées à un moment donné. Cela permet d'assurer un flux constant de requêtes vers la fonction sous-jacente et de réduire le nombre d'instances actives et de démarrages à froid.
Tester les fonctions de file d'attente des tâches
Dans la plupart des cas, l'émulateur Cloud Functions est la meilleure façon de tester les fonctions de file d'attente des tâches. Consultez la documentation d'Emulator Suite pour savoir comment instrumenter votre application pour l'émulation des fonctions de file d'attente des tâches.
De plus, les fonctions task queue functions_sdk sont exposées en tant que fonctions HTTP simples dans Firebase Local Emulator Suite. Vous pouvez tester une fonction de tâche émulée en envoyant une requête HTTP POST avec une charge utile de données JSON :
# start the Local Emulator Suite
firebase emulators:start
# trigger the emulated task queue function
curl \
-X POST # An HTTP POST request...
-H "content-type: application/json" \ # ... with a JSON body
http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
-d '{"data": { ... some data .... }}' # ... with JSON encoded data
Déployer des fonctions de file d'attente de tâches
Déployez la fonction de file d'attente des tâches à l'aide de la CLI Firebase :
$ firebase deploy --only functions:backupapod
Lorsque vous déployez une fonction de file d'attente des tâches pour la première fois, la CLI crée une file d'attente des tâches dans Cloud Tasks avec les options (limitation du débit et nouvelle tentative) spécifiées dans votre code source.
Si vous rencontrez des erreurs d'autorisation lors du déploiement de fonctions, assurez-vous que les rôles IAM appropriés sont attribués à l'utilisateur qui exécute les commandes de déploiement.
Mettre en file d'attente des fonctions de file d'attente de tâches
Les fonctions de file d'attente des tâches peuvent être mises en file d'attente dans Cloud Tasks à partir d'un environnement de serveur de confiance tel que Cloud Functions for Firebase à l'aide de Firebase Admin SDK pour Node.js ou des bibliothèques Google Cloud pour Python. Si vous n'avez jamais utilisé Admin SDK, consultez Ajouter Firebase à un serveur pour commencer.
Un flux typique crée une tâche, la met en file d'attente dans Cloud Tasks et définit la configuration de la tâche :
Node.js
exports.enqueuebackuptasks = onRequest(
async (_request, response) => {
const queue = getFunctions().taskQueue("backupapod");
const targetUri = await getFunctionUrl("backupapod");
const enqueues = [];
for (let i = 0; i <= BACKUP_COUNT; i += 1) {
const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
// Delay each batch by N * hour
const scheduleDelaySeconds = iteration * (60 * 60);
const backupDate = new Date(BACKUP_START_DATE);
backupDate.setDate(BACKUP_START_DATE.getDate() + i);
// Extract just the date portion (YYYY-MM-DD) as string.
const date = backupDate.toISOString().substring(0, 10);
enqueues.push(
queue.enqueue({date}, {
scheduleDelaySeconds,
dispatchDeadlineSeconds: 60 * 5, // 5 minutes
uri: targetUri,
}),
);
}
await Promise.all(enqueues);
response.sendStatus(200);
});
Python
@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
"""Adds backup tasks to a Cloud Tasks queue."""
task_queue = functions.task_queue("backupapod")
target_uri = get_function_url("backupapod")
for i in range(BACKUP_COUNT):
batch = i // HOURLY_BATCH_SIZE
# Delay each batch by N hours
schedule_delay = timedelta(hours=batch)
schedule_time = datetime.now() + schedule_delay
dispatch_deadline_seconds = 60 * 5 # 5 minutes
backup_date = BACKUP_START_DATE + timedelta(days=i)
body = {"data": {"date": backup_date.isoformat()[:10]}}
task_options = functions.TaskOptions(schedule_time=schedule_time,
dispatch_deadline_seconds=dispatch_deadline_seconds,
uri=target_uri)
task_queue.enqueue(body, task_options)
return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
L'exemple de code tente de répartir l'exécution des tâches en associant un délai de N minutes à la Nème tâche. Cela se traduit par le déclenchement d'environ une tâche par minute. Notez que vous pouvez également utiliser
scheduleTime
(Node.js) ouschedule_time
(Python) si vous souhaitez que Cloud Tasks déclenche une tâche à une heure spécifique.L'exemple de code définit le délai maximal pendant lequel Cloud Tasks attendra la fin d'une tâche. Cloud Tasks tentera de relancer la tâche en suivant la configuration des tentatives de la file d'attente ou jusqu'à ce que ce délai soit atteint. Dans l'exemple, la file d'attente est configurée pour réessayer la tâche jusqu'à cinq fois, mais la tâche est automatiquement annulée si l'ensemble du processus (y compris les tentatives) prend plus de cinq minutes.
Récupérer et inclure l'URI cible
En raison de la façon dont Cloud Tasks crée des jetons d'authentification pour authentifier les requêtes adressées aux fonctions de file d'attente de tâches sous-jacentes, vous devez spécifier l'URL Cloud Run de la fonction lorsque vous mettez des tâches en file d'attente. Nous vous recommandons de récupérer l'URL de votre fonction de manière programmatique, comme indiqué ci-dessous :
Node.js
/**
* Get the URL of a given v2 cloud function.
*
* @param {string} name the function's name
* @param {string} location the function's location
* @return {Promise<string>} The URL of the function
*/
async function getFunctionUrl(name, location="us-central1") {
if (!auth) {
auth = new GoogleAuth({
scopes: "https://www.googleapis.com/auth/cloud-platform",
});
}
const projectId = await auth.getProjectId();
const url = "https://cloudfunctions.googleapis.com/v2beta/" +
`projects/${projectId}/locations/${location}/functions/${name}`;
const client = await auth.getClient();
const res = await client.request({url});
const uri = res.data?.serviceConfig?.uri;
if (!uri) {
throw new Error(`Unable to retreive uri for function at ${url}`);
}
return uri;
}
Python
def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
"""Get the URL of a given v2 cloud function.
Params:
name: the function's name
location: the function's location
Returns: The URL of the function
"""
credentials, project_id = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"])
authed_session = AuthorizedSession(credentials)
url = ("https://cloudfunctions.googleapis.com/v2beta/" +
f"projects/{project_id}/locations/{location}/functions/{name}")
response = authed_session.get(url)
data = response.json()
function_url = data["serviceConfig"]["uri"]
return function_url
Dépannage
Activer la journalisation Cloud Tasks
Les journaux de Cloud Tasks contiennent des informations de diagnostic utiles, comme l'état de la requête associée à une tâche. Par défaut, les journaux de Cloud Tasks sont désactivés en raison du volume important de journaux qu'il peut potentiellement générer dans votre projet. Nous vous recommandons d'activer les journaux de débogage lorsque vous développez et déboguez activement vos fonctions de file d'attente des tâches. Consultez Activer la journalisation.
Autorisations IAM
Des erreurs PERMISSION DENIED
peuvent s'afficher lorsque vous mettez des tâches en file d'attente ou lorsque Cloud Tasks tente d'appeler vos fonctions de file d'attente de tâches. Assurez-vous que votre projet dispose des liaisons IAM suivantes :
L'identité utilisée pour mettre en file d'attente les tâches sur Cloud Tasks doit disposer de l'autorisation IAM
cloudtasks.tasks.create
.Dans l'exemple, il s'agit du compte de service par défaut App Engine.
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudtasks.enqueuer
L'identité utilisée pour mettre en file d'attente des tâches dans Cloud Tasks doit être autorisée à utiliser le compte de service associé à une tâche dans Cloud Tasks.
Dans l'exemple, il s'agit du compte de service par défaut App Engine.
Consultez la documentation Google Cloud IAM pour savoir comment ajouter le compte de service par défaut App Engine en tant qu'utilisateur du compte de service par défaut App Engine.
L'identité utilisée pour déclencher la fonction de file d'attente des tâches doit disposer de l'autorisation
cloudfunctions.functions.invoke
.Dans l'exemple, il s'agit du compte de service par défaut App Engine.
gcloud functions add-iam-policy-binding $FUNCTION_NAME \
--region=us-central1 \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudfunctions.invoker