הוספת פונקציות לתור באמצעות Cloud Tasks


הפונקציות של תור המשימות מנצלות את Google Cloud Tasks כדי לעזור לאפליקציה להריץ משימות אסינכרוניות שגוזלות זמן, צורכות הרבה משאבים או מוגבלות ברוחב הפס, מחוץ לזרימת האפליקציה הראשית.

לדוגמה, נניח שאתם רוצים ליצור גיבויים של קבוצה גדולה של קובצי תמונות שמתארחים כרגע ב-API עם הגבלת קצב. כדי להיות צרכני API אחראיים, אתם צריכים לכבד את מגבלות קצב הבקשות. בנוסף, יכול להיות שמשימות ארוכות כאלה יהיו פגיעות לכשלים בגלל פסק זמן ומגבלות זיכרון.

כדי לצמצם את המורכבות הזו, אפשר לכתוב פונקציית תור משימות שמגדירה אפשרויות בסיסיות למשימות כמו scheduleTime ו-dispatchDeadline, ואז מעבירה את הפונקציה לתור ב-Cloud Tasks. סביבת Cloud Tasks העבודה מיועדת במיוחד כדי להבטיח בקרה יעילה על עומס ועל מדיניות ניסיון חוזר עבור פעולות מהסוג הזה.

‫Firebase SDK ל-Cloud Functions for Firebase מגרסה 3.20.1 ואילך פועל בשילוב עם Firebase Admin SDK מגרסה 10.2.0 ואילך כדי לתמוך בפונקציות של תור משימות.

שימוש בפונקציות של תור משימות עם Firebase עלול לגרום לחיובים על עיבוד Cloud Tasks. מידע נוסף זמין במאמר בנושא תמחור של Cloud Tasks.

יצירת פונקציות של תור משימות

כדי להשתמש בפונקציות של תור המשימות, פועלים לפי תהליך העבודה הבא:

  1. כתיבת פונקציה של תור משימות באמצעות Firebase SDK ל-Cloud Functions.
  2. בודקים את הפונקציה על ידי הפעלת הטריגר שלה באמצעות בקשת HTTP.
  3. פורסים את הפונקציה באמצעות ה-CLI של Firebase. כשמפעילים את הפונקציה של תור המשימות בפעם הראשונה, ממשק ה-CLI יוצר תור משימות ב-Cloud Tasks עם אפשרויות (הגבלת קצב וניסיון חוזר) שצוינו בקוד המקור.
  4. מוסיפים משימות לתור המשימות החדש, ומעבירים פרמטרים כדי להגדיר לוח זמנים לביצוע, אם צריך. אפשר לעשות את זה באמצעות כתיבת הקוד באמצעות Admin SDK ופריסתו ב-Cloud Functions for Firebase.

כתיבת פונקציות של תור משימות

דוגמאות הקוד בקטע הזה מבוססות על אפליקציה שמגדירה שירות לגיבוי כל התמונות מתוך Astronomy Picture of the Day של נאס"א. כדי להתחיל, מייבאים את המודולים הנדרשים:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

אפשר להשתמש ב-onTaskDispatched או ב-on_task_dispatched לפונקציות של תור משימות. כשכותבים פונקציה של תור משימות, אפשר להגדיר ניסיון חוזר לכל תור והגבלת קצב של יצירת בקשות.

הגדרת פונקציות של תור משימות

פונקציות של תור משימות מגיעות עם קבוצה עוצמתית של הגדרות קביעה שמאפשרות לשלוט במדויק במגבלות הקצב ובהתנהגות הניסיון החוזר של תור משימות:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: כל משימה בתור המשימות מנסה לבצע מחדש באופן אוטומטי עד 5 פעמים. כך אפשר לצמצם שגיאות זמניות כמו שגיאות ברשת או שיבוש זמני בשירות של שירות חיצוני שתלוי בו.

  • retryConfig.minBackoffSeconds=60: כל ניסיון חוזר של משימה מתבצע בהפרש של 60 שניות לפחות מהניסיון הקודם. כך נוצר מרווח גדול בין כל ניסיון, כדי שלא נמהר למצות את 5 הניסיונות החוזרים מהר מדי.

  • rateLimits.maxConcurrentDispatch=6: עד 6 משימות נשלחות בכל זמן נתון. כך אפשר להבטיח זרם קבוע של בקשות לפונקציה הבסיסית, ולצמצם את מספר המקרים הפעילים וההפעלות במצב התחלתי (cold start).

