文章目錄
  1. 1. 一、RabbitMQ与Spring的配置
    1. 1.1. 1.1 RabbitMQ依赖的Jar包
    2. 1.2. 1.2 RaabbitMQ 配置文件
  2. 2. 二、实际使用
    1. 2.1. 2.1 消息实体转为Json
    2. 2.2. 2.2 JMSTemplate的使用
      1. 2.2.1. 2.2.1 关于失控的JMS代码
      2. 2.2.2. 2.2.2 关于AMQPTemplate的使用
        1. 2.2.2.1. 2.2.2.1 装配JMS模版
        2. 2.2.2.2. 2.2.2.2 向RabbitMQService层注入JMS模版
    3. 2.3. 2.3 RabbitMQService接口
    4. 2.4. 2.4 使用Spring容器注入RabbitMQService实现

摘要:一个平台的前台系统与后台多个系统间需要消息交互与处理,特别是异构系统,RabbitMQ能够很轻松的解决问题。消息服务擅长于解决多系统、异构系统间的数据交换(消息通知/通讯)问题,你也可以把它用于系统间服务的相互调用(RPC)。

一、RabbitMQ与Spring的配置

 项目说明:目前不需要接收从后台过来的信息,所以本地不配置RabbitMQ Server与 Erlang(虽然确实是存在环境,但未使用),以后肯定也是要上的。

1.1 RabbitMQ依赖的Jar包

1
2
3
4
5
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.5.2.RELEASE</version>
</dependency>

1.2 RaabbitMQ 配置文件

详见本博客(文档)

实际操作Spring+RabbitMQ遇到的问题以及解决方法

二、实际使用

2.1 消息实体转为Json

 我们使用Json来传输消息,必须要在spring-rabbitmq.xml 配置文件中配置如下:

1
2
<!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现 -->
<bean id="jsonMessageConverter" class="com.nist.targetsystem.rabbitmq.FastJsonMessageConverter"></bean>

 这样配置完成之后,经由RabbitMQ传送的实体类会先将其转为Json字节码的形式进行传送,这样前后台系统可以较为轻松的解析这些消息。

 这里我自己使用Fastjson封装了一个Json的解析类,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package com.nist.targetsystem.rabbitmq;

import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;

import java.io.UnsupportedEncodingException;

/**
* Created by huangguoxin on 16/4/5.
*/

public class FastJsonMessageConverter extends AbstractMessageConverter{
private static Logger log = LoggerFactory.getLogger(FastJsonMessageConverter.class);
public static final String DEFAULT_CHARSET = "UTF-8";
private volatile String defaultCharset = DEFAULT_CHARSET;
public FastJsonMessageConverter() {

super();

//init();

}
public void setDefaultCharset(String defaultCharset) {

this.defaultCharset = (defaultCharset != null) ? defaultCharset

: DEFAULT_CHARSET;

}
public Object fromMessage(Message message)

throws MessageConversionException {


return null;

}
public <T> T fromMessage(Message message,T t) {
String json = "";
try {
json = new String(message.getBody(),defaultCharset);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return (T) JSON.parseObject(json, t.getClass());
}
protected Message createMessage(Object objectToConvert,
MessageProperties messageProperties)

throws MessageConversionException {

byte[] bytes = null;
try {
String jsonString = JSON.toJSONString(objectToConvert);
bytes = jsonString.getBytes(this.defaultCharset);
} catch (UnsupportedEncodingException e) {
throw new MessageConversionException(
"Failed to convert Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messageProperties.setContentEncoding(this.defaultCharset);
if (bytes != null) {
messageProperties.setContentLength(bytes.length);
}
return new Message(bytes, messageProperties);
}
}

2.2 JMSTemplate的使用

2.2.1 关于失控的JMS代码

 以下的代码是不使用JMS模版而编写的消息队列发送代码,这是不可取的,如同传统的JDBC代码在处理链接,语句,结果集和异常时的冗长和繁杂。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import javax.jms.*;  
import javax.naming.*;

public class TestJavaxJmsSendMain{

public void sendMessage() {
try{
//这是自定义的所需传输的对方,必须进行序列化
MessageObject mo = new MessageObject();
mo.setKey("testKey");
mo.setTitle("标题");
mo.setContent("内容");
mo.setDescription("描述测试");
System.out.println("Send Start");
InitialContext initCtx = new InitialContext();
Context envContext = (Context)initCtx.lookup("java:comp/env");
ConnectionFactory connectionFactory = (ConnectionFactory)envContext.lookup("jms/ConnectionFactory");
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, 1);
MessageProducer producer = session.createProducer((Destination)envContext.lookup("jms/queue/MyQueue"));
// MessageProducer producer = session.createProducer((Destination)envContext.lookup("jms/queue/MyQueue"));

//ObjectMessage可以用来传输对象
//ObjectMessage的属性可以用来设置查询的条件
//ObjectMessage.setObject(Object)用来设置需要传输的对象
ObjectMessage testMessage = session.createObjectMessage();
testMessage.setJMSType("ObjectSendTest");
testMessage.setStringProperty("testKey", "testValue");
testMessage.setStringProperty("JMSXGroupID", "QS_10/3/11");
testMessage.setObject(mo);
//发送的两种方式
// producer.send(message);
// producer.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
producer.send(testMessage, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, 10000);
System.out.println("Send End");
} catch(NamingException e){
System.out.println("NamingException");
e.printStackTrace();
}catch(JMSException e){
System.out.println("JMSException");
e.printStackTrace();
}
}
}

2.2.2 关于AMQPTemplate的使用

2.2.2.1 装配JMS模版

1
2
<!-- spring template声明-->
<rabbit:template exchange="message" id="rabbitTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter"/>

2.2.2.2 向RabbitMQService层注入JMS模版

 使用JMS模版,我们每次使用JMS就不需要做重复工作。
 以下的实现类实现了设置消息和发送功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* RabbitMQService实现类
* Created by huangguoxin on 16/4/5.
*/

@Service("rabbitMQService")
public class RabbitMQServiceImpl implements RabbitMQService {

private static Logger logger = LoggerFactory.getLogger(RabbitMQServiceImpl.class);

private RabbitMQMessage message;

@Resource
private RabbitTemplate rabbitTemplate;

public void send(String queueName) {
rabbitTemplate.convertAndSend(queueName,message);
}

public void setRabbitMQMessageType(CMDTYPE cmdtype, OPTYPE optype, Object content, String username) {
message = new RabbitMQMessage(cmdtype,optype,content,username);
logger.info("RabbitMQMessage Info{}",message.toString());
}
}

1
amqpTemplate.convertAndSend(queueKey, object);

说明:

convertAndSend:将Java对象转换为消息发送到匹配Key的交换机中Exchange,由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。原文:Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.

2.3 RabbitMQService接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

public interface RabbitMQService {
/**
* 发送消息到RabbitMQ Server
* 发送之前需要定义RabbitMQMessage的类型
* @param queueName 发送队列名字
*/

void send(String queueName);

/**
* 根据不同的参数类型设置RabbitMQMessage类型
* @param cmdtype
* @param optype
* @param content
* @param username
* @return
*/

void setRabbitMQMessageType(CMDTYPE cmdtype, OPTYPE optype, Object content,String username);
}

2.4 使用Spring容器注入RabbitMQService实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Controller
@RequestMapping("/Login")
public class UserController {

private static Logger logger = LoggerFactory.getLogger(UserController.class);

//当前控制器使用的队列名称
private static final String QUEUE_NAME = "task_queue";

@Autowired
@Qualifier("userService")
private UserService userService; //Service接口

@Autowired
@Qualifier("rabbitMQService")
private RabbitMQService rabbitMQService; //RabbitMQService接口

/**
* 跳转到登陆页面
* @return 跳转到AdminLogin文件夹下的login.jsp
*/

@RequestMapping(value = "/index")
public String loginIndex() {
return "AdminLogin/login";
}
/**
* 登陆功能
* @param user 接受前端页面传来的参数,自动匹配User类
* @param model 返回的Model,使用RedirectAttribute,可使重定向后的Url不带参数
* @return
*/

@RequestMapping(value = "/signUp",method = RequestMethod.POST)
public String signUp(@ModelAttribute User user,RedirectAttributes model){

try{
if(userService.addNewUser(user)){//user已增加加密后的密码,盐和UUID
user.setSalt("");//在发送过程中,将盐值置为null
rabbitMQService.setRabbitMQMessageType(CMDTYPE.CREATE, OPTYPE.USER,user,user.getUserName());
rabbitMQService.send(QUEUE_NAME);
model.addFlashAttribute("error","注册成功,请重新登陆!");
return "redirect:/Login/index";
}else{
model.addFlashAttribute("error","注册失败!");
return "redirect:/Login/index";
}
}catch (IllegalArgumentException e){
model.addFlashAttribute("error","注册失败!");
return "redirect:/Login/index";
}catch (Exception e){
model.addFlashAttribute("error","注册失败!");
return "redirect:/Login/index";
}

}

}
文章目錄
  1. 1. 一、RabbitMQ与Spring的配置
    1. 1.1. 1.1 RabbitMQ依赖的Jar包
    2. 1.2. 1.2 RaabbitMQ 配置文件
  2. 2. 二、实际使用
    1. 2.1. 2.1 消息实体转为Json
    2. 2.2. 2.2 JMSTemplate的使用
      1. 2.2.1. 2.2.1 关于失控的JMS代码
      2. 2.2.2. 2.2.2 关于AMQPTemplate的使用
        1. 2.2.2.1. 2.2.2.1 装配JMS模版
        2. 2.2.2.2. 2.2.2.2 向RabbitMQService层注入JMS模版
    3. 2.3. 2.3 RabbitMQService接口
    4. 2.4. 2.4 使用Spring容器注入RabbitMQService实现