StreamSets HTTP-клиент
Я работаю с StreamSets на дистрибутиве Cloudera, пытаюсь получить некоторые данные с этого сайта http://files.data.gouv.fr/sirene/
Я столкнулся с некоторыми проблемами при выборе параметров как HTTP-клиента, так и Hadoop FS Destination.
https://image.noelshack.com/fichiers/2017/44/2/1509457504-streamsets-f.jpg
Я получаю эту ошибку: HTTP_00 - Невозможно проанализировать запись: java.io.IOException: org.apache.commons.compress.archivers.ArchiveException: не найден архиватор для подписи потока
Я покажу вам мою конфигурацию.
HTTP-клиент:
генеральный
Имя: HTTP-клиент INSEE
Описание: Клиент HTTP SIRENE
Ошибка записи: отправить в ошибку
HTTP
URL ресурса: http://files.data.gouv.fr/sirene/
Заголовки: sirene_: sirene_
Режим: потоковое
Действия по статусу
Код статистики HTTP: 500 | Действие для статуса: Повторить с экспоненциальной отсрочкой |
Базовый интервал отката (мс): 1000 | Макс. Повторов: 10
HTTP метод: GET
Часовой пояс тела: UTC (UTC)
Кодирование запроса Transfert: BUFFERED
Сжатие HTTP: нет
Время ожидания подключения: 0
Тайм-аут чтения: 0
Тип аутентификации: Нет
Используйте OAuth 2
Используй прокси
Максимальный размер партии (записей): 1000
Время пакетного ожидания (мс): 2000
пагинация
Режим пагинации: нет
TLS
UseTLS
Обработка тайм-аута
Действие для тайм-аута: повторите попытку
Макс. Повторов: 10
Формат данных
Формат даты: с разделителями
Формат сжатия: архив
Шаблон имени файла в сжатом каталоге: *.csv
Тип формата разделителя: Пользовательский
Заголовок: с заголовком
Максимальная длина записи (символов): 1024
Разрешить дополнительные столбцы
Разделитель символов: точка с запятой
Побег Персонаж: Другое \
Цитата Характер: Другое "
Тип корневого поля: List-Map
Строки для пропуска: 0
Разбор NULL
Чарсет: UTF-8
Игнорировать контрольные символы
Hadoop FS Назначение:
генеральный
Название: Hadoop FS 1
Описание: Запись в HDFS
Библиотека этапов: CDH 5.7.6
Производить события
Обязательные поля
Предпосылками
Ошибка записи: отправить в ошибку
Выходные файлы
Тип файла: весь файл
Префикс файлов
Директория в заголовке
Шаблон каталога: / user / pap / StreamSets / sirene /
Часовой пояс данных: UTC (UTC)
Базис времени: ${time:now()}
Использовать атрибут Roll
Проверьте разрешения HDFS: ON
Пропустить восстановление файла: ON
Поздние записи
Предел времени поздней записи (сек): ${1 * HOURS}
Поздняя обработка записи: отправить на ошибку
Формат данных
Формат данных: весь файл
Выражение имени файла: ${record:value('/fileInfo/filename')}
Выражение разрешений: 777
Файл существует: перезаписать
Включить контрольную сумму в события
... так что я делаю не так?:(
2 ответа
Похоже, что http://files.data.gouv.fr/sirene/ возвращает список файлов, а не сжатый архив. Это сложный вопрос, так как не существует стандартного способа перебора такого списка. Возможно, вы сможете прочитать http://files.data.gouv.fr/sirene/ как текст, а затем использовать оценщик Jython для анализа URL-адресов zip-файлов, извлечения, распаковки и синтаксического анализа, добавляя проанализированные записи в пакет, Я думаю, что у вас будут проблемы с этим методом, так как все записи окажутся в одной и той же партии, что приведет к потере памяти.
Другая идея может заключаться в том, чтобы использовать два конвейера - первый будет использовать клиентское происхождение HTTP и оценщик сценариев для загрузки сжатых файлов и записи их в локальный каталог. Затем второй конвейер будет считывать в заархивированном CSV через источник Справочника как обычно.
Если вы решили пойти, пожалуйста, пообщайтесь с сообществом StreamSets по одному из наших каналов - см. https://streamsets.com/community
Я пишу оценщик Jython. Я не знаком с доступными константами / объектами / записями, представленными в комментариях. Я попытался адаптировать этот скрипт на языке Python в оценщик Jython:
import re
import itertools
import urllib2
data = [re.findall(r'(sirene\w+.zip)', line) for line in open('/home/user/Desktop/filesdatatest.txt')]
data_list = filter(None, data)
data_brackets = list(itertools.chain(*data_list))
data_clean = ["http://files.data.gouv.fr/sirene/" + url for url in data_brackets]
for url in data_clean:
urllib2.urlopen(url)
records = [re.findall (r '(sirene \ w +.zip)', record) для записи в записях] выдало мне это сообщение об ошибке SCRIPTING_05 - Ошибка сценария при обработке записи: javax.script.ScriptException: TypeError: ожидаемая строка или буфер, но получил в строке номер 50
filesdatatest.txt содержит такие вещи, как:
Listing of /v1/AUTH_6032cb4c2159474684c8df1da2e2b642/storage/sirene/
Name Size Date
../
README.txt 2Ki 2017-10-11 03:31:57
sirene_201612_L_M.zip 1Gi 2017-01-05 00:12:08
sirene_2017002_E_Q.zip 444Ki 2017-01-05 00:44:58
sirene_2017003_E_Q.zip 6Mi 2017-01-05 00:45:01
sirene_2017004_E_Q.zip 2Mi 2017-01-05 03:37:42
sirene_2017005_E_Q.zip 2Mi 2017-01-06 03:40:47
sirene_2017006_E_Q.zip 2Mi 2017-01-07 05:04:04
так что я умею разбирать записи.