Kolejkowanie funkcji za pomocą Cloud Tasks

Funkcje kolejki zadań korzystają z Google Cloud Tasks aby pomóc aplikacji w asynchronicznym wykonywaniu czasochłonnych, wymagających dużej ilości zasobów lub ograniczonych przepustowością zadań poza głównym przepływem aplikacji.

Wyobraź sobie na przykład, że chcesz utworzyć kopie zapasowe dużego zbioru plików graficznych, które są obecnie hostowane w interfejsie API z limitem liczby żądań. Aby korzystać z tego interfejsu API w sposób odpowiedzialny, musisz przestrzegać jego limitów liczby żądań. Ponadto tego rodzaju długotrwałe zadania mogą się nie powieść z powodu przekroczenia limitu czasu lub pamięci.

Aby uprościć ten proces, możesz napisać funkcję kolejki zadań, która ustawia podstawowe opcje zadań, takie jak scheduleTime i dispatchDeadline, a następnie przekazuje funkcję do kolejki w Cloud Tasks. Środowisko Cloud Tasks zostało zaprojektowane specjalnie w celu zapewnienia skutecznej kontroli przeciążenia i zasad ponawiania w przypadku tego rodzaju operacji.

Pakiet SDK Firebase dla Cloud Functions for Firebase w wersji 3.20.1 i nowszych współpracuje z Firebase Admin SDK w wersji 10.2.0 i nowszych, aby obsługiwać funkcje kolejki zadań.

Korzystanie z funkcji kolejki zadań w Firebase może wiązać się z opłatami za Cloud Tasks przetwarzanie. Więcej informacji znajdziesz w cenniku Cloud Tasks.

Tworzenie funkcji kolejki zadań

Aby korzystać z funkcji kolejki zadań, wykonaj te czynności:

  1. Napisz funkcję kolejki zadań za pomocą pakietu Firebase SDK dla Cloud Functions.
  2. Przetestuj funkcję, aktywując ją za pomocą żądania HTTP.
  3. Wdróż funkcję za pomocą wiersza poleceń Firebase. Podczas pierwszego wdrażania funkcji kolejki zadań interfejs wiersza poleceń utworzy w Cloud Tasks kolejkę zadań z opcjami (ograniczanie liczby żądań i ponawianie) określonymi w kodzie źródłowym.
  4. Dodaj zadania do nowo utworzonej kolejki zadań, przekazując parametry, aby w razie potrzeby skonfigurować harmonogram wykonywania. Możesz to zrobić, pisząc kod za pomocą Admin SDK i wdrażając go w Cloud Functions for Firebase.

Pisanie funkcji kolejki zadań

Przykłady kodu w tej sekcji są oparte na aplikacji, która konfiguruje usługę tworzącą kopie zapasowe wszystkich zdjęć z NASA's Astronomy Picture of the Day. Aby rozpocząć, zaimportuj wymagane moduły:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/tasks");
const {onRequest, HttpsError} = require("firebase-functions/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions");

// Dependencies for image backup.
const path = require("path");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

W przypadku funkcji kolejki zadań użyj onTaskDispatched lub on_task_dispatched. Podczas pisania funkcji kolejki zadań możesz ustawić konfigurację ponawiania i ograniczania liczby żądań dla każdej kolejki.

Konfigurowanie funkcji kolejki zadań

Funkcje kolejki zadań mają zaawansowany zestaw ustawień konfiguracyjnych, które umożliwiają precyzyjne kontrolowanie limitów liczby żądań i zachowania ponawiania kolejki zadań:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(
    retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
    rate_limits=RateLimits(max_concurrent_dispatches=10),
)
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: każde zadanie w kolejce zadań jest automatycznie ponawiane maksymalnie 5 razy. Pomaga to ograniczyć przejściowe błędy, takie jak błędy sieciowe lub tymczasowe przerwy w działaniu zależnej usługi zewnętrznej.

  • retryConfig.minBackoffSeconds=60: każde zadanie jest ponawiane co najmniej 60 sekund po każdej próbie. Zapewnia to duży bufor między poszczególnymi próbami, dzięki czemu nie wyczerpiemy zbyt szybko 5 prób ponowienia.

  • rateLimits.maxConcurrentDispatch=6: w danym momencie wysyłanych jest maksymalnie 6 zadań. Pomaga to zapewnić stały strumień żądań do funkcji bazowej i zmniejszyć liczbę aktywnych instancji oraz uruchomień „na zimno”.

Testowanie funkcji kolejki zadań

