client的工作过程,需要我们自己去编写对应的逻辑,我们目前只能从example写的例子来看。目前examle中提供了两个例子,一个是单机的,一个是集群的cluster,我们后续如果需要进行开发的话,其实也是开发我们自己的client,以及client的一些逻辑。我们主要看下集群的client是如何实现和消费的,又是怎么和server进行数据交互的。

我们来看看具体的代码:

protected void process() {
    int batchSize = 5 * 1024;
    while (running) {
        try {
            MDC.put("destination", destination);
            connector.connect();
            connector.subscribe();
            waiting = false;
            while (running) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // try {
                    // Thread.sleep(1000);
                    // } catch (InterruptedException e) {
                    // }
                } else {
                    printSummary(message, batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
        } catch (Exception e) {
            logger.error("process error!", e);
        } finally {
            connector.disconnect();
            MDC.remove("destination");
        }
    }
}

这个的这样的过程是这样的

  • 连接,connector.connect()
  • 订阅,connector.subscribe
  • 获取数据,connector.getWithoutAck()
  • 业务处理
  • 提交确认,connector.ack()
  • 回滚,connector.rollback()
  • 断开连接,connector.disconnect()

我们具体来看下。

一、建立连接

CanalConnector主要有两个实现,一个是SimpleCanalConnector,一个是ClusterCanalConnector,我们主要看下ClusterCanalConnector,这也是我们要用的一个模式。

我们用的时候,通过一个工厂类生成我们需要的Connector,这里的工厂类是CanalConnectors,里面包含了生成ClusterCanalConnector的方法。

public static CanalConnector newClusterConnector(String zkServers, String destination, String username,
                                                 String password) {
    ClusterCanalConnector canalConnector = new ClusterCanalConnector(username,
        password,
        destination,
        new ClusterNodeAccessStrategy(destination, ZkClientx.getZkClient(zkServers)));
    canalConnector.setSoTimeout(30 * 1000);
    return canalConnector;
}

用到的参数有zk的地址,canal的名称,数据库的账号密码。里面有个ClusterNodeAccessStrategy是用来选择client的策略,这个ClusterNodeAccessStrategy的构造方法里面有些东西需要我们关注下。

1.1 ClusterNodeAccessStrategy

public ClusterNodeAccessStrategy(String destination, ZkClientx zkClient){
    this.zkClient = zkClient;
    childListener = new IZkChildListener() {

        public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
            initClusters(currentChilds);
        }

    };

    dataListener = new IZkDataListener() {

        public void handleDataDeleted(String dataPath) throws Exception {
            runningAddress = null;
        }

        public void handleDataChange(String dataPath, Object data) throws Exception {
            initRunning(data);
        }

    };

    String clusterPath = ZookeeperPathUtils.getDestinationClusterRoot(destination);
    this.zkClient.subscribeChildChanges(clusterPath, childListener);
    initClusters(this.zkClient.getChildren(clusterPath));

    String runningPath = ZookeeperPathUtils.getDestinationServerRunning(destination);
    this.zkClient.subscribeDataChanges(runningPath, dataListener);
    initRunning(this.zkClient.readData(runningPath, true));
}

这边起了两个监听器,都是监听server端的活动服务器的。一个是获取所有的server列表,一个是获取活动的server服务器,都是从zk的对应节点上去取的。

1.2 连接connect

获取到CanalConnector之后,就是真正的连接了。在ClusterCanalConnector中,我们可以看到,其实他底层用的也是SimpleCanalConnector,只不过加了一个选择的策略。

public void connect() throws CanalClientException {
    if (connected) {
        return;
    }

    if (runningMonitor != null) {
        if (!runningMonitor.isStart()) {
            runningMonitor.start();
        }
    } else {
        waitClientRunning();
        if (!running) {
            return;
        }
        doConnect();
        if (filter != null) { // 如果存在条件,说明是自动切换,基于上一次的条件订阅一次
            subscribe(filter);
        }
        if (rollbackOnConnect) {
            rollback();
        }
    }

    connected = true;
}

如果是集群模式的客户端,那么这边的runningMonitor不为空,因为他进行了初始化。我们主要看下runningMonitor.start()里面的操作。

public void start() {
    super.start();

    String path = ZookeeperPathUtils.getDestinationClientRunning(this.destination, clientData.getClientId());
    zkClient.subscribeDataChanges(path, dataListener);
    initRunning();
}

这边监听的路径是:/otter/canal/destinations/{destination}/{clientId}/running。如果有任何的变化,或节点的删除,那么执行dataListener里面的操作。

