Использование многопроцессорной обработки с HdfsClient Pyarrows

У меня есть функция верхнего уровня, которая получает кортеж, содержащий путь к файлу паркета и имя столбца.

Функция загружает только столбец из файла, преобразует его в pandas, а затем упаковывает / сериализует его в стандартную форму. Что-то вроде:

import pyarrow as pa
import pyarrow.parquet as pq
from multiprocessing import Pool

def binarizer(file_data_tuple):
   ''' Read a Parquet column a file, binarize and return'''

   path, col_name, col_meta, native = file_data_tuple
   if not native: 
       # Either this or using a top level hdfs_con
       hdfs_con = pa.hdfs.connect(params)     
   read_pq = pq.read_table if native else hdfs_con.read_parquet

   arrow_col = read_pq(filepath, columns = (col_name,))
   bin_col = imported_binarizng_function(arrow_col)
   return bin_col

def read_binarize_parallel(filepaths):
    ''' Setup parallel reading and binarizing of a parquet file'''

    # list of tuples containing the filepath, column name, meta, and mode   
    pool_params = [(),..] 
    pool = Pool()
    for file in filepaths:
        bin_cols = pool.map(binarizer, pool_params)
        chunk =  b''.join(bin_cols)
        send_over_socket(chunk)

Это работает, когда я использую основной режим, то есть чтение файлов из локальной файловой системы.

Однако, если я пытаюсь прочитать hdfs, я получаю странные (для меня) ошибки стрелки, как при открытии соединения внутри каждого процесса, так и при попытке использовать одно и то же. Вот сжатая версия ошибки:

[libprotobuf ОШИБКА google/protobuf/message_lite.cc:123] Не удается проанализировать сообщение типа "Hdfs.Internal.RpcResponseHeaderProto", так как в нем отсутствуют обязательные поля: callId, status [libprotobuf ОШИБКА google/protobuf/message_lite.cc:123] Невозможно проанализировать сообщение типа "Hdfs.Internal.RpcResponseHeaderProto", поскольку в нем отсутствуют обязательные поля: callId, status [libprotobuf ОШИБКА google/protobuf/message_lite.cc:123] Невозможно проанализировать сообщение типа "Hdfs.Internal.RpcResponseHeaderProto "потому что в нем отсутствуют обязательные поля: callId, status [libprotobuf ОШИБКА google/protobuf/message_lite.cc:123] Не удается проанализировать сообщение типа" Hdfs.Internal.RpcResponseHeaderProto ", так как в нем отсутствуют обязательные поля: callId, status 2018-01-09 21:41:47.939006, p10007, th139965275871040, ОШИБКА Не удалось вызвать вызов RPC "getFileInfo" на сервере "192.168.0.101:9000": RpcChannel.cpp: 703: HdfsRpcException: канал RPC для "192.168.0.101000" получено несоответствие протокола: канал RPC не может найти ожидающий вызов: id = 3. @ Неизвестно

@   Unknown
@   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
@   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*,

_object*, стрелка::io::HdfsPathInfo*) @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_15isfile(_object*, _object*) @ Неизвестно @ Неизвестно

@   Unknown

2018-01-09 21:41:47.939103, p10007, th139965275871040, INFO Повторите попытку идемпотентного вызова RPC "getFileInfo" на сервере "192.168.0.101:9000" 2018-01-09 21:41:47.939357, p10010, th139965275871040, Ошибка не выполнена вызовите вызов RPC "getFileInfo" на сервере "192.168.0.101:9000": RpcChannel.cpp: 780: HdfsRpcException: канал RPC для "192.168.0.101:9000" получил несоответствие протокола: канал RPC не может проанализировать заголовок ответа. @ Неизвестно

@   Unknown
@   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
@   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*,

_object*, стрелка::io::HdfsPathInfo*) @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*) @ Неизвестно @ Неизвестно

@   Unknown
@2018-01-09 21:41:47.939406, p10008, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server

"192.168.0.101:9000": RpcChannel.cpp: 780: HdfsRpcException: канал RPC для "192.168.0.101:9000" получил несоответствие протокола: канал RPC не может проанализировать заголовок ответа. @ Неизвестно

@   Unknown
@   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
@   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*,

_object*, стрелка::io::HdfsPathInfo*) @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*) @ Неизвестно

@   Unknown 2018-01-09 21:41:47.939422, p10013, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server

"192.168.0.101:9000": RpcChannel.cpp: 780: HdfsRpcException: канал RPC для "192.168.0.101:9000" получил несоответствие протокола: канал RPC не может проанализировать заголовок ответа. @ Неизвестно

@   Unknown
@   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
@   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*,

_object*, стрелка::io::HdfsPathInfo*) @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*) @ Неизвестно

@   Unknown
@2018-01-09 21:41:47.939431, p10009, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server

"192.168.0.101:9000": RpcChannel.cpp: 780: HdfsRpcException: канал RPC для "192.168.0.101:9000" получил несоответствие протокола: канал RPC не может проанализировать заголовок ответа. @ Неизвестно

