Oracle Advanced Queuing (AQ) - механизм организации очередей. В простейшем случае создается тип, на основе типа специальным API создается таблица очереди. Очередь действует по принципу FIFO. Приведенный ниже листинг создает демонстрирует вышесказанное.
-- Создаем тип данных create or replace type XX_QUEUE_TYPE is object ( text varchar2(5) ) -- Таблица для очереди begin dbms_aqadm.create_queue_table( queue_table => 'XX_QUEUE_TBL', queue_payload_type => 'XX_QUEUE_TYPE' ); end; -- выборка записей из таблицы select * from XX_QUEUE_TBL; -- автоматически созданной представление вида aq$* select * from AQ$XX_QUEUE_TBL; -- Собственно очередь begin dbms_aqadm.create_queue( queue_name => 'XX_QUEUE_Q', queue_table => 'XX_QUEUE_TBL' ); end; -- Запуск очереди begin dbms_aqadm.start_queue( queue_name => 'XX_QUEUE_Q' ); end; -- Вставка 9 сообщений declare l_msg_id raw(32767); l_enq_opt dbms_aq.enqueue_options_t; l_msg_prop dbms_aq.message_properties_t; begin for i in 1..9 loop dbms_aq.enqueue( queue_name => 'XX_QUEUE_Q', enqueue_options => l_enq_opt, message_properties => l_msg_prop, payload => xx_queue_type('TEST'||to_char(i)), msgid => l_msg_id ); end loop; commit; end; -- проверяем, что внесли 9 сообщений select * from AQ$XX_QUEUE_TBL; -- Выборка 8 сообщений declare l_msg_id raw(16); l_deq_opt dbms_aq.dequeue_options_t; l_msg_prop dbms_aq.message_properties_t; l_payload xx_queue_type; begin for i in 1..8 loop dbms_aq.dequeue( queue_name => 'XX_QUEUE_Q', dequeue_options => l_deq_opt, message_properties => l_msg_prop, payload => l_payload, msgid => l_msg_id ); dbms_output.put_line('Got a message: '||l_payload.text); end loop; commit; end; -- проверяем, что осталось 1 сообщение select * from AQ$XX_QUEUE_TBL; -- очистка таблицы declare po dbms_aqadm.aq$_purge_options_t; begin po.block := FALSE; DBMS_AQADM.PURGE_QUEUE_TABLE( queue_table => 'XX_QUEUE_TBL', purge_condition => NULL, purge_options => po); end; -- проверяем, что осталось 0 сообщений select * from AQ$XX_QUEUE_TBL; /*остановка очереди, удаление очереди, удаление таблицы, удаление типа */ begin dbms_aqadm.stop_queue( queue_name => 'XX_QUEUE_Q' ); dbms_aqadm.drop_queue( queue_name => 'XX_QUEUE_Q' ); dbms_aqadm.drop_queue_table( queue_table => 'XX_QUEUE_TBL' ); execute immediate 'drop type XX_QUEUE_TYPE'; end;Создание очереди на Java:
public static void singleCunsumerAQTest() throws ClassNotFoundException { // vars Connection db_conn; AQSession aq_sess; AQQueueTableProperty qtable_prop; AQQueueProperty queue_prop; AQQueueTable q_table; AQQueue queue; Class.forName("oracle.jdbc.driver.OracleDriver"); // connect to DB try{ Connection connection = db_conn = DriverManager.getConnection("jdbc:oracle:thin:@SERV_URL:SERV_PORT:SID", "login", "pass"); connection.setAutoCommit(false); System.out.println("Successfully connected."); //Load the Oracle AQ driver: Class.forName("oracle.AQ.AQOracleDriver"); // create AQ session aq_sess = AQDriverManager.createAQSession(db_conn); System.out.println("Successfully created AQSession."); /* Creating a AQQueueTableProperty object (payload type - RAW): */ qtable_prop = new AQQueueTableProperty("RAW"); // single consumer qtable_prop.setMultiConsumer(false); /* Creating a queue table called aq_table1 in aqjava schema: */ q_table = aq_sess.createQueueTable("APPS", "XX_QUEUE_TBL", qtable_prop); System.out.println("Successfully created XX_QUEUE_TBL in APPS schema"); /* Creating a new AQQueueProperty object */ queue_prop = new AQQueueProperty(); /* Creating a queue called aq_queue1 in aq_table1 */ queue = aq_sess.createQueue (q_table, "XX_QUEUE_Q", queue_prop); System.out.println("Successfully created XX_QUEUE_Q in XX_QUEUE_TBL"); }catch (Exception ex) { System.out.println("Exception: " + ex); ex.printStackTrace(); } }
3 комментария :
Спасибо, изложено доступно
Возможно ли считать 1 запись одновременно 2мя процессами обратившимся к очереди одновременно?
Думаю, нет, поскольку теряется непротиворечивость данных
Отправить комментарий