热点新闻
Apache Flink——侧输出流(side output)
2023-07-11 03:02  浏览:1952  搜索引擎搜索“手机低淘网”
温馨提示:信息一旦丢失不一定找得到,请务必收藏信息以备急用!本站所有信息均是注册会员发布如遇到侵权请联系文章中的联系方式或客服删除!
联系我时,请说明是在手机低淘网看到的信息,谢谢。
展会发布 展会网站大全 报名观展合作 软文发布

前言

flink处理数据流时,经常会遇到这样的情况:处理一个数据源时,往往需要将该源中的不同类型的数据做分割(分流)处理,假如使用 filter算子对数据源进行筛选分割的话,势必会造成数据流的多次复制,造成不必要的性能浪费;

flink中的侧输出,就是将数据流进行分割,而不对流进行复制的一种分流机制。flink的侧输出的另一个作用就是对延时迟到的数据进行处理,这样就可以不必丢弃迟到的数据;

简单理解就是,根据业务上的一定规则,将一个源中的数据拆分成不同的流,即主流和侧输出流。

侧输出流(side output)

大部分的DataStream API的算子的输出是单一输出,也就是某种数据类型的流。除了split算子,可以将一条流分成多条流,这些流的数据类型也都相同。process function的side outputs功能可以产生多条流,并且这些流的数据类型可以不一样。一个side output可以定义为OutputTag[X]对象,X是输出流的数据类型。process function可以通过Context对象发射一个事件到一个或者多个side outputs。

除了从DataStream操作输出主结果流外,也可以生成任一数量的额外的侧输出流。结果流可以和主输出流的类型可以不匹配,并且侧输出流可以有不同类型。侧输出流的操作当你分流时非常有用,之前你需要先复制一个流再过滤出来,有了侧输出流,就不需要这样操作。

具体应用时,只要在处理函数的.processElement()或者.onTimer()方法中,调用上下文
的.output()方法就可以了。

