附代码|Flink实时计算TopN

这一章从实际的需求TopN来学习Flink中的窗口知识。

在上一章代码中使用了timeWindow,使得我们可以操作Flink流中的一个时间段内的数据,这就引出了Flink中的"窗口"概念:在大多数场景下,数据流都是"无限的",因引我们无法等待数据流终止后才进行一些统计计算,而通常的需求是对一段时间或是一定范围内的数据进行分析。

Flink提供了两种窗口:Time Window和Count Window,而本章涉及到Time Window的部分概念和用法。

代码语言:javascript
复制
package all.in.one.c03

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.extensions._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random

object Chapter03 extends App {

// 使用createLocalEnvironmentWithWebUI可以在本地查看WebUI,在集群提交任务无需此方法
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
// Flink的输入为Source,这里我们构建一个定义Source:C03Source
val sourceDataStream = env.addSource(new C03Source())
// 接下来以品类做为key,计算每个品类的总价格
// 同样keyingBy来自org.apache.flink.streaming.api.scala.extensions.包,这里使用keyBy也可以
// keyBy操作后会返回一个KeyedStream,保存了key信息
sourceDataStream
.keyingBy(
.1)
// 与Chapter 02不同,这里我们调用window来设置窗口
// 以下代码说明参见README
.window(SlidingProcessingTimeWindows.of(Time.seconds(60L), Time.seconds(10L)))
// 计算交易额的总和
.sum(1)
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10L)))
.process(new ProcessAllWindowFunction[(String, Long), String, TimeWindow] {
override def process(context: Context, elements: Iterable[(String, Long)], out: Collector[String]): Unit = {
val top3 = elements.toSeq
.sortBy(-
._2)
.take(3)
.zipWithIndex
.map { case ((item, price), idx) => s" {idx + 1}. item: $price" }
.mkString("\n")
out.collect(("-" * 16) + "\n" + top3)
}
})
.print()
env.execute("Chapter 03")

/**
* 每100ms产生一条"交易"数据,最终输出品类+价格(随机产生)
*/
class C03Source extends SourceFunction[(String, Long)] {
private val items = Array(
// 男装
"卫衣",
"T恤",
"牛仔裤",
"西服",
"风衣",
// 女装
"连衣裙",
"卫衣",
"衬衫",
"针织衫",
"休闲裤",
// 手机数码
"手机",
"手机配件",
"摄影摄像",
"影音娱乐",
"数码配件",
"智能设备",
"电子教育",
// 电脑办公
"电脑整机",
"电脑组件",
"外设",
"网络产品",
"办公设备",
"文具耗材",
// 家用电器
"电视",
"空调",
"洗衣机",
"冰箱",
"厨卫",
"生活电器",
// 户外运动
"运动鞋包",
"运行服饰",
"户外鞋服",
"户外装备",
"骑行",
"健身",
// 家具家装
"厨房卫浴",
"灯饰照明",
"五金工具",
"客厅家具",
"餐厅家具",
// 图书文娱
"少儿读物",
"文学",
"动漫",
"专业"
)
var running = true

/**
  * Flink会调用run来收集数据
  */
override def run(sourceContext: SourceFunction.SourceContext[(String, Long)]): Unit = {
  val random = new Random()
  do {
    val item = items(random.nextInt(items.length))
    val price = random.nextInt(3333) + 33
    // context.collect通知Flink新元素进入系统
    sourceContext.collect(item -> price.toLong)
    Thread.sleep(1000)
  } while (running)
}

override def cancel(): Unit = running = false

}

}

TopN需求

假设电商网站有这样一个榜单:展示1分钟内当前用户购买品类交易额的Top3,并且榜单要每10秒刷新一次。而我们现在可以拿到一个交易流,里面记录了交易品类和交易额,要如何实现呢?先看代码效果,启动all.in.one.c03.Chapter03后会看到输出如:

