Spark Streaming Integration with Flume
  • Flume-style Push-based Approach
  • Pull-based Approach using a Custom Sink (recommended)

Flume-style Push-based Approach

(Push-based; the summary below is translated from the official documentation.)

Flume is designed to push data between Flume agents. In this approach, Spark Streaming essentially sets up a receiver that acts as an Avro agent for Flume, to which Flume can push data. For this to work:

  • When the Flume + Spark Streaming application is launched, one of the Spark workers must run on the chosen machine
  • Flume can then be configured to push data to a port on that machine

Flume agent configuration:

flume_push_stream.conf

simple-agent.sources = netcat-source
simple-agent.sinks = avro-sink
simple-agent.channels = memory-channel

simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = hadoop000
simple-agent.sources.netcat-source.port = 44444

simple-agent.sinks.avro-sink.type = avro
simple-agent.sinks.avro-sink.hostname = hadoop000
simple-agent.sinks.avro-sink.port = 41414
simple-agent.sinks.avro-sink.connect-timeout = 30000

simple-agent.channels.memory-channel.type = memory

simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.avro-sink.channel = memory-channel

Spark Streaming code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @Author Chentyit
 * @Date 2019/10/2 10:18
 * @Description: Spark Streaming integration with Flume - Push
 */
object FlumePushWordCount {

  def main(args: Array[String]): Unit = {

    // Validate the arguments when submitting to the cluster
    if (args.length != 2) {
      System.err.println("Usage: FlumePushWordCount <hostname> <port>")
      System.exit(1)
    }

    val Array(hostname, port) = args

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePushWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("OFF")

    // The hostname/port this receiver listens on; Flume's avro sink must
    // point at this machine (both values are passed in as arguments)
    val flumeStream = FlumeUtils.createStream(ssc, hostname, port.toInt)
    flumeStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Submitting to the cluster (you can also run and debug locally first, then package it for the cluster; note the pitfalls below):

  1. Run the Spark application jar

    spark-submit \
    --class com.imooc.spark.FlumePushWordCount \
    --master local[2] \
    --packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \
    /home/hadoop/lib/sparktrain-1.0.jar \
    hadoop000 41414
  2. Start Flume

    flume-ng agent \
    --name simple-agent \
    --conf $FLUME_HOME/conf \
    --conf-file $FLUME_HOME/conf/flume_push_stream.conf \
    -Dflume.root.logger=INFO,console
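  3. (Optional) Send test data

    Once both sides are running, you can verify the pipeline end to end by connecting to the netcat source and typing a few space-separated words; each 5-second batch should then print word counts. This assumes telnet is available on a machine that can reach hadoop000:

    telnet hadoop000 44444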

Pitfalls

  1. The avro sink's IP address must be the address of the machine where the Spark Streaming program runs; if that machine is part of the cluster, package the program, upload it, and run it on the cluster (note: the machine running Flume must be able to ping the machine running the program)
  2. Start the Spark Streaming program first, then start Flume

Pull-based Approach using a Custom Sink (recommended)

(Pull-based, using a custom sink; the summary below is translated from the official documentation.)

Instead of Flume pushing data directly to Spark Streaming, this approach runs a custom Flume sink that allows the following:

  • Flume pushes data into the sink, and the data stays buffered there
  • Spark Streaming uses a reliable Flume receiver and transactions to pull data from the sink; a transaction succeeds only after the data has been received and replicated by Spark Streaming

Compared with the previous approach, this ensures stronger reliability and fault-tolerance guarantees. However, it requires Flume to be configured to run a custom sink.
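
In practice, running the custom sink means the sink class and its dependencies must be on the Flume agent's classpath before the agent is started. A minimal sketch, assuming the JARs have already been downloaded (for example from Maven Central) into the current directory; the version numbers are assumptions chosen to line up with the spark-streaming-flume_2.11:2.2.0 coordinates used above, so adjust them to your own Spark/Scala build:

# Illustrative only: place the Spark sink and its dependencies into Flume's lib directory
cp spark-streaming-flume-sink_2.11-2.2.0.jar $FLUME_HOME/lib/
cp scala-library-2.11.8.jar $FLUME_HOME/lib/
cp commons-lang3-3.5.jar $FLUME_HOME/lib/

With these in place, the spark-sink configured below can be loaded when the agent starts.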

Flume agent configuration:

flume_pull_streaming.conf

simple-agent.sources = netcat-source
simple-agent.sinks = spark-sink
simple-agent.channels = memory-channel

simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = hadoop000
simple-agent.sources.netcat-source.port = 44444

simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
simple-agent.sinks.spark-sink.hostname = hadoop000
simple-agent.sinks.spark-sink.port = 41414

simple-agent.channels.memory-channel.type = memory

simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.spark-sink.channel = memory-channel

Start Flume:

flume-ng agent \
--name simple-agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/flume_pull_streaming.conf \
-Dflume.root.logger=INFO,console

Spark Streaming code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @Author Chentyit
 * @Date 2019/10/2 12:10
 * @Description: Spark Streaming integration with Flume - Pull
 */
object FlumePullWordCount {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePullWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("OFF")

    // The address here is the Flume machine, where the SparkSink is listening
    val flumeStream = FlumeUtils.createPollingStream(ssc, "192.168.10.120", 41414)
    flumeStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
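
To submit this job to the cluster, the command mirrors the push example. The class name below is an assumption based on the package used for the push job; replace it with wherever FlumePullWordCount actually lives in your project:

spark-submit \
--class com.imooc.spark.FlumePullWordCount \
--master local[2] \
--packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \
/home/hadoop/lib/sparktrain-1.0.jar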

Pitfalls

  1. Start Flume first, then start the Spark Streaming program