1. 窗口 1.1 概念
注意:Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时,窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开。
1.2 窗口分类
1.3 窗口分配器 定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。所以可以说,窗口分配器其实就是在指定窗口的类型。
窗口分配器最通用的定义方式,就是调用.window()方法。这个方法需要传入一个WindowAssigner作为参数,返回WindowedStream。如果是非按键分区窗口,那么直接调用.windowAll()方法,同样传入一个WindowAssigner,返回的是AllWindowedStream。
窗口按照驱动类型可以分成时间窗口和计数窗口,而按照具体的分配规则,又有滚动窗口、滑动窗口、会话窗口、全局窗口四种。除去需要自定义的全局窗口外,其他常用的类型Flink中都给出了内置的分配器实现,我们可以方便地调用实现各种需求。
时间窗口(处理时间、事件时间)
1 2 3 stream.keyBy(...) .window(TumblingProcessingTimeWindows.of(Time.seconds(5 ))) .aggregate(...)
1 2 3 stream.keyBy(...) .window(SlidingProcessingTimeWindows.of(Time.seconds(10 ),Time.seconds(5 ))) .aggregate(...)
1 2 3 stream.keyBy(...) .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10 ))) .aggregate(...)
1 2 3 stream.keyBy(...) .window(TumblingEventTimeWindows.of(Time.seconds(5 ))) .aggregate(...)
1 2 3 stream.keyBy(...) .window(SlidingEventTimeWindows.of(Time.seconds(10 ),Time.seconds(5 ))) .aggregate(...)
1 2 3 stream.keyBy(...) .window(EventTimeSessionWindows.withGap(Time.seconds(10 ))) .aggregate(...)
计数窗口
1 2 3 4 stream.keyBy(...) .countWindow(10 )
1 2 3 stream.keyBy(...) .countWindow(10 ,3 )
1 2 3 stream.keyBy(...) .window(GlobalWindows.create());
1.4 窗口函数 窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数和全窗口函数.
1.4.1 增量聚合函数(ReduceFunction / AggregateFunction) 窗口将数据收集起来,最基本的处理操作当然就是进行聚合。我们可以每来一个数据就在之前结果上聚合一次,这就是“增量聚合”。
典型的增量聚合函数有两个:ReduceFunction和AggregateFunction。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class WindowReduceDemo { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1 ); env .socketTextStream("hadoop102" , 7777 ) .map(new WaterSensorMapFunction ()) .keyBy(r -> r.getId()) .window(TumblingProcessingTimeWindows.of(Time.seconds(10 ))) .reduce(new ReduceFunction <WaterSensor>() { @Override public WaterSensor reduce (WaterSensor value1, WaterSensor value2) throws Exception { System.out.println("调用reduce方法,之前的结果:" +value1 + ",现在来的数据:" +value2); return new WaterSensor (value1.getId(), System.currentTimeMillis(),value1.getVc()+value2.getVc()); } }) .print(); env.execute(); } }
ReduceFunction可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。
Flink Window API中的aggregate就突破了这个限制,可以定义更加灵活的窗口聚合操作。这个方法需要传入一个AggregateFunction的实现类作为参数。
AggregateFunction可以看作是ReduceFunction的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型IN就是输入流中元素的数据类型;累加器类型ACC则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。
接口中有四个方法:
createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
add():将输入的元素添加到累加器中。
getResult():从累加器中提取聚合的输出结果。
merge():合并两个累加器,并将合并后的状态作为一个累加器返回。
所以可以看到,AggregateFunction的工作原理是:首先调用createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用getResult()方法得到计算结果。很明显,与ReduceFunction相同,AggregateFunction也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public class WindowAggregateDemo { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1 ); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("hadoop102" , 7777 ) .map(new WaterSensorMapFunction ()); KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId()); WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10 ))); SingleOutputStreamOperator<String> aggregate = sensorWS .aggregate( new AggregateFunction <WaterSensor, Integer, String>() { @Override public Integer createAccumulator () { System.out.println("创建累加器" ); return 0 ; } @Override public Integer add (WaterSensor value, Integer accumulator) { System.out.println("调用add方法,value=" +value); return accumulator + value.getVc(); } @Override public String getResult (Integer accumulator) { System.out.println("调用getResult方法" ); return accumulator.toString(); } @Override public Integer merge (Integer a, Integer b) { System.out.println("调用merge方法" ); return null ; } } ); aggregate.print(); env.execute(); } }
另外,Flink也为窗口的聚合提供了一系列预定义的简单聚合方法,可以直接基于WindowedStream调用。主要包括.sum()/max()/maxBy()/min()/minBy(),与KeyedStream的简单聚合非常相似。它们的底层,其实都是通过AggregateFunction来实现的。
1.4.2 全窗口函数(full window funtions) 有些场景下,我们要做的计算必须基于全部的数据才有效,这时做增量聚合就没什么意义了;另外,输出的结果有可能要包含上下文中的一些信息(比如窗口的起始时间),这是增量聚合函数做不到的。
所以,我们还需要有更丰富的窗口计算方式。窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。
在Flink中,全窗口函数也有两种:WindowFunction和ProcessWindowFunction。
WindowFunction字面上就是“窗口函数”,它其实是老版本的通用窗口函数接口。我们可以基于WindowedStream调用.apply()方法,传入一个WindowFunction的实现类。
1 2 3 4 5 6 7 stream .keyBy(<key selector>) .window(<window assigner>) .apply(new MyWindowFunction ());
这个类中可以获取到包含窗口所有数据的可迭代集合(Iterable),还可以拿到窗口(Window)本身的信息。
不过WindowFunction能提供的上下文信息较少,也没有更高级的功能。事实上,它的作用可以被ProcessWindowFunction全覆盖,所以之后可能会逐渐弃用。
处理窗口函数ProcessWindowFunction
ProcessWindowFunction是Window API中最底层的通用窗口函数接口。之所以说它“最底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得ProcessWindowFunction更加灵活、功能更加丰富,其实就是一个增强版的WindowFunction。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class WindowProcessDemo { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1 ); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("hadoop102" , 7777 ) .map(new WaterSensorMapFunction ()); KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId()); WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10 ))); SingleOutputStreamOperator<String> process = sensorWS .process( new ProcessWindowFunction <WaterSensor, String, String, TimeWindow>() { @Override public void process (String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception { long count = elements.spliterator().estimateSize(); long windowStartTs = context.window().getStart(); long windowEndTs = context.window().getEnd(); String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS" ); String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS" ); out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString()); } } ); process.print(); env.execute(); } }
1.4.3 增量聚合和全窗口函数的结合使用 示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public <R> SingleOutputStreamOperator<R> reduce ( ReduceFunction<T> reduceFunction,WindowFunction<T,R,K,W> function) public <R> SingleOutputStreamOperator<R> reduce ( ReduceFunction<T> reduceFunction,ProcessWindowFunction<T,R,K,W> function) public <ACC,V,R> SingleOutputStreamOperator<R> aggregate ( AggregateFunction<T,ACC,V> aggFunction,WindowFunction<V,R,K,W> windowFunction) public <ACC,V,R> SingleOutputStreamOperator<R> aggregate ( AggregateFunction<T,ACC,V> aggFunction, ProcessWindowFunction<V,R,K,W> windowFunction)
处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了Iterable类型的输入。
代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 public class WindowAggregateAndProcessDemo { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1 ); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("hadoop102" , 7777 ) .map(new WaterSensorMapFunction ()); KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId()); WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10 ))); SingleOutputStreamOperator<String> result = sensorWS.aggregate( new MyAgg (), new MyProcess () ); result.print(); env.execute(); } public static class MyAgg implements AggregateFunction <WaterSensor, Integer, String>{ @Override public Integer createAccumulator () { System.out.println("创建累加器" ); return 0 ; } @Override public Integer add (WaterSensor value, Integer accumulator) { System.out.println("调用add方法,value=" +value); return accumulator + value.getVc(); } @Override public String getResult (Integer accumulator) { System.out.println("调用getResult方法" ); return accumulator.toString(); } @Override public Integer merge (Integer a, Integer b) { System.out.println("调用merge方法" ); return null ; } } public static class MyProcess extends ProcessWindowFunction <String,String,String,TimeWindow>{ @Override public void process (String s, Context context, Iterable<String> elements, Collector<String> out) throws Exception { long startTs = context.window().getStart(); long endTs = context.window().getEnd(); String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS" ); String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS" ); long count = elements.spliterator().estimateSize(); out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString()); } } }
1.5 其他API Flink还提供了其他一些可选的API,让我们可以更加灵活地控制窗口行为。
1.5.1 触发器Trigger 触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。
基于WindowedStream调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。
1 2 3 stream.keyBy(...) .window(...) .trigger(new MyTrigger ())
a. 常见触发器如下:
EventTimeTrigger :通过对比EventTime和窗口的Endtime确定是否触发窗口计算,如果EventTime大于Window EndTime则触发,否则不触发,窗口将继续等待。
ProcessTimeTrigger :通过对比ProcessTime和窗口EndTme确定是否触发窗口,如果ProcessTime大于EndTime则触发计算,否则窗口继续等待。
ProcessingTimeoutTrigger :可以将任何触发器转变为超时触发器。
ContinuousEventTimeTrigger :根据间隔时间周期性触发窗口或者Window的结束时间小于当前EndTime触发窗口计算。
ContinuousProcessingTimeTrigger :根据间隔时间周期性触发窗口或者Window的结束时间小于当前ProcessTime触发窗口计算。
CountTrigger :根据接入数据量是否超过设定的阙值判断是否触发窗口计算。
DeltaTrigger :根据接入数据计算出来的Delta指标是否超过指定的Threshold去判断是否触发窗口计算。
PurgingTrigger :可以将任意触发器作为参数转换为Purge类型的触发器,计算完成后数据将被清理。
NeverTrigger :任何时候都不触发窗口计算
b. Trigger的抽象类
Trigger 接口提供了五个方法来响应不同的事件:
onElement() 方法在每个元素被加入窗口时调用。
onEventTime() 方法在注册的 event-time timer 触发时调用。
onProcessingTime() 方法在注册的 processing-time timer 触发时调用。
canMerge() 方法判断是否可以合并。
onMerge() 方法与有状态的 trigger 相关。该方法会在两个窗口合并时, 将窗口对应 trigger 的状态进行合并,比如使用会话窗口时。
clear() 方法处理在对应窗口被移除时所需的逻辑。
触发器源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 @PublicEvolving public abstract class Trigger <T, W extends Window > implements Serializable { private static final long serialVersionUID = -4104633972991191369L ; public abstract TriggerResult onElement (T element, long timestamp, W window, TriggerContext ctx) throws Exception; public abstract TriggerResult onProcessingTime (long time, W window, TriggerContext ctx) throws Exception; public abstract TriggerResult onEventTime (long time, W window, TriggerContext ctx) throws Exception; public boolean canMerge () { return false ; } public void onMerge (W window, OnMergeContext ctx) throws Exception { throw new UnsupportedOperationException ("This trigger does not support merging." ); } public abstract void clear (W window, TriggerContext ctx) throws Exception; public interface TriggerContext { } public interface OnMergeContext extends TriggerContext { <S extends MergingState <?, ?>> void mergePartitionedState ( StateDescriptor<S, ?> stateDescriptor) ; } }
其中,
(1)前三个方法返回TriggerResult枚举类型,其包含四个枚举值:
CONTINUE:表示对窗口不执行任何操作。即不触发窗口计算,也不删除元素。
FIRE:触发窗口计算,但是保留窗口元素。
PURGE:不触发窗口计算,丢弃窗口,并且删除窗口的元素。
FIRE_AND_PURGE:触发窗口计算,输出结果,然后将窗口中的数据和窗口进行清除。
TriggerResult源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public enum TriggerResult { CONTINUE(false , false ), FIRE_AND_PURGE(true , true ), FIRE(true , false ), PURGE(false , true ); private final boolean fire; private final boolean purge; TriggerResult(boolean fire, boolean purge) { this .purge = purge; this .fire = fire; } public boolean isFire () { return fire; } public boolean isPurge () { return purge; } }
(2)每一个窗口分配器都拥有一个属于自己的 Trigger,Trigger上会有定时器,用来决定一个窗口何时能够被计算或清除,当定时器触发后,会调用对应的回调返回,返回TriggerResult。Trigger的返回结果可以是 continue(不做任何操作),fire(处理窗口数据),purge(移除窗口和窗口中的数据),或者 fire + purge。一个Trigger的调用结果只是fire的话,那么会计算窗口并保留窗口原样,也就是说窗口中的数据仍然保留不变,等待下次Trigger fire的时候再次执行计算。一个窗口可以被重复计算多次知道它被 purge 了。在purge之前,窗口会一直占用着内存
c. ProcessingTimeTrigger源码分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 @PublicEvolving public class ProcessingTimeTrigger extends Trigger <Object, TimeWindow> { private static final long serialVersionUID = 1L ; private ProcessingTimeTrigger () {} @Override public TriggerResult onElement ( Object element, long timestamp, TimeWindow window, TriggerContext ctx) { ctx.registerProcessingTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; } @Override public TriggerResult onEventTime (long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } @Override public TriggerResult onProcessingTime (long time, TimeWindow window, TriggerContext ctx) { return TriggerResult.FIRE; } @Override public void clear (TimeWindow window, TriggerContext ctx) throws Exception { ctx.deleteProcessingTimeTimer(window.maxTimestamp()); } @Override public boolean canMerge () { return true ; } @Override public void onMerge (TimeWindow window, OnMergeContext ctx) { long windowMaxTimestamp = window.maxTimestamp(); if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) { ctx.registerProcessingTimeTimer(windowMaxTimestamp); } } @Override public String toString () { return "ProcessingTimeTrigger()" ; } public static ProcessingTimeTrigger create () { return new ProcessingTimeTrigger (); } }
在 onElement()方法中,ctx.registerProcessingTimeTimer(window.maxTimestamp())将会注册一个ProcessingTime定时器,时间参数是window.maxTimestamp(),也就是窗口的最终时间,当时间到达这个窗口最终时间,定时器触发并调用 onProcessingTime()方法,在 onProcessingTime() 方法中,return TriggerResult.FIRE 即返回 FIRE,触发窗口中数据的计算,但是会保留窗口元素。
需要注意的是ProcessingTimeTrigger类只会在窗口的最终时间到达的时候触发窗口函数的计算,计算完成后并不会清除窗口中的数据,这些数据存储在内存中,除非调用PURGE或FIRE_AND_PURGE,否则数据将一直存在内存中。实际上,Flink中提供的Trigger类,除了PurgingTrigger类,其他的都不会对窗口中的数据进行清除。
EventTimeTriggerr在onElement设置的定时器:
EventTime通过registerEventTimeTimer注册定时器,在内部Watermark达到或超过Timer设定的时间戳时触发。
1.5.2 移除器Evictor 移除器主要用来定义移除某些数据的逻辑。基于WindowedStream调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor是一个接口,不同的窗口类型都有各自预实现的移除器。
1 2 3 stream.keyBy(...) .window(...) .evictor(new MyEvictor ())
当一个元素进入stream中之后,一般要经历Window(开窗)、Trigger(触发器)、Evitor(移除器)、Windowfunction(窗口计算操作),具体过程如下:
Window中的WindowAssigner(窗口分配器)定义了数据应该被分配到哪个窗口中,每一个 WindowAssigner都会有一个默认的Trigger,如果用户在代码中指定了窗口的trigger,默认的 trigger 将会被覆盖。
Trigger上会有定时器,用来决定一个窗口何时能够被计算或清除。Trigger的返回结果可以是 continue(不做任何操作),fire(处理窗口数据),purge(移除窗口和窗口中的数据),或者 fire + purge。一个Trigger的调用结果只是fire的话,那么会计算窗口并保留窗口原样,也就是说窗口中的数据仍然保留不变,等待下次Trigger fire的时候再次执行计算。一个窗口可以被重复计算多次知道它被 purge 了。在purge之前,窗口会一直占用着内存。
当Trigger fire了,窗口中的元素集合就会交给Evictor(如果指定了的话)。Evictor 主要用来遍历窗口中的元素列表,并决定最先进入窗口的多少个元素需要被移除。剩余的元素会交给用户指定的函数进行窗口的计算。如果没有 Evictor 的话,窗口中的所有元素会一起交给WindowFunction进行计算。
WindowFunction收到了窗口的元素(可能经过了 Evictor 的过滤),并计算出窗口的结果值,并发送给下游。窗口计算操作有很多,比如预定义的sum(),min(),max(),还有 ReduceFunction,WindowFunction。WindowFunction 是最通用的计算函数,其他的预定义的函数基本都是基于该函数实现的。
现在,大致了解了Evitor(移除器)扮演的角色和移除器在流中的哪个位置,让我们继续看为何使用Evictor。
Evictor接口定义如下:
evictBefore()包含要在窗口函数之前应用的清除逻辑,而evictAfter()包含要在窗口函数之后应用的清除逻辑。应用窗口函数之前清除的元素将不会被窗口函数处理。
窗格是具有相同Key和相同窗口的元素组成的桶,即同一个窗口中相同Key的元素一定属于同一个窗格。一个元素可以在多个窗格中(当一个元素被分配给多个窗口时),这些窗格都有自己的清除器实例。
注:window默认没有evictor,一旦把window指定Evictor,该window会由EvictWindowOperator类来负责操作。
2.2 Flink内置的Evitor
CountEvictor:保留窗口中用户指定的元素数量,并丢弃窗口缓冲区剩余的元素。
DeltaEvictor:依次计算窗口缓冲区中的最后一个元素与其余每个元素之间的delta值,若delta值大于等于指定的阈值,则该元素会被移除。使用DeltaEvictor清除器需要指定两个参数,一个是double类型的阈值;另一个是DeltaFunction接口的实例,DeltaFunction用于指定具体的delta值计算逻辑。
TimeEvictor:传入一个以毫秒为单位的时间间隔参数(例如以size表示),对于给定的窗口,取窗口中元素的最大时间戳(例如以max表示),使用TimeEvictor清除器将删除所有时间戳小于或等于max-size的元素(即清除从窗口开头到指定的截止时间之间的元素)。
2.2.1 CountEvictor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private void evict (Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) { if (size <= maxCount) { return ; } else { int evictedCount = 0 ; for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){ iterator.next(); evictedCount++; if (evictedCount > size - maxCount) { break ; } else { iterator.remove(); } } } }
2.2.2 DeltaEvictor
DeltaEvictor通过计算DeltaFunction的值(依次传入每个元素和最后一个元素),并将其与threshold进行对比,如果DeltaFunction计算结果大于等于threshold,则该元素会被移除。DeltaEvictor的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private void evict (Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) { TimestampedValue<T> lastElement = Iterables.getLast(elements); for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext();){ TimestampedValue<T> element = iterator.next(); if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) >= this .threshold) { iterator.remove(); } } }
2.2.3 TimeEvictor
TimeEvictor以时间为判断标准,决定元素是否会被移除。TimeEvictor会获取窗口中所有元素的最大时间戳currentTime,currentTime减去窗口大小(windowSize) 可得到能保留最久的元素的时间戳evictCutoff,然后再遍历窗口中的元素,如果元素的时间戳小于evictCutoff,就执行移除操作,否则不移除。具体逻辑如下图所示:
TimeEvictor的代码实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) { // 如果element没有timestamp,直接返回 if (!hasTimestamp(elements)) { return; } // 获取elements中最大的时间戳(到来最晚的元素的时间) long currentTime = getMaxTimestamp(elements); // 截止时间为: 到来最晚的元素的时间 - 窗口大小(可以理解为保留最近的多久的元素) long evictCutoff = currentTime - windowSize; for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) { TimestampedValue<Object> record = iterator.next(); // 清除所有时间戳小于截止时间的元素 if (record.getTimestamp() <= evictCutoff) { iterator.remove(); } } }
2. 时间语义
2.1 时间语义分类
Event Time指的是数据流中每个元素或者每个事件自带的时间属性,一般是事件发生的时间。由于事件从发生到进入Flink时间算子之间有很多环节,一个较早发生的事件因为延迟可能较晚到达,因此使用Event Time意味着事件到达有可能是乱序的。
使用Event Time时,最理想的情况下,我们可以一直等待所有的事件到达后再进行时间窗口的处理。假设一个时间窗口内的所有数据都已经到达,基于Event Time的流处理会得到正确且一致的结果:无论我们是将同一个程序部署在不同的计算环境还是在相同的环境下多次计算同一份数据,都能够得到同样的计算结果。我们根本不同担心乱序到达的问题。但这只是理想情况,现实中无法实现,因为我们既不知道究竟要等多长时间才能确认所有事件都已经到达,更不可能无限地一直等待下去。在实际应用中,当涉及到对事件按照时间窗口进行统计时,Flink会将窗口内的事件缓存下来,直到接收到一个Watermark,以确认不会有更晚数据的到达。Watermark意味着在一个时间窗口下,Flink会等待一个有限的时间,这在一定程度上降低了计算结果的绝对准确性,而且增加了系统的延迟。在流处理领域,比起其他几种时间语义,使用Event Time的好处是某个事件的时间是确定的,这样能够保证计算结果在一定程度上的可预测性。
一个基于Event Time的Flink程序中必须定义Event Time,以及如何生成Watermark。我们可以使用元素中自带的时间,也可以在元素到达Flink后人为给Event Time赋值。
使用Event Time的优势是结果的可预测性,缺点是缓存较大,增加了延迟,且调试和定位问题更复杂。
Processing Time - 当前机器的系统时钟事件
对于某个算子来说,Processing Time指算子使用当前机器的系统时钟来定义时间。在Processing Time的时间窗口场景下,无论事件什么时候发生,只要该事件在某个时间段达到了某个算子,就会被归结到该窗口下,不需要Watermark机制。对于一个程序在同一个计算环境来说,每个算子都有一定的耗时,同一个事件的Processing Time,第n个算子和第n+1个算子不同。如果一个程序在不同的集群和环境下执行时,限于软硬件因素,不同环境下前序算子处理速度不同,对于下游算子来说,事件的Processing Time也会不同,不同环境下时间窗口的计算结果会发生变化。因此,Processing Time在时间窗口下的计算会有不确定性。
Processing Time只依赖当前执行机器的系统时钟,不需要依赖Watermark,无需缓存。Processing Time是实现起来非常简单也是延迟最小的一种时间语义。
Ingestion Time - 事件到达flink source的时间
Ingestion Time是事件到达Flink Souce的时间。从Source到下游各个算子中间可能有很多计算环节,任何一个算子的处理速度快慢可能影响到下游算子的Processing Time。而Ingestion Time定义的是数据流最早进入Flink的时间,因此不会被算子处理速度影响。
Ingestion Time通常是Event Time和Processing Time之间的一个折中方案。比起Event Time,Ingestion Time可以不需要设置复杂的Watermark,因此也不需要太多缓存,延迟较低。比起Processing Time,Ingestion Time的时间是Souce赋值的,一个事件在整个处理过程从头至尾都使用这个时间,而且后续算子不受前序算子处理速度的影响,计算结果相对准确一些,但计算成本稍高。
注:Ingestion Time1.13 版本已经不再提了,这也是为啥官网的图没看到Ingestion Time的原因。目前推荐Event Time的时间语义。
调用 setStreamTimeCharacteristic 设置时间域,枚举类 TimeCharacteristic 预设了三种时间域,不显式设置的情况下,默认使用 TimeCharacteristic.EventTime(1.12 版本以前默认是 TimeCharacteristic.ProcessingTime)。
1 2 env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
在 1.12 以后版本默认是使用 EventTime,如果要显示使用 ProcessingTime,可以关闭 watermark(自动生成 watermark 的间隔设置为 0),设置
1 env.getConfig().setAutoWatermarkInterval(0 );
2.2 数据处理系统中的时间语义 在实际应用中,事件时间语义 会更为常见。一般情况下,业务日志数据中都会记录数据生成的时间戳(timestamp),它就可以作为事件时间的判断基础。
在Flink中,由于处理时间比较简单,早期版本默认的时间语义是处理时间;而考虑到事件时间在实际应用中更为广泛,从Flink1.12版本开始,Flink已经将事件时间作为默认的时间语义了。
3. 水位线 3.1 事件时间和窗口
这样的好处在于,计算的过程可以完全不依赖处理时间(系统时间),不论什么时候进行统计处理,得到的结果都是正确的。在一般的实时流处理场景中,事件时间可以基本与处理时间保持同步,只是略有延迟,同时保证了计算结果的正确性。
3.2 什么是水位线 引入水位线的原因:
分布式系统的网络传输的不确定性;
数据是乱序的;
支持事件时间的流处理器需要一种测量事件时间进度的方法,用以正确的处理窗口等操作;
在Flink中,用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)。
具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。
特性:
3.3 水位线和窗口的工作原理
flink中窗口并不是静态准备好的,而是动态创建–当有落在这个窗口区间范围的数据到达时,才创建对应的窗口。(我们认为到达窗口结束时间时,窗口就触发计算关闭。)事实上,触发计算、窗口关闭两个行为也可以分开。
3.4 生成水位线 a. 生成水位线的总体原则
完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确,就必须等足够长的时间,这会带来更高的延迟。
如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。当然,如果我们对准确性完全不考虑、一味地追求处理速度,可以直接使用处理时间语义,这在理论上可以得到最低的延迟。
所以Flink中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。
b. 水位线生成策略
在Flink的DataStream API中,有一个单独用于生成水位线的方法:.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。具体使用如下:
1 2 3 4 5 6 7 DataStream<Event> stream = env.addSource(new ClickSource ()); DataStream<Event> withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(<watermark strategy>);
说明:WatermarkStrategy作为参数,这就是所谓的“水位线生成策略”。WatermarkStrategy是一个接口,该接口中包含了一个“时间戳分配器”TimestampAssigner和一个“水位线生成器”WatermarkGenerator。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public interface WatermarkStrategy <T> extends TimestampAssignerSupplier <T>, WatermarkGeneratorSupplier<T>{ @Override TimestampAssigner<T> createTimestampAssigner (TimestampAssignerSupplier.Context context) ; @Override WatermarkGenerator<T> createWatermarkGenerator (WatermarkGeneratorSupplier.Context context) ; }
c. flink内置水位线
对于有序流,主要特点就是时间戳单调增长,所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public class WatermarkMonoDemo { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1 ); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("hadoop102" , 7777 ) .map(new WaterSensorMapFunction ()); WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy .<WaterSensor>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner <WaterSensor>() { @Override public long extractTimestamp (WaterSensor element, long recordTimestamp) { System.out.println("数据=" + element + ",recordTs=" + recordTimestamp); return element.getTs() * 1000L ; } }); SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy); sensorDSwithWatermark.keyBy(sensor -> sensor.getId()) .window(TumblingEventTimeWindows.of(Time.seconds(10 ))) .process( new ProcessWindowFunction <WaterSensor, String, String, TimeWindow>() { @Override public void process (String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception { long startTs = context.window().getStart(); long endTs = context.window().getEnd(); String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS" ); String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS" ); long count = elements.spliterator().estimateSize(); out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString()); } } ) .print(); env.execute(); } }
由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个maxOutOfOrderness参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public class WatermarkOutOfOrdernessDemo { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1 ); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("hadoop102" , 7777 ) .map(new WaterSensorMapFunction ()); WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3 )) .withTimestampAssigner( (element, recordTimestamp) -> { System.out.println("数据=" + element + ",recordTs=" + recordTimestamp); return element.getTs() * 1000L ; }); SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy); sensorDSwithWatermark.keyBy(sensor -> sensor.getId()) .window(TumblingEventTimeWindows.of(Time.seconds(10 ))) .process( new ProcessWindowFunction <WaterSensor, String, String, TimeWindow>() { @Override public void process (String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception { long startTs = context.window().getStart(); long endTs = context.window().getEnd(); String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS" ); String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS" ); long count = elements.spliterator().estimateSize(); out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString()); } } ) .print(); env.execute(); } }
d. 自定义水位线生成器
周期性生成器一般是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。
下面是一段自定义周期性生成水位线的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 public class CustomPeriodicWatermarkExample { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env .addSource(new ClickSource ()) .assignTimestampsAndWatermarks(new CustomWatermarkStrategy ()) .print(); env.execute(); } public static class CustomWatermarkStrategy implements WatermarkStrategy <Event> { @Override public TimestampAssigner<Event> createTimestampAssigner (TimestampAssignerSupplier.Context context) { return new SerializableTimestampAssigner <Event>() { @Override public long extractTimestamp (Event element,long recordTimestamp) { return element.timestamp; } }; } @Override public WatermarkGenerator<Event> createWatermarkGenerator (WatermarkGeneratorSupplier.Context context) { return new CustomBoundedOutOfOrdernessGenerator (); } } public static class CustomBoundedOutOfOrdernessGenerator implements WatermarkGenerator <Event> { private Long delayTime = 5000L ; private Long maxTs = -Long.MAX_VALUE + delayTime + 1L ; @Override public void onEvent (Event event,long eventTimestamp,WatermarkOutput output) { maxTs = Math.max(event.timestamp,maxTs); } @Override public void onPeriodicEmit (WatermarkOutput output) { output.emitWatermark(new Watermark (maxTs - delayTime - 1L )); } } }
我们在onPeriodicEmit()里调用output.emitWatermark(),就可以发出水位线了;这个方法由系统框架周期性地调用,默认200ms一次。
如果想修改默认周期时间,可以通过下面方法修改。例如:修改为400ms
1 env.getConfig().setAutoWatermarkInterval(400L);
断点式生成器会不停地检测onEvent()中的事件,当发现带有水位线信息的事件时,就立即发出水位线。我们把发射水位线的逻辑写在onEvent方法当中即可。
我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。示例程序如下:
1 2 3 env.fromSource( kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3 )), "kafkasource" )
3.5 水位线的传播 在实际计算过程中,Flink的算子一般分布在多个并行的分区(或者称为实例)上,Flink需要将Watermark在并行环境下向前传播。如下图所示,Flink的每个并行算子子任务会维护针对该子任务的Event Time时钟,这个时钟记录了这个算子子任务Watermark处理进度,随着上游Watermark数据不断向下发送,算子子任务的Event Time时钟也要不断向前更新。由于上游各分区的处理速度不同,到达当前算子的Watermark也会有先后快慢之分,每个算子子任务会维护来自上游不同分区的Watermark信息,这是一个列表,列表内对应上游算子各分区的Watermark时间戳等信息。
当上游某分区有Watermark进入该算子子任务后,Flink先判断新流入的Watermark时间戳是否大于Partition Watermark列表内记录的该分区的历史Watermark时间戳,如果新流入的更大,则更新该分区的Watermark。例如,某个分区新流入的Watermark时间戳为4,算子子任务维护的该分区Watermark为1,那么Flink会更新Partition Watermark列表为最新的时间戳4。接着,Flink会遍历Partition Watermark列表中的所有时间戳,选择最小的一个作为该算子子任务的Event Time。同时,Flink会将更新的Event Time作为Watermark发送给下游所有算子子任务。算子子任务Event Time的更新意味着该子任务将时间推进到了这个时间,该时间之前的事件已经被处理并发送到下游。例如,图中第二步和第三步,Partition Watermark列表更新后,导致列表中最小时间戳发生了变化,算子子任务的Event Time时钟也相应进行了更新。整个过程完成了数据流中的Watermark推动算子子任务Watermark的时钟更新过程。Watermark像一个幕后推动者,不断将流处理系统的Event Time向前推进。
总结:
Flink某算子子任务根据各上游流入的Watermark来更新Partition Watermark列表。
选取Partition Watermark列表中最小的时间作为该算子的Event Time,并将这个时间发送给下游算子。
这样的设计机制满足了并行环境下Watermark在各算子中的传播问题,但是假如某个上游分区的Watermark一直不更新,Partition Watermark列表其他地方都在正常更新,唯独个别分区的时间停留在很早的某个时间,这会导致算子的Event Time时钟不更新,相应的时间窗口计算也不会被触发,大量的数据积压在算子内部得不到处理,整个流处理处于空转状态。这种问题可能出现在使用数据流自带的Watermark,自带的Watermark在某些分区下没有及时更新。针对这种问题,一种解决办法是根据机器当前的时钟周期性地生成Watermark。
此外,在union等多数据流处理时,Flink也使用上述Watermark更新机制,那就意味着,多个数据流的时间必须对齐,如果一方的Watermark时间较老,那整个应用的Event Time时钟也会使用这个较老的时间,其他数据流的数据会被积压。一旦发现某个数据流不再生成新的Watermark,我们要在SourceFunction中的SourceContext里调用markAsTemporarilyIdle设置该数据流为空闲状态。
3.6 迟到水位线的处理 我们思考一个问题:怎样避免乱序数据带来计算不正确性?
常用的解决办法是:当最大的事件时间maxEventTime达到了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口。
但是,我们应该等待多久的时间呢?由于网络、分布式等原因造成的延时,一般大多数迟到的数据都会在最近一段时间到来,这个最近一段时间一般是毫秒级的,Watermark就是做到了这样的保障。还有很少的一部分数据会迟到很久,我们可以通过allowedLateness和sideOutputLateData来兜底。
处理乱序数据,三重保证机制:
a. Watermark 能够保证迟到很短的时间的数据到来后(一般是迟到毫秒级别内的数据,最大不超过1s),触发窗口关闭并输出。(即能够hold住短时间内迟到的数据)
b. allowedLateness allowedLateness(lateness: Time):设置允许的延迟时间,默认为0,该方法仅对事件时间窗口有效。在水印通过窗口结尾后(即水印>=窗口结束时间),该方法指定的允许延迟时间才开始生效。该延迟时间与水印指定的允许延迟时间不冲突,相当于在水印延迟时间的基础上进行累加。落入该方法指定的允许延迟时间范围内的元素可能会导致窗口再次触发(例如EventTimeTrigger)。为了使这些元素正常被计算,Flink会保持窗口的状态,直到允许的延迟过期为止。一旦延迟过期,Flink将删除该窗口并删除其状态。
c. sideOutputLateData sideOutputLateData(outputTag: OutputTag[T]):将延迟到达的数据保存到outputTag对象中,OutputTag是一种类型化的命名标签,用于标记算子的侧道输出,单独收集延迟数据。后面可通过DataStream的getSideOutput(outputTag)方法得到被丢弃数据组成的数据流。
当指定的允许延迟大于0时,在水印通过窗口结尾后,将保留窗口及其内容。在这种情况下,当一个迟到但未被丢弃的元素到达时,它可能会导致该窗口的另一次触发。这次触发称为延迟触发,因为是由延迟事件触发的,与主触发(即窗口的第一次触发)相反。对于会话窗口,后期触发会进一步导致窗口合并,因为可能缩小两个预先存在的未合并窗口之间的间隙。当使用全局窗口时,没有数据是延迟的,因为全局窗口的结束时间戳是Long.MAX_VALUE。
注意:
后期触发的元素应更新先前计算的结果,即数据流将包含同一计算的多个结果。根据你的应用程序,需要考虑这些重复的结果或对它们进行重复数据删除。
在水印的基础上设置允许延迟机制后,数据可以延迟的时间范围是多少?在只设置了水印的情况下,如果满足当前进入Flink的最大事件时间>=窗口结束时间+允许的最大延迟时间,则触发窗口计算,发射计算结果并销毁窗口。在水印的基础上设置了允许延迟机制后,如果满足当前进入Flink的最大事件时间>=窗口结束时间+允许的最大延迟时间(水印指定的),则触发窗口计算,发射计算结果,但不会销毁窗口,窗口会保留计算状态并继续等待延迟数据;每条延迟数据到达后,如果落入窗口内,都会再次触发窗口计算,更新计算状态,发射出最新计算结果,直到满足条件:当前进入Flink的最大事件时间>=窗口结束时间+允许的最大延迟时间(水印指定的)+允许延迟机制指定的延迟时间,则关闭并销毁窗口。此后到达的延迟数据,由于窗口已经关闭,数据将进入侧道输出流进行单独存放,后期根据业务单独处理即可。
举例说明:
假设有乱序数据按照ABCDEFG的顺序依次到达Flink应用程序,并且设置了水印允许的最大延迟时间为3分钟,在水印的基础上又通过allowedLateness(Time.minutes(3))方法设置了允许的延迟时间为3分钟,使用sideOutputLateData(lateOutputTag)方法设置侧道输出,如下图所示。
当数据A到达时,由于窗口开始时间<=数据A的事件时间<窗口结束时间,因此数据A落入窗口内。
当数据B到达时,由于其事件时间>=窗口结束时间,因此数据B不属于该窗口。此时Watermark=进入Flink的当前最大事件时间‒允许的最大延迟时间=9:11‒3分钟=9:08。水印在窗口内,不会触发窗口计算。
当数据C到达时,由于窗口开始时间<=数据C的事件时间<窗口结束时间,因此数据C落入窗口内。
当数据D到达时,由于其事件时间>=窗口结束时间,因此数据D不属于该窗口。此时Watermark=进入Flink的当前最大事件时间‒允许的最大延迟时间=9:15‒3(分钟)=9:12>=窗口结束时间。水印在窗口外,触发窗口计算并发射计算结果。由于设置了允许延迟机制的延迟时间为3分钟,此时的窗口结束时间+允许的最大延迟时间(水印指定的)+允许延迟机制指定的延迟时间=9:10+3(分钟)+3(分钟)=9:16>9:15(进入Flink的当前最大事件时间),不满足窗口关闭的条件,因此窗口会继续等待延迟数据,并保留计算状态(此处的计算状态指的就是计算结果,例如窗口内数据的聚合结果)。
当数据E到达时,由于进入Flink的当前最大事件时间没有改变,窗口不会关闭,而是继续等待。窗口开始时间<=数据E的事件时间<窗口结束时间,因此数据E落入窗口内,并触发窗口计算,与上次计算的结果进行合并,发射出新的计算结果,如下图所示。
当数据F到达时,此时的窗口结束时间+允许的最大延迟时间(水印指定的)+允许延迟机制指定的延迟时间=9:10+3(分钟)+3(分钟)=9:16<=9:16(进入Flink的当前最大事件时间),满足窗口关闭的条件,因此窗口会关闭并销毁。
当数据G到达时,窗口开始时间<=数据G的事件时间<窗口结束时间,但是窗口已经关闭了,因此数据G将进入侧道输出流进行单独存放。通过侧道输出API可从侧道输出流中取出延迟严重的数据进行相应的业务处理。
4. 基于时间的合流-双流连接 可以发现,根据某个key合并两条流,与关系型数据库中表的join操作非常相近。事实上,Flink中两条流的connect操作,就可以通过keyBy指定键进行分组后合并,实现了类似于SQL中的join操作;另外connect支持处理函数,可以使用自定义实现各种需求,其实已经能够处理双流join的大多数场景。
不过处理函数是底层接口,所以尽管connect能做的事情多,但在一些具体应用场景下还是显得太过抽象了。比如,如果我们希望统计固定时间内两条流数据的匹配情况,那就需要自定义来实现——其实这完全可以用窗口(window)来表示。为了更方便地实现基于时间的合流操作,Flink的DataStrema API提供了内置的join算子。
4.1 窗口连结 Flink为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。
1)窗口联结的调用
窗口联结在代码中的实现,首先需要调用DataStream的.join()方法来合并两条流,得到一个JoinedStreams;接着通过.where()和.equalTo()方法指定两条流中联结的key;然后通过.window()开窗口,并调用.apply()传入联结窗口函数进行处理计算。通用调用形式如下:
1 2 3 4 5 stream1.join(stream2) .where(<KeySelector>) .equalTo(<KeySelector>) .window(<WindowAssigner>) .apply(<JoinFunction>)
上面代码中.where()的参数是键选择器(KeySelector),用来指定第一条流中的key;而.equalTo()传入的KeySelector则指定了第二条流中的key。两者相同的元素,如果在同一窗口中,就可以匹配起来,并通过一个“联结函数”(JoinFunction)进行处理了。
这里.window()传入的就是窗口分配器,之前讲到的三种时间窗口都可以用在这里:滚动窗口(tumbling window)、滑动窗口(sliding window)和会话窗口(session window)。
而后面调用.apply()可以看作实现了一个特殊的窗口函数。注意这里只能调用.apply(),没有其他替代的方法。
传入的JoinFunction也是一个函数类接口,使用时需要实现内部的.join()方法。这个方法有两个参数,分别表示两条流中成对匹配的数据。
其实仔细观察可以发现,窗口join的调用语法和我们熟悉的SQL中表的join非常相似:
1 SELECT * FROM table1 t1, table2 t2 WHERE t1.id = t2.id;
这句SQL中where子句的表达,等价于inner join … on,所以本身表示的是两张表基于id的“内连接”(inner join)。而Flink中的window join,同样类似于inner join。也就是说,最后处理输出的,只有两条流中数据按key配对成功的那些;如果某个窗口中一条流的数据没有任何另一条流的数据匹配,那么就不会调用JoinFunction的.join()方法,也就没有任何输出了。
2)窗口联结实例
代码实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 public class WindowJoinDemo { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1 ); SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env .fromElements( Tuple2.of("a" , 1 ), Tuple2.of("a" , 2 ), Tuple2.of("b" , 3 ), Tuple2.of("c" , 4 ) ) .assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple2<String, Integer>>forMonotonousTimestamps() .withTimestampAssigner((value, ts) -> value.f1 * 1000L ) ); SingleOutputStreamOperator<Tuple3<String, Integer,Integer>> ds2 = env .fromElements( Tuple3.of("a" , 1 ,1 ), Tuple3.of("a" , 11 ,1 ), Tuple3.of("b" , 2 ,1 ), Tuple3.of("b" , 12 ,1 ), Tuple3.of("c" , 14 ,1 ), Tuple3.of("d" , 15 ,1 ) ) .assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple3<String, Integer,Integer>>forMonotonousTimestamps() .withTimestampAssigner((value, ts) -> value.f1 * 1000L ) ); DataStream<String> join = ds1.join(ds2) .where(r1 -> r1.f0) .equalTo(r2 -> r2.f0) .window(TumblingEventTimeWindows.of(Time.seconds(10 ))) .apply(new JoinFunction <Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() { @Override public String join (Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception { return first + "<----->" + second; } }); join.print(); env.execute(); } }
4.2 间隔联结 在有些场景下,我们要处理的时间间隔可能并不是固定的。这时显然不应该用滚动窗口或滑动窗口来处理——因为匹配的两个数据有可能刚好“卡在”窗口边缘两侧,于是窗口内就都没有匹配了;会话窗口虽然时间不固定,但也明显不适合这个场景。基于时间的窗口联结已经无能为力了。
为了应对这样的需求,Flink提供了一种叫作“间隔联结”(interval join)的合流操作。顾名思义,间隔联结的思路就是针对一条流的每个数据,开辟出其时间戳前后的一段时间间隔,看这期间是否有来自另一条流的数据匹配。
1)间隔联结的原理
间隔联结具体的定义方式是,我们给定两个时间点,分别叫作间隔的“上界”(upperBound)和“下界”(lowerBound);于是对于一条流(不妨叫作A)中的任意一个数据元素a,就可以开辟一段时间间隔:[a.timestamp + lowerBound, a.timestamp + upperBound],即以a的时间戳为中心,下至下界点、上至上界点的一个闭区间:我们就把这段时间作为可以匹配另一条流数据的“窗口”范围。所以对于另一条流(不妨叫B)中的数据元素b,如果它的时间戳落在了这个区间范围内,a和b就可以成功配对,进而进行计算输出结果。所以匹配的条件为:
a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
这里需要注意,做间隔联结的两条流A和B,也必须基于相同的key;下界lowerBound应该小于等于上界upperBound,两者都可正可负;间隔联结目前只支持事件时间语义。
如下图所示,我们可以清楚地看到间隔联结的方式:
下方的流A去间隔联结上方的流B,所以基于A的每个数据元素,都可以开辟一个间隔区间。我们这里设置下界为-2毫秒,上界为1毫秒。于是对于时间戳为2的A中元素,它的可匹配区间就是[0, 3],流B中有时间戳为0、1的两个元素落在这个范围内,所以就可以得到匹配数据对(2, 0)和(2, 1)。同样地,A中时间戳为3的元素,可匹配区间为[1, 4],B中只有时间戳为1的一个数据可以匹配,于是得到匹配数据对(3, 1)。
所以我们可以看到,间隔联结同样是一种内连接(inner join)。与窗口联结不同的是,interval join做匹配的时间段是基于流中数据的,所以并不确定;而且流B中的数据可以不只在一个区间内被匹配。
2)间隔联结的调用
间隔联结在代码中,是基于KeyedStream的联结(join)操作。DataStream在keyBy得到KeyedStream之后,可以调用.intervalJoin()来合并两条流,传入的参数同样是一个KeyedStream,两者的key类型应该一致;得到的是一个IntervalJoin类型。后续的操作同样是完全固定的:先通过.between()方法指定间隔的上下界,再调用.process()方法,定义对匹配数据对的处理操作。调用.process()需要传入一个处理函数,这是处理函数家族的最后一员:“处理联结函数”ProcessJoinFunction。
通用调用形式如下:
1 2 3 4 5 6 7 8 9 10 11 stream1 .keyBy(<KeySelector>) .intervalJoin(stream2.keyBy(<KeySelector>)) .between(Time.milliseconds(-2 ), Time.milliseconds(1 )) .process (new ProcessJoinFunction <Integer, Integer, String(){ @Override public void processElement (Integer left, Integer right, Context ctx, Collector<String> out) { out.collect(left + "," + right); } });
可以看到,抽象类ProcessJoinFunction就像是ProcessFunction和JoinFunction的结合,内部同样有一个抽象方法.processElement()。与其他处理函数不同的是,它多了一个参数,这自然是因为有来自两条流的数据。参数中left指的就是第一条流中的数据,right则是第二条流中与它匹配的数据。每当检测到一组匹配,就会调用这里的.processElement()方法,经处理转换之后输出结果。
3)间隔联结实例
案例需求:在电商网站中,某些用户行为往往会有短时间内的强关联。我们这里举一个例子,我们有两条流,一条是下订单的流,一条是浏览数据的流。我们可以针对同一个用户,来做这样一个联结。也就是使用一个用户的下订单的事件和这个用户的最近十分钟的浏览数据进行一个联结查询。
(1)代码实现:正常使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 public class IntervalJoinDemo { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1 ); SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env .fromElements( Tuple2.of("a" , 1 ), Tuple2.of("a" , 2 ), Tuple2.of("b" , 3 ), Tuple2.of("c" , 4 ) ) .assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple2<String, Integer>>forMonotonousTimestamps() .withTimestampAssigner((value, ts) -> value.f1 * 1000L ) ); SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env .fromElements( Tuple3.of("a" , 1 , 1 ), Tuple3.of("a" , 11 , 1 ), Tuple3.of("b" , 2 , 1 ), Tuple3.of("b" , 12 , 1 ), Tuple3.of("c" , 14 , 1 ), Tuple3.of("d" , 15 , 1 ) ) .assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple3<String, Integer, Integer>>forMonotonousTimestamps() .withTimestampAssigner((value, ts) -> value.f1 * 1000L ) ); KeyedStream<Tuple2<String, Integer>, String> ks1 = ds1.keyBy(r1 -> r1.f0); KeyedStream<Tuple3<String, Integer, Integer>, String> ks2 = ds2.keyBy(r2 -> r2.f0); ks1.intervalJoin(ks2) .between(Time.seconds(-2 ), Time.seconds(2 )) .process( new ProcessJoinFunction <Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() { @Override public void processElement (Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception { out.collect(left + "<------>" + right); } }) .print(); env.execute(); } }
(2)代码实现:处理迟到数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 public class IntervalJoinWithLateDemo { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1 ); SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env .socketTextStream("hadoop102" , 7777 ) .map(new MapFunction <String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map (String value) throws Exception { String[] datas = value.split("," ); return Tuple2.of(datas[0 ], Integer.valueOf(datas[1 ])); } }) .assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3 )) .withTimestampAssigner((value, ts) -> value.f1 * 1000L ) ); SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env .socketTextStream("hadoop102" , 8888 ) .map(new MapFunction <String, Tuple3<String, Integer, Integer>>() { @Override public Tuple3<String, Integer, Integer> map (String value) throws Exception { String[] datas = value.split("," ); return Tuple3.of(datas[0 ], Integer.valueOf(datas[1 ]), Integer.valueOf(datas[2 ])); } }) .assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple3<String, Integer, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3 )) .withTimestampAssigner((value, ts) -> value.f1 * 1000L ) ); KeyedStream<Tuple2<String, Integer>, String> ks1 = ds1.keyBy(r1 -> r1.f0); KeyedStream<Tuple3<String, Integer, Integer>, String> ks2 = ds2.keyBy(r2 -> r2.f0); OutputTag<Tuple2<String, Integer>> ks1LateTag = new OutputTag <>("ks1-late" , Types.TUPLE(Types.STRING, Types.INT)); OutputTag<Tuple3<String, Integer, Integer>> ks2LateTag = new OutputTag <>("ks2-late" , Types.TUPLE(Types.STRING, Types.INT, Types.INT)); SingleOutputStreamOperator<String> process = ks1.intervalJoin(ks2) .between(Time.seconds(-2 ), Time.seconds(2 )) .sideOutputLeftLateData(ks1LateTag) .sideOutputRightLateData(ks2LateTag) .process( new ProcessJoinFunction <Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() { @Override public void processElement (Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception { out.collect(left + "<------>" + right); } }); process.print("主流" ); process.getSideOutput(ks1LateTag).printToErr("ks1迟到数据" ); process.getSideOutput(ks2LateTag).printToErr("ks2迟到数据" ); env.execute(); } }