博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RocketMQ Spring 集成
阅读量:6678 次
发布时间:2019-06-25

本文共 6556 字,大约阅读时间需要 21 分钟。

概述

本文介绍如何在 Spring 框架下用消息队列 RocketMQ 收发消息。主要包括以下两部分内容:

  • 普通消息生产者和 Spring 集成
  • 消息消费者和 Spring 集成

测试流程

资源创建

1、创建实例、Topic及Group;

2、注意:如果程序在本地测试运行,请选择在公网区域创建。

代码测试

1、pom.xml

4.0.0
MavenSpringDemoMQ
MavenSpringDemoMQ
1.0-SNAPSHOT
5.0.8.RELEASE
org.springframework
spring-beans
${org.springframework.version}
org.springframework
spring-core
${org.springframework.version}
org.springframework
spring-context
${org.springframework.version}
org.springframework
spring-expression
${org.springframework.version}
org.springframework
spring-context-support
${org.springframework.version}
org.springframework
spring-context-indexer
${org.springframework.version}
org.springframework
spring-aop
${org.springframework.version}
org.springframework
spring-aspects
${org.springframework.version}
org.springframework
spring-instrument
${org.springframework.version}
com.aliyun.openservices
ons-client
1.7.9.Final

2、发送端配置文件producer.xml

********
********
http://MQ_INST_********_BaQUuiNE.mq-internet-access.mq-internet.aliyuncs.com:80

3、发送端代码

import com.aliyun.openservices.ons.api.Message;import com.aliyun.openservices.ons.api.Producer;import com.aliyun.openservices.ons.api.SendResult;import com.aliyun.openservices.ons.api.exception.ONSClientException;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class ProduceWithSpring {    public static void main(String[] args) {        /**         * 生产者 Bean 配置在 producer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(比如具体的 Controller)中         */        ApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");        Producer producer = (Producer) context.getBean("producer");        //循环发送消息        for (int i = 0; i < 100; i++) {            Message msg = new Message( //                    // Message 所属的 Topic                    "yutopic",                    // Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在消息队列 RocketMQ 的服务器过滤                    "TagSpring",                    // Message Body 可以是任何二进制形式的数据, 消息队列 RocketMQ 不做任何干预                    // 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式                    "Hello MQ".getBytes());            // 设置代表消息的业务关键属性,请尽可能全局唯一            // 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发            // 注意:不设置也不会影响消息正常收发            msg.setKey("ORDERID_100");            // 发送消息,只要不抛异常就是成功            try {                SendResult sendResult = producer.send(msg);                assert sendResult != null;                System.out.println("send success: " + sendResult.getMessageId());            }catch (ONSClientException e) {                System.out.println("发送失败");            }        }        //关系producer        producer.shutdown();    }}```  4、消费端配置文件consumer.xml

<?xml version="1.0" encoding="UTF-8"?>

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"   xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
********
********
http://MQ_INST_********_BaQUuiNE.mq-internet-access.mq-internet.aliyuncs.com:80
GID_Spring

5、DemoMessageListener

import com.aliyun.openservices.ons.api.Action;

import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
public class DemoMessageListener implements MessageListener {

public Action consume(Message message, ConsumeContext context) {    System.out.println("Receive: " + message.getMsgID());    try {        //do something..        return Action.CommitMessage;    }catch (Exception e) {        //消费失败        return Action.ReconsumeLater;    }}

}`

6、消费端启动程序

import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class ConsumeWithSpring {    public static void main(String[] args) {        /**         * 消费者 Bean 配置在 consumer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(比如具体的 Controller)中         */        ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");        System.out.println("Consumer Started");    }}
测试效果
  • 发送端
    _
  • 消费端
    _

更多参考

转载地址:http://cggxo.baihongyu.com/

你可能感兴趣的文章
学习Sass之安装Sass(一)
查看>>
11111 - Generalized Matrioshkas
查看>>
MongoDB基础之 用户和数据库基于角色的访问控制
查看>>
DOS运用3_SVN文件清理
查看>>
MyApp
查看>>
SqlServer字段说明查询
查看>>
键盘监听,高度获取
查看>>
android ANR
查看>>
Shell - 简明Shell入门02 - 变量(Variable)
查看>>
Shell - 简明Shell入门06 - 循环语句(Loop)
查看>>
MySQL C#连接ySQL保存当前时间,时分秒都是0,只有日期
查看>>
Aras Innovator DB备份与还原
查看>>
Java设计模式-单例模式
查看>>
git合并多个commit
查看>>
[SCOI2007]修车
查看>>
对数学学习的几点看法
查看>>
陶哲轩实分析 引理 7.1.13 证明
查看>>
纯数学教程 Page 203 例XLI (3)
查看>>
数据库回滚(rollback)和撤销(undo)的区别
查看>>
Treap树
查看>>