ПР. Однократная отправка сообщений
Кратко:
- Реализация проекта по конвертированию видеофайлов в GIF с использованием Yandex Cloud Functions.
- Использование Yandex Message Queue для передачи задач в очередь.
- Создание очереди с помощью утилиты aws или Terraform.
- Создание базы данных в Yandex Database Service (YDB) и создание документной таблицы.
- Создание объекта хранения в Yandex Object Storage (Object Storage) и получение URL для доступа к нему.
- Создание функций для обработки видео и получения URL для результатов.
- Создание триггера для вызова функции обработчика и отправки сообщений в очередь.
- Протестирование системы с использованием идентификатора задачи для получения статуса из базы данных.
- Удаление триггера после завершения работы
Практическая работа. Однократная отправка сообщений
В этой практической работе мы реализуем проект, который позволит пользователям конвертировать видеофайлы в GIF. Такая задача хорошо подходит для Cloud Functions, потому что конвертирование отнимает немало ресурсов процессора, и чем качественнее видео, тем больше ресурсов требуется на его обработку.
Почему для решения этой задачи нужны очереди?
Представим, что мы попытались решить эту задачу «в лоб». Пользователь заходит на страницу и вводит ссылку на видеофайл. Сервис скачивает его, конвертирует и отдает ссылку на GIF. Возникают две серьёзные проблемы:
- Синхронное соединение не всегда стабильно. Чем дольше вы его держите, тем выше вероятность, что оно разорвётся. В этом случае всё придётся сделать заново. А если соединение нестабильно, то пользователь может и не дождаться результата.
- Задача ресурсоёмкая: если сервисом одновременно воспользуются много пользователей с большими видеороликами, мощностей может не хватить.
Чтобы избежать этих проблем, в архитектуру сервиса необходимо встроить очередь.
Шаг 1. Сервисный аккаунт и Lockbox
Создание сервисного аккаунта
Создайте сервисный аккаунт с именем
ffmpeg-account-for-cf
:
export SERVICE_ACCOUNT=$(yc iam service-account create --name ffmpeg-account-for-cf \
--description "service account for serverless" \
--format json | jq -r .)
Проверьте текущий список сервисных аккаунтов:
yc iam service-account list
После проверки запишите ID созданного сервисного аккаунта в переменную
SERVICE_ACCOUNT_ID
:
echo "export SERVICE_ACCOUNT_FFMPEG_ID=<ID>" >> ~/.bashrc && . ~/.bashrc
echo $SERVICE_ACCOUNT_FFMPEG_ID
Назначение роли сервисному аккаунту
Добавим вновь созданному сервисному аккаунту роли
storage.viewer
, storage.uploader
, ymq.reader
, ymq.writer
, ydb.admin
, serverless.functions.invoker
, и lockbox.payloadViewer
:
echo "export FOLDER_ID=$(yc config get folder-id)" >> ~/.bashrc && . ~/.bashrc
echo $FOLDER_ID
yc resource-manager folder add-access-binding $FOLDER_ID \
--subject serviceAccount:$SERVICE_ACCOUNT_FFMPEG_ID \
--role storage.viewer
yc resource-manager folder add-access-binding $FOLDER_ID \
--subject serviceAccount:$SERVICE_ACCOUNT_FFMPEG_ID \
--role storage.uploader
yc resource-manager folder add-access-binding $FOLDER_ID \
--subject serviceAccount:$SERVICE_ACCOUNT_FFMPEG_ID \
--role ymq.reader
yc resource-manager folder add-access-binding $FOLDER_ID \
--subject serviceAccount:$SERVICE_ACCOUNT_FFMPEG_ID \
--role ymq.writer
yc resource-manager folder add-access-binding $FOLDER_ID \
--subject serviceAccount:$SERVICE_ACCOUNT_FFMPEG_ID \
--role ydb.admin
yc resource-manager folder add-access-binding $FOLDER_ID \
--subject serviceAccount:$SERVICE_ACCOUNT_FFMPEG_ID \
--role serverless.functions.invoker
yc resource-manager folder add-access-binding $FOLDER_ID \
--subject serviceAccount:$SERVICE_ACCOUNT_FFMPEG_ID \
--role lockbox.payloadViewer
yc resource-manager folder add-access-binding $FOLDER_ID \
--subject serviceAccount:$SERVICE_ACCOUNT_FFMPEG_ID \
--role editor
Вы можете назначить несколько ролей и с помощью команды
set-access-binding
. Но эта команда полностью перезаписывает права доступа к ресурсу и все текущие роли на него будут удалены! Поэтому сначала убедитесь, что ресурсу не назначены роли, которые вы не хотите потерять:
yc resource-manager folder list-access-bindings $FOLDER_ID
yc resource-manager folder set-access-bindings $FOLDER_ID \
--access-binding role=storage.viewer,subject=serviceAccount:$SERVICE_ACCOUNT_FFMPEG_ID \
--access-binding role=storage.uploader,subject=serviceAccount:$SERVICE_ACCOUNT_FFMPEG_ID \
--access-binding role=ymq.reader,subject=serviceAccount:$SERVICE_ACCOUNT_FFMPEG_ID \
--access-binding role=ymq.writer,subject=serviceAccount:$SERVICE_ACCOUNT_FFMPEG_ID \
--access-binding role=ydb.admin,subject=serviceAccount:$SERVICE_ACCOUNT_FFMPEG_ID \
--access-binding role=serverless.functions.invoker,subject=serviceAccount:$SERVICE_ACCOUNT_FFMPEG_ID \
--access-binding role=lockbox.payloadViewer,subject=serviceAccount:$SERVICE_ACCOUNT_FFMPEG_ID \
--access-binding role=editor,subject=serviceAccount:$SERVICE_ACCOUNT_FFMPEG_ID
Создание ключа доступа для сервисного аккаунта
Этот этап нужен для получения идентификатора ключа доступа и секретного ключа, которые будут использованы для загрузки файлов в Object Storage, работы с Yandex Message Queue и т. д. Для создания ключа доступа необходимо вызвать следующую команду:
yc iam access-key create --service-account-name ffmpeg-account-for-cf
В результате вы получите примерно следующее:
access_key:
id: ajefraollq5puj2tir1o
service_account_id: ajetdv28pl0a1a8r41f0
created_at: "2021-08-23T21:13:05.677319393Z"
key_id: BTPNvWthv0ZX2xVmlPIU
secret: cWLQ0HrTM0k_qAac43cwMNJA8VV_rfTg_kd4xVPi
Здесь
key_id
— это идентификатор ключа доступа ACCESS_KEY_ID
. А secret
— это секретный ключ SECRET_ACCESS_KEY
. Переменные ACCESS_KEY_ID
и SECRET_ACCESS_KEY
могут быть использованы для задания соответствующих значений aws_access_key_id
и aws_secret_access_key
при использовании библиотеки boto3.Создание элемента в сервисе Lockbox
В сервисе Lockbox (находится на стадии Preview) создайте ваш первый секрет, состоящий из набора версий, в которых хранятся ваши данные. Версия содержит наборы ключей и значений:
- Ключ — несекретное название для значения, по которому вы будете его идентифицировать.
- Значение — это секретные данные.
Версия не изменяется. Для любого изменения количества пар ключей-значений или их содержимого необходимо создать новую версию. Создадим секрет с именем
ffmpeg-sa-key
и парой ключей ACCESS_KEY_ID
и SECRET_ACCESS_KEY
:
yc lockbox secret create --name ffmpeg-sa-key \
--folder-id $FOLDER_ID \
--description "keys for serverless" \
--payload '[{"key": "ACCESS_KEY_ID", "text_value": <ACCESS_KEY_ID>}, {"key": "SECRET_ACCESS_KEY", "text_value": "<SECRET_ACCESS_KEY>"}]'
Получим и запишем значение
SECRET_ID
, оно нам потребуется при создании функции:
yc lockbox secret list
yc lockbox secret get --name ffmpeg-sa-key
echo "export SECRET_ID=<SECRET_ID>" >> ~/.bashrc && . ~/.bashrc
echo $SECRET_ID
Шаг 2. Создание очереди Yandex Message Queue
Для создания очереди Yandex Message Queue вы можете использовать три разных способа:
- консоль управления;
- консольная утилита
aws
; - Terraform.
Создание очереди с помощью утилиты aws
Воспользуемся AWS CLI. Для начала задайте конфигурацию с помощью команды
aws configure
. При этом от вас потребуется ввести:AWS Access Key ID
— идентификатор ключа доступаkey_id
сервисного аккаунта, полученный на предыдущем шаге.AWS Secret Access Key
— секретный ключsecret
сервисного аккаунта, полученный на предыдущем шаге.Default region name
— используйте значениеru-central1
.
По завершению конфигурации вы сможете создать очередь:
aws configure
aws sqs create-queue --queue-name ffmpeg --endpoint https://message-queue.api.cloud.yandex.net/
В результате успешного выполнения предыдущей команды в ответ вы получите URL:
{
"QueueUrl": "https://message-queue.api.cloud.yandex.net/b1ga4gj7agij03ln6aov/dj6000000003kv2t02b3/ffmpeg"
}
Запишем значения URL в переменную
YMQ_QUEUE_URL
. Она потребуется нам при создании функции:
echo "export YMQ_QUEUE_URL=<YMQ_QUEUE_URL>" >> ~/.bashrc && . ~/.bashrc
echo $YMQ_QUEUE_URL
Ещё вам потребует значение атрибута
QueueArn
, получим его:
aws sqs get-queue-attributes \
--endpoint https://message-queue.api.cloud.yandex.net \
--queue-url $YMQ_QUEUE_URL \
--attribute-names QueueArn
В результате вы получите ответ вида:
{
"Attributes": {
"QueueArn": "yrn:yc:ymq:ru-central1:b1gl21bkgss4msekt08i:ffmpeg"
}
}
Сохраним значение
QueueArn
в переменную YMQ_QUEUE_ARN
:
echo "export YMQ_QUEUE_ARN=<YMQ_QUEUE_ARN>" >> ~/.bashrc && . ~/.bashrc
echo $YMQ_QUEUE_ARN
Шаг 3. Создание базы данных в сервисе YDB
Создадим базу данных YDB с именем
ffmpeg
и типом serverless, используя для этого флаг --serverless
:
yc ydb database create ffmpeg \
--serverless \
--folder-id $FOLDER_ID
yc ydb database list
Сразу получим и сохраним
document_api_endpoint
в значение переменной DOCAPI_ENDPOINT
:
yc ydb database get --name ffmpeg
echo "export DOCAPI_ENDPOINT=<DOCAPI_ENDPOINT>" >> ~/.bashrc && . ~/.bashrc
echo $DOCAPI_ENDPOINT
Как только база данных создана, воспользуемся ранее использованной утилитой AWS CLI для создания документной таблицы в этой базе данных. Всю конфигурацию возьмем из файла
tasks.json
:
{
"AttributeDefinitions": [
{
"AttributeName": "task_id",
"AttributeType": "S"
}
],
"KeySchema": [
{
"AttributeName": "task_id",
"KeyType": "HASH"
}
],
"TableName": "tasks"
}
Находясь в одном каталоге с файлом
tasks.json
, вызовите следующую команду для создания таблицы:
aws dynamodb create-table \
--cli-input-json file://tasks.json \
--endpoint-url $DOCAPI_ENDPOINT \
--region ru-central1
В консоли управления убедитесь, что БД
ffmpeg
создана, и в ней есть пустая таблица tasks
.Шаг 4. Создание бакета в сервисе Object Storage
Самый простой способ создания бакета в Object Storage — это использование консоли управления.
В консоли управления в вашем рабочем каталоге выберите сервис Object Storage. Нажмите кнопку Создать бакет. На странице создания бакета введите имя, в нашем примере это будет
storage-for-ffmpeg
, остальные параметры не меняйте.Нажмите кнопку Создать бакет для завершения операции. Далее вы всегда сможете поменять класс хранилища, его размер и настройки доступа.
Сохраним название бакета для дальнейшего использования:
echo "export S3_BUCKET=<имя бакета>" >> ~/.bashrc && . ~/.bashrc
echo $S3_BUCKET
Шаг 5. Создание функций
При создании функций нам потребуется ряд переменных:
SECRET_ID
— идентификатор секрета (можно получить из таблицы со списком секретов);YMQ_QUEUE_URL
— URL очереди (можно получить на странице обзора);DOCAPI_ENDPOINT
— его можно получить на странице обзора БД, нужен именно Document API;S3_BUCKET
— имя бакета, в нашем случае этоstorage-for-ffmpeg
.
Проверим заданные ранее переменные:
echo $SERVICE_ACCOUNT_FFMPEG_ID
echo $SECRET_ID
echo $YMQ_QUEUE_URL
echo $DOCAPI_ENDPOINT
echo $S3_BUCKET
Для обработки видео понадобится утилита
FFmpeg
. Скачайте статический релизный бинарный файл для Linux amd64 на сайте ffmpeg.org (обычно он находится в разделе FFmpeg Static Builds и называется примерно так: ffmpeg-release-amd64-static.tar.xz
). Извлеките из архива файл ffmpeg
. Обратите внимание, что у этого файла должны быть заданы права доступа на выполнение (установить нужный флаг можно с помощью команды chmod a+x ffmpeg
).Поскольку через консоль управления можно прикладывать файлы размером не более 3,5 МБ, загрузим код функций и файл
ffmpeg
в объектное хранилище (Object Storage).Исходный код в файле
index.py
содержит обе необходимые нам функции:
import json
import os
import subprocess
import uuid
from urllib.parse import urlencode
import boto3
import requests
import yandexcloud
from yandex.cloud.lockbox.v1.payload_service_pb2 import GetPayloadRequest
from yandex.cloud.lockbox.v1.payload_service_pb2_grpc import PayloadServiceStub
boto_session = None
storage_client = None
docapi_table = None
ymq_queue = None
def get_boto_session():
global boto_session
if boto_session is not None:
return boto_session
# initialize lockbox and read secret value
yc_sdk = yandexcloud.SDK()
channel = yc_sdk._channels.channel("lockbox-payload")
lockbox = PayloadServiceStub(channel)
response = lockbox.Get(GetPayloadRequest(secret_id=os.environ['SECRET_ID']))
# extract values from secret
access_key = None
secret_key = None
for entry in response.entries:
if entry.key == 'ACCESS_KEY_ID':
access_key = entry.text_value
elif entry.key == 'SECRET_ACCESS_KEY':
secret_key = entry.text_value
if access_key is None or secret_key is None:
raise Exception("secrets required")
print("Key id: " + access_key)
# initialize boto session
boto_session = boto3.session.Session(
aws_access_key_id=access_key,
aws_secret_access_key=secret_key
)
return boto_session
def get_ymq_queue():
global ymq_queue
if ymq_queue is not None:
return ymq_queue
ymq_queue = get_boto_session().resource(
service_name='sqs',
endpoint_url='https://message-queue.api.cloud.yandex.net',
region_name='ru-central1'
).Queue(os.environ['YMQ_QUEUE_URL'])
return ymq_queue
def get_docapi_table():
global docapi_table
if docapi_table is not None:
return docapi_table
docapi_table = get_boto_session().resource(
'dynamodb',
endpoint_url=os.environ['DOCAPI_ENDPOINT'],
region_name='ru-central1'
).Table('tasks')
return docapi_table
def get_storage_client():
global storage_client
if storage_client is not None:
return storage_client
storage_client = get_boto_session().client(
service_name='s3',
endpoint_url='https://storage.yandexcloud.net',
region_name='ru-central1'
)
return storage_client
# API handler
def create_task(src_url):
task_id = str(uuid.uuid4())
get_docapi_table().put_item(Item={
'task_id': task_id,
'ready': False
})
get_ymq_queue().send_message(MessageBody=json.dumps({'task_id': task_id, "src": src_url}))
return {
'task_id': task_id
}
def get_task_status(task_id):
task = get_docapi_table().get_item(Key={
"task_id": task_id
})
if task['Item']['ready']:
return {
'ready': True,
'gif_url': task['Item']['gif_url']
}
return {'ready': False}
def handle_api(event, context):
action = event['action']
if action == 'convert':
return create_task(event['src_url'])
elif action == 'get_task_status':
return get_task_status(event['task_id'])
else:
return {"error": "unknown action: " + action}
# Converter handler
def download_from_ya_disk(public_key, dst):
api_call_url = 'https://cloud-api.yandex.net/v1/disk/public/resources/download?' + \
urlencode(dict(public_key=public_key))
response = requests.get(api_call_url)
download_url = response.json()['href']
download_response = requests.get(download_url)
with open(dst, 'wb') as video_file:
video_file.write(download_response.content)
def upload_and_presign(file_path, object_name):
client = get_storage_client()
bucket = os.environ['S3_BUCKET']
client.upload_file(file_path, bucket, object_name)
return client.generate_presigned_url('get_object', Params={'Bucket': bucket, 'Key': object_name}, ExpiresIn=3600)
def handle_process_event(event, context):
for message in event['messages']:
task_json = json.loads(message['details']['message']['body'])
task_id = task_json['task_id']
# Download video
download_from_ya_disk(task_json['src'], '/tmp/video.mp4')
# Convert with ffmpeg
subprocess.run(['ffmpeg', '-i', '/tmp/video.mp4', '-r', '10', '-s', '320x240', '/tmp/result.gif'])
result_object = task_id + ".gif"
# Upload to Object Storage and generate presigned url
result_download_url = upload_and_presign('/tmp/result.gif', result_object)
# Update task status in DocAPI
get_docapi_table().update_item(
Key={'task_id': task_id},
AttributeUpdates={
'ready': {'Value': True, 'Action': 'PUT'},
'gif_url': {'Value': result_download_url, 'Action': 'PUT'},
}
)
return "OK"
Сгенерируйте файл
requirements.txt
:
pipreqs $PWD --force
Находясь в директории с исходными файлами, упакуем все нужные файлы в ZIP-архив.
zip src.zip index.py requirements.txt ffmpeg
В Object Storage для простоты используем тот же бакет, куда далее будем складывать видео. На вкладке Объекты, вверху справа нажмите кнопку Загрузить и выберите созданный архив.
Создадим функции
ffmpeg-api
и ffmpeg-converter
, при этом сразу зададим все необходимые переменные и сервисный аккаунт:
yc serverless function create \
--name ffmpeg-api \
--description "function for ffmpeg-api"
yc serverless function create \
--name ffmpeg-converter \
--description "function for ffmpeg-converter"
yc serverless function version create \
--function-name ffmpeg-api \
--memory=256m \
--execution-timeout=5s \
--runtime=python37 \
--entrypoint=index.handle_api \
--service-account-id $SERVICE_ACCOUNT_FFMPEG_ID \
--environment SECRET_ID=$SECRET_ID \
--environment YMQ_QUEUE_URL=$YMQ_QUEUE_URL \
--environment DOCAPI_ENDPOINT=$DOCAPI_ENDPOINT \
--package-bucket-name $S3_BUCKET \
--package-object-name src.zip
yc serverless function version create \
--function-name ffmpeg-converter \
--memory=2048m \
--execution-timeout=600s \
--runtime=python37 \
--entrypoint=index.handle_process_event \
--service-account-id $SERVICE_ACCOUNT_FFMPEG_ID \
--environment SECRET_ID=$SECRET_ID \
--environment YMQ_QUEUE_URL=$YMQ_QUEUE_URL \
--environment DOCAPI_ENDPOINT=$DOCAPI_ENDPOINT \
--environment S3_BUCKET=$S3_BUCKET \
--package-bucket-name $S3_BUCKET \
--package-object-name src.zip
Тестирование функции
В консоли управления перейдите из рабочего каталога в раздел Cloud Functions и выберите ранее созданную функцию
ffmpeg-api
. Перейдите на вкладку Тестирование в боковом меню, выберите шаблон данных Без шаблона
и добавьте во вводные данные JSON:
{"action":"convert", "src_url":"https://disk.yandex.ru/i/38RbVC0spb_jQQ"}
Нажмите кнопку Запустить тест. Этим самым мы загрузим файл в хранилище и создадим задачу в БД. Если всё сделано правильно, то вы увидите такой результат:
{
"task_id": "133e05c2-1b98-41cc-9aab-b816d71af343"
}

Воспользуемся полученным идентификатором задачи
task_id
для получения статуса из базы данных. Для этого внесите в вводные данные JSON следующие изменения:
{"action":"get_task_status", "task_id":"<идентификатор задачи>"}
Нажмите кнопку Запустить тест. Так как мы ещё не обрабатывали задачи в очереди, результат очевиден:
{
"ready": false
}

Шаг 6. Создание триггера
Теперь создайте триггер, который будет вызывать функцию обработки сообщений из очереди. После создания триггер начинает работать через пять минут. Он будет брать по одному сообщению и раз в 10 секунд отправлять в функцию:
yc serverless trigger create message-queue \
--name ffmpeg \
--queue $YMQ_QUEUE_ARN \
--queue-service-account-id $SERVICE_ACCOUNT_FFMPEG_ID \
--invoke-function-name ffmpeg-converter \
--invoke-function-service-account-id $SERVICE_ACCOUNT_FFMPEG_ID \
--batch-size 1 \
--batch-cutoff 10s
С этого момента очередь начнёт обрабатываться. Можно проверить, готова ли задача, и, если это так, запросить по URL результат обработки из Object Storage.
Теперь у нас есть функция, которая выполняет функцию API, через которую мы можем ставить задачи в очередь на исполнение. Триггер раз в 10 секунд берет по одному сообщению в очереди и передает функции обработчику. Функция-обработчик формирует результат и обновляет данные в базе данных. При этом мы получаем сконвертированные GIF-файлы из видео.
Протестируйте систему, используя полученный ранее идентификатор задачи
task_id
для получения статуса из базы данных. Для этого внесите изменения в вводные данные JSON:
{"action":"get_task_status", "task_id":"133e05c2-1b98-41cc-9aab-b816d71af343"}
Нажмите кнопку Запустить тест. Если задача уже успела обработаться, то вы получите URL.

Удаление триггера
По завершении работы не забудьте удалить созданный триггер
ffmpeg
, иначе он будет продолжать работать:
yc serverless trigger delete ffmpeg
Не забудьте также удалить или остановить все созданные вами ресурсы.