Исключение выдается из Apache Camel Netty Consumer, когда данные отправляются несколькими клиентами

Я создал клиенты для Android, которые будут захватывать записи данных вызовов (CDR) и загружать их в Netty Consumer. По сути, клиент записывает все детали входящего и исходящего вызова, например, когда был принят вызов и когда был сделан вызов.

Эти записи хранятся на устройстве, и у меня есть запланированная служба, которая затем проверяет сетевое подключение и загружает эти записи в созданную мной службу Netty. Протокол представляет собой простой текст и, таким образом, использует текстовый кодек по умолчанию. Сообщение в основном представляет собой строку с разделителем канала, которая содержит запись CDR.

Пример сообщения выглядит так:

UPF|355015019794972|6|OUTGOING|VOICECALL|+123456789|20120422151557670

Это сообщение принимается потребителем netty, а затем разделяется на различные поля и вставляется в базу данных MySQL. Все это рабочий пример работы вставки в журнале выглядит так

Setting values for SQL statement
Setting values for SQL statement 1: 355015019794972
Setting values for SQL statement 2: 6
Setting values for SQL statement 3: OUTGOING
Setting values for SQL statement 4: VOICECALL
Setting values for SQL statement 5: +123456789
Setting values for SQL statement 6: 20120422151557670
Preparing to Execute SQL statement
Executed SQL statement
SENDING OK

Я проверил это на одном устройстве, и оно работает очень хорошо. Однако, когда я подключаю второе устройство, и оба устройства пытаются загрузить одновременно, выдается следующее исключение:

[    New I/O server worker #1-2] ServerChannelHandler           WARN  Closing channel            as an exception was thrown from Netty
java.lang.IndexOutOfBoundsException
    at org.jboss.netty.buffer.AbstractChannelBuffer.checkReadableBytes(AbstractChannelBuffer.java:657)[org.apache.servicemix.bundles.netty-3.2.6.Final_1.jar:]
    at org.jboss.netty.buffer.AbstractChannelBuffer.readBytes(AbstractChannelBuffer.java:302)[org.apache.servicemix.bundles.netty-3.2.6.Final_1.jar:]
    at org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder.decode(DelimiterBasedFrameDecoder.java:188)[org.apache.servicemix.bundles.netty-3.2.6.Final_1.jar:]
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:282)[org.apache.servicemix.bundles.netty-3.2.6.Final_1.jar:]
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:214)[org.apache.servicemix.bundles.netty-3.2.6.Final_1.jar:]
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)[org.apache.servicemix.bundles.netty-3.2.6.Final_1.jar:]
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)[org.apache.servicemix.bundles.netty-3.2.6.Final_1.jar:]
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)[org.apache.servicemix.bundles.netty-3.2.6.Final_1.jar:]
    at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)[org.apache.servicemix.bundles.netty-3.2.6.Final_1.jar:]
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)[org.apache.servicemix.bundles.netty-3.2.6.Final_1.jar:]
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)[:1.6.0_22]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)[:1.6.0_22]
    at java.lang.Thread.run(Thread.java:662)[:1.6.0_22]

Это происходит только тогда, когда более чем одно устройство пытается загрузить одновременно, т.е. одновременные загрузки. По существу, когда это происходит, служба загрузки на обоих устройствах перестает работать. Я пробовал это с физическими устройствами и эмулятором Android, и я получаю те же результаты. Из моих ограниченных знаний о компоненте Netty и Netty в целом, кажется, что входные потоки с обоих устройств как-то перепутаны, когда оба загружают.

Вот мой верблюжий маршрут.

<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="
   http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
   http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

  <camelContext xmlns="http://camel.apache.org/schema/spring">
   <package>com.hia.mamtomcomserver</package>
 <errorHandler id="defaultEH" type="DefaultErrorHandler" >
            <redeliveryPolicy
                 maximumRedeliveries="5"
                 retryAttemptedLogLevel="WARN"
                 backOffMultiplier="1"
                 useExponentialBackOff="true"
             />
 </errorHandler>
 <threadPool id="messageToDatabasePool" threadName="msgToDBThread" poolSize="1" maxPoolSize="1" maxQueueSize="1024"/>
 <route>
    <from uri="netty:tcp://192.168.2.104:5150?textline=true&amp;sync=true&amp;transferExchange=true;&amp;receiveBufferSize=262144000&amp;decoderMaxLineLength=262144000"/>


   <!--<threads executorServiceRef="messageToDatabasePool">-->
        <to uri="log://exchange.getIn().getHeaders()"/>
        <to uri="bean://streamtodb"/>
   <!-- </threads >-->
</route>
 </camelContext>
  <bean id="streamtodb" class="com.hia.mamtomcomserver.StreamToDatabase">
      <property name="dataSource" ref="dataSource"/>
   </bean>
   <bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource">
     <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
     <property name="url" value="jdbc:mysql://127.0.0.1:3306/MamTomDB"/>
     <property name="username" value="canttellyou"/>
     <property name="password" value="canttellyou"/>
     <property name="initialSize" value="1"/>
     <property name="maxActive" value="4"/>
   </bean>
  </beans>

Bean-компонент streamtodb просто разбивает строку на поля, а затем вставляет данные в базу данных. После загрузки он отправит обратно сообщение об отказе на устройство Android, код, который отвечает, выглядит следующим образом

if(Complete==true){
  exchange.getOut().setBody("O|"+Fields[2]);
  System.out.println("SENDING OK");
}
else
{
   exchange.getOut().setBody("F|"+Fields[2]);
   System.out.println("SENDING FAILED");
}

Итак, я думаю, что мой вопрос заключается в том, как я могу загрузить более одного устройства одновременно в сервис Netty? Я подозреваю, что не очень хорошо понимаю основную концепцию.

ОБНОВИТЬ:

Я заменил компонент Netty на компонент MINA, и сервер будет выполнять одновременную загрузку. Поэтому, когда я меняю маршрут на:

<from uri="mina:tcp://192.168.2.104:5150?sync=true&amp;textline=true&amp;decoderMaxLineLength=262144000"/>

Оно работает. Я бы изменил на MINA, но мне нужна поддержка SSL в Netty, чтобы мои данные были в безопасности.

ОБНОВЛЕНИЕ #2 Перечисление моих зависимостей POM:

org.apache.camel camel-core 2.9.0 org.apache.camel camel-spring 2.9.0

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-netty</artifactId>
    <version>2.9.0</version>
    <!-- use the same version as your Camel core version -->
</dependency>
    <dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-mina</artifactId>
    <version>2.9.0</version>
    <!-- use the same version as your Camel core version -->
</dependency>

0 ответов

Другие вопросы по тегам