dataListener = new IZkDataListener() {

    public void handleDataChange(String dataPath, Object data) throws Exception {
        MDC.put("destination", destination);
        ClientRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ClientRunningData.class);
        if (!isMine(runningData.getAddress())) {
            mutex.set(false);
        }

        if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active
            release = true;
            releaseRunning();// 彻底释放mainstem
        }

        activeData = (ClientRunningData) runningData;
    }

    public void handleDataDeleted(String dataPath) throws Exception {
        MDC.put("destination", destination);
        mutex.set(false);
        // 触发一下退出,可能是人为干预的释放操作或者网络闪断引起的session expired timeout
        processActiveExit();
        if (!release && activeData != null && isMine(activeData.getAddress())) {
            // 如果上一次active的状态就是本机,则即时触发一下active抢占
            initRunning();
        } else {
            // 否则就是等待delayTime,避免因网络瞬端或者zk异常,导致出现频繁的切换操作
            delayExector.schedule(new Runnable() {

                public void run() {
                    initRunning();
                }
            }, delayTime, TimeUnit.SECONDS);
        }
    }

};

这里的注释比较清楚,基本上如果数据发生了变化,那么进行节点释放后,将运行节点置为活动节点。如果发生了数据删除,那么直接触发退出,如果上一次的active状态是本机,那么触发一下active抢占,否则等待delayTime,默认5s后重试。下面我们主要看下initRunning。

1.3 initRunning

这块主要是创建运行节点的临时节点。节点路径是/otter/canal/destinations/{destination}/{clientId},节点内容是ClientRunningData的json序列化结果。连接的代码:

public InetSocketAddress processActiveEnter() {
    InetSocketAddress address = doConnect();
    mutex.set(true);
    if (filter != null) { // 如果存在条件,说明是自动切换,基于上一次的条件订阅一次
        subscribe(filter);
    }

    if (rollbackOnConnect) {
        rollback();
    }

    return address;
}

这块有几段逻辑,我们慢慢看下。

1.3.1 doConnect

这里是client直接连上了server,通过socket连接,也就是server暴露的socket端口。

private InetSocketAddress doConnect() throws CanalClientException {
    try {
        channel = SocketChannel.open();
        channel.socket().setSoTimeout(soTimeout);
        SocketAddress address = getAddress();
        if (address == null) {
            address = getNextAddress();
        }
        channel.connect(address);
        readableChannel = Channels.newChannel(channel.socket().getInputStream());
        writableChannel = Channels.newChannel(channel.socket().getOutputStream());
        Packet p = Packet.parseFrom(readNextPacket());
        if (p.getVersion() != 1) {
            throw new CanalClientException("unsupported version at this client.");
        }

        if (p.getType() != PacketType.HANDSHAKE) {
            throw new CanalClientException("expect handshake but found other type.");
        }
        //
        Handshake handshake = Handshake.parseFrom(p.getBody());
        supportedCompressions.addAll(handshake.getSupportedCompressionsList());
        //
        ClientAuth ca = ClientAuth.newBuilder()
            .setUsername(username != null ? username : "")
            .setPassword(ByteString.copyFromUtf8(password != null ? password : ""))
            .setNetReadTimeout(soTimeout)
            .setNetWriteTimeout(soTimeout)
            .build();
        writeWithHeader(Packet.newBuilder()
            .setType(PacketType.CLIENTAUTHENTICATION)
            .setBody(ca.toByteString())
            .build()
            .toByteArray());
        //
        Packet ack = Packet.parseFrom(readNextPacket());
        if (ack.getType() != PacketType.ACK) {
            throw new CanalClientException("unexpected packet type when ack is expected");
        }

        Ack ackBody = Ack.parseFrom(ack.getBody());
        if (ackBody.getErrorCode() > 0) {
            throw new CanalClientException("something goes wrong when doing authentication: "
                                       + ackBody.getErrorMessage());
        }

        connected = true;
        return new InetSocketAddress(channel.socket().getLocalAddress(), channel.socket().getLocalPort());
    } catch (IOException e) {
        throw new CanalClientException(e);
    }
}

这边采用NIO编程,建立和server的socket连接后,发送了握手包和认证包,当收到ack包后,认为连接成功。认证包的服务端处理在ClientAuthenticationHandler类中,握手处理在HandshakeInitializationHandler类。

server接收到认证的消息后,会做如下的处理:

public void messageReceived(final ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
    final Packet packet = Packet.parseFrom(buffer.readBytes(buffer.readableBytes()).array());
    switch (packet.getVersion()) {
        case SUPPORTED_VERSION:
        default:
            final ClientAuth clientAuth = ClientAuth.parseFrom(packet.getBody());
            // 如果存在订阅信息
            if (StringUtils.isNotEmpty(clientAuth.getDestination())
                && StringUtils.isNotEmpty(clientAuth.getClientId())) {
                ClientIdentity clientIdentity = new ClientIdentity(clientAuth.getDestination(),
                    Short.valueOf(clientAuth.getClientId()),
                    clientAuth.getFilter());
                try {
                    MDC.put("destination", clientIdentity.getDestination());
                    embeddedServer.subscribe(clientIdentity);
                    ctx.setAttachment(clientIdentity);// 设置状态数据
                    // 尝试启动,如果已经启动,忽略
                    if (!embeddedServer.isStart(clientIdentity.getDestination())) {
                        ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
                        if (!runningMonitor.isStart()) {
                            runningMonitor.start();
                        }
                    }
                } finally {
                    MDC.remove("destination");
                }
            }

            NettyUtils.ack(ctx.getChannel(), new ChannelFutureListener() {

                public void operationComplete(ChannelFuture future) throws Exception {
                    //忽略
                }

            });
            break;
    }
}

主要的逻辑在subscribe里面。如果metaManager没有启动,那么需要进行启动。启动时,会从zk节点下面拉取一些数据,包括客户端的消费位点情况等等。然后就是订阅,订阅是新建一个zk节点,路径为/otter/canal/destinations/{destination}/{clientId}。然后还有一些过滤器,也需要写到zk中。之后就是获取一下本client的位点信息,如果原来zk中包含,那么直接从内存中获取,否则取eventStore的第一条数据。

1.3.2 subscribe

发送订阅消息给server,通过socket的方式。这边是判断,如果filter不为空,才发送订阅消息。服务端的处理过程是这样的:

case SUBSCRIPTION:
    Sub sub = Sub.parseFrom(packet.getBody());
    if (StringUtils.isNotEmpty(sub.getDestination()) && StringUtils.isNotEmpty(sub.getClientId())) {
        clientIdentity = new ClientIdentity(sub.getDestination(),
                            Short.valueOf(sub.getClientId()),
                            sub.getFilter());
        MDC.put("destination", clientIdentity.getDestination());

        // 尝试启动,如果已经启动,忽略
        if (!embeddedServer.isStart(clientIdentity.getDestination())) {
            ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
            if (!runningMonitor.isStart()) {
                runningMonitor.start();
            }
        }

        embeddedServer.subscribe(clientIdentity);
        ctx.setAttachment(clientIdentity);// 设置状态数据
        NettyUtils.ack(ctx.getChannel(), null);
    } else {
        NettyUtils.error(401,
            MessageFormatter.format("destination or clientId is null", sub.toString()).getMessage(),
            ctx.getChannel(),
            null);
}
break;

类似于connect的过程,不过这边带上了filter的参数。这边启动了server以及他的监听器。

1.3.3 rollback

这里的回滚是指回滚server端记录的本client的位点信息。

public void rollback() throws CanalClientException {
    waitClientRunning();
    rollback(0);// 0代笔未设置
}

这里发送了rollback的指令。服务端是这么处理的:

case CLIENTROLLBACK:
    ClientRollback rollback = CanalPacket.ClientRollback.parseFrom(packet.getBody());
    MDC.put("destination", rollback.getDestination());
    if (StringUtils.isNotEmpty(rollback.getDestination())
        && StringUtils.isNotEmpty(rollback.getClientId())) {
        clientIdentity = new ClientIdentity(rollback.getDestination(),
            Short.valueOf(rollback.getClientId()));
        if (rollback.getBatchId() == 0L) {
            embeddedServer.rollback(clientIdentity);// 回滚所有批次
        } else {
            embeddedServer.rollback(clientIdentity, rollback.getBatchId()); // 只回滚单个批次
        }
    } else {
        NettyUtils.error(401,
            MessageFormatter.format("destination or clientId is null", rollback.toString())
                .getMessage(),
            ctx.getChannel(),
            null);
    }
    break;

这里的batchId传入的是0,也就是要回滚所有的批次。我们来看下这个回滚的动作:

@Override
public void rollback(ClientIdentity clientIdentity) throws CanalServerException {
    checkStart(clientIdentity.getDestination());
    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
    // 因为存在第一次链接时自动rollback的情况,所以需要忽略未订阅
    boolean hasSubscribe = canalInstance.getMetaManager().hasSubscribe(clientIdentity);
    if (!hasSubscribe) {
        return;
    }

    synchronized (canalInstance) {
        // 清除batch信息
        canalInstance.getMetaManager().clearAllBatchs(clientIdentity);
        // rollback eventStore中的状态信息
        canalInstance.getEventStore().rollback();
        logger.info("rollback successfully, clientId:{}", new Object[] { clientIdentity.getClientId() });
    }
}

