Oracle Advanced Queuing (AQ) - механизм организации очередей. В простейшем случае создается тип, на основе типа специальным API создается таблица очереди. Очередь действует по принципу FIFO. Приведенный ниже листинг создает демонстрирует вышесказанное.
-- Создаем тип данных
create or replace type XX_QUEUE_TYPE is object
(
text varchar2(5)
)
-- Таблица для очереди
begin
dbms_aqadm.create_queue_table(
queue_table => 'XX_QUEUE_TBL',
queue_payload_type => 'XX_QUEUE_TYPE'
);
end;
-- выборка записей из таблицы
select * from XX_QUEUE_TBL;
-- автоматически созданной представление вида aq$*
select * from AQ$XX_QUEUE_TBL;
-- Собственно очередь
begin
dbms_aqadm.create_queue(
queue_name => 'XX_QUEUE_Q',
queue_table => 'XX_QUEUE_TBL'
);
end;
-- Запуск очереди
begin
dbms_aqadm.start_queue(
queue_name => 'XX_QUEUE_Q'
);
end;
-- Вставка 9 сообщений
declare
l_msg_id raw(32767);
l_enq_opt dbms_aq.enqueue_options_t;
l_msg_prop dbms_aq.message_properties_t;
begin
for i in 1..9 loop
dbms_aq.enqueue(
queue_name => 'XX_QUEUE_Q',
enqueue_options => l_enq_opt,
message_properties => l_msg_prop,
payload => xx_queue_type('TEST'||to_char(i)),
msgid => l_msg_id
);
end loop;
commit;
end;
-- проверяем, что внесли 9 сообщений
select * from AQ$XX_QUEUE_TBL;
-- Выборка 8 сообщений
declare
l_msg_id raw(16);
l_deq_opt dbms_aq.dequeue_options_t;
l_msg_prop dbms_aq.message_properties_t;
l_payload xx_queue_type;
begin
for i in 1..8 loop
dbms_aq.dequeue(
queue_name => 'XX_QUEUE_Q',
dequeue_options => l_deq_opt,
message_properties => l_msg_prop,
payload => l_payload,
msgid => l_msg_id
);
dbms_output.put_line('Got a message: '||l_payload.text);
end loop;
commit;
end;
-- проверяем, что осталось 1 сообщение
select * from AQ$XX_QUEUE_TBL;
-- очистка таблицы
declare
po dbms_aqadm.aq$_purge_options_t;
begin
po.block := FALSE;
DBMS_AQADM.PURGE_QUEUE_TABLE(
queue_table => 'XX_QUEUE_TBL',
purge_condition => NULL,
purge_options => po);
end;
-- проверяем, что осталось 0 сообщений
select * from AQ$XX_QUEUE_TBL;
/*остановка очереди,
удаление очереди,
удаление таблицы,
удаление типа */
begin
dbms_aqadm.stop_queue(
queue_name => 'XX_QUEUE_Q'
);
dbms_aqadm.drop_queue(
queue_name => 'XX_QUEUE_Q'
);
dbms_aqadm.drop_queue_table(
queue_table => 'XX_QUEUE_TBL'
);
execute immediate 'drop type XX_QUEUE_TYPE';
end;Создание очереди на Java:public static void singleCunsumerAQTest() throws ClassNotFoundException {
// vars
Connection db_conn;
AQSession aq_sess;
AQQueueTableProperty qtable_prop;
AQQueueProperty queue_prop;
AQQueueTable q_table;
AQQueue queue;
Class.forName("oracle.jdbc.driver.OracleDriver");
// connect to DB
try{
Connection connection =
db_conn = DriverManager.getConnection("jdbc:oracle:thin:@SERV_URL:SERV_PORT:SID", "login", "pass");
connection.setAutoCommit(false);
System.out.println("Successfully connected.");
//Load the Oracle AQ driver:
Class.forName("oracle.AQ.AQOracleDriver");
// create AQ session
aq_sess = AQDriverManager.createAQSession(db_conn);
System.out.println("Successfully created AQSession.");
/* Creating a AQQueueTableProperty object (payload type - RAW): */
qtable_prop = new AQQueueTableProperty("RAW");
// single consumer
qtable_prop.setMultiConsumer(false);
/* Creating a queue table called aq_table1 in aqjava schema: */
q_table = aq_sess.createQueueTable("APPS", "XX_QUEUE_TBL", qtable_prop);
System.out.println("Successfully created XX_QUEUE_TBL in APPS schema");
/* Creating a new AQQueueProperty object */
queue_prop = new AQQueueProperty();
/* Creating a queue called aq_queue1 in aq_table1 */
queue = aq_sess.createQueue (q_table, "XX_QUEUE_Q", queue_prop);
System.out.println("Successfully created XX_QUEUE_Q in XX_QUEUE_TBL");
}catch (Exception ex)
{
System.out.println("Exception: " + ex);
ex.printStackTrace();
}
}
3 комментария :
Спасибо, изложено доступно
Возможно ли считать 1 запись одновременно 2мя процессами обратившимся к очереди одновременно?
Думаю, нет, поскольку теряется непротиворечивость данных
Отправить комментарий