1.现在有个业务需求,车管后台和saas系统的数据要添加到钉钉后台。车管后台和saas系统任何的修改,加盟商、车队、班组的修改,都要同步到钉钉里面。

     刚开始是想着,在车管或者saas里面有修改的地方,直接做业务处理。但是问题是,只有业务处理成功才能做后续的操作,于是想到了使用aop的后置通知处理。另外,由于只有特定的方法才需要进入后置通知,因此还要加上自定义注解来标记这些方法。然后把要同步的业务数据作为消息发送到mq,再起一个服务进行mq的消费。嗯,大体上是这样。

   1.自定义注解

    

package com.zhuanche.common.dingdingsync;

import java.lang.annotation.*;

/**
 * Marks a controller method (or type) whose invocation should be stored to MQ.
 * <p>
 * The annotation is retained at runtime so it can be read reflectively or bound
 * by an AOP pointcut. All attributes are optional and default to the empty string.
 *
 * @Author fanht
 * @Description controller methods carrying this annotation are stored to MQ
 * @Date 2019/2/28 11:26 AM
 * @Version 1.0
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD,ElementType.TYPE})
@Documented
public @interface DingdingAnno {
    String cityId() default "";  // city id
    String supplierId() default ""; // supplier id
    String teamId() default ""; // team id
    String method() default ""; // operation performed by the method: add / update / delete
    String level() default "";  // level: 0 = city, 1 = supplier, 2 = fleet / team
}

 

2.定义切面

    

package com.zhuanche.common.dingdingsync;

import com.zhuanche.common.rocketmq.CommonRocketProducer;
import com.zhuanche.common.rocketmq.DingdingSupplierAndTeamProducer;
import com.zhuanche.controller.supplier.SupplierController;
import com.zhuanche.dto.CarDriverTeamDTO;
import com.zhuanche.entity.rentcar.CarBizSupplierVo;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.*;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/**
 * Aspect that publishes supplier and driver-team changes to RocketMQ after an annotated
 * controller method of {@code DriverTeamController} or {@code SupplierController} returns
 * normally.
 * <p>
 * Only methods carrying {@link DingdingAnno} are intercepted. {@link DingdingAnno#level()}
 * selects the payload type ("2" = driver team, "1" = supplier) and
 * {@link DingdingAnno#method()} contributes to the message tag.
 * <p>
 * NOTE(review): {@code @AfterReturning} runs on every normal return. If a controller reports
 * a business failure through its return value instead of throwing, the message is still
 * published - confirm this is acceptable.
 *
 * @Author fanht
 * @Date 2019/2/28 11:59 AM
 * @Version 1.0
 */
@Component
@Aspect
public class DingdingAspect {

    private static final Logger logger = LoggerFactory.getLogger(DingdingAspect.class);

    /** {@link DingdingAnno#level()} value that marks a supplier operation. */
    private static final String LEVEL_SUPPLIER = "1";

    /** {@link DingdingAnno#level()} value that marks a fleet / team operation. */
    private static final String LEVEL_TEAM = "2";

    /**
     * Pointcut signature covering every method of the two controllers. The body is
     * intentionally empty: a pointcut method only carries the expression.
     */
    @Pointcut("execution(* com.zhuanche.controller.driverteam.DriverTeamController.*(..)) ||" +
            " execution(* com.zhuanche.controller.supplier.SupplierController.*(..)) ")
    public void pointCut(){
    }

    /**
     * Logs the name and arguments of an annotated controller method before it runs.
     *
     * @param joinPoint    the intercepted invocation
     * @param dingdingAnno the annotation bound by the pointcut (unused here)
     */
    @Before("pointCut()  && @annotation(dingdingAnno) ")
    public void dingdingVerify(JoinPoint joinPoint,DingdingAnno dingdingAnno){
        // Arrays.toString renders each argument; the previous code concatenated the array
        // itself with "}" and therefore only logged the array's identity string.
        logger.info("{},入参:{}", joinPoint.getSignature().getName(), Arrays.toString(joinPoint.getArgs()));
    }

