package com.dhcc.finance.config; import com.dhcc.common.constant.SysConstants; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; import org.springframework.messaging.converter.MappingJackson2MessageConverter; import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory; /** * @功能描述:Direct模式 * @author chenrui * @date 2018年11月17日 下午2:36:48 * @修改日志: */ /** * 功能描述: * * @author haozg * @date 2019年7月10日 下午10:44:59 * @修改日志: */ @Configuration public class RabbitDirectConfig implements RabbitListenerConfigurer { /** * 功能描述:流水交换机队列信息处理 * * @return Binding * @author haozg * @date 2019年7月10日 下午10:39:34 * @修改日志: */ @Bean TopicExchange topicSeqLogTopicExchange() { // 定义一个名为fanoutExchange的fanout交换器 //可进行持久化存储,避免MQ宕机引起消息丢失 return new TopicExchange(SysConstants.MqConstans.EXCHANGE_BUSLOG_TOPIC, true, false); } /** * 功能描述:建立会计记账队列 * * @return Queue * @author haozg * @date 2019年7月9日 下午11:04:21 * @修改日志: */ @Bean public Queue topicSeqAccountLogQueue() { // new Queue(name, durable) //durable 为true时可以进行持久化 return new Queue(SysConstants.MqConstans.ROUTE_KEY_SEQ_ACCOUNT_LOG, true); } /** * 功能描述:建立总分发 * * @return Queue * @author haozg * @date 2019年7月9日 下午11:04:07 * @修改日志: */ @Bean public Queue topicSeqBenefitLogQueue() { //durable 为true时可以进行持久化 return new Queue(SysConstants.MqConstans.ROUTE_KEY_SEQ_LOG, true); } /** * 功能描述:建立总分发 * * @return Binding * @author haozg * @date 2019年7月10日 下午10:39:34 * @修改日志: */ @Bean public Binding bindingTopicSeqLogTopicExchangeWithTopicSeqLogQueue() { //将topicBenefitLogQueue队列绑定到topicBenefitTopicExchange交换机上,键值为:BENEFIT_LOG_QUEUE---benefit.log return BindingBuilder.bind(topicSeqBenefitLogQueue()).to(topicSeqLogTopicExchange()).with(SysConstants.MqConstans.ROUTE_KEY_SEQ_LOG); } /** * 功能描述:需要记账流水信息处理 * * @return Binding * @author haozg * @date 2019年3月5日 下午8:47:53 * @修改日志: */ @Bean public Binding bindingTopicSeqLogTopicExchangeWithTopicSeqAccountLogQueue() { //将topicBenefitLogQueue队列绑定到topicBenefitTopicExchange交换机上,键值为:BENEFIT_LOG_QUEUE---benefit.log return BindingBuilder.bind(topicSeqAccountLogQueue()).to(topicSeqLogTopicExchange()).with(SysConstants.MqConstans.ROUTE_KEY_SEQ_ACCOUNT_LOG); } @Bean @Scope("prototype") public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) { final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(producerJackson2MessageConverter()); rabbitTemplate.setMandatory(true);// true 消息发送失败回调 return rabbitTemplate; } @Bean public Jackson2JsonMessageConverter producerJackson2MessageConverter() { return new Jackson2JsonMessageConverter(); } @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory()); } @Bean MessageHandlerMethodFactory messageHandlerMethodFactory() { DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory(); messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter()); return messageHandlerMethodFactory; } @Bean public MappingJackson2MessageConverter consumerJackson2MessageConverter() { return new MappingJackson2MessageConverter(); } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, MessageConverter messageConverter) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(messageConverter); return factory; } }