Как регулировать импорт данных в Solr с помощью batchSize

У меня есть требование импортировать большой объем данных из базы данных mysql и индексировать документы (около 1000 документов). В процессе индексирования мне нужно выполнить специальную обработку поля, отправив запросы на расширение на внешний сервер Apache Stanbol. Я настроил свой обработчик dataimport в solrconfig.xml для использования StanbolContentProcessor в цепочке обновлений, как показано ниже;

<updateRequestProcessorChain name="stanbolInterceptor">
    <processor class="com.solr.stanbol.processor.StanbolContentProcessorFactory"/>
    <processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>

<requestHandler name="/dataimport" class="solr.DataImportHandler">   
    <lst name="defaults">  
        <str name="config">data-config.xml</str>
        <str name="update.chain">stanbolInterceptor</str>
    </lst>  
</requestHandler>

Мой пример data-config.xml такой, как показано ниже;

<dataConfig>
    <dataSource type="JdbcDataSource" driver="com.mysql.jdbc.Driver" 
                url="jdbc:mysql://localhost:3306/solrTest" 
                user="test" password="test123" batchSize="1" />
    <document name="stanboldata">
        <entity name="stanbolrequest" query="SELECT * FROM documents">
            <field column="id" name="id" />
            <field column="content" name="content" />
            <field column="title" name="title" />
        </entity>
    </document>
</dataConfig>

При выполнении большого импорта с около 1000 документов мой сервер stanbol выходит из строя, я подозреваю, из-за большой нагрузки от вышеуказанного Solr Stanbolnterceptor. Я хотел бы регулировать импорт данных в пакетном режиме, чтобы Stanbol мог обрабатывать управляемое количество запросов одновременно.

Это достижимо с помощью параметра batchSize в элементе dataSource в data-config?

Может кто-нибудь, пожалуйста, дайте несколько идей по регулированию загрузки данных в Solr?

Это мой пользовательский класс UpdateProcessor, обрабатывающий запросы Stanbol во время /dataimport

public class StanbolContentProcessorFactory extends
        UpdateRequestProcessorFactory {

    public static final String NLP_ORGANIZATION = "nlp_organization";
    public static final String NLP_PERSON = "nlp_person";
    public static final String[] STANBOL_REQUEST_FIELDS = { "title", "content" };
    public static final String STANBOL_ENDPOINT = "http://localhost:8080/enhancer";

    @Override
    public UpdateRequestProcessor getInstance(SolrQueryRequest req,
            SolrQueryResponse res, UpdateRequestProcessor next) {

        return new StanbolContentProcessor(next);
    }

    class StanbolContentProcessor extends UpdateRequestProcessor {

        public StanbolContentProcessor(UpdateRequestProcessor next) {
            super(next);
        }

        @Override
        public void processAdd(AddUpdateCommand cmd) throws IOException {
            SolrInputDocument doc = cmd.getSolrInputDocument();
            String request = "";
            for (String field : STANBOL_REQUEST_FIELDS) {
                if (null != doc.getFieldValue(field)) {
                    request += (String) doc.getFieldValue(field) + ". ";
                }

            }
            try {
                EnhancementResult result = stanbolPost(request, getBaseURI());
                Collection<TextAnnotation> textAnnotations = result
                        .getTextAnnotations();
                // extracting text annotations
                Set<String> personSet = new HashSet<String>();
                Set<String> orgSet = new HashSet<String>();
                for (TextAnnotation text : textAnnotations) {
                    String type = text.getType();
                    String selectedText = text.getSelectedText();

                    if (null != type && null != selectedText) {
                        if (type.equalsIgnoreCase(StanbolConstants.DBPEDIA_PERSON)
                                || type.equalsIgnoreCase(StanbolConstants.FOAF_PERSON)) {
                            personSet.add(selectedText);

                        } else if (type
                                .equalsIgnoreCase(StanbolConstants.DBPEDIA_ORGANIZATION)
                                || type.equalsIgnoreCase(StanbolConstants.FOAF_ORGANIZATION)) {
                            orgSet.add(selectedText);

                        }
                    }
                }
                for (String person : personSet) {
                    doc.addField(NLP_PERSON, person);
                }
                for (String org : orgSet) {
                    doc.addField(NLP_ORGANIZATION, org);
                }
                cmd.solrDoc = doc;
                super.processAdd(cmd);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }

    }

    private EnhancementResult stanbolPost(String request, URI uri) {
        Client client = Client.create();
        WebResource webResource = client.resource(uri);
        ClientResponse response = webResource.type(MediaType.TEXT_PLAIN)
                .accept(new MediaType("application", "rdf+xml"))
                .entity(request, MediaType.TEXT_PLAIN)
                .post(ClientResponse.class);

        int status = response.getStatus();
        if (status != 200 && status != 201 && status != 202) {
            throw new RuntimeException("Failed : HTTP error code : "
                    + response.getStatus());
        }
        String output = response.getEntity(String.class);
        // Parse the RDF model

        Model model = ModelFactory.createDefaultModel();
        StringReader reader = new StringReader(output);
        model.read(reader, null);
        return new EnhancementResult(model);

    }


    private static URI getBaseURI() {
        return UriBuilder.fromUri(STANBOL_ENDPOINT).build();
    }

}

2 ответа

batchSize Параметр используется для получения строк таблицы базы данных в пакетном режиме, чтобы уменьшить использование памяти (часто используется для предотвращения нехватки памяти при запуске обработчика импорта данных). Хотя меньший размер пакета может быть медленнее, этот параметр не имеет намерения влиять на скорость процесса импорта.

Мое предложение было бы ограничить запросы другим способом, таким как использование правила брандмауэра. Если вы используете Linux и имеете доступ к Netfilter, вы можете выполнить что-то вроде следующей команды:

iptables -A INPUT -p tcp --dport 12345 -m limit --limit 10/s -j ACCEPT

Где "12345" - это порт Stanbol, а "10/s" - это количество пакетов в секунду, которое необходимо принять.

Маугли прав, batchsizeне поможет вам с этим. Поскольку большинство людей получили проблему наоборот My dataimport is too slow, please help) в Solr нет ничего подобного. По крайней мере, я ничего не знаю.