W większości przypadków najlepszym sposobem na testowanie funkcji kolejki zadań jest emulator Cloud Functions. Informacje o tym, jak przygotować aplikację do emulacji funkcji kolejki zadań, znajdziesz w dokumentacji pakietu Emulator Suite.

Dodatkowo funkcje kolejki zadań są udostępniane jako proste funkcje HTTP w Firebase Local Emulator Suite. Możesz przetestować emulowaną funkcję zadania, wysyłając żądanie HTTP POST z ładunkiem danych JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Wdrażanie funkcji kolejki zadań

Wdróż funkcję kolejki zadań za pomocą interfejsu wiersza poleceń Firebase:

$ firebase deploy --only functions:backupapod

Podczas pierwszego wdrażania funkcji kolejki zadań interfejs wiersza poleceń utworzy kolejkę zadań w Cloud Tasks z opcjami (ograniczanie liczby żądań i ponawianie) określonymi w kodzie źródłowym.

Jeśli podczas wdrażania funkcji wystąpią błędy uprawnień, upewnij się, że do użytkownika wykonującego polecenia wdrożenia są przypisane odpowiednie role uprawnień.

Dodawanie funkcji kolejki zadań do kolejki

Funkcje kolejki zadań można dodawać do kolejki w Cloud Tasks z zaufanego środowiska serwera, takiego jak Cloud Functions for Firebase za pomocą Firebase Admin SDK dla Node.js lub bibliotek Google Cloud dla Pythona. Jeśli dopiero zaczynasz korzystać z Admin SDK, zapoznaj się z artykułem Dodawanie Firebase do serwera.

Typowy przepływ pracy polega na utworzeniu nowego zadania, dodaniu go do kolejki w Cloud Tasks, i skonfigurowaniu go:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(
            schedule_time=schedule_time,
            dispatch_deadline_seconds=dispatch_deadline_seconds,
            uri=target_uri,
        )
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • Przykładowy kod próbuje rozłożyć wykonywanie zadań, przypisując opóźnienie o N minut do N-tego zadania. Oznacza to aktywowanie ok. 1 zadania na minutę. Pamiętaj, że możesz też użyć scheduleTime (Node.js) lub schedule_time (Python), jeśli chcesz Cloud Tasks aktywować zadanie o określonej godzinie.

  • Przykładowy kod ustawia maksymalny czas Cloud Tasksbędzie czekać na zakończenie zadania. Cloud Tasks ponowi zadanie zgodnie z konfiguracją ponawiania kolejki lub do momentu osiągnięcia tego terminu. W tym przykładzie kolejka jest skonfigurowana tak, aby ponawiać zadanie maksymalnie 5 razy, ale zadanie jest automatycznie anulowane, jeśli cały proces (w tym próby ponowienia) trwa dłużej niż 5 minut.

Rozwiązywanie problemów

Włączanie logowania Cloud Tasks

Dzienniki z Cloud Tasks zawierają przydatne informacje diagnostyczne, takie jak stan żądania powiązanego z zadaniem. Domyślnie dzienniki z Cloud Tasks są wyłączone ze względu na dużą liczbę dzienników, które mogą być generowane w Twoim projekcie. Zalecamy włączenie dzienników debugowania podczas aktywnego tworzenia i debugowania funkcji kolejki zadań. Więcej informacji znajdziesz w artykule Włączanie logowania.

Uprawnienia

Podczas dodawania zadań do kolejki lub gdy Cloud Tasks próbuje wywołać funkcje kolejki zadań, mogą pojawić się błędy PERMISSION DENIED. Upewnij się, że Twój projekt ma te powiązania uprawnień:

  • Tożsamość używana do dodawania zadań do kolejki w Cloud Tasks musi mieć cloudtasks.tasks.create uprawnienie.

    W tym przykładzie jest to App Engine domyślne konto usługi

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • Tożsamość używana do dodawania zadań do kolejki w Cloud Tasks musi mieć uprawnienia do korzystania z konta usługi powiązanego z zadaniem w Cloud Tasks.

    W tym przykładzie jest to App Engine domyślne konto usługi.

Instrukcje dotyczące dodawania App Engine domyślnego konta usługi jako użytkownika App Engine domyślnego konta usługi znajdziesz w dokumentacji Google Cloud IAM.

  • Tożsamość używana do aktywowania funkcji kolejki zadań musi mieć uprawnienie cloudfunctions.functions.invoke.

    W tym przykładzie jest to App Engine domyślne konto usługi

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker