KTABLE не обнаруживает те же ключи. (Вставляет запись вместо обновления)

Случай использования

Цель состоит в том, чтобы идентифицировать входящие события / строки, чтобы проверить, является ли это новой строкой или обновлением. Новая строка перейдет к другой теме, а строка обновления перейдет к другой теме.

Подход: создайте таблицу поиска (KTABLE) и выполните две операции соединения 1. Внутреннее соединение для обнаружения обновления. 2. Левое соединение, когда правая клавиша таблицы пуста, чтобы обнаружить строку Вставить / Новая. Создайте два потока из результата вышеупомянутых двух операций. Запустите вставку в запрос к потоку, который вставит записи в таблицу поиска.

Шаги, чтобы повторить проблему здесь: (Занимает 7 минут)

Шаг 1 docker-compose up Последняя платформа Confluent 5.1.0 с докером.

Шаг 2 docker ps

Примечание: убедитесь, что брокер работает. Брокер часто останавливался в моем регионе.

Шаг 3

Зайдите в bash реестра схемы в новом терминале.(Это легко контролировать, если оставить этот терминал открытым).

docker run -it --net=cp-all-in-one_default --rm confluentinc/cp-schema-registry:5.1.0 bash

Шаг 4

Создать таблицу поиска. С темой LOAD.TEST.LOCAL.LOOKUP.TABLE. Моя схема имеет ключ типа String. Три образца записи ниже. Сначала вы заполняете таблицу поиска первичными 3 фиктивными записями.

kafka-avro-console-producer --broker-list broker:9092 --topic LOAD.TEST.LOCAL.LOOKUP.TABLE \
    --property schema.registry.url=http://schema-registry:8081 \
    --property parse.key=true \
    --property key.separator=, \
    --property key.schema='{"type":"string"}' \
    --property value.schema='{"name":"LOAD.TEST.LOCAL.LOOKUP.TABLE","type":"record","namespace":"example.sender.batch","fields":[{"name":"SENDER_CODE","type":"string"},{"name":"SENDER_NAME","type":"string"},{"name":"SENDER_CATEGORY_CODE","type":"string"},{"name":"SENDER_AGENCY_CODE","type":"string"},{"name":"SENDER_SUB_AGENCY_CODE","type":"string"},{"name":"SENDER_FOREIGN_IND","type":"string"},{"name":"SENDER_FOREIGN_COUNTRY","type":"string"},{"name":"SENDER_NAME_ALTERNATE","type":"string"},{"name":"PARENT_SENDER_CODE","type":"string"},{"name":"CHANGE_DATE","type":"string"},{"name":"REQUESTING_LOCATION","type":"string"},{"name":"REQUEST_DATE","type":"string"},{"name":"REPLACEMENT_SENDER_CODE","type":"string"},{"name":"SENDER_STATUS","type":"string"},{"name":"SENDER_DUNS","type":"string"},{"name":"ADDRESSLINE1","type":"string"},{"name":"ADDRESSLINE2","type":"string"},{"name":"ADDRESSLINE3","type":"string"},{"name":"ADDRESS4","type":"string"},{"name":"CITY","type":"string"},{"name":"STATE","type":"string"},{"name":"POSTAL_CODE","type":"string"},{"name":"URL","type":"string"},{"name":"SENDER_ACRONYM","type":"string"},{"name":"DEACTIVATED_DATE","type":"string"},{"name":"Kafka_TimeEvent","type":"string"}]}'

Теперь вы можете вставить записи ниже. Просто вставьте 3 записи ниже. Если вы нажимаете клавишу возврата несколько раз и получаете исключение, просто запустите ту же команду выше и вставьте ее после нажатия return один раз.

