Исключение выдается из Apache Camel Netty Consumer, когда данные отправляются несколькими клиентами
Я создал клиенты для Android, которые будут захватывать записи данных вызовов (CDR) и загружать их в Netty Consumer. По сути, клиент записывает все детали входящего и исходящего вызова, например, когда был принят вызов и когда был сделан вызов.
Эти записи хранятся на устройстве, и у меня есть запланированная служба, которая затем проверяет сетевое подключение и загружает эти записи в созданную мной службу Netty. Протокол представляет собой простой текст и, таким образом, использует текстовый кодек по умолчанию. Сообщение в основном представляет собой строку с разделителем канала, которая содержит запись CDR.
Пример сообщения выглядит так:
UPF|355015019794972|6|OUTGOING|VOICECALL|+123456789|20120422151557670
Это сообщение принимается потребителем netty, а затем разделяется на различные поля и вставляется в базу данных MySQL. Все это рабочий пример работы вставки в журнале выглядит так
Setting values for SQL statement
Setting values for SQL statement 1: 355015019794972
Setting values for SQL statement 2: 6
Setting values for SQL statement 3: OUTGOING
Setting values for SQL statement 4: VOICECALL
Setting values for SQL statement 5: +123456789
Setting values for SQL statement 6: 20120422151557670
Preparing to Execute SQL statement
Executed SQL statement
SENDING OK
Я проверил это на одном устройстве, и оно работает очень хорошо. Однако, когда я подключаю второе устройство, и оба устройства пытаются загрузить одновременно, выдается следующее исключение:
[ New I/O server worker #1-2] ServerChannelHandler WARN Closing channel as an exception was thrown from Netty
java.lang.IndexOutOfBoundsException
at org.jboss.netty.buffer.AbstractChannelBuffer.checkReadableBytes(AbstractChannelBuffer.java:657)[org.apache.servicemix.bundles.netty-3.2.6.Final_1.jar:]
at org.jboss.netty.buffer.AbstractChannelBuffer.readBytes(AbstractChannelBuffer.java:302)[org.apache.servicemix.bundles.netty-3.2.6.Final_1.jar:]
at org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder.decode(DelimiterBasedFrameDecoder.java:188)[org.apache.servicemix.bundles.netty-3.2.6.Final_1.jar:]
at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:282)[org.apache.servicemix.bundles.netty-3.2.6.Final_1.jar:]
at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:214)[org.apache.servicemix.bundles.netty-3.2.6.Final_1.jar:]
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)[org.apache.servicemix.bundles.netty-3.2.6.Final_1.jar:]
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)[org.apache.servicemix.bundles.netty-3.2.6.Final_1.jar:]
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)[org.apache.servicemix.bundles.netty-3.2.6.Final_1.jar:]
at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)[org.apache.servicemix.bundles.netty-3.2.6.Final_1.jar:]
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)[org.apache.servicemix.bundles.netty-3.2.6.Final_1.jar:]
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)[:1.6.0_22]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)[:1.6.0_22]
at java.lang.Thread.run(Thread.java:662)[:1.6.0_22]
Это происходит только тогда, когда более чем одно устройство пытается загрузить одновременно, т.е. одновременные загрузки. По существу, когда это происходит, служба загрузки на обоих устройствах перестает работать. Я пробовал это с физическими устройствами и эмулятором Android, и я получаю те же результаты. Из моих ограниченных знаний о компоненте Netty и Netty в целом, кажется, что входные потоки с обоих устройств как-то перепутаны, когда оба загружают.
Вот мой верблюжий маршрут.
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<camelContext xmlns="http://camel.apache.org/schema/spring">
<package>com.hia.mamtomcomserver</package>
<errorHandler id="defaultEH" type="DefaultErrorHandler" >
<redeliveryPolicy
maximumRedeliveries="5"
retryAttemptedLogLevel="WARN"
backOffMultiplier="1"
useExponentialBackOff="true"
/>
</errorHandler>
<threadPool id="messageToDatabasePool" threadName="msgToDBThread" poolSize="1" maxPoolSize="1" maxQueueSize="1024"/>
<route>
<from uri="netty:tcp://192.168.2.104:5150?textline=true&sync=true&transferExchange=true;&receiveBufferSize=262144000&decoderMaxLineLength=262144000"/>
<!--<threads executorServiceRef="messageToDatabasePool">-->
<to uri="log://exchange.getIn().getHeaders()"/>
<to uri="bean://streamtodb"/>
<!-- </threads >-->
</route>
</camelContext>
<bean id="streamtodb" class="com.hia.mamtomcomserver.StreamToDatabase">
<property name="dataSource" ref="dataSource"/>
</bean>
<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://127.0.0.1:3306/MamTomDB"/>
<property name="username" value="canttellyou"/>
<property name="password" value="canttellyou"/>
<property name="initialSize" value="1"/>
<property name="maxActive" value="4"/>
</bean>
</beans>
Bean-компонент streamtodb просто разбивает строку на поля, а затем вставляет данные в базу данных. После загрузки он отправит обратно сообщение об отказе на устройство Android, код, который отвечает, выглядит следующим образом
if(Complete==true){
exchange.getOut().setBody("O|"+Fields[2]);
System.out.println("SENDING OK");
}
else
{
exchange.getOut().setBody("F|"+Fields[2]);
System.out.println("SENDING FAILED");
}
Итак, я думаю, что мой вопрос заключается в том, как я могу загрузить более одного устройства одновременно в сервис Netty? Я подозреваю, что не очень хорошо понимаю основную концепцию.
ОБНОВИТЬ:
Я заменил компонент Netty на компонент MINA, и сервер будет выполнять одновременную загрузку. Поэтому, когда я меняю маршрут на:
<from uri="mina:tcp://192.168.2.104:5150?sync=true&textline=true&decoderMaxLineLength=262144000"/>
Оно работает. Я бы изменил на MINA, но мне нужна поддержка SSL в Netty, чтобы мои данные были в безопасности.
ОБНОВЛЕНИЕ #2 Перечисление моих зависимостей POM:
org.apache.camel camel-core 2.9.0 org.apache.camel camel-spring 2.9.0
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-netty</artifactId>
<version>2.9.0</version>
<!-- use the same version as your Camel core version -->
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-mina</artifactId>
<version>2.9.0</version>
<!-- use the same version as your Camel core version -->
</dependency>