Structured Streaming + Kafka + MySQL (Spark Real-Time Computing | Tmall Double 11 Real-Time Report Analysis)

Preface


Every year during the Tmall Double 11 shopping festival, a huge real-time "battle dashboard" displays the current sales figures. Behind this flashy page sits a very strong technical stack, and that scenario is essentially real-time report analysis.

1. Business Requirements Overview

Simulate transaction order data, send it to the distributed message queue Kafka, and consume and analyze the order data in real time. The business flow is shown below:


Consume the transaction order data from Kafka in real time, compute the sales order amount along different dimensions, and store the final report results in a MySQL database.

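Before diving into the project, here is a minimal end-to-end sketch of the pipeline just described, reduced to a single national total. The broker, topic, database, and table names are the ones assumed throughout this article; the full implementation (with IP-to-region enrichment and the province and city reports) follows in the Core Code section.

Code language: Scala

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode

object PipelineSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sketch").master("local[3]").getOrCreate()
    import spark.implicits._

    // 1. Consume the order JSON messages from Kafka
    val orders: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1.itcast.cn:9092")
      .option("subscribe", "orderTopic")
      .load()
      .selectExpr("CAST(value AS STRING)")

    // 2. Aggregate the sales order amount (here just one overall total)
    val report: DataFrame = orders
      .select(get_json_object($"value", "$.orderMoney").cast("decimal(10,2)").as("money"))
      .agg(sum($"money").as("totalMoney"))

    // 3. Write the report to MySQL on every micro-batch
    report.writeStream
      .outputMode(OutputMode.Complete())
      .foreachBatch { (batch: DataFrame, batchId: Long) =>
        batch.write.mode(SaveMode.Overwrite)
          .format("jdbc")
          .option("url", "jdbc:mysql://node1.itcast.cn:3306/db_spark")
          .option("dbtable", "tb_ordertotal")
          .option("user", "root")
          .option("password", "123456")
          .save()
      }
      .start()

    spark.streams.awaitAnyTermination()
  }
}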

2. Project Code

1. Mock Transaction Data

Write a program that continuously generates transaction order data, converts each record to a JSON string with the json4s library, and sends it to a Kafka topic. The code is as follows:

Code language: Scala
// =================================== Order entity class =================================
package cn.itcast.spark.mock

/**
 * Order entity class (Case Class)
 * @param orderId     order ID
 * @param userId      user ID
 * @param orderTime   order date/time
 * @param ip          IP address the order was placed from
 * @param orderMoney  order amount
 * @param orderStatus order status
 */
case class OrderRecord(
    orderId: String,
    userId: String,
    orderTime: String,
    ip: String,
    orderMoney: Double,
    orderStatus: Int
)

// ================================== Mock order data ==================================
package cn.itcast.spark.mock

import java.util.Properties

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.json4s.jackson.Json

import scala.util.Random

/**
 * Simulates order data and sends it to a Kafka topic.
 * Each message in the topic is a String containing one order in JSON format.
 *
 * Data conversion:
 * OrderRecord instances are converted to JSON strings (using the json4s library).
 */
object MockOrderProducer {

    def main(args: Array[String]): Unit = {

        var producer: KafkaProducer[String, String] = null
        try {
            // 1. Kafka producer client configuration
            val props = new Properties()
            props.put("bootstrap.servers", "node1.itcast.cn:9092")
            props.put("acks", "1")
            props.put("retries", "3")
            props.put("key.serializer", classOf[StringSerializer].getName)
            props.put("value.serializer", classOf[StringSerializer].getName)

            // 2. Create the KafkaProducer with the configuration above
            producer = new KafkaProducer[String, String](props)

            // Random number generator
            val random: Random = new Random()
            // Order status: 0 = opened, 1 = cancelled, 2 = closed, 3 = completed (heavily weighted toward 0)
            val allStatus = Array(0, 1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)

            while (true) {
                // Number of orders to generate in this batch
                val batchNumber: Int = random.nextInt(1) + 5
                (1 to batchNumber).foreach { number =>
                    val currentTime: Long = System.currentTimeMillis()
                    val orderId: String = s"${getDate(currentTime)}%06d".format(number)
                    val userId: String = s"${1 + random.nextInt(5)}%08d".format(random.nextInt(1000))
                    val orderTime: String = getDate(currentTime, format = "yyyy-MM-dd HH:mm:ss.SSS")
                    val orderMoney: String = s"${5 + random.nextInt(500)}.%02d".format(random.nextInt(100))
                    val orderStatus: Int = allStatus(random.nextInt(allStatus.length))
                    // 3. Build the order record
                    val orderRecord: OrderRecord = OrderRecord(
                        orderId, userId, orderTime, getRandomIp, orderMoney.toDouble, orderStatus
                    )
                    // Convert the record to a JSON string
                    val orderJson = new Json(org.json4s.DefaultFormats).write(orderRecord)
                    println(orderJson)
                    // 4. Build the ProducerRecord
                    val record = new ProducerRecord[String, String]("orderTopic", orderJson)
                    // 5. Send the record to the topic
                    producer.send(record)
                }
                Thread.sleep(random.nextInt(100) + 500)
            }
        } catch {
            case e: Exception => e.printStackTrace()
        } finally {
            if (null != producer) producer.close()
        }
    }

    /** ================= Format the given timestamp ================= */
    def getDate(time: Long, format: String = "yyyyMMddHHmmssSSS"): String = {
        val fastFormat: FastDateFormat = FastDateFormat.getInstance(format)
        val formatDate: String = fastFormat.format(time) // format the date
        formatDate
    }

    /** ================= Generate a random IP address ================= */
    def getRandomIp: String = {
        // IP address ranges
        val range: Array[(Int, Int)] = Array(
            (607649792, 608174079),     // 36.56.0.0   - 36.63.255.255
            (1038614528, 1039007743),   // 61.232.0.0  - 61.237.255.255
            (1783627776, 1784676351),   // 106.80.0.0  - 106.95.255.255
            (2035023872, 2035154943),   // 121.76.0.0  - 121.77.255.255
            (2078801920, 2079064063),   // 123.232.0.0 - 123.235.255.255
            (-1950089216, -1948778497), // 139.196.0.0 - 139.215.255.255
            (-1425539072, -1425014785), // 171.8.0.0   - 171.15.255.255
            (-1236271104, -1235419137), // 182.80.0.0  - 182.92.255.255
            (-770113536, -768606209),   // 210.25.0.0  - 210.47.255.255
            (-569376768, -564133889)    // 222.16.0.0  - 222.95.255.255
        )
        // Pick a random range and a random address within it
        val random = new Random()
        val index = random.nextInt(10)
        val ipNumber: Int = range(index)._1 + random.nextInt(range(index)._2 - range(index)._1)

        // Convert the Int IP address to dotted IPv4 notation
        number2IpString(ipNumber)
    }

    /** ================= Convert an Int IPv4 address to its string form ================= */
    def number2IpString(ip: Int): String = {
        val buffer: Array[Int] = new Array[Int](4)
        buffer(0) = (ip >> 24) & 0xff
        buffer(1) = (ip >> 16) & 0xff
        buffer(2) = (ip >> 8) & 0xff
        buffer(3) = ip & 0xff
        // Return the IPv4 address
        buffer.mkString(".")
    }

}
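To sanity-check the JSON messages before wiring up Spark, the producer's output can be parsed back into OrderRecord with the same json4s library. This is only a small verification sketch; the OrderJsonCheck object and the sample values are ours, not part of the original project.

Code language: Scala

package cn.itcast.spark.mock

import org.json4s.DefaultFormats
import org.json4s.jackson.{Json, JsonMethods}

object OrderJsonCheck {
  def main(args: Array[String]): Unit = {
    implicit val formats: DefaultFormats.type = DefaultFormats

    // Serialize a sample order the same way MockOrderProducer does
    val sample = OrderRecord(
      "20211111120000000000001", "300000042",
      "2021-11-11 12:00:00.000", "106.80.1.1", 100.05, 0
    )
    val json: String = new Json(DefaultFormats).write(sample)
    println(json)

    // Parse it back and confirm the fields survive the round trip
    val restored = JsonMethods.parse(json).extract[OrderRecord]
    assert(restored == sample)
  }
}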

2. Create the Maven Module

Create a Maven module and add the relevant dependencies. The details are as follows:

Code language: XML
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>
    </repositories>

    <properties>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.4.5</spark.version>
        <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
        <kafka.version>2.0.0</kafka.version>
        <mysql.version>8.0.19</mysql.version>
    </properties>

    <dependencies>

        <!-- Scala language dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark Core dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark SQL dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Structured Streaming + Kafka dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Hadoop Client dependency -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- Kafka Client dependency -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>
        <!-- ip2region: resolve an IP address to province/city -->
        <dependency>
            <groupId>org.lionsoul</groupId>
            <artifactId>ip2region</artifactId>
            <version>1.7.2</version>
        </dependency>
        <!-- MySQL Client dependency -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <!-- JSON parsing library: fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>

    </dependencies>

    <build>
        <outputDirectory>target/classes</outputDirectory>
        <testOutputDirectory>target/test-classes</testOutputDirectory>
        <resources>
            <resource>
                <directory>${project.basedir}/src/main/resources</directory>
            </resource>
        </resources>
        <!-- Maven compiler plugins -->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

The project structure is as follows:

3. Core Code

RealTimeOrderReport.scala

Code language: Scala

package cn.itcast.spark.report

import java.util.concurrent.TimeUnit

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.expressions.{UserDefinedAggregateFunction, UserDefinedFunction}
import org.apache.spark.sql.types.{DataType, DataTypes}
import org.lionsoul.ip2region.{DataBlock, DbConfig, DbSearcher}

object RealTimeOrderReport {

	def printToConsole(dataFrame: DataFrame) = {
		dataFrame.writeStream
			.format("console")
			.outputMode(OutputMode.Update())
			.option("numRows", "50")
			.option("truncate", "false")
			.start()
	}

	def main(args: Array[String]): Unit = {
		// 1. Get the SparkSession instance
		val spark: SparkSession = SparkSession.builder()
			.appName("isDemo")
			.master("local[3]")
			.config("spark.sql.shuffle.partitions", "3")
			.getOrCreate()
		import spark.implicits._

		// 2. Read the order data from the Kafka topic as a streaming DataFrame
		val dataFrame: DataFrame = spark.readStream
			.format("kafka")
			.option("kafka.bootstrap.servers", "node1.itcast.cn:9092")
			.option("subscribe", "orderTopic")
			.load()
			.selectExpr("CAST (value AS STRING)")

		// printToConsole(dataFrame)

		// UDF: resolve an IP address to (province, city) with ip2region
		val ip_to_region: UserDefinedFunction = udf((ip: String) => {
			// 1. Create the DbSearcher, pointing at the ip2region dictionary file
			val dbSearcher = new DbSearcher(new DbConfig(), "src/main/dataset/ip2region.db")
			// 2. Look up the IP address
			val dataBlock: DataBlock = dbSearcher.btreeSearch(ip)
			// 3. Extract the province and city from the region string
			val region: String = dataBlock.getRegion
			// println(region) // e.g. 中国|0|海南省|海口市|教育网
			val Array(_, _, province, city, _) = region.split("\\|")
			(province, city)
		})

		// 3. Parse the JSON value, keep orders with status 0 (opened) and attach province/city
		val frame: DataFrame = dataFrame
			.select(
				get_json_object($"value", "$.ip").as("ip"),
				get_json_object($"value", "$.orderMoney")
					.cast(DataTypes.createDecimalType(10, 2))
					.as("money"),
				get_json_object($"value", "$.orderStatus").as("status")
			)
			.filter($"status" === 0)
			.withColumn("region", ip_to_region($"ip"))
			.select(
				$"region._1".as("province"),
				$"region._2".as("city"),
				$"money"
			)

		// printToConsole(frame)

		// 4. Aggregate the order amount along three dimensions with SQL
		frame.createOrReplaceTempView("tmp_view")
		val f: DataFrame = spark.sql(
			"""
			  |SELECT "国家" as type, SUM(money) as totalMoney FROM tmp_view
			""".stripMargin)
		val f2: DataFrame = spark.sql(
			"""
			  |SELECT province as type, SUM(money) as totalMoney FROM tmp_view GROUP BY province
			""".stripMargin)
		val f3: DataFrame = spark.sql(
			"""
			  |SELECT city as type, SUM(money) as totalMoney FROM (SELECT * FROM tmp_view WHERE city in ("北京市", "上海市", "深圳市", "广州市", "杭州市", "成都市", "南京市", "武汉市", "西安市"))t GROUP BY t.city
			""".stripMargin)
		// printToConsole(f3)

		// 5. Write each report to its MySQL table
		saveToMySQL(f, "total")
		saveToMySQL(f2, "totalprovince")
		saveToMySQL(f3, "totalcity")

		spark.streams.awaitAnyTermination()
	}

	/** Write a streaming aggregation result to MySQL via foreachBatch */
	def saveToMySQL(streamDF: DataFrame, reportType: String) = {
		streamDF.writeStream
			.outputMode(OutputMode.Complete())
			.queryName(s"${reportType}")
			.foreachBatch((batchDF: DataFrame, batchId: Long) => {
				batchDF.coalesce(1)
					.write.mode(SaveMode.Overwrite)
					.format("jdbc")
					.option("url", "jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true")
					.option("driver", "com.mysql.cj.jdbc.Driver")
					.option("user", "root")
					.option("password", "123456")
					.option("dbtable", s"db_spark.tb_order${reportType}")
					.save()
			})
			.option("checkpointLocation", s"datas/spark/structured-ckpt-${System.currentTimeMillis()}")
			.start()
	}

}
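The saveToMySQL sink above writes into the db_spark database on node1.itcast.cn. The one-off setup sketch below is our assumption about the target schema (columns type and totalMoney, mirroring the SQL aggregations); since Spark's JDBC sink with SaveMode.Overwrite recreates the tables on each batch, only the database itself really needs to exist up front.

Code language: Scala

import java.sql.DriverManager

// Hypothetical one-off setup for the MySQL target assumed by saveToMySQL.
object InitReportTables {
  def main(args: Array[String]): Unit = {
    val url = "jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true"
    val conn = DriverManager.getConnection(url, "root", "123456")
    try {
      val stmt = conn.createStatement()
      stmt.execute("CREATE DATABASE IF NOT EXISTS db_spark")
      // Column layout mirrors the aggregation output: type (dimension value), totalMoney (sum of money)
      Seq("tb_ordertotal", "tb_ordertotalprovince", "tb_ordertotalcity").foreach { table =>
        stmt.execute(
          s"""CREATE TABLE IF NOT EXISTS db_spark.$table (
             |  type VARCHAR(100),
             |  totalMoney DECIMAL(20, 2)
             |)""".stripMargin)
      }
    } finally {
      conn.close()
    }
  }
}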

OrderRecord.scala

Code language: Scala
package cn.itcast.spark.mock

/**
 * Order entity class (Case Class)
 * @param orderId     order ID
 * @param userId      user ID
 * @param orderTime   order date/time
 * @param ip          IP address the order was placed from
 * @param orderMoney  order amount
 * @param orderStatus order status
 */
case class OrderRecord(
    orderId: String,
    userId: String,
    orderTime: String,
    ip: String,
    orderMoney: Double,
    orderStatus: Int
)

Summary

Real-time report analysis is one of the reporting solutions many companies have adopted in recent years. Its most prominent application is the real-time dashboard: streaming computation produces results that are pushed straight to a front-end application, which displays how key metrics change in real time.

The most typical case is Taobao's Double 11 event. Every year during the Double 11 shopping festival, besides the shopping frenzy, the most eye-catching sight is the gross merchandise volume ticking upward on the Double 11 dashboard. Across the whole chain, from placing an order on Tmall through data collection, computation, and validation to the final display on the dashboard, the end-to-end latency is kept within 5 seconds, peak throughput reaches several hundred thousand orders per second, and multiple redundant streaming pipelines ensure nothing goes wrong.

This Double 11 real-time report analysis exercise was written mainly with SQL; it has not yet been written with the DataFrame DSL, which is something left to improve.
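For reference, here is a minimal sketch of how the province-level report could be expressed with the DataFrame DSL instead of spark.sql, assuming the same frame DataFrame (columns province, city, money) built in RealTimeOrderReport; the helper name is ours, not part of the original project.

Code language: Scala

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.sum

// DSL equivalent of: SELECT province AS type, SUM(money) AS totalMoney FROM tmp_view GROUP BY province
def provinceReport(frame: DataFrame): DataFrame = {
  frame
    .groupBy("province")
    .agg(sum("money").as("totalMoney"))
    .withColumnRenamed("province", "type")
}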