您现在的位置是: 首页 > 基于Spark SQL对本站nginx日志进行简单统计


基于Spark SQL对本站nginx日志进行简单统计

纯真年代 2018-10-06 20:52:07 0 883人围观

日志下载:access.log(截止时间201818-10-06)直达车

需求分析

    1)按照地区统计用户访问,以天进行分区统计


    2)访问平台


    3)统计浏览器


    4)我们也可以进行流量统计

日志分析

日志:150.138.216.76 - - [28/Sep/2018:17:18:46 +0800] "GET /lay/system/edit.js HTTP/1.1" 200 6590 "http://www.bblog.vip/article_detail/1536997960837" "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:62.0) Gecko/20100101 Firefox/62.0"

我们可以拿到访问ip,访问url,UserAgent,流量等信息

我们可以使用ip解析出用户地区

我们可以使用UserAgent解析出用户访问其他信息,借助直达车

数据清洗

    1)我们清除一些非国内地区的ip

    2)清洗访问路径是js,css,txt,html,jpg,png,woff之类的,我直接过滤掉了非首页地址的数据

需求业务统计实现

    1)需求一上代码:直达车

统计结果入库(这里入库到Mysql)

    1)例子:

def insertAreaBatch(list: ListBuffer[DayAreaTimes]) = {

var connection: Connection = null
var pstmt: PreparedStatement = null
try {
connection = MysqlUtils.getConnetction()

val sql = "insert into day_area_times_stat(day,area,times) values (?,?,?) "
pstmt = connection.prepareStatement(sql)

//关闭自动提交
connection.setAutoCommit(false)
for (data <- list) {
pstmt.setString(1, data.day)
pstmt.setString(2, data.area)
pstmt.setLong(3, data.times)
pstmt.addBatch()
}
//执行批处理
pstmt.executeBatch()
connection.commit()
} catch {
case e: Exception => e.printStackTrace()
} finally {
MysqlUtils.release(connection, pstmt)
}
}


结果可视化展示

    1)echarts官网


数据清洗、统计运行在YARN上

1)数据清洗泡在YARN上

打包并上传(使用的这个插件)

<!-- mvn assembly:assembly -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>

将日志上传到hdfs(hadoop fs -put access.log /spark/access/input) 

将.csv,.xlsx上传到服务器 

命令运行(记得启动hadoop)YARN运行命令直达车

这个可以配置到系统,也可以临时使用
export HADOOP_CONF_DIR=/root/app/hadoop-2.6.0-cdh5.9.3/etc/hadoop

./bin/spark-submit \
--class com.blog.spark.job.SparkStatCleanJobYARN \
--name SparkStatCleanJobYARN \
--master yarn \
--executor-memory 1G \
--num-executors 1 \
--files /root/lib/files/ipDatabase.csv,/root/lib/files/ipRegion.xlsx \
/root/lib/spark-sql-1.0-jar-with-dependencies.jar \
hdfs://hadoop000:8020/spark/access/input/* hdfs://hadoop000:8020/spark/access/output

过程及结果查看



spark.read.format("json").load("hdfs://hadoop000:8020/spark/access/output/part-00000-6d55a973-cfa9-4fcf-afcd-6a1be74c089f-c000.json").show(false)


2)将统计任务跑在YARN上

打包并上传

命令运行

./bin/spark-submit \
--class com.blog.spark.job.StatJobYARN \
--name StatJobYARN \
--master yarn \
--executor-memory 1G \
--num-executors 1 \
/root/lib/spark-sql-1.0-jar-with-dependencies.jar \
hdfs://hadoop000:8020/spark/access/output 20181008 20181008

数据库查看结果

性能调优

1)存储格式的选择

spark-sql中我们可以通过format("parquet")控制读写的格式,json等

2)压缩格式的选择,可参考直达车

spark-sql中我们可以通过配置来设置,直达车

例如:sparkSession.config("spark.sql.parquet.compression.codec","gzip")

3)代码优化,比如批量插入数据库。复用已有的数据

4)参数优化

并行度:spark.sql.shuffle.partitions 默认为200 直达车
例如:--conf spark.sql.shuffle.partitions=100

关闭分区字段类型推测
例如:sparkSession.config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")


说明:整理于网络 
本文章作为个人笔记,也供大家监督 


上一篇: 本站静态资源

下一篇: Spark环境搭建上手

全部评论