    /**
     * Publishes one MQ message per matching argument after the annotated method returned
     * normally. Failures while building or publishing a message are logged and swallowed so
     * that they cannot turn an already successful request into an error.
     *
     * @param jointPoint   the intercepted invocation
     * @param dingdingAnno the annotation bound by the pointcut
     */
    @AfterReturning("pointCut() && @annotation(dingdingAnno)")
    public void finish(JoinPoint jointPoint,DingdingAnno dingdingAnno){
        if (dingdingAnno == null) {
            return;
        }
        try {
            String level = dingdingAnno.level();
            Object[] args = jointPoint.getArgs();
            if (LEVEL_TEAM.equals(level)) {
                for (Object arg : args) {
                    if (arg instanceof CarDriverTeamDTO) {
                        publishTeamChange((CarDriverTeamDTO) arg, dingdingAnno.method());
                    }
                }
            } else if (LEVEL_SUPPLIER.equals(level)) {
                for (Object arg : args) {
                    if (arg instanceof CarBizSupplierVo) {
                        publishSupplierChange((CarBizSupplierVo) arg, dingdingAnno.method());
                    }
                }
            }
        } catch (RuntimeException e) {
            logger.error("dingding sync publish failed for " + jointPoint.getSignature().getName(), e);
        }
        logger.debug("dingding sync advice finished for {}", jointPoint.getSignature().getName());
    }

    /**
     * Builds the payload for one driver team and publishes it to topic {@code car_driver_team},
     * keyed by the team id.
     * <p>
     * NOTE(review): {@code publishMessage} does not pin a queue. If the consumer relies on
     * per-team ordering, consider {@code publishMessageOrderly} instead.
     */
    private void publishTeamChange(CarDriverTeamDTO teamDTO, String annotatedMethod) {
        String teamId = teamDTO.getId() == null ? null : teamDTO.getId().toString();
        String openFlag = teamDTO.getOpenCloseFlag() == null ? null : teamDTO.getOpenCloseFlag().toString();

        // A fresh map per message so payloads never share state.
        Map<String,Object> payload = new HashMap<>();
        payload.put("city", teamDTO.getCity());
        payload.put("cityName", teamDTO.getCityName());
        payload.put("supplier", teamDTO.getSupplier());
        payload.put("teamName", teamDTO.getTeamName());
        payload.put("teamId", teamDTO.getId());
        payload.put("pId", teamDTO.getpId());
        payload.put("openCloseFlag", openFlag);
        payload.put("id", teamDTO.getId());

        DingdingSupplierAndTeamProducer.publishMessage(
                "car_driver_team", resolveTeamTag(annotatedMethod, openFlag), teamId, payload);
    }

    /**
     * Derives the message tag for a team change. An annotated method of "insert" always yields
     * "insert"; otherwise the open/close flag decides: "1" -> insert, "2" -> delete, anything
     * else (including "0" and null) -> update.
     */
    private static String resolveTeamTag(String annotatedMethod, String openFlag) {
        if ("insert".equals(annotatedMethod)) {
            return "insert";
        }
        if ("1".equals(openFlag)) {
            return "insert";
        }
        if ("2".equals(openFlag)) {
            return "delete";
        }
        return "update";
    }

    /**
     * Builds the payload for one supplier and publishes it to topic
     * {@code car_driver_supplier}, tagged with the annotated method name.
     */
    private void publishSupplierChange(CarBizSupplierVo supplierVo, String annotatedMethod) {
        Map<String,Object> payload = new HashMap<>();
        payload.put("cityId", supplierVo.getSupplierCity());
        payload.put("cityName", supplierVo.getSupplierCityName());
        payload.put("supplierId", supplierVo.getSupplierId());
        payload.put("supplierName", supplierVo.getSupplierFullName());
        payload.put("cooperationType", supplierVo.getCooperationType());

        // A missing supplier id must not raise an NPE here; the producer substitutes
        // "default" for a null key.
        Object supplierId = supplierVo.getSupplierId();
        String key = supplierId == null ? null : supplierId.toString();
        DingdingSupplierAndTeamProducer.publishMessage("car_driver_supplier", annotatedMethod, key, payload);
    }
}

 

3.mq 生产者

   

package com.zhuanche.common.rocketmq;

import com.alibaba.fastjson.JSON;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.SendStatus;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.common.RemotingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
 * Process-wide RocketMQ producer used to publish supplier and driver-team changes.
 * <p>
 * A Spring bean of this class owns the lifecycle ({@link #init()} / {@link #destroy()});
 * the producer itself is held in a static field so the static {@code publish*} methods can be
 * called from anywhere. Both publish methods return {@code false} instead of throwing.
 *
 * @Author fanht
 * @Date 2019/3/14 11:54 AM
 * @Version 1.0
 */
public class DingdingSupplierAndTeamProducer {

    private static final Logger logger = LoggerFactory.getLogger(DingdingSupplierAndTeamProducer.class);

    /**
     * Picks a queue from the hash of the routing argument so that equal arguments always land
     * on the same queue. {@code Math.abs} is applied after the modulo because
     * {@code Math.abs(Integer.MIN_VALUE)} is still negative and would yield a negative index.
     */
    private static final MessageQueueSelector HASH_SELECTOR = new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> messageQueues, Message msg, Object obj) {
            if (obj == null) {
                return messageQueues.get(0);
            }
            int index = Math.abs(obj.hashCode() % messageQueues.size());
            return messageQueues.get(index);
        }
    };

    /** The shared producer; volatile because it is written under a lock but read without one. */
    private static volatile DefaultMQProducer producer;

    /** RocketMQ name server address, injected by Spring. */
    private String namesrvAddr;

    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }

    /**
     * Starts the shared producer (Spring init-method).
     *
     * @throws MQClientException if the producer cannot be started
     */
    public void init() throws MQClientException {
        DingdingSupplierAndTeamProducer.startProducer( this.namesrvAddr );
    }

    private synchronized static void startProducer(String namesrvAddr) throws MQClientException {
        if (producer != null) {
            // A second start would leak the running producer; keep the existing one.
            logger.warn("RocketMQ producer already started, ignoring repeated init");
            return;
        }
        DefaultMQProducer created = new DefaultMQProducer("mp-manage");
        created.setInstanceName( RemotingUtil.getLocalAddress() +"@"+System.nanoTime()  );
        created.setNamesrvAddr(namesrvAddr);
        created.setRetryAnotherBrokerWhenNotStoreOK(true); // retry on another broker when the store fails
        created.setRetryTimesWhenSendFailed(4); // retry count (library default is 2)
        created.setSendMsgTimeout(6000); // send timeout in ms (library default is 3000)
        created.start();
        // Publish the reference only after a successful start so senders never see a
        // half-initialised producer.
        producer = created;
        logger.info(">>>>>>>>>>>通用的RocketMQ生产者初始化成功!");
    }

    /** Shuts the shared producer down (Spring destroy-method). */
    public void destroy() {
        DingdingSupplierAndTeamProducer.stopProducer();
    }

    private synchronized static void stopProducer() {
        DefaultMQProducer current = producer;
        if (current == null) {
            // init() never succeeded; nothing to shut down.
            return;
        }
        current.shutdown();
        producer = null;
        logger.info(">>>>>>>>>>>通用的RocketMQ生产者销毁成功!");
    }

    /**
     * Sends a normal (unordered) message.
     *
     * @param topic   target topic; {@code null} makes the call return {@code false}
     * @param tags    message tag; {@code null} is replaced by {@code "default"}
     * @param keys    message key; {@code null} is replaced by {@code "default"}
     * @param message payload, serialised to JSON; {@code null} makes the call return {@code false}
     * @return {@code true} only if the broker answered {@code SEND_OK}
     */
    public static boolean publishMessage(String topic, String tags, String keys, Object message){
        return send("普通", topic, tags, keys, message, false);
    }

    /**
     * Sends an ordered message: the queue is chosen from the hash of {@code keys}, so messages
     * with the same key go to the same queue.
     *
     * @param topic   target topic; {@code null} makes the call return {@code false}
     * @param tags    message tag; {@code null} is replaced by {@code "default"}
     * @param keys    message key and routing argument; {@code null} is replaced by {@code "default"}
     * @param message payload, serialised to JSON; {@code null} makes the call return {@code false}
     * @return {@code true} only if the broker answered {@code SEND_OK}
     */
    public static boolean publishMessageOrderly(String topic, String tags, String keys, Object message){
        return send("顺序", topic, tags, keys, message, true);
    }

    /** Shared implementation of both publish methods; {@code kind} only affects log text. */
    private static boolean send(String kind, String topic, String tags, String keys, Object message, boolean orderly) {
        if (topic == null || message == null) {
            return false;
        }
        String effectiveTags = tags == null ? "default" : tags;
        String effectiveKeys = keys == null ? "default" : keys;
        String prefix = "[" + kind + "MQ: topic:" + topic + ",tags:" + effectiveTags + ",keys:" + effectiveKeys + "]: ";
        try {
            DefaultMQProducer current = producer;
            if (current == null) {
                logger.error(prefix + "producer is not started, message dropped");
                return false;
            }
            // Serialisation happens inside the try so that a failure is reported as
            // "not sent" instead of escaping to the caller.
            String msg = JSON.toJSONString(message);
            logger.info(prefix + msg);
            Message rocketMsg = new Message( topic, effectiveTags, effectiveKeys, msg.getBytes("UTF-8") );
            SendResult sendResult = orderly
                    ? current.send(rocketMsg, HASH_SELECTOR, effectiveKeys)
                    : current.send(rocketMsg);

            if (sendResult == null || sendResult.getSendStatus() != SendStatus.SEND_OK) {
                logger.error(prefix + "Send failed!");
                return false;
            }
            logger.info(prefix + "Send successful!");
            return true;
        } catch (Exception e) {
            logger.error(prefix + "发送" + kind + "消息异常!", e);
            return false;
        }
    }
}

 

