Атмосферный ресурс за сессию

Я не могу установить вещательную компанию Atmosphere для каждого сеанса пользователя для ресурса Atmosphere. Все, что я могу почерпнуть из документации, - это как создавать приложения для чата, которые передают одно и то же сообщение каждому пользователю.

Возможно ли, чтобы среда Atmosphere установила канал для сеанса пользователя или мне нужно что-то делать и обрабатывать эти соединения самостоятельно с помощью карты в памяти?

Это ресурс, который я хочу:

/websockets/notifications

Я хочу, чтобы пользователи A и B подключались к этому каналу из разных браузеров, а затем имели возможность независимо передавать их сообщения. Я должен иметь возможность использовать их идентификаторы сессий, чтобы атмосфера понимала, кому отправлять ответ.

Атмосфера поддерживает это?

Соответствующий POM.xml

<spring-boot-starter-web.version>1.3.3.RELEASE</spring-boot-starter-web.version>
<atmosphere-runtime.version>2.4.4</atmosphere-runtime.version>
<atmosphere-javascript.version>2.3.0</atmosphere-javascript.version>

Конфигурация атмосферы

package com.hello;

import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.ServletRegistration;

import org.atmosphere.cache.UUIDBroadcasterCache;
import org.atmosphere.cpr.ApplicationConfig;
import org.atmosphere.cpr.AtmosphereFramework;
import org.atmosphere.cpr.AtmosphereServlet;
import org.atmosphere.cpr.MetaBroadcaster;
import org.springframework.boot.context.embedded.ServletContextInitializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AtmosphereConfiguration implements ServletContextInitializer {

    @Bean
    public AtmosphereServlet atmosphereServlet() {
        return new AtmosphereServlet();
    }

    @Bean
    public AtmosphereFramework atmosphereFramework() {
        return atmosphereServlet().framework();
    }

    @Bean
    public MetaBroadcaster metaBroadcaster() {
        AtmosphereFramework framework = atmosphereFramework();
        return framework.metaBroadcaster();
    }

    @Override
    public void onStartup(ServletContext servletContext) throws ServletException {
        configureAthmosphere(atmosphereServlet(), servletContext);
    }

    private void configureAthmosphere(AtmosphereServlet servlet, ServletContext servletContext) {
        ServletRegistration.Dynamic atmosphereServlet = servletContext.addServlet("atmosphereServlet", servlet);
        atmosphereServlet.setInitParameter(ApplicationConfig.ANNOTATION_PACKAGE, "com.hello");
        atmosphereServlet.setInitParameter(ApplicationConfig.BROADCASTER_CACHE, UUIDBroadcasterCache.class.getName());
        atmosphereServlet.setInitParameter(ApplicationConfig.BROADCASTER_SHARABLE_THREAD_POOLS, "true");
        atmosphereServlet.setInitParameter(ApplicationConfig.BROADCASTER_MESSAGE_PROCESSING_THREADPOOL_MAXSIZE, "10");
        atmosphereServlet.setInitParameter(ApplicationConfig.BROADCASTER_ASYNC_WRITE_THREADPOOL_MAXSIZE, "10");
        servletContext.addListener(new org.atmosphere.cpr.SessionSupport());
        atmosphereServlet.addMapping("/websocket/*");
        atmosphereServlet.setLoadOnStartup(0);
        atmosphereServlet.setAsyncSupported(true);
    }

}

Атмосферный ресурс

package com.hello;

import java.nio.charset.StandardCharsets;

import org.atmosphere.config.service.Get;
import org.atmosphere.config.service.Disconnect;
import org.atmosphere.config.service.ManagedService;
import org.atmosphere.config.service.Ready;
import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.AtmosphereResourceEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ManagedService(path = NotificationAtmosphereResource.PATH)
public class NotificationAtmosphereResource {

    public static final String PATH = "/websocket/notifications";

    private Logger logger = LoggerFactory.getLogger(NotificationAtmosphereResource.class);

    @Get        
    public void init(AtmosphereResource resource){
        resource.getResponse().setCharacterEncoding(StandardCharsets.UTF_8.name());
    }

    @Ready
    public void onReady(final AtmosphereResource resource) {
        logger.info("Connected {}", resource.uuid());
    }

    @Disconnect
    public void onDisconnect(AtmosphereResourceEvent event) {
        logger.info("Client {} disconnected [{}]", event.getResource().uuid(),
                (event.isCancelled() ? "cancelled" : "closed"));
    }

}

Служба, с которой я посылаю сообщения

package com.hello;

import org.atmosphere.cpr.MetaBroadcaster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class NotificationEmitterBean implements NotificationEmitter {

    private Logger logger = LoggerFactory.getLogger(NotificationEmitterBean.class);

    @Autowired 
    private MetaBroadcaster metaBroadcaster;

    @Autowired
    private NotificationService notificationService;

    @Autowired
    private JsonMapper jsonMapper;

    @Override
    public void emitNotification(String sessionId, String msg) {

   // This will broadcast to all users on /websocket/notifications
   // How can I use sessionId to broadcast to the respective browser?
   metaBroadcaster.broadcastTo(NotificationAtmosphereResource.PATH, 
                    jsonMapper.toJson(msg));        
        }

    }

}

1 ответ

