ELK流量分析

发布 : 2017-12-21 分类 : ELK 浏览 :
1
服务器环境:Centos7

ELK流量分析流程

1
2
3
4
5
6
7
1.通过hive对昨日的流量日志数据,进行离线批处理,按维度将一些指标预先聚合出来,将结果写入mysql,默认有一些预先处理好的数据已经存在mysql

2.手动准备一些样例数据,然后写入mysql中,装一个mysql,模拟成是hive导入mysql的一份数据

3.通过logstash,将mysql中的数据导入es中

4.通过kibana+各种es聚合语法,生成各种各样的报表出来

All text

安装Logstash

创建目录

1
2
[root@elasticsearch01 ~]# mkdir -p /data/server
[root@elasticsearch01 ~]# chown -R elasticsearch:elasticsearch /data/server/

解压压缩文件

1
2
3
tar -zxvf logstash-5.6.3.tar.gz -C /data/server/
mv logstash-5.6.3/ logstash
cd logstash

logstash pipeline

1
两个组成部分:input和output,也可以包含一个可选的filter
1
2
3
4
5
input plugin负责从数据源中获取数据

filter plugin负责对数据进行定制化的处理和修改

output plugin负责将数据写入目的地中

运行最基础的logstash pipeline

1
./bin/logstash -e 'input { stdin { } } output { stdout {} }'

All text

-e:直接在命令行对logstash进行配置,从命令行接受输入,将输出写入命令行

输入:hello world,可以看到输出,logstash会补充timestamp和ip地址

All text

用ctrl-d可以结束这个piepline

安装Mysql

1
2
3
wget http://dev.mysql.com/get/mysql-community-release-el7-5.noarch.rpm
rpm -ivh mysql-community-release-el7-5.noarch.rpm
yum install -y mysql-community-server

重启mysql服务

1
service mysqld restart

All text

设置mysql密码

1
2
3
mysql -u root 
mysql> set password for 'root'@'localhost' =password('123456');
mysql> quit

登录mysql

1
mysql -u root -p123456

创建数据库

1
mysql> create database patpat_demo;

使用数据库

1
use patpat_demo;
1
2
datekey cookie section userid province city pv is_return_visit is_bounce_visit visit_time visit_page_cnt
日期 cookie 版块 用户id 省份 城市 pv 是否老用户回访 是否跳出 访问时间 访问页面数量

创建表

1
2
3
4
5
6
7
8
9
10
11
12
13
create table user_access_log_aggr (
datekey varchar(255),
cookie varchar(255),
section varchar(255),
userid int,
province varchar(255),
city varchar(255),
pv int,
is_return_visit int,
is_bounce_visit int,
visit_time int,
visit_page_cnt int
);

插入数据

1
2
3
4
5
6
7
8
9
10
insert into user_access_log_aggr values('20171001', 'dasjfkaksdfj33', 'game', 1, 'beijing', 'beijing', 10, 0, 1, 600000, 3);
insert into user_access_log_aggr values('20171001', 'dasjadfssdfj33', 'game', 2, 'jiangsu', 'nanjing', 5, 0, 0, 700000, 5);
insert into user_access_log_aggr values('20171001', 'dasjffffksfj33', 'sport', 1, 'beijing', 'beijing', 8, 1, 0, 800000, 6);
insert into user_access_log_aggr values('20171001', 'dasjdddksdfj33', 'sport', 2, 'jiangsu', 'nanjing', 20, 0, 1, 900000, 7);
insert into user_access_log_aggr values('20171001', 'dasjeeeksdfj33', 'sport', 3, 'jiangsu', 'nanjing', 30, 1, 0, 600000, 10);
insert into user_access_log_aggr values('20171001', 'dasrrrrksdfj33', 'news', 3, 'jiangsu', 'nanjing', 40, 0, 0, 600000, 12);
insert into user_access_log_aggr values('20171001', 'dasjtttttdfj33', 'news', 4, 'shenzhen', 'shenzhen', 50, 0, 1, 500000, 4);
insert into user_access_log_aggr values('20171001', 'dasjfkakkkfj33', 'game', 4, 'shenzhen', 'shenzhen', 20, 1, 0, 400000, 3);
insert into user_access_log_aggr values('20171001', 'dasjyyyysdfj33', 'sport', 5, 'guangdong', 'guangzhou', 10, 0, 0, 300000, 1);
insert into user_access_log_aggr values('20171001', 'dasjqqqksdfj33', 'news', 5, 'guangdong', 'guangzhou', 9, 0, 1, 200000, 2);