"SVI6FQ",{"SENDER_CODE":"SVI6FQ","SENDER_NAME":"SENDER SAM II","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"373 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 972","ADDRESSLINE3":"MAILBOXB","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}
"SVI6FR",{"SENDER_CODE":"SVI6FR","SENDER_NAME":"SENDER SAM II","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"374 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 973","ADDRESSLINE3":"MAILBOXC","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}
"SVI6FN",{"SENDER_CODE":"SVI6FN","SENDER_NAME":"SENDER SAM II","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"372 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 972","ADDRESSLINE3":"MAILBOXA","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"} 

Нажмите ⌘+c выходить.

Шаг 5 На другом терминале откройте интерфейс командной строки KSQL

docker run --network cp-all-in-one_default --interactive --tty --rm confluentinc/cp-ksql-cli:latest http://ksql-server:8088

Шаг 6 Создайте KTABLE.

create table load_test_local_lookup_table with (KAFKA_TOPIC='LOAD.TEST.LOCAL.LOOKUP.TABLE',VALUE_FORMAT='AVRO',KEY='SENDER_CODE');

Шаг 7 Обязательно установите свойство ниже, чтобы вы могли видеть результаты с начального смещения. Запустите это в KSQL.

ksql> SET 'auto.offset.reset'='earliest';

Вы увидите следующее сообщение. Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'

Шаг 8 Теперь создайте тему, где ваши события будут транслироваться. Используйте bash реестра схем из шага 4. Кроме того, заполните те же записи в основной теме.

 kafka-avro-console-producer --broker-list broker:9092 --topic LOAD.TEST.LOCAL.EVENT.STREAM \
    --property schema.registry.url=http://schema-registry:8081 \
    --property parse.key=true \
    --property key.separator=, \
    --property key.schema='{"type":"string"}' \
    --property value.schema='{"name":"LOAD.TEST.LOCAL.EVENT.STREAM","type":"record","namespace":"example.sender.batch","fields":[{"name":"SENDER_CODE","type":"string"},{"name":"SENDER_NAME","type":"string"},{"name":"SENDER_CATEGORY_CODE","type":"string"},{"name":"SENDER_AGENCY_CODE","type":"string"},{"name":"SENDER_SUB_AGENCY_CODE","type":"string"},{"name":"SENDER_FOREIGN_IND","type":"string"},{"name":"SENDER_FOREIGN_COUNTRY","type":"string"},{"name":"SENDER_NAME_ALTERNATE","type":"string"},{"name":"PARENT_SENDER_CODE","type":"string"},{"name":"CHANGE_DATE","type":"string"},{"name":"REQUESTING_LOCATION","type":"string"},{"name":"REQUEST_DATE","type":"string"},{"name":"REPLACEMENT_SENDER_CODE","type":"string"},{"name":"SENDER_STATUS","type":"string"},{"name":"SENDER_DUNS","type":"string"},{"name":"ADDRESSLINE1","type":"string"},{"name":"ADDRESSLINE2","type":"string"},{"name":"ADDRESSLINE3","type":"string"},{"name":"ADDRESS4","type":"string"},{"name":"CITY","type":"string"},{"name":"STATE","type":"string"},{"name":"POSTAL_CODE","type":"string"},{"name":"URL","type":"string"},{"name":"SENDER_ACRONYM","type":"string"},{"name":"DEACTIVATED_DATE","type":"string"},{"name":"Kafka_TimeEvent","type":"string"}]}'

"SVI6FQ",{"SENDER_CODE":"SVI6FQ","SENDER_NAME":"SENDER SAM II","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"373 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 972","ADDRESSLINE3":"MAILBOXB","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}
"SVI6FR",{"SENDER_CODE":"SVI6FR","SENDER_NAME":"SENDER SAM II","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"374 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 973","ADDRESSLINE3":"MAILBOXC","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}
"SVI6FN",{"SENDER_CODE":"SVI6FN","SENDER_NAME":"SENDER SAM II","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"372 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 972","ADDRESSLINE3":"MAILBOXA","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}

Шаг 9

Создать поток для этой темы события.

create stream load_test_local_event_stream with (KAFKA_TOPIC='LOAD.TEST.LOCAL.EVENT.STREAM',VALUE_FORMAT='AVRO',KEY='SENDER_CODE');

Шаг 10

Получите поток после объединения Stream-Table, который будет использоваться для обнаружения уже существующей строки. Мы называем это update_stream. Тема, созданная для этого потока ниже, будет иметь только обновления. Это один из моих вариантов использования. Я должен фильтровать сообщения, которые являются обновлениями.

create stream load_test_update_stream as select event.*  FROM load_test_local_event_stream event JOIN  load_test_local_lookup_table lookup ON event.sender_code = lookup.sender_Code;

Шаг 11

Создайте поток с темой, которая используется для таблицы поиска. Так что, если вы хотите обновить таблицу поиска, вы можете вставить в этот поток.(Если я не ошибаюсь: вы не можете напрямую вставить в ktable из потока.). Так и делаю.

create stream load_test_lookup_feed_stream with (KAFKA_TOPIC='LOAD.TEST.LOCAL.LOOKUP.TABLE',VALUE_FORMAT='AVRO',KEY='SENDER_CODE');

Шаг 12 Запустите вставку в запрос. Этот запрос будет вставлен в поток канала поиска в таблице, который обновит таблицу поиска, когда сообщение будет доступно в update_stream.

Insert into load_test_lookup_feed_stream  select EVENT_SENDER_CODE AS SENDER_CODE, EVENT_SENDER_NAME AS SENDER_NAME, EVENT_SENDER_CATEGORY_CODE AS SENDER_CATEGORY_CODE , EVENT_SENDER_AGENCY_CODE AS SENDER_AGENCY_CODE , EVENT_SENDER_SUB_AGENCY_CODE AS SENDER_SUB_AGENCY_CODE, EVENT_SENDER_FOREIGN_IND AS SENDER_FOREIGN_IND, EVENT_SENDER_FOREIGN_COUNTRY AS SENDER_FOREIGN_COUNTRY  , EVENT_SENDER_NAME_ALTERNATE AS SENDER_NAME_ALTERNATE, EVENT_PARENT_SENDER_CODE AS PARENT_SENDER_CODE ,EVENT_CHANGE_DATE AS CHANGE_DATE, EVENT_REQUESTING_LOCATION AS REQUESTING_LOCATION , EVENT_REQUEST_DATE AS REQUEST_DATE, EVENT_REPLACEMENT_SENDER_CODE AS REPLACEMENT_SENDER_CODE  , EVENT_SENDER_STATUS AS SENDER_STATUS, EVENT_SENDER_DUNS AS SENDER_DUNS , EVENT_ADDRESSLINE1 AS ADDRESSLINE1 , EVENT_ADDRESSLINE2 AS ADDRESSLINE2, EVENT_ADDRESSLINE3 AS ADDRESSLINE3 , EVENT_ADDRESS4 AS ADDRESS4  , EVENT_CITY AS CITY , EVENT_STATE AS STATE, EVENT_POSTAL_CODE AS POSTAL_CODE, EVENT_URL AS URL, EVENT_SENDER_ACRONYM AS SENDER_ACRONYM , EVENT_DEACTIVATED_DATE AS DEACTIVATED_DATE, EVENT_KAFKA_TIMEEVENT AS KAFKA_TIMEEVENT from load_test_update_stream partition by SENDER_CODE ;

: exclamation: Проблема: Хотя это обновляет мою таблицу поиска, она обновляется как новая запись. Не как обновление. повторить эту проблему (выполните шаги 15А).

Шаг 13 Очень похоже на обновление, создайте поток, который будет обнаруживать новые записи в событии.

create stream load_test_insert_stream as select event.*  FROM load_test_local_event_stream event left JOIN  load_test_local_lookup_table lookup ON event.sender_code = lookup.sender_Code where lookup.sender_Code is null ;

Проверка. При желании можно выполнить только запрос выбора, чтобы понять, что происходит. Если ваш bash схемы-реестра открыт, вставьте новую запись со своим собственным ключом.(Попробуйте вставить новую запись, как в разделе 15А). Это новое сообщение будет доступно в этом потоке.

Шаг 14 Создайте вставку в запрос, как раньше. Это вставка обратно в таблицу поиска. Теперь ваша таблица поиска заполнена новым сообщением.

Insert into load_test_lookup_feed_stream select EVENT_SENDER_CODE AS SENDER_CODE, EVENT_SENDER_NAME AS SENDER_NAME, EVENT_SENDER_CATEGORY_CODE AS SENDER_CATEGORY_CODE , EVENT_SENDER_AGENCY_CODE AS SENDER_AGENCY_CODE , EVENT_SENDER_SUB_AGENCY_CODE AS SENDER_SUB_AGENCY_CODE, EVENT_SENDER_FOREIGN_IND AS SENDER_FOREIGN_IND, EVENT_SENDER_FOREIGN_COUNTRY AS SENDER_FOREIGN_COUNTRY  , EVENT_SENDER_NAME_ALTERNATE AS SENDER_NAME_ALTERNATE, EVENT_PARENT_SENDER_CODE AS PARENT_SENDER_CODE ,EVENT_CHANGE_DATE AS CHANGE_DATE, EVENT_REQUESTING_LOCATION AS REQUESTING_LOCATION , EVENT_REQUEST_DATE AS REQUEST_DATE, EVENT_REPLACEMENT_SENDER_CODE AS REPLACEMENT_SENDER_CODE  , EVENT_SENDER_STATUS AS SENDER_STATUS, EVENT_SENDER_DUNS AS SENDER_DUNS , EVENT_ADDRESSLINE1 AS ADDRESSLINE1 , EVENT_ADDRESSLINE2 AS ADDRESSLINE2, EVENT_ADDRESSLINE3 AS ADDRESSLINE3 , EVENT_ADDRESS4 AS ADDRESS4  , EVENT_CITY AS CITY , EVENT_STATE AS STATE, EVENT_POSTAL_CODE AS POSTAL_CODE, EVENT_URL AS URL, EVENT_SENDER_ACRONYM AS SENDER_ACRONYM , EVENT_DEACTIVATED_DATE AS DEACTIVATED_DATE, EVENT_KAFKA_TIMEEVENT AS KAFKA_TIMEEVENT from load_test_insert_stream partition by SENDER_CODE ;

Шаг 15

В чем проблема: как тиражировать.

Шаг 15А. Как вставить новый образец записи

Запустите команду на шаге 8 (со схемой). Вставьте / вставьте новую запись, как показано ниже. Обратите внимание, я изменил и ключ сообщения, и код отправителя. Всегда ваш ключ сообщения и ключ строки должны совпадать. например: "SVI6FW","SENDER_CODE":"SVI6FW

"SVI6FW",{"SENDER_CODE":"SVI6FW","SENDER_NAME":"SENDER SAM II","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"373 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 972","ADDRESSLINE3":"MAILBOXB","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}

Шаг 15Б. Как обновить образец записи

Очень похоже на предыдущую (15А) вставку новой записи, но используйте тот же ключ сообщения и просто обновите имя или какое-либо значение. Например, 'SAM II' стал 'SAM III'

"SVI6FW",{"SENDER_CODE":"SVI6FW","SENDER_NAME":"SENDER SAM III","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"373 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 972","ADDRESSLINE3":"MAILBOXB","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}

Проблема Если вы видите, Моя справочная таблица не обновляется, она обрабатывает каждое сообщение как новое, даже если оно отправлено с тем же ключом. Из-за этого я не могу обнаружить обновления. Каждое сообщение идет как новое сообщение.

Вы можете проверить, выполнив следующие действия.

  1. Отправьте новое сообщение с вашим собственным ключом (15А). Это будет доступно в load_test_insert_stream.
  2. Отправить обновленное сообщение с тем же ключом, как 15B. Он должен быть доступен в load_test_update_stream, но он собирается загрузить load_test_insert_stream. и таблица поиска обрабатывает его как новое сообщение.

Любой новый подход / предложения приветствуются!

0 ответов

Я предполагаю, что ваш дизайн в основном близок к:

-- stream of inputs:
CREATE STREAM INPUT (ID INT KEY, V0 INT) WITH (kafka_topic='test_topic', value_format='JSON', PARTITIONS=1);

-- table built from the stream of inserts:
CREATE TABLE EXISTING (ID INT PRIMARY KEY, IGNORED INT) WITH (kafka_topic='INSERTS', value_format='JSON', PARTITIONS=1);

-- stream of inserts:
CREATE STREAM INSERTS AS SELECT INPUT.ID, INPUT.V0 AS V0 FROM INPUT LEFT JOIN EXISTING ON INPUT.ID = EXISTING.ID WHERE EXISTING.ID IS NULL;

-- stream of updates:
CREATE STREAM UPDATES AS SELECT INPUT.ID, INPUT.V0 AS V0 FROM INPUT JOIN EXISTING ON INPUT.ID = EXISTING.ID;

Затем вы вставляете несколько записей:

INSERT INTO INPUT VALUES (1, 3);
INSERT INTO INPUT VALUES (2, 4);
INSERT INTO INPUT VALUES (1, 5);

И ожидайте, что первые две строки будут выведены в INSERTS поток, а последняя строка в OUTPUTS строка.

Я тестировал вышеизложенное на версии 0.11 ksqlDB, и он действительно работает.... иш.

Если вы вставляете каждую запись из интерфейса командной строки одну за другой, то результат будет таким, как и следовало ожидать. Однако, если вы вставите все три строки вместе, например, запустив их все в одной строке в CLI:

INSERT INTO INPUT VALUES (1, 3);INSERT INTO INPUT VALUES (2, 4);INSERT INTO INPUT VALUES (1, 5);

Тогда все три ряда попадают в INSERTSручей. Зачем? вы можете спросить.

tl; dr; раствор хрупкий. Не получится, если обновления приблизятся к вставкам.

В конструкции присутствует состояние гонки. Если объединение обрабатывает вторую входную строку до того, как первые строки выводятся вINSERTSтема и соединение опрошено, чтобы прочитать эту строку, затемEXISTING таблица не будет включать строку, поэтому вторая строка будет неправильно отправлена ​​в INSERTS а не `ОБНОВЛЕНИЯ.

Есть несколько конфигураций, с которыми вы можете поэкспериментировать, чтобы увидеть, сможете ли вы заставить это работать для вашего варианта использования.

  • Настройка max.task.idle.ms выше означает, что соединение будет ждать дольше, пока данные не появятся на стороне таблицы соединения. Однако это не поможет, если обновления и вставки выполняются за одну и ту же миллисекунду, а их увеличение снизит пропускную способность и задержку.
  • Настройка cache.max.bytes.buffering до нуля отключит буферизацию в библиотеке Streams, что может помочь.
  • Настройка linger.ms к нулю будет означать, что производитель Kafka не откладывает отправку сообщений.

Даже при всем этом система работает асинхронно, и ваши результаты могут отличаться. Если обновления никогда не происходят рядом со вставками, система будет работать. Однако, если обновления могут приближаться к вставкам, вы можете обнаружить, что они неправильно отнесены к категории вставок.

Другие вопросы по тегам