06.11.2014

Реализация очереди Oracle AQ

Oracle Advanced Queuing (AQ) - механизм организации очередей. В простейшем случае создается тип, на основе типа специальным API создается таблица очереди. Очередь действует по принципу FIFO. Приведенный ниже листинг создает демонстрирует вышесказанное.

-- Создаем тип данных
create or replace type XX_QUEUE_TYPE is object
(
  text varchar2(5)
)

-- Таблица для очереди
begin
  dbms_aqadm.create_queue_table(
    queue_table        => 'XX_QUEUE_TBL',
    queue_payload_type => 'XX_QUEUE_TYPE'
  );
end;

-- выборка записей из таблицы
select * from XX_QUEUE_TBL;
-- автоматически созданной представление вида aq$*
select * from AQ$XX_QUEUE_TBL;

-- Собственно очередь
begin
  dbms_aqadm.create_queue(
    queue_name  => 'XX_QUEUE_Q',
    queue_table => 'XX_QUEUE_TBL'
  );
end;

-- Запуск очереди
begin
  dbms_aqadm.start_queue(
    queue_name  => 'XX_QUEUE_Q'
  );
end;

-- Вставка 9 сообщений
declare
  l_msg_id raw(32767);
  l_enq_opt dbms_aq.enqueue_options_t;
  l_msg_prop dbms_aq.message_properties_t;
begin
  for i in 1..9 loop
    dbms_aq.enqueue(
      queue_name         => 'XX_QUEUE_Q',
      enqueue_options    => l_enq_opt,
      message_properties => l_msg_prop,
      payload            => xx_queue_type('TEST'||to_char(i)),
      msgid              => l_msg_id
    );
  end loop;
  commit;
end;

-- проверяем, что внесли 9 сообщений
select * from AQ$XX_QUEUE_TBL;

-- Выборка 8 сообщений
declare
  l_msg_id raw(16);
  l_deq_opt dbms_aq.dequeue_options_t;
  l_msg_prop dbms_aq.message_properties_t;
  l_payload xx_queue_type;
begin
  for i in 1..8 loop
    dbms_aq.dequeue(
      queue_name         => 'XX_QUEUE_Q',
      dequeue_options    => l_deq_opt,
      message_properties => l_msg_prop,
      payload            => l_payload,
      msgid              => l_msg_id
    );
    dbms_output.put_line('Got a message: '||l_payload.text);
  end loop;
  commit;
end;

-- проверяем, что осталось 1 сообщение
select * from AQ$XX_QUEUE_TBL;

-- очистка таблицы
declare
   po dbms_aqadm.aq$_purge_options_t;
begin
   po.block := FALSE;
   DBMS_AQADM.PURGE_QUEUE_TABLE(
     queue_table     => 'XX_QUEUE_TBL',
     purge_condition => NULL,
     purge_options   => po);
end;

-- проверяем, что осталось 0 сообщений
select * from AQ$XX_QUEUE_TBL;
 
/*остановка очереди,
  удаление очереди,
  удаление таблицы,
  удаление типа */       
begin
  dbms_aqadm.stop_queue(
       queue_name  => 'XX_QUEUE_Q'
       );
  dbms_aqadm.drop_queue(
       queue_name  => 'XX_QUEUE_Q'
       );     
  dbms_aqadm.drop_queue_table(
       queue_table => 'XX_QUEUE_TBL'
       );
  execute immediate 'drop type XX_QUEUE_TYPE';
end;
Создание очереди на Java:
public static void singleCunsumerAQTest() throws ClassNotFoundException {
 // vars
 Connection db_conn;
 AQSession aq_sess;
 AQQueueTableProperty     qtable_prop;
 AQQueueProperty          queue_prop;
 AQQueueTable             q_table;
 AQQueue                  queue;

Class.forName("oracle.jdbc.driver.OracleDriver");
// connect to DB
try{                                         
    Connection connection =
    db_conn = DriverManager.getConnection("jdbc:oracle:thin:@SERV_URL:SERV_PORT:SID", "login", "pass");
    connection.setAutoCommit(false);
    System.out.println("Successfully connected.");
   
    //Load the Oracle AQ driver:
    Class.forName("oracle.AQ.AQOracleDriver");
    // create AQ session
    aq_sess = AQDriverManager.createAQSession(db_conn);
    System.out.println("Successfully created AQSession."); 
   
    /* Creating a AQQueueTableProperty object (payload type - RAW): */
    qtable_prop = new AQQueueTableProperty("RAW");
    // single consumer
    qtable_prop.setMultiConsumer(false);
      
    /* Creating a queue table called aq_table1 in aqjava schema: */
    q_table = aq_sess.createQueueTable("APPS", "XX_QUEUE_TBL", qtable_prop);
    System.out.println("Successfully created XX_QUEUE_TBL in APPS schema"); 

    /* Creating a new AQQueueProperty object */
    queue_prop = new AQQueueProperty();
      
    /* Creating a queue called aq_queue1 in aq_table1 */
    queue = aq_sess.createQueue (q_table, "XX_QUEUE_Q", queue_prop);
    System.out.println("Successfully created XX_QUEUE_Q in XX_QUEUE_TBL"); 
}catch (Exception ex)
  {
     System.out.println("Exception: " + ex);
     ex.printStackTrace();
  }
}

3 комментария :

Unknown комментирует...

Спасибо, изложено доступно

Unknown комментирует...

Возможно ли считать 1 запись одновременно 2мя процессами обратившимся к очереди одновременно?

Aleksey Panin комментирует...

Думаю, нет, поскольку теряется непротиворечивость данных