Безопасен ли этот поток вызовов клиента JAX-WS?
Поскольку инициализация клиентской службы и порта WS занимает много времени, мне нравится инициализировать их один раз при запуске и повторно использовать один и тот же экземпляр порта. Инициализация будет выглядеть примерно так:
private static RequestContext requestContext = null;
static
{
MyService service = new MyService();
MyPort myPort = service.getMyServicePort();
Map<String, Object> requestContextMap = ((BindingProvider) myPort).getRequestContext();
requestContextMap = ((BindingProvider)myPort).getRequestContext();
requestContextMap.put(BindingProvider.USERNAME_PROPERTY, uName);
requestContextMap.put(BindingProvider.PASSWORD_PROPERTY, pWord);
rc = new RequestContext();
rc.setApplication("test");
rc.setUserId("test");
}
Звонок где-то в моем классе:
myPort.someFunctionCall(requestContext, "someValue");
Мой вопрос: будет ли этот вызов потокобезопасным?
4 ответа
Согласно CXF FAQ:
Являются ли клиентские прокси JAX-WS потокобезопасными?
Официальный ответ JAX-WS: Нет. В соответствии со спецификацией JAX-WS прокси клиента НЕ являются поточно-ориентированными. Чтобы написать переносимый код, вы должны обращаться с ним как с безопасностью, не связанной с потоками, и синхронизировать доступ или использовать пул экземпляров или аналогичный.
Ответ CXF: прокси-серверы CXF поточно-ориентированы для многих случаев использования. Исключения:
Использование
((BindingProvider)proxy).getRequestContext()
- согласно спецификации JAX-WS контекст запроса - PER INSTANCE. Таким образом, все, что там установлено, повлияет на запросы в других потоках. С CXF вы можете сделать:((BindingProvider)proxy).getRequestContext().put("thread.local.request.context","true");
и будущие вызовы getRequestContext() будут использовать локальный контекст запроса потока. Это позволяет контексту запроса быть потокобезопасным. (Примечание: контекст ответа всегда является локальным в CXF)
Настройки в канале - если вы используете код или конфигурацию для прямого манипулирования каналом (например, для установки настроек TLS или аналогичных), они не являются поточно-ориентированными. Канал для каждого экземпляра и, таким образом, эти параметры будут общими. Кроме того, если вы используете FailoverFeature и LoadBalanceFeatures, канал будет заменен на лету. Таким образом, настройки, установленные на кабелепроводе, могут быть потеряны перед использованием в потоке настройки.
- Поддержка сессий - если вы включаете поддержку сессий (см. Спецификацию jaxws), куки-файл сессии сохраняется в канале. Таким образом, он попадет в приведенные выше правила по настройке канала и, таким образом, будет разделен между потоками.
- Токены WS-Security. Если используется WS-SecureConversation или WS-Trust, извлеченный токен кэшируется в конечной точке / прокси-сервере, чтобы избежать дополнительных (и дорогостоящих) вызовов STS для получения токенов. Таким образом, несколько потоков будут совместно использовать токен. Если каждый поток имеет разные учетные данные или требования безопасности, вам нужно использовать отдельные экземпляры прокси.
В случае проблем с каналом вы МОЖЕТЕ установить новый ConduitSelector, использующий локальный поток или аналогичный. Это немного сложно, хотя.
Для большинства "простых" вариантов использования вы можете использовать прокси CXF в нескольких потоках. Выше изложены обходные пути для других.
В общем нет.
Согласно CXF FAQ http://cxf.apache.org/faq.html
Официальный ответ JAX-WS: Нет. В соответствии со спецификацией JAX-WS прокси клиента НЕ являются поточно-ориентированными. Чтобы написать переносимый код, вы должны обращаться с ним как с безопасностью, не связанной с потоками, и синхронизировать доступ или использовать пул экземпляров или аналогичный.
Ответ CXF: прокси-серверы CXF поточно-ориентированы для многих случаев использования.
Список исключений см. В FAQ.
Как видно из приведенного выше ответа, клиентские прокси-серверы JAX-WS не являются поточно-ориентированными, поэтому я просто хотел поделиться своей реализацией с целью кэширования прокси-серверов клиентов другими. Я на самом деле столкнулся с той же проблемой и решил создать пружинный компонент, который выполняет кэширование прокси-серверов JAX-WS Client. Вы можете увидеть более подробную информацию http://programtalk.com/java/using-spring-and-scheduler-to-store/
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.logging.log4j.Logger;
import org.springframework.stereotype.Component;
/**
* This keeps the cache of MAX_CUNCURRENT_THREADS number of
* appConnections and tries to shares them equally amongst the threads. All the
* connections are created right at the start and if an error occurs then the
* cache is created again.
*
*/
/*
*
* Are JAX-WS client proxies thread safe? <br/> According to the JAX-WS spec,
* the client proxies are NOT thread safe. To write portable code, you should
* treat them as non-thread safe and synchronize access or use a pool of
* instances or similar.
*
*/
@Component
public class AppConnectionCache {
private static final Logger logger = org.apache.logging.log4j.LogManager.getLogger(AppConnectionCache.class);
private final Map<Integer, MyService> connectionCache = new ConcurrentHashMap<Integer, MyService>();
private int cachedConnectionId = 1;
private static final int MAX_CUNCURRENT_THREADS = 20;
private ScheduledExecutorService scheduler;
private boolean forceRecaching = true; // first time cache
@PostConstruct
public void init() {
logger.info("starting appConnectionCache");
logger.info("start caching connections"); ;;
BasicThreadFactory factory = new BasicThreadFactory.Builder()
.namingPattern("appconnectioncache-scheduler-thread-%d").build();
scheduler = Executors.newScheduledThreadPool(1, factory);
scheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
initializeCache();
}
}, 0, 10, TimeUnit.MINUTES);
}
public void destroy() {
scheduler.shutdownNow();
}
private void initializeCache() {
if (!forceRecaching) {
return;
}
try {
loadCache();
forceRecaching = false; // this flag is used for initializing
logger.info("connections creation finished successfully!");
} catch (MyAppException e) {
logger.error("error while initializing the cache");
}
}
private void loadCache() throws MyAppException {
logger.info("create and cache appservice connections");
for (int i = 0; i < MAX_CUNCURRENT_THREADS; i++) {
tryConnect(i, true);
}
}
public MyPort getMyPort() throws MyAppException {
if (cachedConnectionId++ == MAX_CUNCURRENT_THREADS) {
cachedConnectionId = 1;
}
return tryConnect(cachedConnectionId, forceRecaching);
}
private MyPort tryConnect(int threadNum, boolean forceConnect) throws MyAppException {
boolean connect = true;
int tryNum = 0;
MyPort app = null;
while (connect && !Thread.currentThread().isInterrupted()) {
try {
app = doConnect(threadNum, forceConnect);
connect = false;
} catch (Exception e) {
tryNum = tryReconnect(tryNum, e);
}
}
return app;
}
private int tryReconnect(int tryNum, Exception e) throws MyAppException {
logger.warn(Thread.currentThread().getName() + " appservice service not available! : " + e);
// try 10 times, if
if (tryNum++ < 10) {
try {
logger.warn(Thread.currentThread().getName() + " wait 1 second");
Thread.sleep(1000);
} catch (InterruptedException f) {
// restore interrupt
Thread.currentThread().interrupt();
}
} else {
logger.warn(" appservice could not connect, number of times tried: " + (tryNum - 1));
this.forceRecaching = true;
throw new MyAppException(e);
}
logger.info(" try reconnect number: " + tryNum);
return tryNum;
}
private MyPort doConnect(int threadNum, boolean forceConnect) throws InterruptedException {
MyService service = connectionCache.get(threadNum);
if (service == null || forceConnect) {
logger.info("app service connects : " + (threadNum + 1) );
service = new MyService();
connectionCache.put(threadNum, service);
logger.info("connect done for " + (threadNum + 1));
}
return service.getAppPort();
}
}
Общее решение для этого состоит в том, чтобы использовать несколько клиентских объектов в пуле, а затем использовать прокси, который действует как фасад.
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
class ServiceObjectPool<T> extends GenericObjectPool<T> {
public ServiceObjectPool(java.util.function.Supplier<T> factory) {
super(new BasePooledObjectFactory<T>() {
@Override
public T create() throws Exception {
return factory.get();
}
@Override
public PooledObject<T> wrap(T obj) {
return new DefaultPooledObject<>(obj);
}
});
}
public static class PooledServiceProxy<T> implements InvocationHandler {
private ServiceObjectPool<T> pool;
public PooledServiceProxy(ServiceObjectPool<T> pool) {
this.pool = pool;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
T t = null;
try {
t = this.pool.borrowObject();
return method.invoke(t, args);
} finally {
if (t != null)
this.pool.returnObject(t);
}
}
}
@SuppressWarnings("unchecked")
public T getProxy(Class<? super T> interfaceType) {
PooledServiceProxy<T> handler = new PooledServiceProxy<>(this);
return (T) Proxy.newProxyInstance(interfaceType.getClassLoader(),
new Class<?>[]{interfaceType}, handler);
}
}
Чтобы использовать прокси:
ServiceObjectPool<SomeNonThreadSafeService> servicePool = new ServiceObjectPool<>(createSomeNonThreadSafeService);
nowSafeService = servicePool .getProxy(SomeNonThreadSafeService.class);