4.rocketmq配置

  

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:cache="http://www.springframework.org/schema/cache"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
        http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd
">
<!-- New configuration style BEGIN (note: only one global producer is initialised per JVM; it can publish messages for any topic) -->

    <!-- Singleton producer bean: Spring calls init() on startup and destroy() on shutdown;
         the name server address comes from the property rocketmq.dingdingNamesrvAddr. -->
    <bean id="dingdingSupplierAndTeamProducer"  class="com.zhuanche.common.rocketmq.DingdingSupplierAndTeamProducer" init-method="init" destroy-method="destroy" scope="singleton">
        <property name="namesrvAddr" value="${rocketmq.dingdingNamesrvAddr}"/>
    </bean>
<!-- New configuration style END -->
</beans>

5.在特定的方法里面添加自定义注解,成功后使用后置通知

  

    /**
     * Handles {@code /updateSupplier} by delegating to
     * {@code supplierService.saveSupplierInfo} and returning its result unchanged.
     * <p>
     * The method is tagged with {@code @DingdingAnno(level = "1", method = "update")}.
     * NOTE(review): presumably this is what lets an after-returning aspect publish the
     * supplier change to MQ - confirm against the aspect configuration.
     *
     * @param supplier the supplier data bound from the request
     * @return the response produced by the service call
     */
    @RequestMapping("/updateSupplier")
    @ResponseBody
    @RequestFunction(menu = SUPPLIER_UPDATE)
    @DingdingAnno(level = "1",method = "update")
    public AjaxResponse updateSupplier(CarBizSupplierVo supplier){
        return supplierService.saveSupplierInfo(supplier);
    }

 

 

6.rocketMq消费

package com.zhuanche.message.listener;

