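How are timestamps used in Flink? Watermark usage and internals
1. Watermark overview
A watermark is a mechanism Flink introduced to support event-time window computation. In essence, a watermark is itself a timestamp.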
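2. What watermarks are for
Watermarks exist to handle out-of-order events, and handling them correctly usually means combining the watermark mechanism with windows.
A stream record takes time to travel from where the event is produced, through the source, to an operator. In most cases records reach the operator in the order their events were produced, but network delays, backpressure and similar factors can reorder them, so that some elements arrive out of order (late elements).
We cannot wait for late elements indefinitely, though. Some mechanism has to guarantee that, once a certain point is reached, the window is triggered and computed. That mechanism is the watermark.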
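The interplay between out-of-order events, the watermark and window firing can be illustrated with a small plain-Java simulation. This is only a sketch and does not use the Flink API: the class name `WatermarkWindowDemo`, the window size of 10, the out-of-orderness bound of 3 and the rule "watermark = largest timestamp seen so far minus the bound" are all assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone model of event-time tumbling windows; not Flink code.
class WatermarkWindowDemo {
    static final long WINDOW_SIZE = 10;          // assumed window length
    static final long MAX_OUT_OF_ORDERNESS = 3;  // assumed bound on disorder

    final Map<Long, List<Long>> open = new TreeMap<>();   // window start -> buffered timestamps
    final Map<Long, List<Long>> fired = new TreeMap<>();  // windows that were already computed
    final List<Long> dropped = new ArrayList<>();         // elements that arrived too late

    long maxTimestamp = Long.MIN_VALUE;
    long watermark = Long.MIN_VALUE;

    void onEvent(long timestamp) {
        long windowStart = timestamp - Math.floorMod(timestamp, WINDOW_SIZE);
        long windowMaxTimestamp = windowStart + WINDOW_SIZE - 1;

        if (windowMaxTimestamp <= watermark) {
            // The window this element belongs to has already been triggered.
            dropped.add(timestamp);
        } else {
            open.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(timestamp);
        }

        // Advance the watermark: it trails the largest timestamp by a fixed bound.
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        watermark = maxTimestamp - MAX_OUT_OF_ORDERNESS;

        // Fire every open window whose last timestamp is covered by the watermark.
        Iterator<Map.Entry<Long, List<Long>>> it = open.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<Long>> entry = it.next();
            if (entry.getKey() + WINDOW_SIZE - 1 <= watermark) {
                fired.put(entry.getKey(), entry.getValue());
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        WatermarkWindowDemo demo = new WatermarkWindowDemo();
        for (long ts : new long[] {1, 4, 3, 9, 7, 12, 8, 14}) {
            demo.onEvent(ts);
        }
        System.out.println("fired=" + demo.fired + " dropped=" + demo.dropped + " open=" + demo.open);
    }
}
```

In this run the element with timestamp 7 arrives out of order but still lands in window [0, 10), because the watermark has not yet reached 9. The element with timestamp 8 arrives after the watermark has passed the end of that window and is dropped, which is the "we cannot wait forever" trade-off described above.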
3. How to use watermarks
3.1 Setting the time characteristic on StreamExecutionEnvironment
// --------------------------------------------------------------------------------------------
//  Time characteristic
// --------------------------------------------------------------------------------------------

/**
 * Sets the time characteristic for all streams create from this environment, e.g., processing
 * time, event time, or ingestion time.
 *
 * <p>If you set the characteristic to IngestionTime of EventTime this will set a default
 * watermark update interval of 200 ms. If this is not applicable for your application
 * you should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
 *
 * @param characteristic The time characteristic.
 */
@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
    this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
    if (characteristic == TimeCharacteristic.ProcessingTime) {
        getConfig().setAutoWatermarkInterval(0);
    } else {
        getConfig().setAutoWatermarkInterval(200);
    }
}

/**
 * Gets the time characteristic.
 *
 * @see #setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)
 *
 * @return The time characteristic.
 */
@PublicEvolving
public TimeCharacteristic getStreamTimeCharacteristic() {
    return timeCharacteristic;
}
TimeCharacteristic is an enum that defines which notion of time the system uses as its reference:
/**
 * The time characteristic defines how the system determines time for time-dependent
 * order and operations that depend on time (such as time windows).
 */
@PublicEvolving
public enum TimeCharacteristic {

    /**
     * Processing time for operators means that the operator uses the system clock of the machine
     * to determine the current time of the data stream. Processing-time windows trigger based
     * on wall-clock time and include whatever elements happen to have arrived at the operator at
     * that point in time.
     *
     * <p>Using processing time for window operations results in general in quite non-deterministic
     * results, because the contents of the windows depends on the speed in which elements arrive.
     * It is, however, the cheapest method of forming windows and the method that introduces the
     * least latency.
     */
    ProcessingTime,

    /**
     * Ingestion time means that the time of each individual element in the stream is determined
     * when the element enters the Flink streaming data flow. Operations like windows group the
     * elements based on that time, meaning that processing speed within the streaming dataflow
     * does not affect windowing, but only the speed at which sources receive elements.
     *
     * <p>Ingestion time is often a good compromise between processing time and event time.
     * It does not need any special manual form of watermark generation, and events are typically
     * not too much out-or-order when they arrive at operators; in fact, out-of-orderness can
     * only be introduced by streaming shuffles or split/join/union operations. The fact that
     * elements are not very much out-of-order means that the latency increase is moderate,
     * compared to event
     * time.
     */
    IngestionTime,

    /**
     * Event time means that the time of each individual element in the stream (also called event)
     * is determined by the event's individual custom timestamp. These timestamps either exist in
     * the elements from before they entered the Flink streaming dataflow, or are user-assigned at
     * the sources. The big implication of this is that it allows for elements to arrive in the
     * sources and in all operators out of order, meaning that elements with earlier timestamps may
     * arrive after elements with later timestamps.
     *
     * <p>Operators that window or order data with respect to event time must buffer data until they
     * can be sure that all timestamps for a certain time interval have been received. This is
     * handled by the so called "time watermarks".
     *
     * <p>Operations based on event time are very predictable - the result of windowing operations
     * is typically identical no matter when the window is executed and how fast the streams
     * operate. At the same time, the buffering and tracking of event time is also costlier than
     * operating with processing time, and typically also introduces more latency. The amount of
     * extra cost depends mostly on how much out of order the elements arrive, i.e., how long the
     * time span between the arrival of early and late elements is. With respect to the
     * "time watermarks", this means that the cost typically depends on how early or late the
     * watermarks can be generated for their timestamp.
     *
     * <p>In relation to {@link #IngestionTime}, the event time is similar, but refers the the
     * event's original time, rather than the time assigned at the data source. Practically, that
     * means that event time has generally more meaning, but also that it takes longer to determine
     * that all elements for a certain time have arrived.
     */
    EventTime
}
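The three values mean: EventTime, the time at which the event was produced; IngestionTime, the time at which the event entered Flink; ProcessingTime, the time at which an operator processes the event.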
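To make the difference between the three notions of time concrete, the following plain-Java sketch shows which clock supplies the timestamp in each case, together with the default auto-watermark intervals from the `setStreamTimeCharacteristic` listing (0 for processing time, 200 ms otherwise). The enum `TimeMode` and both helper methods are hypothetical stand-ins written for this example; they are not part of Flink.

```java
// Hypothetical model of the three time characteristics; not Flink code.
class TimeModeSketch {
    /** Local stand-in for Flink's TimeCharacteristic enum. */
    enum TimeMode { PROCESSING_TIME, INGESTION_TIME, EVENT_TIME }

    /** Mirrors the defaults seen in setStreamTimeCharacteristic: 0 ms or 200 ms. */
    static long defaultAutoWatermarkIntervalMs(TimeMode mode) {
        return mode == TimeMode.PROCESSING_TIME ? 0L : 200L;
    }

    /**
     * Picks the timestamp a window would use for one element.
     *
     * @param eventTime     timestamp carried inside the event itself
     * @param sourceClock   wall-clock time when the source received the event
     * @param operatorClock wall-clock time when the operator handles the event
     */
    static long timestampFor(TimeMode mode, long eventTime, long sourceClock, long operatorClock) {
        switch (mode) {
            case EVENT_TIME:
                return eventTime;
            case INGESTION_TIME:
                return sourceClock;
            default:
                return operatorClock;
        }
    }

    public static void main(String[] args) {
        // One event produced at t=1000, ingested at t=1500, processed at t=1800.
        for (TimeMode mode : TimeMode.values()) {
            System.out.println(mode
                    + " timestamp=" + timestampFor(mode, 1000L, 1500L, 1800L)
                    + " autoWatermarkIntervalMs=" + defaultAutoWatermarkIntervalMs(mode));
        }
    }
}
```

Only the event-time case depends on data carried by the event, which is why it is the one that needs watermarks supplied by the application.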
3.2 How watermarks are generated
/**
 * A Watermark tells operators that no elements with a timestamp older or equal
 * to the watermark timestamp should arrive at the operator. Watermarks are emitted at the
 * sources and propagate through the operators of the topology. Operators must themselves emit
 * watermarks to downstream operators using
 * {@link org.apache.flink.streaming.api.operators.Output#emitWatermark(Watermark)}. Operators that
 * do not internally buffer elements can always forward the watermark that they receive. Operators
 * that buffer elements, such as window operators, must forward a watermark after emission of
 * elements that is triggered by the arriving watermark.
 *
 * <p>In some cases a watermark is only a heuristic and operators should be able to deal with
 * late elements. They can either discard those or update the result and emit updates/retractions
 * to downstream operations.
 *
 * <p>When a source closes it will emit a final watermark with timestamp {@code Long.MAX_VALUE}.
 * When an operator receives this it will know that no more input will be arriving in the future.
 */
Output is defined as follows:
/**
 * A {@link org.apache.flink.streaming.api.operators.StreamOperator} is supplied with an object
 * of this interface that can be used to emit elements and other messages, such as barriers
 * and watermarks, from an operator.
 *
 * @param <T> The type of the elements that can be emitted.
 */
The method that emits a watermark:
/**
 * Emits a {@link Watermark} from an operator. This watermark is broadcast to all downstream
 * operators.
 *
 * <p>A watermark specifies that no element with a timestamp lower or equal to the watermark
 * timestamp will be emitted in the future.
 */
void emitWatermark(Watermark mark);
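As a rough illustration of how a source-side component might drive `emitWatermark`, the sketch below models a periodic watermark generator in plain Java. It assumes a bounded-out-of-orderness strategy (watermark = largest timestamp seen minus a fixed bound), emits only when the watermark actually advances, and emits a final `Long.MAX_VALUE` watermark on close, as the Watermark Javadoc describes. The class `PeriodicWatermarkSketch` and the interface `WatermarkOutput` are hypothetical; they only mimic the shape of Flink's Output and are not its API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of periodic watermark emission; not Flink code.
class PeriodicWatermarkSketch {
    /** Minimal stand-in for the emitWatermark part of Flink's Output interface. */
    interface WatermarkOutput {
        void emitWatermark(long timestamp);
    }

    private final long maxOutOfOrderness;
    private final WatermarkOutput output;
    private long maxSeenTimestamp;
    private long lastEmitted = Long.MIN_VALUE;

    PeriodicWatermarkSketch(long maxOutOfOrderness, WatermarkOutput output) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.output = output;
        // Start high enough that the subtraction in onPeriodicTick cannot underflow.
        this.maxSeenTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    /** Called for every element: only tracks the largest timestamp seen so far. */
    void onElement(long timestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, timestamp);
    }

    /** Called once per auto-watermark interval: emits only if the watermark advanced. */
    void onPeriodicTick() {
        long candidate = maxSeenTimestamp - maxOutOfOrderness;
        if (candidate > lastEmitted) {
            lastEmitted = candidate;
            output.emitWatermark(candidate);
        }
    }

    /** Called when the source closes: signals that no more input will arrive. */
    void close() {
        output.emitWatermark(Long.MAX_VALUE);
    }

    public static void main(String[] args) {
        List<Long> emitted = new ArrayList<>();
        PeriodicWatermarkSketch generator = new PeriodicWatermarkSketch(3, emitted::add);
        generator.onElement(10);
        generator.onPeriodicTick();
        generator.onElement(8);   // out of order: does not move the watermark
        generator.onPeriodicTick();
        generator.onElement(15);
        generator.onPeriodicTick();
        generator.close();
        System.out.println(emitted);
    }
}
```

Emitting only on advancement keeps watermarks monotonically increasing, which downstream operators rely on when deciding that a time interval is complete.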
3.3 How operators process watermarks
OneInputStreamOperator#processElement
TwoInputStreamOperator#processElement1
TwoInputStreamOperator#processElement2
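For an operator with two inputs, the watermark it can safely forward is bounded by the slower of its inputs. The sketch below models that rule in plain Java, assuming the operator tracks one watermark per input and forwards the minimum whenever it advances. The class `TwoInputWatermarkSketch` and its members are hypothetical and only approximate what a two-input operator does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of watermark handling in a two-input operator; not Flink code.
class TwoInputWatermarkSketch {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    /** Watermarks this operator would forward downstream, in order. */
    final List<Long> forwarded = new ArrayList<>();

    void processWatermark1(long timestamp) {
        input1Watermark = timestamp;
        advance();
    }

    void processWatermark2(long timestamp) {
        input2Watermark = timestamp;
        advance();
    }

    /** Forward min(input1, input2), but only when that minimum moves forward. */
    private void advance() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            forwarded.add(newMin);
        }
    }

    public static void main(String[] args) {
        TwoInputWatermarkSketch operator = new TwoInputWatermarkSketch();
        operator.processWatermark1(10); // input 2 has not reported yet: nothing forwarded
        operator.processWatermark2(5);
        operator.processWatermark1(20); // still held back by input 2
        operator.processWatermark2(12);
        System.out.println(operator.forwarded);
    }
}
```

Taking the minimum means one stalled input holds back event time for the whole operator, which is worth keeping in mind when joining a busy stream with a quiet one.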
(Figure: class diagram of the operator hierarchy)
Taking WindowOperator as an example:
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    final Collection<W> elementWindows = windowAssigner.assignWindows(
        element.getValue(), element.getTimestamp(), windowAssignerContext);

    //if element is handled by none of assigned elementWindows
    boolean isSkippedElement = true;

    final K key = this.<K>getKeyedStateBackend().getCurrentKey();

    if (windowAssigner instanceof MergingWindowAssigner) {
        MergingWindowSet<W> mergingWindows = getMergingWindowSet();

        for (W window: elementWindows) {

            // adding the new window might result in a merge, in that case the actualWindow
            // is the merged window and we work with that. If we don't merge then
            // actualWindow == window
            W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
                @Override
                public void merge(W mergeResult,
                        Collection<W> mergedWindows, W stateWindowResult,
                        Collection<W> mergedStateWindows) throws Exception {

                    if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
                        throw new UnsupportedOperationException("The end timestamp of an " +
                                "event-time window cannot become earlier than the current watermark " +
                                "by merging. Current watermark: " + internalTimerService.currentWatermark() +
                                " window: " + mergeResult);
                    } else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
                        throw new UnsupportedOperationException("The end timestamp of a " +
                                "processing-time window cannot become earlier than the current processing time " +
                                "by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
                                " window: " + mergeResult);
                    }

                    triggerContext.key = key;
                    triggerContext.window = mergeResult;

                    triggerContext.onMerge(mergedWindows);

                    for (W m: mergedWindows) {
                        triggerContext.window = m;
                        triggerContext.clear();
                        deleteCleanupTimer(m);
                    }

                    // merge the merged state windows into the newly resulting state window
                    windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
                }
            });

            // drop if the window is already late
            if (isWindowLate(actualWindow)) {
                mergingWindows.retireWindow(actualWindow);
                continue;
            }
            isSkippedElement = false;

            W stateWindow = mergingWindows.getStateWindow(actualWindow);
            if (stateWindow == null) {
                throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
            }

            windowState.setCurrentNamespace(stateWindow);
            windowState.add(element.getValue());

            triggerContext.key = key;
            triggerContext.window = actualWindow;

            TriggerResult triggerResult = triggerContext.onElement(element);

            if (triggerResult.isFire()) {
                ACC contents = windowState.get();
                if (contents == null) {
                    continue;
                }
                emitWindowContents(actualWindow, contents);
            }

            if (triggerResult.isPurge()) {
                windowState.clear();
            }
            registerCleanupTimer(actualWindow);
        }

        // need to make sure to update the merging state in state
        mergingWindows.persist();
    } else {
        for (W window: elementWindows) {

            // drop if the window is already late
            if (isWindowLate(window)) {
                continue;
            }
            isSkippedElement = false;

            windowState.setCurrentNamespace(window);
            windowState.add(element.getValue());

            triggerContext.key = key;
            triggerContext.window = window;

            TriggerResult triggerResult = triggerContext.onElement(element);

            if (triggerResult.isFire()) {
                ACC contents = windowState.get();
                if (contents == null) {
                    continue;
                }
                emitWindowContents(window, contents);
            }

            if (triggerResult.isPurge()) {
                windowState.clear();
            }
            registerCleanupTimer(window);
        }
    }

    // side output input event if
    // element not handled by any window
    // late arriving tag has been set
    // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
    if (isSkippedElement && isElementLate(element)) {
        if (lateDataOutputTag != null){
            sideOutput(element);
        } else {
            this.numLateRecordsDropped.inc();
        }
    }
}
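The listing calls `isWindowLate` and `isElementLate` without showing their bodies. Judging from the merge check in the same method (`maxTimestamp() + allowedLateness <= currentWatermark()`) and from the comment above the final `if`, both presumably compare a timestamp plus the allowed lateness against the current watermark. The plain-Java sketch below spells out that assumed logic for the event-time case only; the class `LatenessSketch` and its method signatures are hypothetical and use raw `long` values instead of Flink's window and record types.

```java
// Hypothetical model of the lateness checks used by a window operator; not Flink code.
class LatenessSketch {
    /** Time after which a window's state may be discarded (event-time case, assumed). */
    static long cleanupTime(long windowMaxTimestamp, long allowedLateness) {
        long cleanup = windowMaxTimestamp + allowedLateness;
        // Guard against long overflow when the window end is close to Long.MAX_VALUE.
        return cleanup >= windowMaxTimestamp ? cleanup : Long.MAX_VALUE;
    }

    /** A window is late once the watermark has reached its cleanup time. */
    static boolean isWindowLate(long windowMaxTimestamp, long allowedLateness, long currentWatermark) {
        return cleanupTime(windowMaxTimestamp, allowedLateness) <= currentWatermark;
    }

    /** An element is late once the watermark has passed its timestamp plus the allowed lateness. */
    static boolean isElementLate(long elementTimestamp, long allowedLateness, long currentWatermark) {
        return elementTimestamp + allowedLateness <= currentWatermark;
    }

    public static void main(String[] args) {
        long windowMaxTimestamp = 9; // window [0, 10)
        long watermark = 9;
        System.out.println("window late, lateness 0: " + isWindowLate(windowMaxTimestamp, 0, watermark));
        System.out.println("window late, lateness 5: " + isWindowLate(windowMaxTimestamp, 5, watermark));
        System.out.println("element 8 late, lateness 0: " + isElementLate(8, 0, watermark));
        System.out.println("element 8 late, lateness 5: " + isElementLate(8, 5, watermark));
    }
}
```

Under these assumptions, a non-zero allowed lateness keeps a window open for elements that arrive after the watermark has passed the window end, at the cost of holding its state longer. Elements that are still too late go to the side output when `lateDataOutputTag` is set, and are otherwise counted in `numLateRecordsDropped`.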
Summary:
References
【1】https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html