Scala - Как читать сообщение MQ, длина которого превышает 4096 символов
Информация о приложении IBM MQ 9.2Cloudera CDP 7.1.6 Spark 2.4.5
Я обновляю искровый код со Spark 1.6 до Spark 2.4.5. У меня есть json-контент (сложная схема), отправляемый в очередь MQ, длина сообщения которого превышает 4096. Я могу напрямую читать json-файл с тем же содержимым, но когда тот же контент отправляется в MQ, я получил поврежденную запись, когда я попробуйте распечатать схему, используя приведенный ниже код.
val myMsg = JmsStreamUtils.createAsynchronousJmsQueueStream(ssc, MQConsumerFactory(host,port.toInt, qm, qn, user, credentials, qc), converter, Session.AUTO_ACKNOWLEDGE, StorageLevel.MEMORY_AND_DISK_SER)
myMsg.foreachRDD(rdd => {
val sqlContext = SparkSession.builder.getOrCreate()
val myDS = sqlContext.createDataset(rdd)
val readJson = sqlContext.read.json(myDS)
readJson.printSchema()
rdd.collect().foeach(println)
}
Когда я выдаю
rdd.collect().foreach(println)
, в файле журнала отображается только 4095 символов.
Есть ли какая-нибудь подсказка, что могло быть причиной испорченной записи?
Мой run.sh
APPNAME="$(basename "$PWD")"
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
CDPPATH="/opt/cloudera/parcels/CDH/lib"
MQJARPH="/spark/mqjars"
LOGPH="/sparklogs"
JARLIST="$MQJARPH/MQCredentialUtil.jar,$MQJARPH/spark-core_2.11-1.5.2.logging.jar,$MQJARPH/config-1.3.0.jar,$MQJARPH/com.ibm.dhbcore.jar,$MQJARPH/com.ibm.mq.commonservices.jar,$MQJARPH/com.ibm.mq.headers.jar,$MQJARPH/com.ibm.mq.jar,$MQJARPH/com.ibm.mq.jmqi.jar,$MQJARPH/com.ibm.mqjms.jar,$MQJARPH/com.ibm.mq.pcf.jar,$MQJARPH/connector.jar,$MQJARPH/fscontext.jar,$MQJARPH/guava-15.0-rc1.jar,$MQJARPH/javax.jms.jar,$MQJARPH/jta.jar,$MQJARPH/spark-jms-receiver-0.1.2-s_2.11.jar,$MQJARPH/spark-mq-jms-receiver_2.11-0.0.1-SNAPSHOT.jar"
$CDPPATH/spark/bin/spark-submit --master local[2] --conf spark.ui.enabled=false --jars $JARLIST --packages com.databricks:spark-csv_2.11:1.5.0 --class sparkintegration.SparkMQ "$DIR/target/scala-2.11/spark-mq-jms_2.11-0.0.1-SNAPSHOT.jar" >> $LOGPH/"$APPNAME-application-log.out" 2>> $LOGPH/"$APPNAME-log.out"
1 ответ
Я абсолютно ничего не знаю о Scala или Spark, но г-н Google говорит: Scala работает на JVM, поэтому стеки Java и Scala можно свободно смешивать для полной бесшовной интеграции.
Итак, вы используете файлы Java / MQ JAR ??? Правда???
IBM MQ Labs проделала действительно очень странные вещи с клиентскими библиотеками Java / MQ и JMS / MQ. Клиентская библиотека MQ изначально будет использовать буфер размером 4 КБ для получения сообщения. Если ему не удается получить все сообщение, он увеличит размер буфера до размера сообщения и выполнит получение снова.
Летом 2019 года я написал много-много сообщений об этом в блогах. Это сообщения, связанные с Java / MQ, и есть еще один набор для JMS / MQ.
- https://www.capitalware.com/rl_blog/?p=5619
- https://www.capitalware.com/rl_blog/?p=5598
- https://www.capitalware.com/rl_blog/?p=5647
Попробуйте установить для следующего параметра JVM значение, превышающее размер сообщения, которое вы пытаетесь получить.
т.е.
java -Dcom.ibm.mq.jmqi.defaultMaxMsgSize=250000 blah blah blah
где 250000 - это максимальный размер ваших сообщений. Вы можете использовать любое значение, которое хотите.
Вы должны указать, какую версию файлов MQ / Java JAR вы используете. Вы можете попробовать другой выпуск файлов MQ / Java JAR, если в тех, которые вы используете, есть ошибка.