这里回滚的,其实是eventStore中的指针,把get的指针设置为之前ack的指针。

二、订阅数据

当client连接server完成后,就需要进行binlog数据的订阅。

public void subscribe() throws CanalClientException {
    subscribe(""); // 传递空字符即可
}

public void subscribe(String filter) throws CanalClientException {
    int times = 0;
    while (times < retryTimes) {
        try {
            currentConnector.subscribe(filter);
            this.filter = filter;
            return;
        } catch (Throwable t) {
            if (retryTimes == -1 && t.getCause() instanceof InterruptedException) {
                logger.info("block waiting interrupted by other thread.");
                return;
            } else {
                logger.warn(String.format(
                        "something goes wrong when subscribing from server: %s",
                        currentConnector != null ? currentConnector.getAddress() : "null"),
                        t);
                times++;
                restart();
                logger.info("restart the connector for next round retry.");
            }

        }
    }

    throw new CanalClientException("failed to subscribe after " + times + " times retry.");
}

订阅这块的内容不再赘述,在上面的connect过程中有提到。这边还有一个失败重试的机制,当异常不是中断异常的情况下,会重试重启client connector,直到达到了阈值retryTimes。

三、获取数据

在建立连接和进行数据订阅之后,就可以开始进行binlog数据的获取了。主要的方法是getWithOutAck这个方法,这种是需要client自己进行数据ack的,保证了只有数据真正的被消费,而且进行了业务逻辑处理之后,才会ack。当然,如果有了异常,也会进行一定次数的重试和重启。

public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
    waitClientRunning();
    try {
        ...//忽略
        writeWithHeader(Packet.newBuilder()
            .setType(PacketType.GET)
            .setBody(Get.newBuilder()
            .setAutoAck(false)
            .setDestination(clientIdentity.getDestination())
            .setClientId(String.valueOf(clientIdentity.getClientId()))
                .setFetchSize(size)
                .setTimeout(time)
                .setUnit(unit.ordinal())
                .build()
                .toByteString())
            .build()
            .toByteArray());
        return receiveMessages();
    } catch (IOException e) {
        throw new CanalClientException(e);
    }
}

我们可以看到,其实是发送了一个GET命令给server端,然后传递了一个参数batchSize,还有超时时间,而且不是自动提交的。服务端的处理是这样的:

embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());

也是调用的这个方法:

@Override
public Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)
                                                throws CanalServerException {
    checkStart(clientIdentity.getDestination());
    checkSubscribe(clientIdentity);

    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
    synchronized (canalInstance) {
        // 获取到流式数据中的最后一批获取的位置
        PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().getLastestBatch(clientIdentity);

        Events<Event> events = null;
        if (positionRanges != null) { // 存在流数据
            events = getEvents(canalInstance.getEventStore(), positionRanges.getStart(), batchSize, timeout, unit);
        } else {// ack后第一次获取
            Position start = canalInstance.getMetaManager().getCursor(clientIdentity);
            if (start == null) { // 第一次,还没有过ack记录,则获取当前store中的第一条
                start = canalInstance.getEventStore().getFirstPosition();
            }

            events = getEvents(canalInstance.getEventStore(), start, batchSize, timeout, unit);
        }

        if (CollectionUtils.isEmpty(events.getEvents())) {
            logger.debug("getWithoutAck successfully, clientId:{} batchSize:{} but result is null",
                    clientIdentity.getClientId(), batchSize);
            return new Message(-1, new ArrayList<Entry>()); // 返回空包,避免生成batchId,浪费性能
        } else {
            // 记录到流式信息
            Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
            List<Entry> entrys = Lists.transform(events.getEvents(), new Function<Event, Entry>() {

                public Entry apply(Event input) {
                    return input.getEntry();
                }
            });
            if (logger.isInfoEnabled()) {
                logger.info("getWithoutAck successfully, clientId:{} batchSize:{}  real size is {} and result is [batchId:{} , position:{}]",
                        clientIdentity.getClientId(),
                        batchSize,
                        entrys.size(),
                        batchId,
                        events.getPositionRange());
            }
            return new Message(batchId, entrys);
        }

    }
}

