记录一次含有特定注解的方法后面添加指定业务的过程
1.现在有个业务需求,车管后台和saas系统的数据要添加到钉钉后台。车管后台和saas系统任何的修改,加盟商、车队、班组的修改,都要同步到钉钉里面。
刚开始是想着,在车管或者saas里面有修改的地方,直接做业务处理。但问题是,只有业务处理成功之后才能做后续的同步操作,于是想到了使用AOP的后置通知来处理。另外,由于只有带有特定注解的方法才需要进入后置通知,所以还需要加上自定义注解。然后把要同步的业务数据作为消息发送到MQ(生产),再起一个服务进行MQ的消费。嗯,大体上是这样。
1.自定义注解
package com.zhuanche.common.dingdingsync;

import java.lang.annotation.*;

/**
 * Marks controller code whose data should be stored to MQ.
 *
 * @Author fanht
 * @Description controller methods carrying this annotation are stored to MQ
 * @Date 2019/2/28 11:26 AM
 * @Version 1.0
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD,ElementType.TYPE})
@Documented
public @interface DingdingAnno {

    String cityId() default ""; // city id

    String supplierId() default ""; // supplier id

    String teamId() default ""; // team id

    String method() default ""; // operation: add / update / delete

    String level() default ""; // level: 0 city, 1 supplier, 2 fleet / team group
}
2.定义切面
package com.zhuanche.common.dingdingsync; import com.zhuanche.common.rocketmq.CommonRocketProducer; import com.zhuanche.common.rocketmq.DingdingSupplierAndTeamProducer; import com.zhuanche.controller.supplier.SupplierController; import com.zhuanche.dto.CarDriverTeamDTO; import com.zhuanche.entity.rentcar.CarBizSupplierVo; import org.aspectj.lang.JoinPoint; import org.aspectj.lang.Signature; import org.aspectj.lang.annotation.*; import org.aspectj.lang.reflect.MethodSignature; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import java.lang.reflect.Method; import java.util.Arrays; import java.util.HashMap; import java.util.Map; /** * @Author fanht * @Description * @Date 2019/2/28 上午11:59 * @Version 1.0 */ @Component @Aspect public class DingdingAspect { private Logger logger = LoggerFactory.getLogger(this.getClass()); @Pointcut("execution(* com.zhuanche.controller.driverteam.DriverTeamController.*(..)) ||" + " execution(* com.zhuanche.controller.supplier.SupplierController.*(..)) ") public void pointCut(){ logger.info("含有自定义注解dingdingAnno的方法..."); } @Before("pointCut() && @annotation(dingdingAnno) ") public void dingdingVerify(JoinPoint joinPoint,DingdingAnno dingdingAnno){ logger.info(joinPoint.getSignature().getName() + ",入参:{" + Arrays.asList(joinPoint.getArgs() + "}")); } @AfterReturning("pointCut() && @annotation(dingdingAnno)") public void finish(JoinPoint jointPoint,DingdingAnno dingdingAnno){ Signature signature = jointPoint.getSignature(); MethodSignature methodSignature = (MethodSignature) signature; Method method = methodSignature.getMethod(); if (method != null){ dingdingAnno = method.getAnnotation(DingdingAnno.class); if (dingdingAnno != null && "2".equals(dingdingAnno.level())){ Object[] args = jointPoint.getArgs(); Map<String,Object> map = new HashMap<>(); for(Object obj : args){ if(obj instanceof CarDriverTeamDTO){ CarDriverTeamDTO teamDTO = (CarDriverTeamDTO) obj; String teamId = 
teamDTO.getId()==null?null:teamDTO.getId().toString(); String openFlag = teamDTO.getOpenCloseFlag() == null ? null: teamDTO.getOpenCloseFlag().toString(); map.put("city",teamDTO.getCity()); map.put("cityName",teamDTO.getCityName()); map.put("supplier",teamDTO.getSupplier()); map.put("teamName",teamDTO.getTeamName()); map.put("teamId",teamDTO.getId()); map.put("pId",teamDTO.getpId()); map.put("openCloseFlag",openFlag); map.put("id",teamDTO.getId()); String tag = ""; if(!"insert".equals(dingdingAnno.method())){ if("0".equals(openFlag)){ tag = "update"; }else if("1".equals(openFlag)){ tag = "insert"; }else if("2".equals(openFlag)){ tag = "delete"; }else { tag = "update"; } }else { tag = "insert"; } DingdingSupplierAndTeamProducer.publishMessage("car_driver_team",tag,teamId,map); } } }if (dingdingAnno != null && "1".equals(dingdingAnno.level())){ Object[] args = jointPoint.getArgs(); Map<String,Object> map = new HashMap<>(); for(Object obj : args){ if(obj instanceof CarBizSupplierVo){ CarBizSupplierVo supplierVo = (CarBizSupplierVo) obj; map.put("cityId",supplierVo.getSupplierCity()); map.put("cityName",supplierVo.getSupplierCityName()); map.put("supplierId",supplierVo.getSupplierId()); map.put("supplierName",supplierVo.getSupplierFullName()); map.put("cooperationType",supplierVo.getCooperationType()); DingdingSupplierAndTeamProducer.publishMessage("car_driver_supplier",dingdingAnno.method(),supplierVo.getSupplierId().toString(),map); } } } } System.out.println(jointPoint.getSignature().getName()); } }
3.mq 生产者
package com.zhuanche.common.rocketmq; import com.alibaba.fastjson.JSON; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.MessageQueueSelector; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.client.producer.SendStatus; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageQueue; import com.alibaba.rocketmq.remoting.common.RemotingUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; /** * @Author fanht * @Description * @Date 2019/3/14 上午11:54 * @Version 1.0 */ public class DingdingSupplierAndTeamProducer { private static final Logger logger = LoggerFactory.getLogger(DingdingSupplierAndTeamProducer.class); private static DefaultMQProducer producer; //静态的生产者 private String namesrvAddr;//RocketMQ nameserverAddr public void setNamesrvAddr(String namesrvAddr) { this.namesrvAddr = namesrvAddr; } /**初始化生产者**/ public void init() throws MQClientException { DingdingSupplierAndTeamProducer.startProducer( this.namesrvAddr ); } private synchronized static void startProducer(String namesrvAddr) throws MQClientException { producer = new DefaultMQProducer("mp-manage"); producer.setInstanceName( RemotingUtil.getLocalAddress() +"@"+System.nanoTime() ); producer.setNamesrvAddr(namesrvAddr); producer.setRetryAnotherBrokerWhenNotStoreOK(true);//消息没有存储成功是否发送到另外一个broker producer.setRetryTimesWhenSendFailed(4); //定义重试次数,默认是2 producer.setSendMsgTimeout(6000); //定义超时时间,默认是3000 producer.start(); logger.info(">>>>>>>>>>>通用的RocketMQ生产者初始化成功!"); } /**销毁生产者**/ public void destroy() { DingdingSupplierAndTeamProducer.stopProducer(); } private synchronized static void stopProducer() { producer.shutdown(); logger.info(">>>>>>>>>>>通用的RocketMQ生产者销毁成功!"); } /**发送普通消息**/ public static boolean publishMessage(String topic, String tags, String keys, Object message){ 
if(topic==null) { return false; } if(tags==null) { tags = "default"; } if(keys==null) { keys = "default"; } if(message==null) { return false; } String msg = JSON.toJSONString(message); try{ logger.info("[普通MQ: topic:"+topic+",tags:"+tags+",keys:"+keys+"]: " + msg ); Message rocketMsg = new Message( topic, tags, keys, msg.getBytes("UTF-8") ); SendResult sendResult = producer.send(rocketMsg); if (sendResult == null || sendResult.getSendStatus() != SendStatus.SEND_OK) { logger.error("[普通MQ: topic:"+topic+",tags:"+tags+",keys:"+keys+"]: Send failed!" ); return false; } logger.info("[普通MQ: topic:"+topic+",tags:"+tags+",keys:"+keys+"]: Send successful!" ); return true; } catch (Exception e) { logger.error("[普通MQ: topic:"+topic+",tags:"+tags+",keys:"+keys+"]: 发送普通消息异常!",e ); return false; } } /** * 发送顺序消息 */ public static boolean publishMessageOrderly(String topic, String tags, String keys, Object message){ if(topic==null) { return false; } if(tags==null) { tags = "default"; } if(keys==null) { keys = "default"; } if(message==null) { return false; } String msg = JSON.toJSONString(message); try{ logger.info("[顺序MQ: topic:"+topic+",tags:"+tags+",keys:"+keys+"]: " + msg ); Message rocketMsg = new Message( topic, tags, keys, msg.getBytes("UTF-8") ); SendResult sendResult = producer.send(rocketMsg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> messageQueues, Message msg, Object obj) { if(obj==null) { return messageQueues.get(0); } int hashCode = Math.abs(obj.hashCode()); int index = hashCode % messageQueues.size(); return messageQueues.get(index); } }, keys); if (sendResult == null || sendResult.getSendStatus() != SendStatus.SEND_OK) { logger.error("[顺序MQ: topic:"+topic+",tags:"+tags+",keys:"+keys+"]: Send failed!" ); return false; } logger.info("[顺序MQ: topic:"+topic+",tags:"+tags+",keys:"+keys+"]: Send successful!" 
); return true; }catch (Exception e) { logger.error("[顺序MQ: topic:"+topic+",tags:"+tags+",keys:"+keys+"]: 发送顺序消息异常!",e ); return false; } } }
4.rocketmq配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:cache="http://www.springframework.org/schema/cache"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd ">
    <!-- New configuration style BEGIN ( Note: only one global producer is initialized inside the JVM; messages of any topic can be published through it ) -->
    <bean id="dingdingSupplierAndTeamProducer" class="com.zhuanche.common.rocketmq.DingdingSupplierAndTeamProducer" init-method="init" destroy-method="destroy" scope="singleton">
        <property name="namesrvAddr" value="${rocketmq.dingdingNamesrvAddr}"/>
    </bean>
    <!-- New configuration style END -->
