Как удалить постоянных подписчиков на тему в ActiveMQ?
Мой JMS Config указан ниже. Как я могу отписаться от постоянного абонента?
На основе значения, указанного в файле env.properties, я создаю N постоянных подписчиков.
Этот код ниже является конфигурацией для создания и запуска брокера. Также есть конфигурация о DefaultMessageListenerContainer(DMLC).
Я знаю, как создать надежного подписчика, что я и сделал в приведенном ниже коде, но я не знаю, как я могу удалить существующих постоянных подписчиков / потребителей в тему. Как я могу это сделать?
@Configuration
public class JMSConfig {
private static final Logger log = Logger.getLogger(JMSConfig.class);
@Value("${publisherV3.embedded.activemq.broker-url}")
private String DEFAULT_BROKER_URL;//tcp://localhost:61616
@Value("${publisherV3.embedded.activemq.topic}")
private String TOPIC;
@Value("${publisherV3.listeners.value}")
private String value;
@Value("${publisher.KahaDBStore.location}")
private String path;
/**
* @return
* @throws Exception
**/
TaskExecutor s = new SimpleAsyncTaskExecutor("Listener-Thread");
@Bean
public BrokerService createBrokerService() throws IOException, Exception{
return createBroker();
}
private BrokerService createBroker() throws IOException, Exception {
BrokerService broker = new BrokerService();KahaDBStore kaha=new KahaDBStore();
File file =new File(path);
TransportConnector connector = new TransportConnector();
connector.setUri(new URI(DEFAULT_BROKER_URL));
kaha.setDirectory(file);
broker.addConnector(connector);
broker.setPersistenceAdapter(kaha);
PolicyEntry policy = new PolicyEntry();
policy.setDeadLetterStrategy(new DiscardingDeadLetterStrategy());
PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);
broker.setDestinationPolicy(pMap);
broker.start();
return broker;
}
@Bean
@Scope("prototype")
public AcListenerPublisherImpl listener(){
AcListenerPublisherImpl listener= new AcListenerPublisherImpl();
return listener;
}
@Bean
public ActiveMQConnectionFactory connectionFactory(){
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(DEFAULT_BROKER_URL);
return connectionFactory;
}
@Bean
@Scope("prototype")
public DefaultMessageListenerContainer listenerContainer() {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setDestinationName(TOPIC);
container.setMessageListener(listener());
container.setPubSubDomain(true);
container.setSessionTransacted(true);
container.setTaskExecutor(s);
container.setSubscriptionDurable(true);
return container;
}
@Bean(destroyMethod="stopContainers")
public SetWrapperDmlc containers() {
Set<DefaultMessageListenerContainer> containers = new HashSet<DefaultMessageListenerContainer>();
for(int i=0;i<Integer.parseInt(value);i++){
DefaultMessageListenerContainer container = listenerContainer();
container.setClientId("subscriber"+i);
container.setDurableSubscriptionName("subscriber"+i+i);
container.start();
containers.add(container);
}
SetWrapperDmlc setWrapper = new SetWrapperDmlc(containers);
return setWrapper;
}
}
Код ведет себя нерегулярно, удерживая некоторые сообщения в подписчике и теме, дающей больше, чем требуется. Я думал об удалении подписчиков и перезапуске брокера. Вот почему я просил об этом