Функции очереди задач используют преимущества Google Cloud Tasks , чтобы помочь вашему приложению выполнять трудоемкие, ресурсоемкие или ограниченные по полосе пропускания задачи асинхронно, вне основного потока приложения.
Например, представьте, что вы хотите создать резервные копии большого набора файлов изображений, которые в настоящее время размещены на API с ограничением скорости. Чтобы быть ответственным потребителем этого API, вам нужно соблюдать его ограничения скорости. Кроме того, такого рода долгосрочные задания могут быть уязвимы для сбоев из-за тайм-аутов и ограничений памяти.
Чтобы уменьшить эту сложность, вы можете написать функцию очереди задач, которая устанавливает основные параметры задач, такие как scheduleTime
и dispatchDeadline
, а затем передает функцию в очередь в Cloud Tasks . Среда Cloud Tasks специально разработана для обеспечения эффективного контроля перегрузки и политик повторных попыток для таких операций.
Firebase SDK для Cloud Functions for Firebase v3.20.1 и выше взаимодействует с Firebase Admin SDK v10.2.0 и выше для поддержки функций очереди задач.
Использование функций очереди задач с Firebase может привести к расходам на обработку Cloud Tasks . Для получения дополнительной информации см. цены на Cloud Tasks .
Создать функции очереди задач
Чтобы использовать функции очереди задач, следуйте этой последовательности действий:
- Напишите функцию очереди задач с помощью Firebase SDK для Cloud Functions .
- Протестируйте свою функцию, запустив ее с помощью HTTP-запроса.
- Разверните свою функцию с помощью Firebase CLI. При первом развертывании функции очереди задач CLI создаст очередь задач в Cloud Tasks с параметрами (ограничение скорости и повтор), указанными в исходном коде.
- Добавьте задачи в недавно созданную очередь задач, передавая параметры для настройки расписания выполнения, если необходимо. Вы можете добиться этого, написав код с помощью Admin SDK и развернув его в Cloud Functions for Firebase .
Функции очереди задач записи
Примеры кода в этом разделе основаны на приложении, которое устанавливает службу, которая создает резервные копии всех изображений из NASA's Astronomy Picture of the Day . Чтобы начать, импортируйте необходимые модули:
Node.js
// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");
// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");
Питон
# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion
# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession
Используйте onTaskDispatched
или on_task_dispatched
для функций очереди задач. При написании функции очереди задач вы можете задать конфигурацию повтора и ограничения скорости для каждой очереди.
Настройка функций очереди задач
Функции очереди задач поставляются с мощным набором параметров конфигурации для точного управления ограничениями скорости и поведением повторных попыток очереди задач:
Node.js
exports.backupapod = onTaskDispatched(
{
retryConfig: {
maxAttempts: 5,
minBackoffSeconds: 60,
},
rateLimits: {
maxConcurrentDispatches: 6,
},
}, async (req) => {
Питон
@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
"""Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
retryConfig.maxAttempts=5
: Каждая задача в очереди задач автоматически повторяется до 5 раз. Это помогает смягчить временные ошибки, такие как сетевые ошибки или временное нарушение обслуживания зависимой внешней службы.retryConfig.minBackoffSeconds=60
: Каждая задача повторяется не менее чем через 60 секунд после каждой попытки. Это обеспечивает большой буфер между каждой попыткой, поэтому мы не торопимся исчерпать 5 попыток слишком быстро.rateLimits.maxConcurrentDispatch=6
: в заданное время отправляется максимум 6 задач. Это помогает обеспечить постоянный поток запросов к базовой функции и помогает сократить количество активных экземпляров и холодных запусков.
Функции очереди тестовых задач
В большинстве случаев эмулятор Cloud Functions — лучший способ протестировать функции очереди задач. Ознакомьтесь с документацией Emulator Suite, чтобы узнать, как инструментировать свое приложение для эмуляции функций очереди задач .
Кроме того, task queue functions_sdk представлены как простые функции HTTP в Firebase Local Emulator Suite . Вы можете протестировать эмулированную функцию задачи, отправив запрос HTTP POST с полезной нагрузкой данных JSON:
# start the Local Emulator Suite
firebase emulators:start
# trigger the emulated task queue function
curl \
-X POST # An HTTP POST request...
-H "content-type: application/json" \ # ... with a JSON body
http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
-d '{"data": { ... some data .... }}' # ... with JSON encoded data
Развертывание функций очереди задач
Разверните функцию очереди задач с помощью Firebase CLI:
$ firebase deploy --only functions:backupapod
При первом развертывании функции очереди задач CLI создает очередь задач в Cloud Tasks с параметрами (ограничение скорости и повтор), указанными в исходном коде.
Если при развертывании функций возникают ошибки разрешений, убедитесь, что пользователю, запускающему команды развертывания, назначены соответствующие роли IAM .
Функции постановки задач в очередь
Функции очереди задач можно поставить в очередь в Cloud Tasks из доверенной серверной среды, например Cloud Functions for Firebase используя Firebase Admin SDK для Node.js или библиотеки Google Cloud для Python. Если вы новичок в Admin SDK , см . раздел Add Firebase to a server , чтобы начать работу.
Типичный поток создает новую задачу, помещает ее в очередь в Cloud Tasks и устанавливает конфигурацию для задачи:
Node.js
exports.enqueuebackuptasks = onRequest(
async (_request, response) => {
const queue = getFunctions().taskQueue("backupapod");
const targetUri = await getFunctionUrl("backupapod");
const enqueues = [];
for (let i = 0; i <= BACKUP_COUNT; i += 1) {
const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
// Delay each batch by N * hour
const scheduleDelaySeconds = iteration * (60 * 60);
const backupDate = new Date(BACKUP_START_DATE);
backupDate.setDate(BACKUP_START_DATE.getDate() + i);
// Extract just the date portion (YYYY-MM-DD) as string.
const date = backupDate.toISOString().substring(0, 10);
enqueues.push(
queue.enqueue({date}, {
scheduleDelaySeconds,
dispatchDeadlineSeconds: 60 * 5, // 5 minutes
uri: targetUri,
}),
);
}
await Promise.all(enqueues);
response.sendStatus(200);
});
Питон
@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
"""Adds backup tasks to a Cloud Tasks queue."""
task_queue = functions.task_queue("backupapod")
target_uri = get_function_url("backupapod")
for i in range(BACKUP_COUNT):
batch = i // HOURLY_BATCH_SIZE
# Delay each batch by N hours
schedule_delay = timedelta(hours=batch)
schedule_time = datetime.now() + schedule_delay
dispatch_deadline_seconds = 60 * 5 # 5 minutes
backup_date = BACKUP_START_DATE + timedelta(days=i)
body = {"data": {"date": backup_date.isoformat()[:10]}}
task_options = functions.TaskOptions(schedule_time=schedule_time,
dispatch_deadline_seconds=dispatch_deadline_seconds,
uri=target_uri)
task_queue.enqueue(body, task_options)
return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
Пример кода пытается распределить выполнение задач, связывая задержку в N-е минут для N-й задачи. Это означает запуск ~ 1 задачи/минуту. Обратите внимание, что вы также можете использовать
scheduleTime
(Node.js) илиschedule_time
(Python), если хотите, чтобы Cloud Tasks запускал задачу в определенное время.В примере кода задается максимальное время, в течение которого Cloud Tasks будет ожидать завершения задачи. Cloud Tasks будет повторять задачу после настройки повтора очереди или до достижения этого крайнего срока. В примере очередь настроена на повтор задачи до 5 раз, но задача автоматически отменяется, если весь процесс (включая попытки повтора) занимает более 5 минут.
Извлечь и включить целевой URI
Из-за способа, которым Cloud Tasks создает токены аутентификации для аутентификации запросов к базовым функциям очереди задач, вы должны указать URL-адрес Cloud Run функции при постановке задач в очередь. Мы рекомендуем вам программно получить URL-адрес для вашей функции, как показано ниже:
Node.js
/**
* Get the URL of a given v2 cloud function.
*
* @param {string} name the function's name
* @param {string} location the function's location
* @return {Promise<string>} The URL of the function
*/
async function getFunctionUrl(name, location="us-central1") {
if (!auth) {
auth = new GoogleAuth({
scopes: "https://www.googleapis.com/auth/cloud-platform",
});
}
const projectId = await auth.getProjectId();
const url = "https://cloudfunctions.googleapis.com/v2beta/" +
`projects/${projectId}/locations/${location}/functions/${name}`;
const client = await auth.getClient();
const res = await client.request({url});
const uri = res.data?.serviceConfig?.uri;
if (!uri) {
throw new Error(`Unable to retreive uri for function at ${url}`);
}
return uri;
}
Питон
def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
"""Get the URL of a given v2 cloud function.
Params:
name: the function's name
location: the function's location
Returns: The URL of the function
"""
credentials, project_id = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"])
authed_session = AuthorizedSession(credentials)
url = ("https://cloudfunctions.googleapis.com/v2beta/" +
f"projects/{project_id}/locations/{location}/functions/{name}")
response = authed_session.get(url)
data = response.json()
function_url = data["serviceConfig"]["uri"]
return function_url
Поиск неисправностей
Включите ведение журнала Cloud Tasks
Журналы из Cloud Tasks содержат полезную диагностическую информацию, например, статус запроса, связанного с задачей. По умолчанию журналы из Cloud Tasks отключены из-за большого объема журналов, которые они потенциально могут генерировать в вашем проекте. Мы рекомендуем вам включить журналы отладки, пока вы активно разрабатываете и отлаживаете функции очереди задач. См. Включение ведения журнала .
Разрешения IAM
Вы можете увидеть ошибки PERMISSION DENIED
при постановке задач в очередь или когда Cloud Tasks пытается вызвать функции очереди задач. Убедитесь, что ваш проект имеет следующие привязки IAM:
Для идентификации, используемой для постановки задач в очередь в Cloud Tasks необходимо разрешение IAM
cloudtasks.tasks.create
.В примере это учетная запись службы App Engine по умолчанию.
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudtasks.enqueuer
Для идентификации, используемой для постановки задач в очередь в Cloud Tasks необходимо разрешение на использование учетной записи службы, связанной с задачей в Cloud Tasks .
В примере это учетная запись службы App Engine по умолчанию .
Инструкции по добавлению учетной записи службы App Engine по умолчанию в качестве пользователя учетной записи службы App Engine по умолчанию см. в документации Google Cloud IAM.
Для идентификации, используемой для запуска функции очереди задач, необходимо разрешение
cloudfunctions.functions.invoke
.В примере это учетная запись службы App Engine по умолчанию.
gcloud functions add-iam-policy-binding $FUNCTION_NAME \
--region=us-central1 \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudfunctions.invoker