Spring与RabbitMQ的结合使用

摘要:一个平台的前台系统与后台多个系统间需要消息交互与处理,特别是异构系统,RabbitMQ能够很轻松的解决问题。消息服务擅长于解决多系统、异构系统间的数据交换(消息通知/通讯)问题,你也可以把它用于系统间服务的相互调用(RPC)。

一、RabbitMQ与Spring的配置

 项目说明:目前不需要接收从后台过来的信息,所以本地不配置RabbitMQ Server与 Erlang(虽然确实是存在环境,但未使用),以后肯定也是要上的。

1.1 RabbitMQ依赖的Jar包

1
2
3
4
5
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.5.2.RELEASE</version>
</dependency>

1.2 RaabbitMQ 配置文件

详见本博客(文档)

实际操作Spring+RabbitMQ遇到的问题以及解决方法

二、实际使用

2.1 消息实体转为Json

 我们使用Json来传输消息,必须要在spring-rabbitmq.xml 配置文件中配置如下:

1
2
<!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现 -->
<bean id="jsonMessageConverter" class="com.nist.targetsystem.rabbitmq.FastJsonMessageConverter"></bean>

 这样配置完成之后,经由RabbitMQ传送的实体类会先将其转为Json字节码的形式进行传送,这样前后台系统可以较为轻松的解析这些消息。

 这里我自己使用Fastjson封装了一个Json的解析类,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package com.nist.targetsystem.rabbitmq;

import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;

import java.io.UnsupportedEncodingException;

/**
* Created by huangguoxin on 16/4/5.
*/

