Извлечение данных из базы данных Mule в куски
Мы пытаемся извлечь ок. 40 ГБ данных из базы данных и хотите создать несколько файлов CSV. Мы использовали соединитель базы данных mule потоковым способом, который возвращает ResultSetIterator.
Q1) Как конвертировать это ResultSetIterator
Arraylist? или любой читаемый формат, который мы можем использовать в дальнейшем для создания файлов
Q2) Мы пытались использовать компонент For-Each для разделения этих данных на куски, он работает для ограниченного набора данных и для предоставления огромных данных SerializationException
В приведенном ниже фрагменте ввода мы создаем фрагменты данных, используя for-each, и предоставляем их для пакетной обработки нескольких файлов.
<batch:job name="testBatchWithDBOutside">
<batch:input>
<logger message="#[payload]" level="INFO" doc:name="Logger"/>
</batch:input>
<batch:process-records>
<batch:step name="Batch_Step">
<batch:commit size="10" doc:name="Batch Commit">
<object-to-string-transformer doc:name="Object to String"/>
<logger message="#[payload]" level="INFO" doc:name="Logger"/>
<file:outbound-endpoint path="C:\output" outputPattern="#[message.id].txt" responseTimeout="10000" doc:name="File"/>
</batch:commit>
</batch:step>
</batch:process-records>
</batch:job>
<flow name="testBatchWithDBOutsideFlow" processingStrategy="synchronous">
<file:inbound-endpoint path="C:\input" responseTimeout="10000" doc:name="File"/>
<db:select config-ref="MySQL_Configuration" streaming="true" fetchSize="10" doc:name="Database">
<db:parameterized-query><![CDATA[select * from classicmodels]]></db:parameterized-query>
</db:select>
<foreach batchSize="5" doc:name="For Each">
<batch:execute name="testBatchWithDBOutside" doc:name="testBatchWithDBOutside"/>
</foreach>
</flow>
3 ответа
Q1. Вы не хотите конвертировать Iterator в список, так как это лишит цели потоковой передачи из коннектора DB и загрузит все записи в память. В любом случае Mule одинаково обрабатывает итераторы и списки.
Q2. Пакетный модуль подразумевает для каждой операции. Выходные данные команды batch:input должны быть списком или итератором. Вы должны быть в состоянии упростить это
<batch:job name="testBatch">
<batch:input>
<db:select config-ref="MySQL_Configuration" streaming="true" fetchSize="10" doc:name="Database">
<db:parameterized-query><![CDATA[select * from classicmodels]]></db:parameterized-query>
</db:select>
</batch:input>
<batch:process-records>
<batch:step name="Batch_Step">
<object-to-string-transformer doc:name="Object to String"/>
<file:outbound-endpoint path="C:\output" outputPattern="#[message.id].txt" responseTimeout="10000" doc:name="File"/>
</batch:step>
</batch:process-records>
</batch:job>
Вам также нужно будет заменить объект-в-строку-преобразователь компонентом, который преобразует запись базы данных (полезной нагрузкой в этой точке будет карта, где ключ - это имя столбца, а значение - значение записи) в CSV линия.
Вы можете найти достойный пример в блоге Mule здесь: https://blogs.mulesoft.com/dev/anypoint-platform-dev/batch-module-reloaded/
Другой вариант - удалить пакетный процессор и использовать DataWeave, чтобы сгенерировать вывод в формате csv и передать его в файл. Это может быть полезно: https://docs.mulesoft.com/mule-user-guide/v/3.7/dataweave-streaming
Затем Dataweave будет вызывать ResultSetIterator по мере обработки каждой записи, и этот итератор будет обрабатывать выбор фрагментов записей из базовой базы данных, поэтому между этапами нет очереди или загрузка полного набора данных в память.
<flow name="batchtestFlow">
<http:listener config-ref="HTTP_Listener_Configuration" path="/batch" allowedMethods="GET" doc:name="HTTP"/>
<db:select config-ref="Generic_Database_Configuration" streaming="true" doc:name="Database">
<db:parameterized-query><![CDATA[select * from Employees]]></db:parameterized-query>
</db:select>
<dw:transform-message doc:name="Transform Message">
<dw:set-payload><![CDATA[%dw 1.0
%input payload application/java
%output application/csv streaming=true, header=true, quoteValues=true
---
payload map ((e, i) -> {
surname: e.SURNAME,
firstname: e.FIRST_NAME
})]]></dw:set-payload>
</dw:transform-message>
<file:outbound-endpoint path="C:/tmp" outputPattern="testbatchfile.csv" connector-ref="File" responseTimeout="10000" doc:name="File"/>
</flow>
Вы хотите использовать OutputHander. Убедитесь, что потоковая передача включена, а затем используйте компонент сценария, например, выберите groovy и обрабатывайте каждую строку по очереди, например, так:
// script.groovy
return {evt, out ->
payload.each { row ->
out << row.SOMECOLUMN.... }
} as OutputHandler
И компонент в вашем XML
<scripting:transformer returnClass="TODO" doc:name="ScriptComponent">
<scripting:script engine="Groovy" file="script.groovy" />
</scripting:transformer>
Если вы хотите вернуть какой-то вывод. Однако, если вы хотите записать в файл в вашем случае, вы бы не использовали переменную out
но вместо этого пишите в свои файлы.
Я нашел простой и быстрый способ, как показано ниже:
Здесь коннектор БД находится в режиме потоковой передачи, а For-Each разделяет записи в заданном размере пакета.
<flow name="testFlow" processingStrategy="synchronous">
<composite-source doc:name="Composite Source">
<quartz:inbound-endpoint jobName="test" cronExpression="0 48 13 1/1 * ? *" repeatInterval="0" connector-ref="Quartz" responseTimeout="10000" doc:name="Quartz">
<quartz:event-generator-job/>
</quartz:inbound-endpoint>
<http:listener config-ref="HTTP_Listener_Configuration" path="/hit" doc:name="HTTP"/>
</composite-source>
<db:select config-ref="MySQL_Configuration" streaming="true" fetchSize="10000" doc:name="Database">
<db:parameterized-query><![CDATA[SELECT * FROM tblName]]></db:parameterized-query>
</db:select>
<foreach batchSize="10000" doc:name="For Each">
<dw:transform-message doc:name="Transform Message">
<dw:set-payload><![CDATA[%dw 1.0
%output application/csv
---
payload map {
field1:$.InterfaceId,
field2:$.Component
}]]></dw:set-payload>
</dw:transform-message>
<file:outbound-endpoint path="F:\output" outputPattern="#[message.id].csv" responseTimeout="10000" doc:name="File"/>
</foreach>
<set-payload value="*** Success ***" doc:name="Set Payload"/>
</flow>