StreamSets HTTP-клиент

Я работаю с StreamSets на дистрибутиве Cloudera, пытаюсь получить некоторые данные с этого сайта http://files.data.gouv.fr/sirene/

Я столкнулся с некоторыми проблемами при выборе параметров как HTTP-клиента, так и Hadoop FS Destination.

https://image.noelshack.com/fichiers/2017/44/2/1509457504-streamsets-f.jpg

Я получаю эту ошибку: HTTP_00 - Невозможно проанализировать запись: java.io.IOException: org.apache.commons.compress.archivers.ArchiveException: не найден архиватор для подписи потока

Я покажу вам мою конфигурацию.

HTTP-клиент:

генеральный

Имя: HTTP-клиент INSEE

Описание: Клиент HTTP SIRENE

Ошибка записи: отправить в ошибку

HTTP

URL ресурса: http://files.data.gouv.fr/sirene/

Заголовки: sirene_: sirene_

Режим: потоковое

Действия по статусу

Код статистики HTTP: 500 | Действие для статуса: Повторить с экспоненциальной отсрочкой |

Базовый интервал отката (мс): 1000 | Макс. Повторов: 10

HTTP метод: GET

Часовой пояс тела: UTC (UTC)

Кодирование запроса Transfert: BUFFERED

Сжатие HTTP: нет

Время ожидания подключения: 0

Тайм-аут чтения: 0

Тип аутентификации: Нет

Используйте OAuth 2

Используй прокси

Максимальный размер партии (записей): 1000

Время пакетного ожидания (мс): 2000

пагинация

Режим пагинации: нет

TLS

UseTLS

Обработка тайм-аута

Действие для тайм-аута: повторите попытку

Макс. Повторов: 10

Формат данных

Формат даты: с разделителями

Формат сжатия: архив

Шаблон имени файла в сжатом каталоге: *.csv

Тип формата разделителя: Пользовательский

Заголовок: с заголовком

Максимальная длина записи (символов): 1024

Разрешить дополнительные столбцы

Разделитель символов: точка с запятой

Побег Персонаж: Другое \

Цитата Характер: Другое "

Тип корневого поля: List-Map

Строки для пропуска: 0

Разбор NULL

Чарсет: UTF-8

Игнорировать контрольные символы

Hadoop FS Назначение:

генеральный

Название: Hadoop FS 1

Описание: Запись в HDFS

Библиотека этапов: CDH 5.7.6

Производить события

Обязательные поля

Предпосылками

Ошибка записи: отправить в ошибку

Выходные файлы

Тип файла: весь файл

Префикс файлов

Директория в заголовке

Шаблон каталога: / user / pap / StreamSets / sirene /

Часовой пояс данных: UTC (UTC)

Базис времени: ${time:now()}

Использовать атрибут Roll

Проверьте разрешения HDFS: ON

Пропустить восстановление файла: ON

Поздние записи

Предел времени поздней записи (сек): ${1 * HOURS}

Поздняя обработка записи: отправить на ошибку

Формат данных

Формат данных: весь файл

Выражение имени файла: ${record:value('/fileInfo/filename')}

Выражение разрешений: 777

Файл существует: перезаписать

Включить контрольную сумму в события

... так что я делаю не так?:(

2 ответа

Похоже, что http://files.data.gouv.fr/sirene/ возвращает список файлов, а не сжатый архив. Это сложный вопрос, так как не существует стандартного способа перебора такого списка. Возможно, вы сможете прочитать http://files.data.gouv.fr/sirene/ как текст, а затем использовать оценщик Jython для анализа URL-адресов zip-файлов, извлечения, распаковки и синтаксического анализа, добавляя проанализированные записи в пакет, Я думаю, что у вас будут проблемы с этим методом, так как все записи окажутся в одной и той же партии, что приведет к потере памяти.

Другая идея может заключаться в том, чтобы использовать два конвейера - первый будет использовать клиентское происхождение HTTP и оценщик сценариев для загрузки сжатых файлов и записи их в локальный каталог. Затем второй конвейер будет считывать в заархивированном CSV через источник Справочника как обычно.

Если вы решили пойти, пожалуйста, пообщайтесь с сообществом StreamSets по одному из наших каналов - см. https://streamsets.com/community

Я пишу оценщик Jython. Я не знаком с доступными константами / объектами / записями, представленными в комментариях. Я попытался адаптировать этот скрипт на языке Python в оценщик Jython:

import re
import itertools
import urllib2
data = [re.findall(r'(sirene\w+.zip)', line) for line in open('/home/user/Desktop/filesdatatest.txt')]
data_list = filter(None, data)
data_brackets = list(itertools.chain(*data_list))
data_clean = ["http://files.data.gouv.fr/sirene/" + url for url in data_brackets]
for url in data_clean:
    urllib2.urlopen(url)

records = [re.findall (r '(sirene \ w +.zip)', record) для записи в записях] выдало мне это сообщение об ошибке SCRIPTING_05 - Ошибка сценария при обработке записи: javax.script.ScriptException: TypeError: ожидаемая строка или буфер, но получил в строке номер 50

filesdatatest.txt содержит такие вещи, как:

Listing of /v1/AUTH_6032cb4c2159474684c8df1da2e2b642/storage/sirene/  
Name    Size    Date  
../            
README.txt  2Ki     2017-10-11 03:31:57  
sirene_201612_L_M.zip   1Gi     2017-01-05 00:12:08  
sirene_2017002_E_Q.zip  444Ki   2017-01-05 00:44:58  
sirene_2017003_E_Q.zip  6Mi     2017-01-05 00:45:01  
sirene_2017004_E_Q.zip  2Mi     2017-01-05 03:37:42  
sirene_2017005_E_Q.zip  2Mi     2017-01-06 03:40:47  
sirene_2017006_E_Q.zip  2Mi     2017-01-07 05:04:04  

так что я умею разбирать записи.

Другие вопросы по тегам