Как работать с Java Message Service (JMS) API
Освойте основы работы с Java Message Service (JMS) API, узнайте о создании соединений и взаимодействии с сообщениями!
Java Message Service (JMS) API предоставляет простой и мощный способ работы с асинхронными сообщениями между компонентами в распределенных системах. В этой статье мы рассмотрим основы работы с JMS API и примеры использования.
Что такое JMS?
JMS — это стандартный Java API для отправки и получения сообщений между клиентами с использованием асинхронного шаблона сообщений. JMS поддерживает две модели взаимодействия:
- Point-to-Point (P2P) — сообщения отправляются от одного отправителя к одному получателю через очередь.
- Publish-Subscribe (Pub/Sub) — сообщения отправляются от одного отправителя ко многим подписчикам через тему.
Начало работы с JMS API
Для начала работы с JMS API необходимо выполнить следующие шаги:
- Создать соединение с JMS провайдером.
- Создать сессию для отправки и получения сообщений.
- Создать объекты для отправки и получения сообщений (Destination, MessageProducer и MessageConsumer).
- Отправить сообщение и/или получить сообщение.
Создание соединения
Для создания соединения с JMS провайдером используйте объект ConnectionFactory :
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = connectionFactory.createConnection();
Создание сессии
Создайте сессию, используя соединение:
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Создание объектов для отправки и получения сообщений
Создайте объекты для отправки и получения сообщений в зависимости от выбранной модели взаимодействия:
Point-to-Point (P2P)
Queue queue = session.createQueue("myQueue"); MessageProducer producer = session.createProducer(queue); MessageConsumer consumer = session.createConsumer(queue);
Publish-Subscribe (Pub/Sub)
Topic topic = session.createTopic("myTopic"); MessageProducer producer = session.createProducer(topic); MessageConsumer consumer = session.createConsumer(topic);
Отправка и получение сообщений
Отправьте сообщение с помощью объекта MessageProducer :
TextMessage message = session.createTextMessage("Hello, JMS!"); producer.send(message);
Получите сообщение с помощью объекта MessageConsumer :
Message receivedMessage = consumer.receive(1000); // timeout 1000ms if (receivedMessage instanceof TextMessage)
Заключение
Теперь вы знакомы с основами работы с Java Message Service (JMS) API. В этой статье мы рассмотрели основные понятия, создание соединения, сессии и объектов для отправки и получения сообщений, а также примеры отправки и получения сообщений. Продолжайте изучать JMS API, чтобы узнать о дополнительных возможностях, таких как фильтрация сообщений, обработка ошибок и использование транзакций. Удачи в обучении! 😉
Работа с JMS сообщениями и MDB в JEE
Работа с сообщениями подразумевает взаимодействие между компонентами системы посредством передачи сообщений. JMS позволяет реализовать это взаимодействие в java приложении, а MDB бины позволяют асинхронно обрабатывать получаемые сообщения на сервере приложений без дополнительных усилий по асинхронной обработке.
Ниже представлен простой пример обработки JMS сообщения с помощью MDB.
Немного теории
Для работы с сообщениями используется вспомогательное программное обеспечение, обычно входящее в поставку сервера приложений.
Компоненты системы могут посылать сообщения (producer) и получать их (consumer). Сообщение
отправляет producer на пункт назначения (destination), являющимся на сервере queue или topic, после чего consumer может забрать оттуда сообщение
В зависимости от того, какой тип имеет destination, разделяют две модели работы с сообщениями.
Первая модель — Point-to-Point
В случае если на сервере destination имеет тип queue, то сообщение, которое отправил producer, получает единственный consumer. Если на эту очередь сообщений подписано несколько получателей, то сообщение получит только один из них.
Вторая модель — Publish-subscribe
В случае если на сервере destination имеет тип topic, то одно сообщение может быть прочитано неограниченным количеством consumer, подписанных на этот на этот destination.
Структура JMS сообщения
Сообщение состоит из заголовка, поля свойств и тела.
Заголовок хранит мета информацию сообщения, заполняемую автоматически.
Поле свойств схоже с заголовком, но оно заполняется программно, и позже получатель сможет прочитать эту информацию.
Тело содержит полезную нагрузку сообщения. Тип нагрузки определяется при создании сообщения. Конкретные типы унаследованы от интерфейса javax.jms.Message
Создание очереди на сервере.
Для примера создадим topic на сервере. Использовать я буду glassfish 3.1.
Для начала создадим Connection Factory. Возможны несколько типов в зависимости от того, какой тип очереди сообщений будет использоваться.
Затем создаем destination с указание типа.
Создание отправителя сообщений
В данном случае producer будет находиться на сервере приложений. В случае если вам необходимо отправлять сообщения из отдельного клиента, то необходимо будет стандартным образом получить доступ к объектам по их JNDI имени из контекста.
//получаем ресурсы сервера для отправки сообщений @Resource(name="jms/TutorialPool") private ConnectionFactory connectionFactory; @Resource(name="jms/TutorialTopic") private Destination destination; public String getEnterString() < return enterString; >public void sendString(String enterString) < try < //создаем подключение Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage(); //добавим в JMS сообщение собственное свойство в поле сообщения со свойствами message.setStringProperty("clientType", "web clien"); //добавляем payload в сообщение message.setText(enterString); //отправляем сообщение producer.send(message); System.out.println("message sent"); //закрываем соединения session.close(); connection.close(); >catch (JMSException ex) < System.err.println("Sending message error"); ex.printStackTrace(); >>
Message-Driven Bean
Для обработки приходящих сообщений на сервере мы будем использовать MDB.
Сообщения можно было бы получать и обрабатывать и с помошью pojo, выступающего как consumer. Но использование MDB позволит параллельно обрабатывать сообщения, не заботясь о сложности асинхронной обработки и дополнительного кода для подписки на очередь сообщений.
Асинхронная обработка реализуется через пул объектов, из которых на обработку сообщения сервер выделят объекты при необходимости.
Для реализации MBD достаточно унаследовать бин от интерфейса javax.jms.MessageListener, реализуя метод onMessage(), и аннотировать соответствующим образом класс.
Сделаем пример MDB, который выводит в консоль сервера информацию о поступившем сообщении.
@MessageDriven( //имя topic, на который подписан бин mappedName="jms/TutorialTopic", name = "ExampleMDB") public class MDBExample implements MessageListener < //метод, вызываемый при получении нового сообщения @Override public void onMessage(Message msg) < try < TextMessage message = (TextMessage)msg; //считываем свойство из соответствующего поля, заданное вручную в consumer System.out.println("FROM MDB - client type IS " + message.getStringProperty("clientType")); //считываем само сообщение System.out.println("FROM MDB - payload IS" + message.getText()); >catch (JMSException ex) < ex.printStackTrace(); >> >
В onMessage метод добавляется необходимая бизнес логика, в зависимости от типа сообщения, его содержания и тд.
При необходимости, для ручной обработки сообщений можно самостоятельно создать обработчика.
Например так:
@Resource(name="jms/TutorialPool") private ConnectionFactory connectionFactory; @Resource(name="jms/TutorialTopic") private Destination destination; void onMessage() < try < Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(destination); connection.start(); while(true)< Message msg = consumer.receive(); //обработка сообщения >//закрыть connection > catch (JMSException ex) < ex.printStackTrace(); >>
Для более подробного изучения JMS и EJB в целом, могу рекомендовать книги:
EJB 3 in Action — Debu Panda, Reza Rahman, Derek Lane
Пару книг от Adam Bien
Знакомство с JMS 2.0
Не так давно, 12 июня 2013, миру был представлен релиз Java EE 7. Одним из ключевых моментов в этом релизе было появление JMS версии 2.0, которая не обновлялась с 2002 года.
Данный текст является вольным переводом начала статьи Найджела Дикина. Текст предназначен для ознакомления заинтересованного читателя с новым API.
API JMS 1.1 требует для работы достаточно много кода, но успело себя хорошо зарекомендовать, потому, с 2002 года не изменялось. В JMS 2.0 новое API призвано упростить отправку и прием JMS-сообщений. Прежнее API не упраздняется и продолжает работать наравне с новым.
В API JMS 2.0 появились новые интерфейсы: JMSContext, JMSProducer и JMSConsumer.
- JMSContext заменяет Connection и Session в классиеском JMS API
- JMSProducer — это легковесная замена MessageProducer-у, которая позволяет задавать настройки доставки сообщений, заголовки, свойства, через вызов цепочки методов (паттерн Строитель)
- JMSConsumer заменяет MessageConsumer и используется по такому же принципу
Отправка JMS
public void sendMessageJMS11(ConnectionFactory connectionFactory, Queue queue, String text) < try < Connection connection = connectionFactory.createConnection(); try < Session session =connection.createSession(false,Session.AUTO_ACKNOWLEDGE); MessageProducer messageProducer = session.createProducer(queue); TextMessage textMessage = session.createTextMessage(text); messageProducer.send(textMessage); >finally < connection.close(); >> catch (JMSException ex) < // handle exception (details omitted) >>
public void sendMessageJMS20(ConnectionFactory connectionFactory, Queue queue, String text) < try (JMSContext context = connectionFactory.createContext();)< context.createProducer().send(queue, text); >catch (JMSRuntimeException ex) < // handle exception (details omitted) >>
- В версии 2.0 мы используем try-with-resources из JavaSE 7
- Параметр Session.AUTO_ACKNOWLEDGE устанавливается по умолчанию в JMSContext. Если требуется установить другое значение (CLIENT_ACKNOWLEDGE или DUPS_OK_ACKNOWLEDGE), оно передается как отдельный параметр
- Используется JMSContext вместо объектов Connection и Session
- Что бы создать TextMessage достаточно просто передать в метод send строку
Отличительной особенностью нового API является то, что его методы бросают RuntimeException — JMSRuntimeException, вместо checked-исключения JMSException. Это дает возможность при желании не обрабатывать JMS-исключения.
Синхронное получение JMS
public String receiveMessageJMS11(ConnectionFactory connectionFactory,Queue queue) < String body=null; try < Connection connection = connectionFactory.createConnection(); try < Session session =connection.createSession(false,Session.AUTO_ACKNOWLEDGE); MessageConsumer messageConsumer = session.createConsumer(queue); connection.start(); TextMessage textMessage = (TextMessage)messageConsumer.receive(); body = textMessage.getText(); >finally < connection.close(); >> catch (JMSException ex) < // handle exception (details omitted) >return body; >
public String receiveMessageJMS20(ConnectionFactory connectionFactory,Queue queue) < String body=null; try (JMSContext context = connectionFactory.createContext();)< JMSConsumer consumer = context.createConsumer(queue); body = consumer.receiveBody(String.class); >catch (JMSRuntimeException ex) < // handle exception (details omitted) >return body; >
- Используется try-with-resources для автоматического закрытия соединения
- Используется JMSContext вместо объектов Connection и Session
- AUTO_ACKNOWLEDGE выставляется по умолчанию
- Обрабатывается JMSRuntimeException, который можно не обрабатывать, вместо JMSException в JMS 1.1
- connection.start() выполняется автоматически
- Строка получается автоматически через метод consumer.receiveBody(String.class), вместо получения объекта Message, приведения его к TextMessage и вызова метода getText
Асинхронное получение JMS
В JavaSE, что бы получать сообщения асинхронно, в JMS 1.1 используется следующий код:
MessageConsumer messageConsumer = session.createConsumer(queue); messageConsumer.setMessageListener(messageListener); connection.start();
В JMS 2.0 это выглядит так:
JMSConsumer consumer = context.createConsumer(queue); consumer.setMessageListener(messageListener);
Вместо MessageConsumer — JMSConsumer. Соединение стартует автоматически.
В Java EE Web или EJB приложениях, как и прежде, надо использовать message-driven bean, вместо метода setMessageListener
Вставка объекта JMSContext в Java EE приложении
В Java EE приложении JMSContext можно вставить посредством аннотации Inject. После вставки JMSContext будет находиться под управлением сервера приложений.
Следующий фрагмент кода позволяет вставлять JMSContext в session bean или сервлет.
@Inject @JMSConnectionFactory( "jms/connectionFactory") private JMSContext context; @Resource(lookup = "jms/dataQueue") private Queue dataQueue; public void sendMessageJavaEE7(String body)
Закрытие JMSContext производится автоматически сервером приложений. Если во время запроса выполняется JTA-транзакция, то JMSContext закроется автоматически после коммита, если без транзакции, то закроется в конце запроса.
На этом я решил остановиться. Для ознакомления и начала работы этой информации должно быть достаточно. Много детаей и дополнительной информации тут:
What’s New in JMS 2.0, Part One: Ease of Use
What’s New in JMS 2.0, Part Two—New Messaging Features