Лично я бы не стал настраивать вашу систему Linux, чтобы справиться с регулированием для вас. Если вы переходите от этапа к этапу или когда-то переходите на другой сервер, то вам необходимо об этом помнить. И если люди изменятся в течение жизни вашей системы, они этого не узнают.

Итак, я не знаю код вашего StanbolContentProcessorFactory, но, как уже упоминалось в вашем другом вопросе, это, кажется, пользовательский код. Так как это ваш собственный код, вы можете добавить туда механизм газа. Чтобы более подробно остановиться на этом, мне понадобится немного кода, чтобы посмотреть.


Обновить

У Solr есть гуава от Google, поэтому я бы использовал RateLimiter, как предложено здесь. Если вы строите с Maven, это будет означать, что вы можете использовать provided, Если вы не используете Maven, вам не нужно делать fatjar или помещать гуаву в папку lib Solr.

import com.google.common.util.concurrent.RateLimiter;

public class StanbolContentProcessorFactory extends
    UpdateRequestProcessorFactory {

    // ...

    // add a rate limiter to throttle your requests
    // this setting would allow 10 requests per second
    private RateLimiter throttle = RateLimiter.create(0.1);

    // ...

    private EnhancementResult stanbolPost(String request, URI uri) {
        Client client = Client.create();

        // this will throttle your requests
        throttle.acquire();

        WebResource webResource = client.resource(uri);
        ClientResponse response = webResource.type(MediaType.TEXT_PLAIN)
            .accept(new MediaType("application", "rdf+xml"))
            .entity(request, MediaType.TEXT_PLAIN)
            .post(ClientResponse.class);

        int status = response.getStatus();
        if (status != 200 && status != 201 && status != 202) {
            throw new RuntimeException("Failed : HTTP error code : "
                + response.getStatus());
        }
        String output = response.getEntity(String.class);
        // Parse the RDF model
        Model model = ModelFactory.createDefaultModel();
        StringReader reader = new StringReader(output);
        model.read(reader, null);
        return new EnhancementResult(model);
}
Другие вопросы по тегам