Oracle8
Server Application Developer's Guide Release 8.0 A54642_01
This chapter has three sections:
This introductory section:
Consider the following scenarios.
_______________________________________________________________________
A brokerage firm, Makers & Breakers, advertises to the public that its new service will let clients stipulate time as well as (or instead of) price as a parameter, i.e., a request to buy or sell is not executed unless it takes place within a specific time period (e.g., within 15 minutes). The campaign is extremely successful and a mutual fund house, America's Standard Guarantee, takes advantage of the technology to offer its clients an opportunity to buy and sell units during the course of the day rather than at the close of trading. However, M&B are informed by the Securities and Exchange Commission that they have received two complaints:
(a) That in executing the buy/sell orders M&B are giving an unfair advantage to large customers, such as the mutual fund house.
(b) That in managing the time parameter M&B are taking advantage of their customers, e.g., in a falling market selling earlier in the time period than they inform their clients, and then pocketing the difference.
The problem facing Makers & Breakers is not only to answer these unfounded charges but to do so quickly and in such a way as to leave no doubt in the minds of the public regarding the fairness of their practices.

A large state university (35,000 students) decides to automate its class enrollment process. Students will be able to register for classes using web templates from home or at terminals on both the main and satellite campuses for any of the more than a thousand classes offered by Big U. The administration announces that the following parameters will apply:
* Priority: registration is on a `first come' basis except that
- seniors receive priority for upper-level courses, followed by juniors, sophomores and frosh;
- frosh receive priority for entry-level courses, followed by seniors who need the course to graduate.
* Registration phases: the above priority criteria hold only for specific defined time phases, e.g., in the second phase, seniors and juniors are treated as being on an equal `first come' basis with regard to upper-level classes, but continue to receive priority over sophomores and frosh.
* Full-time undergraduates must register for a minimum of three classes and may register for a maximum of four classes without special permission. Students may register for as many as ten classes (ranking their preferences 1-10) in case they are not admitted to their preferred classes, but only the first three choices are regarded as `live'. In the event that a class becomes full, the student's next choice becomes `live'. However, should a full class develop a vacancy, the student with the highest priority will be admitted, at that time being removed from the roll of a class of her/his lower preference.

A power utility, Most Power, develops a sophisticated model in order to decide how to deploy its resources. The way the system works is that the utility gets ongoing reports from
* numerous weather centers regarding current conditions, and
* power stations regarding ongoing utilization.
It then compares this information to historical data in order to predict demand for power in specific geographic areas for given time periods. A crucial part of this modeling has to do with noting the rapidity and degree of change in the incoming reports as weather changes and power is deployed. During a prolonged blizzard, matters are complicated by the failure of a power station which also forwards weather data. The question facing Most Power is whether to purchase power from a neighboring utility, given that organizing such an arrangement has a lead time of five days.
_______________________________________________________________________
These scenarios all describe large-scale applications, but the problems they pose are familiar. Certainly, many people would be happy to endorse any institution that would allow them to buy/sell stocks sensitive to temporal fluctuations in the market, to register for classes without having to stand in line, to enjoy uninterrupted power supply during emergencies, and so on.
The primary problem is, of course, the complexity of information-handling. Each of these scenarios describes a situation in which messages come from and are distributed to multiple clients (nodes) in a distributed computing environment. Messages are not only passed back and forth between client and server but are also intricately interleaved between processes on the server that use the database. Indeed, large-scale applications can be viewed as consisting of multi-step processes in which each step is triggered by one or more messages and gives rise to one or more messages. Another way of saying this is that messages are events that trigger other message-events.
Business Process Management, or Workflow, based on this notion of the interrelation of messages and events, is more and more recognized as a fundamental technology for emerging applications. Queuing is one of the key technologies for this class of application because it implements deferred execution of messages. This decoupling of `requests for service' from `supply of services' increases efficiency, and provides the infrastructure for complex scheduling.
Handling the intricacy of message-passing is not the only problem. Unfortunately, networks, computing hardware, and software applications will all fail from time to time, as is the case in the power utility scenario. Nevertheless, the ACID properties of the information must be preserved. Chaos would quickly follow if buy orders were `lost', or students were registered into the same class twice, or power were not distributed to an area that required it. In other words, messaging must be persistent. By integrating transaction processing with queuing technology, persistent messaging in the form of queuing is made possible. The importance of queuing has been proven by TP-monitors that typically include such a facility.
The persistence of messages that is required goes beyond the ability to recover information in the event of system failure. Applications may have to deal with multiple unprocessed messages arriving simultaneously from external clients or from programs internal to the application. If the system falls short in its capacity to deal with these messages immediately, the application must be able to store the messages until they can be processed. By the same token, external clients or internal programs may not be ready to receive messages that have been processed.
Even more important, applications must be able to deal with shifting priorities: messages arriving later may be of higher priority than messages arriving earlier; messages arriving earlier may have to wait for later messages before actions are executed; the same message may have to be accessed by different processes; and so on.
One crucial dimension of handling the dynamic aspect of message persistence has to do with windows of opportunity that grow and shrink. In the case of the share brokerage application, the window for completing the sale shrinks to nothing (i.e., the offer to sell expires) from the time the offer to sell message is received. In the case of the student registration application, different priorities apply during different temporal phases, and data must be re-evaluated with the transition from one phase to another. And in the case of the power utility, the entire decision-making process depends on whether conditions are stable (the persistence of a large window) or dynamic (the rapid appearance and disappearance of windows).
What is true for all the scenarios is that the time at which messages are received or dispatched is a crucial part of the message. This means that the control component of the message (in this case, time markers) is as important as the payload data. Put another way: the message retains importance as a business asset after it has been executed.
Persistent messaging thus implies accurate documentation of messages
for analysis of historical patterns and future trends. For instance:
* The ability to retrieve the sequence of messages is absolutely critical
for the brokerage firm in the first scenario to refute the charges made
to the SEC. They must be able to show that the offer to sell made by client_A
was matched by the first available offer to buy by client_B.
* With regard to student registration, the withdrawal of a student from
a class that is full requires (1) tracking down the next student in line
based on priority, time period and specified preference, (2) moving him/her
from a class in which he/she is registered into the available spot, and
(3) dealing with the resultant repercussions, i.e., keeping track of the
relationships between messages and navigating from one message to another
based on queries.
* In the case of the power utility, messages about weather and power utilization
need to be preserved over time so as to be able to analyze patterns by
querying message warehouses. The utility is specifically concerned with
time lapses between events e.g.,
- between distinguishing where power is needed and distributing the
power
- between sending the message to distribute power and the actual distribution.
What are the key requirements of a persistent messaging system given the above issues?
Generally, attempts to provide communication between programs can be classified into one of two types: Synchronous and Disconnected/Deferred Communication.
The synchronous model of communication, also called on-line or connected, is based on the request/reply paradigm. In this model a program sends a request to another program and waits (blocks) until the reply arrives. This model of communication, in which the sender and receiver of the message are tightly coupled, is suitable for programs that need to get a reply before they can proceed with any task. Traditional client/server architectures are based on this model.
The major drawback of the synchronous model of communication is that all the programs involved must be available and running for the application to work. In the event of network or machine failure, or even if the program that is needed is merely busy, the entire application grinds to a halt.
In the disconnected, or deferred, model of communication, programs in the role of producers place requests in a queue and then proceed with their work. Programs in the role of consumers retrieve requests from the queue and act on them. This model is well suited for applications that can continue with their work after placing a request in the queue because they are not blocked waiting for a reply. It is also suited to applications that can continue with their work until there is a message to retrieve.
For deferred execution to work correctly even in the presence of network, machine and application failures, the requests must be stored persistently, and processed exactly once. This can be achieved by combining persistent queuing with transaction protection. Oracle8 provides a queuing technology that does not depend on the use of TP-monitors or any other evolving Message-Oriented Middleware (MOM) infrastructure.
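The decoupling of producers from consumers described above can be illustrated with a minimal in-memory sketch. It is written in Python rather than against Oracle AQ, it is not persistent or transactional, and the names `run_deferred_demo` and the `None` end-of-work sentinel are assumptions made only for this illustration.

```python
import queue
import threading


def run_deferred_demo(requests):
    """Producer places every request in a queue and moves on; a consumer
    that starts later retrieves the requests and acts on them."""
    q = queue.Queue()
    processed = []

    def consumer():
        while True:
            request = q.get()          # blocks only while the queue is empty
            if request is None:        # hypothetical sentinel: no more work
                break
            processed.append("done:" + request)

    # Producer side: put() returns at once, no reply is awaited.
    for request in requests:
        q.put(request)
    q.put(None)

    # Consumer side: started only after the producer has finished,
    # which shows that the two need not be running at the same time.
    worker = threading.Thread(target=consumer)
    worker.start()
    worker.join()
    return processed


print(run_deferred_demo(["buy 100 shares", "sell 50 shares"]))
```

In this sketch a crash would lose the queued requests, which is exactly the gap that persistent, transaction-protected queuing is meant to close.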
Oracle AQ (Oracle Advanced Queuing) provides message queuing as an integrated part of the Oracle server. Oracle AQ provides this functionality by integrating the queuing system with the database, thereby creating a message-enabled database. By providing an integrated solution Oracle AQ frees application developers to devote their efforts to their specific business logic rather than having to construct a messaging infrastructure.
A message is the smallest unit of information inserted into and retrieved from a queue. A message consists of control information (metadata) and payload (data). The control information represents message properties used by AQ to manage messages. The payload data is the information stored in the queue and is transparent to Oracle AQ. A message can reside in only one queue. A message is created by the enqueue call and consumed by the dequeue call.
A queue is a repository for messages. There are two types of queues: user queues, also known as normal queues, and exception queues. The user queue is for normal message processing. Messages are transferred to an exception queue if they cannot be retrieved and processed for some reason. Queues can be created, altered, started, stopped, and dropped by using the Oracle AQ administrative interfaces.
Queues are stored in queue tables. Each queue table is a database table and contains one or more queues. Each queue table contains a default exception queue.
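The relationship between messages, queues, queue tables and the default exception queue can be pictured with a small Python model. This is a sketch under stated assumptions, not Oracle AQ's implementation: the class names, the `process` callback and the rule that a single processing failure sends a message to the exception queue are all simplifications invented for the example.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class Message:
    payload: object                                  # opaque to the queuing layer
    properties: dict = field(default_factory=dict)   # control information


class QueueTable:
    """One table holding several user queues plus one default exception queue."""

    def __init__(self, name):
        self.name = name
        self.queues = {}
        self.exception_queue = deque()

    def create_queue(self, queue_name):
        self.queues[queue_name] = deque()

    def enqueue(self, queue_name, message):
        self.queues[queue_name].append(message)

    def dequeue(self, queue_name, process):
        """Take the oldest message and hand it to `process` (a hypothetical
        callback). Assumption: any failure moves it to the exception queue."""
        message = self.queues[queue_name].popleft()
        try:
            return process(message)
        except Exception:
            self.exception_queue.append(message)
            return None


def reject_bad(message):
    # Hypothetical handler used only to trigger the exception path.
    if message.payload == "bad":
        raise ValueError("cannot process")
    return message.payload


table = QueueTable("aq.msg")
table.create_queue("msg_queue")
table.enqueue("msg_queue", Message("ok"))
table.enqueue("msg_queue", Message("bad"))
print(table.dequeue("msg_queue", reject_bad))
print(table.dequeue("msg_queue", reject_bad))
print(len(table.exception_queue))
```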
The following figure shows the relationship between messages, queues, and queue tables. The columns represent message queues, with rows representing individual messages.
An agent is a queue user. There are two types of agents: producers who place messages in a queue (enqueuing) and consumers who retrieve messages (dequeuing). Any number of producers and consumers may be accessing the queue at a given time. An agent is identified by its name, address and protocol. The address and protocol fields are reserved for future use. Agents insert messages into a queue and retrieve messages from the queue by using the Oracle AQ operational interfaces.
The time manager is a background process that monitors the messages in the queue. It provides the mechanism for message expiration, retry and delay.
At its most basic, one producer may enqueue different messages into one queue. Each message will be dequeued and processed once by one of the consumers. A message will stay in the queue until a consumer dequeues it or the message expires. A producer may stipulate a delay before the message is available to be consumed, and a time after which the message expires. Likewise, a consumer may wait when trying to dequeue a message if no message is available. Note that an agent program, or application, can act as both a producer and a consumer.
At a slightly higher level of complexity, many producers may enqueue messages into a queue, all of which are processed by one consumer.
In this next stage, many producers may enqueue messages, each message being processed by a different consumer depending on type and correlation identifier. The figure below shows this scenario.
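Selecting messages by correlation identifier can be sketched as follows. This is an in-memory Python illustration, not the Oracle AQ interface; the class `CorrelatedQueue` and its method names are assumptions chosen for the example.

```python
class CorrelatedQueue:
    """Messages carry an optional correlation identifier; a consumer may ask
    for the oldest message that matches a given identifier."""

    def __init__(self):
        self._messages = []   # (correlation, payload) pairs in enqueue order

    def enqueue(self, payload, correlation=None):
        self._messages.append((correlation, payload))

    def dequeue(self, correlation=None):
        """Return the oldest matching payload, or None if nothing matches.
        With no correlation given, the oldest message of any kind is taken."""
        for index, (corr, payload) in enumerate(self._messages):
            if correlation is None or corr == correlation:
                del self._messages[index]
                return payload
        return None


q = CorrelatedQueue()
q.enqueue("order 1", correlation="billing")
q.enqueue("order 2", correlation="shipping")
q.enqueue("order 3", correlation="billing")

# The shipping consumer skips the billing message ahead of it.
print(q.dequeue(correlation="shipping"))
print(q.dequeue(correlation="billing"))
```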
A message can only be enqueued into one queue at a time. If a producer had to insert the same message into several queues in order to reach different consumers, this would require management of a very large number of queues. Oracle AQ provides two mechanisms to allow for multiple consumers to dequeue the same message: queue subscribers and message recipients. The queue must reside in a queue table that is created with the multiple consumer option to allow for subscriber and recipient lists. Each message remains in the queue until it is consumed by all its intended consumers.
Using this approach, multiple consumer-subscribers are associated with a queue. This will cause all messages enqueued in the queue to be made available to be consumed by each of the queue subscribers. The subscribers to the queue can be changed dynamically without any change to the messages or message producers. Subscribers to the queue are added and removed by using the Oracle AQ administrative package. The diagram below shows multiple producers enqueuing messages into a queue, each of which is consumed by multiple consumer-subscribers.
A message producer can submit a list of recipients at the time a message is enqueued. This allows for a unique set of recipients for each message in the queue. The recipient list associated with the message overrides the subscriber list associated with the queue, if there is one. The recipients need not be in the subscriber list.
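The interplay of subscriber lists and per-message recipient lists can be modeled in a few lines of Python. This is a sketch of the rules stated above, not of Oracle AQ itself: the class `MultiConsumerQueue`, its methods, and the choice to raise `ValueError` when a message has no intended consumer are assumptions for illustration.

```python
class MultiConsumerQueue:
    """Each message stays queued until every intended consumer has taken it."""

    def __init__(self):
        self.subscribers = set()
        self._messages = []   # entries: {"payload": ..., "pending": set of names}

    def add_subscriber(self, name):
        self.subscribers.add(name)

    def remove_subscriber(self, name):
        self.subscribers.discard(name)

    def enqueue(self, payload, recipients=None):
        # A recipient list, when given, overrides the queue's subscriber list.
        pending = set(recipients) if recipients else set(self.subscribers)
        if not pending:
            raise ValueError("message has no intended consumers")
        self._messages.append({"payload": payload, "pending": pending})

    def dequeue(self, consumer):
        """Return the oldest message still pending for `consumer`, or None."""
        for entry in self._messages:
            if consumer in entry["pending"]:
                entry["pending"].discard(consumer)
                if not entry["pending"]:          # last intended consumer
                    self._messages.remove(entry)
                return entry["payload"]
        return None

    def __len__(self):
        return len(self._messages)


q = MultiConsumerQueue()
q.add_subscriber("audit")
q.add_subscriber("billing")
q.enqueue("for all subscribers")
q.enqueue("for one recipient", recipients=["archive"])   # not a subscriber
print(q.dequeue("audit"), len(q))
print(q.dequeue("billing"), len(q))
print(q.dequeue("archive"), len(q))
```

Keeping a per-message set of pending consumers is one simple way to express "remains in the queue until consumed by all its intended consumers"; a real implementation would track this persistently.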
Oracle AQ by Example guides users by means of a step-by-step approach.
/* Create user and grant privileges: */
CONNECT sys/change_on_install;
CREATE user aq identified by AQ;
GRANT AQ_ADMINISTRATOR_ROLE TO aq;
GRANT CONNECT TO aq;
GRANT RESOURCE TO aq;
EXECUTE dbms_aqadm.grant_type_access('aq');
CONNECT aq/AQ;
SET ECHO ON;
SET SERVEROUTPUT ON;
/* Create a message type: */
CREATE type aq.message_type as object (
  subject VARCHAR2(30),
  text    VARCHAR2(80));
/

/* Create an object type queue table and queue: */
EXECUTE dbms_aqadm.create_queue_table (queue_table => 'aq.msg', queue_payload_type => 'aq.message_type');
EXECUTE dbms_aqadm.create_queue (queue_name => 'msg_queue', queue_table => 'aq.msg');
EXECUTE dbms_aqadm.start_queue (queue_name => 'msg_queue');
/* Create a RAW type queue table and queue: */
EXECUTE dbms_aqadm.create_queue_table (queue_table => 'aq.raw_msg', queue_payload_type => 'RAW');
EXECUTE dbms_aqadm.create_queue (queue_name => 'raw_msg_queue', queue_table => 'aq.raw_msg');
EXECUTE dbms_aqadm.start_queue (queue_name => 'raw_msg_queue');
/* Create a prioritized message queue table and queue: */
EXECUTE dbms_aqadm.create_queue_table (queue_table => 'aq.priority_msg', sort_list => 'PRIORITY,ENQ_TIME', queue_payload_type => 'aq.message_type');
EXECUTE dbms_aqadm.create_queue (queue_name => 'priority_msg_queue', queue_table => 'aq.priority_msg');
EXECUTE dbms_aqadm.start_queue (queue_name => 'priority_msg_queue');

Create a queue table and queue
/* Create a multiple consumer queue table and queue: */
EXECUTE dbms_aqadm.create_queue_table (queue_table => 'aq.msg_multiple', multiple_consumers => TRUE, queue_payload_type => 'aq.message_type');
EXECUTE dbms_aqadm.create_queue (queue_name => 'msg_queue_multiple', queue_table => 'aq.msg_multiple');
EXECUTE dbms_aqadm.start_queue (queue_name => 'msg_queue_multiple');
To enqueue a single message without any other parameters, specify only the queue name and the payload.
/* Enqueue to msg_queue: */
DECLARE
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            aq.message_type;
BEGIN
  message := message_type('NORMAL MESSAGE', 'enqueued to msg_queue first.');
  dbms_aq.enqueue(queue_name         => 'msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  COMMIT;
END;
/

/* Dequeue from msg_queue: */
DECLARE
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            aq.message_type;
BEGIN
  dbms_aq.dequeue(queue_name         => 'msg_queue',
                  dequeue_options    => dequeue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  dbms_output.put_line ('Message: ' || message.subject || ' ... ' || message.text);
  COMMIT;
END;
/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sqlca.h>
#include <sql2oci.h>
/* The header file generated by processing object type 'aq.message_type': */
#include "pceg.h"

/* Print the Oracle error text, roll back and exit: */
void sql_error(char *msg)
{
  EXEC SQL WHENEVER SQLERROR CONTINUE;
  printf("%s\n", msg);
  printf("\n%.70s \n", sqlca.sqlerrm.sqlerrmc);
  EXEC SQL ROLLBACK WORK RELEASE;
  exit(1);
}

int main()
{
  message_type *message = (message_type*)0;  /* payload */
  char user[60]="aq/AQ";                     /* user logon password */
  char subject[30];                          /* components of the */
  char txt[80];                              /* payload type */

  /* ENQUEUE and DEQUEUE to an OBJECT QUEUE */

  /* Connect to database: */
  EXEC SQL CONNECT :user;

  /* On an oracle error print the error number: */
  EXEC SQL WHENEVER SQLERROR DO sql_error("Oracle Error :");

  /* Allocate memory for the host variable from the object cache: */
  EXEC SQL ALLOCATE :message;

  /* ENQUEUE */
  strcpy(subject, "NORMAL ENQUEUE");
  strcpy(txt, "The Enqueue was done through PLSQL embedded in PROC");

  /* Initialize the components of message: */
  EXEC SQL OBJECT SET SUBJECT, TEXT OF :message TO :subject, :txt;

  /* Embedded PLSQL call to the AQ enqueue procedure: */
  EXEC SQL EXECUTE
    DECLARE
      message_properties dbms_aq.message_properties_t;
      enqueue_options    dbms_aq.enqueue_options_t;
      msgid              RAW(16);
    BEGIN
      /* Bind the host variable 'message' to the payload: */
      dbms_aq.enqueue(queue_name         => 'msg_queue',
                      message_properties => message_properties,
                      enqueue_options    => enqueue_options,
                      payload            => :message,
                      msgid              => msgid);
    END;
  END-EXEC;

  /* Commit work: */
  EXEC SQL COMMIT;
  printf("Enqueued Message \n");
  printf("Subject :%s\n",subject);
  printf("Text :%s\n",txt);

  /* Dequeue */
  /* Embedded PLSQL call to the AQ dequeue procedure: */
  EXEC SQL EXECUTE
    DECLARE
      message_properties dbms_aq.message_properties_t;
      dequeue_options    dbms_aq.dequeue_options_t;
      msgid              RAW(16);
    BEGIN
      /* Return the payload into the host variable 'message': */
      dbms_aq.dequeue(queue_name         => 'msg_queue',
                      message_properties => message_properties,
                      dequeue_options    => dequeue_options,
                      payload            => :message,
                      msgid              => msgid);
    END;
  END-EXEC;

  /* Commit work: */
  EXEC SQL COMMIT;

  /* Extract the components of message: */
  EXEC SQL OBJECT GET SUBJECT,TEXT FROM :message INTO :subject,:txt;
  printf("Dequeued Message \n");
  printf("Subject :%s\n",subject);
  printf("Text :%s\n",txt);
  return 0;
}

Enqueue and Dequeue of RAW Type Messages
/* Enqueue a message containing a RAW: */
DECLARE
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            RAW(4096);
BEGIN
  message := hextoraw(rpad('FF',4095,'FF'));
  dbms_aq.enqueue(queue_name         => 'raw_msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  COMMIT;
END;
/

/* Dequeue from raw_msg_queue: */
DECLARE
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            RAW(4096);
BEGIN
  dbms_aq.dequeue(queue_name         => 'raw_msg_queue',
                  dequeue_options    => dequeue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  COMMIT;
END;
/

Enqueue and Dequeue of RAW Type Messages using Pro*C/C++
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sqlca.h>
#include <sql2oci.h>

/* Print the Oracle error text, roll back and exit: */
void sql_error(char *msg)
{
  EXEC SQL WHENEVER SQLERROR CONTINUE;
  printf("%s\n", msg);
  printf("\n%.70s \n", sqlca.sqlerrm.sqlerrmc);
  EXEC SQL ROLLBACK WORK RELEASE;
  exit(1);
}

int main()
{
  OCIEnv   *oeh;                    /* OCI Env handle */
  OCIError *err;                    /* OCI Err handle */
  OCIRaw   *message= (OCIRaw*)0;    /* payload */
  ub1      message_txt[100];        /* data for payload */
  char     user[60]="aq/AQ";        /* user logon password */
  int      status;                  /* returns status of the OCI call */

  /* Enqueue and dequeue to a RAW queue */

  /* Connect to database: */
  EXEC SQL CONNECT :user;

  /* On an oracle error print the error number: */
  EXEC SQL WHENEVER SQLERROR DO sql_error("Oracle Error :");

  /* Get the OCI Env handle: */
  if (SQLEnvGet(SQL_SINGLE_RCTX, &oeh) != OCI_SUCCESS)
  {
    printf(" error in SQLEnvGet \n");
    exit(1);
  }

  /* Get the OCI Error handle: */
  if ((status = OCIHandleAlloc((dvoid *)oeh, (dvoid **)&err,
                               (ub4)OCI_HTYPE_ERROR, (ub4)0, (dvoid **)0)) != OCI_SUCCESS)
  {
    printf(" error in OCIHandleAlloc %d \n", status);
    exit(1);
  }

  /* Enqueue */
  /* The bytes to be put into the raw payload: */
  strcpy((char *)message_txt, "Enqueue to a Raw payload queue ");

  /* Assign bytes to the OCIRaw pointer:
     Memory needs to be allocated explicitly to OCIRaw*: */
  if ((status = OCIRawAssignBytes(oeh, err, message_txt, 100, &message)) != OCI_SUCCESS)
  {
    printf(" error in OCIRawAssignBytes %d \n", status);
    exit(1);
  }

  /* Embedded PLSQL call to the AQ enqueue procedure: */
  EXEC SQL EXECUTE
    DECLARE
      message_properties dbms_aq.message_properties_t;
      enqueue_options    dbms_aq.enqueue_options_t;
      msgid              RAW(16);
    BEGIN
      /* Bind the host variable message to the raw payload: */
      dbms_aq.enqueue(queue_name         => 'raw_msg_queue',
                      message_properties => message_properties,
                      enqueue_options    => enqueue_options,
                      payload            => :message,
                      msgid              => msgid);
    END;
  END-EXEC;

  /* Commit work: */
  EXEC SQL COMMIT;

  /* Dequeue */
  /* Embedded PLSQL call to the AQ dequeue procedure: */
  EXEC SQL EXECUTE
    DECLARE
      message_properties dbms_aq.message_properties_t;
      dequeue_options    dbms_aq.dequeue_options_t;
      msgid              RAW(16);
    BEGIN
      /* Return the raw payload into the host variable 'message': */
      dbms_aq.dequeue(queue_name         => 'raw_msg_queue',
                      message_properties => message_properties,
                      dequeue_options    => dequeue_options,
                      payload            => :message,
                      msgid              => msgid);
    END;
  END-EXEC;

  /* Commit work: */
  EXEC SQL COMMIT;
  return 0;
}
When two messages are enqueued with the same priority, the message which was enqueued earlier will be dequeued first. However, if two messages are of different priorities, the message with the lower value (higher priority) will be dequeued first.
/* Enqueue two messages with priority 30 and 5: */
DECLARE
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            aq.message_type;
BEGIN
  message := message_type('PRIORITY MESSAGE', 'Enqueued at priority 30.');
  message_properties.priority := 30;
  dbms_aq.enqueue(queue_name         => 'priority_msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);

  message := message_type('PRIORITY MESSAGE', 'Enqueued at priority 5.');
  message_properties.priority := 5;
  dbms_aq.enqueue(queue_name         => 'priority_msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  COMMIT;
END;
/

/* Dequeue from priority queue: */
DECLARE
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            aq.message_type;
BEGIN
  dbms_aq.dequeue(queue_name         => 'priority_msg_queue',
                  dequeue_options    => dequeue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  dbms_output.put_line ('Message: ' || message.subject || ' ... ' || message.text);
  COMMIT;

  dbms_aq.dequeue(queue_name         => 'priority_msg_queue',
                  dequeue_options    => dequeue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  dbms_output.put_line ('Message: ' || message.subject || ' ... ' || message.text);
  COMMIT;
END;
/

On return, the second message, with priority set to 5, will be retrieved before the message with priority set to 30, since priority takes precedence over enqueue time.
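The ordering rule for a queue sorted by priority and then enqueue time can be checked with a small Python model. This is an in-memory sketch, not Oracle AQ: the class `PriorityQueueSketch` is hypothetical, and a running counter stands in for the enqueue time.

```python
import heapq
import itertools


class PriorityQueueSketch:
    """Lower priority value is dequeued first; ties go to the earlier enqueue."""

    def __init__(self):
        self._heap = []
        self._sequence = itertools.count()   # stands in for the enqueue time

    def enqueue(self, payload, priority):
        # Tuples compare by priority first, then by enqueue sequence.
        heapq.heappush(self._heap, (priority, next(self._sequence), payload))

    def dequeue(self):
        _priority, _sequence, payload = heapq.heappop(self._heap)
        return payload


q = PriorityQueueSketch()
q.enqueue("enqueued at priority 30", 30)
q.enqueue("enqueued at priority 5", 5)
print(q.dequeue())
print(q.dequeue())
```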
An application can preview messages in browse mode or locked mode without deleting the message. The message of interest can then be removed from the queue.
/* Enqueue 6 messages to msg_queue - GREEN, GREEN, YELLOW, VIOLET, BLUE, RED */
DECLARE
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            aq.message_type;
BEGIN
  message := message_type('GREEN', 'GREEN enqueued to msg_queue first.');
  dbms_aq.enqueue(queue_name         => 'msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);

  message := message_type('GREEN', 'GREEN also enqueued to msg_queue second.');
  dbms_aq.enqueue(queue_name         => 'msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);

  message := message_type('YELLOW', 'YELLOW enqueued to msg_queue third.');
  dbms_aq.enqueue(queue_name         => 'msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  dbms_output.put_line ('Message handle: ' || message_handle);

  message := message_type('VIOLET', 'VIOLET enqueued to msg_queue fourth.');
  dbms_aq.enqueue(queue_name         => 'msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);

  message := message_type('BLUE', 'BLUE enqueued to msg_queue fifth.');
  dbms_aq.enqueue(queue_name         => 'msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);

  message := message_type('RED', 'RED enqueued to msg_queue sixth.');
  dbms_aq.enqueue(queue_name         => 'msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  COMMIT;
END;
/

/* Dequeue in BROWSE mode until RED is found, and remove RED from queue: */
DECLARE
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            aq.message_type;
BEGIN
  dequeue_options.dequeue_mode := dbms_aq.BROWSE;
  LOOP
    dbms_aq.dequeue(queue_name         => 'msg_queue',
                    dequeue_options    => dequeue_options,
                    message_properties => message_properties,
                    payload            => message,
                    msgid              => message_handle);
    dbms_output.put_line ('Message: ' || message.subject || ' ... ' || message.text);
    EXIT WHEN message.subject = 'RED';
  END LOOP;

  /* Remove the message that was found, identified by its message id: */
  dequeue_options.dequeue_mode := dbms_aq.REMOVE;
  dequeue_options.msgid := message_handle;
  dbms_aq.dequeue(queue_name         => 'msg_queue',
                  dequeue_options    => dequeue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  dbms_output.put_line ('Message: ' || message.subject || ' ... ' || message.text);
  COMMIT;
END;
/

/* Dequeue in LOCKED mode until BLUE is found, and remove BLUE from queue: */
DECLARE
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            aq.message_type;
BEGIN
  dequeue_options.dequeue_mode := dbms_aq.LOCKED;
  LOOP
    dbms_aq.dequeue(queue_name         => 'msg_queue',
                    dequeue_options    => dequeue_options,
                    message_properties => message_properties,
                    payload            => message,
                    msgid              => message_handle);
    dbms_output.put_line ('Message: ' || message.subject || ' ... ' || message.text);
    EXIT WHEN message.subject = 'BLUE';
  END LOOP;

  /* Remove the message that was found, identified by its message id: */
  dequeue_options.dequeue_mode := dbms_aq.REMOVE;
  dequeue_options.msgid := message_handle;
  dbms_aq.dequeue(queue_name         => 'msg_queue',
                  dequeue_options    => dequeue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  dbms_output.put_line ('Message: ' || message.subject || ' ... ' || message.text);
  COMMIT;
END;
/
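The browse-then-remove pattern used in the listing above can be reduced to a short Python model. It is only a sketch: the class `BrowsableQueue` and the helper `remove_first` are hypothetical names, message ids are plain integers, and locking is not modeled at all.

```python
import itertools


class BrowsableQueue:
    """Messages can be inspected without being consumed, then removed by id."""

    def __init__(self):
        self._ids = itertools.count(1)
        self._messages = []            # (msgid, subject) in enqueue order

    def enqueue(self, subject):
        msgid = next(self._ids)
        self._messages.append((msgid, subject))
        return msgid

    def browse(self):
        # Iterate over a snapshot so browsing never changes the queue.
        yield from list(self._messages)

    def remove(self, msgid):
        for index, (candidate, subject) in enumerate(self._messages):
            if candidate == msgid:
                del self._messages[index]
                return subject
        raise KeyError(msgid)

    def subjects(self):
        return [subject for _msgid, subject in self._messages]


def remove_first(q, wanted):
    """Browse until `wanted` is seen, then remove exactly that message."""
    for msgid, subject in q.browse():
        if subject == wanted:
            return q.remove(msgid)
    return None


q = BrowsableQueue()
for color in ["GREEN", "GREEN", "YELLOW", "VIOLET", "BLUE", "RED"]:
    q.enqueue(color)
print(remove_first(q, "RED"))
print(q.subjects())
```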
An enqueue can specify the time before which a message cannot be retrieved by a dequeue call. To do this, the producer (i.e., the agent enqueuing the message) can use the parameter "delay" when enqueuing the message. The producer can also specify the time when a message expires, at which time the message is moved to an exception queue.
Note that expiration is calculated from the earliest dequeue time. So, if an application wants a message to be dequeued no earlier than a week from now, but no later than 3 weeks from now, this requires setting the expiration time for 2 weeks. This scenario is described in the following code segment.
/* Enqueue message for delayed availability: */
DECLARE
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            aq.message_type;
BEGIN
  message := message_type('DELAYED', 'This message is delayed one week.');
  /* Both values are in seconds; expiration counts from the end of the delay: */
  message_properties.delay := 7*24*60*60;
  message_properties.expiration := 2*7*24*60*60;
  dbms_aq.enqueue(queue_name         => 'msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  COMMIT;
END;
/
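The arithmetic behind the delay and expiration values can be verified with a few lines of Python. This is a worked example under stated assumptions, not part of Oracle AQ: the function names are hypothetical, and treating a message as expired at the exact boundary instant is a choice made for the sketch.

```python
from datetime import datetime, timedelta

WEEK = 7 * 24 * 60 * 60   # seconds, as in the listing above


def availability_window(enqueue_time, delay_seconds, expiration_seconds):
    """Expiration is counted from the earliest dequeue time, not from enqueue."""
    earliest = enqueue_time + timedelta(seconds=delay_seconds)
    expires = earliest + timedelta(seconds=expiration_seconds)
    return earliest, expires


def state_at(now, enqueue_time, delay_seconds, expiration_seconds):
    earliest, expires = availability_window(
        enqueue_time, delay_seconds, expiration_seconds)
    if now < earliest:
        return "WAITING"
    if now < expires:
        return "READY"
    return "EXPIRED"      # assumption: expired at the boundary instant


enqueued = datetime(2000, 1, 1)
earliest, expires = availability_window(enqueued, 1 * WEEK, 2 * WEEK)
print(earliest.date(), expires.date())
print(state_at(datetime(2000, 1, 10), enqueued, 1 * WEEK, 2 * WEEK))
```

A delay of one week plus an expiration of two weeks therefore yields a dequeue window that closes three weeks after the enqueue.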
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sqlca.h>
#include <sql2oci.h>

/* The header file generated by processing object type 'aq.message_type': */
#include "pceg.h"

void sql_error(msg)
char *msg;
{
  EXEC SQL WHENEVER SQLERROR CONTINUE;
  printf("%s\n", msg);
  printf("\n% .800s \n", sqlca.sqlerrm.sqlerrmc);
  EXEC SQL ROLLBACK WORK RELEASE;
  exit(1);
}

main()
{
  OCIEnv       *oeh;                          /* OCI Env Handle */
  OCIError     *err;                          /* OCI Error Handle */
  message_type *message = (message_type*)0;   /* queue payload */
  OCIRaw       *msgid = (OCIRaw*)0;           /* message id */
  ub1           msgmem[16]="";                /* memory for msgid */
  char          user[60]="aq/AQ";             /* user login password */
  char          subject[30];                  /* components of */
  char          txt[80];                      /* message_type */
  char          correlation1[30];             /* message correlation */
  char          correlation2[30];
  int           status;                       /* code returned by the OCI calls */

  /* Dequeue by correlation and msgid */

  /* Connect to the database: */
  EXEC SQL CONNECT :user;
  EXEC SQL WHENEVER SQLERROR DO sql_error("Oracle Error :");

  /* Allocate space in the object cache for the host variable: */
  EXEC SQL ALLOCATE :message;

  /* Get the OCI Env handle: */
  if (SQLEnvGet(SQL_SINGLE_RCTX, &oeh) != OCI_SUCCESS)
  {
    printf(" error in SQLEnvGet \n");
    exit(1);
  }

  /* Get the OCI Error handle: */
  if (status = OCIHandleAlloc((dvoid *)oeh, (dvoid **)&err,
                              (ub4)OCI_HTYPE_ERROR, (ub4)0, (dvoid **)0))
  {
    printf(" error in OCIHandleAlloc %d \n", status);
    exit(1);
  }

  /* Assign memory for msgid:
     Memory needs to be allocated explicitly to OCIRaw*: */
  if (status=OCIRawAssignBytes(oeh, err, msgmem, 16, &msgid))
  {
    printf(" error in OCIRawAssignBytes %d \n", status);
    exit(1);
  }

  /* First enqueue */
  strcpy(correlation1, "1st message");
  strcpy(subject, "NORMAL ENQUEUE1");
  strcpy(txt, "The Enqueue was done through PLSQL embedded in PROC");

  /* Initialize the components of message: */
  EXEC SQL OBJECT SET SUBJECT, TEXT OF :message TO :subject, :txt;

  /* Embedded PLSQL call to the AQ enqueue procedure: */
  EXEC SQL EXECUTE
  DECLARE
    message_properties dbms_aq.message_properties_t;
    enqueue_options    dbms_aq.enqueue_options_t;
  BEGIN
    /* Bind the host variable 'correlation1' to message correlation: */
    message_properties.correlation := :correlation1;

    /* Bind the host variable 'message' to payload and
       return message id into host variable 'msgid': */
    dbms_aq.enqueue(queue_name         => 'msg_queue',
                    message_properties => message_properties,
                    enqueue_options    => enqueue_options,
                    payload            => :message,
                    msgid              => :msgid);
  END;
  END-EXEC;

  /* Commit work: */
  EXEC SQL COMMIT;

  printf("Enqueued Message \n");
  printf("Subject :%s\n",subject);
  printf("Text :%s\n",txt);

  /* Second enqueue */
  strcpy(correlation2, "2nd message");
  strcpy(subject, "NORMAL ENQUEUE2");
  strcpy(txt, "The Enqueue was done through PLSQL embedded in PROC");

  /* Initialize the components of message: */
  EXEC SQL OBJECT SET SUBJECT, TEXT OF :message TO :subject,:txt;

  /* Embedded PLSQL call to the AQ enqueue procedure: */
  EXEC SQL EXECUTE
  DECLARE
    message_properties dbms_aq.message_properties_t;
    enqueue_options    dbms_aq.enqueue_options_t;
    msgid              RAW(16);
  BEGIN
    /* Bind the host variable 'correlation2' to message correlation: */
    message_properties.correlation := :correlation2;

    /* Bind the host variable 'message' to payload: */
    dbms_aq.enqueue(queue_name         => 'msg_queue',
                    message_properties => message_properties,
                    enqueue_options    => enqueue_options,
                    payload            => :message,
                    msgid              => msgid);
  END;
  END-EXEC;

  /* Commit work: */
  EXEC SQL COMMIT;

  printf("Enqueued Message \n");
  printf("Subject :%s\n",subject);
  printf("Text :%s\n",txt);

  /* First dequeue - by correlation */
  EXEC SQL EXECUTE
  DECLARE
    message_properties dbms_aq.message_properties_t;
    dequeue_options    dbms_aq.dequeue_options_t;
    msgid              RAW(16);
  BEGIN
    /* Dequeue by correlation in host variable 'correlation2': */
    dequeue_options.correlation := :correlation2;

    /* Return the payload into host variable 'message': */
    dbms_aq.dequeue(queue_name         => 'msg_queue',
                    message_properties => message_properties,
                    dequeue_options    => dequeue_options,
                    payload            => :message,
                    msgid              => msgid);
  END;
  END-EXEC;

  /* Commit work: */
  EXEC SQL COMMIT;

  /* Extract the values of the components of message: */
  EXEC SQL OBJECT GET SUBJECT, TEXT FROM :message INTO :subject,:txt;

  printf("Dequeued Message \n");
  printf("Subject :%s\n",subject);
  printf("Text :%s\n",txt);

  /* Second dequeue - by msgid */
  EXEC SQL EXECUTE
  DECLARE
    message_properties dbms_aq.message_properties_t;
    dequeue_options    dbms_aq.dequeue_options_t;
    msgid              RAW(16);
  BEGIN
    /* Dequeue by msgid in host variable 'msgid': */
    dequeue_options.msgid := :msgid;

    /* Return the payload into host variable 'message': */
    dbms_aq.dequeue(queue_name         => 'msg_queue',
                    message_properties => message_properties,
                    dequeue_options    => dequeue_options,
                    payload            => :message,
                    msgid              => msgid);
  END;
  END-EXEC;

  /* Commit work: */
  EXEC SQL COMMIT;
}
/* Create subscriber list: */
DECLARE
  subscriber sys.aq$_agent;

/* Add subscribers RED and GREEN to the subscriber list: */
BEGIN
  subscriber := sys.aq$_agent('RED', NULL, NULL);
  dbms_aqadm.add_subscriber(queue_name => 'msg_queue_multiple',
                            subscriber => subscriber);

  subscriber := sys.aq$_agent('GREEN', NULL, NULL);
  dbms_aqadm.add_subscriber(queue_name => 'msg_queue_multiple',
                            subscriber => subscriber);
END;
/

DECLARE
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  recipients         dbms_aq.aq$_recipient_list_t;
  message_handle     RAW(16);
  message            aq.message_type;

/* Enqueue MESSAGE 1 for subscribers to the queue,
   i.e. for RED and GREEN: */
BEGIN
  message := message_type('MESSAGE 1',
                          'This message is queued for queue subscribers.');
  dbms_aq.enqueue(queue_name         => 'msg_queue_multiple',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);

  /* Enqueue MESSAGE 2 for specified recipients, i.e. for RED and BLUE: */
  message := message_type('MESSAGE 2',
                          'This message is queued for two recipients.');
  recipients(1) := sys.aq$_agent('RED', NULL, NULL);
  recipients(2) := sys.aq$_agent('BLUE', NULL, NULL);
  message_properties.recipient_list := recipients;
  dbms_aq.enqueue(queue_name         => 'msg_queue_multiple',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  COMMIT;
END;
/
Note that RED is both a subscriber to the queue and a specified recipient of MESSAGE 2. By contrast, GREEN is only a subscriber, and so receives only those messages in the queue (in this case, MESSAGE 1) for which no recipients have been specified. BLUE, while not a subscriber to the queue, is nevertheless specified to receive MESSAGE 2.
/* Dequeue messages from msg_queue_multiple: */
DECLARE
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            aq.message_type;
  no_messages        exception;
  pragma exception_init (no_messages, -25228);
BEGIN
  dequeue_options.wait := dbms_aq.NO_WAIT;

  /* Each consumer is handled in its own nested block so that the
     no_messages exception ends only that consumer's loop: */
  BEGIN
    /* Consumer BLUE will get MESSAGE 2: */
    dequeue_options.consumer_name := 'BLUE';
    LOOP
      dbms_aq.dequeue(queue_name         => 'msg_queue_multiple',
                      dequeue_options    => dequeue_options,
                      message_properties => message_properties,
                      payload            => message,
                      msgid              => message_handle);
      dbms_output.put_line ('Message: ' || message.subject ||
                            ' ... ' || message.text );
    END LOOP;
  EXCEPTION
    WHEN no_messages THEN
      dbms_output.put_line ('No more messages for BLUE');
      COMMIT;
  END;

  BEGIN
    /* Consumer RED will get MESSAGE 1 and MESSAGE 2: */
    dequeue_options.consumer_name := 'RED';
    LOOP
      dbms_aq.dequeue(queue_name         => 'msg_queue_multiple',
                      dequeue_options    => dequeue_options,
                      message_properties => message_properties,
                      payload            => message,
                      msgid              => message_handle);
      dbms_output.put_line ('Message: ' || message.subject ||
                            ' ... ' || message.text );
    END LOOP;
  EXCEPTION
    WHEN no_messages THEN
      dbms_output.put_line ('No more messages for RED');
      COMMIT;
  END;

  BEGIN
    /* Consumer GREEN will get MESSAGE 1: */
    dequeue_options.consumer_name := 'GREEN';
    LOOP
      dbms_aq.dequeue(queue_name         => 'msg_queue_multiple',
                      dequeue_options    => dequeue_options,
                      message_properties => message_properties,
                      payload            => message,
                      msgid              => message_handle);
      dbms_output.put_line ('Message: ' || message.subject ||
                            ' ... ' || message.text );
    END LOOP;
  EXCEPTION
    WHEN no_messages THEN
      dbms_output.put_line ('No more messages for GREEN');
      COMMIT;
  END;
END;
/
/* Cleans up all objects related to the object type: */
CONNECT aq/AQ;
EXECUTE dbms_aqadm.stop_queue ( queue_name => 'msg_queue');
EXECUTE dbms_aqadm.drop_queue ( queue_name => 'msg_queue');
EXECUTE dbms_aqadm.drop_queue_table ( queue_table => 'aq.msg');

/* Cleans up all objects related to the RAW type: */
EXECUTE dbms_aqadm.stop_queue ( queue_name => 'raw_msg_queue');
EXECUTE dbms_aqadm.drop_queue ( queue_name => 'raw_msg_queue');
EXECUTE dbms_aqadm.drop_queue_table ( queue_table => 'aq.raw_msg');

/* Cleans up all objects related to the priority queue: */
EXECUTE dbms_aqadm.stop_queue ( queue_name => 'priority_msg_queue');
EXECUTE dbms_aqadm.drop_queue ( queue_name => 'priority_msg_queue');
EXECUTE dbms_aqadm.drop_queue_table ( queue_table => 'aq.priority_msg');

/* Cleans up all objects related to the multiple-consumer queue: */
EXECUTE dbms_aqadm.stop_queue ( queue_name => 'msg_queue_multiple');
EXECUTE dbms_aqadm.drop_queue ( queue_name => 'msg_queue_multiple');
EXECUTE dbms_aqadm.drop_queue_table ( queue_table => 'aq.msg_multiple');

drop type aq.message_type;
CONNECT sys/change_on_install;
drop user aq;
This section contains a detailed description of the technical specifications:
- INIT.ORA Parameter
- Data Structures
- Agent
- Message Properties
- Queue Options
- Operational Interface
- Administrative Interface
- Administration Topics
- Data Objects
A parameter called aq_tm_processes should be specified in the init.ora parameter file if you want to perform time monitoring on queue messages. This will be used for messages which have delay and expiration properties specified. This parameter can be set to 0 or 1. Setting it to any other number will result in an error. If this parameter is set to 1, one time manager process will be created as a background process to monitor the messages. If the parameter is not specified or is set to 0, then the time manager process is not created. The administrative interfaces to start and stop the time manager are only valid if the time manager process is started as part of instance startup by specifying this parameter.
Parameter Name: aq_tm_processes
Parameter Type: integer
Parameter Class: Dynamic
Allowable Values: 0 or 1
Syntax: aq_tm_processes = <either 0 or 1>
Name of process: ora_aqtm_<oracle sid>
Example: aq_tm_processes = 1
The following data structures are used in the operational and administrative interfaces.
Naming of database objects. This naming convention applies to queues, queue tables and object types.
object_name := VARCHAR2
object_name := [<schema_name>.]<name>
Names for objects are specified by an optional schema name and a name. If the schema name is not specified then the current schema is assumed. The schema name and the name can each be up to 24 characters long.
type_name := VARCHAR2
type_name := <object_type> | "RAW"
Type Name
TYPE sys.aq$_agent IS OBJECT (
  name     VARCHAR2(30),
  address  VARCHAR2(30),
  protocol NUMBER)
Parameter | Description |
---|---|
name | Name of a producer or consumer of a message. |
address | Reserved for future use. |
protocol | Reserved for future use. |
The Message Properties describe the information that is used by AQ to manage individual messages. These are set at enqueue time and their values are returned at dequeue time.
TYPE message_properties_t IS RECORD (
  priority        BINARY_INTEGER default 1,
  delay           BINARY_INTEGER default NO_DELAY,
  expiration      BINARY_INTEGER default NEVER,
  correlation     VARCHAR2(128) default NULL,
  attempts        BINARY_INTEGER,
  recipient_list  aq$_recipient_list_t,
  exception_queue VARCHAR2(51) default NULL,
  enqueue_time    date,
  state           BINARY_INTEGER)
TYPE aq$_recipient_list_t IS TABLE OF sys.aq$_agent
INDEX BY BINARY_INTEGER
Usage:
TYPE enqueue_options_t IS RECORD (
  visibility         BINARY_INTEGER default ON_COMMIT,
  relative_msgid     RAW(16) default NULL,
  sequence_deviation BINARY_INTEGER default NULL)
TYPE dequeue_options_t IS RECORD (
  consumer_name VARCHAR2(30) default NULL,
  dequeue_mode  BINARY_INTEGER default REMOVE,
  navigation    BINARY_INTEGER default NEXT_MESSAGE,
  visibility    BINARY_INTEGER default ON_COMMIT,
  wait          BINARY_INTEGER default FOREVER,
  msgid         RAW(16) default NULL,
  correlation   VARCHAR2(128) default NULL)
The following interface calls are available to enqueue and dequeue messages from queues.
Adds a message to the specified queue. In the simplest case, if the user wants to enqueue a message, without any other parameters, only the queue name and the payload have to be specified.
DBMS_AQ.ENQUEUE (
  queue_name         IN  VARCHAR2,
  enqueue_options    IN  enqueue_options_t,
  message_properties IN  message_properties_t,
  payload            IN  "<type_name>",
  msgid              OUT RAW)
Usage:
The sequence_deviation parameter in enqueue_options can be used to change the order of processing between two messages. The identity of the other message, if any, is specified by the enqueue_options parameter relative_msgid. The relationship is identified by the sequence_deviation parameter.
Specifying sequence_deviation for a message introduces some restrictions for the delay and priority values that can be specified for this message. The delay of this message has to be less than or equal to the delay of the message before which this message is to be enqueued. The priority of this message has to be greater than or equal to the priority of the message before which this message is to be enqueued.
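A minimal sketch of these restrictions, assuming the queue msg_queue and the payload type aq.message_type used in the earlier examples, and assuming that dbms_aq.BEFORE is the sequence_deviation value that requests placement ahead of the message named by relative_msgid. Both messages keep the default delay and priority, so the restrictions described above hold.

```sql
DECLARE
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  first_handle       RAW(16);
  second_handle      RAW(16);
  message            aq.message_type;
BEGIN
  /* Enqueue the first message with default options: */
  message := aq.message_type('FIRST', 'Enqueued first.');
  dbms_aq.enqueue(queue_name         => 'msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => first_handle);
  COMMIT;

  /* Ask for the second message to be placed before the first one.
     dbms_aq.BEFORE is an assumed constant name; relative_msgid
     identifies the message enqueued above: */
  message := aq.message_type('SECOND', 'Enqueued second.');
  enqueue_options.sequence_deviation := dbms_aq.BEFORE;
  enqueue_options.relative_msgid     := first_handle;
  dbms_aq.enqueue(queue_name         => 'msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => second_handle);
  COMMIT;
END;
/
```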
Dequeues a message from the specified queue.
DBMS_AQ.DEQUEUE (
  queue_name         IN  VARCHAR2,
  dequeue_options    IN  dequeue_options_t,
  message_properties OUT message_properties_t,
  payload            OUT "<type_name>",
  msgid              OUT RAW)
The search criteria for messages to be dequeued are determined by the consumer_name, msgid and correlation parameters in the dequeue_options. Msgid uniquely identifies the message to be dequeued. Correlation identifiers are application-defined identifiers that are not interpreted by AQ.
Only messages in the READY state are dequeued unless a msgid is specified.
The dequeue order is determined by the values specified at the time the queue table is created unless overridden by the msgid and correlation id in dequeue_options.
The database consistent read mechanism is applicable for queue operations. For example, a BROWSE call may not see a message that is enqueued after the beginning of the browsing transaction.
When using enumerated constants such as BROWSE, LOCKED and REMOVE, the PL/SQL constants must be qualified with the name of the package that defines them. All types and constants associated with the operational interfaces have to be prefixed with dbms_aq. For example:
dbms_aq.BROWSE
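As an illustration of package-qualified constants, the sketch below reads the next message from msg_queue without removing it. It assumes the queue and the aq.message_type payload (with subject and text attributes) from the earlier examples. With dbms_aq.NO_WAIT the call raises an error rather than blocking if no message is available.

```sql
DECLARE
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            aq.message_type;
BEGIN
  /* Qualify each enumerated constant with the dbms_aq package name: */
  dequeue_options.dequeue_mode := dbms_aq.BROWSE;
  dequeue_options.wait         := dbms_aq.NO_WAIT;

  dbms_aq.dequeue(queue_name         => 'msg_queue',
                  dequeue_options    => dequeue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);

  /* The message stays in the queue because it was only browsed: */
  dbms_output.put_line('Browsed: ' || message.subject ||
                       ' ... ' || message.text);
END;
/
```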
Configuration information can be managed through procedures in the DBMS_AQADM package. Because incorrect usage of the administration interface can have substantial performance impact on the database system, the administration interface should be treated as privileged commands, and only the designated queue administrator or privileged users should be granted access to the administration package. Initially, only SYS has the execution privilege for the procedures in DBMS_AQADM and DBMS_AQ.
Access to AQ operations is granted to users through roles. These roles provide execution privileges on the AQ procedures. Fine-grained access control at the database object level is currently not supported. This implies that a user with the AQ_USER_ROLE can enqueue to and dequeue from any queue in the system.
AQ_ADMINISTRATOR_ROLE grants execute privileges to procedures in the DBMS_AQADM and DBMS_AQ packages. These include all the administrative and operational interfaces. The user `SYS' must grant the AQ_ADMINISTRATOR_ROLE to the AQ administrator.
AQ_USER_ROLE grants execute privileges to procedures in the DBMS_AQ packages. These include all the operational interfaces. The AQ administrator must grant the AQ_USER_ROLE to AQ users.
The procedure grant_type_access must first be executed by the user `SYS' to grant access for AQ object types to the AQ administrator. The AQ administrator can then execute this procedure to grant access for AQ object types to other AQ users. The procedure needs to be executed if the user wishes to perform any administrative operation involving a multiple consumer queue. These include CREATE_QUEUE_TABLE, CREATE_QUEUE, ADD_SUBSCRIBER and REMOVE_SUBSCRIBER.
PROCEDURE grant_type_access (user_name IN VARCHAR2);
1. Scott is appointed as the AQ administrator.
CONNECT sys/change_on_install
GRANT AQ_ADMINISTRATOR_ROLE to scott with admin option;
execute dbms_aqadm.grant_type_access('scott');
2. Scott lets Jones use AQ.
CONNECT scott/tiger
GRANT AQ_USER_ROLE to jones;
3. Jones wishes to create queue tables that are enabled for multiple dequeues.
CONNECT scott/tiger
execute dbms_aqadm.grant_type_access('jones');
Create a queue table for messages of a pre-defined type. The sort keys for dequeue ordering, if any, need to be defined at table creation time. The following objects are created at this time:
DBMS_AQADM.CREATE_QUEUE_TABLE (
  queue_table        IN VARCHAR2,
  queue_payload_type IN VARCHAR2,
  storage_clause     IN VARCHAR2 default NULL,
  sort_list          IN VARCHAR2 default NULL,
  multiple_consumers IN BOOLEAN default FALSE,
  message_grouping   IN BINARY_INTEGER default NONE,
  comment            IN VARCHAR2 default NULL,
  auto_commit        IN BOOLEAN default TRUE)
Table 11-9: DBMS_AQADM.CREATE_QTABLE
Create a queue in the specified queue table. All queue names must be unique within a schema. Once a queue is created with CREATE_QUEUE, it can be enabled by calling START_QUEUE. By default, the queue is created with both enqueue and dequeue disabled.
DBMS_AQADM.CREATE_QUEUE (
  queue_name          IN VARCHAR2,
  queue_table         IN VARCHAR2,
  queue_type          IN BINARY_INTEGER default NORMAL_QUEUE,
  max_retries         IN NUMBER default 0,
  retry_delay         IN NUMBER default 0,
  retention_time      IN NUMBER default 0,
  dependency_tracking IN BOOLEAN default FALSE,
  comment             IN VARCHAR2 default NULL,
  auto_commit         IN BOOLEAN default TRUE)
Usage:
Drop an existing queue table. All the queues in a queue table have to be stopped and dropped before the queue table can be dropped.
DBMS_AQADM.DROP_QUEUE_TABLE (
  queue_table IN VARCHAR2,
  force       IN BOOLEAN default FALSE,
  auto_commit IN BOOLEAN default TRUE)
Table 11-11: DBMS_AQADM.DROP_QUEUE_TABLE
Drops an existing queue. DROP_QUEUE is not allowed unless STOP_QUEUE has been called to disable the queue for both enqueuing and dequeuing. All the queue data is deleted as part of the drop operation.
DBMS_AQADM.DROP_QUEUE (
  queue_name  IN VARCHAR2,
  auto_commit IN BOOLEAN default TRUE)
Table 11-12: DBMS_AQADM.DROP_QUEUE
Alter existing properties of a queue. Only max_retries, retry_delay, and retention_time can be altered.
DBMS_AQADM.ALTER_QUEUE (
  queue_name     IN VARCHAR2,
  max_retries    IN NUMBER default NULL,
  retry_delay    IN NUMBER default NULL,
  retention_time IN NUMBER default NULL,
  auto_commit    IN BOOLEAN default TRUE)
Table 11-13: DBMS_AQADM.ALTER_QUEUE
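For instance, a call of the following shape could be used to change how long dequeued messages are retained. This is only a sketch: it assumes the queue msg_queue from the earlier examples and assumes that retention_time is expressed in seconds.

```sql
BEGIN
  /* Keep dequeued messages for one day (assumed unit: seconds).
     Parameters that are left out keep their current values: */
  dbms_aqadm.alter_queue(queue_name     => 'msg_queue',
                         retention_time => 24*60*60);
END;
/
```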
Enables the specified queue for enqueuing and/or dequeueing. After creating a queue the administrator must use START_QUEUE to enable the queue. The default is to enable it for both ENQUEUE and DEQUEUE. Only dequeue operations are allowed on an exception queue. This operation takes effect when the call completes and does not have any transactional characteristics.
DBMS_AQADM.START_QUEUE (
  queue_name IN VARCHAR2,
  enqueue    IN BOOLEAN default TRUE,
  dequeue    IN BOOLEAN default TRUE)
Table 11-14: DBMS_AQADM.START_QUEUE
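CREATE_QUEUE_TABLE, CREATE_QUEUE and START_QUEUE are typically called in sequence. The sketch below shows one possible combination. The names aq.demo_msg and demo_msg_queue are hypothetical, the payload type aq.message_type is taken from the earlier examples, and the sort_list value 'PRIORITY,ENQ_TIME' is an assumption about the accepted sort keys.

```sql
BEGIN
  /* Create the queue table; the dequeue ordering must be fixed here: */
  dbms_aqadm.create_queue_table(
    queue_table        => 'aq.demo_msg',          /* hypothetical name */
    queue_payload_type => 'aq.message_type',
    sort_list          => 'PRIORITY,ENQ_TIME');   /* assumed sort keys */

  /* Create a queue in that table; it starts out disabled: */
  dbms_aqadm.create_queue(
    queue_name  => 'demo_msg_queue',              /* hypothetical name */
    queue_table => 'aq.demo_msg');

  /* Enable the queue for both enqueue and dequeue (the defaults): */
  dbms_aqadm.start_queue(queue_name => 'demo_msg_queue');
END;
/
```

The reverse order applies on cleanup: stop the queue, drop the queue, then drop the queue table.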
Disables enqueuing and/or dequeuing on the specified queue. By default, it disables both ENQUEUEs and DEQUEUEs. A queue cannot be stopped if there are outstanding transactions against the queue. This operation takes effect when the call completes and does not have any transactional characteristics.
DBMS_AQADM.STOP_QUEUE (
  queue_name IN VARCHAR2,
  enqueue    IN BOOLEAN default TRUE,
  dequeue    IN BOOLEAN default TRUE,
  wait       IN BOOLEAN default TRUE)
Table 11-15: DBMS_AQADM.STOP_QUEUE
To start the time manager process.
DBMS_AQADM.START_TIME_MANAGER;
This command causes the time manager process to start executing its operations. The physical process has to be started at database startup time by setting the aq_tm_processes init.ora parameter to 1. This operation takes effect when the call completes and does not have any transactional characteristics.
To stop the time manager process.
DBMS_AQADM.STOP_TIME_MANAGER;
The command causes the time manager to stop executing all its operations. The physical process is not terminated. This operation takes effect when the call completes and does not have any transactional characteristics.
Add a default subscriber to a queue. A program can enqueue messages to a specific list of recipients or to the default list of subscribers. This operation will only succeed on queues that allow multiple consumers. This operation takes effect immediately and the containing transaction is committed. Enqueue requests that are executed after the completion of this call will reflect the new behavior. The user must have been granted type access by executing the grant_type_access procedure.
DBMS_AQADM.ADD_SUBSCRIBER(
  queue_name IN VARCHAR2,
  subscriber IN sys.aq$_agent)
Table 11-16: DBMS_AQADM.ADD_SUBSCRIBER
Parameter | Description |
---|---|
queue_name (IN VARCHAR2) | Specifies the name of the queue |
subscriber (IN aq$_agent) | See definition in section titled 'Agent' |
Remove a default subscriber from a queue. This operation takes effect immediately and the containing transaction is committed. All references to the subscriber in existing messages are removed as part of the operation. The user must have been granted type access by executing the grant_type_access procedure.
DBMS_AQADM.REMOVE_SUBSCRIBER(
  queue_name IN VARCHAR2,
  subscriber IN sys.aq$_agent)
Parameter | Description |
---|---|
queue_name (IN VARCHAR2) | Specifies the name of the queue |
subscriber (IN aq$_agent) | See definition in section titled 'Agent' |
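A short sketch of the call, assuming the multiple-consumer queue msg_queue_multiple and the subscriber GREEN from the earlier examples, and assuming the caller has already been granted type access:

```sql
DECLARE
  subscriber sys.aq$_agent;
BEGIN
  /* Build the agent the same way it was built for ADD_SUBSCRIBER: */
  subscriber := sys.aq$_agent('GREEN', NULL, NULL);

  /* Remove GREEN from the default subscriber list of the queue: */
  dbms_aqadm.remove_subscriber(queue_name => 'msg_queue_multiple',
                               subscriber => subscriber);
END;
/
```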
When using enumerated constants such as BROWSE, LOCKED, REMOVE, the symbol needs to be specified with the scope of the packages defining it. All types associated with the administrative interfaces have to be prepended with dbms_aqadm. For example:
dbms_aqadm.NORMAL_QUEUE
Table 11-18: Enumerated types in the administrative interface
Parameter | Options |
---|---|
retention | INFINITE |
message_grouping | TRANSACTIONAL, NONE |
queue_type | NORMAL_QUEUE, EXCEPTION_QUEUE |
Queues are stored in database tables. The performance characteristics of queue operations are very similar to the underlying database operations.
To understand the performance characteristics of queues it is important to understand the tables and index layout for AQ objects.
Creating a queue table creates a database table with approximately 25 columns. These columns store the AQ metadata and the user-defined payload. The payload can be of an object type or RAW. The AQ metadata contains object types and scalar types. A view and two indexes are created on the queue table. The view allows users to query the message data. The indexes are used to accelerate access to message data. Please refer to the create queue table command for a detailed description of the objects created.
The code path of an enqueue operation is comparable to an insert into a multi-column table with two indexes. The code path of a dequeue operation is comparable to a select and delete operation on a similar table. These operations are performed using PL/SQL functions.
Oracle Parallel Server (OPS) can be used to ensure highly available access to queue data. Queues are implemented using database tables. The tail and the head of a queue can be extreme hot spots. Since OPS does not scale well in the presence of hot spots it is recommended to limit normal access to a queue from one instance only. In case of an instance failure messages managed by the failed instance can be processed immediately by one of the surviving instances.
Queue operation scalability is similar to the underlying database operation scalability. If a dequeue operation with wait option is issued in a Multi-Threaded Server (MTS) environment the shared server process will be dedicated to the dequeue operation for the duration of the call including the wait time. The presence of many such processes could cause severe performance and availability problems and could result in deadlocking the shared server processes. For this reason it is recommended that dequeue requests with wait option be only issued via dedicated server processes. This restriction is not enforced.
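One way to limit how long a server process is tied up is to give the dequeue call a finite wait. The sketch below assumes that dequeue_options.wait accepts a number of seconds in addition to the FOREVER and NO_WAIT constants, and reuses msg_queue, aq.message_type and error number -25228 from the earlier examples. It does not remove the need for dedicated server processes; it only bounds the duration of each call.

```sql
DECLARE
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            aq.message_type;
  no_messages        exception;
  pragma exception_init (no_messages, -25228);
BEGIN
  /* Wait at most 10 seconds (assumed unit) instead of FOREVER: */
  dequeue_options.wait := 10;

  dbms_aq.dequeue(queue_name         => 'msg_queue',
                  dequeue_options    => dequeue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  dbms_output.put_line('Message: ' || message.subject);
  COMMIT;
EXCEPTION
  WHEN no_messages THEN
    /* The wait expired without a message becoming available: */
    dbms_output.put_line('No message arrived within the wait time');
END;
/
```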
The standard database reliability and recoverability characteristics apply to queue data.
Enterprise manager supports GUIs for some of the administrative functions listed in the administrative interfaces section.
These include:
Queues are implemented on tables. The import/export of queues constitutes the import/export of the underlying queue tables and related dictionary tables. Import and export of queues can only be done at queue table granularity.
When a queue table is exported, both the table definition information and the queue data are exported. When a queue table is imported, export action procedures will maintain the queue dictionary. Because the queue table data is also exported, the user is responsible for maintaining application-level data integrity when queue table data are being transported.
Importing queue data into a queue table with existing data is not recommended. During a table mode import, if the queue table already exists at the import site the old queue table definition, and the old queue definition will be dropped and recreated. Hence, queue table and queue definitions prior to the import will be lost.
For every queue table that supports multiple recipients, there is an index-organized table (IOT) that contains important queue metadata. This metadata is essential to the operation of the queue, so the user must export and import this IOT as well as the queue table for the queues in this table to work after import. When the schema containing the queue table is exported, the IOT is also automatically exported. The behavior is similar at import time. Because the metadata table contains rowids of some rows in the queue table, import will issue a note about the rowids being obsolete when importing the metadata table. This message can be ignored, as the queueing system will automatically correct the obsolete rowids as a part of the import process. However, if another problem is encountered while doing the import (such as running out of rollback segment space), the problem should be corrected and the import should be rerun.
This is a view of the queue table in which message data is stored. This view is automatically created with each queue table and is called aq$<queue_table_name>. This view should be used for querying the queue data. The dequeue history data (time, user identification and transaction identification) is only valid for single consumer queues. For dequeue history of messages in a multiple consumer queue please refer to a following section.
The administrator can use any SQL statement or SQL tool to analyze and review the content of a queue or queue table. SQL provides full access to the message metadata and/or payload. Use ENQ_TXN_ID and DEQ_TXN_ID to correlate transactions. If the ENQ_TXN_ID of message m2 is the same as the DEQ_TXN_ID of m1, m2 is created in the transaction that consumed m1. (You may use CONNECT BY in your SQL statements to identify related messages). Remove retained messages that are not automatically removed by AQ. Do not update or modify messages since this may destroy the consistency of the queue metadata. Before you use SQL to correct any error in AQ, please contact the Oracle service representative.
EXPIRATION - number of seconds in which the message will expire after being READY
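As a sketch of correlating transactions through the queue table view, the query below pairs each consumed message with the messages created in the transaction that consumed it. It assumes a queue table named msg in schema aq (so the view is aq.aq$msg) and assumes the view exposes the message identifier in a column named MSG_ID; check the view definition before relying on these names.

```sql
/* m1 was dequeued in the same transaction in which m2 was enqueued: */
SELECT m1.msg_id AS consumed_msg,
       m2.msg_id AS created_msg
FROM   aq.aq$msg m1,
       aq.aq$msg m2
WHERE  m2.enq_txn_id = m1.deq_txn_id;
```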
This view describes the names and types of all queue tables created in the database.
This view is the same as DBA_QUEUE_TABLES with the exception that it only shows queue tables in the user's schema. It does not contain a column for OWNER.
Users can specify operational characteristics for individual queues. The DBA_QUEUES view contains the relevant information for every queue in a database.
RETRY_DELAY - number of seconds before retry can be attempted
RETENTION - number of seconds message is retained after dequeue
This view is the same as DBA_QUEUES with the exception that it only shows queues in the user's schema. It does not contain a column for OWNER.
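For example, the retry and retention settings of the queues in the current schema could be listed with a query along these lines. Only RETRY_DELAY and RETENTION are named in this section; the view name USER_QUEUES and the NAME, QUEUE_TABLE and MAX_RETRIES columns are assumptions.

```sql
SELECT name,
       queue_table,
       max_retries,
       retry_delay,   /* seconds before a retry can be attempted */
       retention      /* seconds a message is retained after dequeue */
FROM   user_queues
ORDER  BY name;
```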
To get a list of subscribers for a queue.
DBMS_AQADM.QUEUE_SUBSCRIBERS(
queue_name IN VARCHAR2)
The function returns a PL/SQL table of aq$_agent. This can be used to get the list of all subscribers for a queue.
Example:
DECLARE
  subs  dbms_aqadm.aq$_subscriber_list_t;
  nsubs BINARY_INTEGER;
  i     BINARY_INTEGER;
BEGIN
  subs := dbms_aqadm.queue_subscribers('Q1DEF');
  nsubs := subs.COUNT;
  /* Print the name of each subscriber: */
  FOR i IN 0..nsubs-1 LOOP
    dbms_output.put_line(subs(i).name);
  END LOOP;
END;
/
The queue table view provides the dequeue history for single consumer queue messages. To query the list of recipients or the dequeue history of a message in a multiple-consumer queue you need to execute a SQL query on the queue table for the message of interest.
For example, to view the dequeue history of the message with msgid '105E7A2EBFF11348E03400400B40F149' in queue table sys.queue_tab, the following query must be executed. The query will return one row per consumer of the message.
SELECT consumer, transaction_id, deq_time, deq_user
FROM THE(select cast(history as sys.aq$_dequeue_history_t)
         FROM sys.queue_tab
         WHERE msgid='105E7A2EBFF11348E03400400B40F149');
The error messages for AQ are reported in two ranges:
The following demos may be found in the related directories:
$ORACLE_HOME/demo/aqdemo00.sql  Main driver of demo
$ORACLE_HOME/demo/aqdemo01.sql  Create queue tables and queues using AQ administration interface
$ORACLE_HOME/demo/aqdemo02.sql  Load the demo package
$ORACLE_HOME/demo/aqdemo03.sql  Submit the event handler as a job to Job Queue
$ORACLE_HOME/demo/aqdemo04.sql  Enqueue messages
Copyright © 1997 Oracle Corporation. All Rights Reserved.