客快物流大数据项目(五十七):创建Kudu-ETL流式计算程序

创建Kudu-ETL流式计算程序

实现步骤:

  • realtime目录创建 KuduStreamApp 单例对象,继承自 StreamApp 特质
  • 重写特质内的方法
  • 编写代码接入kafka集群消费其数据
代码语言:javascript
复制
package cn.it.logistics.etl.realtime
import cn.itcast.logistics.common.{Configuration, SparkUtils}
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}

/**

  • Kudu数据管道应用
  • 实现KUDU数据库的实时ETL操作
    */
    object KuduStreamApp extends StreamApp {

/**

  • 入口方法
  • @param args
    */
    def main(args: Array[String]): Unit = {
    //创建sparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
    SparkUtils.sparkConf(this.getClass.getSimpleName)
    )
//数据处理
execute(sparkConf)

}

/**

  • 数据的处理
  • @param sparkConf
    /
    override def execute(sparkConf: SparkConf): Unit = {
    /
    *
    • 实现步骤:
    • 1)创建sparksession对象
    • 2)获取数据源(获取物流相关数据以及crm相关数据)
    • 3)对数据进行处理(返回的数据是字符串类型,需要转换成javabean对象)
    • 4)抽取每条数据的字段信息
    • 5)将过滤出来的每张表写入到kudu数据库
      */
      //1)创建sparksession对象
      val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
      sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
//2)获取数据源(获取物流相关数据以及crm相关数据)
//2.1:获取物流系统相关的数据
val logisticsDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaLogisticsTopic)

//2.2:获取客户关系系统相关的数据
val crmDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaCrmTopic)

// 设置Streaming应用输出及启动
logisticsDF.writeStream.outputMode(OutputMode.Update())
  .format("console").queryName("logistics").start()

crmDF.writeStream.outputMode(OutputMode.Update())
  .format("console").queryName("crm").start()

//8)启动运行等待停止
val stream = sparkSession.streams
//stream.active:获取当前活动流式查询的列表
stream.active.foreach(query => println(s"准备启动的查询:${query.name}"))
//线程阻塞,等待终止
stream.awaitAnyTermination()

}

/**

  • 数据的保存
  • @param dataFrame
  • @param tableName
  • @param isAutoCreateTable
    */
    override def save(dataFrame: DataFrame, tableName: String, isAutoCreateTable: Boolean = true): Unit = {
    }
    }