package com.dhcc.finance.rabbitmq; import com.dhcc.common.constant.FinanceStatusCode; import com.dhcc.common.constant.SysConstants; import com.dhcc.common.exception.RestStatusException; import com.dhcc.common.model.CurrencyMeta; import com.dhcc.common.model.response.RestfulResponse; import com.dhcc.finance.constant.FinanceDictConsts; import com.dhcc.finance.main.dao.CwBdRecordBatchMapper; import com.dhcc.finance.main.dao.CwBdRecordSerialMapper; import com.dhcc.finance.main.domain.CwBdRecordBatch; import com.dhcc.finance.main.domain.CwBdRecordSerial; import com.dhcc.finance.main.model.VoucherRequestInfo; import com.dhcc.finance.main.service.FinanceForThirdService; import com.dhcc.finance.main.service.FinanceForThirdSupplyService; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.Date; import java.util.concurrent.TimeUnit; /** * 功能描述:监听处理MQ里面的记账信息 * * @author dml * @date 2019年7月12日 下午7:22:25 * @修改日志: */ @Slf4j @Component @RabbitListener(queues = SysConstants.MqConstans.ROUTE_KEY_SEQ_ACCOUNT_LOG) public class SeqAccountLogServiceImpl { @Autowired FinanceForThirdService financeForThirdService;// 财务对外提供的服务 @Autowired CwBdRecordBatchMapper cwBdRecordBatchMapper;// 自动记录批次信息表 @Autowired CwBdRecordSerialMapper cwBdRecordSerialMapper; @Autowired FinanceForThirdSupplyService financeForThirdSupplyService; @Autowired private CuratorFramework curatorFramework; @Value("${curator.lockPath}") private String lockPath;// 锁路径 /** * 功能描述:消费消息获取记账信息并处理 * * @param voucherRequestInfo * @param channel * @param message void * @throws IOException * @author dml * @date 2019年7月12日 下午7:25:33 * @修改日志: */ @RabbitHandler public void recieved(VoucherRequestInfo voucherRequestInfo, Channel channel, Message message) throws IOException { RestfulResponse createVoucherInfo = new RestfulResponse(SysConstants.CODE.FAIL, FinanceStatusCode.CREATE_VOUCHER_INFO_ERROR.message()); try { log.info("进入监听>>>>>>>>>>>>" + voucherRequestInfo.toString()); //记录批次号 RestfulResponse res = financeForThirdSupplyService.insertRecordForBatch(voucherRequestInfo); if (SysConstants.CODE.SUCCESS != res.getCode()) {//若重复批次号记账成功,则不记账 return; } // InterProcessMutex 构建一个分布式锁 InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockPath); try { if (lock.acquire(5, TimeUnit.MINUTES)) { // 调用记账服务 createVoucherInfo = financeForThirdService.createVoucherInfo(voucherRequestInfo); } } catch (Exception e) { throw e;// 如果发生异常将异常抛出 } finally { lock.release();// 释放该锁 log.info("释放锁成功>>>>>>>>>"); } log.info("消费返回结果>>>>>>>>>" + createVoucherInfo.getMsg()); } catch (Exception e) { if (e instanceof RestStatusException) { // 如果是自己抛出的异常,根据返回的异常编号,从财务状态枚举中获取提示信息 FinanceStatusCode enumByKey = FinanceStatusCode.getEnumByKey(e.getMessage()); if (enumByKey != null) { createVoucherInfo.setMsg(enumByKey.message()); } } log.error("消息消费异常>>>>>>>>>", e); } finally { //最终进行消息确认处理的确认,将消息从队列中进行移除,完成消费 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); CwBdRecordBatch cwBdRecordBatch1 = new CwBdRecordBatch();//批次号记录实体 cwBdRecordBatch1.setBatchNo(voucherRequestInfo.getBatchNo()); cwBdRecordBatch1.setTradecode(voucherRequestInfo.getTradeCode()); cwBdRecordBatch1.setUpdateTime(new Date()); if (SysConstants.CODE.SUCCESS != createVoucherInfo.getCode()) { cwBdRecordBatch1.setFailReason(createVoucherInfo.getMsg()); cwBdRecordBatch1.setSignType(FinanceDictConsts.SF.no); if (FinanceStatusCode.CREATE_VOUCHER_INFO_ERROR.message().equals(createVoucherInfo.getMsg())) { CwBdRecordSerial cwBdRecordSerial = new CwBdRecordSerial();//流水记录实体 cwBdRecordSerial.setBatchNo(voucherRequestInfo.getBatchNo()); cwBdRecordSerial.setTradecode(voucherRequestInfo.getTradeCode()); cwBdRecordSerial.setSignType(FinanceDictConsts.STATE.fail); cwBdRecordSerialMapper.updateRecordForSerialNo(cwBdRecordSerial); } } else { cwBdRecordBatch1.setSignType(FinanceDictConsts.SF.yes); if (StringUtils.isNotBlank((String) createVoucherInfo.getData()) && FinanceStatusCode.SCENARIO_NOT_CREATE_VOUCHER.message().equals((String) createVoucherInfo.getData())) { cwBdRecordBatch1.setFailReason((String) createVoucherInfo.getData()); } else { cwBdRecordBatch1.setFailReason(""); } } cwBdRecordBatchMapper.updateRecordBybatchNo(cwBdRecordBatch1);//更新批次号记录表 } } }