SparkStreaming 进阶
  • 带状态的算子(UpdateStateByKey)
  • 基于 window 的统计
  • 实例测试

带状态的算子(UpdateStateByKey)

updateStateByKey 操作使您可以保持任意状态,同时用新信息连续更新它。要使用此功能,将必须执行两个步骤:(官网内容)

  1. 定义状态:状态可以是任意数据类型
  2. 定义状态更新功能:使用功能指定如何使用输入流中的先前状态和新值来更新状态

基于 window 的统计

Spark Streaming 还提供窗口化计算,使您可以在数据的滑动窗口上应用转换

window

如图所示,每当窗口在源 DStream 上滑动时,落入窗口内的源 RDD 就会合并并进行操作,以生成窗口 DStream 的 RDD

在这种特定情况下,该操作将应用于数据的最后 3 个时间单位,并以 2 个时间单位滑动

这表明任何窗口操作都需要指定两个参数

  • 窗口长度:窗口的持续时间
  • 滑动间隔:进行窗口操作的间隔

实例测试

1. 统计到目前为止累计出现的单词的个数(需要保持住以前的状态)

/**
* @Author Chentyit
* @Date 2019/9/28 18:34
* @Description: 使用 SparkStreaming 完成有状态统计
*/
object StatefulWordCount {

/**
* 把当前的数据去更新已有的或者是老的数据
* @param currentValues 当前的
* @param preValues 老的
* @return
*/
def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
val current = currentValues.sum
val pre = preValues.getOrElse(0)
Some(current + pre)
}

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("StatefulWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.sparkContext.setLogLevel("OFF")

// 如果是用了 stateful 的算子,必须要设置 checkpoint
// 在生产环境中,建议把 checkpoint 设置到 HDFS 的某个文件夹中
ssc.checkpoint(".")

val lines = ssc.socketTextStream("192.168.1.10", 6789)
val result = lines.flatMap(_.split(" ")).map((_,1))
val state = result.updateStateByKey[Int](updateFunction _)
state.print()

ssc.start()
ssc.awaitTermination()
}
}

2. 计算到目前为止累计出现的单词个数写入到 MySQL

  • 使用 Spark Streaming 进行统计分析
  • Spark Streaming 统计结果写入到 MySQL
/**
* @Author Chentyit
* @Date 2019/9/28 18:34
* @Description: 使用 SparkStreaming 完成有词频统计并将结果写入到 MySQL 数据库中
*/
object ForeachRDDApp {

/**
* 获取 MySQL 的连接
*
* @return
*/
def createConnection(): Connection = {
Class.forName("com.mysql.cj.jdbc.Driver")
DriverManager.getConnection("jdbc:mysql://localhost:3306/imoocbootscala?characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai", "root", "Chentyit123456")
}

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("StatefulWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.sparkContext.setLogLevel("OFF")

// 如果是用了 stateful 的算子,必须要设置 checkpoint
// 在生产环境中,建议把 checkpoint 设置到 HDFS 的某个文件夹中
ssc.checkpoint(".")

val lines = ssc.socketTextStream("192.168.1.10", 6789)
val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

result.print()

/**
* 将结果写入到 MySQL
* 这里的代码有两个问题:
* 1. 对于已有的数据不会更新(改进方法:使用 Hbase 或者 Redis,再或者去数据库中查询,如果存在就更新,不存在就添加)
* 2. 数据库连接没有使用连接池,会造成程序对资源的开销很大
*/
result.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
val connection = createConnection()
partitionOfRecords.foreach(recode => {
val sql = "insert into wordcount(word, wordcount) values('" + recode._1 + "'," + recode._2 + ")"
connection.createStatement().execute(sql)
})
connection.close()
})
})

ssc.start()
ssc.awaitTermination()
}
}

3. 黑名单过滤

  • transform 算子的使用
  • Spark Streaming 整合 RDD 进行操作
/**
* @Author Chentyit
* @Date 2019/9/28 19:41
* @Description: 黑名单过滤
*/
object TransformApp {

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("TransformApp")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.sparkContext.setLogLevel("OFF")

/**
* 构建黑名单
*/
val blacks = List("zs", "ls")
val blacksRDD = ssc.sparkContext.parallelize(blacks).map(x => (x, true))

val lines = ssc.socketTextStream("192.168.1.10", 6789)
lines.map(x => (x.split(",")(1), x)).transform(rdd => {
rdd.leftOuterJoin(blacksRDD).filter(!_._2._2.getOrElse(false)).map(x=>x._2._1)
}).print()

ssc.start()
ssc.awaitTermination()
}
}

4. Spring Streaming 整合 Spark SQL

/**
* @Author Chentyit
* @Date 2019/9/28 20:09
* @Description: Spark Streaming 整合 Spark SQL 完成词频统计操作
*/
object SqlNetworkWordCount {

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("SqlNetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.sparkContext.setLogLevel("OFF")

val lines = ssc.socketTextStream("192.168.1.10", 6789)
val words = lines.flatMap(_.split(" "))

// Convert RDDs of the words DStream to DataFrame and run SQL query
words.foreachRDD { (rdd: RDD[String], time: Time) =>
val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
import spark.implicits._

// Convert RDD[String] to RDD[case class] to DataFrame
val wordsDataFrame = rdd.map(w => Record(w)).toDF()

// Creates a temporary view using the DataFrame
wordsDataFrame.createOrReplaceTempView("words")

// Do word count on table using SQL and print it
val wordCountsDataFrame =
spark.sql("select word, count(*) as total from words group by word")
println(s"========= $time =========")
wordCountsDataFrame.show()
}

ssc.start()
ssc.awaitTermination()
}

/** Case class for converting RDD to DataFrame */
case class Record(word: String)

/** Lazily instantiated singleton instance of SparkSession */
object SparkSessionSingleton {

/**
* 这个注解一般用于序列化的时候,标识某个字段不用被序列化
*/
@transient private var instance: SparkSession = _

def getInstance(sparkConf: SparkConf): SparkSession = {
if (instance == null) {
instance = SparkSession
.builder
.config(sparkConf)
.getOrCreate()
}
instance
}
}
}
文章作者: Chentyit
文章链接: http://www.chentyit.cn/2019/09/28/SparkStreaming-%E8%BF%9B%E9%98%B6/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Hexo