Как ставить в очередь в таблице Oracle AQ при фиксации с Java и использовать с клиентом JMS
Я пишу Java-компонент для продукта уровня предприятия и хочу использовать особую функцию баз данных Oracle 11g, Active Queues. Точный сценарий, который я хочу выполнить, - 1. записать сообщение в активную оракул / таблицу очередей на коммите 2. прочитать это сообщение из очереди с потребителем JMS
Я ознакомился с демонстрацией и учебником по адресу http://docs.oracle.com/cd/B28359_01/java.111/b31224/streamsaq.htm
и, в частности, я хотел бы сосредоточиться на части кода в очереди -
// Create the actual AQMessage instance:
AQMessage mesg = AQFactory.createAQMessage(msgprop);
// and add a payload:
byte[] rawPayload = new byte[500];
for (int i = 0; i < rawPayload.length; i++) {
rawPayload[i] = 'b';
}
mesg.setPayload(new RAW(rawPayload));
AQEnqueueOptions opt = new AQEnqueueOptions();
opt.setRetrieveMessageId(true);
opt.setDeliveryMode(AQEnqueueOptions.DeliveryMode.PERSISTENT);
opt.setVisibility(AQEnqueueOptions.VisibilityOption.ON_COMMIT);
// execute the actual enqueue operation:
conn.enqueue(queueName, opt, mesg);
Это прекрасно работает для меня, потому что мы хотим, чтобы сообщение было видно только для потребителей, когда транзакция зафиксирована.
Проблема - в демоверсии мы создаем очереди типа RAW полезной нагрузки
doUpdateDatabase(conn,
"BEGIN "+
"DBMS_AQADM.CREATE_QUEUE_TABLE( "+
" QUEUE_TABLE => '"+USERNAME+".RAW_SINGLE_QUEUE_TABLE', "+
" QUEUE_PAYLOAD_TYPE => 'RAW', "+
" COMPATIBLE => '10.0'); "+
"END; ");
doUpdateDatabase(conn,
"BEGIN "+
"DBMS_AQADM.CREATE_QUEUE( "+
" QUEUE_NAME => '"+USERNAME+".RAW_SINGLE_QUEUE', "+
" QUEUE_TABLE => '"+USERNAME+".RAW_SINGLE_QUEUE_TABLE'); "+
"END; ");
doUpdateDatabase(conn,
"BEGIN "+
" DBMS_AQADM.START_QUEUE('"+USERNAME+".RAW_SINGLE_QUEUE'); "+
"END; ");
с помощью очередей, созданных в RAW, я могу ставить сообщения в очередь, однако потребители JMS не могут подписаться на очередь, вызывая исключение (нулевой указатель), когда потребитель ожидает параметр для ожидаемого типа. Короче говоря, этот код вызывает исключение нулевого указателя при инициализации.
Properties env = new Properties();
env.load(new FileInputStream(new File("jndi.properties")));
Context ctx = new InitialContext(env);
ConnectionFactory connFactory = (ConnectionFactory)ctx.lookup(connectionFactoryName);
Connection connection = connFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
AQjmsSession queueSession = (AQjmsSession) session;
Queue queue = (Queue) ctx.lookup(queueName);
MessageConsumer receiver = queueSession.createReceiver(queue);
JNDI.properties
java.naming.factory.initial = oracle.jms.AQjmsInitialContextFactory
java.naming.security.principal = username
java.naming.security.credentials = password
db_url = jdbc:oracle:thin:@host:port:dbname
Я получаю аналогичное исключение при попытке настроить потребителей в Camel.
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<!-- this camel route will read incoming messages from Oracle -->
<route>
<from uri="oracleQueue:queue:RAW_SINGLE_QUEUE" />
<to uri="WebSphereMQ:queue:myWebSphereQueue" />
</route>
</camelContext>
<bean id="connectionFactoryOracleAQQueue" class="oracle.jms.AQjmsFactory" factory-method="getQueueConnectionFactory">
<constructor-arg index="0">
<value>oracle db URL</value>
</constructor-arg>
<constructor-arg index="1" type="java.util.Properties">
<value></value>
</constructor-arg>
</bean>
<bean id="oracleQueueCredentials" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
<property name="targetConnectionFactory">
<ref bean="connectionFactoryOracleAQQueue" />
</property>
<property name="username">
<value>username</value>
</property>
<property name="password">
<value>password</value>
</property>
</bean>
<bean id="oracleQueue" class="org.apache.camel.component.jms.JmsComponent">
<property name="connectionFactory" ref="oracleQueueCredentials" />
</bean>
С некоторыми исследованиями я понял, что проблема может быть в типе полезной нагрузки очереди. Поэтому я изменил сценарий создания таблицы очередей и использовал JMS-сообщение в качестве типа полезной нагрузки.
doUpdateDatabase(conn, "BEGIN " + "DBMS_AQADM.CREATE_QUEUE_TABLE( "
+ " QUEUE_TABLE => 'RAW_SINGLE_QUEUE_TABLE', "
+ " QUEUE_PAYLOAD_TYPE => 'SYS.AQ$_JMS_MESSAGE', " +
" COMPATIBLE => '10.0'); " + "END; ");
В этом случае потребители JMS могут подключиться, но код очереди не работает - ORA-25215: тип user_data и тип очереди не совпадают
Вопрос в том, как я могу поставить в очередь сообщения, видимые только при фиксации, от производителя Java и быть в состоянии использовать с верблюдом или универсальным потребителем JMS?
ограничения (чтобы отфильтровать некоторые ответы уже в сети) - Невозможно использовать PL/SQL, пружинные транзакции, JTA. Я видел такие примеры, как Как вставить сообщение JMS в Oracle AQ с использованием Java, где таблица очередей создается с типом SYS.AQ$_JMS_MESSAGE, но примером производителя является JMS MessageProducer, а не тот, который приведен в руководстве по оракулу. Я не пытаюсь ставить в очередь сообщения JMS (AQJmsMessage), скорее использую тип AQMessage, как описано в руководстве Oracle, и использовать опцию visible on commit.
Мне кажется, что если проблема основана на несоответствии только типов полезной нагрузки, то на стороне потребителя должна быть некоторая конфигурация для указания типа полезной нагрузки или на стороне производителя, чтобы иметь возможность писать сообщения так, как потребители JMS будут Понимаю. Есть ли способ сделать это?
1 ответ
Мне удалось это сделать - мне пришлось разбираться во многих частях Oracle API и собирать подсказки из разных блогов. Для всех, кто интересуется здесь, есть способ, которым я получил это - 1. Я создал объект Oracle на Oracle Db 2. С этим объектом Oracle я создал таблицы очередей типа объекта в качестве полезной нагрузки 3. Теперь я могу ставить в очередь типы AQMessage с полезной нагрузкой STRUCT, содержащей данные объекта 4. И я могу вывести из очереди клиента JMS, который понимает тип полезной нагрузки ADT (благодаря статье на http://blog.javaforge.net/post/30858904340/oracle-advanced-queuing-spring-custom-types)
Вот шаги с кодом - Создайте объект Oracle. Объект может иметь любые поля первичного типа данных, такие как VARCHAR, TIMESTAMP и т. Д., А также BLOB, CLOB и т. Д. В этом случае я предоставил один из столбцов в виде BLOB-объекта, чтобы сделать вещи более сложными.
create or replace type aq_event_obj as object
(
id varchar2(100),
payload BLOB
);
commit;
Теперь создайте таблицу очередей. Тип полезной нагрузки таблицы - объект оракула.
private void setup(Connection conn) throws SQLException {
doUpdateDatabase(conn, "BEGIN " + "DBMS_AQADM.CREATE_QUEUE_TABLE( "
+ " QUEUE_TABLE => 'OBJ_SINGLE_QUEUE_TABLE', " + " QUEUE_PAYLOAD_TYPE => 'AQ_EVENT_OBJ', "
+ " COMPATIBLE => '10.0'); " + "END; ");
doUpdateDatabase(conn, "BEGIN " + "DBMS_AQADM.CREATE_QUEUE( " + " QUEUE_NAME => 'OBJ_SINGLE_QUEUE', "
+ " QUEUE_TABLE => 'OBJ_SINGLE_QUEUE_TABLE'); " + "END; ");
doUpdateDatabase(conn, "BEGIN " + " DBMS_AQADM.START_QUEUE('OBJ_SINGLE_QUEUE'); " + "END; ");
}
Теперь вы можете ставить в очередь типы AQMessage в Java с помощью экземпляра структуры объекта
public void enqueueMessage(OracleConnection conn, String correlationId, byte[] payloadData) throws Exception {
// First create the message properties:
AQMessageProperties aqMessageProperties = AQFactory.createAQMessageProperties();
aqMessageProperties.setCorrelation(correlationId);
aqMessageProperties.setExceptionQueue(EXCEPTION_QUEUE_NAME);
// Specify an agent as the sender:
AQAgent aqAgent = AQFactory.createAQAgent();
aqAgent.setName(SENDER_NAME);
aqAgent.setAddress(QUEUE_NAME);
aqMessageProperties.setSender(aqAgent);
// Create the payload
StructDescriptor structDescriptor = StructDescriptor.createDescriptor(EVENT_OBJECT, conn);
Map<String, Object> payloadMap = new HashMap<String, Object>();
payloadMap.put("ID", correlationId);
payloadMap.put("PAYLOAD", new OracleAQBLOBUtil().createBlob(conn, payloadData));
STRUCT struct = new STRUCT(structDescriptor, conn, payloadMap);
// Create the actual AQMessage instance:
AQMessage aqMessage = AQFactory.createAQMessage(aqMessageProperties);
aqMessage.setPayload(struct);
AQEnqueueOptions opt = new AQEnqueueOptions();
opt.setDeliveryMode(AQEnqueueOptions.DeliveryMode.PERSISTENT);
opt.setVisibility(AQEnqueueOptions.VisibilityOption.ON_COMMIT);
// execute the actual enqueue operation:
conn.enqueue(QUEUE_NAME, opt, aqMessage);
}
Поле blob нуждается в особой обработке
public class OracleAQBLOBUtil {
public BLOB createBlob(OracleConnection conn, byte[] payload) throws Exception {
BLOB blob = BLOB.createTemporary(conn, false, BLOB.DURATION_SESSION);
OutputStream outputStream = blob.setBinaryStream(1L);
InputStream inputStream = new ByteArrayInputStream(payload);
try {
byte[] buffer = new byte[blob.getBufferSize()];
int bytesRead = 0;
while ((bytesRead = inputStream.read(buffer)) != -1) {
outputStream.write(buffer, 0, bytesRead);
}
return blob;
}
finally {
outputStream.close();
inputStream.close();
}
}
public byte[] saveOutputStream(BLOB blob) throws Exception {
InputStream inputStream = blob.getBinaryStream();
int counter;
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
while ((counter = inputStream.read()) > -1) {
byteArrayOutputStream.write(counter);
}
byteArrayOutputStream.close();
return byteArrayOutputStream.toByteArray();
}
}
Для потребителя вам необходимо предоставить экземпляр ORADataFactory, который позволит потребителю понять тип полезной нагрузки (ваш пользовательский объект).
AQjmsSession queueSession = (AQjmsSession) session;
Queue queue = (Queue) ctx.lookup(queueName);
MessageConsumer receiver = queueSession.createReceiver(queue, new OracleAQObjORADataFactory());
Где код для OracleAQObjORADataFactory
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.SQLException;
import oracle.jdbc.OracleTypes;
import oracle.jpub.runtime.MutableStruct;
import oracle.sql.BLOB;
import oracle.sql.Datum;
import oracle.sql.ORAData;
import oracle.sql.ORADataFactory;
import oracle.sql.STRUCT;
public class OracleAQObjORADataFactory implements ORAData, ORADataFactory {
public static final String EVENT_OBJECT = "SYSTEM.AQ_EVENT_OBJ";
public static final int _SQL_TYPECODE = OracleTypes.STRUCT;
protected MutableStruct _struct;
protected static int[] _sqlType = { java.sql.Types.VARCHAR, java.sql.Types.VARBINARY };
protected static ORADataFactory[] _factory = new ORADataFactory[2];
protected static final OracleAQObjORADataFactory _AqEventObjFactory = new OracleAQObjORADataFactory ();
public static ORADataFactory getORADataFactory() {
return _AqEventObjFactory;
}
/* constructors */
protected void _init_struct(boolean init) {
if (init)
_struct = new MutableStruct(new Object[2], _sqlType, _factory);
}
public OracleAQObjORADataFactory () {
_init_struct(true);
}
public OracleAQObjORADataFactory (String id, byte[] payload) throws SQLException {
_init_struct(true);
setId(id);
setPayload(payload);
}
/* ORAData interface */
public Datum toDatum(Connection c) throws SQLException {
return _struct.toDatum(c, EVENT_OBJECT);
}
/* ORADataFactory interface */
public ORAData create(Datum d, int sqlType) throws SQLException {
return create(null, d, sqlType);
}
protected ORAData create(OracleAQObjORADataFactory o, Datum d, int sqlType) throws SQLException {
if (d == null)
return null;
if (o == null)
o = new OracleAQObjORADataFactory ();
o._struct = new MutableStruct((STRUCT) d, _sqlType, _factory);
return o;
}
public String getId() throws SQLException {
return (String) _struct.getAttribute(0);
}
public void setId(String id) throws SQLException {
_struct.setAttribute(0, id);
}
public byte[] getPayload() throws SQLException {
BLOB blob = (BLOB) _struct.getAttribute(1);
InputStream inputStream = blob.getBinaryStream();
return getBytes(inputStream);
}
public byte[] getBytes(InputStream body) {
int c;
try {
ByteArrayOutputStream f = new ByteArrayOutputStream();
while ((c = body.read()) > -1) {
f.write(c);
}
f.close();
byte[] result = f.toByteArray();
return result;
}
catch (Exception e) {
System.err.println("Exception: " + e.getMessage());
e.printStackTrace();
return null;
}
}
public void setPayload(byte[] payload) throws SQLException {
_struct.setAttribute(1, payload);
}
}
Вероятно, вы используете Camel или Spring в своем проекте, и в этом случае - 1. Если вы используете Camel 2.10.2 или более поздней версии, вы можете создать потребителя JMS с настраиваемым контейнером списка сообщений (CAMEL-5676) 2. Если вы работаете в предыдущей версии, тогда вы, возможно, не сможете использовать способ конечной точки (я не мог понять это), но вы можете использовать слушатель запросов JMS
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
<!-- this is just an example, you can also use a datasource as the ctor arg -->
<bean id="connectionFactoryOracleAQQueue" class="oracle.jms.AQjmsFactory" factory-method="getQueueConnectionFactory">
<constructor-arg index="0">
<value>jdbc:oracle:thin:@blrub442:1522:UB23</value>
</constructor-arg>
<constructor-arg index="1" type="java.util.Properties">
<value></value>
</constructor-arg>
</bean>
<bean id="oracleQueueCredentials" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
<property name="targetConnectionFactory">
<ref bean="connectionFactoryOracleAQQueue" />
</property>
<property name="username">
<value>system</value>
</property>
<property name="password">
<value>oracle</value>
</property>
</bean>
<!-- Definitions for JMS Listener classes that we have created -->
<bean id="aqMessageListener" class="com.misys.test.JmsRequestListener" />
<bean id="aqEventQueue" class="com.misys.test.OracleAqQueueFactoryBean">
<property name="connectionFactory" ref="oracleQueueCredentials" />
<property name="oracleQueueName" value="BOZ_SINGLE_QUEUE" />
</bean>
<!-- The Spring DefaultMessageListenerContainer configuration. This bean is automatically loaded when the JMS application context is started -->
<bean id="jmsContainer" class="com.misys.test.AQMessageListenerContainer" scope="singleton">
<property name="connectionFactory" ref="oracleQueueCredentials" />
<property name="destination" ref="aqEventQueue" />
<property name="messageListener" ref="aqMessageListener" />
<property name="sessionTransacted" value="false" />
</bean>
</beans>
Пользовательский контейнер слушателя сообщений
public class AQMessageListenerContainer extends DefaultMessageListenerContainer {
@Override
protected MessageConsumer createConsumer(Session session, Destination destination) throws JMSException {
return ((AQjmsSession) session).createConsumer(destination, getMessageSelector(),
OracleAQObjORADataFactory.getORADataFactory(), null, isPubSubNoLocal());
}
}
и метод прослушивания запроса onMessage
public void onMessage(Message msg) {
try {
AQjmsAdtMessage aQjmsAdtMessage = (AQjmsAdtMessage) msg;
OracleAQObjORADataFactory obj = (OracleAQObjORADataFactory) aQjmsAdtMessage.getAdtPayload();
System.out.println("Datetime: " + obj.getId());
System.out.println("Payload: " + new String(obj.getPayload(), Charset.forName("UTF-8")));
}
catch (Exception jmsException) {
if (logger.isErrorEnabled()) {
logger.error(jmsException.getLocalizedMessage());
}
}
}