ПР. Проверка доступности веб-ресурсов

Кратко:

  • Создание системы проверки доступности веб-ресурсов с использованием Yandex Cloud Functions, API Gateway и Yandex Message Queue.
  • Добавление возможности ставить задачи по проверке доступности других веб-ресурсов.
  • Использование библиотеки boto3 для работы с YMQ.
  • Создание очереди Yandex Message Queue и функции для проверки доступности URL.
  • Обновление спецификации API Gateway для предоставления доступа к функции.
  • Создание функции для чтения из очереди и проверка ее работы.
  • Создание триггера для вызова функции обработки сообщений из очереди один раз в минуту.
  • Удаление триггера-таймера после завершения практической работы.
  • Не забудьте удалить или остановить все созданные вами ресурсы.

Практическая работа. Проверка доступности веб-ресурсов

В этом уроке вы доработаете систему проверки доступности веб-ресурсов, которую создали на предыдущих практических занятиях. В текущем варианте она проверяет только доступность сайта ya.ru. Теперь давайте добавим в неё возможность ставить задачи по проверке доступности других веб-ресурсов.

Общая архитектура системы

У системы есть два метода:
  1. CheckUrl — ставит задачу на проверку указанного URL.
  2. GetResult — считывает результаты проверки.
Метод CheckUrl обрабатывается функцией, которая будет складывать все запросы в очередь. Функция-обработчик будет вызываться раз в секунду, считывать URL из очереди,  проверять его доступность и записывать результат в базу данных. Оттуда этот результат можно будет получить с помощью метода GetResult.
image
Мы не будем менять уже созданные функции и таблицу в PostgreSQL, сделаем новые.
Работать с YMQ из функций мы будем с помощью библиотеки boto3. Чтобы её использовать, нужно создать сервисный аккаунт с секретным ключом доступа, а затем настроить зависимости функции. Сделаем это после того, как создадим очередь.

Шаг 1. Проверить наличие сервисного аккаунта

Если вы ранее создавали сервисный аккаунт с именем service-account-for-cf, добавляли вновь созданному сервисному аккаунту роли editor и другие, то вам остаётся только создать ключ доступа:
yc iam access-key create --service-account-name service-account-for-cf
В результате вы получите примерно следующее:
    access_key:
        id: ajefraollq5puj2tir1o
        service_account_id: ajetdv28pl0a1a8r41f0
        created_at: "2021-08-23T21:13:05.677319393Z"
        key_id: BTPNvWthv0ZX2xVmlPIU
    secret: cWLQ0HrTM0k_qAac43cwMNJA8VV_rfTg_kd4xVPi 
Здесь key_id — это идентификатор ключа доступа ACCESS_KEY. А secret — это секретный ключ SECRET_KEY. Переменные ACCESS_KEY и SECRET_KEY могут быть использованы для задания соответствующих значений aws_access_key_id и aws_secret_access_key при использовании библиотеки boto3.

Шаг 2. Создание очереди Yandex Message Queue

Вы можете создать очередь одним из трёх способов:
  • через консоль управления;
  • с помощью консольной утилиты aws;
  • с помощью Terraform.
В этом уроке мы будем использовать консоль управления. Откройте раздел Message Queue и нажмите кнопку Создать очередь.
image
В настройках создаваемой очереди задайте имя очереди my-first-queue, затем выберите тип очереди Стандартная и нажмите кнопку Создать.
image
Очередь создана.
image
Теперь зайдите в настройки очереди, чтобы посмотреть параметры подключения к ней. Нам потребуется значение URL.
image

Шаг 3. Создание функции

Для создания функции зададим ряд переменных:
  • VERBOSE_LOG — определяет, пишет ли функция подробности своего выполнения в журнал.
  • AWS_ACCESS_KEY_ID — значение «Идентификатор ключа» из сервисного аккаунта, который мы сделали ранее.
  • AWS_SECRET_ACCESS_KEY — значение «Секретный ключ» из того же сервисного аккаунта.
  • QUEUE_URL — URL на очередь, его можно получить на обзорной странице созданной ранее очереди.
