Oracle 高级队列 Java
Oracle Advanced Queue In Java
我正在实施 Oracle Advanced Queue 并且完全陌生。我对此有一些疑问。下面是我的代码:
package com;
/* Set up main class from which we will call subsequent examples and handle
exceptions: */
import java.sql.*;
import oracle.AQ.*;
public class test_aqjava
{
public static void main(String args[])
{
AQSession aq_sess = null;
try
{
aq_sess = createSession(args);
createAqTables(aq_sess);
enqueueMsg(aq_sess);
// dequeueMsg(aq_sess);
aq_sess.close();
/* now run the test: */
// runTest(aq_sess);
}
catch (Exception ex)
{
System.out.println("Exception-1: " + ex);
ex.printStackTrace();
}
}
public static AQSession createSession(String args[])
{
Connection db_conn;
AQSession aq_sess = null;
try
{
Class.forName("oracle.jdbc.driver.OracleDriver");
/* your actual hostname, port number, and SID will
vary from what follows. Here we use 'dlsun736,' '5521,'
and 'test,' respectively: */
db_conn =
DriverManager.getConnection(
"jdbc:oracle:thin:@hostname.com:1521:sid",
"USER", "USER");
System.out.println("JDBC Connection opened ");
db_conn.setAutoCommit(false);
/* Load the Oracle8i AQ driver: */
Class.forName("oracle.AQ.AQOracleDriver");
/* Creating an AQ Session: */
aq_sess = AQDriverManager.createAQSession(db_conn);
System.out.println("Successfully created AQSession ");
}
catch (Exception ex)
{
System.out.println("Exception: " + ex);
ex.printStackTrace();
}
return aq_sess;
}
public static void createAqTables(AQSession aq_sess) throws AQException
{
AQQueueTableProperty qtable_prop;
AQQueueProperty queue_prop;
AQQueueTable q_table;
AQQueue queue;
/* Creating a AQQueueTableProperty object (payload type - RAW): */
qtable_prop = new AQQueueTableProperty("RAW");
/* Creating a queue table called aq_table1 in aqjava schema: */
q_table = aq_sess.createQueueTable ("USER", "aq_table1", qtable_prop);
System.out.println("Successfully created aq_table1 in aqjava schema");
/* Creating a new AQQueueProperty object */
queue_prop = new AQQueueProperty();
/* Creating a queue called aq_queue1 in aq_table1: */
queue = aq_sess.createQueue (q_table, "aq_queue1", queue_prop);
System.out.println("Successfully created aq_queue1 in aq_table1");
/* Enable enqueue/dequeue on this queue: */
queue.start();
System.out.println("Successful start queue");
}
public static void enqueueMsg(AQSession aq_sess) throws AQException
{
AQQueueTable q_table;
AQQueue queue;
AQMessage message;
AQRawPayload raw_payload;
AQEnqueueOption enq_option;
String test_data = "new message";
byte[] b_array;
Connection db_conn;
db_conn = ((AQOracleSession)aq_sess).getDBConnection();
/* Get a handle to queue table - aq_table4 in aqjava schema: */
q_table = aq_sess.getQueueTable ("USER", "aq_table1");
System.out.println("Successful getQueueTable");
/* Get a handle to a queue - aq_queue4 in aquser schema: */
queue = aq_sess.getQueue ("USER", "aq_queue1");
System.out.println("Successful getQueue");
/* Creating a message to contain raw payload: */
message = queue.createMessage();
/* Get handle to the AQRawPayload object and populate it with raw data: */
b_array = test_data.getBytes();
raw_payload = message.getRawPayload();
raw_payload.setStream(b_array, b_array.length);
/* Creating a AQEnqueueOption object with default options: */
enq_option = new AQEnqueueOption();
/* Enqueue the message: */
queue.enqueue(enq_option, message);
try {
db_conn.commit();
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static void dequeueMsg(AQSession aq_sess) throws AQException
{
AQQueueTable q_table;
AQQueue queue;
AQMessage message;
AQRawPayload raw_payload;
AQDequeueOption deq_option;
byte[] b_array;
Connection db_conn;
db_conn = ((AQOracleSession)aq_sess).getDBConnection();
/* Get a handle to queue table - aq_table4 in aqjava schema: */
q_table = aq_sess.getQueueTable ("USER", "aq_table1");
System.out.println("Successful getQueueTable");
/* Get a handle to a queue - aq_queue4 in aquser schema: */
queue = aq_sess.getQueue ("USER", "aq_queue1");
System.out.println("Successful getQueue");
/* Creating a AQDequeueOption object with default options: */
deq_option = new AQDequeueOption();
/* Enqueue the message: */
message = queue.dequeue(deq_option);
raw_payload = message.getRawPayload();
b_array= raw_payload.getBytes();
String msg = new String(b_array);
System.out.println("Dequeue Msg "+msg);
try {
db_conn.commit();
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
我创建了一个队列 Table 和一个队列。已从队列写入和读取消息。
Q1。我可以向同一个队列再写一条消息并从中读取吗?如果是,我们该怎么做?因为我尝试将消息写入同一个队列但不能
Q2。如何将我的上述代码转换为发布-订阅?如何通过多次阅读同一条消息来测试它?
感谢任何帮助。
您的队列是具有两个注册订阅者("SUB1"、"SUB2")的多消费者队列。
1) 在没有指定订阅者的情况下将消息加入队列。
2) 消息将为所有订阅者复制。
可以通过 select consumer_name from aq$queue_table_name;
中的 select 进行检查
查询应该 return SUB1 和 SUB2。
3) 您不需要通知订阅者。所有订阅者都在队列中收听,他们正在等待他们的消息
Oracle 有非常有用的手册 Streams Advanced Queuing User's Guide
1) 登录到 oracle 数据库并创建用户。
CREATE USER jmsuser IDENTIFIED BY a;
GRANT DBA, AQ_ADMINISTRATOR_ROLE, AQ_USER_ROLE to jmsuser;
GRANT EXECUTE ON DBMS_AQADM TO jmsuser;
GRANT EXECUTE ON DBMS_AQ TO jmsuser;
GRANT EXECUTE ON DBMS_LOB TO jmsuser;
GRANT EXECUTE ON DBMS_JMS_PLSQL TO jmsuser;
2) Class 创建多消费者队列并为队列注册两个订阅者。 (ConnectionDefinition.getOracleConnection() return 到 oracle 的常规 jdbc 连接)
import java.sql.Connection;
import oracle.AQ.AQAgent;
import oracle.AQ.AQDriverManager;
import oracle.AQ.AQQueue;
import oracle.AQ.AQQueueProperty;
import oracle.AQ.AQQueueTable;
import oracle.AQ.AQQueueTableProperty;
import oracle.AQ.AQSession;
/**
*
* @author alukasiewicz
*/
public class NewClass {
public static void main(String[] args) throws Exception {
Class.forName("oracle.AQ.AQOracleDriver");
Connection con = ConnectionDefinition.getOracleConnection();
AQSession aq_sess = AQDriverManager.createAQSession(con);
AQQueueTableProperty qtable_prop;
AQQueueProperty queue_prop;
AQQueueTable q_table;
AQQueue queue;
AQAgent subs1, subs2;
qtable_prop = new AQQueueTableProperty("SYS.AQ$_JMS_BYTES_MESSAGE");
qtable_prop.setMultiConsumer(true);
q_table = aq_sess.createQueueTable("jmsuser", "aq_table5", qtable_prop);
queue_prop = new AQQueueProperty();
queue = aq_sess.createQueue(q_table, "aq_queue5", queue_prop);
System.out.println("Successful createQueue");
System.out.println("Successful start queue");
subs1 = new AQAgent("GREEN", "", 0);
subs2 = new AQAgent("BLUE", "", 0);
queue.addSubscriber(subs2, null);
queue.addSubscriber(subs1, null);
queue.start();
}
}
3) Class 将消息发布到队列。
public class Publisher {
public static void main(String[] args) throws Exception {
Class.forName("oracle.AQ.AQOracleDriver");
Connection con = ConnectionDefinition.getOracleConnection();
TopicConnection tc_conn =AQjmsTopicConnectionFactory.createTopicConnection(con);
tc_conn.start();
TopicSession jms_sess = tc_conn.createTopicSession(true, Session.SESSION_TRANSACTED);
Topic queueTopic= ((AQjmsSession )jms_sess).getTopic("JMSUSER","AQ_QUEUE5");
AQjmsTopicPublisher publisherAq = (AQjmsTopicPublisher)jms_sess.createPublisher(queueTopic);
BytesMessage messAll = jms_sess.createBytesMessage();
BytesMessage messOnlyForGreen = jms_sess.createBytesMessage();
messAll.writeUTF("Message for all subscribers");
messOnlyForGreen.writeUTF("Message only for green");
publisherAq.publish(messAll);
publisherAq.publish(messOnlyForGreen, new AQjmsAgent[]{new AQjmsAgent("GREEN", null)} );
con.commit();
tc_conn.close();
con.close();
}
}
在 oracle 中,您可以在队列中查看这些消息。两个绿色一个红色。
SELECT a.queue, a.msg_state, a.consumer_name FROM jmsuser.aq$aq_table5 a
4) Class 从队列中读取消息;
public class Subscriber {
public static void main(String[] args) throws Exception {
Class.forName("oracle.AQ.AQOracleDriver");
Connection con = ConnectionDefinition.getOracleConnection();
TopicConnection tc_conn = AQjmsTopicConnectionFactory.createTopicConnection(con);
TopicSession jms_sess = tc_conn.createTopicSession(true, Session.SESSION_TRANSACTED);
tc_conn.start();
Topic queueTopic = ((AQjmsSession) jms_sess).getTopic("jmsuser", "AQ_QUEUE5");
TopicSubscriber subGreen = (TopicSubscriber)((AQjmsSession) jms_sess).createDurableSubscriber(queueTopic, "GREEN");
TopicSubscriber subRed = (TopicSubscriber)((AQjmsSession) jms_sess).createDurableSubscriber(queueTopic, "RED");
Message msg = subGreen.receive(10);
System.err.println("Start receiving message for green subscriber");
while(msg != null){
System.err.println(" GREEN recive message "+ ((BytesMessage)msg).readUTF());
msg = subGreen.receive(10); // receive with timeout;
}
System.err.println("End receiving message for green subscriber");
System.err.println(" ");
System.err.println("Start receiving message for red subscriber");
BytesMessage byteMsg = (BytesMessage)msg;
msg = subRed.receive(10);
while(msg != null){
System.err.println(" RED recive message "+ ((BytesMessage)msg).readUTF());
msg = subRed.receive(10); // receive with timeout;
}
System.err.println("End receiving message for red subscriber");
con.commit();
tc_conn.close();
con.close();
}
}
5) Pom 依赖项
<dependencies>
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc6</artifactId>
<version>11.2.0.4</version>
</dependency>
<dependency>
<groupId>com.oracle</groupId>
<artifactId>aqapi</artifactId>
<version>13</version>
</dependency>
<dependency>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>javax.transaction</groupId>
<artifactId>jta</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>com.oracle</groupId>
<artifactId>orai18n</artifactId>
<version>11.2.0.4</version>
</dependency>
</dependencies>
我正在实施 Oracle Advanced Queue 并且完全陌生。我对此有一些疑问。下面是我的代码:
package com;
/* Set up main class from which we will call subsequent examples and handle
exceptions: */
import java.sql.*;
import oracle.AQ.*;
public class test_aqjava
{
public static void main(String args[])
{
AQSession aq_sess = null;
try
{
aq_sess = createSession(args);
createAqTables(aq_sess);
enqueueMsg(aq_sess);
// dequeueMsg(aq_sess);
aq_sess.close();
/* now run the test: */
// runTest(aq_sess);
}
catch (Exception ex)
{
System.out.println("Exception-1: " + ex);
ex.printStackTrace();
}
}
public static AQSession createSession(String args[])
{
Connection db_conn;
AQSession aq_sess = null;
try
{
Class.forName("oracle.jdbc.driver.OracleDriver");
/* your actual hostname, port number, and SID will
vary from what follows. Here we use 'dlsun736,' '5521,'
and 'test,' respectively: */
db_conn =
DriverManager.getConnection(
"jdbc:oracle:thin:@hostname.com:1521:sid",
"USER", "USER");
System.out.println("JDBC Connection opened ");
db_conn.setAutoCommit(false);
/* Load the Oracle8i AQ driver: */
Class.forName("oracle.AQ.AQOracleDriver");
/* Creating an AQ Session: */
aq_sess = AQDriverManager.createAQSession(db_conn);
System.out.println("Successfully created AQSession ");
}
catch (Exception ex)
{
System.out.println("Exception: " + ex);
ex.printStackTrace();
}
return aq_sess;
}
public static void createAqTables(AQSession aq_sess) throws AQException
{
AQQueueTableProperty qtable_prop;
AQQueueProperty queue_prop;
AQQueueTable q_table;
AQQueue queue;
/* Creating a AQQueueTableProperty object (payload type - RAW): */
qtable_prop = new AQQueueTableProperty("RAW");
/* Creating a queue table called aq_table1 in aqjava schema: */
q_table = aq_sess.createQueueTable ("USER", "aq_table1", qtable_prop);
System.out.println("Successfully created aq_table1 in aqjava schema");
/* Creating a new AQQueueProperty object */
queue_prop = new AQQueueProperty();
/* Creating a queue called aq_queue1 in aq_table1: */
queue = aq_sess.createQueue (q_table, "aq_queue1", queue_prop);
System.out.println("Successfully created aq_queue1 in aq_table1");
/* Enable enqueue/dequeue on this queue: */
queue.start();
System.out.println("Successful start queue");
}
public static void enqueueMsg(AQSession aq_sess) throws AQException
{
AQQueueTable q_table;
AQQueue queue;
AQMessage message;
AQRawPayload raw_payload;
AQEnqueueOption enq_option;
String test_data = "new message";
byte[] b_array;
Connection db_conn;
db_conn = ((AQOracleSession)aq_sess).getDBConnection();
/* Get a handle to queue table - aq_table4 in aqjava schema: */
q_table = aq_sess.getQueueTable ("USER", "aq_table1");
System.out.println("Successful getQueueTable");
/* Get a handle to a queue - aq_queue4 in aquser schema: */
queue = aq_sess.getQueue ("USER", "aq_queue1");
System.out.println("Successful getQueue");
/* Creating a message to contain raw payload: */
message = queue.createMessage();
/* Get handle to the AQRawPayload object and populate it with raw data: */
b_array = test_data.getBytes();
raw_payload = message.getRawPayload();
raw_payload.setStream(b_array, b_array.length);
/* Creating a AQEnqueueOption object with default options: */
enq_option = new AQEnqueueOption();
/* Enqueue the message: */
queue.enqueue(enq_option, message);
try {
db_conn.commit();
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static void dequeueMsg(AQSession aq_sess) throws AQException
{
AQQueueTable q_table;
AQQueue queue;
AQMessage message;
AQRawPayload raw_payload;
AQDequeueOption deq_option;
byte[] b_array;
Connection db_conn;
db_conn = ((AQOracleSession)aq_sess).getDBConnection();
/* Get a handle to queue table - aq_table4 in aqjava schema: */
q_table = aq_sess.getQueueTable ("USER", "aq_table1");
System.out.println("Successful getQueueTable");
/* Get a handle to a queue - aq_queue4 in aquser schema: */
queue = aq_sess.getQueue ("USER", "aq_queue1");
System.out.println("Successful getQueue");
/* Creating a AQDequeueOption object with default options: */
deq_option = new AQDequeueOption();
/* Enqueue the message: */
message = queue.dequeue(deq_option);
raw_payload = message.getRawPayload();
b_array= raw_payload.getBytes();
String msg = new String(b_array);
System.out.println("Dequeue Msg "+msg);
try {
db_conn.commit();
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
我创建了一个队列 Table 和一个队列。已从队列写入和读取消息。
Q1。我可以向同一个队列再写一条消息并从中读取吗?如果是,我们该怎么做?因为我尝试将消息写入同一个队列但不能
Q2。如何将我的上述代码转换为发布-订阅?如何通过多次阅读同一条消息来测试它?
感谢任何帮助。
您的队列是具有两个注册订阅者("SUB1"、"SUB2")的多消费者队列。
1) 在没有指定订阅者的情况下将消息加入队列。
2) 消息将为所有订阅者复制。
可以通过 select consumer_name from aq$queue_table_name;
中的 select 进行检查
查询应该 return SUB1 和 SUB2。
3) 您不需要通知订阅者。所有订阅者都在队列中收听,他们正在等待他们的消息
Oracle 有非常有用的手册 Streams Advanced Queuing User's Guide
1) 登录到 oracle 数据库并创建用户。
CREATE USER jmsuser IDENTIFIED BY a;
GRANT DBA, AQ_ADMINISTRATOR_ROLE, AQ_USER_ROLE to jmsuser;
GRANT EXECUTE ON DBMS_AQADM TO jmsuser;
GRANT EXECUTE ON DBMS_AQ TO jmsuser;
GRANT EXECUTE ON DBMS_LOB TO jmsuser;
GRANT EXECUTE ON DBMS_JMS_PLSQL TO jmsuser;
2) Class 创建多消费者队列并为队列注册两个订阅者。 (ConnectionDefinition.getOracleConnection() return 到 oracle 的常规 jdbc 连接)
import java.sql.Connection;
import oracle.AQ.AQAgent;
import oracle.AQ.AQDriverManager;
import oracle.AQ.AQQueue;
import oracle.AQ.AQQueueProperty;
import oracle.AQ.AQQueueTable;
import oracle.AQ.AQQueueTableProperty;
import oracle.AQ.AQSession;
/**
*
* @author alukasiewicz
*/
public class NewClass {
public static void main(String[] args) throws Exception {
Class.forName("oracle.AQ.AQOracleDriver");
Connection con = ConnectionDefinition.getOracleConnection();
AQSession aq_sess = AQDriverManager.createAQSession(con);
AQQueueTableProperty qtable_prop;
AQQueueProperty queue_prop;
AQQueueTable q_table;
AQQueue queue;
AQAgent subs1, subs2;
qtable_prop = new AQQueueTableProperty("SYS.AQ$_JMS_BYTES_MESSAGE");
qtable_prop.setMultiConsumer(true);
q_table = aq_sess.createQueueTable("jmsuser", "aq_table5", qtable_prop);
queue_prop = new AQQueueProperty();
queue = aq_sess.createQueue(q_table, "aq_queue5", queue_prop);
System.out.println("Successful createQueue");
System.out.println("Successful start queue");
subs1 = new AQAgent("GREEN", "", 0);
subs2 = new AQAgent("BLUE", "", 0);
queue.addSubscriber(subs2, null);
queue.addSubscriber(subs1, null);
queue.start();
}
}
3) Class 将消息发布到队列。
public class Publisher {
public static void main(String[] args) throws Exception {
Class.forName("oracle.AQ.AQOracleDriver");
Connection con = ConnectionDefinition.getOracleConnection();
TopicConnection tc_conn =AQjmsTopicConnectionFactory.createTopicConnection(con);
tc_conn.start();
TopicSession jms_sess = tc_conn.createTopicSession(true, Session.SESSION_TRANSACTED);
Topic queueTopic= ((AQjmsSession )jms_sess).getTopic("JMSUSER","AQ_QUEUE5");
AQjmsTopicPublisher publisherAq = (AQjmsTopicPublisher)jms_sess.createPublisher(queueTopic);
BytesMessage messAll = jms_sess.createBytesMessage();
BytesMessage messOnlyForGreen = jms_sess.createBytesMessage();
messAll.writeUTF("Message for all subscribers");
messOnlyForGreen.writeUTF("Message only for green");
publisherAq.publish(messAll);
publisherAq.publish(messOnlyForGreen, new AQjmsAgent[]{new AQjmsAgent("GREEN", null)} );
con.commit();
tc_conn.close();
con.close();
}
}
在 oracle 中,您可以在队列中查看这些消息。两个绿色一个红色。
SELECT a.queue, a.msg_state, a.consumer_name FROM jmsuser.aq$aq_table5 a
4) Class 从队列中读取消息;
public class Subscriber {
public static void main(String[] args) throws Exception {
Class.forName("oracle.AQ.AQOracleDriver");
Connection con = ConnectionDefinition.getOracleConnection();
TopicConnection tc_conn = AQjmsTopicConnectionFactory.createTopicConnection(con);
TopicSession jms_sess = tc_conn.createTopicSession(true, Session.SESSION_TRANSACTED);
tc_conn.start();
Topic queueTopic = ((AQjmsSession) jms_sess).getTopic("jmsuser", "AQ_QUEUE5");
TopicSubscriber subGreen = (TopicSubscriber)((AQjmsSession) jms_sess).createDurableSubscriber(queueTopic, "GREEN");
TopicSubscriber subRed = (TopicSubscriber)((AQjmsSession) jms_sess).createDurableSubscriber(queueTopic, "RED");
Message msg = subGreen.receive(10);
System.err.println("Start receiving message for green subscriber");
while(msg != null){
System.err.println(" GREEN recive message "+ ((BytesMessage)msg).readUTF());
msg = subGreen.receive(10); // receive with timeout;
}
System.err.println("End receiving message for green subscriber");
System.err.println(" ");
System.err.println("Start receiving message for red subscriber");
BytesMessage byteMsg = (BytesMessage)msg;
msg = subRed.receive(10);
while(msg != null){
System.err.println(" RED recive message "+ ((BytesMessage)msg).readUTF());
msg = subRed.receive(10); // receive with timeout;
}
System.err.println("End receiving message for red subscriber");
con.commit();
tc_conn.close();
con.close();
}
}
5) Pom 依赖项
<dependencies>
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc6</artifactId>
<version>11.2.0.4</version>
</dependency>
<dependency>
<groupId>com.oracle</groupId>
<artifactId>aqapi</artifactId>
<version>13</version>
</dependency>
<dependency>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>javax.transaction</groupId>
<artifactId>jta</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>com.oracle</groupId>
<artifactId>orai18n</artifactId>
<version>11.2.0.4</version>
</dependency>
</dependencies>