Попытка загрузить сообщения Kafka в режиме реального времени в S3 Bucket

Я уже настроил свои конфигурации Kafka S3, однако я пытаюсь использовать сообщения в реальном времени в S3 Bucket, когда сообщения создаются. Пока что, когда я захожу внутрь модуля и тестирую, вводится только тестируемое мной сообщение. Однако я пытаюсь принять все сообщения, которые создаются на стороне процессора Kafka.

      curl -X POST $host -H "Content-Type: application/json" --data '{
  "name": "'"$connector"'",
  "config": {
  "tasks.max":"9",
  "connector.class":"io.confluent.connect.s3.S3SinkConnector",
  "aws.access.key.id":"access-key-id",
  "aws.secret.access.key":"secret-access-key",
  "s3.credentials.provider.class":"com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
  "flush.size":"5",
 "topics":"topic name",
  "s3.region": "us-west-2",
  "s3.bucket.name": "bucket name",
  "s3.part.size": "5242880",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "locale":"en",
  "timezone":"America/Los_Angeles",
  "path.format":"YYYY-MM-dd-HH",
  "partition.duration.ms":"360000",
  "rotate.schedule.interval.ms":"300000",
  "schema.compatibility": "NONE",
  "errors.tolerance": "all",
  "errors.log.enable":true,
  "errors.log.include.messages":true,
  "name": "'"$connector"'"
  }
  }'

И я запускаю свой файл развертывания с помощью этой команды

      command:
  - bash
  - -c
  - |
  aws configure set aws_access_key_id access-key-id &
  aws configure set aws_secret_access_key secret-access-key &
  aws configure set default.region us-west-2 &
  /etc/confluent/docker/run &
  bin/connect-distributed.sh config/worker.properties &
  echo "Waiting for Kafka Connect to start listening on localhost"
  while [ $$(curl -s -o /dev/null -w %{http_code} http://localhost:8084/connectors) -eq 000 ] ; do
  echo -e $$(date) " Kafka Connect listener HTTP state: " $$(curl -s -o /dev/null -w %{http_code} http://localhost:8084/connectors) " (waiting for 200)"
  sleep 5
  done
  nc -vz localhost 8084
  bash create-connector.sh
  echo -e "\n--\n+> Created S3 Bucket container"
  sleep infinity

Is there more config i have to enter

1 ответ

В config/worker.properties, вам нужно добавить consumer.auto.offset.reset=earliest утопить все существующие сообщения

Вы также можете попробовать то же свойство, но с префиксом consumer.override в самой конфигурации коннектора

Другие вопросы по тегам