工作佇列函式會運用 Google Cloud Tasks,協助應用程式在主要應用程式流程之外,以非同步方式執行耗時、耗用大量資源或頻寬受限的工作。
舉例來說,假設您想備份目前在 API 上託管的大量圖片檔案,但 API 有速率限制。如要成為該 API 的負責任消費者,您必須遵守其速率限制。此外,這類長時間執行的工作可能會因逾時和記憶體限制而失敗。
如要減輕這項複雜度,您可以編寫工作佇列函式,設定 scheduleTime 和 dispatchDeadline 等基本工作選項,然後將函式交給 Cloud Tasks 中的佇列。Cloud Tasks
環境專門用於確保這類作業的有效壅塞控制和重試政策。
Firebase SDK for Cloud Functions for Firebase 3.20.1 以上版本可與 Firebase Admin SDK 10.2.0 以上版本互通,支援工作佇列函式。
在 Firebase 中使用工作佇列函式可能會產生 Cloud Tasks 處理費用。詳情請參閱Cloud Tasks定價。
建立工作佇列函式
如要使用工作佇列函式,請按照下列工作流程操作:
- 使用 Firebase SDK for Cloud Functions 編寫工作佇列函式。
 - 透過 HTTP 要求觸發函式,藉此測試函式。
 - 使用 Firebase CLI 部署函式。首次部署工作佇列函式時,CLI 會在 Cloud Tasks 中建立工作佇列,並使用來源程式碼中指定的選項 (速率限制和重試)。
 - 將工作新增至新建立的工作佇列,並視需要傳遞參數來設定執行時間表。如要達成這個目標,請使用 Admin SDK 編寫程式碼,然後部署至 Cloud Functions for Firebase。
 
編寫工作佇列函式
本節中的程式碼範例是以應用程式為基礎,該應用程式會設定服務,備份 NASA「每日天文圖片」中的所有圖片。首先,請匯入必要模組:
Node.js
// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/tasks");
const {onRequest, HttpsError} = require("firebase-functions/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions");
// Dependencies for image backup.
const path = require("path");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");
Python
# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion
# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession
請使用 onTaskDispatched 或 on_task_dispatched 函式,編寫工作佇列函式時,您可以設定每個佇列的重試和速率限制設定。
設定工作佇列函式
工作佇列函式提供強大的設定選項,可精確控管工作佇列的速率限制和重試行為:
Node.js
exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {
Python
@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
retryConfig.maxAttempts=5:工作佇列中的每個工作都會自動重試最多 5 次。這有助於減輕暫時性錯誤,例如網路錯誤或依附的外部服務暫時中斷。retryConfig.minBackoffSeconds=60:每次重試間隔至少 60 秒。這會在每次嘗試之間提供大量緩衝區,因此我們不會急著太快用完 5 次重試嘗試。rateLimits.maxConcurrentDispatch=6:一次最多可調度 6 項工作。這有助於確保基礎函式能持續接收要求,並減少有效執行個體和冷啟動的數量。
測試工作佇列函式
在大多數情況下,Cloud Functions 模擬器是測試工作佇列函式的最佳方式。請參閱 Emulator Suite 說明文件,瞭解如何為工作佇列函式模擬功能設定應用程式。
此外,工作佇列函式在 Firebase Local Emulator Suite 中會公開為簡單的 HTTP 函式。您可以傳送含有 JSON 資料酬載的 HTTP POST 要求,測試模擬工作函式:
 # start the Local Emulator Suite
 firebase emulators:start
 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data
部署工作佇列函式
使用 Firebase CLI 部署工作佇列函式:
$ firebase deploy --only functions:backupapod
首次部署工作佇列函式時,CLI 會在 Cloud Tasks 中建立工作佇列,並使用來源程式碼中指定的選項 (速率限制和重試)。
如果在部署函式時發生權限錯誤,請確認執行部署指令的使用者已獲派適當的 IAM 角色。
將工作佇列函式排入佇列
您可以使用 Node.js 適用的 Firebase Admin SDK 或 Python 適用的 Google Cloud 程式庫,從受信任的伺服器環境 (例如 Cloud Functions for Firebase) 將工作佇列函式排入 Cloud Tasks。如果您是 Admin SDK 新手,請參閱「將 Firebase 新增至伺服器」一文,瞭解如何開始使用。
一般流程會建立新工作、將工作加入 Cloud Tasks 的佇列,並設定工作:
Node.js
exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");
      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);
        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });
Python
@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")
    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE
        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay
        dispatch_deadline_seconds = 60 * 5  # 5 minutes
        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
範例程式碼會為第 N 個工作關聯 N 分鐘的延遲時間,藉此分散工作執行時間。這表示每分鐘會觸發約 1 項工作。請注意,如要讓工作在特定時間觸發,也可以使用
scheduleTime(Node.js) 或schedule_time(Python)。Cloud Tasks範例程式碼會設定等待工作完成的時間上限。Cloud TasksCloud Tasks 會根據佇列的重試設定重試工作,或直到達到這個截止日期為止。在範例中,佇列設定為最多重試工作 5 次,但如果整個程序 (包括重試) 超過 5 分鐘,工作就會自動取消。
疑難排解
開啟 Cloud Tasks 記錄功能
Cloud Tasks 的記錄包含實用的診斷資訊,例如與工作相關聯的要求狀態。根據預設,系統會停用 Cloud Tasks 的記錄,因為這項功能可能會在專案中產生大量記錄。建議您在積極開發及偵錯工作佇列函式時,開啟偵錯記錄。請參閱「開啟記錄」。
IAM 權限
排隊工作時,或 Cloud Tasks 嘗試叫用工作佇列函式時,可能會出現 PERMISSION DENIED 錯誤。請確認專案具有下列 IAM 繫結:
用來將工作加入 Cloud Tasks 佇列的身分需要
cloudtasks.tasks.createIAM 權限。在範例中,這是App Engine預設服務帳戶
gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
將工作加入 Cloud Tasks 時使用的身分,必須具備使用 Cloud Tasks 中與工作相關聯服務帳戶的權限。
在範例中,這是App Engine預設服務帳戶。
如要瞭解如何將App Engine預設服務帳戶新增為App Engine預設服務帳戶的使用者,請參閱 Google Cloud IAM 說明文件。
用於觸發工作佇列函式的身分需要
cloudfunctions.functions.invoke權限。在範例中,這是App Engine預設服務帳戶
gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker