本文共 6556 字,大约阅读时间需要 21 分钟。
本文介绍如何在 Spring 框架下用消息队列 RocketMQ 收发消息。主要包括以下两部分内容:
1、创建实例、Topic及Group;
2、注意:如果程序在本地测试运行,请选择在公网区域创建。1、pom.xml
4.0.0 MavenSpringDemoMQ MavenSpringDemoMQ 1.0-SNAPSHOT 5.0.8.RELEASE org.springframework spring-beans ${org.springframework.version} org.springframework spring-core ${org.springframework.version} org.springframework spring-context ${org.springframework.version} org.springframework spring-expression ${org.springframework.version} org.springframework spring-context-support ${org.springframework.version} org.springframework spring-context-indexer ${org.springframework.version} org.springframework spring-aop ${org.springframework.version} org.springframework spring-aspects ${org.springframework.version} org.springframework spring-instrument ${org.springframework.version} com.aliyun.openservices ons-client 1.7.9.Final
2、发送端配置文件producer.xml
******** ******** http://MQ_INST_********_BaQUuiNE.mq-internet-access.mq-internet.aliyuncs.com:80
3、发送端代码
import com.aliyun.openservices.ons.api.Message;import com.aliyun.openservices.ons.api.Producer;import com.aliyun.openservices.ons.api.SendResult;import com.aliyun.openservices.ons.api.exception.ONSClientException;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class ProduceWithSpring { public static void main(String[] args) { /** * 生产者 Bean 配置在 producer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(比如具体的 Controller)中 */ ApplicationContext context = new ClassPathXmlApplicationContext("producer.xml"); Producer producer = (Producer) context.getBean("producer"); //循环发送消息 for (int i = 0; i < 100; i++) { Message msg = new Message( // // Message 所属的 Topic "yutopic", // Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在消息队列 RocketMQ 的服务器过滤 "TagSpring", // Message Body 可以是任何二进制形式的数据, 消息队列 RocketMQ 不做任何干预 // 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式 "Hello MQ".getBytes()); // 设置代表消息的业务关键属性,请尽可能全局唯一 // 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发 // 注意:不设置也不会影响消息正常收发 msg.setKey("ORDERID_100"); // 发送消息,只要不抛异常就是成功 try { SendResult sendResult = producer.send(msg); assert sendResult != null; System.out.println("send success: " + sendResult.getMessageId()); }catch (ONSClientException e) { System.out.println("发送失败"); } } //关系producer producer.shutdown(); }}``` 4、消费端配置文件consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">******** ******** http://MQ_INST_********_BaQUuiNE.mq-internet-access.mq-internet.aliyuncs.com:80 GID_Spring
5、DemoMessageListener
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;import com.aliyun.openservices.ons.api.Message;import com.aliyun.openservices.ons.api.MessageListener;public class DemoMessageListener implements MessageListener {public Action consume(Message message, ConsumeContext context) { System.out.println("Receive: " + message.getMsgID()); try { //do something.. return Action.CommitMessage; }catch (Exception e) { //消费失败 return Action.ReconsumeLater; }}
}`
6、消费端启动程序
import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class ConsumeWithSpring { public static void main(String[] args) { /** * 消费者 Bean 配置在 consumer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(比如具体的 Controller)中 */ ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml"); System.out.println("Consumer Started"); }}
转载地址:http://cggxo.baihongyu.com/