Как использовать AMQP с ActiveMQ в Mule
Привет, я работаю с Mule Anypoint Studio. Я хочу задать имя очереди и читать данные из этой очереди с помощью AMQP. Однако данные из указанной мной очереди не считываются.
Мой поток Mule:
<!-- AMQP connector for the broker at localhost:5672 (user "admin"), with validateConnections enabled. -->
<amqp:connector name="amqpConnector"
                host="localhost"
                port="5672"
                username="admin"
                password="admin"
                validateConnections="true"
                doc:name="AMQP Connector"/>
<!-- Receives messages from the "newx" queue through amqpConnector and logs each payload at INFO. -->
<flow name="mule-ampq" doc:name="mule-ampq">
    <amqp:inbound-endpoint connector-ref="amqpConnector"
                           exchangeName="AMQP.DEFAULT.EXCHANGE"
                           exchangeType="fanout"
                           queueName="newx"
                           queueAutoDelete="true"
                           responseTimeout="10000"
                           doc:name="AMQP"/>
    <logger level="INFO" message="#[message.payload]" doc:name="Logger"/>
</flow>
Я получаю следующую ошибку:
ERROR 2014-10-16 15:54:44,452 [main] org.mule.module.launcher.DefaultArchiveDeployer:
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+ Failed to deploy artifact 'mule-ampq', see below +
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
org.mule.module.launcher.DeploymentStartException: EOFException:
at org.mule.module.launcher.application.DefaultMuleApplication.start(DefaultMuleApplication.java:143)
at org.mule.module.launcher.artifact.ArtifactWrapper$4.execute(ArtifactWrapper.java:98)
at org.mule.module.launcher.artifact.ArtifactWrapper.executeWithinArtifactClassLoader(ArtifactWrapper.java:129)
at org.mule.module.launcher.artifact.ArtifactWrapper.start(ArtifactWrapper.java:93)
at org.mule.module.launcher.DefaultArtifactDeployer.deploy(DefaultArtifactDeployer.java:26)
at org.mule.module.launcher.DefaultArchiveDeployer.guardedDeploy(DefaultArchiveDeployer.java:274)
at org.mule.module.launcher.DefaultArchiveDeployer.deployArtifact(DefaultArchiveDeployer.java:294)
at org.mule.module.launcher.DefaultArchiveDeployer.deployExplodedApp(DefaultArchiveDeployer.java:261)
at org.mule.module.launcher.DefaultArchiveDeployer.deployExplodedArtifact(DefaultArchiveDeployer.java:110)
at org.mule.module.launcher.DeploymentDirectoryWatcher.deployExplodedApps(DeploymentDirectoryWatcher.java:287)
at org.mule.module.launcher.DeploymentDirectoryWatcher.start(DeploymentDirectoryWatcher.java:148)
at org.mule.tooling.server.application.ApplicationDeployer.main(ApplicationDeployer.java:130)
Caused by: org.mule.retry.RetryPolicyExhaustedException: null
at org.mule.retry.policies.AbstractPolicyTemplate.execute(AbstractPolicyTemplate.java:101)
at org.mule.transport.AbstractConnector.connect(AbstractConnector.java:1621)
at org.mule.transport.AbstractConnector.start(AbstractConnector.java:424)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.mule.lifecycle.phases.DefaultLifecyclePhase.applyLifecycle(DefaultLifecyclePhase.java:237)
at org.mule.lifecycle.RegistryLifecycleManager$RegistryLifecycleCallback.onTransition(RegistryLifecycleManager.java:273)
at org.mule.lifecycle.RegistryLifecycleManager.invokePhase(RegistryLifecycleManager.java:152)
at org.mule.lifecycle.RegistryLifecycleManager.fireLifecycle(RegistryLifecycleManager.java:123)
at org.mule.registry.AbstractRegistryBroker.fireLifecycle(AbstractRegistryBroker.java:76)
at org.mule.registry.MuleRegistryHelper.fireLifecycle(MuleRegistryHelper.java:136)
at org.mule.lifecycle.MuleContextLifecycleManager$MuleContextLifecycleCallback.onTransition(MuleContextLifecycleManager.java:91)
at org.mule.lifecycle.MuleContextLifecycleManager$MuleContextLifecycleCallback.onTransition(MuleContextLifecycleManager.java:87)
at org.mule.lifecycle.MuleContextLifecycleManager.invokePhase(MuleContextLifecycleManager.java:69)
at org.mule.lifecycle.MuleContextLifecycleManager.fireLifecycle(MuleContextLifecycleManager.java:61)
at org.mule.DefaultMuleContext.start(DefaultMuleContext.java:278)
at org.mule.module.launcher.application.DefaultMuleApplication.start(DefaultMuleApplication.java:123)
... 11 more
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:107)
at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:259)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:383)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:403)
at org.mule.transport.amqp.AmqpConnector.connectToFirstResponsiveBroker(AmqpConnector.java:443)
at org.mule.transport.amqp.AmqpConnector.doConnect(AmqpConnector.java:365)
at org.mule.transport.AbstractConnector$5.doWork(AbstractConnector.java:1561)
at org.mule.retry.policies.AbstractPolicyTemplate.execute(AbstractPolicyTemplate.java:63)
... 29 more
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.io.EOFException
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:328)
at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:244)
... 35 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:104)
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:141)
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:402)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:430)
INFO 2014-10-16 15:54:44,455 [main] org.mule.module.launcher.DeploymentDirectoryWatcher:
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+ Mule is up and kicking (every 5000ms) +
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 ответ
Попробуйте этот пример... Он содержит различные типы обмена (exchange), используемые в AMQP. Для Direct Messaging вам не нужно явно создавать очередь в RabbitMQ... она будет создана автоматически:
<!-- HTTP connector referenced through connector-ref by HTTP inbound endpoints in this configuration. -->
<http:connector name="HttpConnector" doc:name="HTTP\HTTPS"/>

<!-- AMQP connector with ackMode MULE_AUTO and activeDeclarationsOnly enabled. -->
<amqp:connector name="amqpConnector"
                ackMode="MULE_AUTO"
                activeDeclarationsOnly="true"
                doc:name="AMQP Connector"/>

<!-- AMQP connector with manual acknowledgement and a prefetch count of 1. -->
<amqp:connector name="amqpConnectorManualAck"
                ackMode="MANUAL"
                prefetchCount="1"
                doc:name="AMQP Connector"/>

<!-- AMQP connector with both the mandatory and immediate flags set. -->
<amqp:connector name="mandatoryAmqpConnector"
                mandatory="true"
                immediate="true"
                doc:name="AMQP Connector"/>
<!-- Direct messaging: dedicated connector and reusable global endpoint (begin) -->
<amqp:connector name="amqp_config"
                virtualHost="/"
                username="guest"
                password="guest"
                validateConnections="true"
                doc:name="AMQP Connector"/>
<amqp:endpoint name="amqp_direct_endpoint"
               exchangeName="directEx"
               exchangeType="direct"
               queueName="directQ"
               queueDurable="true"
               routingKey="routing.key"
               responseTimeout="10000"
               doc:name="AMQP"/>
<!-- Direct messaging (end) -->
<!-- JBoss TS transaction manager with explicit coordinator and reaper timeout properties. -->
<jbossts:transaction-manager doc:name="Transaction Manager">
    <property key="com.arjuna.ats.arjuna.coordinator.defaultTimeout" value="600"/>
    <property key="com.arjuna.ats.arjuna.coordinator.txReaperTimeout" value="1000000"/>
</jbossts:transaction-manager>
<!--
  DefaultSender: accepts HTTP requests on localhost:1080/orders, replaces the
  payload with a fixed string, logs it, and sends it through amqpConnector to
  the "directEx" exchange (queue "inhouseOrder") as request-response.
  A byte-array-to-object transformer is applied after the outbound endpoint.
-->
<flow name="DefaultSender" doc:name="DefaultSender">
    <http:inbound-endpoint connector-ref="HttpConnector"
                           exchange-pattern="request-response"
                           host="localhost"
                           port="1080"
                           path="orders"
                           doc:name="/orders"
                           doc:description="Process HTTP reqests or responses."/>
    <set-payload value="New Message for Flow1" doc:name="Set Payload"/>
    <logger level="INFO"
            category="DefaultSender"
            message="Sending Message to Queue inhouseOrder .. Payload is #[message.payload]"
            doc:name="Payload Logger"/>
    <amqp:outbound-endpoint connector-ref="amqpConnector"
                            exchange-pattern="request-response"
                            exchangeName="directEx"
                            exchangeType="direct"
                            queueName="inhouseOrder"
                            queueDurable="true"
                            doc:name="Dispatch to inhouseOrder"/>
    <byte-array-to-object-transformer doc:name="Byte Array to Object"/>
</flow>
<!--
  DefaultReceiver: synchronous flow that receives from the "inhouseOrder" queue
  on the "directEx" direct exchange. Every message starts an AMQP transaction
  (ALWAYS_BEGIN) whose recover strategy is REQUEUE. The payload is passed
  through the byte-array-to-object transformer and then logged.
-->
<flow name="DefaultReceiver" doc:name="inhouseOrder" processingStrategy="synchronous">
    <amqp:inbound-endpoint connector-ref="amqpConnector"
                           exchangeName="directEx"
                           exchangeType="direct"
                           queueName="inhouseOrder"
                           queueDurable="true"
                           doc:name="inhouseOrder">
        <amqp:transaction action="ALWAYS_BEGIN" recoverStrategy="REQUEUE"/>
    </amqp:inbound-endpoint>
    <byte-array-to-object-transformer doc:name="Byte Array to Object"/>
    <logger level="INFO"
            category="DefaultReceiver"
            message="Receiving Message to Queue inhouseOrder .. Payload is #[message.payload]"
            doc:name="Payload Logger"/>
</flow>
<!--
  FanoutSenderExample2: accepts HTTP requests on localhost:1080/orders3, sets a
  fixed payload, logs it, and sends it one-way through amqpConnector to the
  durable fanout exchange "back-end-processing".
