Многопоточность Bigquery Python API не повышает производительность

Я использую Python BigQuery API, чтобы вывести список содержимого таблицы, а затем обработать данный ответ JSON.

  • 100 000 записей с основным потоком занимает около 30 секунд
  • 10000 записей с основным потоком занимает около 4 секунд
  • 100 000 записей с 10 потоками или 5 занимает примерно 20 секунд.

другая полезная информация.

на 100 000 записей с основным потоком.

  • Результат выборки элемента списка (вызовы REST) ​​- 25 секунд
  • результат разбора - 5 секунд
  • результат записи - 2 секунды

Разве это не займет меньше времени?

Кто-нибудь, пожалуйста, дайте мне знать, в чем причина этого отставания?

def _get_values(val):
    if isinstance(val, datetime.datetime):
        return str(val)
    else:
        return val

def map_schema(row):
    row_dict = {}
    values = row.values()
    field_to_index = row._xxx_field_to_index
    for field, index in field_to_index.iteritems():
        row_dict[str(field)] = _get_values(values[index])
    return row_dict

def write_json(file, row):
    file.write(json.dumps(row))


def _save_rows(table, start_index, max_row, file):
    rows = client.list_rows(table, max_results=max_row, start_index=start_index)
    for row in rows:
        processedRow = map_schema(row)
        write_json(file, processedRow)

def run():
    threads = []
    dataset_ref = client.dataset('hacker_news', project='bigquery-public-data')
    table_ref = dataset_ref.table('comments')
    table = client.get_table(table_ref)  # API call
    import time
    start = time.time()
    output_file = open("temp_t.json", "a")

    total_rows = 100000
    total_threads = 10
    max_row = total_rows/total_threads

    # 10 threads takes ~ 20 seconds
    # 5 threads takes the same ~ 20 seconds
    files = []
    for index in range(0, total_rows, max_row):
        file_name = "%s.json" % index
        files.append(open(file_name, "a"))
        threads.append(threading.Thread(target=_save_rows, args=(table, index, max_row, output_file)))

    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    for file in files:
        file.close()


    # takes ~ 30 seconds
    # _save_rows(table, 0, 100000, output_file)

    # takes ~ 4 seconds
    # _save_rows(table, 0, 10000, output_file)

    output_file.close()
    print "total time = %f" % (time.time() - start)

run()

0 ответов

Другие вопросы по тегам