代码语言:javascript
复制
9> ----------------

  1. 厨卫: 3956
  2. 文具耗材: 3174
  3. 摄影摄像: 2738
    10> ----------------
  4. 厨卫: 3956
  5. 文具耗材: 3174
  6. 智能设备: 3108
    11> ----------------
  7. 影音娱乐: 4304
  8. 风衣: 4286
  9. 厨卫: 3956
    12> ----------------
  10. 牛仔裤: 5261
  11. 衬衫: 5155
  12. 厨卫: 4629
    ...
    ...

输入

之前的章节中,我们的输入是监听一个Socket地址读取数据(socketTextStream),这些都是Flink内置简单的输入方式,而本质上Flink Stream的输入就是实现相应的接口来接收数据:SourceFunction,它包括run(Flink调用run方法收集数据)和cancel(任务停止时调用),如socketTextStream就是创建了一个org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction

在代码中,我们实现了一个C03Source,它会约每100ms随机输出品类和价格数据。然后使用env.addSource(new C03Source())来得到相应的数据流DataStream。

窗口操作

根据需求,我们要计算过去60秒内的交易额,所以很容易想到:将时间窗口的时长设置为60秒,然后计算这段时间内每个品类的交易额的和,最后计算Top3就可以了。假设使用上一章的方法timeWindow(Time.seconds(60)),计算的结果是没有问题的,但是你会发现它是每60秒计算一次,无法满足需求每10秒更新一次榜单。此时会引出时间窗口的两个类型(这一章只介绍这两种):滚动(Tumbling)与滑动(Sliding)。

见上图,在定义窗口时指定它的大小,同时再指定触发窗口的间隔或者说滑动距离,这样创建的窗口就是滑动窗口。(timeWindow(Time.seconds(60))的方法实现就是创建一个滚动窗口)

在代码中,我们使用window(SlidingProcessingTimeWindows.of(Time.seconds(60L), Time.seconds(10L)))创建窗口然后计算sum,此时得到了每一个品类过去一分钟内的总交易额。

在这之后,代码中使用windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10L)))方法指定了大小为10秒的滚动窗口。那么windowAllwindow的区别是什么呢?我们不能忘记一件事:Flink是分布式处理引擎,所以计算是同时发生在各个节点的,当使用windowAll时,数据会汇集一个节点去执行我们指定的计算。

windowAll方法返回的是AllWindowedStream类型的对象,使用process方法指定对数据进行何种操作。在process中,我们创建了ProcessAllWindowFunction的匿名子类对象,并将所有元素的Top3拼为字符串并交给Flink。

思考

  1. 计算TopN时我们用到了WindowAll,实际上它就是全局并发为1的操作,那么它的计算受单台机器的限制,且在实际的业务中业务的复杂和量级都可能会出现数据热点,这时要怎么解决呢?
  2. 观察创建时间窗口的类名称:SlidingProcessingTimeWindowsTumblingProcessingTimeWindows,时间窗口的“时间”是什么时间?假如某些数据有延迟很晚才出现在数据流中,如果你来设计Flink会怎么做?

以上问题会在后续的章节中找到答案。

WebUI

可以看到,本章我们使用以下代码创建了ENV:

代码语言:javascript
复制
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()

主要目的是为了在本地启动时可以看到WebUI(在集群提交任务无需此方法)。在启动后,日志中会输出类似以下内容:

代码语言:javascript
复制
[Chapter 03 - main] 17:07:13.338 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint(RestServerEndpoint.java:139) - Starting rest endpoint.
[Chapter 03 - main] 17:07:13.862 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint(RestServerEndpoint.java:242) - Rest endpoint listening at localhost:8081
[Chapter 03 - main] 17:07:13.865 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint(WebMonitorEndpoint.java:702) - Web frontend listening at http://localhost:8081.
[Chapter 03 - mini-cluster-io-thread-1] 17:07:13.866 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint(WebMonitorEndpoint.java:758) - http://localhost:8081 was granted leadership with leaderSessionID=30a5533d-fbf0-4b70-95d3-cd8813bb6492

说明WebUI启动成功,并且监听本地的8081端口,此时在浏览器中打开http://localhost:8081,在RunningJob选择刚刚启动的Job,可以看到类似以下页面:

可以先在页面上熟悉Flink WebUI提供的模块和可获取信息,后续会根据相应功能介绍页面的使用。