Единственный способ, которым я смог добиться этой работы, - это создать свой собственный вещательный вещатель. Я использовал ExcludeSessionBroadcaster, написанный Жанфрансуа Аркандом, в качестве основы.

IncludeSessionBroadcaster.java

package com.hello;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Future;

import org.atmosphere.cpr.AtmosphereConfig;
import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.Broadcaster;
import org.atmosphere.cpr.BroadcasterFuture;
import org.atmosphere.cpr.DefaultBroadcaster;
import org.atmosphere.cpr.Deliver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An implementation of {@link DefaultBroadcaster} that include one or more {@link AtmosphereResource}
 *
 * Based on ExcludeSessionBroadcaster written by Jeanfrancois Arcand
 *
 * @author Steven Zgaljic 
 */
public class IncludeSessionBroadcaster extends DefaultBroadcaster {

    private static final Logger logger = LoggerFactory.getLogger(IncludeSessionBroadcaster.class);

    public IncludeSessionBroadcaster(){}

    public Broadcaster initialize(String id, AtmosphereConfig config) {
        return super.initialize(id, config);
    }

    /**
     * the AtmosphereResource r will be include for this broadcast
     *
     * @param msg
     * @param r
     * @return
     */
    @Override
    public Future<Object> broadcast(Object msg, AtmosphereResource r) {

        if (destroyed.get()) {
            throw new IllegalStateException("This Broadcaster has been destroyed and cannot be used");
        }

        Set<AtmosphereResource> sub = new HashSet<AtmosphereResource>();
        sub.removeAll(resources);
        sub.add(r);
        start();
        Object newMsg = filter(msg);
        if (newMsg == null) {
            return null;
        }

        BroadcasterFuture<Object> f = new BroadcasterFuture<Object>(newMsg, sub.size());
        dispatchMessages(new Deliver(newMsg, sub, f, msg));
        return f;
    }


    /**
     * the AtmosphereResources subset will be include for this broadcast
     *
     * @param msg
     * @param subset
     * @return
     */
    @Override
    public Future<Object> broadcast(Object msg, Set<AtmosphereResource> subset) {

        if (destroyed.get()) {
            return futureDone(msg);
        }

        subset.retainAll(resources);
        start();
        Object newMsg = filter(msg);
        if (newMsg == null) {
            return futureDone(msg);
        }

        BroadcasterFuture<Object> f = new BroadcasterFuture<Object>(newMsg, subset.size());
        dispatchMessages(new Deliver(newMsg, subset, f, msg));
        return f;
    }

    /**
     * session will be include for this broadcast
     *
     * @param msg
     * @param s
     * @return
     */
    public Future<Object> broadcast(Object msg, String sessionId) {

        if (destroyed.get()) {
            return futureDone(msg);
        }

        Set<AtmosphereResource> subset = new HashSet<AtmosphereResource>();

        for (AtmosphereResource r : resources) {
            if (!r.getAtmosphereResourceEvent().isCancelled() &&
                    sessionId.equals(r.getRequest().getSession().getId())) {
                subset.add(r);
                break;
            }
        }

        start();
        Object newMsg = filter(msg);
        if (newMsg == null) {
            return futureDone(msg);
        }

        BroadcasterFuture<Object> f = new BroadcasterFuture<Object>(newMsg, subset.size());
        dispatchMessages(new Deliver(newMsg, subset, f, msg));
        return f;
    }
}

Затем я назначил этого вещателя на ресурс Атмосфера.

NotificationAtmosphereResource.java

package com.hello;

import java.nio.charset.StandardCharsets;

import org.atmosphere.config.service.Get;
import org.atmosphere.config.service.Disconnect;
import org.atmosphere.config.service.ManagedService;
import org.atmosphere.config.service.Ready;
import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.AtmosphereResourceEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ManagedService(path = NotificationAtmosphereResource.PATH, 
    broadcaster=IncludeSessionBroadcaster.class)
public class NotificationAtmosphereResource {

    public static final String PATH = "/websocket/notifications";

    private Logger logger = LoggerFactory.getLogger(NotificationAtmosphereResource.class);

    @Get        
    public void init(AtmosphereResource resource){
        resource.getResponse().setCharacterEncoding(StandardCharsets.UTF_8.name());
    }

    @Ready
    public void onReady(final AtmosphereResource resource) {
        logger.info("Connected {}", resource.uuid());
    }

    @Disconnect
    public void onDisconnect(AtmosphereResourceEvent event) {
        logger.info("Client {} disconnected [{}]", event.getResource().uuid(),
                (event.isCancelled() ? "cancelled" : "closed"));
    }

}

Тогда я мог бы отправить сообщение только браузеру (sessionId), который я хочу.

NotificationEmitterBean.java

package com.hello;

import org.atmosphere.cpr.MetaBroadcaster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class NotificationEmitterBean implements NotificationEmitter {

    private Logger logger = LoggerFactory.getLogger(NotificationEmitterBean.class);

    @Autowired 
    private BroadcasterFactory factory;

    @Override
    public void emitNotification(String sessionId, String msg) {

           ((IncludeSessionBroadcaster)factory.lookup(NotificationAtmosphereResource.PATH)).broadcast(msg, sessionId);      
        }

    }

}
Другие вопросы по тегам