Как расшифровать пользовательское зашифрованное сообщение в Python Confluent_Kafka с помощью key.deserializer и value.deserializer?

Моя команда использует confluent_kafka для потоковой передачи в реальном времени и помещается в очередь с использованием настраиваемого шифрования. В основе технологии лежит Java.

Я пытаюсь получить сообщения с помощью python из темы и могу это сделать, но не могу расшифровать их с помощью python.

from confluent_kafka import *
import os
from confluent_kafka.serialization import StringDeserializer

os.environ['HTTPS_PROXY'] = "http://username:password@proxy.com:8080"


c = DeserializingConsumer({
    'bootstrap.servers': 'kafka.abcd.com:1010',
    'group.id':'SharedKafka2',
    'auto.offset.reset': 'latest',
    'security.protocol':'SSL',
    'ssl.ca.location':'CARoot.pem',
    'ssl.certificate.location':'certificate.pem',
    'ssl.key.location':'key.pem',
    'value.deserializer': StringDeserializer()
})

c.subscribe(['SharedKafka2_Topic'])


print("ConsumerStarted")
while True:
    msg = c.poll()
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print(msg.error())
            break

    print(msg.value())

Выход =b'\x85s\n\x0e{!G\xc7\xdfaV\x8e\x03a\xc4\x8e\xe1\xbc\xf4\xf4F\xb5<{\xecD\xc2\xf8jNN

Я не могу предоставить специальный ключ шифрования в десериализаторе значений. Ниже показано, как это делается в java:

props.put("key.deserializer","com.aexp.sec.kafka.serializers.CryptoDeSerializer");
props.put("value.deserializer","com.aexp.sec.kafka.serializers.CryptoDeSerializer");
props.put("CRYPTO_KEY_DESERIALIZER", StringDeserializer.class.getName());
props.put("CRYPTO_VALUE_DESERIALIZER", StringDeserializer.class.getName());
**props.put(String.format("%s_APIGEE_ENDPOINT", topic), apigeeUrl);
props.put("ENVIRONMENT", environment);
props.put(String.format("%s_KEY_IDENTIFIER", topic), keyIdentifier);
props.put(String.format("%s_APIGEE_CLIENT_ID", topic), clientID);
props.put(String.format("%s_APIGEE_CLIENT_SECRET", topic), clientSecret);**

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                buff.write(record.value()+"\n");
                buff.flush();
            }
        }

Как расшифровать строковый формат с помощью пользовательской расшифровки с помощью Python?

1 ответ

Это не ограничение Confluent Python. Программа Java полагается на этот специальный десериализатор, называемыйcom.aexp.sec.kafka.serializers.CryptoDeSerializerи, таким образом, здесь никто не может помочь. Что вам нужно сделать, так это придумать аналогичную реализацию на Python, которая выполняет ту же логику, реализованную в настраиваемом десериализаторе, а затем зарегистрировать ее при создании своего потребителя:

c = DeserializingConsumer({
'bootstrap.servers': 'kafka.abcd.com:1010',
'group.id':'SharedKafka2',
'auto.offset.reset': 'latest',
'security.protocol':'SSL',
'ssl.ca.location':'CARoot.pem',
'ssl.certificate.location':'certificate.pem',
'ssl.key.location':'key.pem',
'value.deserializer': YourCustomDeserializerInPython()

})

Если возможно, попытайтесь связаться с тем, кто реализовал CryptoDeSerializer класс на Java, чтобы понять, как должен выглядеть ваш код на Python.