你听说过任督二脉吗?像这样~

咳咳~今天不讲武功,讲电商平台设计的功夫~

背景

当今的电商可不仅仅是B2C商城,接下来还会有O2O,往后可能还会有商超、奥莱、二手交易。。。且称之为业务模式~而每个业务模式下还会有预售、竞拍、拼团等不同组合的子模式。

可是我商城的商品列表页不想展示O2O的商品啊,商品列表的数据希望按一定规则相互隔离。其他模块,有的出于操作习惯的考虑不隔离,有的出于用户行为的考虑需要隔离。

各模块数据隔离需求如下

 

列表页

商详页

商品组

优惠券

活动

订单

原商城

隔离

隔离

隔离

暂时不隔离

暂时不隔离

隔离

 

O2O

隔离

隔离

隔离

暂时不隔离

暂时不隔离

隔离

 

各模块流程差异

 

新建商品

列表页

购物车

订单

原商城

店铺创建,门店设置库存

基于item建es文档

跨门店

状态流转走快递

 

O2O

门店创建(沿用原模型但弱化店铺的概念)

基于item建es文档

单个门店

状态流转走配送

 

于是我们就会面临不同的改造的场景。

场景1,新建商品就是新建商品啊!!!

例如商品的新建保存,是基础服务,已经具备通用存储模型。为了支持新模式我还得改服务接口、发布二方包?咱可不可以这样?

商品服务

Integer bizMode = BizModeContext.getBizMode();
itemDO.setBizMode(bizMode);
// ...
itemDAO.save(itemDO);

 

场景2,下单就是下单啊!!!

例如创建订单,虽然商品维度、订单类型、优惠方式有很多,但我修改一下B2C下单的字段计算,还要引发O2O模式的回归测试?咱可不可以这样?

甚至这样~

实现类路由

@BizModeService( bizMode=BizMode.B2C, srvClz=OrderTradeService.class )
public class MallOrderTradeServiceImpl extends AbstractOrderTradeService { }

//使用时
Integer bizMode = BizModeContext.getBizMode();
OrderTradeService srv = BizModeRouter.routeBean(bizMode, OrderTradeService.class);

眼尖的小哥哥可能已经发现,要是能再搭配个热加载的bean容器,都可以做成插件了!emmm…那是远景~

 

如何打通任督二脉?

首先要舌尖抵住上颚,再来三个深呼吸~然后拿起一本《Thinking In Java》或《Core Java》假装在修炼。。。等等。。。什么是任督二脉?

Java老司机都知道,我们通常会把ApplicationContext比作Spring的任督二脉,它贯穿始终,管理着bean的生命周期和传递。

所以电商平台的任督二脉就是BizModeContext啦!它的经脉图大概长这样~

文章出处

 

所以我们通过下面一二三四,入口处打标、dubbo服务间传递、RocketMQ传递、本机线程池内传递,一步一步打通整个标的透传。

步骤1-打标

aop按包路径切面+注解覆盖,满足你不同的定制需求~于是,在用户点击页面操作的那一刻,每个接口都被打上了“模式标”。

注解打标

@Configuration
public class ControllerConfig {
        @Aspect
        @Component
        public static class CxcAdvice implements BizModeControllerAspect {
                 @Override
                 public Integer getBizMode() {
                         return 300;
                 }
                 @Override
                 @Pointcut("execution(* com.mall.web.controller..*(..))")
                 public void pointcut() {
                 }
        }
}
 
@Slf4j
@RestController
@MarkBizMode(bizMode = 200)
public class AdminOldController2 {
        @RequestMapping("/admin_anno_byclass")
        public String annoByClass() {
                 log.info("annoByClass got bizmode: " + BizModeContext.getBizMode());
                 return "this is " + this.getClass().toString();
        }
        @RequestMapping("/admin_anno_bymethod")
        @MarkBizMode(bizMode = 100)
        public String annoByMethod() {
                 log.info("annoByMethod got bizmode: " + BizModeContext.getBizMode());
                 return "this is " + this.getClass().toString();
        }
}

 

步骤2-dubbo服务传递

借助dubbo自带的Filter和RpcContext可以轻松实现。那是因为dubbo的设计中已经充分考虑了。

Filter的使用

filter定义

@Activate(group = Constants.CONSUMER)
public class BizModeDubboConsumerFilter implements Filter { }

filter配置扫描发现: /src/main/resources/META-INF/dubbo/com.alibaba.dubbo.rpc.Filter

filter的装配原理: List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);

dubbo的SPI扩展机制就不具体展开啦~

RpcContext的生命周期

RpcContext -> RpcInvocation —服务调用— RpcInvocation -> RpcContext

业务扩展的调用:RpcContext.getContext().setAttachment(“bizMode”, (bizMode.toString()));

RpcContext.java

//创建一个线程隔离的上下文实例
    private static final InternalThreadLocal<RpcContext> LOCAL = new InternalThreadLocal<RpcContext>() {
        @Override
        protected RpcContext initialValue() {
            return new RpcContext();
        }
    };
    public static RpcContext getContext() {
        return LOCAL.get();
    }

 

dubbo对attachment的传递:

  • 本机(当前线程)的保存:RpcContext
  • 远程调用的保存和传递:RpcInvocation
  • 将RpcContext存入RpcInvocation:AbstractInvoker