DataStream<Integer> stream = env.addSource(...); SingleOutputStreamOperator<Long> longStream stream.process(new ProcessFunction<Integer, Long>() { @Override public void processElement( Integer value, Context ctx, Collector<Integer> out) throws Exception { // 转换成 Long,输出到主流中 out.collect(Long.valueOf(value)); // 转换成 String,输出到侧输出流中 ctx.output(outputTag, "side-output: " + String.valueOf(value)); } });

当使用侧输出流时,首先需要定义一个OutputTag,它将要被用来确定一个侧输出流。

OutputTag<String> outputTag = new OutputTag<String>("side-output") {};

注意:侧输出流的类型是根据侧输出流包括元素的类型来确定。

如果想要获取这个侧输出流,可以基于处理之后的 DataStream 直接调用.getSideOutput() 方法,传入对应的OutputTag,这个方式与窗口API 中获取侧输出流是完全一样的。

DataStream<String> stringStream = longStream.getSideOutput(outputTag);

可以从以下方法中来把数据输出到侧输出流

  • ProcessFunction
  • KeyedProcessFunction
  • CoProcessFunction
  • KeyedCoProcessFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction

在以上的函数中可以用参数Context来暴露给用户发送数据到侧输出流。下面例子是用ProcessFunction来发送数据到侧输出流。

import com.yibo.flink.datastream.Event; import com.yibo.flink.sourcecustom.ClickSource; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import scala.Tuple3; import java.time.Duration; public class SideOutputStreamTest { public static void main(String[] args) throws Exception { //创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //设置生成水位线的时间间隔 env.getConfig().setAutoWatermarkInterval(100); //乱序流的Watermark生成 SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource()) // 插入水位线的逻辑 设置 watermark 延迟时间,2 秒 .assignTimestampsAndWatermarks( // 针对乱序流插入水位线,延迟时间设置为 2s WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2)) // 抽取时间戳的逻辑 .withTimestampAssigner((SerializableTimestampAssigner<Event>) (element, recordTimestamp) -> element.getTimestamp()) ); OutputTag<Tuple3<String, String, Long>> maryTag = new OutputTag<Tuple3<String, String, Long>>("Mary"){}; OutputTag<Tuple3<String, String, Long>> boboTag = new OutputTag<Tuple3<String, String, Long>>("Bobo"){}; SingleOutputStreamOperator<Event> processStream = stream.process(new ProcessFunction<Event, Event>() { @Override public void processElement(Event event, Context context, Collector<Event> out) throws Exception { if ("Mary".equals(event.getUser())) { context.output(maryTag, Tuple3.apply(event.getUser(), event.getUrl(), event.getTimestamp())); } else if ("Bobo".equals(event.getUser())) { context.output(boboTag, Tuple3.apply(event.getUser(), event.getUrl(), event.getTimestamp())); } else { out.collect(event); } } }); processStream.print("else"); processStream.getSideOutput(maryTag).print("Mary"); processStream.getSideOutput(boboTag).print("Bobo"); env.execute(); } }

import com.yibo.flink.datastream.Event; import com.yibo.flink.sourcecustom.ClickSource; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import java.time.Duration; public class LateDataTest { public static void main(String[] args) throws Exception { //创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //设置生成水位线的时间间隔 env.getConfig().setAutoWatermarkInterval(100); //乱序流的Watermark生成 SingleOutputStreamOperator<Event> streamOperator = env.addSource(new ClickSource()) // 插入水位线的逻辑 设置 watermark 延迟时间,2 秒 .assignTimestampsAndWatermarks( // 针对乱序流插入水位线,延迟时间设置为 2s WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2)) // 抽取时间戳的逻辑 .withTimestampAssigner((SerializableTimestampAssigner<Event>) (element, recordTimestamp) -> element.getTimestamp()) ); //定义一个输出标签 OutputTag<Event> lateTag = new OutputTag<Event>("late"){}; //统计每个url的访问量 SingleOutputStreamOperator<UrlViewCount> result = streamOperator.keyBy(Event::getUrl) //滚动事件时间窗口 .window(TumblingEventTimeWindows.of(Time.seconds(10))) //允许窗口处理迟到数据 允许1分钟的延迟 .allowedLateness(Time.minutes(1)) //将最后的迟到数据输出到侧输出流 .sideOutputLateData(lateTag) .aggregate(new UrlViewCountAgg(), new UrlViewCountResult()); result.print("result"); result.getSideOutput(lateTag).print("late"); env.execute(); } public static class UrlViewCountAgg implements AggregateFunction<Event, Long, Long> { @Override public Long createAccumulator() { return 0L; } @Override public Long add(Event value, Long accumulator) { return accumulator + 1; } @Override public Long getResult(Long accumulator) { return accumulator; } @Override public Long merge(Long a, Long b) { return null; } } public static class UrlViewCountResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow> { @Override public void process(String url, Context context, Iterable<Long> iterable, Collector<UrlViewCount> out) throws Exception { Long urlCount = iterable.iterator().next(); //集合窗口信息输出 long start = context.window().getStart(); long end = context.window().getEnd(); UrlViewCount urlCountView = new UrlViewCount(); urlCountView.setUrl(url); urlCountView.setCount(urlCount); urlCountView.setWindowStart(start); urlCountView.setWindowEnd(end); out.collect(urlCountView); } } }

Flink侧输出流有两个作用

  • 1、分隔过滤:充当filter算子功能,将源中的不同类型的数据做分割处理。因为使用filter 算子对数据源进行筛选分割的话,会造成数据流的多次复制,导致不必要的性能浪费,过滤后不需要的数据可以重新写入Pulsar或Kafka的topic中供其他应用消费处理。

  • 2、延时数据处理:在做对延时迟窗口计算时,对延时迟到的数据进行处理,即时数据迟到也不会造成丢失。

参考:
https://blog.csdn.net/rustwei/article/details/121102439

发布人:2a07****    IP:117.173.23.***     举报/删稿
展会推荐
让朕来说2句
评论
收藏
点赞
转发