1. Introduction to Watermarks

  A watermark is a mechanism Flink introduced to support event-time window computation. In essence, a watermark is itself a timestamp.

2. What Watermarks Are For

Watermarks exist to handle out-of-order events. Handling such events correctly usually means combining the watermark mechanism with windows.

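In stream processing, an event is produced, passes through a source, and then reaches an operator, and each of these steps takes time. In most cases data reaches the operator in the order in which the events were produced, but network delays, back pressure, and similar causes can still make elements arrive out of order (out-of-order or late elements).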

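We cannot wait indefinitely for late elements, though. There has to be a mechanism that guarantees the window is triggered and computed once a certain point in time has been reached. That mechanism is the watermark.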
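
The interplay between watermarks and windows described above can be sketched in plain Java without any Flink dependency. This is a simplified simulation, not Flink's implementation: the class `WatermarkWindowSketch`, the `run` method, and the parameters `windowSize` and `maxOutOfOrderness` are hypothetical names chosen for illustration. It assumes the watermark trails the largest timestamp seen so far by a fixed tolerance, and that a tumbling window fires once the watermark reaches the window's last timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Plain-Java simulation (no Flink dependency) of watermark-driven window firing.
class WatermarkWindowSketch {

    /**
     * Feeds event timestamps (ms) in arrival order and returns one log line per fired
     * window or dropped late element. All parameters are illustrative, not Flink settings.
     */
    static List<String> run(long[] arrivals, long windowSize, long maxOutOfOrderness) {
        TreeMap<Long, List<Long>> windows = new TreeMap<>(); // window start -> buffered timestamps
        List<String> log = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;

        for (long ts : arrivals) {
            long start = ts - Math.floorMod(ts, windowSize);
            // The window for this element has already fired: the element is late.
            if (start + windowSize - 1 <= watermark) {
                log.add("late element " + ts + " dropped");
                continue;
            }
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(ts);

            // The watermark trails the largest timestamp seen so far by the tolerated disorder.
            maxSeen = Math.max(maxSeen, ts);
            watermark = Math.max(watermark, maxSeen - maxOutOfOrderness);

            // Fire every window whose last timestamp (end - 1) is covered by the watermark.
            while (!windows.isEmpty() && windows.firstKey() + windowSize - 1 <= watermark) {
                long s = windows.firstKey();
                log.add("window [" + s + "," + (s + windowSize) + ") -> " + windows.remove(s));
            }
        }
        return log;
    }

    public static void main(String[] args) {
        long[] arrivals = {1000, 4000, 2000, 9000, 13000, 3000};
        run(arrivals, 5000, 3000).forEach(System.out::println);
    }
}
```

With 5-second windows and a 3-second tolerance, the out-of-order element 2000 still lands in window [0,5000) because the watermark has not yet passed that window, while the element 3000, which arrives after the watermark has advanced to 10000, is treated as late.
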
3. How to Use Watermarks

   3.1 Setting the time characteristic on StreamExecutionEnvironment

    // --------------------------------------------------------------------------------------------
    //  Time characteristic
    // --------------------------------------------------------------------------------------------

    /**
     * Sets the time characteristic for all streams created from this environment, e.g., processing
     * time, event time, or ingestion time.
     *
     * <p>If you set the characteristic to IngestionTime or EventTime this will set a default
     * watermark update interval of 200 ms. If this is not applicable for your application
     * you should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
     *
     * @param characteristic The time characteristic.
     */
    @PublicEvolving
    public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
        this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
        if (characteristic == TimeCharacteristic.ProcessingTime) {
            getConfig().setAutoWatermarkInterval(0);
        } else {
            getConfig().setAutoWatermarkInterval(200);
        }
    }

    /**
     * Gets the time characteristic.
     *
     * @see #setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)
     *
     * @return The time characteristic.
     */
    @PublicEvolving
    public TimeCharacteristic getStreamTimeCharacteristic() {
        return timeCharacteristic;
    }

Here, TimeCharacteristic is an enum that defines which notion of time the system uses as its reference:

/**
 * The time characteristic defines how the system determines time for time-dependent
 * order and operations that depend on time (such as time windows).
 */
@PublicEvolving
public enum TimeCharacteristic {

    /**
     * Processing time for operators means that the operator uses the system clock of the machine
     * to determine the current time of the data stream. Processing-time windows trigger based
     * on wall-clock time and include whatever elements happen to have arrived at the operator at
     * that point in time.
     *
     * <p>Using processing time for window operations results in general in quite non-deterministic
     * results, because the contents of the windows depends on the speed in which elements arrive.
     * It is, however, the cheapest method of forming windows and the method that introduces the
     * least latency.
     */
    ProcessingTime,

    /**
     * Ingestion time means that the time of each individual element in the stream is determined
     * when the element enters the Flink streaming data flow. Operations like windows group the
     * elements based on that time, meaning that processing speed within the streaming dataflow
     * does not affect windowing, but only the speed at which sources receive elements.
     *
     * <p>Ingestion time is often a good compromise between processing time and event time.
     * It does not need any special manual form of watermark generation, and events are typically
     * not too much out-of-order when they arrive at operators; in fact, out-of-orderness can
     * only be introduced by streaming shuffles or split/join/union operations. The fact that
     * elements are not very much out-of-order means that the latency increase is moderate,
     * compared to event
     * time.
     */
    IngestionTime,

    /**
     * Event time means that the time of each individual element in the stream (also called event)
     * is determined by the event's individual custom timestamp. These timestamps either exist in
     * the elements from before they entered the Flink streaming dataflow, or are user-assigned at
     * the sources. The big implication of this is that it allows for elements to arrive in the
     * sources and in all operators out of order, meaning that elements with earlier timestamps may
     * arrive after elements with later timestamps.
     *
     * <p>Operators that window or order data with respect to event time must buffer data until they
     * can be sure that all timestamps for a certain time interval have been received. This is
     * handled by the so called "time watermarks".
     *
     * <p>Operations based on event time are very predictable - the result of windowing operations
     * is typically identical no matter when the window is executed and how fast the streams
     * operate. At the same time, the buffering and tracking of event time is also costlier than
     * operating with processing time, and typically also introduces more latency. The amount of
     * extra cost depends mostly on how much out of order the elements arrive, i.e., how long the
     * time span between the arrival of early and late elements is. With respect to the
     * "time watermarks", this means that the cost typically depends on how early or late the
     * watermarks can be generated for their timestamp.
     *
     * <p>In relation to {@link #IngestionTime}, the event time is similar, but refers to the
     * event's original time, rather than the time assigned at the data source. Practically, that
     * means that event time has generally more meaning, but also that it takes longer to determine
     * that all elements for a certain time have arrived.
     */
    EventTime
}

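The three values stand for event time (EventTime, when the event was produced), ingestion time (IngestionTime, when the event entered Flink), and processing time (ProcessingTime, when the operator processes the event).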
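
To make the difference between the three notions of time concrete, here is a minimal plain-Java sketch with no Flink dependency. The names `TimeNotionsSketch`, `Stamped`, `Notion`, and `windowStart` are hypothetical and exist only for this illustration. It shows that the same record can fall into three different one-minute tumbling windows depending on which timestamp is used.

```java
// Plain-Java illustration (no Flink dependency) of how the chosen notion of time
// changes which tumbling window a record falls into. All names are hypothetical.
class TimeNotionsSketch {

    /** A record stamped at three points: creation, entry into the pipeline, processing. */
    static final class Stamped {
        final long eventTime;
        final long ingestionTime;
        final long processingTime;

        Stamped(long eventTime, long ingestionTime, long processingTime) {
            this.eventTime = eventTime;
            this.ingestionTime = ingestionTime;
            this.processingTime = processingTime;
        }
    }

    enum Notion { EVENT_TIME, INGESTION_TIME, PROCESSING_TIME }

    /** Start of the tumbling window the record belongs to under the given notion of time. */
    static long windowStart(Stamped r, Notion notion, long windowSize) {
        long ts;
        switch (notion) {
            case EVENT_TIME:
                ts = r.eventTime;
                break;
            case INGESTION_TIME:
                ts = r.ingestionTime;
                break;
            default:
                ts = r.processingTime;
                break;
        }
        return ts - Math.floorMod(ts, windowSize);
    }

    public static void main(String[] args) {
        // Produced at 59.0 s, ingested at 60.5 s, processed at 121 s (for example, after back pressure).
        Stamped r = new Stamped(59_000L, 60_500L, 121_000L);
        for (Notion n : Notion.values()) {
            System.out.println(n + " -> window starting at " + windowStart(r, n, 60_000L));
        }
    }
}
```

Only the event-time result is independent of how quickly the record travels through the pipeline, which is why event-time windows give reproducible results at the cost of buffering.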

 

3.2 How Watermarks Are Produced

/**
 * A Watermark tells operators that no elements with a timestamp older or equal
 * to the watermark timestamp should arrive at the operator. Watermarks are emitted at the
 * sources and propagate through the operators of the topology. Operators must themselves emit
 * watermarks to downstream operators using
 * {@link org.apache.flink.streaming.api.operators.Output#emitWatermark(Watermark)}. Operators that
 * do not internally buffer elements can always forward the watermark that they receive. Operators
 * that buffer elements, such as window operators, must forward a watermark after emission of
 * elements that is triggered by the arriving watermark.
 *
 * <p>In some cases a watermark is only a heuristic and operators should be able to deal with
 * late elements. They can either discard those or update the result and emit updates/retractions
 * to downstream operations.
 *
 * <p>When a source closes it will emit a final watermark with timestamp {@code Long.MAX_VALUE}.
 * When an operator receives this it will know that no more input will be arriving in the future.
 */

 The Output referenced above is defined as follows:

/**
 * A {@link org.apache.flink.streaming.api.operators.StreamOperator} is supplied with an object
 * of this interface that can be used to emit elements and other messages, such as barriers
 * and watermarks, from an operator.
 *
 * @param <T> The type of the elements that can be emitted.
 */

The method that emits a Watermark:

    /**
     * Emits a {@link Watermark} from an operator. This watermark is broadcast to all downstream
     * operators.
     *
     * <p>A watermark specifies that no element with a timestamp lower or equal to the watermark
     * timestamp will be emitted in the future.
     */
    void emitWatermark(Watermark mark);
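
The Javadoc above says that an operator which buffers elements must forward a watermark only after emitting the elements that the watermark releases, and that a closing source sends a final watermark of `Long.MAX_VALUE`. The following plain-Java sketch illustrates that ordering. It does not use Flink's `Output` interface; `SortingOperatorSketch` and its methods are hypothetical stand-ins, and the output is modelled as a simple list of strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java sketch (no Flink dependency) of a buffering operator that sorts by timestamp.
// The class and method names are hypothetical, not Flink APIs.
class SortingOperatorSketch {
    private final PriorityQueue<Long> buffer = new PriorityQueue<>(); // smallest timestamp first
    private final List<String> output = new ArrayList<>();

    /** Elements are only buffered; nothing is emitted until a watermark arrives. */
    void processElement(long timestamp) {
        buffer.add(timestamp);
    }

    /** Emits every buffered element covered by the watermark, then forwards the watermark. */
    void processWatermark(long watermark) {
        while (!buffer.isEmpty() && buffer.peek() <= watermark) {
            output.add("element " + buffer.poll());
        }
        output.add("watermark " + watermark);
    }

    /** Runs a small fixed scenario and returns everything the operator sent downstream. */
    static List<String> demo() {
        SortingOperatorSketch op = new SortingOperatorSketch();
        op.processElement(7);
        op.processElement(3);
        op.processElement(5);
        op.processWatermark(5);
        op.processElement(9);
        op.processWatermark(Long.MAX_VALUE); // final watermark: the source has closed
        return op.output;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Forwarding the watermark last keeps its promise intact for downstream operators: by the time they see watermark 5, elements 3 and 5 have already been delivered.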

 3.3 How Operators Process Watermarks

OneInputStreamOperator#processElement

TwoInputStreamOperator#processElement1 

TwoInputStreamOperator#processElement2 
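
An operator with two inputs receives a separate watermark stream on each input. To my understanding, Flink advances such an operator's event time to the minimum of the two most recent input watermarks, since only that value is guaranteed by both inputs. The sketch below shows the idea in plain Java; `TwoInputWatermarkSketch` and its methods are hypothetical names and this is not Flink's code.

```java
// Plain-Java sketch (no Flink dependency) of combining watermarks from two inputs.
// Class and method names are hypothetical.
class TwoInputWatermarkSketch {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    /** Records a watermark from input 1 and returns the operator's current watermark. */
    long processWatermark1(long watermark) {
        input1Watermark = watermark;
        return advance();
    }

    /** Records a watermark from input 2 and returns the operator's current watermark. */
    long processWatermark2(long watermark) {
        input2Watermark = watermark;
        return advance();
    }

    private long advance() {
        // Only timestamps below BOTH input watermarks are known to be complete.
        long candidate = Math.min(input1Watermark, input2Watermark);
        if (candidate > combinedWatermark) {
            combinedWatermark = candidate; // a real operator would emit this downstream here
        }
        return combinedWatermark;
    }

    public static void main(String[] args) {
        TwoInputWatermarkSketch op = new TwoInputWatermarkSketch();
        System.out.println(op.processWatermark1(10));
        System.out.println(op.processWatermark2(4));
        System.out.println(op.processWatermark2(12));
    }
}
```

The slower input therefore holds back event-time progress for the whole operator, which is worth keeping in mind when one of two joined streams is idle.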

[Figure: class diagram of the operator hierarchy]

 Taking WindowOperator as an example:

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

        //if element is handled by none of assigned elementWindows
        boolean isSkippedElement = true;

        final K key = this.<K>getKeyedStateBackend().getCurrentKey();

        if (windowAssigner instanceof MergingWindowAssigner) {
            MergingWindowSet<W> mergingWindows = getMergingWindowSet();

            for (W window: elementWindows) {

                // adding the new window might result in a merge, in that case the actualWindow
                // is the merged window and we work with that. If we don't merge then
                // actualWindow == window
                W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
                    @Override
                    public void merge(W mergeResult,
                            Collection<W> mergedWindows, W stateWindowResult,
                            Collection<W> mergedStateWindows) throws Exception {

                        if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
                            throw new UnsupportedOperationException("The end timestamp of an " +
                                    "event-time window cannot become earlier than the current watermark " +
                                    "by merging. Current watermark: " + internalTimerService.currentWatermark() +
                                    " window: " + mergeResult);
                        } else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
                            throw new UnsupportedOperationException("The end timestamp of a " +
                                    "processing-time window cannot become earlier than the current processing time " +
                                    "by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
                                    " window: " + mergeResult);
                        }

                        triggerContext.key = key;
                        triggerContext.window = mergeResult;

                        triggerContext.onMerge(mergedWindows);

                        for (W m: mergedWindows) {
                            triggerContext.window = m;
                            triggerContext.clear();
                            deleteCleanupTimer(m);
                        }

                        // merge the merged state windows into the newly resulting state window
                        windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
                    }
                });

                // drop if the window is already late
                if (isWindowLate(actualWindow)) {
                    mergingWindows.retireWindow(actualWindow);
                    continue;
                }
                isSkippedElement = false;

                W stateWindow = mergingWindows.getStateWindow(actualWindow);
                if (stateWindow == null) {
                    throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
                }

                windowState.setCurrentNamespace(stateWindow);
                windowState.add(element.getValue());

                triggerContext.key = key;
                triggerContext.window = actualWindow;

                TriggerResult triggerResult = triggerContext.onElement(element);

                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(actualWindow, contents);
                }

                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                registerCleanupTimer(actualWindow);
            }

            // need to make sure to update the merging state in state
            mergingWindows.persist();
        } else {
            for (W window: elementWindows) {

                // drop if the window is already late
                if (isWindowLate(window)) {
                    continue;
                }
                isSkippedElement = false;

                windowState.setCurrentNamespace(window);
                windowState.add(element.getValue());

                triggerContext.key = key;
                triggerContext.window = window;

                TriggerResult triggerResult = triggerContext.onElement(element);

                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(window, contents);
                }

                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                registerCleanupTimer(window);
            }
        }

        // side output input event if
        // element not handled by any window
        // late arriving tag has been set
        // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
        if (isSkippedElement && isElementLate(element)) {
            if (lateDataOutputTag != null){
                sideOutput(element);
            } else {
                this.numLateRecordsDropped.inc();
            }
        }
    }
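
The listing calls `isWindowLate` and `isElementLate` without showing their bodies. The sketch below is an assumption about what those checks compute for event-time windows, inferred from the merge check in the listing (`mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark()`) and from the comment before the side-output branch. It is a standalone illustration with hypothetical static methods, not a copy of Flink's code.

```java
// Plain-Java sketch (no Flink dependency) of the lateness checks for event-time windows.
// Method signatures are hypothetical; the real helpers are instance methods of WindowOperator.
class LatenessSketch {

    /** Time after which the window's state can be discarded (assumed: last timestamp + allowed lateness). */
    static long cleanupTime(long windowMaxTimestamp, long allowedLateness) {
        long t = windowMaxTimestamp + allowedLateness;
        return t >= windowMaxTimestamp ? t : Long.MAX_VALUE; // guard against overflow
    }

    /** A window is late once the watermark has reached its cleanup time. */
    static boolean isWindowLate(long windowMaxTimestamp, long allowedLateness, long currentWatermark) {
        return cleanupTime(windowMaxTimestamp, allowedLateness) <= currentWatermark;
    }

    /** An element is late once the watermark has passed its timestamp plus the allowed lateness. */
    static boolean isElementLate(long elementTimestamp, long allowedLateness, long currentWatermark) {
        return elementTimestamp + allowedLateness <= currentWatermark;
    }

    public static void main(String[] args) {
        // Window [0, 5000) has maxTimestamp 4999; allow 2000 ms of lateness.
        System.out.println(isWindowLate(4999, 2000, 6000));
        System.out.println(isWindowLate(4999, 2000, 6999));
        System.out.println(isElementLate(4500, 2000, 7000));
    }
}
```

Under this reading, an element is skipped only when every window it was assigned to is late, and it is then either sent to the side output (if `lateDataOutputTag` is set) or counted in `numLateRecordsDropped`, as the end of the listing shows.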

Summary:

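Time passes between the moment an event is produced and the moment it reaches an operator via a source. Data usually arrives in the order in which the events were produced, but network delays, back pressure, and similar causes can produce out-of-order or late elements. Because we cannot wait for late elements indefinitely, something must force the window to be computed at some point. This is where the watermark comes in: once a watermark arrives, the operator treats all data with timestamps at or before the watermark as having arrived, even though some delayed data may still show up afterwards.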


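Copyright notice: This is an original article by davidwang456, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.
Original link: https://www.cnblogs.com/davidwang456/p/11102496.html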