Проблема с реализацией пакетного чтения MongoDB с набором реплик потока изменений

Проблема:
процесс создания вывода записывает около 300 данных вывода в коллекцию MongoDB в секунду. Функция потока изменений в MongoDB используется другим процессом для обратного чтения этих выводов и выполнения постобработки. В настоящее время при вызове API функции потока изменений (mongoc_change_stream_next()) возвращается только один вывод данных. Итак, всего требуется 300 таких вызовов, чтобы все данные логического вывода были сохранены в течение 1 секунды. Однако после каждого считывания требуется около 50 мс времени для выполнения постобработки единичных / множественных данных вывода. Из-за единой модели возврата данных вводится эффективная задержка в 15 раз. Чтобы решить эту проблему, мы пытаемся реализоватьмеханизм пакетного чтения в соответствии с функцией потока изменений MongoDB. Мы пробовали различные варианты реализации того же самого, но по-прежнему получали только одни данные после каждого вызова API потока изменений. Есть ли способ решить эту проблему?

Платформа:
ОС: Ubuntu 16.04
Mongo-c-driver: 1.15.1
Сервер Mongo: 4.0.12

Варианты опробованы:
Установка размера пакета курсора больше 1.

int main(void) {
    const char *uri_string = "mongodb://localhost:27017/replicaSet=set0";
    mongoc_change_stream_t *stream;
    mongoc_collection_t *coll;
    bson_error_t error;
        mongoc_uri_t *uri;
    mongoc_client_t *client;

    /*
    * Add the Mongo DB blocking read and scall the inference parse function with the Json
                 * */
    uri = mongoc_uri_new_with_error (uri_string, &error);
    if (!uri) {
        fprintf (stderr,
        "failed to parse URI: %s\n"
        "error message:       %s\n",
        uri_string,
        error.message);
        return -1;
    }

    client = mongoc_client_new_from_uri (uri);
    if (!client) {
        return -1;
    }

    coll = mongoc_client_get_collection (client,  <DB-NAME>, <collection-name>);
    stream = mongoc_collection_watch (coll, &empty, NULL);
    mongoc_cursor_set_batch_size(stream->cursor, 20);
    while (1){
        while (mongoc_change_stream_next (stream, &doc)) {
            char *as_json = bson_as_relaxed_extended_json (doc, NULL); 
            ............
            ............
            //post processing consuming 50 ms of time
            ............
            ............
        }
        if (mongoc_change_stream_error_document (stream, &error, &err_doc)) {
            if (!bson_empty (err_doc)) {
                fprintf (stderr,
                "Server Error: %s\n",
                bson_as_relaxed_extended_json (err_doc, NULL));
            } else {
                fprintf (stderr, "Client Error: %s\n", error.message);
            }
            break;
        }
    }
    return 0;
}

1 ответ

В настоящее время при вызове API функции потока изменений (mongoc_change_stream_next()) возвращается только один вывод данных.

Технически это не означает, что возвращается один документ. Это связано с тем, что mongoc_change_stream_next() выполняет итерацию базового курсора, устанавливая каждыйbsonк следующему документу. Таким образом, даже если возвращаемый размер пакета больше единицы, он все равно должен повторяться для каждого документа.

Вы можете попробовать:

  • Создавайте отдельные потоки для параллельной обработки документов, чтобы вам не приходилось ждать 50 мс на документ или 15 секунд в сумме.

  • Прокрутите пакет документов, т. Е. 50 кешируют их, затем выполните пакетную обработку

  • Пакетная обработка их в отдельных потоках (комбинация двух вышеперечисленных)

Другие вопросы по тегам