Чтобы задать переменные, выполните в консоли следующие команды:
echo "export VERBOSE_LOG=True" >> ~/.bashrc && . ~/.bashrc
echo "export AWS_ACCESS_KEY_ID=<AWS_ACCESS_KEY_ID>" >> ~/.bashrc && . ~/.bashrc
echo "export AWS_SECRET_ACCESS_KEY=<AWS_SECRET_ACCESS_KEY>" >> ~/.bashrc && . ~/.bashrc
echo "export QUEUE_URL=<QUEUE_URL>" >> ~/.bashrc && . ~/.bashrc
Создайте файл my-url-receiver-function.py со следующим содержанием:
import logging
import os
import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

verboseLogging = eval(os.environ['VERBOSE_LOG'])  ## Convert to bool
queue_url = os.environ['QUEUE_URL']

def log(logString):
    if verboseLogging:
        logger.info(logString)

def handler(event, context):

    # Get url
    try:
        url = event['queryStringParameters']['url']
    except Exception as error:
        logger.error(error)
        statusCode = 400
        return {
            'statusCode': statusCode
        }

    # Create client
    client = boto3.client(
        service_name='sqs',
        endpoint_url='https://message-queue.api.cloud.yandex.net',
        region_name='ru-central1'
    )

    # Send message to queue
    client.send_message(
        QueueUrl=queue_url,
        MessageBody=url
    )
    log('Successfully sent test message to queue')

    statusCode = 200

    return {
        'statusCode': statusCode
    }
Затем воспользуйтесь командой pipreqs $PWD --force для формирования файла requirements.txt и упакуйте файлы с функцией и требованиями в ZIP-архив.
zip my-url-receiver-function my-url-receiver-function.py requirements.txt
Создайте функцию и её версию:
yc serverless function create \
  --name  my-url-receiver-function \
  --description "function for url"

yc serverless function version create \
  --function-name=my-url-receiver-function \
  --memory=256m \
  --execution-timeout=5s \
  --runtime=python312 \
  --entrypoint=my-url-receiver-function.handler \
  --service-account-id $SERVICE_ACCOUNT_ID \
  --environment VERBOSE_LOG=$VERBOSE_LOG \
  --environment AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID \
  --environment AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \
  --environment QUEUE_URL=$QUEUE_URL \
  --source-path my-url-receiver-function.zip

Тестирование функции

Перейдите в раздел Cloud Functions консоли управления облаком и выберите созданную функцию my-url-receiver-function. На вкладке Тестирование в боковом меню выберите шаблон HTTPS-вызов и замените раздел queryStringParameters:
    "queryStringParameters": {
        "a": "2",
        "b": "1",
    }, 
на аналогичный, но с параметром url с любым сайтом. Важно указывать ссылку целиком.
    "queryStringParameters": {
        "url": "https://ya.ru/"
    }, 
Нажмите кнопку Запустить тест.
image
Если вы всё сделали правильно, то увидите код статуса 200. При этом в очереди увеличится количество сообщений.
image

Шаг 4. Обновление спецификации API Gateway

Функция готова, но по умолчанию она не является публичной. Предоставим доступ к ней с помощью API-шлюза. Для этого необходимо обновить ранее созданную спецификацию hello-world.yaml. Если у вас нет её под рукой, выгрузите её из облака:
yc serverless api-gateway get-spec \
  --name hello-world >> hello-world-new.yaml
Внесите изменения, добавив секцию о ранее созданной функции:
    /check:
        get:
            x-yc-apigateway-integration:
                type: cloud-functions
                function_id: <идентификатор функции>
                service_account_id: <идентификатор сервисного аккаунта>
            operationId: add-url 
Обновите конфигурацию:
yc serverless api-gateway update \
  --name hello-world \
  --spec=hello-world-new.yaml