import com.alibaba.fastjson.JSONObject;
import com.zhuanche.message.common.DingdingConstants;
import com.zhuanche.message.dao.manage.CarDriverTeamDao;
import com.zhuanche.message.dao.manage.RelationSqycDingdingMapper;
import com.zhuanche.message.dingding.DingdingHelper;
import com.zhuanche.message.model.RelationSqycDingding;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/**
 * Orderly RocketMQ listener that mirrors driver fleet / team changes into DingTalk departments.
 * <p>
 * Each message body is a JSON object; the message tag (insert / update / delete, compared
 * against {@code DingdingConstants}) selects the operation. The mapping between internal ids
 * and DingTalk department ids is kept through {@code RelationSqycDingdingMapper}.
 * <p>
 * Failure policy: an exception while handling one message is logged and the message is
 * skipped (best effort). Only a failed delete of the parent department asks the broker to
 * re-deliver, and gives up after {@link #MAX_DELETE_RETRIES} attempts.
 *
 * @Author fanht
 * @Date 2019/3/2 3:24 PM
 * @Version 1.0
 */
public class DingdingDriverTeamListener implements MessageListenerOrderly{

    private static final Logger logger = LoggerFactory.getLogger(DingdingDriverTeamListener.class);

    /** Org type passed to DingdingHelper for "ziying" (self-operated) cooperation types. */
    private static final int ZIYING = 1;

    /** Org type passed to DingdingHelper for "jiameng" (franchise) cooperation types. */
    private static final int JIAMENG = 2;

    /** Re-consume count at which a failing delete is abandoned. */
    private static final int MAX_DELETE_RETRIES = 10;

    @Autowired
    private CarDriverTeamDao carDriverTeamDao;

    @Autowired
    private RelationSqycDingdingMapper relationSqycDingdingMapper;

