Попытка загрузить сообщения Kafka в режиме реального времени в S3 Bucket
Я уже настроил свои конфигурации Kafka S3, однако я пытаюсь использовать сообщения в реальном времени в S3 Bucket, когда сообщения создаются. Пока что, когда я захожу внутрь модуля и тестирую, вводится только тестируемое мной сообщение. Однако я пытаюсь принять все сообщения, которые создаются на стороне процессора Kafka.
curl -X POST $host -H "Content-Type: application/json" --data '{
"name": "'"$connector"'",
"config": {
"tasks.max":"9",
"connector.class":"io.confluent.connect.s3.S3SinkConnector",
"aws.access.key.id":"access-key-id",
"aws.secret.access.key":"secret-access-key",
"s3.credentials.provider.class":"com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
"flush.size":"5",
"topics":"topic name",
"s3.region": "us-west-2",
"s3.bucket.name": "bucket name",
"s3.part.size": "5242880",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"locale":"en",
"timezone":"America/Los_Angeles",
"path.format":"YYYY-MM-dd-HH",
"partition.duration.ms":"360000",
"rotate.schedule.interval.ms":"300000",
"schema.compatibility": "NONE",
"errors.tolerance": "all",
"errors.log.enable":true,
"errors.log.include.messages":true,
"name": "'"$connector"'"
}
}'
И я запускаю свой файл развертывания с помощью этой команды
command:
- bash
- -c
- |
aws configure set aws_access_key_id access-key-id &
aws configure set aws_secret_access_key secret-access-key &
aws configure set default.region us-west-2 &
/etc/confluent/docker/run &
bin/connect-distributed.sh config/worker.properties &
echo "Waiting for Kafka Connect to start listening on localhost"
while [ $$(curl -s -o /dev/null -w %{http_code} http://localhost:8084/connectors) -eq 000 ] ; do
echo -e $$(date) " Kafka Connect listener HTTP state: " $$(curl -s -o /dev/null -w %{http_code} http://localhost:8084/connectors) " (waiting for 200)"
sleep 5
done
nc -vz localhost 8084
bash create-connector.sh
echo -e "\n--\n+> Created S3 Bucket container"
sleep infinity
Is there more config i have to enter
1 ответ
В
config/worker.properties
, вам нужно добавить
consumer.auto.offset.reset=earliest
утопить все существующие сообщения
Вы также можете попробовать то же свойство, но с префиксом
consumer.override
в самой конфигурации коннектора