RabbitDirectConfig.java 5.3 KB
package com.dhcc.finance.config;

import com.dhcc.common.constant.SysConstants;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;

/**
 * @功能描述:Direct模式
 * @author chenrui
 * @date 2018年11月17日 下午2:36:48
 * @修改日志:
 */

/**
 * 功能描述:
 *
 * @author haozg
 * @date 2019年7月10日 下午10:44:59
 * @修改日志:
 */
@Configuration
public class RabbitDirectConfig implements RabbitListenerConfigurer {

    /**
     * 功能描述:流水交换机队列信息处理
     *
     * @return Binding
     * @author haozg
     * @date 2019年7月10日 下午10:39:34
     * @修改日志:
     */
    @Bean
    TopicExchange topicSeqLogTopicExchange() {
        // 定义一个名为fanoutExchange的fanout交换器 //可进行持久化存储,避免MQ宕机引起消息丢失
        return new TopicExchange(SysConstants.MqConstans.EXCHANGE_BUSLOG_TOPIC, true, false);
    }

    /**
     * 功能描述:建立会计记账队列
     *
     * @return Queue
     * @author haozg
     * @date 2019年7月9日 下午11:04:21
     * @修改日志:
     */
    @Bean
    public Queue topicSeqAccountLogQueue() {
//        new Queue(name, durable)  //durable 为true时可以进行持久化
        return new Queue(SysConstants.MqConstans.ROUTE_KEY_SEQ_ACCOUNT_LOG, true);
    }

    /**
     * 功能描述:建立总分发
     *
     * @return Queue
     * @author haozg
     * @date 2019年7月9日 下午11:04:07
     * @修改日志:
     */
    @Bean
    public Queue topicSeqBenefitLogQueue() {
        //durable 为true时可以进行持久化
        return new Queue(SysConstants.MqConstans.ROUTE_KEY_SEQ_LOG, true);
    }

    /**
     * 功能描述:建立总分发
     *
     * @return Binding
     * @author haozg
     * @date 2019年7月10日 下午10:39:34
     * @修改日志:
     */
    @Bean
    public Binding bindingTopicSeqLogTopicExchangeWithTopicSeqLogQueue() {
        //将topicBenefitLogQueue队列绑定到topicBenefitTopicExchange交换机上,键值为:BENEFIT_LOG_QUEUE---benefit.log
        return BindingBuilder.bind(topicSeqBenefitLogQueue()).to(topicSeqLogTopicExchange()).with(SysConstants.MqConstans.ROUTE_KEY_SEQ_LOG);
    }

    /**
     * 功能描述:需要记账流水信息处理
     *
     * @return Binding
     * @author haozg
     * @date 2019年3月5日 下午8:47:53
     * @修改日志:
     */
    @Bean
    public Binding bindingTopicSeqLogTopicExchangeWithTopicSeqAccountLogQueue() {
        //将topicBenefitLogQueue队列绑定到topicBenefitTopicExchange交换机上,键值为:BENEFIT_LOG_QUEUE---benefit.log
        return BindingBuilder.bind(topicSeqAccountLogQueue()).to(topicSeqLogTopicExchange()).with(SysConstants.MqConstans.ROUTE_KEY_SEQ_ACCOUNT_LOG);
    }

    @Bean
    @Scope("prototype")
    public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
        rabbitTemplate.setMandatory(true);// true 消息发送失败回调
        return rabbitTemplate;
    }

    @Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }


    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
    }

    @Bean
    MessageHandlerMethodFactory messageHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
        messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());
        return messageHandlerMethodFactory;
    }

    @Bean
    public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
        return new MappingJackson2MessageConverter();
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(messageConverter);
        return factory;
    }
}