Для тестирования выполните вызов функции в браузере:
https://<идентификатор API Gateway>.apigw.yandexcloud.net/check?url=https://ya.ru/
После каждого запроса количество сообщений в очереди будет увеличиваться на одно.
image

Шаг 5. Создание функции для чтения из очереди

В предыдущих работах мы создавали функцию, использующую подключение к БД. Здесь мы повторим этот опыт.
Проверим, что нам доступны переменные для инициации подключения: CONNECTION_ID, DB_USER, DB_HOST. Мы создавали их в предыдущей работе с помощью следующих команд:
echo "export CONNECTION_ID=<CONNECTION_ID>" >> ~/.bashrc && . ~/.bashrc
echo "export DB_USER=<DB_USER>" >> ~/.bashrc && . ~/.bashrc
echo "export DB_HOST=<DB_HOST>" >> ~/.bashrc && . ~/.bashrc
Также для работы с очередью нам потребуются переменные VERBOSE_LOG, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY и QUEUE_URL, заданные на предыдущих шагах.
Создадим функцию function-for-url-from-mq.py и воспользуемся командой pipreqs $PWD --force, чтобы сформировать для нее файл requirements.txt.
import logging
import os
import boto3
import datetime
import requests

#Эти библиотеки нужны для работы с PostgreSQL
import psycopg2
import psycopg2.errors
import psycopg2.extras

CONNECTION_ID = os.getenv("CONNECTION_ID")
DB_USER = os.getenv("DB_USER")
DB_HOST = os.getenv("DB_HOST")
QUEUE_URL = os.environ['QUEUE_URL']

# Настраиваем функцию для записи информации в журнал функции
# Получаем стандартный логер языка Python
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Вычитываем переменную VERBOSE_LOG, которую мы указываем в переменных окружения 
verboseLogging = eval(os.environ['VERBOSE_LOG'])  ## Convert to bool

#Функция log, которая запишет текст в журнал выполнения функции, если в переменной окружения VERBOSE_LOG будет значение True
def log(logString):
    if verboseLogging:
        logger.info(logString)

#Получаем подключение
def getConnString(context):
    """
    Extract env variables to connect to DB and return a db string
    Raise an error if the env variables are not set
    :return: string
    """
    connection = psycopg2.connect(
        database=CONNECTION_ID, # Идентификатор подключения
        user=DB_USER, # Пользователь БД
        password=context.token["access_token"],
        host=DB_HOST, # Точка входа
        port=6432,
        sslmode="require")
    return connection

"""
    Create SQL query with table creation
"""
def makeCreateDataTableQuery(table_name):
    query = f"""CREATE TABLE public.{table_name} (
    url text,
    result integer,
    time float
    )"""
    return query

def makeInsertDataQuery(table_name, url, result, time):
    query = f"""INSERT INTO {table_name} 
    (url, result,time)
    VALUES('{url}', {result}, {time})
    """
    return query

