Можно ли автоматически добавлять новые строки в записи AWS Firehose?

Я пытаюсь настроить приложение Kinesis Analytics со следующими параметрами:

  • Входным потоком является Kinesis Firehose, который принимает строковые значения JSON
  • SQL является простым проходом (позже он должен быть более сложным, но для тестирования он просто передает данные)
  • Выходной поток - это второй Kinesis Firehose, который доставляет записи в корзину S3

Позже я импортирую содержимое корзины S3, используя Hive + JSONSERDE, который ожидает, что каждая запись JSON будет жить в отдельной строке. Вывод Firehose просто добавляет все записи JSON, которые нарушают JSONSERDE.

Я мог бы присоединить формататор данных AWS Lambda к выходному потоку, но это кажется дорогим. Все, что я хочу, это разделить каждую запись с помощью новой строки.

Если бы я обходился без приложения Analytics, я бы добавил новую строку к каждой записи Firehose. Кажется странным, что нет способа сделать это в приложении SQL:

CREATE OR REPLACE STREAM "STREAM_OUT" (
  a VARCHAR(4),
  b VARCHAR(4),
  c VARCHAR(4)
);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
  INSERT INTO "STREAM_OUT"
    SELECT STREAM
      "a",
      "b",
      "c"
    FROM "SOURCE_SQL_STREAM_001";

Это лучший ответ, чтобы добавить форматировщик данных Lambda? Я бы очень хотел этого избежать.

4 ответа

Я отправляю ответ, чтобы держать вопрос в курсе последних объявлений AWS. AWS недавно объявила о доступности динамического разбиения на разделы в потоках Kinesis Firehose Delivery. Он поддерживает добавление символа новой строки для каждой записи. Для получения дополнительной информации см. This и this.

У меня было похожее требование добавить новые строки в файлы, сгенерированные пожарным шлангом. В нашем приложении пожарный шланг вызывается через API Gateway.

Это указано в шаблонах отображения тела в разделе запроса на интеграцию.

Следующая команда в шлюзе API создает новые строки в записях пожарных шлангов Kinesis.

Способ 1:

    #set($payload="$input.path('$.Record.Data')
")
        {
            "DeliveryStreamName": "$input.path('$.DeliveryStreamName')",
            "Record": {
            "Data": "$util.base64Encode($payload)"
        }
        }

Это прекрасно работает, если вы вызываете firehose через API Gateway.

Спасибо и С уважением, Шривиньеш К.Н.

Решение с использованием Python или Node.js

Я использую DynamoDB Streams, и мне нужно было сохранить эти записи в S3. Я реализовал поток Kinesis Firehose вместе с функцией Lambda. Это сработало для передачи моих записей в S3 в виде строк JSON, однако каждая запись, которая была сохранена в файле в S3, была встроенной, то есть в одной непрерывной строке, и поэтому мне нужно было добавить новую строку в конце каждой записи. это было добавлено так, чтобы каждая запись находилась в отдельной строке. Для моего решения мне пришлось выполнить декодирование / кодирование base64.

Вот как я это сделал:

  1. При создании потока Kinesis Firehose включите «Преобразование
    исходных записей с помощью AWS Lambda» (выберите «Включено»). Если вы уже создали свой поток, вы все равно можете включить эту функцию, отредактировав существующий поток.
  2. На этом этапе вам нужно будет выбрать другую лямбда-функцию, которая выполняет это преобразование. В моем случае мне нужно было добавить новую строку в конце каждой записи, чтобы при открытии файла в текстовом редакторе и его просмотре каждая запись находилась в отдельной строке.

Ниже приведен проверенный код решения для Python и Node.js, который я использовал для этой второй Lambda:

Решение Python для добавления новой строки:

      import json
import boto3
import base64

output = []

def lambda_handler(event, context):
    
    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')
        print('payload:', payload)
        
        row_w_newline = payload + "\n"
        print('row_w_newline type:', type(row_w_newline))
        row_w_newline = base64.b64encode(row_w_newline.encode('utf-8'))
        
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': row_w_newline
        }
        output.append(output_record)

    print('Processed {} records.'.format(len(event['records'])))
    
    return {'records': output}

Решение Node.js для добавления новой строки:

      'use strict';
console.log('Loading function');

exports.handler = (event, context, callback) => {

   
    /* Process the list of records and transform them */
    const output = event.records.map((record) => {
        
        let entry = (new Buffer(record.data, 'base64')).toString('utf8');
        let result = entry + "\n"
        const payload = (new Buffer(result, 'utf8')).toString('base64');
            
            return {
                recordId: record.recordId,
                result: 'Ok',
                data: payload,
            };
            
    });
    console.log(`Processing completed.  Successful records ${output.length}.`);
    callback(null, { records: output });
};

Несколько хороших ссылок, которые помогли мне собрать версию Python воедино:

В исходном вопросе выше MrHen хотел сделать это без использования второй лямбды. Мне удалось заставить это работать в первой Lambda, вместо использования функции исходных записей преобразования Kinesis Firehose. Я сделал это, взяв newImage из DynamoDB и выполнив его в следующем порядке: кодировать, декодировать, добавлять новую строку ("\n"), кодировать, декодировать. Вероятно, есть более чистый способ. Я решил использовать функцию преобразования исходных записей, используя вторую функцию Lambda, поскольку в настоящее время она кажется мне более чистой.

В моем случае единственное решение Lambda выглядело так:

       # Not pretty, but it works! Successfully adds new line to record.
 # newImage comes from the DynamoDB Stream as a Python dictionary object,
 # I convert it to a string before running the code below.

    newImage = base64.b64encode(newImage.encode('utf-8'))
    newImage = base64.b64decode(newImage).decode('utf-8')
    newImage = newImage + "\n"
    newImage = base64.b64encode(newImage.encode('utf-8'))
    newImage = base64.b64decode(newImage).decode('utf-8')

Основной пример здесь, в пути, который мы реализовали. Мы использовали javascript для помещения записей в Kinesis Stream и Firehose для перенаправления в папку s3 со сжатием gzip. Позже Афина будет запрашивать местоположение из s3 для получения записей из s3.

Ниже приведен код для добавления новой строки перед отправкой в ​​Kinesis Stream с использованием кода JavaScript.

var payload = JSON.parse(payload);  
finalData = JSON.stringify(payload)+"\n";

var kinesisPayload = {};    
kinesisPayload.Data = finalData;    
kinesisPayload.StreamName = "kinesisStreamName");    
kinesisPayload.PartitionKey = "124";
Другие вопросы по тегам