安装ElasticSearch

解压压缩文件

1
tar -zxvf elasticsearch-5.6.3.tar.gz -C /data/server/
1
2
[root@elasticsearch01 server]# mv elasticsearch-5.6.3/ elasticsearch
[root@elasticsearch01 server]# chown -R elasticsearch:elasticsearch /data/server/elasticsearch

修改es配置文件

1
cd /data/server/elasticsearch

编辑<elasticsearch根目录>/config/jvm.options配置文件

1
[elasticsearch@elasticsearch01 elasticsearch]$ vi config/jvm.options
1
2
-Xms512m
-Xmx512m

编辑<elasticsearch根目录>/config/elasticsearch.yml配置文件

1
[elasticsearch@elasticsearch01 elasticsearch]$ vi config/elasticsearch.yml
1
2
3
4
path.data: /data/server/elasticsearch/data
path.logs: /data/server/elasticsearch/logs
# 设置回环地址
network.host: 0.0.0.0

启动es

1
cd /data/server/elasticsearch
1
./bin/elasticsearch -d

All text

在浏览器地址栏查看:http://192.168.31.180:9200/

All text

1
[elasticsearch@elasticsearch01 ~]$ curl -XGET localhost:9200

All text

查看日志

1
cd /data/server/elasticsearch
1
tail -f elasticsearch.log

All text

安装logstash-input-jdbc插件

1
yum install -y gem
1
2
gem sources --add https://ruby.taobao.org/ --remove https://rubygems.org/
gem sources -l

All text

1
gem install bundler

All text

1
bundle config mirror.https://rubygems.org https://ruby.taobao.org

All text

在logstash目录下,编辑Gemfile文件

1
在logstash目录下,vi Gemfile,修改source的值为: "https://ruby.taobao.org"

All text

1
[elasticsearch@elasticsearch01 logstash]$ vi Gemfile

安装Logstash插件

1
[elasticsearch@elasticsearch01 logstash]$ ./bin/logstash-plugin install logstash-input-jdbc

All text

1
2
wget https://github.com/logstash-plugins/logstash-input-jdbc/archive/v4.2.4.zip
unzip v4.2.4.zip

All text

1
cd logstash-input-jdbc-4.2.4

修改source的值

修改source的值为: “https://ruby.taobao.org"

1
vi Gemfile

All text

1
vi logstash-input-jdbc.gemspec
1
2
3
4
找到 
s.files = `git ls-files`.split($\)
改为:
s.files = [".gitignore", "CHANGELOG.md", "Gemfile", "LICENSE", "NOTICE.TXT", "README.md", "Rakefile", "lib/logstash/inputs/jdbc.rb", "lib/logstash/plugin_mixins/jdbc.rb", "logstash-input-jdbc.gemspec", "spec/inputs/jdbc_spec.rb"]

All text

1
gem build logstash-input-jdbc.gemspec

All text

1
mv logstash-input-jdbc-4.2.4.gem /data/server/logstash/
1
2
[elasticsearch@elasticsearch01 logstash-input-jdbc-4.2.4]$ cd /data/server/logstash
[elasticsearch@elasticsearch01 logstash]$ ./bin/logstash-plugin install logstash-input-jdbc-4.2.4.gem

All text

启动logstash

1
在logstash目录中创建一份配置pipeline配置文件
1
vi conf/user-access-log-pipeline.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
input {
jdbc {
jdbc_driver_library => "/data/server/mysql-connector-java-5.1.36-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/patpat_demo"
jdbc_user => "root"
jdbc_password => "123456"
schedule => "* * * * *"
statement => "SELECT * from user_access_log_aggr"
}
}

output {
elasticsearch {
hosts => [ "localhost:9200" ]
}
}

All text

检查配置文件语法是否正确

1
[elasticsearch@elasticsearch01 logstash]$ ./bin/logstash -f conf/user-access-log-pipeline.conf --config.test_and_exit

All text

启动logstash

1
--config.reload.automatic,会自动重新加载配置文件的内容
1
./bin/logstash -f conf/user-access-log-pipeline.conf --config.reload.automatic

All text

All text

查看ES索引

1
[elasticsearch@elasticsearch01 ~]$ curl -XGET localhost:9200/_cat/indices

All text

查看es健康状态

1
[elasticsearch@elasticsearch01 ~]$ curl -XGET localhost:9200/_cat/health

All text

1
[elasticsearch@elasticsearch01 ~]$ curl -XGET localhost:9200/logstash-2017.12.21

