Добавление собственных заголовков ко всем запросам boto3

Мне нужно добавить несколько настраиваемых заголовков к каждому отправляемому запросу boto3. Есть ли способ управлять самим подключением для добавления этих заголовков?

Для boto2 в https://kite.com/python/docs/boto.connection.AWSAuthConnection есть метод build_base_http_request, который оказался полезным. Однако мне еще не удалось найти аналогичную функцию в документации по boto3.

3 ответа

Это довольно устарело, но мы столкнулись с той же проблемой, поэтому я публикую наше решение.

Я хотел добавить пользовательские заголовки в boto3 для конкретных запросов. Я нашел это: https://github.com/boto/boto3/issues/2251 и использовал систему событий для добавления заголовка

      def _add_header(request, **kwargs):
    request.headers.add_header('x-trace-id', 'trace-trace')
    print(request.headers)  # for debug


some_client = boto3.client(service_name=SERVICE_NAME)
event_system = some_client.meta.events
event_system.register_first('before-sign.EVENT_NAME.*', _add_header)

Вы можете попробовать использовать подстановочный знак для всех запросов:

      event_system.register_first('before-sign.*.*', _add_header)

*SERVICE_NAME- вы можете найти все доступные услуги здесь: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/index.html

Для получения дополнительной информации о регистрации функции на определенное событие: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/events.html

Ответ от @May Yaari отлично подходит для добавления постоянного заголовка к каждому отправляемому вами запросу. Однако иногда вам нужно отправлять разные заголовки с каждым отправляемым запросом, например, чтобы использовать условные размещения Cloudflare R2. Для таких случаев использования вы можете сделать следующее:

      event_system = client.meta.events

# Moves the custom headers from the parameters to the request context
# This is done because later in the processing of the request, there is
# a parameter validation step, which doesn't allow for custom arguments.
def process_custom_arguments(params, context, **kwargs):
    if (custom_headers := params.pop("custom_headers", None)):
        context["custom_headers"] = custom_headers

# Here we extract the headers from the request context and actually set them
def add_custom_headers(params, context, **kwargs):
    if (custom_headers := context.get("custom_headers")):
        params["headers"].update(custom_headers)

event_system.register('before-parameter-build.s3.PutObject', process_custom_arguments)
event_system.register('before-call.s3.PutObject', add_custom_headers)

custom_headers = {'If-Match' : '"3858f62230ac3c9ff15f300c664312c63f"'}

client.put_object(Bucket="my_bucket", Key="my_key", Body="my_data", custom_headers=custom_headers)

При этом в системе событий регистрируются два обработчика: один для перемещения пользовательских заголовков из параметров запроса в контекст запроса, а другой для его установки. Это сделано для того, чтобы обойти проверку параметров запроса, которую выполняет boto3.

Ответ от @May Yaari довольно потрясающий. На вопрос, поднятый @arainchi:

Это работает, нет возможности передать пользовательские данные обработчикам событий, в настоящее время мы должны делать это не на языке Python, используя глобальные переменные/очереди :( Я открыл вопрос с разработчиками Boto3 именно для этого случая

На самом деле, мы могли бы использовать свойство функционального программирования Python: returning a function inside a functionобойти:

В случае, если мы хотим добавить пользовательское значение custom_variableв заголовок, мы могли бы сделать

      some_client = boto3.client(service_name=SERVICE_NAME)
event_system = some_client.meta.events
event_system.register_first('before-sign.EVENT_NAME.*', _register_callback(custom_variable))

def _register_callback(custom_variable):
    def _add_header(request, **kwargs):
        request.headers.add_header('header_name_you_want', custom_variable)
    return _add_header

Или более питонический способ использования lambda

      some_client = boto3.client(service_name=SERVICE_NAME)
event_system = some_client.meta.events
event_system.register_first('before-sign.EVENT_NAME.*', lambda request, **kwargs: _add_header(request, custom_variable))

def _add_header(request, custom_variable):
    request.headers.add_header('header_name_you_want', custom_variable)
Другие вопросы по тегам