    /**
     * Processes a batch of messages in order.
     *
     * @param msgs    the messages to consume
     * @param context consume context supplied by RocketMQ (unused)
     * @return {@code SUSPEND_CURRENT_QUEUE_A_MOMENT} when a delete must be retried, otherwise
     *         {@code SUCCESS}
     */
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        for (MessageExt ext : msgs) {
            logger.info("修改钉钉班组信息,msgId:" + ext.getMsgId());
            if (StringUtils.isEmpty(ext.getMsgId())) {
                logger.info("msgId is null");
                continue;
            }

            JSONObject payload = parseBody(ext);
            if (payload == null) {
                // Empty or unparsable body: retrying can never succeed, so skip it rather
                // than throw and have the orderly consumer re-deliver it.
                continue;
            }

            String tag = ext.getTags();
            // On delete the producer sends the id under "teamId" as well.
            String departId = asString(payload, "teamId");
            String teamName = asString(payload, "teamName");

            try {
                if (DingdingConstants.INSERT.equals(tag)) {
                    handleInsert(payload, departId, teamName);
                } else if (DingdingConstants.UPDATE.equals(tag)) {
                    handleUpdate(payload, departId, teamName);
                } else if (DingdingConstants.DELETE.equals(tag)) {
                    if (handleDelete(ext, departId)) {
                        // Explicit status instead of returning null.
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                }
            } catch (Exception e) {
                // Best effort: log with stack trace and move on to the next message.
                logger.error("调用dingding接口异常", e);
            }
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }

    /**
     * Decodes the message body as UTF-8 (the encoding used by the producer) and parses it as a
     * JSON object.
     *
     * @return the parsed object, or {@code null} if the body is missing, empty or not valid JSON
     */
    private JSONObject parseBody(MessageExt ext) {
        byte[] raw = ext.getBody();
        if (raw == null) {
            return null;
        }
        String text = new String(raw, StandardCharsets.UTF_8);
        if (StringUtils.isEmpty(text)) {
            return null;
        }
        try {
            return JSONObject.parseObject(text);
        } catch (RuntimeException e) {
            logger.error("invalid dingding team message body, msgId:" + ext.getMsgId(), e);
            return null;
        }
    }

    /** Returns the string form of a JSON field, or {@code null} when the field is absent. */
    private static String asString(JSONObject payload, String key) {
        Object value = payload.get(key);
        return value == null ? null : value.toString();
    }

    /**
     * Creates a DingTalk department for a new fleet or team and records the id mapping.
     * Without a {@code pId} the entry is a fleet created under its supplier's department;
     * with a {@code pId} it is a team created under the parent fleet's department.
     */
    private void handleInsert(JSONObject payload, String departId, String teamName) throws Exception {
        String pId = asString(payload, "pId");
        boolean isTeam = StringUtils.isNotEmpty(pId);

        RelationSqycDingding parent;
        if (isTeam) {
            parent = relationSqycDingdingMapper.getDingdingId(DingdingConstants.DUI, pId, null);
        } else {
            String supplier = asString(payload, "supplier");
            parent = relationSqycDingdingMapper.getDingdingId(DingdingConstants.SUPPLIER, supplier, null);
        }
        if (parent == null || !(parent.getDingdingId() > 0)) {
            return;
        }

        int dingdingId = DingdingHelper.createDepartment(teamName, parent.getDingdingId().toString(),
                null, false, false, true, orgType(parent.getCooperationType()));
        if (dingdingId <= 0) {
            return;
        }

        RelationSqycDingding created = new RelationSqycDingding();
        created.setDingdingId(dingdingId);
        created.setName(teamName);
        created.setSourceId(departId);
        if (isTeam) {
            created.setType(DingdingConstants.TEAM);
        } else {
            created.setType(DingdingConstants.DUI);
        }
        created.setCooperationType(parent.getCooperationType());

        int code = relationSqycDingdingMapper.insertDingding(created);
        if (code > 0) {
            logger.info(isTeam ? "添加班组到dingding数据成功" : "添加小队到dingding表数据成功");
        } else {
            logger.warn("dingding department {} created but relation row was not inserted, sourceId:{}",
                    dingdingId, departId);
        }
    }

    /**
     * Renames the mapped DingTalk department, or deletes it when {@code openCloseFlag} is "2".
     * The id in an update message may belong to a fleet or a team, so both mappings are tried.
     */
    private void handleUpdate(JSONObject payload, String departId, String teamName) throws Exception {
        RelationSqycDingding relation =
                relationSqycDingdingMapper.getDingdingId(DingdingConstants.DUI, departId, null);
        if (relation == null) {
            relation = relationSqycDingdingMapper.getDingdingId(DingdingConstants.TEAM, departId, null);
        }
        if (relation == null || !(relation.getDingdingId() > 0)) {
            return;
        }

        String openCloseFlag = asString(payload, "openCloseFlag");
        String dingdingId = relation.getDingdingId().toString();
        int orgType = orgType(relation.getCooperationType());
        if ("2".equals(openCloseFlag)) {
            DingdingHelper.deleteDepartment(dingdingId, orgType);
        } else {
            // Per the original author's note the flags make members see only themselves and
            // keep the department visible - TODO confirm against DingdingHelper.
            DingdingHelper.updateDepartment(teamName, dingdingId, null, false, true, orgType);
        }
    }

    /**
     * Deletes the mapped DingTalk department together with its sub-departments and removes the
     * mapping rows.
     *
     * @return {@code true} if deleting the parent department failed and the message should be
     *         re-delivered; {@code false} if the message is done (deleted, nothing to delete,
     *         or retries exhausted)
     */
    private boolean handleDelete(MessageExt ext, String departId) throws Exception {
        RelationSqycDingding parent = relationSqycDingdingMapper.getDingdingByBanZu(departId, null);
        if (parent == null || !(parent.getDingdingId() > 0)) {
            return false;
        }
        int orgType = orgType(parent.getCooperationType());

        // Fetch the sub-department id list; the code expects a bracketed,
        // comma-separated string such as "[1,2,3]".
        String subList = DingdingHelper.listIds(parent.getDingdingId(), orgType);
        if (StringUtils.isNotEmpty(subList) && subList.length() > 2) {
            String[] childIds = subList.substring(1, subList.length() - 1).split(",");
            for (String rawId : childIds) {
                String childId = rawId.trim();
                if (childId.isEmpty()) {
                    continue;
                }
                DingdingHelper.deleteDepartment(childId, orgType);
                relationSqycDingdingMapper.deleteByDingDingId(childId);
            }
        }

        Integer code = DingdingHelper.deleteDepartment(parent.getDingdingId().toString(), orgType);
        if (code == null || code != 0) {
            logger.info("失败次数:" + ext.getReconsumeTimes());
            // ">=" rather than "==" so a count that has already passed the limit still gives up.
            return ext.getReconsumeTimes() < MAX_DELETE_RETRIES;
        }
        relationSqycDingdingMapper.deleteByTeamId(departId);
        return false;
    }

    /**
     * Maps a cooperation type to the DingdingHelper org type: 5, 18 and 20 map to
     * {@link #ZIYING}, every other value to {@link #JIAMENG}.
     *
     * @throws IllegalArgumentException if {@code cooperationType} is {@code null}
     */
    private int orgType(Integer cooperationType){
        if (cooperationType == null) {
            throw new IllegalArgumentException("cooperationType must not be null");
        }
        switch (cooperationType) {
            case 5:
            case 18:
            case 20:
                return ZIYING;
            default:
                return JIAMENG;
        }
    }
}

 

7.rocketmq配置 

 

