题目
我们有如下的用户访问数据:
userID visitDate visitCount
u01 2017/1/21 5
u02 2017/1/23 6
u03 2017/1/22 8
u04 2017/1/20 3
u01 2017/1/23 6
u01 2017/2/21 8
u02 2017/1/23 6
u01 2017/2/22 4
要求使用SQL统计出每个用户的累计访问次数,如下所示:
用户 月份 小计 累计
u01 2017-01 11 11
u01 2017-02 12 23
u02 2017-01 12 12
u03 2017-01 8 8
u04 2017-01 3 3
解释:小计为单月访问次数,累计为在原有单月访问次数基础上累加 将计算结果写入到mysql的表中,自己设计对应的表结构
实现代码
采用spark local模式,基于scala语言编写
import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
object WorkVisiter {
def main(args: Array[String]): Unit = {
// 创建 SparkSession
val spark = SparkSession.builder()
.appName("Read Text File with Header")
.master("local")
.getOrCreate()// 定义结构化数据的字段名和类型 val schema = StructType(Seq( StructField("userID", StringType, nullable = true), StructField("visitDate", StringType, nullable = true), StructField("visitCount", IntegerType, nullable = true) )) // 读取文本文件,并按照空格进行拆分 val df = spark.read .option("header", "true") .option("delimiter", " ") .schema(schema) .csv("D:\\tmp\\work0614\\visit.csv") // 打印 DataFrame 数据
// df.show(false)
// 注册 DataFrame 为临时表 df.createOrReplaceTempView("tb_user") // 使用窗口函数计算小计和累计访问次数 val result = spark.sql( """ |SELECT | userID | , date | , MAX(max_sumAgg) MaxSumAgg |FROM ( | select | userID | ,date | ,visitCount | ,sumAgg | ,MAX(sumAgg) OVER (PARTITION BY userID order by date,userID) AS max_sumAgg | From ( | select | userID | ,date | ,visitCount | ,sumAgg | from ( | select | userID | ,CONCAT(SUBSTRING(visitDate, 1, 4), '-', LPAD(SUBSTRING(visitDate, 6, 1), 2, '0')) AS date | ,visitCount | ,sum(visitCount) over(partition by userID order by visitDate) as sumAgg | from | tb_user | ) t1 | ) t2 |)t3 |GROUP BY userID, date |ORDER BY userID, date | """.stripMargin) // 打印结果 result.show(false)
}
}
t2表打印内容
我的思路是首先将日期截取拼接为yyyy-mm
的格式;
使用sum(访问量)开窗,根据用户ID分区,按照月份排序,得出每月的累加,如下表;
|userID|date |visitCount|sumAgg|
+------+-------+----------+------+
|u01 |2017-01|5 |5 |
|u01 |2017-01|6 |11 |
|u01 |2017-02|8 |19 |
|u01 |2017-02|4 |23 |
|u02 |2017-01|6 |12 |
|u02 |2017-01|6 |12 |
|u03 |2017-01|8 |8 |
|u04 |2017-01|3 |3 |
+------+-------+----------+------+
###t3表打印内容
再用max最大值对累加结果开窗,根据用户ID分区,按照日期和用户ID排序,因为有重复的日期,所以需要两个限制条件,这一步计算出了用户每个月的最大访问量,但是未分区排序去重;
+------+-------+----------+------+----------+
|userID|date |visitCount|sumAgg|max_sumAgg|
+------+-------+----------+------+----------+
|u01 |2017-01|5 |5 |11 |
|u01 |2017-01|6 |11 |11 |
|u01 |2017-02|8 |19 |23 |
|u01 |2017-02|4 |23 |23 |
|u02 |2017-01|6 |12 |12 |
|u02 |2017-01|6 |12 |12 |
|u03 |2017-01|8 |8 |8 |
|u04 |2017-01|3 |3 |3 |
+------+-------+----------+------+----------+
最终输出结果
GROUP BY userID, date 语句的作用是将结果集按照 userID 和 date 进行分组。这意味着具有相同 userID 和 date 值的行将被归为同一组。
ORDER BY userID, date 语句的作用是对分组后的结果集进行排序。它按照 userID 和 date 的升序对结果进行排序,使得相同 userID 的行按照 date 的顺序排列。
这样做的功能是确保结果集中的行按照 userID 和 date 的顺序进行排列,使得相同用户的不同日期的记录按照日期的先后顺序呈现,方便查看和分析数据。
+------+-------+---------+
|userID|date |MaxSumAgg|
+------+-------+---------+
|u01 |2017-01|11 |
|u01 |2017-02|23 |
|u02 |2017-01|12 |
|u03 |2017-01|8 |
|u04 |2017-01|3 |
+------+-------+---------+