def handler(event, context):

    # Create client
    client = boto3.client(
        service_name='sqs',
        endpoint_url='https://message-queue.api.cloud.yandex.net',
        region_name='ru-central1'
    )

    # Receive sent message
    messages = client.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=1,
        VisibilityTimeout=60,
        WaitTimeSeconds=1
    ).get('Messages')

    if messages is None:
        return {
            'statusCode': 200
        }

    for msg in messages:
        log('Received message: "{}"'.format(msg.get('Body')))

    # Get url from message
    url = msg.get('Body');

    # Check url
    try:
        now = datetime.datetime.now()
        response = requests.get(url, timeout=(1.0000, 3.0000))
        timediff = datetime.datetime.now() - now
        result = response.status_code
    except requests.exceptions.ReadTimeout:
        result = 601
    except requests.exceptions.ConnectTimeout:
        result = 602
    except requests.exceptions.Timeout:
        result = 603
    log(f'Result: {result} Time: {timediff.total_seconds()}')
    
    connection = getConnString(context)
    log(f'Connecting: {connection}')    
    cursor = connection.cursor()

    table_name = 'custom_request_result'
    sql = makeInsertDataQuery(table_name, url, result, timediff.total_seconds())

    log(f'Exec: {sql}')
    try:
        cursor.execute(sql)
    except psycopg2.errors.UndefinedTable as error:
        log(f'Table not exist - create and repeate insert')
        connection.rollback()
        logger.error(error)
        createTable = makeCreateDataTableQuery(table_name)
        log(f'Exec: {createTable}')
        cursor.execute(createTable)
        connection.commit()
        log(f'Exec: {sql}')
        cursor.execute(sql)
    except Exception as error:
        logger.error( error)

    connection.commit()
    cursor.close()
    connection.close()

    # Delete processed messages
    for msg in messages:
        client.delete_message(
            QueueUrl=QUEUE_URL,
            ReceiptHandle=msg.get('ReceiptHandle')
        )
        print('Successfully deleted message by receipt handle "{}"'.format(msg.get('ReceiptHandle')))

    statusCode = 200

    return {
        'statusCode': statusCode
    }
При создании сразу задайте все необходимые переменные и сервисный аккаунт:
zip function-for-url-from-mq function-for-url-from-mq.py requirements.txt

yc serverless function create \
  --name function-for-url-from-mq \
  --description "function for url from mq"

yc serverless function version create \
  --function-name=function-for-url-from-mq \
  --memory=256m \
  --execution-timeout=5s \
  --runtime=python312 \
  --entrypoint=function-for-url-from-mq.handler \
  --service-account-id $SERVICE_ACCOUNT_ID \
  --environment VERBOSE_LOG=True \
  --environment CONNECTION_ID=$CONNECTION_ID \
  --environment DB_USER=$DB_USER \
  --environment DB_HOST=$DB_HOST \
  --environment AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID \
  --environment AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \
  --environment QUEUE_URL=$QUEUE_URL \
  --source-path function-for-url-from-mq.zip
Протестируйте функцию.
image
После её выполнения количество сообщений в очереди уменьшится, а в базе данных появится новая таблица с результатами тестирования доступности функции.
image
image

Шаг 6. Создание триггера

Создадим триггер, который будет вызывать функцию обработки сообщений из очереди один раз в минуту. Он будет использовать cron-выражение:
yc serverless trigger create timer \
  --name trigger-for-mq \
  --invoke-function-name function-for-url-from-mq \
  --invoke-function-service-account-id $SERVICE_ACCOUNT_ID \
  --cron-expression '* * * * ? *'
Cron-выражение * * * * ? * означает вызов функции function-for-url-from-mq один раз в минуту. Подробнее про cron-выражения можно прочитать в документации.
image
Теперь у нас есть функция, которая раз в минуту будет пробовать взять из очереди URL и проверить его. Также есть метод REST API, который позволяет записывать URL в очередь независимо от работы обработчика. Мы можем вызывать созданный метод как угодно часто. Очередь будет просто накапливаться, а затем обработчик будет постепенно её разбирать.
В итоге вы получили асинхронную систему проверки доступности URL с доступом по REST API. Вы не создали ни одной виртуальной машины, но решили вопросы масштабирования и отказоустойчивости системы.

Удаление триггера-таймера

По завершении практической работы не забудьте удалить созданный вами триггер trigger-for-mq, иначе он будет работать, пока не исчерпает деньги на аккаунте:
yc serverless trigger delete trigger-for-mq
Не забудьте удалить или остановить все созданные вами ресурсы: триггеры, очереди YMQ и кластер базы данных.
Следующий практический урок завершает тему. Вы попробуете создать онлайн-сервис, конвертирующий произвольные видеофайлы в GIF-анимацию. Для этого вы объедините в одно решение сервисы Yandex Cloud Functions, Yandex Message Queue, YDB и Yandex Object Storage. А заодно закрепите использование консольных инструментов yc и aws.