All text

1
[elasticsearch@elasticsearch01 ~]$ curl -XGET localhost:9200/logstash-2017.12.21/_search?pretty

All text

All text

安装和部署kibana

kibana解压缩

1
tar -zxvf kibana-5.6.3-linux-x86_64.tar.gz -C /data/server/

目录重命名

1
[root@elasticsearch01 server]# mv kibana-5.6.3-linux-x86_64/ kibana

配置kibana

1
[root@elasticsearch01 kibana]# vi config/kibana.yml
1
2
3
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.url: "http://localhost:9200"

运行kibana

1
[root@elasticsearch01 kibana]# ./bin/kibana

All text

访问kibana web管理工作台

1
访问浏览器地址栏:http://192.168.31.180:5601/

All text

设置index pattern

在kibana配置一个index pattern来匹配es中的索引名称,默认是logstash-*,匹配logstash写入es中的数据

同时还要配置一个time-field name,那个field是timestamp类型的,这是给kibana用来按照时间进行过滤的,kibana会自动加载出来给我们选择

All text

基于kibana制作流量分析报表

对指定的版块进行查询,然后统计出如下指标的汇总

1
2
3
4
5
pv: 所有人的pv相加
uv: 对userid进行去重
return_visit_uv: 回访uv
total_visit_time: 总访问时长
bounce_visit_uv: 跳出次数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
curl -XGET 'http://localhost:9200/logstash-2017.12.21/logs/_search?q=section:news&pretty' -d '
{
"size": 0,
"aggs": {
"pv": {"sum": {"field": "pv"}},
"uv": {"cardinality": {"field": "userid", "precision_threshold": 40000}},
"total_visit_time": {"sum": {"field": "visit_time"}},
"return_visit_uv": {
"filter": {"term": {"is_return_visit": 1}},
"aggs": {
"total_return_visit_uv": {"cardinality": {"field": "userid", "precision_threshold": 40000}}
}
},
"bounce_visit_uv": {
"filter": {"term": {"is_bounce_visit": 1}},
"aggs": {
"total_bounce_visit_uv": {"cardinality": {"field": "userid", "precision_threshold": 40000}}
}
}
}
}'

All text

All text

Logstash处理数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
input段:
 file:使用file 作为输入源
  path: 日志的路径,支持/var/log*.log,及[ "/var/log/messages", "/var/log/*.log" ] 格式
  start_position: 从文件的开始读取事件。另外还有end参数
  ignore_older: 忽略早于24小时(默认值86400)的日志,设为0,即关闭该功能,以防止文件中的事件由于是早期的被logstash所忽略。

filter段:
 grok:数据结构化转换工具
  match:匹配条件格式,将nginx日志作为message变量,并应用grok条件NGINXACCESS进行转换
 geoip:该过滤器从geoip中匹配ip字段,显示该ip的地理位置
  source:ip来源字段,这里我们选择的是日志文件中的最后一个字段,如果你的是默认的nginx日志,选择第一个字段即可(注:这里写的字段是/opt/logstash/patterns/nginx 里面定义转换后的)
  target:指定插入的logstash字断目标存储为geoip
  database:geoip数据库的存放路径
  add_field: 增加的字段,坐标经度
  add_field: 增加的字段,坐标纬度
 mutate: 数据的修改、删除、类型转换
  convert: 将坐标转为float类型
  convert: http的响应代码字段转换成 int
  convert: http的传输字节转换成int
  replace: 替换一个字段
  remove_field: 移除message 的内容,因为数据已经过滤了一份,这里不必在用到该字段了。不然会相当于存两份
 date: 时间处理,该插件很实用,主要是用你日志文件中事件的事件来对timestamp进行转换,导入老的数据必备!在这里曾让我困惑了很久哦。别再掉坑了
  match:匹配到timestamp字段后,修改格式为dd/MMM/yyyy:HH:mm:ss Z
 mutate:数据修改
  remove_field: 移除timestamp字段。

output段:
 elasticsearch:输出到es中
  host: es的主机ip+端口或者es 的FQDN+端口
  index: 为日志创建索引logstash-nginx-access-*,这里也就是kibana那里添加索引时的名称

查看Logstash插件

1
[elasticsearch@elasticsearch01 logstash]$ bin/logstash-plugin list

All text

安装Logstash插件

grok:数据结构化转换工具

match:匹配条件格式,将nginx日志作为message变量,并应用grok条件NGINXACCESS进行转换

1
[elasticsearch@elasticsearch01 logstash]$ ./bin/logstash-plugin install logstash-filter-grok

All text

geoip:该过滤器从geoip中匹配ip字段,显示该ip的地理位置

1
[elasticsearch@elasticsearch01 logstash]$ ./bin/logstash-plugin install logstash-filter-geoip

All text

Filebeat采集数据传送给Logstash

All text

FileBeat的四种输出方式可以输出到

1
Elasticsearch,logstash,file和console

解压压缩文件

1
tar -zxvf filebeat-5.6.3-linux-x86_64.tar.gz -C /data/server/

目录重命名

1
[elasticsearch@elasticsearch01 server]$ mv filebeat-5.6.3-linux-x86_64/ filebeat

进入目录

1
[elasticsearch@elasticsearch01 server]$ cd filebeat/
1
2
[elasticsearch@elasticsearch01 filebeat]$ mkdir conf
[elasticsearch@elasticsearch01 filebeat]$ mkdir data

编辑配置文件

1
[elasticsearch@elasticsearch01 filebeat]$ vim conf/filebeat-to-logstash.yml
1
2
3
4
5
6
7
8
filebeat.prospectors:
- input_type: log
paths: /data/server/logs/access.log
document_type: "nginx_access"
registry_file: /data/server/filebeat/data/registry

output.logstash:
hosts: ["192.168.31.180:5044"]

授权

1
chmod go-w /data/server/filebeat/conf/filebeat-to-logstash.yml

启动

1
[elasticsearch@elasticsearch01 filebeat]$ ./filebeat -e -c conf/filebeat-to-logstash.yml

All text

Logstash获取filebeat传送来的数据并输出到控制台

1
vim conf/filebeat-logstash.conf
1
2
3
4
5
6
7
8
9
10
11
input {
beats {
port => 5044
}
}

output {
stdout {
codec => rubydebug # 直接将数据输出到控制台
}
}

启动Logstash

1
./bin/logstash -f conf/filebeat-logstash.conf

All text

Logstash获取filebeat传送来的数据并转发到kafka

1
vim conf/logstash-kafka.conf
1
2
3
4
5
6
7
8
9
10
11
12
input {
beats {
port => 5044
}
}

output {
kafka {
bootstrap_servers => "node01:9092,node02:9092,node03:9092"
topic_id => "nginx-acess-log-%{+YYYY.MM.dd}"
}
}

启动Logstash

1
./bin/logstash -f conf/logstash-kafka.conf

Logstash采集文件数据并将结果输出到控制台

1
[elasticsearch@elasticsearch01 logstash]$ vim conf/first-pipeline.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
input {
file {
path => "/data/server/logs/access.log"
start_position => beginning
ignore_older => 0
}
}

filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
geoip {
source => "clientip"
}
}

output {
stdout {
codec => rubydebug # 直接将数据输出到控制台
}
}

启动logstash

1
[elasticsearch@elasticsearch01 logstash]$ ./bin/logstash -f conf/first-pipeline.conf

All text

Logstash采集文件数据并将结果输出到ES

1
[elasticsearch@elasticsearch01 logstash]$ vim conf/first-pipeline-to-es.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
input {
file {
path => "/data/server/logs/access.log"
start_position => beginning
}
}

filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
geoip {
source => "clientip"
}
}

output {
elasticsearch {
hosts => [ "192.168.31.180:9200" ]
index => "logstash-nginx-access-%{+YYYY.MM.dd}"
}
stdout {codec => rubydebug}
}

启动logstash

1
[elasticsearch@elasticsearch01 logstash]$ ./bin/logstash -f conf/first-pipeline-to-es.conf

All text

查看ES索引

1
[elasticsearch@elasticsearch01 logs]$ curl -XGET localhost:9200/_cat/indices?v

All text

1
[elasticsearch@elasticsearch01 logs]$ curl -XGET localhost:9200/logstash-nginx-access-2017.12.24/_search?pretty

启动Kibana

1
[elasticsearch@elasticsearch01 kibana]$ ./bin/kibana

在浏览器地址栏访问:http://192.168.31.180:5601/

All text

本文作者 : Matrix
原文链接 : https://matrixsparse.github.io/2017/12/21/ELK流量分析/
版权声明 : 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!

知识 & 情怀 | 二者兼得

微信扫一扫, 向我投食

微信扫一扫, 向我投食

支付宝扫一扫, 向我投食

支付宝扫一扫, 向我投食

留下足迹