

Spark Streaming with Flume and Kafka: a real-time data processing example

纯真年代 · 2018-10-30 11:25:01

The full example code for this article is in my Gitee (码云) repository (link).

The workflow for this example is as follows:

Log generation
Flume log collection
Kafka messaging system
Spark Streaming processing (the consumer)
Persisting the results
UI display

In what follows, hadoop000 is the hostname of my CentOS machine; localhost works just as well.

Log generation

Here we generate simple user order logs with a shell script, one record per line with tab-separated fields:

user ID | IP | order time | item price | quantity | company ID

4adfc750-f6f9-4a42-861c-424a8794feda	73.112.104.164	2018-11-07 22:20:41	1799	2	1006

Script (userOrderLogUtils.sh)

#!/bin/bash
# Generate $1 random user order log lines.

num=10
if [ -n "$1" ]; then
    num=$1
    echo $num
    companyIds=(1001 1002 1003 1004 1005 1006 1007 1008 1009 1010)
    ips=(223 104 25 123 164 28 99 73 194 227 238 48 88 92 112 152 187 7 29 100)
    prices=(799 899 999 1099 1299 1499 1599 1799 2499 2699 2999 3199 3299 3999 6499)
    nums=(1 2 3)

    function getOneLog(){
        uuid=$(cat /proc/sys/kernel/random/uuid)
        ip="${ips[$((RANDOM % 20))]}.${ips[$((RANDOM % 20))]}.${ips[$((RANDOM % 20))]}.${ips[$((RANDOM % 20))]}"
        time="$(date "+%Y-%m-%d %H:%M:%S")"
        price="${prices[$((RANDOM % 15))]}"
        number="${nums[$((RANDOM % 3))]}"
        company="${companyIds[$((RANDOM % 10))]}"
        echo -e "${uuid}\t${ip}\t${time}\t${price}\t${number}\t${company}" >> /root/data/access_flume_kafka_streaming.log
    }

    # loop
    int=1
    while (( int <= num ))
    do
        getOneLog
        let "int++"
    done
else
    echo "please input logNum"
fi


Shell tutorial (link)

Run the command:

./userOrderLogUtils.sh 10

This generates 10 log lines.

Scheduled log generation with crontab

Run crontab -e and add:

*/1 * * * * /root/shell/userOrderLogUtils.sh 100


Run crontab -l to list the scheduled jobs.


Flume log collection

Agent selection: exec source + memory channel + Kafka sink. Flume study notes (link).

Here we first sink the logs to the console as a quick test (you can skip this step).

cd $FLUME_HOME/conf

vim accessLog-memory-avro.conf

accessLog-memory-avro.sources = access-source
accessLog-memory-avro.channels = memory-channel
accessLog-memory-avro.sinks = flume-sink

accessLog-memory-avro.sources.access-source.type = exec
accessLog-memory-avro.sources.access-source.command = tail -F -n 0 /root/data/access_flume_kafka_streaming.log
accessLog-memory-avro.sources.access-source.shell = /bin/sh -c

accessLog-memory-avro.channels.memory-channel.type = memory

accessLog-memory-avro.sinks.flume-sink.type = logger

accessLog-memory-avro.sources.access-source.channels = memory-channel
accessLog-memory-avro.sinks.flume-sink.channel = memory-channel

Start Flume:

flume-ng agent \
--name accessLog-memory-avro \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/accessLog-memory-avro.conf \
-Dflume.root.logger=INFO,console

Run the log shell script (note the log path):

./userOrderLogUtils.sh 10

The Flume console should now show the 10 records (screenshot omitted).

Note: the official documentation points out that the exec source can lose data (link); this is not an issue for our small test. For reliability, consider the Spooling Directory Source, the Taildir Source, or integrating with Flume directly through the SDK.

Flume integration with Kafka

We only need to change the sink in the configuration above.

Configuration (accessLog-memory-avro.conf)

accessLog-memory-avro.sources = access-source
accessLog-memory-avro.channels = memory-channel
accessLog-memory-avro.sinks = kafka-sink

accessLog-memory-avro.sources.access-source.type = exec
accessLog-memory-avro.sources.access-source.command = tail -F -n 0 /root/data/access_flume_kafka_streaming.log
accessLog-memory-avro.sources.access-source.shell = /bin/sh -c

accessLog-memory-avro.channels.memory-channel.type = memory

accessLog-memory-avro.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
accessLog-memory-avro.sinks.kafka-sink.kafka.bootstrap.servers = hadoop000:9092
accessLog-memory-avro.sinks.kafka-sink.kafka.topic = access_flume_kafka_streaming
accessLog-memory-avro.sinks.kafka-sink.kafka.producer.acks = 1
accessLog-memory-avro.sinks.kafka-sink.flumeBatchSize = 100

accessLog-memory-avro.sources.access-source.channels = memory-channel
accessLog-memory-avro.sinks.kafka-sink.channel = memory-channel

Start Flume:

flume-ng agent \
--name accessLog-memory-avro \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/accessLog-memory-avro.conf \
-Dflume.root.logger=INFO,console

Start ZooKeeper, which Kafka depends on (you need ZooKeeper installed; installation is straightforward, see link):

./zkServer.sh start

Start Kafka (a single node here). Kafka study notes (link):

kafka-server-start.sh $KAFKA_HOME/config/server.properties

Once Kafka is up, create a topic for this example: access_flume_kafka_streaming

kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic access_flume_kafka_streaming

List all topics:

kafka-topics.sh --list --zookeeper localhost:2181

Start a console consumer:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic access_flume_kafka_streaming

Run ./userOrderLogUtils.sh 10 and check whether the console consumer shows the 10 records.

As the screenshot (omitted here) showed, the messages are consumed as expected.

Spark Streaming integration with Kafka

Official documentation (link)

Code

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object FlumeKafkaStreamingUserAgentApp {
  def main(args: Array[String]): Unit = {

    if (args.length != 3) {
      System.err.println("Usage: FlumeKafkaStreamingUserAgentApp <bootstrapServers> <groupId> <topics>")
      System.exit(1)
    }

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumeKafkaStreamingUserAgentApp")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val Array(bootstrapServers, groupId, topics) = args

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> bootstrapServers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val message = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics.split(",").toArray, kafkaParams)
    )

    // TODO: business logic
    message.map(record => record.value()).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
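One thing this skeleton leaves open: enable.auto.commit is set to false, so the consumed offsets are never committed anywhere. If you want the consumer group's progress tracked in Kafka, the spark-streaming-kafka-0-10 integration exposes HasOffsetRanges / CanCommitOffsets for exactly that. A minimal sketch (placed before ssc.start(), with the per-batch processing elided):

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// Grab the offset ranges of each batch and commit them back to Kafka
// once the batch has been handled.
message.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch here ...
  message.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}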

Result: the raw record values are printed every 5 seconds (screenshot omitted).



Spark Streaming business logic

Full code (link); a sketch of one possible aggregation is shown below.
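The actual business logic lives in the linked repository. As an illustrative sketch only (not the author's exact code), here is one way to parse the tab-separated order records generated above and total the order amount per company for every 5-second batch, continuing from the message DStream defined earlier:

// Illustrative only: parse "uuid \t ip \t time \t price \t quantity \t companyId"
// and sum price * quantity per company for each batch.
case class UserOrder(uuid: String, ip: String, time: String,
                     price: Long, quantity: Long, companyId: String)

val orders = message.map(_.value()).flatMap { line =>
  line.split("\t") match {
    case Array(uuid, ip, time, price, quantity, companyId) =>
      Some(UserOrder(uuid, ip, time, price.toLong, quantity.toLong, companyId))
    case _ => None // drop malformed lines
  }
}

val salesByCompany = orders
  .map(o => (o.companyId, o.price * o.quantity))
  .reduceByKey(_ + _)

salesByCompany.print()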


Persisting the results

Results can be persisted to MySQL or to a NoSQL store (Redis, HBase); a minimal JDBC sketch follows below. The (omitted) screenshot showed the statistics written to a local MySQL database after consuming 100,000 records: both scheduling delay and processing time stayed short. Since everything runs inside a single VM, the per-second data rate is not very high, and the occasional fluctuation is normal on my VM.
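As a minimal sketch of the MySQL path (continuing the salesByCompany aggregation sketched earlier; the JDBC URL, credentials, and company_sales table are illustrative assumptions, not the author's actual schema), each batch can be written out with plain JDBC inside foreachPartition so that one connection is opened per partition:

import java.sql.DriverManager

// Hypothetical table: company_sales(company_id VARCHAR, amount BIGINT, batch_time TIMESTAMP).
// Requires the MySQL JDBC driver on the classpath.
salesByCompany.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // One JDBC connection per partition; a connection pool would be better in production.
    val conn = DriverManager.getConnection(
      "jdbc:mysql://localhost:3306/streaming_test", "root", "root")
    val stmt = conn.prepareStatement(
      "INSERT INTO company_sales(company_id, amount, batch_time) VALUES (?, ?, NOW())")
    try {
      partition.foreach { case (companyId, amount) =>
        stmt.setString(1, companyId)
        stmt.setLong(2, amount)
        stmt.executeUpdate()
      }
    } finally {
      stmt.close()
      conn.close()
    }
  }
}

In production you would typically reuse connections through a pool and batch the inserts rather than issuing one executeUpdate per row.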


PS: I also tried writing the data to a remote MySQL database, and the total delay then sat around 3.4 seconds, mostly because of the remote server's limitations (an ordinary SQL query there already takes about 0.67 seconds). For Spark Streaming + Kafka tuning, see:

https://www.cnblogs.com/xlturing/p/6246538.html#spark-streamingkafka%E8%B0%83%E4%BC%98


UI display

You can use DataV to pull the data and display it in real time: http://datav.aliyun.com/share/a7b8bfa0cd29864dd55c00b8eb62e65c


