Spark Streaming Integration with Flume & Kafka

  • General flow
  • Generating Log4j logs from code
  • Flume configuration files
  • Test code

General flow

The overall pipeline: an application writes messages through Log4j, the Flume Log4jAppender ships them to a Flume avro source, Flume delivers the events to a Kafka topic through a Kafka sink, and a Spark Streaming job consumes that topic with a direct stream.

Generating Log4j logs from code

A small program keeps writing log messages through Log4j, and the log4j configuration file routes that output to Flume via the Flume Log4jAppender (the appender class ships in the flume-ng-log4jappender artifact, which must be on the application's classpath).

Java code

import org.apache.log4j.Logger;

public class LoggerGenerator {

    private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());

    public static void main(String[] args) throws InterruptedException {
        int index = 0;
        // Emit one log message per second; these records are what Flume and Kafka carry downstream
        while (true) {
            Thread.sleep(1000);
            logger.info("value : " + index++);
        }
    }
}

log4j.properties

log4j.rootLogger=INFO,stdout,flume

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.10.114
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true

Flume configuration files

streaming.conf (for testing the log4j -> flume path)

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=log-sink

#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=hadoop000
agent1.sources.avro-source.port=41414

#define channel
agent1.channels.logger-channel.type=memory

#define sink
agent1.sinks.log-sink.type=logger

agent1.sources.avro-source.channels=logger-channel
agent1.sinks.log-sink.channel=logger-channel

streaming2.conf (log4j -> flume -> kafka)

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink

#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=hadoop000
agent1.sources.avro-source.port=41414

#define channel
agent1.channels.logger-channel.type=memory

#define sink
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic = streamingtopic_cty
agent1.sinks.kafka-sink.brokerList = hadoop000:9092
agent1.sinks.kafka-sink.requiredAcks = 1
# only sink once 20 events have accumulated
agent1.sinks.kafka-sink.batchSize = 20

agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel

Test code

KafkaStreamingApp.scala

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/** Run with arguments: 192.168.10.114:9092 streamingtopic_cty */
object KafkaStreamingApp {

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: KafkaStreamingApp <brokers> <topics>")
      System.exit(1)
    }

    val Array(brokers, topics) = args

    val sparkConf = new SparkConf().setAppName("KafkaStreamingApp").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("OFF")

    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

    // Direct stream over the Kafka topic (spark-streaming-kafka 0.8 API); each record is a (key, value) pair
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

    // Count how many log lines arrive in each 5-second batch
    messages.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
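
The code above targets the old spark-streaming-kafka 0.8 integration (metadata.broker.list plus StringDecoder), which is deprecated and no longer shipped with recent Spark releases. Below is a minimal sketch of the same consumer written against the spark-streaming-kafka-0-10 integration; the group.id and auto.offset.reset values are illustrative assumptions, while the broker and topic match the Flume configuration above.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaStreamingApp010 {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("KafkaStreamingApp010").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Consumer settings; group.id and auto.offset.reset below are illustrative assumptions
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop000:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "streaming_group",
      "auto.offset.reset" -> "latest"
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Set("streamingtopic_cty"), kafkaParams)
    )

    // Same processing as above: count log lines per 5-second batch
    stream.map(_.value()).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Either variant can be packaged and submitted with spark-submit; the 0.10 sketch additionally needs the spark-streaming-kafka-0-10 dependency on the classpath.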