Интеграция IBM Message Hub с IBM BPM
Я слежу за сообщением java sample from github. Я создал свою собственную версию производителя Kafka, и она отлично работает на J2Se env, используя основное приложение java. Фрагмент кода ниже -
String jasslocation = "C:\\HOME\\Technical\\bluemix\\git\\message-hub-samples\\java\\message-hub-kafka-ssl\\resources" + File.separator + "jaas.conf";
System.setProperty(JAAS_CONFIG_PROPERTY, jasslocation);
Properties props = new Properties();
props.put("key.serializer","org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("bootstrap.servers","kafka02-prod02.messagehub.services.eu-gb.bluemix.net:9093");
props.put("acks","all");
// props.put("block.on.bu","true");
props.put("batch.size","1");
props.put("security.protocol","SASL_SSL");
props.put("ssl.protocol","TLSv1.2");
props.put("ssl.enabled.protocols","TLSv1.2");
props.put("ssl.truststore.location","C:\\IBM\\BPM857\\java_1.7_64\\jre\\lib\\security\\cacerts");
//props.put("ssl.truststore.location","C:\\IBM\\IID\\v8.5\\jdk\\jre\\lib\\security\\cacerts");
props.put("ssl.truststore.password","changeit");
props.put("ssl.truststore.type","JKS");
props.put("ssl.endpoint.identification.algorithm","HTTPS");
String topic="mytopic";
String message="publish message to a topic";
try {
KafkaProducer<byte[], byte[]> kafkaProducer;
kafkaProducer = new KafkaProducer<byte[], byte[]>(props);
ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<byte[], byte[]>(topic,message.getBytes("UTF-8"));
RecordMetadata recordMetadata = kafkaProducer.send(producerRecord).get();
//getting RecordMetadata is possible to validate topic, partition and offset
System.out.println("topic where message is published : " + recordMetadata.topic());
System.out.println("partition where message is published : " + recordMetadata.partition());
System.out.println("message offset # : " + recordMetadata.offset());
kafkaProducer.close();
клиентская версия kafka - 0.90
Когда я обернуть этот код в J2EE MDB и развернуть его на BPM-сервере. Когда запускается MDB, поток продолжает ожидать send (): -
RecordMetadata recordMetadata = kafkaProducer.send (providerRecord).get();
и время ожидания через некоторое время. Это не вызывает никаких других исключений или ошибок.
Мой вопрос заключается в том, что для того, чтобы использовать клиентский API kafka из BPM, нужно ли выполнять какие-либо дополнительные действия, такие как настройка ssl и т. Д.
Пожалуйста, порекомендуйте.
Благодарю.