Using advanced queueing v42.5.4.1

New Feature

Advanced queueing is available in JDBC 42.3.2.1 and later.

EDB Postgres Advanced Server advanced queueing provides message queueing and message processing for the EDB Postgres Advanced Server database. User-defined messages are stored in a queue, and a collection of queues is stored in a queue table. You must first create a queue table before creating a queue that depends on it.

On the server side, procedures in the DBMS_AQADM package create and manage message queues and queue tables. Use the DBMS_AQ package to add messages to a queue, remove messages from a queue, or register or unregister a PL/SQL callback procedure. For more information about DBMS_AQ and DBMS_AQADM, see the EDB Postgres Advanced Server documentation.

On the client side, the application uses the EDB-JDBC driver's JMS API to enqueue and dequeue messages.

Enqueueing or dequeueing a message

For more information about using EDB Postgres Advanced Server's advanced queueing functionality, see Built-in packages.

Server-side setup

To use advanced queueing functionality in your JMS-based Java application, first create a user-defined type, a queue table, and a queue. Then start the queue on the database server. You can perform these steps using either EDB-PSQL or the EDB-JDBC JMS API in the Java application.

Using EDB-PSQL

Invoke EDB-PSQL and connect to the EDB Postgres Advanced Server host database. Use the following SPL commands at the command line.

Creating a user-defined type

To specify a RAW data type, create a user-defined type. This example creates a user-defined type named mytype.

CREATE TYPE mytype AS (code int, project TEXT);

Create the queue table

A queue table can hold multiple queues with the same payload type. This example creates a table named MSG_QUEUE_TABLE.

EXEC DBMS_AQADM.CREATE_QUEUE_TABLE
      (queue_table => 'MSG_QUEUE_TABLE',
       queue_payload_type => 'mytype',
       comment => 'Message queue table');

Create the queue

This example creates a queue named MSG_QUEUE in the table MSG_QUEUE_TABLE.

EXEC DBMS_AQADM.CREATE_QUEUE ( queue_name => 'MSG_QUEUE', queue_table => 'MSG_QUEUE_TABLE', comment => 'This queue contains pending messages.');

Start the queue

After you create the queue, invoke the following SPL code at the command line to start the queue in the EDB database.

EXEC DBMS_AQADM.START_QUEUE(queue_name => 'MSG_QUEUE');
commit;
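
The four steps above must run in a fixed order: type, queue table, queue, then start. As a minimal sketch, the following Java class only builds the text of the same DBMS_AQADM calls shown above for a given set of names, so that the order is kept in one place. It doesn't connect to a database. The class name AqSetupScript and its statements method are hypothetical helpers introduced for illustration and aren't part of EDB-PSQL or the EDB-JDBC driver.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of EDB-JDBC): builds the setup statements as text.
class AqSetupScript {

    // Names are inserted verbatim, so pass only trusted identifiers.
    static List<String> statements(String typeName, String typeAttributes,
                                   String queueTable, String queueName) {
        return Arrays.asList(
            // 1. The payload type must exist before the queue table refers to it.
            "CREATE TYPE " + typeName + " AS (" + typeAttributes + ");",
            // 2. The queue table must exist before any queue is created in it.
            "EXEC DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => '" + queueTable
                + "', queue_payload_type => '" + typeName
                + "', comment => 'Message queue table');",
            // 3. The queue is created inside the queue table.
            "EXEC DBMS_AQADM.CREATE_QUEUE(queue_name => '" + queueName
                + "', queue_table => '" + queueTable
                + "', comment => 'This queue contains pending messages.');",
            // 4. The queue is started last.
            "EXEC DBMS_AQADM.START_QUEUE(queue_name => '" + queueName + "');"
        );
    }

    public static void main(String[] args) {
        // Print the statements for the names used in this section.
        for (String statement : statements("mytype", "code int, project TEXT",
                                           "MSG_QUEUE_TABLE", "MSG_QUEUE")) {
            System.out.println(statement);
        }
    }
}
```

You could paste the printed statements into an EDB-PSQL session, or adapt the helper for other type, table, and queue names.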

Using EDB-JDBC JMS API

The following JMS API calls perform the same steps as the EDB-PSQL commands:

  • Connect to the EDB Postgres Advanced Server database
  • Create the user-defined type
  • Create the queue table and queue
  • Start the queue

edbJmsFact = new EDBJmsConnectionFactory("localhost", 5444, "edb", "edb", "edb");

conn = (EDBJmsQueueConnection) edbJmsFact.createQueueConnection();
                        
session = (EDBJmsQueueSession) conn.createQueueSession(true, Session.CLIENT_ACKNOWLEDGE);            
            
String sql = "CREATE TYPE mytype AS (code int, project TEXT);";
UDTType udtType = new UDTType(conn.getConn(), sql, "mytype");
Operation operation = new UDTTypeOperation(udtType);
operation.execute();

queueTable = session.createQueueTable(conn.getConn(), "MSG_QUEUE_TABLE", "mytype", "Message queue table");
            
Queue queue1 = new Queue(conn.getConn(), "MSG_QUEUE", "MSG_QUEUE_TABLE", "Message Queue");
operation = new QueueOperation(queue1);
operation.execute();

queue = (EDBJmsQueue) session.createQueue("MSG_QUEUE");
queue.setEdbQueueTbl(queueTable);
            
queue.start();

Client-side example

After you create the user-defined type, queue table, and queue and start the queue, you can enqueue or dequeue a message using the EDB-JDBC driver's JMS API.

Create a Java project and add the edb-jdbc18.jar from the edb-jdbc installation directory to its libraries.

Create a Java Bean corresponding to the type you created.

package mypackage;

import java.util.ArrayList;
import com.edb.aq.UDTType;

public class MyType extends UDTType {

    private int code;
    private String project;

    public MyType() {}

    /**
     * @return the code
     */
    public int getCode() {
        return code;
    }

    /**
     * @param code the code to set
     */
    public void setCode(int code) {
        this.code = code;
    }

    /**
     * @return the project
     */
    public String getProject() {
        return project;
    }

    /**
     * @param project the project to set
     */
    public void setProject(String project) {
        this.project = project;
    }

    /**
     * Override this method and call the getter methods in the same order
     * as the attributes in the CREATE TYPE statement:
     * CREATE TYPE mytype AS (code int, project TEXT);
     * @return the attribute values, in declaration order
     */
    @Override
    public Object[] getParamValues() {
        ArrayList<Object> params = new ArrayList<Object>();
        params.add(getCode());
        params.add(getProject());
        return params.toArray();
    }
}
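
You can check the ordering rule in getParamValues without a database. The following sketch is a simplified illustration that compiles with only the JDK. It uses a hypothetical stand-in base class, PayloadBean, in place of com.edb.aq.UDTType. The names PayloadBean, MyTypeSketch, and ParamOrderDemo are assumptions made for this example and aren't part of the EDB-JDBC driver.

```java
import java.util.Arrays;

// Hypothetical stand-in for com.edb.aq.UDTType so the sketch runs without the driver.
abstract class PayloadBean {
    abstract Object[] getParamValues();
}

// Mirrors: CREATE TYPE mytype AS (code int, project TEXT);
class MyTypeSketch extends PayloadBean {
    private int code;
    private String project;

    int getCode() { return code; }
    void setCode(int code) { this.code = code; }
    String getProject() { return project; }
    void setProject(String project) { this.project = project; }

    // Values are returned in the attribute order of the CREATE TYPE statement:
    // code first, then project.
    @Override
    Object[] getParamValues() {
        return new Object[] { getCode(), getProject() };
    }
}

class ParamOrderDemo {
    public static void main(String[] args) {
        MyTypeSketch payload = new MyTypeSketch();
        // The setters can be called in any order; only getParamValues fixes the order.
        payload.setProject("Test Omega");
        payload.setCode(321);
        System.out.println(Arrays.toString(payload.getParamValues())); // prints [321, Test Omega]
    }
}
```

The point of the sketch is that the order of the returned array depends only on the order of the getter calls inside getParamValues, not on the order in which the setters were called.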

Enqueue and dequeue a message

Before you enqueue or dequeue a message:

  1. Create a JMS connection factory and create a queue connection.
  2. Create a queue session.
  3. Create a queue object that refers to the queue.

edbJmsFact = new EDBJmsConnectionFactory("localhost", 5444, "edb", "edb", "edb");

conn = (EDBJmsQueueConnection) edbJmsFact.createQueueConnection();

session = (EDBJmsQueueSession) conn.createQueueSession(true, Session.CLIENT_ACKNOWLEDGE);

queue = new EDBJmsQueue("MSG_QUEUE");

Enqueue a message

To enqueue a message:

  1. Create an EDBJmsMessageProducer from the session.
  2. Create the message to enqueue.
  3. Call the EDBJmsMessageProducer.send method.

messageProducer = (EDBJmsMessageProducer) session.createProducer(queue);

MyType udtType1 = new MyType();
udtType1.setProject("Test Omega");
udtType1.setCode(321);

udtType1.setName("mytype");

messageProducer.send(udtType1);

Dequeue a message

To dequeue a message:

  1. Create an EDBJmsMessageConsumer from the session.
  2. Call the EDBJmsMessageConsumer.receive method.

messageConsumer = (EDBJmsMessageConsumer) session.createConsumer(queue);
	
queue.setDequeue_mode(DequeueMode.BROWSE);
queue.setTypeName("mytype");
				
Message message = messageConsumer.receive();
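
The dequeue code above sets DequeueMode.BROWSE. In the DBMS_AQ package, a browse-mode dequeue reads a message without removing it from the queue, while the default remove mode deletes the message after reading it. The following in-memory sketch illustrates only that distinction. It's a conceptual model, not the driver's implementation. The class InMemoryQueueSketch and its Mode enum are hypothetical names introduced for this example.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Conceptual model only: an in-memory queue with two dequeue modes.
class InMemoryQueueSketch {

    // Hypothetical enum for this sketch; not the driver's DequeueMode.
    enum Mode { BROWSE, REMOVE }

    private final Deque<String> messages = new ArrayDeque<>();

    void enqueue(String message) {
        messages.addLast(message);
    }

    // BROWSE returns the head message and leaves it in place.
    // REMOVE returns the head message and deletes it.
    // Both return null when the queue is empty.
    String dequeue(Mode mode) {
        return mode == Mode.BROWSE ? messages.peekFirst() : messages.pollFirst();
    }

    int size() {
        return messages.size();
    }

    public static void main(String[] args) {
        InMemoryQueueSketch queue = new InMemoryQueueSketch();
        queue.enqueue("Test Omega");

        System.out.println(queue.dequeue(Mode.BROWSE) + ", remaining: " + queue.size());
        System.out.println(queue.dequeue(Mode.REMOVE) + ", remaining: " + queue.size());
    }
}
```

In this model, the browse call leaves one message in the queue and the remove call leaves none.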

A complete enqueue and dequeue program

This example enqueues and dequeues a message. It assumes that the user-defined type, queue table, and queue were already created using EDB-PSQL and that the queue is started.

package mypackage;

import com.edb.aq.DequeueMode;
import com.edb.jms.client.EDBJmsQueueConnection;
import com.edb.jms.client.EDBJmsConnectionFactory;
import com.edb.jms.client.EDBJmsMessageConsumer;
import com.edb.jms.client.EDBJmsMessageProducer;
import com.edb.jms.client.EDBJmsQueue;
import com.edb.jms.client.EDBJmsQueueSession;
import com.edb.jms.client.EDBQueueTable;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

public class JMSClient {

    public static void main(String args[]) throws JMSException {

        EDBJmsConnectionFactory edbJmsFact = null;
        EDBJmsQueueConnection conn = null;
        EDBJmsQueueSession session = null;
        EDBQueueTable queueTable = null;
        EDBJmsQueue queue = null;
        EDBJmsMessageProducer messageProducer = null;
        EDBJmsMessageConsumer messageConsumer = null;

        try {

            edbJmsFact = new EDBJmsConnectionFactory("localhost", 5444, "edb", "edb", "edb");

            conn = (EDBJmsQueueConnection) edbJmsFact.createQueueConnection();

            session = (EDBJmsQueueSession) conn.createQueueSession(true, Session.CLIENT_ACKNOWLEDGE);

            queue = (EDBJmsQueue) session.createQueue("MSG_QUEUE");
           
            messageProducer = (EDBJmsMessageProducer) session.createProducer(queue);

            MyType udtType1 = new MyType();
            udtType1.setProject("Test Omega");
            udtType1.setCode(321);

            udtType1.setName("mytype");

            messageProducer.send(udtType1);

            messageConsumer = (EDBJmsMessageConsumer) session.createConsumer(queue);
            
            queue.setDequeue_mode(DequeueMode.BROWSE);
            queue.setTypeName("mytype");
                        
            Message message = messageConsumer.receive();
            System.out.println("Received: " + message);
            
            message = messageConsumer.receive();
            
            System.out.println("Received: " + message);
        } catch (JMSException jmsEx) {
            System.out.println(jmsEx.getMessage());
        } finally {
            if(conn != null) {
                conn.close();
            }            
        }
    }
}

This example creates the user-defined type, queue table, and queue, starts the queue, and then enqueues and dequeues a message.

package mypackage;

import com.edb.aq.DequeueMode;
import com.edb.aq.UDTType;
import com.edb.aq.operations.*;
import com.edb.jms.client.EDBJmsQueueConnection;
import com.edb.jms.client.EDBJmsConnectionFactory;
import com.edb.jms.client.EDBJmsMessageConsumer;
import com.edb.jms.client.EDBJmsMessageProducer;
import com.edb.jms.client.EDBJmsQueue;
import com.edb.jms.client.EDBJmsQueueSession;
import com.edb.jms.client.EDBQueueTable;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

public class JMSClient {

    public static void main(String args[]) throws JMSException {

        EDBJmsConnectionFactory edbJmsFact = null;
        EDBJmsQueueConnection conn = null;
        EDBJmsQueueSession session = null;
        EDBQueueTable queueTable = null;
        EDBJmsQueue queue = null;
        EDBJmsMessageProducer messageProducer = null;
        EDBJmsMessageConsumer messageConsumer = null;

        try {

            edbJmsFact = new EDBJmsConnectionFactory("localhost", 5444, "edb", "edb", "edb");

            conn = (EDBJmsQueueConnection) edbJmsFact.createQueueConnection();

            session = (EDBJmsQueueSession) conn.createQueueSession(true, Session.CLIENT_ACKNOWLEDGE);

            String sql = "CREATE TYPE mytype AS (code int, project TEXT);";
            UDTType udtType = new UDTType(conn.getConn(), sql, "mytype");
            Operation operation = new UDTTypeOperation(udtType);
            operation.execute();

            queueTable = session.createQueueTable(conn.getConn(), "MSG_QUEUE_TABLE", "mytype", "Message queue table");
            Queue queue1 = new Queue(conn.getConn(), "MSG_QUEUE", "MSG_QUEUE_TABLE", "Message Queue");
            operation = new QueueOperation(queue1);
            operation.execute();
            queue = (EDBJmsQueue) session.createQueue("MSG_QUEUE");
            queue.setEdbQueueTbl(queueTable);
            queue.start();

            messageProducer = (EDBJmsMessageProducer) session.createProducer(queue);

            MyType udtType1 = new MyType();
            udtType1.setProject("Test Omega");
            udtType1.setCode(321);

            udtType1.setName("mytype");

            messageProducer.send(udtType1);

            messageConsumer = (EDBJmsMessageConsumer) session.createConsumer(queue);
            
            queue.setDequeue_mode(DequeueMode.BROWSE);
            queue.setTypeName("mytype");
                        
            Message message = messageConsumer.receive();
            System.out.println("Received: " + message);
            
            message = messageConsumer.receive();
            
            System.out.println("Received: " + message);
        } catch (JMSException jmsEx) {
            System.out.println(jmsEx.getMessage());
        } finally {
            if(conn != null) {
                conn.close();
            }            
        }
    }
}