零距离接触Flink:全面解读流计算框架入门与实操指南

前言

Apache Flink作为开源的分布式流处理框架,受到了广泛的关注和应用。本文将分享如何从零开始搭建一个Flink运行环境,并在其上运行一个“WordCount”的例子程序。

Flink环境搭建

1. 环境准备

Flink支持在Linux、MacOS和Windows三大平台上部署。本文以Linux环境为例。

需要的软件依赖如下:

  • JDK 8或以上版本
  • Maven 3.5+
  • Flink 1.14.5版本
代码语言:javascript
复制
# 安装JDK
yum install -y java-1.8.0-openjdk-devel

安装Maven

yum install -y maven

接着下载Flink压缩包进行解压:

代码语言:javascript
复制

2.单机模式运行Flink

单机模式下,JobManager和TaskManager均运行在同一台机器上。

代码语言:javascript
复制
# 启动JobManager
./bin/start-cluster.sh

提交并运行WordCount程序

./bin/flink run examples/streaming/WordCount.jar

本文以单机模式为例进行讲解。实际生产环境中,建议部署在集群模式下运行。

3. 分布式集群模式

在集群模式下,JobManager和TaskManager会部署在不同节点上。

  • 首先在一台机器上启动ResourceManager
  • 在其他Worker节点上启动TaskManager
  • 提交Job到JobManager进行调度和运行

以此实现Flink在分布式环境下高可靠且高性能的计算。

4. 编写WordCount程序

WordCount是一个流式WordCount程序,读取文本源头,以单词为单位进行计数统计。

代码语言:javascript
复制
// 定义文本源DataStream
DataStream<String> text = env.socketTextStream("localhost", 9999);

//将每行内容切分转换成单词列表
DataStream<String> words = text
.flatMap(new FlatMapFunction<String, String>() {
public void flatMap(String value, Collector<String> out) {
String[] split = value.toLowerCase().split("\W+");
// ...
}
});

//按单词进行计数统计
DataStream<Tuple2<String, Long>> counts = words
.keyBy(value -> value)
.sum(1);

//输出结果
counts.print();

5. 运行和结果

编译打包项目,使用FlinkClient提交Job:

代码语言:javascript
复制
mvn clean package

bin/flink run target/wordcount-1.0-SNAPSHOT.jar

运行程序,使用netcat工具发送输入字符串,可以实时看到统计结果:

代码语言:javascript
复制
nc localhost 9999
hello world bye
hello again

6.代码示例

这里提供一个完整的WordCount流处理程序代码示例:

代码语言:javascript
复制
// 导入Flink相关包
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class WordCount {

public static void main(String[] args) throws Exception {

// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 从文件读取文本行数据源
DataStream&lt;String&gt; text = env.addSource(new MySourceFunction());

// 将每行内容切分成单词
DataStream&lt;String&gt; words = text.flatMap(new FlatMapFunction&lt;String, String&gt;() {
  public void flatMap(String value, Collector&lt;String&gt; out) {
    String[] splits = value.split(&#34;\\s+&#34;);
    for (String word : splits) {
      out.collect(word);
    }
  }
});

// 按单词进行分组计数
DataStream&lt;Tuple2&lt;String, Long&gt;&gt; result = words.keyBy(e -&gt; e)
  .timeWindow(Time.seconds(5)) 
  .sum(1);

// 打印最终结果
result.print();

// 执行任务
env.execute(&#34;WordCount&#34;);

}

// 自定义文本数据源
public static class MySourceFunction implements SourceFunction<String> {

@Override
public void run(SourceContext&lt;String&gt; ctx) throws Exception {
  // 从文件或集合读取文本 
  // ...
  ctx.collect(&#34;hello world&#34;); 
}

@Override
public void cancel() {

}

}

}

该示例从文件读取文本行,进行词频统计,并以对象流的方式输出结果。希望能给您一个完整代码实例的参考!

Flink与Yarn集成

Flink可以利用Yarn资源管理器来管理和调度Flink作业的执行。主要有以下步骤:

1. 安装和配置Yarn

安装Hadoop并配置Yarn资源管理器。

2. 配置Flink支持Yarn

修改flink-conf.yaml配置文件,添加如下配置:

代码语言:javascript
复制
yarn.distributed.enabled: true

3. 打包Flink项目为Yarn应用

代码语言:javascript
复制
mvn package -Pyarn 

4. 提交Flink作业到Yarn

代码语言:javascript
复制
./bin/flink run -m yarn-cluster -yn 1 -ys 1 /path/to/job.jar

-m 参数指定使用Yarn作为资源管理器,-yn -ys 分配给任务的Container数量。

5. Yarn WebUI监控作业

可以在Yarn ResourceManager WebUI中查看和监控Flink作业状态。

6. 停止和重启作业

使用Flink Cli同样可以停止和重启在Yarn上运行的作业。

与此同时,Yarn也能根据负载自动扩缩容Flink作业上的Container数量。这样实现了Flink与Yarn的良好集成。

通过上述步骤就可以利用Yarn的资源管理能力来管理Flink分布式作业的执行了。

Flink通过时间窗口操作sql

Flink通过Table API和SQL来支持时间窗口的操作。

下面通过一个例子来说明:

1. 定义数据源

导入Flink的TableEnvironment:

代码语言:javascript
复制
TableEnvironment tableEnv = TableEnvironment.create(env);

从Kafka读取数据注册成Table:

代码语言:javascript
复制
tableEnv.connect(new FlinkKafkaConsumer<>(...)
.property(...));

2. 定义表结构

使用DDL定义Table结构:

代码语言:javascript
复制
CREATE TABLE inputTable (
id STRING,
timestamp TIMESTAMP,
...)
WITH (...);

3. 定义窗口

使用TUMBLE或HOP动态时间窗口

代码语言:javascript
复制
SELECT
id,
COUNT(*)
FROM
inputTable
GROUP BY
TUMBLE(timestamp, INTERVAL '5' MINUTE)

4. 窗口转换

支持窗口函数如SUM、COUNT、MAX等聚合计算:

代码语言:javascript
复制
SELECT
SUM(amount)
FROM
inputTable
GROUP BY
HOP(timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)

5. 输出结果

将结果输出到Kafka或打印:

代码语言:javascript
复制
tableEnv.toRetractStream[Row]...

通过Table API和SQL的时间窗口支持,可以更高效地操作和处理时间序列数据流。开发者可以使用熟悉的SQL语法进行流处理。

6. sql任务代码示例

这里提供一个完整的使用SQL实现单词计数的示例:

代码语言:javascript
复制
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = TableEnvironment.create(env);

// 从Kafka读取文本行数据
tableEnv.connect(new FlinkKafkaConsumer<>(...)
.topic("kafka_topic"))
.withFormat(new SimpleStringSchema())
.createTemporaryTable("lines");

// 分词表
tableEnv.executeSql(
"CREATE TABLE words WITH ('connector' = 'upsert', 'url' = '...") AS " +
"SELECT " +
" ROW_NUMBER() OVER() AS id, " +
" word " +
"FROM lines, LATERAL(FLATTEN(SPLIT(lines, ' ')))";

// 窗口聚合表
tableEnv.executeSql(
"CREATE TABLE word_counts WITH ('connector' = 'upsert', 'url' = '...'") AS " +
"SELECT " +
" word, " +
" COUNT(*) AS count " +
"FROM words " +
"GROUP BY TUMBLE(rowtime, INTERVAL '5' SECOND), word");

// 输出结果
tableEnv.executeSql("INSERT INTO sink SELECT * FROM word_counts");

// 执行程序
env.execute();

这个完整示例包含数据输入、分词、窗口聚合和结果输出的全流程SQL定义。希望对您理解SQL实现流处理过程有帮助。

时间窗口说明

1. 滚动窗口

  • 滚动窗口分为定长窗口(TUMBLE)和滑动窗口(HOP)两种。
  • 定长窗口将事件锁定到连续的固定大小时间窗口中,窗口不重合。
  • 滑动窗口以固定时间间隔滑动,窗口重合部分可重复计算。

2. 窗口分配

  • 每条事件根据时间戳分配到对应的窗口份组中。
  • 窗口分配采用窗函数TIMESTAMP_WINDOW(timeField,窗口大小)实现。

3. 窗口聚合

  • 事件分配完毕后,对每个窗口执行聚合操作(如COUNT、SUM等)。
  • 窗口会将中间结果保存在状态后端(如RocksDB)。

4. 窗口结果输出

  • 窗口被关闭时(到期),将最终结果输出。
  • 也可以提前输出或定期输出中间结果。

5. 状态管理

  • 窗口状态会进行快照保存,实现断点续传重启能力。
  • 状态由KeyedStateBackend管理,比如RocksDB。

所以Flink时间窗口的原理就是:根据时间戳分配事件到窗口,窗口聚合操作更新状态,窗口关闭时输出结果。它独立于算子,为流处理引入了时间的概念。

6. 同批次时间窗口处理逻辑

如果一次从Kafka拉取的数据中,有一半的数据在当前时间窗口内,一半在窗口外,Flink会进行如下处理:

  • 先根据事件时间戳,将数据分配到对应的时间窗口分区组(keyed state)中。
  • 对每个时间窗口分区组单独处理:

    • 时间窗口内的数据按正常流程进行聚合计算。
    • 时间窗口外的数据不会参与当前窗口的聚合,但是会加入该key的back pressure。
  • 窗口结果输出时:
    • 只输出当前窗口已经关闭的分区组的结果。其他分区组处于开启状态,不会输出。
  • 周期性检查窗口状态:
    • 关闭那些超出时间范围的过期窗口。
    • 对还未到期的窗口继续累积状态,待到期后输出结果。

所以Flink可以正确区分时间窗口内外的数据:

  • 窗口内数据参与当前窗口计算
  • 窗口外数据加入back pressure,未来窗口处理
  • 只输出实际到期窗口的结果

这样保证了时间正确性,不会导致窗口结果计算错误