Spring Data Mongodb - Как сделать Bulk Upsert

У меня есть список объектов / доменов из той же коллекции, которые должны быть вставлены, если они отсутствуют в базе данных Mongodb, в противном случае он должен обновить фильтр существующих записей путем _id,

Хотя это может быть сделано с использованием данных Spring MongoRepositories, но, похоже, это:

  1. Слишком медленно!!! (Может быть, это сохраняет записи по одной!!! Эта операция выполняется навалом???)
  2. Если есть DuplicateKeyException, он прекращает выполнение (не сохраняя следующие записи, хотя я хочу игнорировать такие исключения)

Теперь в случае, если я использую BulkOperations bulkOps = mongoTemplate.bulkOps(BulkMode.UNORDERED, collectionName), он все еще вызывает то же исключение, если нарушается целостность!!! Хотя, согласно Spring Doc, используя BulkMode.UNORDERED:

Выполняйте массовые операции параллельно. Обработка будет продолжена при ошибках.

Рассмотрим ниже код:

BulkOperations bulkOps = mongoTemplate.bulkOps(BulkMode.UNORDERED, className);

        for (T entry : entries) {
            DBObject dbDoc = new BasicDBObject();
            mongoTemplate.getConverter().write(entry, dbDoc);
            Update update = Update.fromDBObject(dbDoc);
            bulkOps.updateOne(new Query(Criteria.where("_id").is(dbDoc.get("_id"))), update);
        }
        BulkWriteResult result = bulkOps.execute();

Когда я выполняю код выше, происходит следующее исключение:

java.lang.IllegalArgumentException: Invalid BSON field name _class
at org.bson.AbstractBsonWriter.writeName(AbstractBsonWriter.java:516)
at com.mongodb.DBObjectCodec.encode(DBObjectCodec.java:129)
at com.mongodb.DBObjectCodec.encode(DBObjectCodec.java:61)
at org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:63)
at org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:29)
at com.mongodb.connection.UpdateCommandMessage.writeTheWrites(UpdateCommandMessage.java:85)
at com.mongodb.connection.UpdateCommandMessage.writeTheWrites(UpdateCommandMessage.java:43)
at com.mongodb.connection.BaseWriteCommandMessage.encodeMessageBodyWithMetadata(BaseWriteCommandMessage.java:129)
at com.mongodb.connection.RequestMessage.encodeWithMetadata(RequestMessage.java:160)
at com.mongodb.connection.WriteCommandProtocol.sendMessage(WriteCommandProtocol.java:220)
at com.mongodb.connection.WriteCommandProtocol.execute(WriteCommandProtocol.java:101)
at com.mongodb.connection.UpdateCommandProtocol.execute(UpdateCommandProtocol.java:64)
at com.mongodb.connection.UpdateCommandProtocol.execute(UpdateCommandProtocol.java:37)
at com.mongodb.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:168)
at com.mongodb.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:289)
at com.mongodb.connection.DefaultServerConnection.updateCommand(DefaultServerConnection.java:143)
at com.mongodb.operation.MixedBulkWriteOperation$Run$3.executeWriteCommandProtocol(MixedBulkWriteOperation.java:490)
at com.mongodb.operation.MixedBulkWriteOperation$Run$RunExecutor.execute(MixedBulkWriteOperation.java:656)
at com.mongodb.operation.MixedBulkWriteOperation$Run.execute(MixedBulkWriteOperation.java:409)
at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:177)
at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:168)
at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:426)
at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:417)
at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:168)
at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:74)
at com.mongodb.Mongo.execute(Mongo.java:845)
at com.mongodb.Mongo$2.execute(Mongo.java:828)
at com.mongodb.DBCollection.executeBulkWriteOperation(DBCollection.java:2309)
at com.mongodb.DBCollection.executeBulkWriteOperation(DBCollection.java:2302)
at com.mongodb.BulkWriteOperation.execute(BulkWriteOperation.java:121)
at org.springframework.data.mongodb.core.DefaultBulkOperations.execute(DefaultBulkOperations.java:278)
at com.nayapay.biller.service.UtilityService.saveInBulk(UtilityService.java:122)
at com.nayapay.biller.service.xxx.xxx(xxx.java:447)
at com.nayapay.biller.service.xxx.xxx(xxx.java:172)
at com.nayapay.biller.controller.xxx.updateBatch(xxx.java:88)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:133)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:97)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:967)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:901)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:970)
at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:872)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:661)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:846)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:742)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:317)
at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:127)
at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:91)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:114)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:137)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:170)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:63)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.authentication.logout.LogoutFilter.doFilter(LogoutFilter.java:116)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:64)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:105)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:56)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:214)
at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:177)
at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:346)
at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:262)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:197)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:478)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:80)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:799)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1457)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)


at java.lang.Thread.run(Unknown Source)
[ERROR] 2017-11-16 10:52:26.904 [http-nio-127.0.0.1-8084-exec-10] ExceptionController - Error occured: Invalid BSON field name _class

Если я исключу _id и или _classТакое же исключение произойдет для других полей!!

Итак, мои вопросы:

  1. Метод MongoRepository save(итеративный список) выполняется как массив??
  2. Как сохранить список навалом(не глядя, если документ уже существует)
  3. Как игнорировать исключения целостности данных, такие как DuplicateKeyException

Примечание. Идентификатор документа устанавливается приложением, а не базой данных (это поможет в целях фильтрации)

1 ответ

Решение

Вы можете попробовать это, надеюсь, это сработает!

final BulkWriteOperation bulkOps = mongoTemplate.getCollection(className).initializeUnorderedBulkOperation();

        entries.stream().filter(entry -> entry != null).forEach(entry -> {
            DBObject dbDoc = new BasicDBObject();
            mongoTemplate.getConverter().write(entry, dbDoc);
            dbDoc.removeField("_id");
            BulkWriteRequestBuilder bulkWriteRequestBuilder = bulkOps.find(criteria.apply(dbDoc).getCriteriaObject());
            BulkUpdateRequestBuilder upsertReq = bulkWriteRequestBuilder.upsert();
            upsertReq.replaceOne(dbDoc);
        });

        BulkWriteResult result = bulkOps.execute();
Другие вопросы по тегам