public class FastJsonMessageConverter extends AbstractMessageConverter{
private static Logger log = LoggerFactory.getLogger(FastJsonMessageConverter.class);
public static final String DEFAULT_CHARSET = "UTF-8";
private volatile String defaultCharset = DEFAULT_CHARSET;
public FastJsonMessageConverter() {

super();

//init();

}
public void setDefaultCharset(String defaultCharset) {

this.defaultCharset = (defaultCharset != null) ? defaultCharset

: DEFAULT_CHARSET;

}
public Object fromMessage(Message message)

throws MessageConversionException {


return null;

}
public <T> T fromMessage(Message message,T t) {
String json = "";
try {
json = new String(message.getBody(),defaultCharset);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return (T) JSON.parseObject(json, t.getClass());
}
protected Message createMessage(Object objectToConvert,
MessageProperties messageProperties)

throws MessageConversionException {

byte[] bytes = null;
try {
String jsonString = JSON.toJSONString(objectToConvert);
bytes = jsonString.getBytes(this.defaultCharset);
} catch (UnsupportedEncodingException e) {
throw new MessageConversionException(
"Failed to convert Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messageProperties.setContentEncoding(this.defaultCharset);
if (bytes != null) {
messageProperties.setContentLength(bytes.length);
}
return new Message(bytes, messageProperties);
}
}

2.2 JMSTemplate的使用

2.2.1 关于失控的JMS代码

 以下的代码是不使用JMS模版而编写的消息队列发送代码,这是不可取的,如同传统的JDBC代码在处理链接,语句,结果集和异常时的冗长和繁杂。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import javax.jms.*;  
import javax.naming.*;

public class TestJavaxJmsSendMain{

public void sendMessage() {
try{
//这是自定义的所需传输的对方,必须进行序列化
MessageObject mo = new MessageObject();
mo.setKey("testKey");
mo.setTitle("标题");
mo.setContent("内容");
mo.setDescription("描述测试");
System.out.println("Send Start");
InitialContext initCtx = new InitialContext();
Context envContext = (Context)initCtx.lookup("java:comp/env");
ConnectionFactory connectionFactory = (ConnectionFactory)envContext.lookup("jms/ConnectionFactory");
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, 1);
MessageProducer producer = session.createProducer((Destination)envContext.lookup("jms/queue/MyQueue"));
// MessageProducer producer = session.createProducer((Destination)envContext.lookup("jms/queue/MyQueue"));

//ObjectMessage可以用来传输对象
//ObjectMessage的属性可以用来设置查询的条件
//ObjectMessage.setObject(Object)用来设置需要传输的对象
ObjectMessage testMessage = session.createObjectMessage();
testMessage.setJMSType("ObjectSendTest");
testMessage.setStringProperty("testKey", "testValue");
testMessage.setStringProperty("JMSXGroupID", "QS_10/3/11");
testMessage.setObject(mo);
//发送的两种方式
// producer.send(message);
// producer.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
producer.send(testMessage, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, 10000);
System.out.println("Send End");
} catch(NamingException e){
System.out.println("NamingException");
e.printStackTrace();
}catch(JMSException e){
System.out.println("JMSException");
e.printStackTrace();
}
}
}

2.2.2 关于AMQPTemplate的使用

2.2.2.1 装配JMS模版

1
2
<!-- spring template声明-->
<rabbit:template exchange="message" id="rabbitTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter"/>

2.2.2.2 向RabbitMQService层注入JMS模版

 使用JMS模版,我们每次使用JMS就不需要做重复工作。
 以下的实现类实现了设置消息和发送功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* RabbitMQService实现类
* Created by huangguoxin on 16/4/5.
*/

@Service("rabbitMQService")
public class RabbitMQServiceImpl implements RabbitMQService {

private static Logger logger = LoggerFactory.getLogger(RabbitMQServiceImpl.class);

private RabbitMQMessage message;

@Resource
private RabbitTemplate rabbitTemplate;

public void send(String queueName) {
rabbitTemplate.convertAndSend(queueName,message);
}

public void setRabbitMQMessageType(CMDTYPE cmdtype, OPTYPE optype, Object content, String username) {
message = new RabbitMQMessage(cmdtype,optype,content,username);
logger.info("RabbitMQMessage Info{}",message.toString());
}
}

1
amqpTemplate.convertAndSend(queueKey, object);

说明:

convertAndSend:将Java对象转换为消息发送到匹配Key的交换机中Exchange,由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。原文:Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.

2.3 RabbitMQService接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

public interface RabbitMQService {
/**
* 发送消息到RabbitMQ Server
* 发送之前需要定义RabbitMQMessage的类型
* @param queueName 发送队列名字
*/

void send(String queueName);

/**
* 根据不同的参数类型设置RabbitMQMessage类型
* @param cmdtype
* @param optype
* @param content
* @param username
* @return
*/

void setRabbitMQMessageType(CMDTYPE cmdtype, OPTYPE optype, Object content,String username);
}

2.4 使用Spring容器注入RabbitMQService实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Controller
@RequestMapping("/Login")
public class UserController {

private static Logger logger = LoggerFactory.getLogger(UserController.class);

//当前控制器使用的队列名称
private static final String QUEUE_NAME = "task_queue";

@Autowired
@Qualifier("userService")
private UserService userService; //Service接口

@Autowired
@Qualifier("rabbitMQService")
private RabbitMQService rabbitMQService; //RabbitMQService接口

/**
* 跳转到登陆页面
* @return 跳转到AdminLogin文件夹下的login.jsp
*/

@RequestMapping(value = "/index")
public String loginIndex() {
return "AdminLogin/login";
}
/**
* 登陆功能
* @param user 接受前端页面传来的参数,自动匹配User类
* @param model 返回的Model,使用RedirectAttribute,可使重定向后的Url不带参数
* @return
*/

@RequestMapping(value = "/signUp",method = RequestMethod.POST)
public String signUp(@ModelAttribute User user,RedirectAttributes model){

try{
if(userService.addNewUser(user)){//user已增加加密后的密码,盐和UUID
user.setSalt("");//在发送过程中,将盐值置为null
rabbitMQService.setRabbitMQMessageType(CMDTYPE.CREATE, OPTYPE.USER,user,user.getUserName());
rabbitMQService.send(QUEUE_NAME);
model.addFlashAttribute("error","注册成功,请重新登陆!");
return "redirect:/Login/index";
}else{
model.addFlashAttribute("error","注册失败!");
return "redirect:/Login/index";
}
}catch (IllegalArgumentException e){
model.addFlashAttribute("error","注册失败!");
return "redirect:/Login/index";
}catch (Exception e){
model.addFlashAttribute("error","注册失败!");
return "redirect:/Login/index";
}

}

}