Как одновременно выполнять запросы с помощью Impala с кодом Python?

Контекст

Я использую Python (3.7) для выполнения нескольких запросов на сервере Hadoop.

После нескольких тестов я считаю, что Impala - самый эффективный движок для запросов к базе данных. Поэтому я установил соединение с использованием инфраструктуры Ibis, чтобы принудительно использовать Impala (по умолчанию используется Hive).

Учитывая количество запросов, я пытаюсь одновременно выполнять эти запросы.

Я думаю, что приближаюсь, но у меня возникла проблема при попытке совместного использования соединения с сервером с помощью Ibis и нескольких запускаемых мной процессов.

Я новичок в Python, но я сделаю все возможное, чтобы четко объяснить свою проблему и использовать правильный словарь. Пожалуйста, заранее простите меня за любую ошибку...!

Как отправляются запросы

Для отправки моих запросов код выглядит так:

  • Подключение к базе данных:

hdfs = ibis.hdfs_connect(host='XXXX', port=Y) client = ibis.impala.connect(host='XXXX',port=Y,hdfs_client=hdfs)

  • Создание запроса (выполняется несколько раз):

query = "ВЫБРАТЬ... ОТ... ГДЕ..."

  • Отправьте запрос и получите результаты (выполняется для каждого запроса):

query = self.client.sql (запрос) data = query.execute(ограничение = Нет)

Что было сделано для одновременного выполнения этих запросов

На данный момент я создал класс Process, использующий многопроцессорность, и передаю ему параметр клиента, который позволит установить соединение (по крайней мере, я думал), и список, содержащий информацию, необходимую для настройки запросов для запуска на сервер:

import multiprocessing

class ParallelDataRetrieving(multiprocessing.Process):

    """Process in charge of retrieving the data."""
    
    def __init__(self,client,someInformations):
    
    multiprocessing.Process.__init__(self)
    
    self.client = client
    self.someInformations = someInformations
    
def run(self):
    
    """Code to run during the execution of the process."""
    
    cpt1 = 0
    
    while cpt1 < len(someInformations):
        
        query = Use someInformations[cpt1] to create the query.
    
    query = self.client.sql(query)
    data = query.execute(limit = None)

    Some work on the data...
    
    return 0

Затем из основного скрипта я (пытаюсь) установить соединение и запускаю несколько процессов, используя это соединение:

hdfs = ibis.hdfs_connect(host='X.X.X.X', port=Y)
client = ibis.impala.connect(host='X.X.X.X',port=Y,hdfs_client=hdfs)

process_1 = ParallelDataRetrieving(client,someInformations)
process_1.start()
process_2 = ...

Но этот код не работает. Я получаю сообщение об ошибке "TypeError: не удается обработать объекты _thread.lock".

Насколько я понимаю, это происходит из-за того, что многопроцессорность использует Pickle для "инкапсуляции" параметров и передачи их процессам (чья память работает отдельно в Windows). Да и параметр "клиент" мариновать вроде не возможно.

Затем я нашел в Интернете несколько идей, которые пытались решить эту проблему, но ни одна из них, похоже, не применима к моему конкретному случаю (Ibis, Impala...):

  • Я попытался создать соединение непосредственно в методе run объекта Process (что означает одно соединение для каждого процесса): это приводит к "BrokenPipeError: [Errno 32] Broken pipe"

  • Я попытался использовать multiprocessing.sharedctypes.RawValue, но если это правильное решение, я не очень уверен, что правильно реализовал его в своем коде...

Вот в основном моя ситуация на данный момент. Я буду продолжать попытки решить эту проблему, но, как своего рода "новичок" в Python и многопроцессорной обработке запросов к базе данных, я подумал, что более продвинутый пользователь, вероятно, может мне помочь!

Заранее благодарим вас за время, которое вы уделите этому запросу!

0 ответов

Другие вопросы по тегам