Можно ли автоматически добавлять новые строки в записи AWS Firehose?
Я пытаюсь настроить приложение Kinesis Analytics со следующими параметрами:
- Входным потоком является Kinesis Firehose, который принимает строковые значения JSON
- SQL является простым проходом (позже он должен быть более сложным, но для тестирования он просто передает данные)
- Выходной поток - это второй Kinesis Firehose, который доставляет записи в корзину S3
Позже я импортирую содержимое корзины S3, используя Hive + JSONSERDE, который ожидает, что каждая запись JSON будет жить в отдельной строке. Вывод Firehose просто добавляет все записи JSON, которые нарушают JSONSERDE.
Я мог бы присоединить формататор данных AWS Lambda к выходному потоку, но это кажется дорогим. Все, что я хочу, это разделить каждую запись с помощью новой строки.
Если бы я обходился без приложения Analytics, я бы добавил новую строку к каждой записи Firehose. Кажется странным, что нет способа сделать это в приложении SQL:
CREATE OR REPLACE STREAM "STREAM_OUT" (
a VARCHAR(4),
b VARCHAR(4),
c VARCHAR(4)
);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "STREAM_OUT"
SELECT STREAM
"a",
"b",
"c"
FROM "SOURCE_SQL_STREAM_001";
Это лучший ответ, чтобы добавить форматировщик данных Lambda? Я бы очень хотел этого избежать.
4 ответа
Я отправляю ответ, чтобы держать вопрос в курсе последних объявлений AWS. AWS недавно объявила о доступности динамического разбиения на разделы в потоках Kinesis Firehose Delivery. Он поддерживает добавление символа новой строки для каждой записи. Для получения дополнительной информации см. This и this.
У меня было похожее требование добавить новые строки в файлы, сгенерированные пожарным шлангом. В нашем приложении пожарный шланг вызывается через API Gateway.
Это указано в шаблонах отображения тела в разделе запроса на интеграцию.
Следующая команда в шлюзе API создает новые строки в записях пожарных шлангов Kinesis.
Способ 1:
#set($payload="$input.path('$.Record.Data')
")
{
"DeliveryStreamName": "$input.path('$.DeliveryStreamName')",
"Record": {
"Data": "$util.base64Encode($payload)"
}
}
Это прекрасно работает, если вы вызываете firehose через API Gateway.
Спасибо и С уважением, Шривиньеш К.Н.
Решение с использованием Python или Node.js
Я использую DynamoDB Streams, и мне нужно было сохранить эти записи в S3. Я реализовал поток Kinesis Firehose вместе с функцией Lambda. Это сработало для передачи моих записей в S3 в виде строк JSON, однако каждая запись, которая была сохранена в файле в S3, была встроенной, то есть в одной непрерывной строке, и поэтому мне нужно было добавить новую строку в конце каждой записи. это было добавлено так, чтобы каждая запись находилась в отдельной строке. Для моего решения мне пришлось выполнить декодирование / кодирование base64.
Вот как я это сделал:
- При создании потока Kinesis Firehose включите «Преобразование
исходных записей с помощью AWS Lambda» (выберите «Включено»). Если вы уже создали свой поток, вы все равно можете включить эту функцию, отредактировав существующий поток. - На этом этапе вам нужно будет выбрать другую лямбда-функцию, которая выполняет это преобразование. В моем случае мне нужно было добавить новую строку в конце каждой записи, чтобы при открытии файла в текстовом редакторе и его просмотре каждая запись находилась в отдельной строке.
Ниже приведен проверенный код решения для Python и Node.js, который я использовал для этой второй Lambda:
Решение Python для добавления новой строки:
import json
import boto3
import base64
output = []
def lambda_handler(event, context):
for record in event['records']:
payload = base64.b64decode(record['data']).decode('utf-8')
print('payload:', payload)
row_w_newline = payload + "\n"
print('row_w_newline type:', type(row_w_newline))
row_w_newline = base64.b64encode(row_w_newline.encode('utf-8'))
output_record = {
'recordId': record['recordId'],
'result': 'Ok',
'data': row_w_newline
}
output.append(output_record)
print('Processed {} records.'.format(len(event['records'])))
return {'records': output}
Решение Node.js для добавления новой строки:
'use strict';
console.log('Loading function');
exports.handler = (event, context, callback) => {
/* Process the list of records and transform them */
const output = event.records.map((record) => {
let entry = (new Buffer(record.data, 'base64')).toString('utf8');
let result = entry + "\n"
const payload = (new Buffer(result, 'utf8')).toString('base64');
return {
recordId: record.recordId,
result: 'Ok',
data: payload,
};
});
console.log(`Processing completed. Successful records ${output.length}.`);
callback(null, { records: output });
};
Несколько хороших ссылок, которые помогли мне собрать версию Python воедино:
- https://www.youtube.com/watch?v=wRGd2G82Opo&t=242s
- https://www.youtube.com/watch?v=6_03i26_DrQ
В исходном вопросе выше MrHen хотел сделать это без использования второй лямбды. Мне удалось заставить это работать в первой Lambda, вместо использования функции исходных записей преобразования Kinesis Firehose. Я сделал это, взяв newImage из DynamoDB и выполнив его в следующем порядке: кодировать, декодировать, добавлять новую строку ("\n"), кодировать, декодировать. Вероятно, есть более чистый способ. Я решил использовать функцию преобразования исходных записей, используя вторую функцию Lambda, поскольку в настоящее время она кажется мне более чистой.
В моем случае единственное решение Lambda выглядело так:
# Not pretty, but it works! Successfully adds new line to record.
# newImage comes from the DynamoDB Stream as a Python dictionary object,
# I convert it to a string before running the code below.
newImage = base64.b64encode(newImage.encode('utf-8'))
newImage = base64.b64decode(newImage).decode('utf-8')
newImage = newImage + "\n"
newImage = base64.b64encode(newImage.encode('utf-8'))
newImage = base64.b64decode(newImage).decode('utf-8')
Основной пример здесь, в пути, который мы реализовали. Мы использовали javascript для помещения записей в Kinesis Stream и Firehose для перенаправления в папку s3 со сжатием gzip. Позже Афина будет запрашивать местоположение из s3 для получения записей из s3.
Ниже приведен код для добавления новой строки перед отправкой в Kinesis Stream с использованием кода JavaScript.
var payload = JSON.parse(payload);
finalData = JSON.stringify(payload)+"\n";
var kinesisPayload = {};
kinesisPayload.Data = finalData;
kinesisPayload.StreamName = "kinesisStreamName");
kinesisPayload.PartitionKey = "124";