Драйвер Python Cassandra такая же производительность вставки, как копия
Я пытаюсь использовать Python async с Cassandra, чтобы проверить, могу ли я записывать записи в Cassandra быстрее, чем команда CQL COPY.
Мой код Python выглядит так:
from cassandra.cluster import Cluster
from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement
cluster = Cluster(['1.2.1.4'])
session = cluster.connect('test')
with open('dataImport.txt') as f:
for line in f:
query = SimpleStatement (
"INSERT INTO tstTable (id, accts, info) VALUES (%s) " %(line),
consistency_level=ConsistencyLevel.ONE)
session.execute_async (query)
но это дает мне ту же производительность, что и команда COPY... около 2700 строк / сек.... это должно быть быстрее с асинхронным?
Нужно ли использовать многопоточность в Python? Просто читать об этом, но не уверен, как это вписывается в это...
РЕДАКТИРОВАТЬ:
так что я нашел в Интернете что-то, что я пытаюсь изменить, но не могу заставить его работать... У меня пока есть это. Также я разбил файл на 3 файла в /Data/toImport/ dir:
import multiprocessing
import time
import os
from cassandra.cluster import Cluster
from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement
cluster = Cluster(['1.2.1.4'])
session = cluster.connect('test')
def mp_worker(inputArg):
with open(inputArg[0]) as f:
for line in f:
query = SimpleStatement (
"INSERT INTO CustInfo (cust_id, accts, offers) values (%s)" %(line),
consistency_level=ConsistencyLevel.ONE)
session.execute_async (query)
def mp_handler(inputData, nThreads = 8):
p = multiprocessing.Pool(nThreads)
p.map(mp_worker, inputData, chunksize=1)
p.close()
p.join()
if __name__ == '__main__':
temp_in_data = file_list
start = time.time()
in_dir = '/Data/toImport/'
N_Proc = 8
file_data = [(in_dir) for i in temp_in_data]
print '----------------------------------Start Working!!!!-----------------------------'
print 'Number of Processes using: %d' %N_Proc
mp_handler(file_data, N_Proc)
end = time.time()
time_elapsed = end - start
print '----------------------------------All Done!!!!-----------------------------'
print "Time elapsed: {} seconds".format(time_elapsed)
но получите эту ошибку:
Traceback (most recent call last):
File "multiCass.py", line 27, in <module>
temp_in_data = file_list
NameError: name 'file_list' is not defined
2 ответа
Получил это работает так:
import multiprocessing
import time
import os
from cassandra.cluster import Cluster
from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement
def mp_worker(inputArg):
cluster = Cluster(['1.2.1.4'])
session = cluster.connect('poc')
with open(inputArg[0]) as f:
for line in f:
query = SimpleStatement (
"INSERT INTO testTable (cust_id, accts, offers) values (%s)" %(line),
consistency_level=ConsistencyLevel.ONE)
session.execute_async (query)
def mp_handler(inputData, nThreads = 8):
p = multiprocessing.Pool(nThreads)
p.map(mp_worker, inputData, chunksize=1)
p.close()
p.join()
if __name__ == '__main__':
temp_in_data = ['/toImport/part-00000', '/toImport/part-00001', '/toImport/part-00002']
start = time.time()
N_Proc = 3
file_data = [(i,) for i in temp_in_data]
print '----------------------------------Start Working!!!!-----------------------------'
print 'Number of Processes using: %d' %N_Proc
mp_handler(file_data, N_Proc)
end = time.time()
time_elapsed = end - start
print '----------------------------------All Done!!!!-----------------------------'
print "Time elapsed: {} seconds".format(time_elapsed)
В этом посте "Пример многопроцессорной обработки для повышения пропускной способности массовых данных" представлены все детали, необходимые для повышения производительности приема больших объемов данных. В основном есть 3 механизма, и дополнительная настройка может быть выполнена на основе вашего варианта использования & hw:
- один процесс (это так в вашем примере)
- мультиобработка отдельных запросов
- одновременная обработка нескольких запросов
Размер пакетов и параллелизм - вот переменные, с которыми вам придется играть самостоятельно.