Oracle8i Application Developer's Guide - Fundamentals Release 8.1.5 A68003-01 |
|
Because the database is the most significant resource of information within the enterprise, Oracle created a publish-subscribe solution for enterprise information delivery and messaging to complement this role. Topics in this chapter include:
Networking technologies and products now enable a high degree of connectivity across a large number of computers, applications, and users. In these environments, it is important to provide asynchronous communications for the class of distributed systems that operate in a loosely-coupled and autonomous fashion, and which require operational immunity from network failures. This requirement has been filled by various middleware products that are characterized as messaging, message oriented middleware (MOM), message queuing, or publish-subscribe.
Applications which communicate through a publish and subscribe paradigm require the sending applications (publishers) to publish messages without explicitly specifying recipients or having knowledge of intended recipients. Similarly, receiving applications (subscribers) must receive only those messages that the subscriber has registered an interest in.
This decoupling between senders and recipients is usually accomplished by an intervening entity between the publisher and the subscriber, which serves as a level of indirection. This intervening entity is a queue which is used to represent a subject or channel. Figure 15-1 illustrates publish and subscribe functionality.
A subscriber subscribes to a queue by expressing interest in messages enqueued to that queue and by using a subject- or content-based rule as a filter. This results in a set of rule-based subscriptions associated with a given queue.
At runtime, publishers post messages to various queues. The queue (in other words, the delivery mechanisms of the underlying infrastructure) then delivers messages that match the various subscriptions to the appropriate subscribers.
Oracle includes the following infrastructure and features to support database-enabled publish-subscribe messaging:
Database events support declarative definitions for publishing database events, detection, and run-time publication of such events. This feature enables active publication of information to end-users in an event-driven manner, to complement the traditional pull-oriented approaches to accessing information.
Oracle Advanced Queuing supports a queue-based publish-subscribe paradigm. Database queues serve as a durable store for messages, along with capabilities to allow publish and subscribe based on queues. A rules-engine and subscription service dynamically route messages to recipients based on expressed interest. This allows decoupling of addressing between senders and receivers to complement the existing explicit sender-receiver message addressing.
Client notifications support asynchronous delivery of messages to interested subscribers. This enables database clients to register interest in certain queues, and it enables these clients to receive notifications when publications on such queues occur. Asynchronous delivery of messages to database clients is in contrast to the traditional polling techniques used to retrieve information.
This section describes various concepts related to publish-subscribe.
A queue is an entity that supports the notion of named subjects of interest. Queues can be characterized as:
The underlying queue infrastructure pushes the messages published to connected clients in a lightweight, at-best-once, manner.
Queues serve as durable containers for messages. Messages are delivered in a deferred and reliable mode.
Publishers and subscribers are internally represented as agents. There is a distinction between an agent and a client.
An agent is a persistent logical subscribing entity that expresses interest in a queue via a subscription. An agent has properties, such as an associated subscription, an address, and a delivery mode for messages. In this context, an agent is an electronic proxy for a publisher or subscriber.
A client is a transient physical entity. The attributes of a client include the physical process where the client programs run, the node name, and the client application logic. There could be several clients acting on behalf of a single agent. Also, the same client, if authorized, can act on behalf of multiple agents.
A rule on a queue is specified as a conditional expression using a predefined set of operators on the message format attributes or on the message header attributes. Each queue has an associated message content format that describes the structure of the messages represented by that queue. The message format may be unstructured (RAW
) or it may have a well-defined structure (ADT). This allows both subject- or content-based subscriptions.
Subscribers (agents) may specify subscriptions on a queue using a rule. Subscribers are durable and are stored in a catalog.
The database represents a significant source for publishing information. An event framework is proposed to allow declarative definition of database event publication. As these pre-defined events occur, the framework detects and publishes such events. This allows active delivery of information to end-users in an event-driven manner as part of the publish-subscribe capability.
Registration is the process of associated delivery information by a given client, acting on behalf of an agent. There is an important distinction between the subscription and registration related to the agent/client separation.
Subscription indicates an interest in a particular queue by an agent. It does not specify where and how delivery must occur. Delivery information is a physical property that is associated with a client, and it is a transient manifestation of the logical agent; i.e., the subscriber. A specific client process acting on behalf of an agent registers delivery information by associating a host and port, indicating where the delivery should be done, and a callback, indicating how there delivery should be done.
Publishers publish messages to queues by using the appropriate queuing interfaces. The interfaces may depend on which model the queue is implemented on. For example, an enqueue call represents the publishing of a message.
When a message is posted or published to a given queue, a rules engine extracts the set of candidate rules from all rules defined on that queue that match the published message.
Corresponding to the list of candidate rules on a given queue, the set of subscribers that match the candidate rules can be evaluated. In turn, the set of agents corresponding to this subscription list can be determined and notified.
The queue notifies all registered clients of the appropriate published messages. This concept is called posting. When the queue needs to notify all interested clients, it posts the message to all registered clients.
A subscriber may receive messages via any of the following mechanisms:
Scenario: This example shows how system events, client notification, and AQ work together to implement publish-subscribe.
pubsub
, with all objects necessary to support a publish-subscribe mechanism. In this particular code, the Agent snoop
subscribe to messages that are published at logon events. Note that the user pubsub
needs AQ_ADMINISTRATOR_ROLE
privileges to use AQ functionalities.
Rem ------------------------------------------------------ REM create queue table for persistent multiple consumers: Rem ------------------------------------------------------ CONNECT pubsub/pubsub; Rem Create or replace a queue table BEGIN DBMS_AQADM.CREATE_QUEUE_TABLE( Queue_table => 'Pubsub.Raw_msg_table', Multiple_consumers => TRUE, Queue_payload_type => 'RAW', Compatible => '8.1'); END; / Rem ------------------------------------------------------ Rem Create a persistent queue for publishing messages: Rem ------------------------------------------------------ Rem Create a queue for logon events begin BEGIN DBMS_AQADM.CREATE_QUEUE( Queue_name => 'Pubsub.Logon', Queue_table => 'Pubsub.Raw_msg_table', Comment => 'Q for error triggers'); END; / Rem ------------------------------------------------------ Rem Start the queue: Rem ------------------------------------------------------ BEGIN DBMS_AQADM.START_QUEUE('pubsub.logon'); END; / Rem ------------------------------------------------------ Rem define new_enqueue for convenience: Rem ------------------------------------------------------ CREATE OR REPLACE PROCEDURE New_enqueue( Queue_name IN VARCHAR2, Payload IN RAW , Correlation IN VARCHAR2 := NULL, Exception_queue IN VARCHAR2 := NULL) AS Enq_ct DBMS_AQ.Enqueue_options_t; Msg_prop DBMS_AQ.Message_properties_t; Enq_msgid RAW(16); Userdata RAW(1000); BEGIN Msg_prop.Exception_queue := Exception_queue; Msg_prop.Correlation := Correlation; Userdata := Payload; DBMS_AQ.ENQUEUE(Queue_name, Enq_ct, Msg_prop, Userdata, Enq_msgid); END; / Rem ------------------------------------------------------ Rem add subscriber with rule based on current user name, Rem using correlation_id Rem ------------------------------------------------------ DECLARE Subscriber Sys.Aq$_agent; BEGIN Subscriber := sys.aq$_agent('SNOOP', NULL, NULL); DBMS_AQADM.ADD_SUBSCRIBER( Queue_name => 'Pubsub.logon', Subscriber => subscriber, Rule => 'CORRID = ''SCOTT'' '); END; / Rem ------------------------------------------------------ Rem create a trigger on logon on database: Rem ------------------------------------------------------ Rem create trigger on after logon: CREATE OR REPLACE TRIGGER pubsub.Systrig2 AFTER LOGON ON DATABASE BEGIN New_enqueue('Pubsub.Logon', HEXTORAW('9999'), Dbms_standard.login_user); END; /
ub4 namespace = OCI_SUBSCR_NAMESPACE_AQ; /* callback function for notification of logon of user 'scott' on database: */ ub4 notifySnoop(ctx, subscrhp, pay, payl, desc, mode) dvoid *ctx; OCISubscription *subscrhp; dvoid *pay; ub4 payl; dvoid *desc; ub4 mode; { printf("Notification : User Scott Logged on\n"); } int main() { OCISession *authp = (OCISession *) 0; OCISubscription *subscrhpSnoop = (OCISubscription *)0; /***************************************************** Initialize OCI Process/Environment Initialize Server Contexts Connect to Server Set Service Context ******************************************************/ /* Registration Code Begins */ /* Each call to initSubscriptionHn allocates and Initialises a Registration Handle */ initSubscriptionHn( &subscrhpSnoop, /* subscription handle */ "ADMIN:PUBSUB.SNOOP", /* subscription name */ /* <agent_name>:<queue_name> */ (dvoid*)notifySnoop); /* callback function */ /***************************************************** The Client Process does not need a live Session for Callbacks End Session and Detach from Server ******************************************************/ OCISessionEnd ( svchp, errhp, authp, (ub4) OCI_DEFAULT); /* detach from server */ OCIServerDetach( srvhp, errhp, OCI_DEFAULT); while (1) /* wait for callback */ sleep(1); } void initSubscriptionHn (subscrhp, subscriptionName, func) OCISubscription **subscrhp; char* subscriptionName; dvoid * func; { /* allocate subscription handle: */ (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION, (size_t) 0, (dvoid **) 0); /* set subscription name in handle: */ (void) OCIAttrSet((dvoid *) *subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) subscriptionName, (ub4) strlen((char *)subscriptionName), (ub4) OCI_ATTR_SUBSCR_NAME, errhp); /* set callback function in handle: */ (void) OCIAttrSet((dvoid *) *subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) func, (ub4) 0, (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp); (void) OCIAttrSet((dvoid *) *subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) 0, (ub4) 0, (ub4) OCI_ATTR_SUBSCR_CTX, errhp); /* set namespace in handle: */ (void) OCIAttrSet((dvoid *) *subscrhp, (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) &namespace, (ub4) 0, (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp); checkerr(errhp, OCISubscriptionRegister(svchp, subscrhp, 1, errhp, OCI_DEFAULT)); }
Now, if user SCOTT
logged on to the database, the client is notified, and the call back function notifySnoop
is called.