Сбой ансамбля zookeeper распределенного конечного автомата при обработке параллельных областей с ошибкой KeeperErrorCode = BadVersion

Фон:

Диаграмма: диаграмма состояний Statemachine uml

У нас есть обычный конечный автомат, как показано на диаграмме, который отслеживает микросервис Spring-BATCH (развернутый на схеме "источник / процессор / приемник" потоков) для каждого запускаемого пакета.

Мы получаем последовательность вызовов REST для внутреннего запуска событий для каждого идентификатора партии на машинном объекте соответствующей партии. т. е. для идентификатора пакета создается новый объект конечного автомата.

И каждая машина имеет n количество параллельных областей (представляющих куски пружинной партии) также, как показано на диаграмме.

Выполненные вызовы REST используют многопоточную среду, в которой 2 одновременных вызова одного и того же batchId могут приходить для разных региональных идентификаторов состояния BATCHPROCESSING.

До сих пор у нас был один узел (одиночная установка), на котором работал этот микросервис конечного автомата, но теперь мы хотим развернуть его в нескольких экземплярах; принимать REST звонки. Для этого мы хотим представить распределенный конечный автомат. Ниже приведена конфигурация для запуска распределенного конечного автомата.

@Configuration
@EnableStateMachine
public  class StateMachineUMLWayConfiguration extends 
StateMachineConfigurerAdapter<String, String> {

..
..

@Override
public void configure(StateMachineModelConfigurer<String,String> model) 
throws Exception {
    model
        .withModel()
            .factory(stateMachineModelFactory());
}

@Bean
public StateMachineModelFactory<String,String> stateMachineModelFactory() {

    StorehubBatchUmlStateMachineModelFactory factory =null;

    try {
    factory = new StorehubBatchUmlStateMachineModelFactory
    (templateUMLInClasspath,stateMachineEnsemble());
    } catch (Exception e) {
    LOGGER.info("Config's State machine factory got exception 
    :"+factory);
    }
    LOGGER.info("Config's State machine factory method Called:"+factory);

factory.setStateMachineComponentResolver(stateMachineComponentResolver());
    return factory;
}


    @Override
    public void configure(StateMachineConfigurationConfigurer<String, 
String> 
    config) throws Exception {
    config
        .withDistributed()
            .ensemble(stateMachineEnsemble());
}

@Bean
public StateMachineEnsemble<String, String> stateMachineEnsemble() throws 
Exception {
    return new ZookeeperStateMachineEnsemble<String, String>(curatorClient(), "/batchfoo1", true, 512);
}

@Bean
    public CuratorFramework curatorClient() throws Exception {
        CuratorFramework client = 
CuratorFrameworkFactory.builder().defaultData(new byte[0])
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .connectString("localhost:2181").build();
        client.start();
        return client;
    }

Метод сборки StorehubBatchUmlStateMachineModelFactory:

    @Override
    public StateMachineModel<String, String> build(String batchChunkId) {

    Model model = null;
    try {
        model = UmlUtils.getModel(getResourceUri(resolveResource(batchChunkId)).getPath());
    } catch (IOException e) {
        throw new IllegalArgumentException("Cannot build model from resource " + resource + " or location " + location, e);
    }
    UmlModelParser parser = new UmlModelParser(model, this);
    DataHolder dataHolder = parser.parseModel();
    ConfigurationData<String, String> configurationData = new ConfigurationData<String, String>( null, new SyncTaskExecutor(),
            new ConcurrentTaskScheduler() , false, stateMachineEnsemble,
            new ArrayList<StateMachineListener<String, String>>(), false,
            null, null,
            null, null, false,
            null , batchChunkId, null,
            null ) ;
    return new DefaultStateMachineModel<String, String>(configurationData, dataHolder.getStatesData(), dataHolder.getTransitionsData());
}

Создан новый метод уровня пользовательского интерфейса службы вместо DefaultStateMachineService.acquireStateMachine(machineId)

@Override
public StateMachine<String, String> acquireDistributedStateMachine(String machineId, boolean start) {

    synchronized (distributedMachines) {
        DistributedStateMachine<String,String> distributedStateMachine = distributedMachines.get(machineId); 
        StateMachine<String,String> distMachineDelegateX = null;
        if (distributedStateMachine == null) { 

            StateMachine<String, String> machine = stateMachineFactory.getStateMachine(machineId);
            distributedStateMachine = (DistributedStateMachine<String, String>) machine;

        }
        distributedMachines.put(machineId, distributedStateMachine);

        return handleStart(distributedStateMachine, start);
    }
}

Проблема:

Теперь проблема заключается в том, что микросервис, развернутый на одном экземпляре, успешно выполняется даже для событий, полученных им из многопоточной среды, где один поток выполняет вызов события REST, принадлежащий области 1, и одновременно другой поток приходит для области 2 того же пакета. Машина работает синхронно, с успешной параллельной обработкой областей, до последнего состояния, т.е. BATCHCOMPLETED . Также мы проверили на стороне zookeeper, что, наконец, BATCHCOMPLETED STATE записывается в текущей версии узла.

Но, кроме 1-го экземпляра, когда мы храним тот же самый app-jar микро-сервиса, развернутый в каком-то другом месте, чтобы рассматривать его как 2-й экземпляр микро-сервиса, который также теперь работает, чтобы принимать вызовы REST события (скажем, прослушивая другой порт tomcat 9002); это где-то случайно в середине. Этот сбой происходит случайным образом после запуска любого из событий среди параллельных областей и когда ensemble.setState() вызывается изнутри при изменении состояния этого события.

Это дает следующую ошибку:

    [36mo.s.s.support.AbstractStateMachine      [0;39m [2m:[0;39m Interceptors threw exception, skipping state change

org.springframework.statemachine.StateMachineException: Error persisting data; nested exception is org.springframework.statemachine.StateMachineException: Error persisting data; nested exception is org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion
        at org.springframework.statemachine.zookeeper.ZookeeperStateMachineEnsemble.setState(ZookeeperStateMachineEnsemble.java:241) ~[spring-statemachine-zookeeper-2.0.1.RELEASE.jar!/:2.0.1.RELEASE]
        at org.springframework.statemachine.ensemble.DistributedStateMachine$LocalStateMachineInterceptor.preStateChange(DistributedStateMachine.java:209) ~[spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.statemachine.support.StateMachineInterceptorList.preStateChange(StateMachineInterceptorList.java:101) ~[spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.statemachine.support.AbstractStateMachine.callPreStateChangeInterceptors(AbstractStateMachine.java:859) [spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.statemachine.support.AbstractStateMachine.switchToState(AbstractStateMachine.java:880) [spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.statemachine.support.AbstractStateMachine.access$500(AbstractStateMachine.java:81) [spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.statemachine.support.AbstractStateMachine$3.transit(AbstractStateMachine.java:335) [spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.statemachine.support.DefaultStateMachineExecutor.handleTriggerTrans(DefaultStateMachineExecutor.java:286) [spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.statemachine.support.DefaultStateMachineExecutor.handleTriggerTrans(DefaultStateMachineExecutor.java:211) [spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.statemachine.support.DefaultStateMachineExecutor.processTriggerQueue(DefaultStateMachineExecutor.java:449) [spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.statemachine.support.DefaultStateMachineExecutor.access$200(DefaultStateMachineExecutor.java:65) [spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.statemachine.support.DefaultStateMachineExecutor$1.run(DefaultStateMachineExecutor.java:323) [spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) [spring-core-4.3.13.RELEASE.jar!/:4.3.13.RELEASE]
        at org.springframework.statemachine.support.DefaultStateMachineExecutor.scheduleEventQueueProcessing(DefaultStateMachineExecutor.java:352) [spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.statemachine.support.DefaultStateMachineExecutor.execute(DefaultStateMachineExecutor.java:163) [spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.statemachine.support.AbstractStateMachine.sendEventInternal(AbstractStateMachine.java:603) [spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.statemachine.support.AbstractStateMachine.sendEvent(AbstractStateMachine.java:218) [spring-statemachine-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
        at org.springframework.statemachine.ensemble.DistributedStateMachine.sendEvent(DistributedStateMachine.java:108) 
..skipping Lines....
Caused by: org.springframework.statemachine.StateMachineException: Error persisting data; nested exception is org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion
    at org.springframework.statemachine.zookeeper.ZookeeperStateMachinePersist.write(ZookeeperStateMachinePersist.java:113) ~[spring-statemachine-zookeeper-2.0.1.RELEASE.jar!/:2.0.1.RELEASE]
    at org.springframework.statemachine.zookeeper.ZookeeperStateMachinePersist.write(ZookeeperStateMachinePersist.java:50) ~[spring-statemachine-zookeeper-2.0.1.RELEASE.jar!/:2.0.1.RELEASE]
    at org.springframework.statemachine.zookeeper.ZookeeperStateMachineEnsemble.setState(ZookeeperStateMachineEnsemble.java:235) ~[spring-statemachine-zookeeper-2.0.1.RELEASE.jar!/:2.0.1.RELEASE]
    ... 73 common frames omitted
Caused by: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion
at org.apache.zookeeper.KeeperException.create(KeeperException.java:115) ~[zookeeper-3.4.8.jar!/:3.4.8--1]
at org.apache.zookeeper.ZooKeeper.multiInternal(ZooKeeper.java:1006) ~[zookeeper-3.4.8.jar!/:3.4.8--1]
at org.apache.zookeeper.ZooKeeper.multi(ZooKeeper.java:910) ~[zookeeper-3.4.8.jar!/:3.4.8--1]
at org.apache.curator.framework.imps.CuratorTransactionImpl.doOperation(CuratorTransactionImpl.java:159)

Вопрос:
1. Так как вышеупомянутой конфигурации нужно что-то большее, чтобы ее можно было настроить, чтобы избежать упомянутого выше исключения? Потому что оба экземпляра микросервиса конечного автомата были протестированы в том случае, когда они оба подключались к одному экземпляру, т.е. к одной и той же строке .connectString("localhost:2181").build() или случай, когда они были созданы для подключения к различным экземплярам zookeeper (то есть "localhost:2181", "localhost:2182").

Одно и то же исключение BAD VERSION происходит при обработке ансамбля конечного автомата в обоих случаях.

2. Также, если Пакеты будут работать параллельно, так что их соответствующие машины должны быть созданы для параллельной работы на стороне микро-сервиса конечного автомата. Итак, вот, технически новый State machine нам нужен для нового batchId, работающего одновременно. Но, глядя на ZookeeperStateMachineEnsemble, кажется, что один путь znode ассоциируется с одним ансамблем всякий раз, когда объект ансамбля создается один раз в основном классе конфигурации ("StateMachineUMLWayConfiguration") .

Таким образом, ожидается ли использование только этого экземпляра синглтон-ансамбля? Разве нельзя создавать несколько ансамблей во время выполнения, ссылаясь на разные пути znode, запущенные параллельно, чтобы записывать их соответствующие состояния распределенного автомата в их соответствующие пути znode??

а. Поскольку пакеты, работающие параллельно, потребуют создания отдельных путей znode. Таким образом, из-за нашей попытки сохранить отдельный путь znode для каждой партии нам нужен отдельный ансамбль для каждой машины пакета. Но похоже, что он попадает в состояние блокировки при получении соединения с znode через кураторский клиент.

б. Вызов REST, инициированный для запуска события, не завершается, поскольку полученная машина застревает в ансамбле для соединения.

Заранее спасибо.

0 ответов

Другие вопросы по тегам