Извлечение данных из базы данных Mule в куски

Мы пытаемся извлечь ок. 40 ГБ данных из базы данных и хотите создать несколько файлов CSV. Мы использовали соединитель базы данных mule потоковым способом, который возвращает ResultSetIterator.

Q1) Как конвертировать это ResultSetIterator Arraylist? или любой читаемый формат, который мы можем использовать в дальнейшем для создания файлов

Q2) Мы пытались использовать компонент For-Each для разделения этих данных на куски, он работает для ограниченного набора данных и для предоставления огромных данных SerializationException

В приведенном ниже фрагменте ввода мы создаем фрагменты данных, используя for-each, и предоставляем их для пакетной обработки нескольких файлов.

  <batch:job name="testBatchWithDBOutside">
        <batch:input>
            <logger message="#[payload]" level="INFO" doc:name="Logger"/>
        </batch:input>
        <batch:process-records>
            <batch:step name="Batch_Step">
                <batch:commit size="10" doc:name="Batch Commit">
                    <object-to-string-transformer doc:name="Object to String"/>
                    <logger message="#[payload]" level="INFO" doc:name="Logger"/>
                    <file:outbound-endpoint path="C:\output" outputPattern="#[message.id].txt" responseTimeout="10000" doc:name="File"/>
                </batch:commit>
            </batch:step>
        </batch:process-records>
    </batch:job>
    <flow name="testBatchWithDBOutsideFlow" processingStrategy="synchronous">
        <file:inbound-endpoint path="C:\input" responseTimeout="10000" doc:name="File"/>
        <db:select config-ref="MySQL_Configuration" streaming="true" fetchSize="10" doc:name="Database">
            <db:parameterized-query><![CDATA[select * from classicmodels]]></db:parameterized-query>
        </db:select>
        <foreach batchSize="5" doc:name="For Each">
            <batch:execute name="testBatchWithDBOutside" doc:name="testBatchWithDBOutside"/>
        </foreach>
    </flow>

3 ответа

Q1. Вы не хотите конвертировать Iterator в список, так как это лишит цели потоковой передачи из коннектора DB и загрузит все записи в память. В любом случае Mule одинаково обрабатывает итераторы и списки.

Q2. Пакетный модуль подразумевает для каждой операции. Выходные данные команды batch:input должны быть списком или итератором. Вы должны быть в состоянии упростить это

<batch:job name="testBatch">
    <batch:input>
        <db:select config-ref="MySQL_Configuration" streaming="true" fetchSize="10" doc:name="Database">
            <db:parameterized-query><![CDATA[select * from classicmodels]]></db:parameterized-query>
        </db:select>
    </batch:input>
    <batch:process-records>
        <batch:step name="Batch_Step">
            <object-to-string-transformer doc:name="Object to String"/>
            <file:outbound-endpoint path="C:\output" outputPattern="#[message.id].txt" responseTimeout="10000" doc:name="File"/>
        </batch:step>
    </batch:process-records>
</batch:job>

Вам также нужно будет заменить объект-в-строку-преобразователь компонентом, который преобразует запись базы данных (полезной нагрузкой в ​​этой точке будет карта, где ключ - это имя столбца, а значение - значение записи) в CSV линия.

Вы можете найти достойный пример в блоге Mule здесь: https://blogs.mulesoft.com/dev/anypoint-platform-dev/batch-module-reloaded/

Другой вариант - удалить пакетный процессор и использовать DataWeave, чтобы сгенерировать вывод в формате csv и передать его в файл. Это может быть полезно: https://docs.mulesoft.com/mule-user-guide/v/3.7/dataweave-streaming

Затем Dataweave будет вызывать ResultSetIterator по мере обработки каждой записи, и этот итератор будет обрабатывать выбор фрагментов записей из базовой базы данных, поэтому между этапами нет очереди или загрузка полного набора данных в память.

<flow name="batchtestFlow">
    <http:listener config-ref="HTTP_Listener_Configuration" path="/batch" allowedMethods="GET" doc:name="HTTP"/>
    <db:select config-ref="Generic_Database_Configuration" streaming="true" doc:name="Database">
        <db:parameterized-query><![CDATA[select * from Employees]]></db:parameterized-query>
    </db:select>
    <dw:transform-message doc:name="Transform Message">
        <dw:set-payload><![CDATA[%dw 1.0
            %input payload application/java
            %output application/csv streaming=true, header=true, quoteValues=true
            ---
            payload map ((e, i) -> {
                surname: e.SURNAME,
                firstname: e.FIRST_NAME
            })]]></dw:set-payload>
    </dw:transform-message>
    <file:outbound-endpoint path="C:/tmp" outputPattern="testbatchfile.csv" connector-ref="File" responseTimeout="10000" doc:name="File"/>
</flow>

Вы хотите использовать OutputHander. Убедитесь, что потоковая передача включена, а затем используйте компонент сценария, например, выберите groovy и обрабатывайте каждую строку по очереди, например, так:

// script.groovy
return {evt, out ->

 payload.each { row ->
  out << row.SOMECOLUMN....  }

} as OutputHandler

И компонент в вашем XML

    <scripting:transformer returnClass="TODO" doc:name="ScriptComponent">
        <scripting:script engine="Groovy" file="script.groovy" />
    </scripting:transformer>

Если вы хотите вернуть какой-то вывод. Однако, если вы хотите записать в файл в вашем случае, вы бы не использовали переменную out но вместо этого пишите в свои файлы.

Я нашел простой и быстрый способ, как показано ниже:

Здесь коннектор БД находится в режиме потоковой передачи, а For-Each разделяет записи в заданном размере пакета.

<flow name="testFlow" processingStrategy="synchronous">
        <composite-source doc:name="Composite Source">
            <quartz:inbound-endpoint jobName="test" cronExpression="0 48 13 1/1 * ? *" repeatInterval="0" connector-ref="Quartz" responseTimeout="10000" doc:name="Quartz">
                <quartz:event-generator-job/>
            </quartz:inbound-endpoint>
            <http:listener config-ref="HTTP_Listener_Configuration" path="/hit" doc:name="HTTP"/>
        </composite-source>
        <db:select config-ref="MySQL_Configuration" streaming="true" fetchSize="10000" doc:name="Database">
            <db:parameterized-query><![CDATA[SELECT * FROM tblName]]></db:parameterized-query>
        </db:select>
         <foreach batchSize="10000" doc:name="For Each">

                    <dw:transform-message doc:name="Transform Message">
                                           <dw:set-payload><![CDATA[%dw 1.0
                   %output application/csv
                   ---
                   payload map {
                    field1:$.InterfaceId,
                    field2:$.Component

                   }]]></dw:set-payload>
                    </dw:transform-message>
               <file:outbound-endpoint path="F:\output" outputPattern="#[message.id].csv" responseTimeout="10000" doc:name="File"/>
        </foreach>
        <set-payload value="*** Success ***" doc:name="Set Payload"/>

    </flow>
Другие вопросы по тегам