-->
<flow name="FanoutSenderExample2" doc:name="FanoutSenderExample2">
    <http:inbound-endpoint connector-ref="HttpConnector"
                           exchange-pattern="request-response"
                           host="localhost"
                           port="1080"
                           path="orders3"
                           doc:name="/orders"
                           doc:description="Process HTTP reqests or responses."/>
    <set-payload value="Fanout Message for Queue accounting" doc:name="Set Payload"/>
    <logger level="INFO"
            category="FanoutSenderExample2"
            message="Sending Payload in FanoutSenderExample2 #[message.payload]"
            doc:name="Payload Logger"/>
    <amqp:outbound-endpoint connector-ref="amqpConnector"
                            exchange-pattern="one-way"
                            exchangeName="back-end-processing"
                            exchangeType="fanout"
                            exchangeDurable="true"
                            exchangeAutoDelete="false"
                            queueDurable="true"
                            queueExclusive="false"
                            queueAutoDelete="false"
                            doc:name="Dispatch to back-end-processing"/>
    <byte-array-to-object-transformer doc:name="Byte Array to Object"/>
</flow>
<!--
  FanoutReceiverExample2: receives from the "accounting" queue declared on the
  fanout exchange "back-end-processing", passes the payload through the
  byte-array-to-object transformer, and logs the result.
-->
<flow name="FanoutReceiverExample2" doc:name="FanoutReceiverExample2">
    <amqp:inbound-endpoint connector-ref="amqpConnector"
                           exchangeName="back-end-processing"
                           exchangeType="fanout"
                           exchangeDurable="true"
                           exchangeAutoDelete="false"
                           queueName="accounting"
                           queueDurable="true"
                           queueExclusive="false"
                           queueAutoDelete="false"
                           doc:name="back-end-processing fullfilment queue"/>
    <byte-array-to-object-transformer doc:name="Byte Array to Object"/>
    <logger level="INFO"
            category="FanoutReceiverExample2"
            message="Payload received in FanoutReceiverExample2 is: #[payload]"
            doc:name="Payload Logger"/>
</flow>
<!-- Direct Messaging -->
<!--
  Send_Message_Direct: accepts HTTP requests on localhost:1080/orders5, sets the
  payload to the bytes of a fixed string, and sends it through the amqp_config
  connector to the global endpoint amqp_direct_endpoint. The HTTP reply payload
  is then set to a confirmation string and a log line is written.

  The HTTP inbound endpoint names HttpConnector explicitly, like the other HTTP
  inbound endpoints in this configuration, so that connector resolution does
  not depend on there being exactly one HTTP connector in the application.
-->
<flow name="Send_Message_Direct" doc:name="Send_Message_Direct">
    <http:inbound-endpoint connector-ref="HttpConnector"
                           exchange-pattern="request-response"
                           host="localhost"
                           port="1080"
                           path="orders5"
                           doc:name="HTTP"/>
    <!-- The payload is converted to a byte array before it is handed to the AMQP endpoint. -->
    <set-payload value="#['im a Direct message'.getBytes()]"
                 doc:name="Set payload for amqp message as ByteArray"/>
    <amqp:outbound-endpoint connector-ref="amqp_config"
                            ref="amqp_direct_endpoint"
                            responseTimeout="10000"
                            doc:name="Send Direct Message"/>
    <!-- Replace the payload so the HTTP caller receives a readable confirmation. -->
    <set-payload value="#['Message Sended']" doc:name="Set payload as String"/>
    <logger level="INFO" message="Direct message sended" doc:name="Logger"/>
</flow>
<!--
  Recive_Message_Direct: receives from the global endpoint amqp_direct_endpoint
  through the amqp_config connector, converts the byte-array payload to a
  String, and logs it.
-->
<flow name="Recive_Message_Direct" doc:name="Recive_Message_Direct">
    <amqp:inbound-endpoint connector-ref="amqp_config"
                           ref="amqp_direct_endpoint"
                           responseTimeout="10000"
                           doc:name="Recive Direct Message"/>
    <byte-array-to-string-transformer doc:name="Transform bytearray message to String"/>
    <logger level="INFO"
            message="I recived a direct message from AMQP: #[payload]"
            doc:name="Logger"/>
</flow>
<!--
  defaultErrorHandler: logs the current payload at INFO with an error prefix,
  then passes the message to an SMTP outbound endpoint on localhost.
  NOTE(review): no flow in this snippet references this sub-flow; confirm
  where it is meant to be invoked.
-->
<sub-flow name="defaultErrorHandler" doc:name="defaultErrorHandler">
    <logger level="INFO" message="Error occurred: #[payload]" doc:name="Log Error"/>
    <smtp:outbound-endpoint host="localhost"
                            responseTimeout="10000"
                            doc:name="Send Email to Operations"/>
</sub-flow>
</mule>
Ссылка: https://github.com/mulesoft/mule-transport-amqp/blob/master/GUIDE.md#mule-amqp-transport---user-guide