Преобразование сообщений AvroSchema в приложении Spring Cloud Dataflow Sink
Я пытаюсь использовать Преобразование сообщений на основе схемы Avro в приложении Spring Cloud Stream, которое является приемником. Из ошибок не могу понять, зачем нужен дополнительный канал, потребитель 'redis-sink:0.input'.
Я развернул его с помощью scdf-сервера в Куберне, у меня есть другие приложения, которые не используют преобразование схемы avro, которые работают без каких-либо проблем.
@Configuration
@EnableConfigurationProperties({ RedisSinkProperties.class, EventAggregatorProperties.class })
@EnableBinding(Sink.class)
public class RedisSinkConfiguration {
@Autowired
private RedisConnectionFactory redisConnectionFactory;
@Autowired
private RedisSinkProperties redisSinkProperties;
@Autowired
private EventAggregatorProperties eventAggregatorProperties;
// @StreamListener(Sink.INPUT)
// public void receive(Object arg0) throws Exception {
// System.out.println(eventAggregatorProperties);
// System.out.println("this is working finally" + arg0);
//
// }
@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}")
private String endpoint;
@Bean
@Order(-100)
public ConfluentSchemaRegistryClient confluentSchemaRegistryClient() {
System.out.println("registry being initialized");
ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
client.setEndpoint(endpoint);
return client;
}
@Bean
@ServiceActivator(inputChannel = Sink.INPUT)
public MessageHandler redisSinkMessageHandler() {
System.out.println(redisSinkProperties);
System.out.println(redisConnectionFactory);
System.out.println(eventAggregatorProperties);
System.out.println("my thing is being picked now again");
EventAggregatorMessageHandler eventHandler = new EventAggregatorMessageHandler(redisConnectionFactory,
eventAggregatorProperties, confluentSchemaRegistryClient());
eventHandler.setLoggingEnabled(true);
return eventHandler;
// }
// if (redisSinkProperties.isKey()) {
// RedisStoreWritingMessageHandler redisStoreWritingMessageHandler = new
// RedisStoreWritingMessageHandler(
// this.redisConnectionFactory);
// redisStoreWritingMessageHandler.setKeyExpression(this.redisSinkProperties.keyExpression());
// return redisStoreWritingMessageHandler;
// }
// else if (this.redisSinkProperties.isQueue()) {
// return new
// RedisQueueOutboundChannelAdapter(this.redisSinkProperties.queueExpression(),
// this.redisConnectionFactory);
// } else { // must be topic
// RedisPublishingMessageHandler redisPublishingMessageHandler = new
// RedisPublishingMessageHandler(
// this.redisConnectionFactory);
// redisPublishingMessageHandler.setTopicExpression(this.redisSinkProperties.topicExpression());
// return redisPublishingMessageHandler;
// }
// }
}
@Bean
public MessageConverter customMessageConverter() throws IOException {
ConfluentSchemaRegistryClientMessageConverter converter = new ConfluentSchemaRegistryClientMessageConverter(
(ConfluentSchemaRegistryClient) confluentSchemaRegistryClient);
converter.setDynamicSchemaGenerationEnabled(true);
return converter;
}
}
Свойства, переданные в приложение:
app.eventaggregator.spring.cloud.stream.kafka.bindings.input.consumer.resetOffsets=true
app.eventaggregator.spring.cloud.stream.kafka.bindings.input.consumer.startOffset=latest
app.eventaggregator.spring.cloud.stream.schemaRegistryClient.endpoint=http://stage3-avro-schema-registry-schema-registry.kafka:8081
app.eventaggregator.spring.redis.host=redis.prod-environment
app.eventaggregator.spring.redis.database=1
app.eventaggregator.redis.isEventStream=TRUE
app.eventaggregator.redis.topic=test
app.eventaggregator.logging.level.=ERROR
app.eventaggregator.stage3.eventsToFilter=APP_LAUNCH,PAGE_VIEW
app.eventaggregator.stage3.ttlMs=36000,36000
app.eventaggregator.stage3.initialRandomValueStart=0,222
app.eventaggregator.stage3.initialRandomValueEnd=0,300
app.eventaggregator.stage3.redisKeyNames=APP_LAUNCH,PAGE_VIEW
app.eventaggregator.spring.cloud.stream.default.contentType=application/vnd.stagingClickStream.v1+avro
app.log.spring.cloud.stream.kafka.bindings.input.consumer.resetOffsets=true
app.log.spring.cloud.stream.kafka.bindings.input.consumer.startOffset=earliest
app.log.spring.cloud.stream.kafka.bindings.schemaRegistryClient.endpoint=http://stage3-avro-schema-registry-schema-registry.kafka:8081
app.log.logging.level.=ERROR
И я получаю следующую ошибку в журнале:
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v1.5.2.RELEASE)
registry being initialized
RedisSinkProperties(parser=org.springframework.expression.spel.standard.SpelExpressionParser@34c01041, topicExpression=null, queueExpression=null, keyExpression=null, key=null, queue=null, topic=test, isEventStream=false)
org.springframework.data.redis.connection.jedis.JedisConnectionFactory@536f2a7e
EventAggregatorProperties(eventsToFilter=[APP_LAUNCH, PAGE_VIEW], ttlMs=[36000, 36000], initialRandomValueStart=[0, 222], initialRandomValueEnd=[0, 300], redisKeyNames=[APP_LAUNCH, PAGE_VIEW])
my thing is being picked
2018-05-21 08:47:56,699 ERROR -kafka-listener-1 o.s.k.l.LoggingErrorHandler:37 - Error while processing: ConsumerRecord(topic = stagingClickStream, partition = 1, offset = 0, key = null, value = [B@38002450)
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'redis-sink:0.input'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=[Payload byte[193]][Headers={kafka_offset=0, id=2e3128ed-bca6-761a-49e8-44db263ca439, kafka_receivedPartitionId=1, kafka_receivedTopic=stagingClickStream, contentType=application/vnd.stagingClickStream.v1+avro, timestamp=1526892476669}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:93) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.7.RELEASE.jar!/:4.3.7.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.7.RELEASE.jar!/:4.3.7.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.7.RELEASE.jar!/:4.3.7.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:70) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:64) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.7.RELEASE.jar!/:4.3.7.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.7.RELEASE.jar!/:4.3.7.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.7.RELEASE.jar!/:4.3.7.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:171) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$000(KafkaMessageDrivenChannelAdapter.java:47) ~[spring-integration-kafka-2.0.1.RELEASE.jar!/:na]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:197) ~[spring-integration-kafka-2.0.1.RELEASE.jar!/:na]
at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:76) ~[spring-kafka-1.0.5.RELEASE.jar!/:na]
at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:71) ~[spring-kafka-1.0.5.RELEASE.jar!/:na]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:286) ~[spring-retry-1.2.0.RELEASE.jar!/:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:179) ~[spring-retry-1.2.0.RELEASE.jar!/:na]
at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:71) ~[spring-kafka-1.0.5.RELEASE.jar!/:na]at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:70) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:64) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.7.RELEASE.jar!/:4.3.7.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.7.RELEASE.jar!/:4.3.7.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.7.RELEASE.jar!/:4.3.7.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:171) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$000(KafkaMessageDrivenChannelAdapter.java:47) ~[spring-integration-kafka-2.0.1.RELEASE.jar!/:na]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:197) ~[spring-integration-kafka-2.0.1.RELEASE.jar!/:na]
at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:76) ~[spring-kafka-1.0.5.RELEASE.jar!/:na]
at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:71) ~[spring-kafka-1.0.5.RELEASE.jar!/:na]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:286) ~[spring-retry-1.2.0.RELEASE.jar!/:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:179) ~[spring-retry-1.2.0.RELEASE.jar!/:na]
at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:71) ~[spring-kafka-1.0.5.RELEASE.jar!/:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:597) [spring-kafka-1.0.5.RELEASE.jar!/:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$1800(KafkaMessageListenerContainer.java:222) [spring-kafka-1.0.5.RELEASE.jar!/:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:772) [spring-kafka-1.0.5.RELEASE.jar!/:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_144]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_144]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:154) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
... 29 common frames omitted
1 ответ
Там происходит что-то странное; это не похоже, что структура обнаруживает @ServiceActivator
аннотаций.
Вскоре после
Creating shared instance of singleton bean 'redisSinkMessageHandler'
мы должны увидеть
Invoking afterPropertiesSet() on bean with name 'redisSinkConfiguration.redisSinkMessageHandler.serviceActivator'
Который является конечной точкой потребителя для обработчика (который подписывается на канал).
Я вижу постпроцессор бина, который создает этот бин, загружаемый в контекст, поэтому сейчас я понятия не имею, почему он не находит аннотацию.
Можете ли вы попробовать переместить этот боб в свой @Configuration
класс, без других аннотаций? А пока попробую повторить.
РЕДАКТИРОВАТЬ
Я пытался (и не смог) воспроизвести его с...
@SpringBootApplication
public class So50445264Application {
public static void main(String[] args) {
SpringApplication.run(So50445264Application.class, args);
}
@Bean
public ApplicationRunner runner(MessageChannel input) {
return args -> {
input.send(new GenericMessage<>("foo"));
};
}
}
а также
@Configuration
@EnableBinding(Sink.class)
@EnableConfigurationProperties(MyProperties.class)
public class MyConfig {
@Autowired
private MyProperties myProps;
@ServiceActivator(inputChannel = Sink.INPUT)
@Bean
public MessageHandler mh() {
return new AbstractReplyProducingMessageHandler() {
@Override
protected Object handleRequestMessage(Message<?> requestMessage) {
System.out.println(requestMessage.getPayload() + ":" + myProps.getFoo());
return null;
}
};
}
}
а также
@ConfigurationProperties(prefix = "foo")
public class MyProperties {
private String foo;
public String getFoo() {
return this.foo;
}
public void setFoo(String foo) {
this.foo = foo;
}
}
а также
foo.foo=bar
но это работало нормально...
foo:bar
Я тестировал с загрузкой 1.5.12 и <spring-cloud.version>Edgware.SR3</spring-cloud.version>
(а также 2.0.2 и Finchley.BUILD-SNAPSHOT).
Я вижу, вы используете довольно старую загрузку (и, вероятно, облако тоже). Можете ли вы попробовать с более новыми версиями?