Как удалить постоянных подписчиков на тему в ActiveMQ?

Мой JMS Config указан ниже. Как я могу отписаться от постоянного абонента?

На основе значения, указанного в файле env.properties, я создаю N постоянных подписчиков.

Этот код ниже является конфигурацией для создания и запуска брокера. Также есть конфигурация о DefaultMessageListenerContainer(DMLC).

Я знаю, как создать надежного подписчика, что я и сделал в приведенном ниже коде, но я не знаю, как я могу удалить существующих постоянных подписчиков / потребителей в тему. Как я могу это сделать?

@Configuration
public class JMSConfig {
    private static final Logger log = Logger.getLogger(JMSConfig.class);

    @Value("${publisherV3.embedded.activemq.broker-url}")
    private String DEFAULT_BROKER_URL;//tcp://localhost:61616

    @Value("${publisherV3.embedded.activemq.topic}")
    private String TOPIC;

    @Value("${publisherV3.listeners.value}")
    private String value;

    @Value("${publisher.KahaDBStore.location}")
    private String path;

    /**
     * @return
     * @throws Exception
    **/ 

    TaskExecutor s = new SimpleAsyncTaskExecutor("Listener-Thread");

    @Bean 
    public BrokerService createBrokerService() throws IOException, Exception{
        return createBroker();
    }


    private BrokerService createBroker() throws IOException, Exception {
         BrokerService broker = new BrokerService();KahaDBStore kaha=new KahaDBStore();
         File file =new File(path);
         TransportConnector connector = new TransportConnector();
         connector.setUri(new URI(DEFAULT_BROKER_URL));
         kaha.setDirectory(file);
         broker.addConnector(connector);
         broker.setPersistenceAdapter(kaha);
         PolicyEntry policy = new PolicyEntry();
         policy.setDeadLetterStrategy(new DiscardingDeadLetterStrategy());

         PolicyMap pMap = new PolicyMap();
         pMap.setDefaultEntry(policy);

         broker.setDestinationPolicy(pMap);
         broker.start();
         return broker;

    }

    @Bean
    @Scope("prototype")
    public AcListenerPublisherImpl listener(){
        AcListenerPublisherImpl listener= new AcListenerPublisherImpl();
        return listener;
    }

    @Bean
    public ActiveMQConnectionFactory connectionFactory(){
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(DEFAULT_BROKER_URL);
        return connectionFactory;
    }

    @Bean
    @Scope("prototype")
    public DefaultMessageListenerContainer listenerContainer() {
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setDestinationName(TOPIC);
        container.setMessageListener(listener());
        container.setPubSubDomain(true);
        container.setSessionTransacted(true);
        container.setTaskExecutor(s);
        container.setSubscriptionDurable(true);
        return container;
    }

    @Bean(destroyMethod="stopContainers")
    public SetWrapperDmlc containers() {
        Set<DefaultMessageListenerContainer> containers = new HashSet<DefaultMessageListenerContainer>();

        for(int i=0;i<Integer.parseInt(value);i++){
            DefaultMessageListenerContainer container = listenerContainer();
            container.setClientId("subscriber"+i);
            container.setDurableSubscriptionName("subscriber"+i+i);
            container.start();
            containers.add(container);
        }

        SetWrapperDmlc setWrapper = new SetWrapperDmlc(containers);
        return setWrapper;
    }

}

Код ведет себя нерегулярно, удерживая некоторые сообщения в подписчике и теме, дающей больше, чем требуется. Я думал об удалении подписчиков и перезапуске брокера. Вот почему я просил об этом

0 ответов

Другие вопросы по тегам