Funkcje kolejki zadań korzystają z usługi GoogleCloud Tasks, aby pomagać aplikacji w asynchronicznym wykonywaniu czasochłonnych, zasobożernych lub wymagających dużej przepustowości zadań poza głównym przepływem aplikacji.
Załóżmy na przykład, że chcesz utworzyć kopie zapasowe dużego zestawu plików graficznych, które są obecnie hostowane w interfejsie API z limitem liczby żądań. Aby być odpowiedzialnym użytkownikiem tego interfejsu API, musisz przestrzegać limitów liczby żądań. Dodatkowo takie długotrwałe zadanie może być podatne na awarie z powodu limitów czasu i pamięci.
Aby zmniejszyć tę złożoność, możesz napisać funkcję kolejki zadań, która ustawia podstawowe opcje zadań, takie jak scheduleTime
i dispatchDeadline
, a następnie przekazuje funkcję do kolejki w Cloud Tasks. Środowisko Cloud Tasks zostało zaprojektowane specjalnie z myślą o zapewnieniu skutecznej kontroli przeciążenia i zasad ponawiania w przypadku tego rodzaju operacji.
Pakiet SDK Firebase w wersji Cloud Functions for Firebase 3.20.1 i nowszej współpracuje z Firebase Admin SDK w wersji 10.2.0 i nowszej, aby obsługiwać funkcje kolejki zadań.
Korzystanie z funkcji kolejki zadań w Firebase może wiązać się z opłatami za przetwarzanie.Cloud Tasks Więcej informacji znajdziesz w Cloud Taskscenniku.
Tworzenie funkcji kolejki zadań
Aby korzystać z funkcji kolejki zadań, wykonaj te czynności:
- Napisz funkcję kolejki zadań za pomocą pakietu SDK Firebase dla Cloud Functions.
- Przetestuj funkcję, aktywując ją za pomocą żądania HTTP.
- Wdróż funkcję za pomocą interfejsu Firebase CLI. Podczas pierwszego wdrażania funkcji kolejki zadań interfejs CLI utworzy kolejkę zadań w Cloud Tasks z opcjami (ograniczanie szybkości i ponawianie) określonymi w kodzie źródłowym.
- Dodaj zadania do nowo utworzonej kolejki zadań, przekazując parametry, aby w razie potrzeby skonfigurować harmonogram wykonywania. Możesz to zrobić, pisząc kod za pomocą Admin SDK i wdrażając go w Cloud Functions for Firebase.
Pisanie funkcji kolejki zadań
Przykłady kodu w tej sekcji są oparte na aplikacji, która konfiguruje usługę tworzącą kopie zapasowe wszystkich zdjęć z Astronomy Picture of the Day NASA. Aby rozpocząć, zaimportuj wymagane moduły:
Node.js
// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");
// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");
Python
# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion
# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession
Używaj onTaskDispatched
lub on_task_dispatched
w przypadku funkcji kolejki zadań. Podczas pisania funkcji kolejki zadań możesz skonfigurować ponawianie i ograniczanie liczby wywołań dla poszczególnych kolejek.
Konfigurowanie funkcji kolejki zadań
Funkcje kolejki zadań mają zaawansowany zestaw ustawień konfiguracji, które umożliwiają precyzyjne kontrolowanie limitów szybkości i ponawiania w kolejce zadań:
Node.js
exports.backupapod = onTaskDispatched(
{
retryConfig: {
maxAttempts: 5,
minBackoffSeconds: 60,
},
rateLimits: {
maxConcurrentDispatches: 6,
},
}, async (req) => {
Python
@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
"""Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
retryConfig.maxAttempts=5
: każde zadanie w kolejce zadań jest automatycznie ponawiane maksymalnie 5 razy. Pomaga to ograniczyć przejściowe błędy, takie jak błędy sieciowe lub tymczasowe przerwy w działaniu zależnej usługi zewnętrznej.retryConfig.minBackoffSeconds=60
: każda próba wykonania zadania jest ponawiana co najmniej co 60 sekund. Dzięki temu między poszczególnymi próbami jest duży bufor, więc nie wyczerpujemy zbyt szybko 5 prób ponowienia.rateLimits.maxConcurrentDispatch=6
: W danym momencie wysyłanych jest maksymalnie 6 zadań. Zapewnia to stały strumień żądań do funkcji bazowej i pomaga zmniejszyć liczbę aktywnych instancji oraz uruchomień „na zimno”.
Testowanie funkcji kolejki zadań
W większości przypadków Cloud Functionsemulator jest najlepszym sposobem na testowanie funkcji kolejki zadań. Więcej informacji o tym, jak przygotować aplikację do emulacji funkcji kolejki zadań, znajdziesz w dokumentacji pakietu Emulator Suite.
Dodatkowo funkcje kolejki zadań functions_sdk są udostępniane jako proste funkcje HTTP w Firebase Local Emulator Suite. Możesz przetestować emulowaną funkcję zadania, wysyłając żądanie HTTP POST z ładunkiem danych JSON:
# start the Local Emulator Suite
firebase emulators:start
# trigger the emulated task queue function
curl \
-X POST # An HTTP POST request...
-H "content-type: application/json" \ # ... with a JSON body
http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
-d '{"data": { ... some data .... }}' # ... with JSON encoded data
Wdrażanie funkcji kolejki zadań
Wdróż funkcję kolejki zadań za pomocą interfejsu wiersza poleceń Firebase:
$ firebase deploy --only functions:backupapod
Podczas pierwszego wdrażania funkcji kolejki zadań interfejs CLI tworzy w Cloud Tasks kolejkę zadań z opcjami (ograniczanie szybkości i ponawianie) określonymi w kodzie źródłowym.
Jeśli podczas wdrażania funkcji wystąpią błędy uprawnień, sprawdź, czy użytkownikowi, który uruchamia polecenia wdrażania, przypisano odpowiednie role uprawnień.
Dodawanie do kolejki zadań funkcji
Funkcje kolejki zadań można umieszczać w kolejce w Cloud Tasks z zaufanego środowiska serwera, takiego jak Cloud Functions for Firebase, za pomocą Firebase Admin SDK dla Node.js lub bibliotek Google Cloud dla Pythona. Jeśli dopiero zaczynasz korzystać z Admin SDK, zapoznaj się z artykułem Dodawanie Firebase do serwera.
Typowy przepływ tworzy nowe zadanie, umieszcza je w kolejce Cloud Tasks i ustawia jego konfigurację:
Node.js
exports.enqueuebackuptasks = onRequest(
async (_request, response) => {
const queue = getFunctions().taskQueue("backupapod");
const targetUri = await getFunctionUrl("backupapod");
const enqueues = [];
for (let i = 0; i <= BACKUP_COUNT; i += 1) {
const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
// Delay each batch by N * hour
const scheduleDelaySeconds = iteration * (60 * 60);
const backupDate = new Date(BACKUP_START_DATE);
backupDate.setDate(BACKUP_START_DATE.getDate() + i);
// Extract just the date portion (YYYY-MM-DD) as string.
const date = backupDate.toISOString().substring(0, 10);
enqueues.push(
queue.enqueue({date}, {
scheduleDelaySeconds,
dispatchDeadlineSeconds: 60 * 5, // 5 minutes
uri: targetUri,
}),
);
}
await Promise.all(enqueues);
response.sendStatus(200);
});
Python
@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
"""Adds backup tasks to a Cloud Tasks queue."""
task_queue = functions.task_queue("backupapod")
target_uri = get_function_url("backupapod")
for i in range(BACKUP_COUNT):
batch = i // HOURLY_BATCH_SIZE
# Delay each batch by N hours
schedule_delay = timedelta(hours=batch)
schedule_time = datetime.now() + schedule_delay
dispatch_deadline_seconds = 60 * 5 # 5 minutes
backup_date = BACKUP_START_DATE + timedelta(days=i)
body = {"data": {"date": backup_date.isoformat()[:10]}}
task_options = functions.TaskOptions(schedule_time=schedule_time,
dispatch_deadline_seconds=dispatch_deadline_seconds,
uri=target_uri)
task_queue.enqueue(body, task_options)
return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
Przykładowy kod próbuje rozłożyć wykonywanie zadań, przypisując do N-tego zadania opóźnienie wynoszące N minut. Oznacza to wywoływanie około 1 zadania na minutę. Pamiętaj, że możesz też użyć
scheduleTime
(Node.js) lubschedule_time
(Python), jeśli chcesz Cloud Tasks wywołać zadanie o określonej godzinie.Przykładowy kod określa maksymalny czas, przez jaki Cloud Tasks będzie czekać na zakończenie zadania. Cloud Tasks ponowi próbę wykonania zadania zgodnie z konfiguracją ponownych prób w kolejce lub do momentu osiągnięcia tego terminu. W przykładzie kolejka jest skonfigurowana tak, aby ponawiać wykonanie zadania maksymalnie 5 razy, ale zadanie jest automatycznie anulowane, jeśli cały proces (w tym próby ponowienia) trwa dłużej niż 5 minut.
Pobierz i uwzględnij docelowy identyfikator URI.
Ze względu na sposób, w jaki Cloud Tasks tworzy tokeny uwierzytelniania na potrzeby uwierzytelniania żądań do funkcji bazowych kolejek zadań, podczas dodawania zadań do kolejki musisz podać adres URL Cloud Run funkcji. Zalecamy programowe pobieranie adresu URL funkcji, jak pokazano poniżej:
Node.js
/**
* Get the URL of a given v2 cloud function.
*
* @param {string} name the function's name
* @param {string} location the function's location
* @return {Promise<string>} The URL of the function
*/
async function getFunctionUrl(name, location="us-central1") {
if (!auth) {
auth = new GoogleAuth({
scopes: "https://www.googleapis.com/auth/cloud-platform",
});
}
const projectId = await auth.getProjectId();
const url = "https://cloudfunctions.googleapis.com/v2beta/" +
`projects/${projectId}/locations/${location}/functions/${name}`;
const client = await auth.getClient();
const res = await client.request({url});
const uri = res.data?.serviceConfig?.uri;
if (!uri) {
throw new Error(`Unable to retreive uri for function at ${url}`);
}
return uri;
}
Python
def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
"""Get the URL of a given v2 cloud function.
Params:
name: the function's name
location: the function's location
Returns: The URL of the function
"""
credentials, project_id = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"])
authed_session = AuthorizedSession(credentials)
url = ("https://cloudfunctions.googleapis.com/v2beta/" +
f"projects/{project_id}/locations/{location}/functions/{name}")
response = authed_session.get(url)
data = response.json()
function_url = data["serviceConfig"]["uri"]
return function_url
Rozwiązywanie problemów
Włącz logowanie Cloud Tasks
Dzienniki z Cloud Tasks zawierają przydatne informacje diagnostyczne, takie jak stan żądania powiązanego z zadaniem. Domyślnie logi z usługi Cloud Tasks są wyłączone ze względu na dużą liczbę logów, które mogą być generowane w Twoim projekcie. Zalecamy włączenie dzienników debugowania podczas aktywnego tworzenia i debugowania funkcji kolejki zadań. Zobacz sekcję Włączanie logowania.
Uprawnienia
Podczas dodawania zadań do kolejki lub gdy Cloud Tasks próbuje wywołać funkcje kolejki zadań, mogą pojawić się błędy PERMISSION DENIED
. Sprawdź, czy Twój projekt ma te powiązania IAM:
Tożsamość używana do umieszczania zadań w kolejce w usłudze Cloud Tasks musi mieć uprawnienia
cloudtasks.tasks.create
IAM.W przykładzie jest to App Engine domyślne konto usługi.
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudtasks.enqueuer
Tożsamość używana do umieszczania zadań w kolejce w usłudze Cloud Tasks musi mieć uprawnienia do korzystania z konta usługi powiązanego z zadaniem w usłudze Cloud Tasks.
W przykładzie jest to App Engine domyślne konto usługi.
Instrukcje dodawania App Engine domyślnego konta usługi jako użytkownika App Engine domyślnego konta usługi znajdziesz w dokumentacji Cloud IAM.
Tożsamość używana do wywoływania funkcji kolejki zadań musi mieć uprawnienie
cloudfunctions.functions.invoke
.W przykładzie jest to App Engine domyślne konto usługi.
gcloud functions add-iam-policy-binding $FUNCTION_NAME \
--region=us-central1 \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudfunctions.invoker