public abstract class AbstractInvoker<T> implements Invoker<T> {
    @Override
    public Result invoke(Invocation inv) throws RpcException {
//节选。。。
        Map<String, String> context = RpcContext.getContext().getAttachments();
        if (context != null) {
          invocation.addAttachmentsIfAbsent(context);
        }
        if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)){
          invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
//节选。。。
// return ...
    }
    protected abstract Result doInvoke(Invocation invocation) throws Throwable;
}
  • 序列化与反序列化:DubboCodec (此处不展开)
  • 从RpcInvocation取出,存入提供方的RpcContext:ContextFilter
@Activate(group = Constants.PROVIDER, order = -10000)
public class ContextFilter implements Filter {
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        Map<String, String> attachments = invocation.getAttachments();
//节选。。。
                RpcContext.getContext().getAttachments().putAll(attachments);
//节选。。。
        try {
            RpcResult result = (RpcResult) invoker.invoke(invocation);
            // pass attachments to result
            result.addAttachments(RpcContext.getServerContext().getAttachments());
            return result;
        } finally {
            RpcContext.removeContext();
            RpcContext.getServerContext().clearAttachments();
        }
    }
}

步骤3-RocketMQ传递

RocketMQ设计时也预留了扩展打标的能力,只需要把模式标存入属性字段,就能跟随MQ把标传递到消费方。

消息体数据结构

org.apache.rocketmq.common.message.Message

private String topic;
private int flag;
private Map<String, String> properties;
private byte[] body;

 

//填入属性,仅包可见
void putProperty(final String name, final String value);

//填入自定义属性,与其他属性共享map,但对key过滤保留字
public void putUserProperty(final String name, final String value);

org.apache.rocketmq.common.message.MessageExt

是Message的子类

 private int queueId;

private int storeSize;

private long queueOffset;
private int sysFlag;
private long bornTimestamp;
private SocketAddress bornHost;

private long storeTimestamp;
private SocketAddress storeHost;
private String msgId;
private long commitLogOffset;
private int bodyCRC;
private int reconsumeTimes;

private long preparedTransactionOffset;

因此,可以在消息体的 Map<String,
String> properties 属性上附加打标信息。

 

发消息的扩展钩子

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.registerSendMessageHook(SendMessageHook)

 

收消息的扩展钩子

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.registerConsumeMessageHook(ConsumeMessageHook)

但由于收消息是一批一批收的,收到的是消息列表
List<MessageExt>,默认配置下只有一个元素,但允许配置多个,因此不能在这个钩子上做扩展。

因此,对starter做改造,在单个消息消费的位置增加了类似的hook扩展点。

ConsumerHook

public interface ConsumeOneMessageAdvice {
    String hookName();
    void consumeMessageBefore(final MessageExt msg);
    void consumeMessageAfter(final MessageExt msg);
}

 

步骤4-线程池子线程传递

BizModeContext的原理是用ThreadLocal存储线程范围的上下文,可是实际场景中,总会有些异步和并发的问题,需要使用到线程池。那么问题来了。

父线程context如何传递给子线程

jdk自带InheritableThreadLocal类解决了父子线程传递的问题。

Thread.init()

public class Thread implements Runnable {
    private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize, AccessControlContext acc,
                      boolean inheritThreadLocals) {
//节选。。。
        Thread parent = currentThread();
        if (inheritThreadLocals && parent.inheritableThreadLocals != null)
            this.inheritableThreadLocals =
                ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
//节选。。。
    }
}
 
//子线程创建时会把父线程的ThreadLocalMap复制到子线程中
public class ThreadLocal<T> {
        private ThreadLocalMap(ThreadLocalMap parentMap) {
            Entry[] parentTable = parentMap.table;
            int len = parentTable.length;
            setThreshold(len);
            table = new Entry[len];
            for (int j = 0; j < len; j++) {
                Entry e = parentTable[j];
                if (e != null) {
                    @SuppressWarnings("unchecked")
                    ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
                    if (key != null) {
                        Object value = key.childValue(e.value);
                        Entry c = new Entry(key, value);
                        int h = key.threadLocalHashCode & (len - 1);
                        while (table[h] != null)
                            h = nextIndex(h, len);
                        table[h] = c;
                        size++;
                    }
                }
            }
        }
} 

 

线程池中子线程复用时怎样维护context

但如果使用了线程池,子线程运行完并不会销毁,被另一个父线程复用时不会重新初始化。

这时候我们需要借助一个开源框架 TransmittableThreadLocal  https://github.com/alibaba/transmittable-thread-local

(图片来自官网)

在获取子线程时重新读取父线程的上下文,子线程run()执行结束时清理子线程的上下文。

打通任督二脉后可以练什么武功?

打通模式标的透传后,能怎么使用呢?大家可以尽情发挥下想象力~何时何地只需要 BizModeContext.getBizMode()

  • 日志MDC打标:可以统一给日志记录加入模式标。
  • sql自动追加查询条件:通过mybatis插件扩展或甚至是数据源代理,可以给sql自动追加隔离标条件(虽然具体业务中并不那么好用)。
  • 全链路监控或压测:是的,如果打标的不是bizMode,而是traceId或影子标,就可以通过这个“任督二脉”透传整个系统!
  • 新模式插件化接入:各业务板块逐渐模块化后,可以通过给扩展点开发实现类的形式接入新模式。

远景-多模式插件化部署

我们期望,未来新的业务模式接入,就像安装插件一样无痛无感知。

 

新模式接入,只需要增加部署新的bizmodeX节点,其他业务不需要回归测试。

某个业务,例如bizmode100,部署重启时,其他业务不受影响。

这还需要一步一步来,目前我们先实现了“任督二脉”的打通,后面的故事,敬请期待哦~

版权声明:本文为syjkfind原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/syjkfind/p/10897012.html