Функции постановки в очередь с помощью Cloud Tasks


Функции очереди задач используют преимущества Google Cloud Tasks , чтобы помочь вашему приложению выполнять трудоемкие, ресурсоемкие или ограниченные по полосе пропускания задачи асинхронно, вне основного потока приложения.

Например, представьте, что вы хотите создать резервные копии большого набора файлов изображений, которые в настоящее время размещены через API с ограничением по скорости. Чтобы быть ответственным пользователем этого API, необходимо соблюдать установленные им ограничения. Кроме того, подобные длительные задачи могут быть подвержены сбоям из-за тайм-аутов и ограничений по памяти.

Чтобы снизить эту сложность, можно написать функцию очереди задач, которая задаёт базовые параметры задачи, такие как scheduleTime и dispatchDeadline , а затем передаёт функцию в очередь в Cloud Tasks . Среда Cloud Tasks разработана специально для обеспечения эффективного контроля перегрузки и политик повтора для подобных операций.

Firebase SDK для Cloud Functions for Firebase v3.20.1 и выше взаимодействует с Firebase Admin SDK v10.2.0 и выше для поддержки функций очереди задач.

Использование функций очереди задач в Firebase может привести к взиманию платы за обработку Cloud Tasks . Подробнее см. в разделе «Цены на Cloud Tasks .

Создание функций очереди задач

Чтобы использовать функции очереди задач, следуйте этой схеме:

  1. Напишите функцию очереди задач с использованием Firebase SDK для Cloud Functions .
  2. Протестируйте свою функцию, запустив ее с помощью HTTP-запроса.
  3. Разверните свою функцию с помощью Firebase CLI. При первом развёртывании функции очереди задач CLI создаст очередь задач в Cloud Tasks с параметрами (ограничение частоты и количество повторов), указанными в исходном коде.
  4. Добавьте задачи в созданную очередь задач, передав параметры для настройки расписания выполнения при необходимости. Это можно сделать, написав код с помощью Admin SDK и развернув его в Cloud Functions for Firebase .

Написать функции очереди задач

Примеры кода в этом разделе основаны на приложении, которое настраивает службу резервного копирования всех изображений из программы NASA «Астрономическая фотография дня» . Для начала импортируйте необходимые модули:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Питон

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Используйте onTaskDispatched или on_task_dispatched для функций очереди задач. При написании функции очереди задач можно настроить количество повторных попыток и ограничение скорости для каждой очереди.

Настройка функций очереди задач

Функции очереди задач поставляются с мощным набором параметров конфигурации для точного управления ограничениями скорости и поведением повторных попыток очереди задач:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Питон

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5 : каждая задача в очереди задач автоматически повторяется до 5 раз. Это помогает снизить вероятность возникновения временных ошибок, таких как сетевые ошибки или временные сбои в работе зависимой внешней службы.

  • retryConfig.minBackoffSeconds=60 : Каждая задача повторяется с интервалом не менее 60 секунд после каждой попытки. Это обеспечивает большой буфер между попытками, чтобы не спешить исчерпать все 5 попыток слишком быстро.

  • rateLimits.maxConcurrentDispatch=6 : одновременно отправляется не более 6 задач. Это помогает обеспечить стабильный поток запросов к базовой функции и сократить количество активных экземпляров и холодных запусков.

Функции очереди тестовых задач

В большинстве случаев эмулятор Cloud Functions — лучший способ протестировать функции очереди задач. Чтобы узнать, как настроить приложение для эмуляции функций очереди задач , см. документацию Emulator Suite.

Кроме того, функции очереди задач (functions_sdk) представлены как простые HTTP-функции в Firebase Local Emulator Suite . Вы можете протестировать эмулированную функцию задачи, отправив HTTP-запрос POST с полезными данными JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Развертывание функций очереди задач

Разверните функцию очереди задач с помощью Firebase CLI:

$ firebase deploy --only functions:backupapod

При первом развертывании функции очереди задач CLI создает очередь задач в Cloud Tasks с параметрами (ограничение скорости и повторы), указанными в исходном коде.

Если при развертывании функций вы столкнулись с ошибками разрешений, убедитесь, что пользователю, запускающему команды развертывания, назначены соответствующие роли IAM .

Функции постановки задач в очередь

Функции очереди задач можно добавлять в очередь в Cloud Tasks из доверенной серверной среды, например, Cloud Functions for Firebase с помощью Firebase Admin SDK для Node.js или библиотек Google Cloud для Python. Если вы не знакомы с Admin SDK , ознакомьтесь с разделом «Добавление Firebase на сервер» , чтобы начать работу.

Типичный поток создает новую задачу, помещает ее в очередь в Cloud Tasks и задает конфигурацию для задачи:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Питон

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • В примере кода выполняется попытка распределить выполнение задач, назначая задержку в N минут для каждой N-й задачи. Это соответствует запуску примерно одной задачи в минуту. Обратите внимание, что вы также можете использовать scheduleTime (Node.js) или schedule_time (Python), если хотите, чтобы Cloud Tasks запускал задачу в определённое время.

  • В примере кода задаётся максимальное время ожидания завершения задачи. Cloud Tasks Cloud Tasks будет повторять попытку выполнения задачи в соответствии с настройками очереди повторных попыток или до достижения этого крайнего срока. В примере очередь настроена на повторные попытки выполнения задачи до 5 раз, но задача автоматически отменяется, если весь процесс (включая попытки повтора) занимает более 5 минут.

Извлечь и включить целевой URI

В связи с тем, как Cloud Tasks создаёт токены аутентификации для аутентификации запросов к базовым функциям очереди задач, при добавлении задач в очередь необходимо указать URL-адрес функции Cloud Run. Мы рекомендуем программно получать URL-адрес функции, как показано ниже:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Питон

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

Поиск неисправностей

Включить ведение журнала Cloud Tasks

Журналы Cloud Tasks содержат полезную диагностическую информацию, например, статус запроса, связанного с задачей. По умолчанию журналы Cloud Tasks отключены из-за большого объёма журналов, которые они потенциально могут генерировать в вашем проекте. Мы рекомендуем включить журналы отладки во время активной разработки и отладки функций очереди задач. См. раздел Включение журналирования .

Разрешения IAM

Вы можете столкнуться с ошибками PERMISSION DENIED при добавлении задач в очередь или при попытке Cloud Tasks вызвать функции очереди задач. Убедитесь, что в вашем проекте есть следующие привязки IAM:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • Для удостоверения, используемого для постановки задач в очередь Cloud Tasks необходимо разрешение на использование учетной записи службы, связанной с задачей в Cloud Tasks .

    В примере это учетная запись службы App Engine по умолчанию .

Инструкции по добавлению учетной записи службы App Engine по умолчанию в качестве пользователя учетной записи службы App Engine по умолчанию см. в документации Google Cloud IAM.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker