Spark Streaming 整合 Kafka
  • 使用版本
  • Receiver-based
  • Direct Approach (No Receivers) 推荐
  • 指定偏移量读取 Kafka 信息

使用版本

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>${spark.version}</version>
</dependency>

<!-- 个人感觉 10 版本比较好用 但是下面的 API 就不一样了 -->
<!-- 官网上有详细文档,为了保证工程一致性,下面的 API使用 8 版本 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency

Receiver-based

这种方法使用接收器来接收数据。接收器是使用 Kafka 高级消费者 API 实现的;与所有接收器一样,通过接收器从 Kafka 接收的数据存储在 Spark 执行器中,然后由 Spark Streaming 启动的作业处理数据

操作步骤

  1. 启动 ZK
  2. 启动 Kafka
  3. 创建 topic(kafka_streaming_topic_cty)

代码示例

/**
* @Author Chentyit
* @Date 2019/10/2 18:21
* @Description: SparkStreaming 对接 Kafka —— Receiver
*/
object KafkaReceiverWordCount {

def main(args: Array[String]): Unit = {

if (args.length != 4) {
System.err.println("KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
System.exit(1)
}

val Array(zkQuorum, group, topics, numThreads) = args

val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaReceiverWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.sparkContext.setLogLevel("OFF")

val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
messages map(_._2) flatMap(_.split(" ")) map((_, 1)) reduceByKey(_ + _) print()

ssc.start()
ssc.awaitTermination()
}
}

提交集群运行

spark-submit \
--class com.imooc.spark.KafkaReceiverWordCount \
--master local[2] \
--name KafkaReceiverWordCount \
# 这个必须得加,在生产环境中需要用 maven 下载好 jar 包直接添加到 kafka 的 lib 里面
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
/home/hadoop/lib/sparktrain-1.0.jar hadoop000:2181 test kafka_streaming_topic 1

Direct Approach (No Receivers) 推荐

与基于接收器的方法(即方法1)相比,该方法具有以下优点:(官网翻译)

  • 简化的并行性:无需创建多个输入Kafka流并将它们合并。使用 directStream,Spark Streaming 将创建与要使用的Kafka分区一样多的 RDD 分区,所有这些分区都将从 Kafka 并行读取数据。因此,Kafka 和 RDD 分区之间存在一对一的映射,这更易于理解和调整
  • 效率:在第一种方法中,要实现零数据丢失,需要将数据存储在预写日志中,从而进一步复制数据。这实际上是低效的,因为数据被有效地复制了两次-一次是通过 Kafka 复制,另一次是通过 “预写日志” 复制。第二种方法消除了该问题,因为没有接收器,因此不需要预写日志。只要您有足够的 Kafka 保留时间,就可以从 Kafka 中恢复信息
  • 只执行一次精确的语义:第一种方法使用Kafka的高级 API 将偏移量存储在 Zookeeper 中。传统上,这是从 Kafka 消费数据的方式。尽管这种方法(与预写日志结合使用)可以确保零数据丢失(即至少一次语义),但在某些故障下某些记录可能会被消耗两次的可能性很小。发生这种情况是由于 Spark Streaming 可靠接收的数据与 Zookeeper 跟踪的偏移量之间存在不一致。因此,在第二种方法中,我们使用不使用 Zookeeper 的简单 Kafka API。Spark Streaming 在其检查点内跟踪偏移。这样可以消除 Spark Streaming 与 Zookeeper / Kafka 之间的不一致,因此即使出现故障,Spark Streaming 也会有效地一次接收每条记录。为了获得结果输出的一次语义,将数据保存到外部数据存储的输出操作必须是幂等的,或者是保存结果和偏移量的原子事务(请参见主程序中的输出操作的语义)有关更多信息的指南)

代码示例

/**
* @Author Chentyit
* @Date 2019/10/2 19:12
* @Description: SparkStreaming 对接 Kafka —— Direct
*/
object KafkaDirectWordCount {

def main(args: Array[String]): Unit = {
if (args.length != 2) {
System.err.println("Usage: KafkaDirectWordCount <brokers> <topics>")
System.exit(1)
}

val Array(brokers, topics) = args

val sparkConf = new SparkConf().setAppName("KafkaDirectWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.sparkContext.setLogLevel("OFF")

val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

ssc.start()
ssc.awaitTermination()
}
}

指定偏移量读取 Kafka 信息

代码示例

object Offset02App extends App {

val kafkaParams = Map[String, String](
"metadata.broker.list" -> "192.168.1.8:9092",
"auto.offset.reset" -> "smallest"
)
val topics = "imooc_cty_offset".split(", ").toSet

val checkpointDirectory = "E:\\test\\ck_point"

def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(sparkConf, Seconds(10))
ssc.sparkContext.setLogLevel("OFF")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
/**
* set checkpoint directory
* 将偏移量存储到外部介质中
*/
ssc.checkpoint(checkpointDirectory)
messages.checkpoint(Duration(10 * 1000))

messages.foreachRDD(rdd => {
if (!rdd.isEmpty()) {
println("慕课 CTY: " + rdd.count())
}
})

ssc
}

val sparkConf = new SparkConf().setAppName("Offset01App").setMaster("local[2]")
val ssc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

ssc.start()
ssc.awaitTermination()
}
文章作者: Chentyit
文章链接: http://www.chentyit.cn/2019/10/02/Spark-Streaming-%E6%95%B4%E5%90%88-Kafka/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Hexo