@   Unknown
@   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
@   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*,

_object*, стрелка::io::HdfsPathInfo*) @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*) @ Неизвестно

@   Unknown
@   @   Unknown
Unknown 2018-01-09 21:41:47.939457, p10012, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server

"192.168.0.101:9000": RpcChannel.cpp: 780: HdfsRpcException: канал RPC для "192.168.0.101:9000" получил несоответствие протокола: канал RPC не может проанализировать заголовок ответа. @ Неизвестно

@   Unknown
@   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
@   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*,

_object*, стрелка::io::HdfsPathInfo*) @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*) @ Неизвестно @ Неизвестно

@   Unknown
@   Unknown
Unknown
@   Unknown binarizing process filepath: /parquet_430mb/5e6.parquet
@   Unknown
Unknown
@   Unknown

@   Unknown


@   Unknown

2018-01-09 21:41:47.939854, p10010, th139965275871040, INFO Повторите попытку идемпотентного вызова RPC "getFileInfo" на сервере "192.168.0.101:9000"

2018-01-09 21: 41: 47.939864, p10013, th139965275871040, INFO Повторите идемпотентный вызов RPC "getFileInfo" на сервере "192.168.0.101:9000" 2018-01-09 21:41:47.939866, p10008, th139965275871040, INFO Retry idempote RPC-вызов "getFileInfo" на сервере "192.168.0.101:9000" 2018-01-09 21:41:47.939868, p10012, th139965275871040, INFO Повторите идемпотентный RPC-вызов "getFileInfo" на сервере "192.168.0.101:9000" 2018-01-09 21:41:47.939868, p10009, th139965275871040, INFO Повторите идемпотентный вызов RPC "getFileInfo" на сервере "192.168.0.101:9000" 2018-01-09 21:41:47.940813, p10014, th139965275871040, Ошибка не удалось вызвать RPC getFileInfo "на сервере" 192.168.0.101:9000 ": RpcChannel.cpp: 780: HdfsRpcException: канал RPC для" 192.168.0.101:9000 "получил несоответствие протокола: канал RPC не может проанализировать заголовок ответа. @ Неизвестно

@   Unknown
@   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
@   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*,

_object*, стрелка::io::HdfsPathInfo*) @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*) @ Неизвестно

@   Unknown

2018-01-09 21:41:47.940937, p10014, th139965275871040, INFO Повторите идемпотентный вызов RPC "getFileInfo" на сервере "192.168.0.101:9000" 2018-01-09 21:41:47.944352, p10011, th139965275871040, Ошибка не выполнена вызовите вызов RPC "getFileInfo" на сервере "192.168.0.101:9000": RpcChannel.cpp: 393: HdfsRpcException: не удалось вызвать вызов RPC "getFileInfo" на сервере "192.168.0.101:9000" @ Unknown @ Unknown

@   Unknown
@   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
@   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*,

_object*, стрелка::io::HdfsPathInfo*) @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*) @ Неизвестно

@   Unknown Caused by TcpSocket.cpp: 127: HdfsNetworkException: Write 124 bytes failed to "192.168.0.101:9000": (errno: 32) Broken

труба @ Неизвестно @ Неизвестно

@   Unknown
@   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
@   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*,

_object*, стрелка::io::HdfsPathInfo*) @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*) @ Неизвестно @ Неизвестно

@   Unknown

2018-01-09 21:41:47.944519, p10011, th139965275871040, INFO Повторите попытку идемпотентного вызова RPC "getFileInfo" на сервере "192.168.0.101:9000" --------------------------------------------------------------------------- ArrowIOError Traceback (последний вызов был последним)

/home/parquet_sender.pyc в insert_files_parallel(self) 374 # print ('372 sqparquet filepath:', filepath) 375 params_with_path_and_mode = [col_params+(filepath, native) для col_params в pool_params] -> 376 bin_col = self.pool (read_binarize, params_with_path_and_mode) 377 получил ('карта завершена') 378 num_rows = bin_col [0] [2]

/usr/lib/python2.7/multiprocessing/pool.pyc в карте (self, func, iterable, chunksize) 249 ''' 250 assert self._state == RUN -> 251 return self.map_async (func, iterable, chunksize).get () 252 253 def imap (self, func, iterable, chunksize = 1):

/usr/lib/python2.7/multiprocessing/pool.pyc в get(self, timeout) 556 return self._value 557 else: -> 558 повысить self._value 559 560 def _set (self, i, obj):

ArrowIOError: HDFS: сбой GetPathInfo

Я был бы рад получить какие-либо отзывы о причине этой ошибки и о том, как я должен использовать параллельную загрузку паркета.

1 ответ

Решение

Эта ошибка связана с деталями многопроцессорной сериализации. Я открыл отчет об ошибке здесь https://issues.apache.org/jira/browse/ARROW-1986

Другие вопросы по тегам