ฟังก์ชันคิวของงานใช้ประโยชน์จาก Google Cloud Tasks เพื่อช่วยให้แอปของคุณเรียกใช้งานที่ใช้เวลานาน ใช้ทรัพยากรมาก หรือมีแบนด์วิดท์จำกัด แบบไม่พร้อมกันภายนอกโฟลว์แอปพลิเคชันหลัก
เช่น สมมติว่าคุณต้องการสร้างข้อมูลสำรองของชุดไฟล์รูปภาพขนาดใหญ่ที่โฮสต์อยู่ใน API ที่มีขีดจำกัดอัตรา คุณต้องเคารพขีดจำกัดอัตราของ API นั้นเพื่อให้เป็นผู้ใช้ API ที่มีความรับผิดชอบ นอกจากนี้ งานที่ใช้เวลานานเช่นนี้อาจเสี่ยงต่อการล้มเหลวเนื่องจากหมดเวลาและ ขีดจำกัดของหน่วยความจำ
คุณสามารถเขียนฟังก์ชันคิวของงานเพื่อตั้งค่าตัวเลือกงานพื้นฐาน เช่น scheduleTime
และ dispatchDeadline
แล้วส่งต่อฟังก์ชันไปยังคิวใน Cloud Tasks เพื่อลดความซับซ้อนนี้ Cloud Tasks
สภาพแวดล้อมได้รับการออกแบบมาโดยเฉพาะเพื่อให้มั่นใจว่าการควบคุมความแออัดและ
นโยบายการลองใหม่มีประสิทธิภาพสำหรับการดำเนินการประเภทนี้
Firebase SDK สำหรับ Cloud Functions for Firebase v3.20.1 ขึ้นไปทำงานร่วมกับ Firebase Admin SDK v10.2.0 ขึ้นไปเพื่อรองรับฟังก์ชันคิวงาน
การใช้ฟังก์ชันคิวงานกับ Firebase อาจทำให้เกิดค่าใช้จ่ายสำหรับ Cloud Tasks การประมวลผล ดูข้อมูลเพิ่มเติมได้ที่ Cloud Tasksการกำหนดราคา
สร้างฟังก์ชันคิวของงาน
หากต้องการใช้ฟังก์ชันคิวของงาน ให้ทำตามเวิร์กโฟลว์ต่อไปนี้
- เขียนฟังก์ชันคิวงานโดยใช้ Firebase SDK สำหรับ Cloud Functions
- ทดสอบฟังก์ชันโดยทริกเกอร์ด้วยคำขอ HTTP
- ทำให้ฟังก์ชันใช้งานได้ด้วย Firebase CLI เมื่อทําให้ฟังก์ชันคิวงานใช้งานได้เป็นครั้งแรก CLI จะสร้างคิวงานใน Cloud Tasks พร้อมตัวเลือก (การจํากัดอัตราและการลองใหม่) ที่ระบุไว้ในซอร์สโค้ด
- เพิ่มงานลงในคิวงานที่สร้างขึ้นใหม่ โดยส่งพารามิเตอร์เพื่อตั้งค่า กำหนดการดำเนินการหากจำเป็น คุณทำได้โดยเขียนโค้ด โดยใช้ Admin SDK และนำไปใช้งานใน Cloud Functions for Firebase
เขียนฟังก์ชันคิวงาน
ตัวอย่างโค้ดในส่วนนี้อิงตามแอปที่ตั้งค่า บริการที่สำรองข้อมูลรูปภาพทั้งหมดจาก ภาพดาราศาสตร์ประจำวันของ NASA หากต้องการเริ่มต้นใช้งาน ให้นำเข้าโมดูลที่จำเป็นดังนี้
Node.js
// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");
// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");
Python
# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion
# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession
ใช้ onTaskDispatched
หรือ on_task_dispatched
สำหรับฟังก์ชันคิวของงาน เมื่อเขียนฟังก์ชันคิวงาน
คุณจะกำหนดค่าการลองใหม่และจำกัดอัตราต่อคิวได้
กำหนดค่าฟังก์ชันคิวของงาน
ฟังก์ชันคิวงานมาพร้อมชุดการตั้งค่าการกำหนดค่าที่มีประสิทธิภาพ เพื่อควบคุมขีดจํากัดอัตราและลักษณะการทํางานของการลองใหม่ของคิวงานได้อย่างแม่นยํา
Node.js
exports.backupapod = onTaskDispatched(
{
retryConfig: {
maxAttempts: 5,
minBackoffSeconds: 60,
},
rateLimits: {
maxConcurrentDispatches: 6,
},
}, async (req) => {
Python
@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
"""Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
retryConfig.maxAttempts=5
: ระบบจะลองส่งงานแต่ละรายการในคิวงานอีกครั้งโดยอัตโนมัติ สูงสุด 5 ครั้ง ซึ่งจะช่วยลดข้อผิดพลาดชั่วคราว เช่น ข้อผิดพลาดของเครือข่าย หรือการหยุดชะงักของบริการชั่วคราวของบริการภายนอกที่ขึ้นอยู่กับบริการนี้retryConfig.minBackoffSeconds=60
: ระบบจะลองทำงานแต่ละอย่างอีกครั้งโดยเว้นระยะห่างจากการลองครั้งก่อนอย่างน้อย 60 วินาที ซึ่งจะช่วยให้มีช่วงเวลาพักระหว่างการพยายามแต่ละครั้ง เราจึงไม่ต้องรีบใช้ความพยายามในการลองใหม่ 5 ครั้งเร็วเกินไปrateLimits.maxConcurrentDispatch=6
: ระบบจะจ่ายงานสูงสุด 6 งานในเวลาที่กำหนด ซึ่งจะช่วยให้มั่นใจได้ว่าจะมีคำขอไปยังฟังก์ชันพื้นฐานอย่างต่อเนื่อง และช่วยลดจำนวนอินสแตนซ์ที่ใช้งานอยู่และ Cold Start
ทดสอบฟังก์ชันคิวงาน
ในกรณีส่วนใหญ่ Cloud Functionsโปรแกรมจำลองเป็นวิธีที่ดีที่สุดในการทดสอบฟังก์ชันคิวของงาน ดูเอกสารประกอบของ Emulator Suite เพื่อดูวิธีวัดประสิทธิภาพแอปสำหรับการจำลองฟังก์ชันคิวของงาน
นอกจากนี้ ฟังก์ชัน Task Queue ของ SDK ยังแสดงเป็นฟังก์ชัน HTTP แบบง่ายใน Firebase Local Emulator Suite คุณทดสอบฟังก์ชันงานที่จำลองได้โดยส่งคำขอ HTTP POST พร้อมเพย์โหลดข้อมูล JSON ดังนี้
# start the Local Emulator Suite
firebase emulators:start
# trigger the emulated task queue function
curl \
-X POST # An HTTP POST request...
-H "content-type: application/json" \ # ... with a JSON body
http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
-d '{"data": { ... some data .... }}' # ... with JSON encoded data
ทำให้ฟังก์ชันคิวงานใช้งานได้
ทำให้ฟังก์ชันคิวของงานใช้งานได้โดยใช้ Firebase CLI
$ firebase deploy --only functions:backupapod
เมื่อทําให้ฟังก์ชันคิวงานใช้งานได้เป็นครั้งแรก CLI จะสร้างคิวงานใน Cloud Tasks พร้อมตัวเลือก (การจํากัดอัตราและการลองใหม่) ที่ระบุไว้ในซอร์สโค้ด
หากพบข้อผิดพลาดเกี่ยวกับสิทธิ์เมื่อติดตั้งใช้งานฟังก์ชัน ให้ตรวจสอบว่าได้มอบหมายบทบาท IAM ที่เหมาะสม ให้กับผู้ใช้ที่เรียกใช้คำสั่งการติดตั้งใช้งานแล้ว
จัดคิวฟังก์ชันคิวงาน
ฟังก์ชันคิวของงานสามารถจัดคิวใน Cloud Tasks จากสภาพแวดล้อมเซิร์ฟเวอร์ที่เชื่อถือได้ เช่น Cloud Functions for Firebase โดยใช้ Firebase Admin SDK สำหรับ Node.js หรือไลบรารี Google Cloud สำหรับ Python หากคุณเพิ่งเริ่มใช้ Admin SDK โปรดดูหัวข้อ เพิ่ม Firebase ลงในเซิร์ฟเวอร์เพื่อเริ่มต้นใช้งาน
โดยปกติแล้วโฟลว์จะสร้างงานใหม่ จัดคิวใน Cloud Tasks และตั้งค่าสำหรับงาน ดังนี้
Node.js
exports.enqueuebackuptasks = onRequest(
async (_request, response) => {
const queue = getFunctions().taskQueue("backupapod");
const targetUri = await getFunctionUrl("backupapod");
const enqueues = [];
for (let i = 0; i <= BACKUP_COUNT; i += 1) {
const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
// Delay each batch by N * hour
const scheduleDelaySeconds = iteration * (60 * 60);
const backupDate = new Date(BACKUP_START_DATE);
backupDate.setDate(BACKUP_START_DATE.getDate() + i);
// Extract just the date portion (YYYY-MM-DD) as string.
const date = backupDate.toISOString().substring(0, 10);
enqueues.push(
queue.enqueue({date}, {
scheduleDelaySeconds,
dispatchDeadlineSeconds: 60 * 5, // 5 minutes
uri: targetUri,
}),
);
}
await Promise.all(enqueues);
response.sendStatus(200);
});
Python
@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
"""Adds backup tasks to a Cloud Tasks queue."""
task_queue = functions.task_queue("backupapod")
target_uri = get_function_url("backupapod")
for i in range(BACKUP_COUNT):
batch = i // HOURLY_BATCH_SIZE
# Delay each batch by N hours
schedule_delay = timedelta(hours=batch)
schedule_time = datetime.now() + schedule_delay
dispatch_deadline_seconds = 60 * 5 # 5 minutes
backup_date = BACKUP_START_DATE + timedelta(days=i)
body = {"data": {"date": backup_date.isoformat()[:10]}}
task_options = functions.TaskOptions(schedule_time=schedule_time,
dispatch_deadline_seconds=dispatch_deadline_seconds,
uri=target_uri)
task_queue.enqueue(body, task_options)
return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
โค้ดตัวอย่างพยายามกระจายการดำเนินการของ งานโดยเชื่อมโยงการหน่วงเวลาเป็นเวลา N นาทีสำหรับงานที่ N ซึ่งหมายถึงการทริกเกอร์งานประมาณ 1 งาน/นาที โปรดทราบว่าคุณยังใช้
scheduleTime
(Node.js) หรือschedule_time
(Python) ได้ด้วยหากต้องการให้ Cloud Tasks เรียกใช้งานที่เวลาที่เฉพาะเจาะจงโค้ดตัวอย่างจะกำหนดระยะเวลาสูงสุดที่ Cloud Tasks จะรอ ให้งานเสร็จสมบูรณ์ Cloud Tasks จะลองทำงานอีกครั้งตามการกำหนดค่าการลองใหม่ของคิว หรือจนกว่าจะถึงกำหนดเวลานี้ ในตัวอย่าง คิวได้รับการกำหนดค่าให้ลองทำงานอีกครั้งได้สูงสุด 5 ครั้ง แต่ระบบจะ ยกเลิกงานโดยอัตโนมัติหากกระบวนการทั้งหมด (รวมถึงการพยายามลองอีกครั้ง) ใช้เวลานานกว่า 5 นาที
ดึงและใส่ URI เป้าหมาย
เนื่องจากวิธีที่ Cloud Tasks สร้างโทเค็นการตรวจสอบสิทธิ์เพื่อตรวจสอบสิทธิ์ คำขอไปยังฟังก์ชันคิวของงานพื้นฐาน คุณจึงต้องระบุ URL ของ Cloud Run ของฟังก์ชันเมื่อจัดคิวงาน เราขอแนะนำให้คุณดึง URL สำหรับฟังก์ชันของคุณโดยอัตโนมัติตามที่แสดงด้านล่าง
Node.js
/**
* Get the URL of a given v2 cloud function.
*
* @param {string} name the function's name
* @param {string} location the function's location
* @return {Promise<string>} The URL of the function
*/
async function getFunctionUrl(name, location="us-central1") {
if (!auth) {
auth = new GoogleAuth({
scopes: "https://www.googleapis.com/auth/cloud-platform",
});
}
const projectId = await auth.getProjectId();
const url = "https://cloudfunctions.googleapis.com/v2beta/" +
`projects/${projectId}/locations/${location}/functions/${name}`;
const client = await auth.getClient();
const res = await client.request({url});
const uri = res.data?.serviceConfig?.uri;
if (!uri) {
throw new Error(`Unable to retreive uri for function at ${url}`);
}
return uri;
}
Python
def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
"""Get the URL of a given v2 cloud function.
Params:
name: the function's name
location: the function's location
Returns: The URL of the function
"""
credentials, project_id = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"])
authed_session = AuthorizedSession(credentials)
url = ("https://cloudfunctions.googleapis.com/v2beta/" +
f"projects/{project_id}/locations/{location}/functions/{name}")
response = authed_session.get(url)
data = response.json()
function_url = data["serviceConfig"]["uri"]
return function_url
การแก้ปัญหา
เปิดการบันทึก Cloud Tasks
บันทึกจาก Cloud Tasks มีข้อมูลการวินิจฉัยที่เป็นประโยชน์ เช่น สถานะของคำขอที่เชื่อมโยงกับงาน โดยค่าเริ่มต้น ระบบจะปิดบันทึกจาก Cloud Tasks เนื่องจากบันทึกจำนวนมากที่อาจสร้างขึ้นในโปรเจ็กต์ เราขอแนะนำให้คุณเปิดบันทึกการแก้ไขข้อบกพร่อง ขณะที่กำลังพัฒนาและแก้ไขข้อบกพร่องของฟังก์ชันคิวของงาน ดูการเปิด การบันทึก
สิทธิ์ IAM
คุณอาจเห็นPERMISSION DENIED
ข้อผิดพลาดเมื่อจัดคิวงานหรือเมื่อ Cloud Tasks พยายามเรียกใช้ฟังก์ชันคิวงาน ตรวจสอบว่าโปรเจ็กต์มี Binding IAM ต่อไปนี้
ข้อมูลประจำตัวที่ใช้ในการจัดคิวงานไปยัง Cloud Tasks ต้องมีสิทธิ์ IAM
cloudtasks.tasks.create
ในตัวอย่างนี้คือApp Engineบัญชีบริการเริ่มต้น
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudtasks.enqueuer
ข้อมูลประจำตัวที่ใช้ในการจัดคิวงานไปยัง Cloud Tasks ต้องมีสิทธิ์ ในการใช้บัญชีบริการที่เชื่อมโยงกับงานใน Cloud Tasks
ในตัวอย่างนี้คือApp Engineบัญชีบริการเริ่มต้น
ดูวิธีการเพิ่มApp Engineบัญชีบริการเริ่มต้น เป็นผู้ใช้App Engineบัญชีบริการเริ่มต้นได้ในเอกสารประกอบของ Google Cloud IAM
ข้อมูลประจำตัวที่ใช้ทริกเกอร์ฟังก์ชันคิวของงานต้องมีสิทธิ์
cloudfunctions.functions.invoke
ในตัวอย่างนี้คือApp Engineบัญชีบริการเริ่มต้น
gcloud functions add-iam-policy-binding $FUNCTION_NAME \
--region=us-central1 \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudfunctions.invoker