</beans>
5.在特定的方法里面添加自定义注解,成功后使用后置通知
/**
 * Handles {@code /updateSupplier}: hands the submitted supplier to
 * {@code supplierService.saveSupplierInfo} and returns that call's response as the body.
 *
 * @param supplier the supplier data bound from the request
 * @return the response produced by the supplier service
 */
@RequestMapping("/updateSupplier")
@ResponseBody
@RequestFunction(menu = SUPPLIER_UPDATE)
@DingdingAnno(method = "update", level = "1")
public AjaxResponse updateSupplier(CarBizSupplierVo supplier) {
    final AjaxResponse saveResult = supplierService.saveSupplierInfo(supplier);
    return saveResult;
}
6.rocketMq消费
package com.zhuanche.message.listener; import com.alibaba.fastjson.JSONObject; import com.zhuanche.message.common.DingdingConstants; import com.zhuanche.message.dao.manage.CarDriverTeamDao; import com.zhuanche.message.dao.manage.RelationSqycDingdingMapper; import com.zhuanche.message.dingding.DingdingHelper; import com.zhuanche.message.model.RelationSqycDingding; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.common.message.MessageExt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import java.util.List; import java.util.Map; /** * @Author fanht * @Description * @Date 2019/3/2 下午3:24 * @Version 1.0 */ public class DingdingDriverTeamListener implements MessageListenerOrderly{ private Logger logger = LoggerFactory.getLogger(this.getClass()); private static final Integer ZIYING = 1;//ziying private static final Integer JIAMWNG = 2;//jiameng @Autowired private CarDriverTeamDao carDriverTeamDao; @Autowired private RelationSqycDingdingMapper relationSqycDingdingMapper; @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { try { for(MessageExt ext : msgs){ logger.info("修改钉钉班组信息,msgId:" + ext.getMsgId()); if(StringUtils.isEmpty(ext.getMsgId())){ logger.info("msgId is null"); continue; } String str = new String(ext.getBody()); if(StringUtils.isNotEmpty(str)){ JSONObject jsonObject = JSONObject.parseObject(str); String tag = ext.getTags(); //删除时候传的是id String departId = jsonObject.get("teamId") == null ? null : jsonObject.get("teamId").toString(); String teamName = jsonObject.get("teamName") == null ? 
null : jsonObject.get("teamName").toString(); if(DingdingConstants.INSERT.equals(tag)){ try { String pId = jsonObject.get("pId") == null ? null : jsonObject.get("pId").toString(); if(StringUtils.isEmpty(pId)){ //如果只是小队 String supplier = jsonObject.get("supplier") == null ? null : jsonObject.get("supplier").toString(); RelationSqycDingding fatherDingding = relationSqycDingdingMapper.getDingdingId(DingdingConstants.SUPPLIER,supplier,null); if(fatherDingding != null && fatherDingding.getDingdingId() > 0) { int dingdingId = DingdingHelper.createDepartment(teamName, fatherDingding.getDingdingId().toString(), null, false, false, true, this.orgType(fatherDingding.getCooperationType())); if(dingdingId > 0){ RelationSqycDingding dingDui = new RelationSqycDingding(); dingDui.setDingdingId(dingdingId); dingDui.setName(teamName); dingDui.setSourceId(departId); dingDui.setType(DingdingConstants.DUI); dingDui.setCooperationType(fatherDingding.getCooperationType()); int code = relationSqycDingdingMapper.insertDingding(dingDui); if(code > 0){ logger.info("添加小队到dingding表数据成功"); } } } }else {//增加班组 RelationSqycDingding relationSqycDingding = relationSqycDingdingMapper.getDingdingId(DingdingConstants.DUI,pId,null); if(relationSqycDingding !=null && relationSqycDingding.getDingdingId() > 0){ int dingdingId = DingdingHelper.createDepartment(teamName,relationSqycDingding.getDingdingId().toString(),null,false,false,true,orgType(relationSqycDingding.getCooperationType())); if(dingdingId > 0){ RelationSqycDingding dingDui = new RelationSqycDingding(); dingDui.setDingdingId(dingdingId); dingDui.setName(teamName); dingDui.setSourceId(departId); dingDui.setType(DingdingConstants.TEAM); dingDui.setCooperationType(relationSqycDingding.getCooperationType()); int code = relationSqycDingdingMapper.insertDingding(dingDui); if(code > 0){ logger.info("添加班组到dingding数据成功"); } } } } } catch (Exception e) { logger.info("调用dingding接口异常" + e.getMessage()); continue; } }else 
if(DingdingConstants.UPDATE.equals(tag)){ //调用更新dingding的接口 部门员工只能看到自己,显示部门 try { RelationSqycDingding relationSqycDingding = relationSqycDingdingMapper.getDingdingId(DingdingConstants.DUI,departId,null); if(relationSqycDingding == null){//修改时候传的teamId不清楚是班组还是车队的 relationSqycDingding = relationSqycDingdingMapper.getDingdingId(DingdingConstants.TEAM,departId,null); } String openCloseFlag = jsonObject.get("openCloseFlag") == null ? null : jsonObject.get("openCloseFlag").toString(); if(relationSqycDingding != null && relationSqycDingding.getDingdingId() > 0 ){ if(StringUtils.isNotEmpty(openCloseFlag) && "2".equals(openCloseFlag) ){ DingdingHelper.deleteDepartment(relationSqycDingding.getDingdingId().toString(),orgType(relationSqycDingding.getCooperationType())); }else { DingdingHelper.updateDepartment(teamName,relationSqycDingding.getDingdingId().toString(),null,false,true,orgType(relationSqycDingding.getCooperationType())); } } } catch (Exception e) { logger.info("调用dingding接口异常" + e.getMessage()); continue; } }else if(DingdingConstants.DELETE.equals(tag)){ try { String teamId = jsonObject.get("teamId") == null ? 
null : jsonObject.get("teamId").toString(); RelationSqycDingding fatherDingding = relationSqycDingdingMapper.getDingdingByBanZu(teamId,null); if(fatherDingding != null && fatherDingding.getDingdingId() > 0) { //获取钉钉下面的子部门列表 String subList = DingdingHelper.listIds(fatherDingding.getDingdingId(),orgType(fatherDingding.getCooperationType())); if(StringUtils.isNotEmpty(subList) && subList.length()>2){ String[] sub = subList.substring(1,subList.length()-1).split(","); for(String dep : sub){ DingdingHelper.deleteDepartment(dep,orgType(fatherDingding.getCooperationType())); relationSqycDingdingMapper.deleteByDingDingId(dep); } } Integer code = DingdingHelper.deleteDepartment(fatherDingding.getDingdingId().toString(),orgType(fatherDingding.getCooperationType())); if(code == null || code != 0){ logger.info("失败次数:" + ext.getReconsumeTimes()); if(ext.getReconsumeTimes() == 10){ return ConsumeOrderlyStatus.SUCCESS; } return null; } relationSqycDingdingMapper.deleteByTeamId(departId); } } catch (Exception e) { logger.info("调用dingding接口异常" + e.getMessage()); continue; } } } } } catch (NumberFormatException e) { return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } return ConsumeOrderlyStatus.SUCCESS; } private int orgType(Integer cooperationType){ Integer orgType = 1; if("5".equals(cooperationType.toString()) || "18".equals(cooperationType.toString()) || "20".equals(cooperationType.toString())){ orgType = ZIYING; }else { orgType = JIAMWNG; } return orgType; } }
7.rocketmq配置
<!-- Push consumer subscribed to every tag ("*") of topic car_driver_team; messages are handed to dingdingTeamListener. -->
<bean id="dingdingTeamConsumer" class="org.apache.rocketmq.client.consumer.DefaultMQPushConsumer" init-method="start" destroy-method="shutdown">
    <property name="consumerGroup" value="${mq.dingdingteam.consumergroup}"/>
    <property name="namesrvAddr" value="${mq.dingdingteam.nameservAdd}"/>
    <!-- NOTE(review): this instanceName differs from the supplier consumer's below; presumably the two must stay distinct - confirm. -->
    <property name="instanceName" value="dingdingTeamMessage"/>
    <property name="messageModel" value="CLUSTERING"/>
    <property name="consumeFromWhere" value="CONSUME_FROM_LAST_OFFSET"/>
    <property name="vipChannelEnabled" value="false"/>
    <property name="subscription">
        <map>
            <entry key="car_driver_team" value="*"/>
        </map>
    </property>
    <property name="messageListener" ref="dingdingTeamListener"/>
</bean>
<!-- Listener bean that receives the car_driver_team messages. -->
<bean id="dingdingTeamListener" class="com.zhuanche.message.listener.DingdingDriverTeamListener"/>
<!-- Push consumer subscribed to every tag ("*") of topic car_driver_supplier; messages are handed to dingdingSupplierListener. -->
<bean id="dingdingSupplierConsumer" class="org.apache.rocketmq.client.consumer.DefaultMQPushConsumer" init-method="start" destroy-method="shutdown">
    <property name="consumerGroup" value="${mq.dingdingSupplier.consumergroup}"/>
    <property name="namesrvAddr" value="${mq.dingdingSupplier.nameservAdd}"/>
    <property name="instanceName" value="dingdingSupplierMessage"/>
    <property name="messageModel" value="CLUSTERING"/>
    <property name="consumeFromWhere" value="CONSUME_FROM_LAST_OFFSET"/>
    <property name="vipChannelEnabled" value="false"/>
    <property name="subscription">
        <map>
            <entry key="car_driver_supplier" value="*"/>
        </map>
    </property>
    <property name="messageListener" ref="dingdingSupplierListener"/>
</bean>
<!-- Listener bean that receives the car_driver_supplier messages. -->
<bean id="dingdingSupplierListener" class="com.zhuanche.message.listener.DingdingSupplierListener"/>
遇到的问题:MQ 无法消费,原因是 consumerGroup 配置成了和已有消费者相同的值。另外,instanceName 在每台机器上必须唯一,不能重复。