 <!-- Push consumer for topic car_driver_team (all tags); started and shut down by Spring.
      Messages are handed to the dingdingTeamListener bean. -->
 <bean id="dingdingTeamConsumer" class="org.apache.rocketmq.client.consumer.DefaultMQPushConsumer" init-method="start"
          destroy-method="shutdown">
        <property name="consumerGroup" value="${mq.dingdingteam.consumergroup}"/>
        <property name="namesrvAddr" value="${mq.dingdingteam.nameservAdd}"/>
        <property name="instanceName" value="dingdingTeamMessage"/>
        <property name="messageModel" value="CLUSTERING"/>
        <property name="consumeFromWhere" value="CONSUME_FROM_LAST_OFFSET"/>
        <property name="vipChannelEnabled" value="false"/>
        <property name="subscription">
            <map>
                <entry key="car_driver_team" value="*"/>
            </map>
        </property>
        <property name="messageListener" ref="dingdingTeamListener"/>
    </bean>

    <bean id="dingdingTeamListener" class="com.zhuanche.message.listener.DingdingDriverTeamListener"/>



    <!-- Push consumer for topic car_driver_supplier (all tags); it has its own consumerGroup
         property and its own instanceName, separate from the team consumer.
         Messages are handed to the dingdingSupplierListener bean. -->
    <bean id="dingdingSupplierConsumer" class="org.apache.rocketmq.client.consumer.DefaultMQPushConsumer" init-method="start"
          destroy-method="shutdown">
        <property name="consumerGroup" value="${mq.dingdingSupplier.consumergroup}"/>
        <property name="namesrvAddr" value="${mq.dingdingSupplier.nameservAdd}"/>
        <property name="instanceName" value="dingdingSupplierMessage"/>
        <property name="messageModel" value="CLUSTERING"/>
        <property name="consumeFromWhere" value="CONSUME_FROM_LAST_OFFSET"/>
        <property name="vipChannelEnabled" value="false"/>
        <property name="subscription">
            <map>
                <entry key="car_driver_supplier" value="*"/>
            </map>
        </property>
        <property name="messageListener" ref="dingdingSupplierListener"/>
    </bean>

    <bean id="dingdingSupplierListener" class="com.zhuanche.message.listener.DingdingSupplierListener"/>

 

 

 

遇到的问题:mq无法消费,原因是 consumerGroup 与已有的消费者配置重复了。另外,同一台机器上每个 instanceName 只能出现一次,不能重复。

版权声明:本文为thinkingandworkinghard原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/thinkingandworkinghard/p/10621263.html