בדיקת פונקציות של תור משימות

ברוב המקרים, Cloud Functions האמולטור הוא הדרך הכי טובה לבדוק פונקציות של תור משימות. במסמכי התיעוד של Emulator Suite מוסבר איך להטמיע באפליקציה פונקציות של תור משימות לצורך הדמיה.

בנוסף, פונקציות task queue של task_queue מוצגות כפונקציות HTTP פשוטות ב-Firebase Local Emulator Suite. אפשר לבדוק פונקציית משימה מדומה על ידי שליחת בקשת HTTP POST עם מטען נתוני JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

פריסת פונקציות של תור משימות

פורסים את הפונקציה של תור המשימות באמצעות Firebase CLI:

$ firebase deploy --only functions:backupapod

כשמפעילים פונקציה של תור משימות בפעם הראשונה, ה-CLI יוצר תור משימות ב-Cloud Tasks עם אפשרויות (הגבלת קצב וניסיון חוזר) שצוינו בקוד המקור.

אם נתקלתם בשגיאות הרשאות כשפרסתם פונקציות, ודאו שתפקידי ה-IAM המתאימים הוקצו למשתמש שמריץ את פקודות הפריסה.

הוספה לתור של פונקציות של תור משימות

אפשר להוסיף פונקציות של תור משימות לתור ב-Cloud Tasks מסביבת שרת מהימנה כמו Cloud Functions for Firebase באמצעות Firebase Admin SDK ל-Node.js או ספריות Google Cloud ל-Python. אם אתם חדשים ב-Admin SDK, כדאי לקרוא את המאמר הוספת Firebase לשרת כדי להתחיל.

בתהליך רגיל נוצרת משימה חדשה, היא מתווספת לתור ב-Cloud Tasks ומוגדרת:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • הקוד לדוגמה מנסה לפזר את הביצוע של המשימות על ידי שיוך של עיכוב של N דקות למשימה ה-N. המשמעות היא הפעלה של משימה אחת בדקה. שימו לב שאפשר להשתמש גם ב-scheduleTime (Node.js) או ב-schedule_time (Python) אם רוצים ש-Cloud Tasks יפעיל משימה בשעה ספציפית.

  • בקוד לדוגמה הגדרנו את משך הזמן המקסימלי שבו Cloud Tasks ימתין להשלמת משימה. מערכת Cloud Tasks תנסה שוב לבצע את המשימה בהתאם להגדרת הניסיון החוזר של התור, או עד שתגיעו לתאריך היעד הזה. בדוגמה, התור מוגדר לנסות לבצע את המשימה עד 5 פעמים, אבל המשימה מבוטלת באופן אוטומטי אם התהליך כולו (כולל ניסיונות חוזרים) נמשך יותר מ-5 דקות.

לאחזר ולכלול את ה-URI של היעד

בגלל האופן שבו Cloud Tasks יוצר אסימוני אימות כדי לאמת בקשות לפונקציות של תור המשימות הבסיסי, צריך לציין את כתובת ה-URL של פונקציית Cloud Run כשמוסיפים משימות לתור. מומלץ לאחזר את כתובת ה-URL של הפונקציה באופן פרוגרמטי, כמו בדוגמה הבאה:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

פתרון בעיות

הפעלת רישום ביומן של Cloud Tasks

היומנים מ-Cloud Tasks מכילים מידע אבחוני שימושי כמו הסטטוס של הבקשה שמשויכת למשימה. כברירת מחדל, יומנים מ-Cloud Tasks מושבתים בגלל נפח היומנים הגדול שהם עשויים ליצור בפרויקט. מומלץ להפעיל את יומני הניפוי באגים בזמן שאתם מפתחים ומנפים באגים בפונקציות של תור המשימות. איך מפעילים את הרישום ביומן

הרשאות IAM

יכול להיות שתראו שגיאות PERMISSION DENIED כשמוסיפים משימות לתור או כש-Cloud Tasks מנסה להפעיל את הפונקציות של תור המשימות. מוודאים שלפרויקט יש את הקישורים הבאים של IAM:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer

הוראות להוספת חשבון השירות שמוגדר כברירת מחדל App Engine כמשתמש בחשבון השירות שמוגדר כברירת מחדל App Engine מופיעות במסמכי ה-IAM של Google Cloud.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker