ПР. Проверка доступности веб-ресурсов
Кратко:
- Создание системы проверки доступности веб-ресурсов с использованием Yandex Cloud Functions, API Gateway и Yandex Message Queue.
- Добавление возможности ставить задачи по проверке доступности других веб-ресурсов.
- Использование библиотеки boto3 для работы с YMQ.
- Создание очереди Yandex Message Queue и функции для проверки доступности URL.
- Обновление спецификации API Gateway для предоставления доступа к функции.
- Создание функции для чтения из очереди и проверка ее работы.
- Создание триггера для вызова функции обработки сообщений из очереди один раз в минуту.
- Удаление триггера-таймера после завершения практической работы.
- Не забудьте удалить или остановить все созданные вами ресурсы.
Практическая работа. Проверка доступности веб-ресурсов
В этом уроке вы доработаете систему проверки доступности веб-ресурсов, которую создали на предыдущих практических занятиях. В текущем варианте она проверяет только доступность сайта ya.ru. Теперь давайте добавим в неё возможность ставить задачи по проверке доступности других веб-ресурсов.
Общая архитектура системы
У системы есть два метода:
CheckUrl
— ставит задачу на проверку указанного URL.GetResult
— считывает результаты проверки.
Метод
CheckUrl
обрабатывается функцией, которая будет складывать все запросы в очередь. Функция-обработчик будет вызываться раз в секунду, считывать URL из очереди, проверять его доступность и записывать результат в базу данных. Оттуда этот результат можно будет получить с помощью метода GetResult
.
Мы не будем менять уже созданные функции и таблицу в PostgreSQL, сделаем новые.
Работать с YMQ из функций мы будем с помощью библиотеки boto3. Чтобы её использовать, нужно создать сервисный аккаунт с секретным ключом доступа, а затем настроить зависимости функции. Сделаем это после того, как создадим очередь.
Шаг 1. Проверить наличие сервисного аккаунта
Если вы ранее создавали сервисный аккаунт с именем
service-account-for-cf
, добавляли вновь созданному сервисному аккаунту роли editor
и другие, то вам остаётся только создать ключ доступа:
yc iam access-key create --service-account-name service-account-for-cf
В результате вы получите примерно следующее:
access_key:
id: ajefraollq5puj2tir1o
service_account_id: ajetdv28pl0a1a8r41f0
created_at: "2021-08-23T21:13:05.677319393Z"
key_id: BTPNvWthv0ZX2xVmlPIU
secret: cWLQ0HrTM0k_qAac43cwMNJA8VV_rfTg_kd4xVPi
Здесь
key_id
— это идентификатор ключа доступа ACCESS_KEY
. А secret
— это секретный ключ SECRET_KEY
. Переменные ACCESS_KEY
и SECRET_KEY
могут быть использованы для задания соответствующих значений aws_access_key_id
и aws_secret_access_key
при использовании библиотеки boto3.Шаг 2. Создание очереди Yandex Message Queue
Вы можете создать очередь одним из трёх способов:
- через консоль управления;
- с помощью консольной утилиты
aws
; - с помощью Terraform.
В этом уроке мы будем использовать консоль управления. Откройте раздел Message Queue и нажмите кнопку Создать очередь.

В настройках создаваемой очереди задайте имя очереди
my-first-queue
, затем выберите тип очереди Стандартная и нажмите кнопку Создать.
Очередь создана.

Теперь зайдите в настройки очереди, чтобы посмотреть параметры подключения к ней. Нам потребуется значение URL.

Шаг 3. Создание функции
Для создания функции зададим ряд переменных:
VERBOSE_LOG
— определяет, пишет ли функция подробности своего выполнения в журнал.AWS_ACCESS_KEY_ID
— значение «Идентификатор ключа» из сервисного аккаунта, который мы сделали ранее.AWS_SECRET_ACCESS_KEY
— значение «Секретный ключ» из того же сервисного аккаунта.QUEUE_URL
— URL на очередь, его можно получить на обзорной странице созданной ранее очереди.
Чтобы задать переменные, выполните в консоли следующие команды:
echo "export VERBOSE_LOG=True" >> ~/.bashrc && . ~/.bashrc
echo "export AWS_ACCESS_KEY_ID=<AWS_ACCESS_KEY_ID>" >> ~/.bashrc && . ~/.bashrc
echo "export AWS_SECRET_ACCESS_KEY=<AWS_SECRET_ACCESS_KEY>" >> ~/.bashrc && . ~/.bashrc
echo "export QUEUE_URL=<QUEUE_URL>" >> ~/.bashrc && . ~/.bashrc
Создайте файл
my-url-receiver-function.py
со следующим содержанием:
import logging
import os
import boto3
logger = logging.getLogger()
logger.setLevel(logging.INFO)
verboseLogging = eval(os.environ['VERBOSE_LOG']) ## Convert to bool
queue_url = os.environ['QUEUE_URL']
def log(logString):
if verboseLogging:
logger.info(logString)
def handler(event, context):
# Get url
try:
url = event['queryStringParameters']['url']
except Exception as error:
logger.error(error)
statusCode = 400
return {
'statusCode': statusCode
}
# Create client
client = boto3.client(
service_name='sqs',
endpoint_url='https://message-queue.api.cloud.yandex.net',
region_name='ru-central1'
)
# Send message to queue
client.send_message(
QueueUrl=queue_url,
MessageBody=url
)
log('Successfully sent test message to queue')
statusCode = 200
return {
'statusCode': statusCode
}
Затем воспользуйтесь командой
pipreqs $PWD --force
для формирования файла requirements.txt
и упакуйте файлы с функцией и требованиями в ZIP-архив.
zip my-url-receiver-function my-url-receiver-function.py requirements.txt
Создайте функцию и её версию:
yc serverless function create \
--name my-url-receiver-function \
--description "function for url"
yc serverless function version create \
--function-name=my-url-receiver-function \
--memory=256m \
--execution-timeout=5s \
--runtime=python312 \
--entrypoint=my-url-receiver-function.handler \
--service-account-id $SERVICE_ACCOUNT_ID \
--environment VERBOSE_LOG=$VERBOSE_LOG \
--environment AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID \
--environment AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \
--environment QUEUE_URL=$QUEUE_URL \
--source-path my-url-receiver-function.zip
Тестирование функции
Перейдите в раздел Cloud Functions консоли управления облаком и выберите созданную функцию
my-url-receiver-function
. На вкладке Тестирование в боковом меню выберите шаблон HTTPS-вызов и замените раздел queryStringParameters
:
"queryStringParameters": {
"a": "2",
"b": "1",
},
на аналогичный, но с параметром
url
с любым сайтом. Важно указывать ссылку целиком.
"queryStringParameters": {
"url": "https://ya.ru/"
},
Нажмите кнопку Запустить тест.

Если вы всё сделали правильно, то увидите код статуса
200
. При этом в очереди увеличится количество сообщений.
Шаг 4. Обновление спецификации API Gateway
Функция готова, но по умолчанию она не является публичной. Предоставим доступ к ней с помощью API-шлюза. Для этого необходимо обновить ранее созданную спецификацию
hello-world.yaml
. Если у вас нет её под рукой, выгрузите её из облака:
yc serverless api-gateway get-spec \
--name hello-world >> hello-world-new.yaml
Внесите изменения, добавив секцию о ранее созданной функции:
/check:
get:
x-yc-apigateway-integration:
type: cloud-functions
function_id: <идентификатор функции>
service_account_id: <идентификатор сервисного аккаунта>
operationId: add-url
Обновите конфигурацию:
yc serverless api-gateway update \
--name hello-world \
--spec=hello-world-new.yaml
Для тестирования выполните вызов функции в браузере:
https://<идентификатор API Gateway>.apigw.yandexcloud.net/check?url=https://ya.ru/
После каждого запроса количество сообщений в очереди будет увеличиваться на одно.

Шаг 5. Создание функции для чтения из очереди
В предыдущих работах мы создавали функцию, использующую подключение к БД. Здесь мы повторим этот опыт.
Проверим, что нам доступны переменные для инициации подключения:
CONNECTION_ID
, DB_USER
, DB_HOST
. Мы создавали их в предыдущей работе с помощью следующих команд:
echo "export CONNECTION_ID=<CONNECTION_ID>" >> ~/.bashrc && . ~/.bashrc
echo "export DB_USER=<DB_USER>" >> ~/.bashrc && . ~/.bashrc
echo "export DB_HOST=<DB_HOST>" >> ~/.bashrc && . ~/.bashrc
Также для работы с очередью нам потребуются переменные
VERBOSE_LOG
, AWS_ACCESS_KEY_ID
, AWS_SECRET_ACCESS_KEY
и QUEUE_URL
, заданные на предыдущих шагах.Создадим функцию
function-for-url-from-mq.py
и воспользуемся командой pipreqs $PWD --force
, чтобы сформировать для нее файл requirements.txt
.
import logging
import os
import boto3
import datetime
import requests
#Эти библиотеки нужны для работы с PostgreSQL
import psycopg2
import psycopg2.errors
import psycopg2.extras
CONNECTION_ID = os.getenv("CONNECTION_ID")
DB_USER = os.getenv("DB_USER")
DB_HOST = os.getenv("DB_HOST")
QUEUE_URL = os.environ['QUEUE_URL']
# Настраиваем функцию для записи информации в журнал функции
# Получаем стандартный логер языка Python
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Вычитываем переменную VERBOSE_LOG, которую мы указываем в переменных окружения
verboseLogging = eval(os.environ['VERBOSE_LOG']) ## Convert to bool
#Функция log, которая запишет текст в журнал выполнения функции, если в переменной окружения VERBOSE_LOG будет значение True
def log(logString):
if verboseLogging:
logger.info(logString)
#Получаем подключение
def getConnString(context):
"""
Extract env variables to connect to DB and return a db string
Raise an error if the env variables are not set
:return: string
"""
connection = psycopg2.connect(
database=CONNECTION_ID, # Идентификатор подключения
user=DB_USER, # Пользователь БД
password=context.token["access_token"],
host=DB_HOST, # Точка входа
port=6432,
sslmode="require")
return connection
"""
Create SQL query with table creation
"""
def makeCreateDataTableQuery(table_name):
query = f"""CREATE TABLE public.{table_name} (
url text,
result integer,
time float
)"""
return query
def makeInsertDataQuery(table_name, url, result, time):
query = f"""INSERT INTO {table_name}
(url, result,time)
VALUES('{url}', {result}, {time})
"""
return query
def handler(event, context):
# Create client
client = boto3.client(
service_name='sqs',
endpoint_url='https://message-queue.api.cloud.yandex.net',
region_name='ru-central1'
)
# Receive sent message
messages = client.receive_message(
QueueUrl=QUEUE_URL,
MaxNumberOfMessages=1,
VisibilityTimeout=60,
WaitTimeSeconds=1
).get('Messages')
if messages is None:
return {
'statusCode': 200
}
for msg in messages:
log('Received message: "{}"'.format(msg.get('Body')))
# Get url from message
url = msg.get('Body');
# Check url
try:
now = datetime.datetime.now()
response = requests.get(url, timeout=(1.0000, 3.0000))
timediff = datetime.datetime.now() - now
result = response.status_code
except requests.exceptions.ReadTimeout:
result = 601
except requests.exceptions.ConnectTimeout:
result = 602
except requests.exceptions.Timeout:
result = 603
log(f'Result: {result} Time: {timediff.total_seconds()}')
connection = getConnString(context)
log(f'Connecting: {connection}')
cursor = connection.cursor()
table_name = 'custom_request_result'
sql = makeInsertDataQuery(table_name, url, result, timediff.total_seconds())
log(f'Exec: {sql}')
try:
cursor.execute(sql)
except psycopg2.errors.UndefinedTable as error:
log(f'Table not exist - create and repeate insert')
connection.rollback()
logger.error(error)
createTable = makeCreateDataTableQuery(table_name)
log(f'Exec: {createTable}')
cursor.execute(createTable)
connection.commit()
log(f'Exec: {sql}')
cursor.execute(sql)
except Exception as error:
logger.error( error)
connection.commit()
cursor.close()
connection.close()
# Delete processed messages
for msg in messages:
client.delete_message(
QueueUrl=QUEUE_URL,
ReceiptHandle=msg.get('ReceiptHandle')
)
print('Successfully deleted message by receipt handle "{}"'.format(msg.get('ReceiptHandle')))
statusCode = 200
return {
'statusCode': statusCode
}
При создании сразу задайте все необходимые переменные и сервисный аккаунт:
zip function-for-url-from-mq function-for-url-from-mq.py requirements.txt
yc serverless function create \
--name function-for-url-from-mq \
--description "function for url from mq"
yc serverless function version create \
--function-name=function-for-url-from-mq \
--memory=256m \
--execution-timeout=5s \
--runtime=python312 \
--entrypoint=function-for-url-from-mq.handler \
--service-account-id $SERVICE_ACCOUNT_ID \
--environment VERBOSE_LOG=True \
--environment CONNECTION_ID=$CONNECTION_ID \
--environment DB_USER=$DB_USER \
--environment DB_HOST=$DB_HOST \
--environment AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID \
--environment AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \
--environment QUEUE_URL=$QUEUE_URL \
--source-path function-for-url-from-mq.zip
Протестируйте функцию.

После её выполнения количество сообщений в очереди уменьшится, а в базе данных появится новая таблица с результатами тестирования доступности функции.


Шаг 6. Создание триггера
Создадим триггер, который будет вызывать функцию обработки сообщений из очереди один раз в минуту. Он будет использовать cron-выражение:
yc serverless trigger create timer \
--name trigger-for-mq \
--invoke-function-name function-for-url-from-mq \
--invoke-function-service-account-id $SERVICE_ACCOUNT_ID \
--cron-expression '* * * * ? *'
Cron-выражение
* * * * ? *
означает вызов функции function-for-url-from-mq
один раз в минуту. Подробнее про cron-выражения можно прочитать в документации.
Теперь у нас есть функция, которая раз в минуту будет пробовать взять из очереди URL и проверить его. Также есть метод REST API, который позволяет записывать URL в очередь независимо от работы обработчика. Мы можем вызывать созданный метод как угодно часто. Очередь будет просто накапливаться, а затем обработчик будет постепенно её разбирать.
В итоге вы получили асинхронную систему проверки доступности URL с доступом по REST API. Вы не создали ни одной виртуальной машины, но решили вопросы масштабирования и отказоустойчивости системы.
Удаление триггера-таймера
По завершении практической работы не забудьте удалить созданный вами триггер
trigger-for-mq
, иначе он будет работать, пока не исчерпает деньги на аккаунте:
yc serverless trigger delete trigger-for-mq
Не забудьте удалить или остановить все созданные вами ресурсы: триггеры, очереди YMQ и кластер базы данных.
Следующий практический урок завершает тему. Вы попробуете создать онлайн-сервис, конвертирующий произвольные видеофайлы в GIF-анимацию. Для этого вы объедините в одно решение сервисы Yandex Cloud Functions, Yandex Message Queue, YDB и Yandex Object Storage. А заодно закрепите использование консольных инструментов
yc
и aws
.