Параллельное программирование в pyRserve с использованием сельдерея

У меня есть фрейм данных, состоящий из временных рядов.

Указатель даты | Временной ряд 1 | Временной ряд 2 | ... и так далее

Я использовал pyRserve для запуска функции прогнозирования с использованием R.

Я хочу реализовать параллельную обработку с использованием сельдерея. Я написал рабочий код в следующем контексте.

def pipeR(k #input variable):
    conn = pyRserve.connect(host = 'localhost', port = 6311)
    # OPENING THE CONNECTION TO R

    conn.r.i = k
    # ASSIGNING THE PYTHON VARIABLE TO THAT OF IN THE R ENVIRONMENT

    conn.voideval\('''
    WKR_Func <- forecst(a)
    {
    ...# FORECASTS THE TIMESERIES IN COLUMN a OF THE DATAFRAME
    }
    ''')

    conn.eval('forecst(i)')
    # CALLING THE FUNCTION IN R

group(pipeR.s(k) for k in [...list of column headers...])()

Чтобы реализовать параллельную обработку, могу ли я иметь один порт для всех рабочих процессов (как я делал в приведенном выше коде, порт:6311) или у меня должны быть разные порты для разных рабочих процессов?

Я сейчас получаю ошибку

Ошибка в socketConnection("localhost", порт = порт, сервер = ИСТИНА, блокировка = ИСТИНА,: не удается открыть соединение

в Р.

1 ответ

Проблема была решена, когда я открыл разные порты для каждого рабочего процесса...

def pipeR( k, Frequency, Horizon, Split, wd_path):
    # GENERATING A RANDOM PORT
    port = randint(1000,9999)

    # OPENING THE PORT IN THE R ENVIRONMENT
    conn0 = pyRserve.connect(host = 'localhost', port = 6311)
    conn0.r.port = port
    conn0.voidEval\
    ('''
        library(Rserve)
        Rserve(port = port, args = '--no-save')
     ''')

    # OPENING THE PORT IN THE PYTHON ENVIRONMENT
    conn = pyRserve.connect(host = 'localhost', port = port)

    # ASSIGNING THE PYTHON VARIABLE TO THAT OF IN THE R ENVIRONMENT
    conn.r.i = k

    conn.voideval\
    ('''
     WKR_Func <- forecst(a)
     {
     ...# FORECASTS THE TIMESERIES IN COLUMN a OF THE DATAFRAME
     }
     ''')

    conn.eval/('forecst(i)')
    conn0.close()
Другие вопросы по тегам