最主要的逻辑在这里:

  • 判断canalInstance是否已经启动:checkStart
  • 判断订阅列表中是否包含当前的client:checkSubscribe
  • 根据client信息从metaManager中获取最后消费的批次:getLastestBatch,这块在运行起来后,是从内存中取的,但是在instance启动时,是从zk中拉取的,是从/otter/canal/destinations/{destination}/{clientId}/mark下面获取的,后续也会定时(1s)刷新到这里面
  • 如果能获取到消费的批次,直接从eventStore的队列中获取数据。
  • 如果positionRanges为空,那么从metaManager中获取指针。如果指针也没有,说明原来没有ack过数据,需要从store中第一条开始获取。这个过程其实就是找start,也就是上一次ack的位置。
  • 调用getEvent,获取数据。根据传入的参数不同,调用不同的方法去获取数据,但是最终都是调用的goGet方法。这个doGet方法不是很复杂,主要是根据参数从store队列中获取数据,然后把指针进行新的设置。
  • 如果没有取到binlog数据,那么直接返回,批次号为-1。
  • 如果取到了数据,记录一下流式数据后返回。

结果封装在Messages中,最终改为Message,包含批次号和binlog列表。

四、业务处理

拿到message后,需要进行判断batchId,如果batchId=-1或者binlog大小为0,说明没有拿到数据。否则在message基础上进行逻辑处理。

Message的内容,后续我们再进行讨论。

五、提交确认

connector.ack(batchId); // 提交确认

提交批次id,底层发送CLIENTACK命令到server。server调用CanalServerWithEmbedded的ack方法来进行提交。

public void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException {
    checkStart(clientIdentity.getDestination());
    checkSubscribe(clientIdentity);

    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
    PositionRange<LogPosition> positionRanges = null;
    positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity, batchId); // 更新位置
    if (positionRanges == null) { // 说明是重复的ack/rollback
        throw new CanalServerException(String.format("ack error , clientId:%s batchId:%d is not exist , please check",
            clientIdentity.getClientId(),
            batchId));
    }

    // 更新cursor
    if (positionRanges.getAck() != null) {
        canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());
        if (logger.isInfoEnabled()) {
            logger.info("ack successfully, clientId:{} batchId:{} position:{}",
                    clientIdentity.getClientId(),
                    batchId,
                    positionRanges);
        }
    }

    // 可定时清理数据
    canalInstance.getEventStore().ack(positionRanges.getEnd());

}

首先更新metaManager中的batch,然后更新ack指针,同时清理store中到ack指针位置的数据。

六、回滚

如果有失败的情况,需要进行回滚。发送CLIENTROLLBACK命令给server端,进行数据回滚。回滚单个批次时的处理逻辑是这样的:

@Override
public void rollback(ClientIdentity clientIdentity, Long batchId) throws CanalServerException {
    checkStart(clientIdentity.getDestination());
    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());

    // 因为存在第一次链接时自动rollback的情况,所以需要忽略未订阅
    boolean hasSubscribe = canalInstance.getMetaManager().hasSubscribe(clientIdentity);
    if (!hasSubscribe) {
        return;
    }
    synchronized (canalInstance) {
        // 清除batch信息
        PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity,
            batchId);
        if (positionRanges == null) { // 说明是重复的ack/rollback
            throw new CanalServerException(String.format("rollback error, clientId:%s batchId:%d is not exist , please check",
                clientIdentity.getClientId(),
                batchId));
        }

        // lastRollbackPostions.put(clientIdentity,
        // positionRanges.getEnd());// 记录一下最后rollback的位置
        // TODO 后续rollback到指定的batchId位置
        canalInstance.getEventStore().rollback();// rollback
                                                 // eventStore中的状态信息
        logger.info("rollback successfully, clientId:{} batchId:{} position:{}",
            clientIdentity.getClientId(),
            batchId,
            positionRanges);
    }
}

这里的rollback到指定的batchId,其实是假的。他的rollback也是全量回滚到ack的指针位置。

七、断开连接

在发生异常情况时,client会断开与server的连接,也就是disconnect方法。

public void disconnect() throws CanalClientException {
    if (rollbackOnDisConnect && channel.isConnected()) {
        rollback();
    }

    connected = false;
    if (runningMonitor != null) {
        if (runningMonitor.isStart()) {
            runningMonitor.stop();
        }
    } else {
        doDisconnnect();
    }
}

判断是否在断开连接的时候回滚参数(默认false)和当前socket通道是否连接中,进行回滚。

否则调用runningMonitor.stop方法进行停止。主要的过程是这样的:

  • 取消监听/otter/canal/destinations/{destination}/{clientId}/running/节点变化信息
  • 删除上面这个节点
  • 关闭socket的读通道
  • 关闭socket的写通道
  • 关闭socket channel

版权声明:本文为f-zhao原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/f-zhao/p/9105843.html