前言
flink处理数据流时,经常会遇到这样的情况:处理一个数据源时,往往需要将该源中的不同类型的数据做分割(分流)处理,假如使用 filter算子对数据源进行筛选分割的话,势必会造成数据流的多次复制,造成不必要的性能浪费;
flink中的侧输出,就是将数据流进行分割,而不对流进行复制的一种分流机制。flink的侧输出的另一个作用就是对延时迟到的数据进行处理,这样就可以不必丢弃迟到的数据;
简单理解就是,根据业务上的一定规则,将一个源中的数据拆分成不同的流,即主流和侧输出流。
侧输出流(side output)
大部分的DataStream API的算子的输出是单一输出,也就是某种数据类型的流。除了split算子,可以将一条流分成多条流,这些流的数据类型也都相同。process function的side outputs功能可以产生多条流,并且这些流的数据类型可以不一样。一个side output可以定义为OutputTag[X]对象,X是输出流的数据类型。process function可以通过Context对象发射一个事件到一个或者多个side outputs。
除了从DataStream操作输出主结果流外,也可以生成任一数量的额外的侧输出流。结果流可以和主输出流的类型可以不匹配,并且侧输出流可以有不同类型。侧输出流的操作当你分流时非常有用,之前你需要先复制一个流再过滤出来,有了侧输出流,就不需要这样操作。
具体应用时,只要在处理函数的.processElement()或者.onTimer()方法中,调用上下文
的.output()方法就可以了。
DataStream<Integer> stream = env.addSource(...);
SingleOutputStreamOperator<Long> longStream stream.process(new ProcessFunction<Integer, Long>() {
@Override
public void processElement( Integer value, Context ctx, Collector<Integer> out) throws Exception {
// 转换成 Long,输出到主流中
out.collect(Long.valueOf(value));
// 转换成 String,输出到侧输出流中
ctx.output(outputTag, "side-output: " + String.valueOf(value));
}
});
当使用侧输出流时,首先需要定义一个OutputTag,它将要被用来确定一个侧输出流。
OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
注意:侧输出流的类型是根据侧输出流包括元素的类型来确定。
如果想要获取这个侧输出流,可以基于处理之后的 DataStream 直接调用.getSideOutput() 方法,传入对应的OutputTag,这个方式与窗口API 中获取侧输出流是完全一样的。
DataStream<String> stringStream = longStream.getSideOutput(outputTag);
可以从以下方法中来把数据输出到侧输出流
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- KeyedCoProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
在以上的函数中可以用参数Context来暴露给用户发送数据到侧输出流。下面例子是用ProcessFunction来发送数据到侧输出流。
import com.yibo.flink.datastream.Event;
import com.yibo.flink.sourcecustom.ClickSource;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import scala.Tuple3;
import java.time.Duration;
public class SideOutputStreamTest {
public static void main(String[] args) throws Exception {
//创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//设置生成水位线的时间间隔
env.getConfig().setAutoWatermarkInterval(100);
//乱序流的Watermark生成
SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
// 插入水位线的逻辑 设置 watermark 延迟时间,2 秒
.assignTimestampsAndWatermarks(
// 针对乱序流插入水位线,延迟时间设置为 2s
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
// 抽取时间戳的逻辑
.withTimestampAssigner((SerializableTimestampAssigner<Event>) (element, recordTimestamp) -> element.getTimestamp())
);
OutputTag<Tuple3<String, String, Long>> maryTag = new OutputTag<Tuple3<String, String, Long>>("Mary"){};
OutputTag<Tuple3<String, String, Long>> boboTag = new OutputTag<Tuple3<String, String, Long>>("Bobo"){};
SingleOutputStreamOperator<Event> processStream = stream.process(new ProcessFunction<Event, Event>() {
@Override
public void processElement(Event event, Context context, Collector<Event> out) throws Exception {
if ("Mary".equals(event.getUser())) {
context.output(maryTag, Tuple3.apply(event.getUser(), event.getUrl(), event.getTimestamp()));
} else if ("Bobo".equals(event.getUser())) {
context.output(boboTag, Tuple3.apply(event.getUser(), event.getUrl(), event.getTimestamp()));
} else {
out.collect(event);
}
}
});
processStream.print("else");
processStream.getSideOutput(maryTag).print("Mary");
processStream.getSideOutput(boboTag).print("Bobo");
env.execute();
}
}
import com.yibo.flink.datastream.Event;
import com.yibo.flink.sourcecustom.ClickSource;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
public class LateDataTest {
public static void main(String[] args) throws Exception {
//创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//设置生成水位线的时间间隔
env.getConfig().setAutoWatermarkInterval(100);
//乱序流的Watermark生成
SingleOutputStreamOperator<Event> streamOperator = env.addSource(new ClickSource())
// 插入水位线的逻辑 设置 watermark 延迟时间,2 秒
.assignTimestampsAndWatermarks(
// 针对乱序流插入水位线,延迟时间设置为 2s
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
// 抽取时间戳的逻辑
.withTimestampAssigner((SerializableTimestampAssigner<Event>) (element, recordTimestamp) -> element.getTimestamp())
);
//定义一个输出标签
OutputTag<Event> lateTag = new OutputTag<Event>("late"){};
//统计每个url的访问量
SingleOutputStreamOperator<UrlViewCount> result = streamOperator.keyBy(Event::getUrl)
//滚动事件时间窗口
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
//允许窗口处理迟到数据 允许1分钟的延迟
.allowedLateness(Time.minutes(1))
//将最后的迟到数据输出到侧输出流
.sideOutputLateData(lateTag)
.aggregate(new UrlViewCountAgg(), new UrlViewCountResult());
result.print("result");
result.getSideOutput(lateTag).print("late");
env.execute();
}
public static class UrlViewCountAgg implements AggregateFunction<Event, Long, Long> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(Event value, Long accumulator) {
return accumulator + 1;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
@Override
public Long merge(Long a, Long b) {
return null;
}
}
public static class UrlViewCountResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow> {
@Override
public void process(String url, Context context, Iterable<Long> iterable, Collector<UrlViewCount> out) throws Exception {
Long urlCount = iterable.iterator().next();
//集合窗口信息输出
long start = context.window().getStart();
long end = context.window().getEnd();
UrlViewCount urlCountView = new UrlViewCount();
urlCountView.setUrl(url);
urlCountView.setCount(urlCount);
urlCountView.setWindowStart(start);
urlCountView.setWindowEnd(end);
out.collect(urlCountView);
}
}
}
Flink侧输出流有两个作用
1、分隔过滤:充当filter算子功能,将源中的不同类型的数据做分割处理。因为使用filter 算子对数据源进行筛选分割的话,会造成数据流的多次复制,导致不必要的性能浪费,过滤后不需要的数据可以重新写入Pulsar或Kafka的topic中供其他应用消费处理。
2、延时数据处理:在做对延时迟窗口计算时,对延时迟到的数据进行处理,即时数据迟到也不会造成丢失。
参考:
https://blog.csdn.net/rustwei/article/details/121102439