1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package com.dhcc.finance.config;
import com.dhcc.common.constant.SysConstants;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
/**
* @功能描述:Direct模式
* @author chenrui
* @date 2018年11月17日 下午2:36:48
* @修改日志:
*/
/**
* 功能描述:
*
* @author haozg
* @date 2019年7月10日 下午10:44:59
* @修改日志:
*/
@Configuration
public class RabbitDirectConfig implements RabbitListenerConfigurer {
/**
* 功能描述:流水交换机队列信息处理
*
* @return Binding
* @author haozg
* @date 2019年7月10日 下午10:39:34
* @修改日志:
*/
@Bean
TopicExchange topicSeqLogTopicExchange() {
// 定义一个名为fanoutExchange的fanout交换器 //可进行持久化存储,避免MQ宕机引起消息丢失
return new TopicExchange(SysConstants.MqConstans.EXCHANGE_BUSLOG_TOPIC, true, false);
}
/**
* 功能描述:建立会计记账队列
*
* @return Queue
* @author haozg
* @date 2019年7月9日 下午11:04:21
* @修改日志:
*/
@Bean
public Queue topicSeqAccountLogQueue() {
// new Queue(name, durable) //durable 为true时可以进行持久化
return new Queue(SysConstants.MqConstans.ROUTE_KEY_SEQ_ACCOUNT_LOG, true);
}
/**
* 功能描述:建立总分发
*
* @return Queue
* @author haozg
* @date 2019年7月9日 下午11:04:07
* @修改日志:
*/
@Bean
public Queue topicSeqBenefitLogQueue() {
//durable 为true时可以进行持久化
return new Queue(SysConstants.MqConstans.ROUTE_KEY_SEQ_LOG, true);
}
/**
* 功能描述:建立总分发
*
* @return Binding
* @author haozg
* @date 2019年7月10日 下午10:39:34
* @修改日志:
*/
@Bean
public Binding bindingTopicSeqLogTopicExchangeWithTopicSeqLogQueue() {
//将topicBenefitLogQueue队列绑定到topicBenefitTopicExchange交换机上,键值为:BENEFIT_LOG_QUEUE---benefit.log
return BindingBuilder.bind(topicSeqBenefitLogQueue()).to(topicSeqLogTopicExchange()).with(SysConstants.MqConstans.ROUTE_KEY_SEQ_LOG);
}
/**
* 功能描述:需要记账流水信息处理
*
* @return Binding
* @author haozg
* @date 2019年3月5日 下午8:47:53
* @修改日志:
*/
@Bean
public Binding bindingTopicSeqLogTopicExchangeWithTopicSeqAccountLogQueue() {
//将topicBenefitLogQueue队列绑定到topicBenefitTopicExchange交换机上,键值为:BENEFIT_LOG_QUEUE---benefit.log
return BindingBuilder.bind(topicSeqAccountLogQueue()).to(topicSeqLogTopicExchange()).with(SysConstants.MqConstans.ROUTE_KEY_SEQ_ACCOUNT_LOG);
}
@Bean
@Scope("prototype")
public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
rabbitTemplate.setMandatory(true);// true 消息发送失败回调
return rabbitTemplate;
}
@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
}
@Bean
MessageHandlerMethodFactory messageHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());
return messageHandlerMethodFactory;
}
@Bean
public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
return new MappingJackson2MessageConverter();
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(messageConverter);
return factory;
}
}