6.8版本elk日志管理系统部署elasticsearch + kibana + logstash + filebeat + kafka_2.12 + x-pack破解 + 各组件间的ssl认证
Filebeat是一个日志文件托运工具,在你的服务器上安装客户端后,filebeat会监控日志目录或者指定的日志文件,追踪读取这些文件(追踪文件的变化,不停的读)
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据
Logstash是一根具备实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时这根管道还可以让你根据自己的需求在中间加上滤网,Logstash提供里很多功能强大的滤网以满足你的各种应用场景
ElasticSearch它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口
Kibana是ElasticSearch的用户界面
6.8版本elk日志管理系统部署elasticsearch + kibana + logstash + filebeat + kafka_2.12 + x-pack破解 + 各组件间的ssl认证
目录:
1.组件介绍 2.架构图 3.软件下载地址 4.安装elasticsearch 5.安装kibana 6.启用安全认证 7.安装kafka 8.安装logstash 9.安装filebeat 10.破解x-pack 11.启动各组件间的ssl认证
1.组件介绍
Filebeat是一个日志文件托运工具,在你的服务器上安装客户端后,filebeat会监控日志目录或者指定的日志文件,追踪读取这些文件(追踪文件的变化,不停的读)
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据
Logstash是一根具备实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时这根管道还可以让你根据自己的需求在中间加上滤网,Logstash提供里很多功能强大的滤网以满足你的各种应用场景
ElasticSearch它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口
Kibana是ElasticSearch的用户界面
在实际应用场景下,为了满足大数据实时检索的场景,利用Filebeat去监控日志文件,将Kafka作为Filebeat的输出端,Kafka实时接收到Filebeat后以Logstash作为输出端输出,到Logstash的数据也许还不是我们想要的格式化或者特定业务的数据,这时可以通过Logstash的一些过了插件对数据进行过滤最后达到想要的数据格式以ElasticSearch作为输出端输出,数据到ElasticSearch就可以进行丰富的分布式检索了
2.架构图
3.软件下载地址:https://www.elastic.co/cn/products/ 【建议装在/data/elk/目录下,因为使用长时间后会产生很大的数据,需要较大的磁盘支撑】
jdk安装:https://www.cnblogs.com/chenjw-note/p/10838160.html
注意:所有配置文件中配置项后面的注释内容都需要去掉
4.安装elasticsearch:
解压
修改配置文件:vim /usr/local/elasticsearch/config/elasticsearch.yml
cluster.name: MyElk node.name: node-1 path.data: /usr/local/elasticsearch/data path.logs: /usr/local/elasticsearch/logs network.host: 本机地址 http.port: 9200
#因为Centos6不支持SecComp,而ES6默认bootstrap.system_call_filter为true进行检测,所以导致检测失败,失败后直接导致ES不能启动,需要禁用一下两行,centos7则不用
bootstrap.memory_lock: false
bootstrap.system_call_filter: false
#xpack.security.enabled: true #启用安全认证 这两个参数在后面6.启用安全认证再启用
#xpack.security.transport.ssl.enabled: true
修改系统参数:
sysctl -p
net.ipv4.ip_forward = 0 net.ipv4.conf.default.rp_filter = 1 net.ipv4.conf.default.accept_source_route = 0 kernel.sysrq = 0 kernel.core_uses_pid = 1 net.ipv4.tcp_syncookies = 1 kernel.msgmnb = 65536 kernel.msgmax = 65536 kernel.shmmax = 68719476736 kernel.shmall = 4294967296 kernel.core_pattern = /data/corefile/core.%p.%e vm.max_map_count = 655360 vm.swappiness = 0
vim /etc/security/limits.conf
elastic soft memlock unlimited elastic hard memlock unlimited * soft nofile 65536 * hard nofile 131072 * soft nproc 2048 * hard nproc 4096
创建普通用户:useradd elastic passwd elastic
赋予权限:chown -R elastic:elastic /usr/local/elasticsearch
后台启动elasticsearch:su – elastic && cd /usr/local/elasticsearch/bin && ./elasticsearch -d
5.安装kibana:
解压
修改配置文件:egrep -Ev \’#|^$\’ kibana.yml
server.port: 5601 server.host: "0.0.0.0" elasticsearch.hosts: ["http://es_ip:9200"]
汉化kibana:https://github.com/anbai-inc/Kibana_Hanization
下载汉化包:https://github.com/anbai-inc/Kibana_Hanization/archive/master.zip
拷贝此项目中的translations文件夹
到kibana目录下的src/legacy/core_plugins/kibana/
目录下
修改kibana配置文件kibana.yml中的配置项: egrep -Ev \’#|^$\’ kibana.yml
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://es_ip:9200"]
i18n.locale: "zh-CN"
7.x版本官方自带汉化资源文件(位于kibana目录下的node_modules/x-pack/plugins/translations/translations/
目录
直接修改kibana配置文件kibana.yml中的配置项:i18n.locale: “zh-CN” 即可
启动kibana:cd /usr/local/kibana/bin && ./kibana
6.启用安全认证:
浏览器访问kibana:http://IP地址:5601,点击【管理】,点击【许可管理】,启用【试用30白金版】,启用后再次刷新不能正常进入kibana
修改elasticsearch配置文件:vim /usr/local/elasticsearch/config/elasticsearch.yml,添加
xpack.security.enabled: true #启用安全认证 xpack.security.transport.ssl.enabled: true
重启elasticsearch
初始化elasticsearch各组件密码:cd /usr/local/elasticsearch/bin && ./elasticsearch-setup-passwords interactive 输入你的密码
注意:配置elasticsearch ssl认证集群可参考这篇文章:https://www.cnblogs.com/chenjw-note/articles/10901632.html
修改kibana配置文件:egrep -Ev \’#|^$\’ /usr/local/kibana/config/kibana.yml
server.port: 5601 server.host: "0.0.0.0" elasticsearch.hosts: ["http://es_ip:9200"] elasticsearch.username: "elastic" elasticsearch.password: "你的密码" i18n.locale: "zh-CN"
重启kibana:cd /usr/local/kibana/bin && ./kibana
访问登陆页面,超级管理员是:elastic 你的密码
完成安全认证
7.安装kafka:
下载kafka:http://kafka.apache.org/downloads
解压
修改配置文件:
egrep -Ev \’#|^$\’ /usr/local/kafka/config/zookeeper.properties
dataDir=/usr/local/kafka/data dataLogDir=/usr/local/kafka/logs clientPort=2181 maxClientCnxns=100 tickTime=2000 initLimit=10
egrep -Ev \’#|^$\’ /usr/local/kafka/config/server.properties
broker.id=0 listeners=PLAINTEXT://:9092 host.name=本机ip地址 #最好用域名 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/usr/local/kafka/logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=localhost:2181 #最好用域名地址,多个地址则用逗号隔开 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0
启动kafka:cd /usr/local/kafka/bin && sh zookeeper-server-start.sh -daemon ../config/zookeeper.properties && sh kafka-server-start.sh -daemon ../config/server.properties
8.安装logstash:
解压
编写规则文件:vim /usr/local/logstash/bin/logstash_template.conf
#输入为kafka,topics是test,查询时间是1秒 【旧】 input { kafka { enable_auto_commit => true auto_commit_interval_ms => "1000" codec => "json" bootstrap_servers => "kafka_地址:9092" topics => ["test"]
auto_offset_reset => "latest"
group_id => "logstash-g1" #组建集群要用相同group_id
}
}
#使用filter 过滤出来我们想要的日志(未启用)
filter {
#需要在filebeat配置文件中定义自己的tags
if “tags_name” in [tags] {
grok {
#自定义日志格式,括号里边包含所有一个key和value,例子:(?<key>value)
match => { “message” => “(?<date>\d{4}/\d{2}/\d{2}\s(?<datetime>%{TIME}))\s-\s(?<status>\w{2})\s-\s(?<respond_time>\d+)\.\d+\w{2}\s-\s%{IP:client}:(?<client-port>\d+)\[\d+\]->%{IP:server}:(?<server-port>\d+).*:(?<database
s><\w+>):(?<SQL>.*)”}
#过滤完成之后把之前的message 移出掉,节约磁盘空间
remove_field => [“message”]
}
}
}
#stdout{ codec=>rubydebug}用来调试使用,运行时会输出到屏幕,调试完成注释掉 #在filebeat里面我们定义了fields的logtype,在这里可以用来区别不同的日志 #index为ES的数据库index名
output {
#stdout{ codec=>rubydebug}
elasticsearch {
hosts => [“es_ip:9200”]
user => “elastic”
password => “xxxxx”
index => “%{[fields][project]}-%{[fields][logtype]}-%{+YYYY.MM.dd}”
}
file {
path => “/usr/local/logstash/logs/%{[fields][project]}-%{[fields][logtype]}-%{+YYYY.MM.dd}.log” #输出到日志文件
}
}
input { 【新】 kafka { codec => "json" topics => ["k8s1","k8s2","k8s3","k8s4"] auto_offset_reset => "latest" group_id => "logstash-g1" #组建集群需要使用相同group_id bootstrap_servers => "kafka1地址:9092,kafka2地址:9092" } } filter { grok{ match => { "source" => "/data/%{USERNAME:server}/" #获取server的值 } } #日志分类1 if [fields][type] == "logtype" and [fields][project]=="docker" { grok { #日志格式:程序名 时间 日志等级 日志信息 match => { "message" => "\[%{USER:name}\]\[%{TIME:time}\]\[%{WORD:level}\] web session ip: %{IP:clientip}" } #clientip:获取请求客户端地址 match => { "message" => "\[%{USER:name}\]\[%{TIME:time}\]\[%{WORD:level}\] %{GREEDYDATA:data}" } remove_field => [ "message", "offset", "beat", "@version", "input_type" ] } } #日志分类2 if [fields][type] == "boamp" and [fields][project]=="k8s" { grok { #日志格式:HTTP GET /super_cmdb/login/ 200 [0.16, 172.17.1.1:39298] match => { "message" => "%{WORD:request} %{WORD:method} %{URIPATHPARAM:request_url} %{NUMBER:status} \[%{NUMBER:http_response_time}, %{IP:clientip}:%{NUMBER:port}\]" } #remove_field => [ "message" ] } } geoip { #解析请求客户端地址详细信息放到 "geoip":{}下 source => "clientip" } }
#stdout{ codec=>rubydebug}用来调试使用,运行时会输出到屏幕,调试完成注释掉
#在filebeat里面我们定义了fields的logtype,在这里可以用来区别不同的日志
#index为ES的数据库index名
output {
#stdout{ codec=>rubydebug} if "_geoip_lookup_failure" in [tags] { #判断geoip解析客户端ip是否成功,tags不用定义,如果有问题,程序自己写入的 elasticsearch { hosts => ["es_ip:9200","es_ip:9200"] index => "%{[fields][project]}-%{[fields][type]}-%{+YYYY.MM.dd}" user => "elastic" password => "密码" template => "/data/logstash/config/template_base.json" template_name => "template_base" template_overwrite => true } } else { elasticsearch { hosts => ["es_ip:9200","es_ip:9200"] index => "logstash-%{[fields][project]}-%{[fields][type]}-%{+YYYY.MM.dd}" user => "elastic" password => "密码" template => "/data/logstash/config/template_base.json" template_name => "template_base" template_overwrite => true } } file { #将日志输出到文件 path => "/data/logstash/logs/%{[fields][project]}/%{[fields][project]}-%{[fields][type]}-%{+YYYY.MM.dd}.log" } }
模板文件:vim /data/logstash/config/template_base.json 【可不使用,取消对应配置即可】
分片数一般以(节点数*1.5或3倍)来计算,分片副本一般有1个就够了,一个分片一般可以处理30G数据,也可以根据数据大小进行分片数量调整
{ "template" : "*", "order" : 0, "settings" : { "index" : { "number_of_shards" : "3", "number_of_replicas" : "1" } } }
启动logstash:cd /usr/local/logstash/bin/ && ./logstash -f logstash_template.conf
9.安装filebeat:
解压
修改配置文件:
egrep -Ev \'#|^$\' /usr/local/filebeat/filebeat.yml
filebeat.prospectors: - enabled: true paths: - /data/dddjs_*_s*a/server/build_result/app/log/* fields: project: dddjs type: game host_ip: 10.21.210.170 output.kafka: enabled: true hosts: ["kafka.com:9092"] topic: \'%{[fields][project]}\' #username: "bbh" #password: "xxxxx" #ssl.certificate_authorities: ["/etc/filebeat/ssl/ca.crt"] #ssl.certificate: "/etc/filebeat/ssl/client.crt" #ssl.key: "/etc/filebeat/ssl/client.key"
启动filebeat:cd /usr/local/logstash/bin/ && ./filebeat -e -f filebeat.yml
【注意:若启动后无法连接kafka,请修改/etc/hosts,绑定kafka主机名和ip】
10.破解x-pack
elk系统很多功能需要x-pack的支持,但官方要收费。
elasticsearch 启动前,记得添加这两个参数vim /usr/local/elasticsearch/config/elasticsearch.yml,导入license时需要开启安全认证
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
编译java文件方法参考此链接:https://www.cnblogs.com/bigben0123/p/10305204.html
修改jar包:此处6.7.2版本已修改好,点击链接下载即可:
版本6.7.2:https://files.cnblogs.com/files/chenjw-note/x-pack-core-6.7.2.zip
版本6.8.0:https://files.cnblogs.com/files/chenjw-note/x-pack-core-6.8.0.zip
替换jar包:【注意:当elasticsearch是集群时,需要将集群个节点先替换修改后的jar包,才可以成功导入license,在主节点导一次即可,会同步到其他节点】
cp -r /usr/local/elasticsearch/modules/x-pack-core/x-pack-core-6.7.2.jar /usr/local/elasticsearch/modules/x-pack-core/x-pack-core-6.7.2.jar.bak cp x-pack-core/x-pack-core-6.7.2.jar /usr/local/elasticsearch/modules/x-pack-core/x-pack-core-6.7.2.jar /usr/local/elasticsearch/modules/x-pack-core/
重启elasticsearch
申请license:https://license.elastic.co/registration
修改license.json,修改 type,expiry_date_in_millis,max_nodes 即可
"license":{ "uid":"719615b9-5188-4db4-a0f9-29863d669b4c", "type":"platinum", #白金会员 "issue_date_in_millis":1558310400000, "expiry_date_in_millis":252457920099, #到期时间戳,单位为毫秒ms "max_nodes":1000, "issued_to":"xxxxxxxxxx", "issuer":"Web Form", "signature":"AAAAAxxxxxxxxxxxxxxxxxxxxx", "start_date_in_millis":1558310400000 } }
导入license:
curl -XPUT -u elastic \'http://es_ip:9200/_xpack/license\' -H "Content-Type: application/json" -d @license.json 输入密码 输出:{"acknowledged":true,"license_status":"valid"} 则成功
验证许可时间:
curl -XGET -u elastic es_ip:9200/_license
11.启动各组件间的ssl认证
(1) 生成密钥证书
cat /usr/local/kafka/ssl/fd.ext
subjectAltName = DNS:*.kafka.xxx.com, DNS:server.kafka.xxx.com #ssl证书对应匹配的域名格式,通过添加DNS来增加多个域名匹配
cat /usr/local/kafka/ssl/ssl.sh
#!/bin/bash BASE_DIR=. # 保存路径 DAYS_VALID=7200 # 证书有效期 PASSWORD=xxxxx # 证书密码 NAME=server.kafka.xxx.com # 域名 DEPT=yunwei # 部门名 COMPANY=xxx # 公司名 CITY=gz # 城市 PROVINCE=gd # 省份 COUNTRY=CN # 国家 CERT_DIR="$BASE_DIR/ca" SERVER_DIR="$BASE_DIR/secrets" CLIENT_DIR="$BASE_DIR/client" CA_CERT_NAME="$CLIENT_DIR/ca.crt" CA_KEY_NAME="$CERT_DIR/ca.key" PWD_NAME="$SERVER_DIR/password" SERVER_KEYSTORE="$SERVER_DIR/server.keystore.jks" SERVER_TRUSTSTORE="$SERVER_DIR/server.truststore.jks" SERVER_CSR="$CERT_DIR/server.csr" SERVER_CERT="$CERT_DIR/server.crt" CLIENT_KEY="$CLIENT_DIR/client.key" CLIENT_CSR="$CERT_DIR/client.csr" CLIENT_CERT="$CLIENT_DIR/client.crt" SUBJ="/C=$COUNTRY/ST=$PROVINCE/L=$CITY/O=$COMPANY/OU=$DEPT/CN=$NAME/CN=$NAME" DNAME="CN=$NAME, OU=$DEPT, O=$COMPANY, L=$CITY, ST=$PROVINCE, C=$COUNTRY" mkdir -p $CERT_DIR mkdir -p $SERVER_DIR mkdir -p $CLIENT_DIR rm $CERT_DIR/* rm $SERVER_DIR/* rm $CLIENT_DIR/* echo "1. Generate CA certificate and key..." openssl req -new -x509 -keyout $CA_KEY_NAME -out $CA_CERT_NAME -days $DAYS_VALID \ -passin pass:"$PASSWORD" -passout pass:"$PASSWORD" -subj "$SUBJ" echo "" echo "2. Generate server key store..." keytool -genkey -keyalg RSA -keystore $SERVER_KEYSTORE -alias $NAME \ -keysize 2048 -validity $DAYS_VALID -storepass $PASSWORD -keypass $PASSWORD \ -dname "$DNAME" -ext SAN=DNS:$NAME echo "" echo "3. Export server certificate signing request..." keytool -certreq -keystore $SERVER_KEYSTORE -alias $NAME \ -file $SERVER_CSR -storepass $PASSWORD -keypass $PASSWORD -noprompt echo "" echo "4. Sign server certificate by CA..." openssl x509 -req -CAcreateserial -CA $CA_CERT_NAME -CAkey $CA_KEY_NAME \ -in $SERVER_CSR -out $SERVER_CERT -days $DAYS_VALID -passin pass:$PASSWORD -extfile fd.ext echo "" echo "5. Import CA to server key store..." keytool -import -keystore $SERVER_KEYSTORE -alias CARoot -file $CA_CERT_NAME \ -storepass $PASSWORD -keypass $PASSWORD -noprompt echo "" echo "6. Import server certificate to server key store..." keytool -import -keystore $SERVER_KEYSTORE -alias $NAME -file $SERVER_CERT \ -storepass $PASSWORD -keypass $PASSWORD -noprompt echo "" echo "7. Import CA to server trust store..." keytool -import -keystore $SERVER_TRUSTSTORE -alias CARoot -file $CA_CERT_NAME \ -storepass $PASSWORD -keypass $PASSWORD -noprompt echo "" echo "8. Generate client key and certificate request..." openssl req -nodes -new -keyout $CLIENT_KEY -out $CLIENT_CSR -days $DAYS_VALID \ -subj "$SUBJ" echo "" echo "9. Sign client certificate by CA..." openssl x509 -req -CAcreateserial -CA $CA_CERT_NAME -CAkey $CA_KEY_NAME \ -in $CLIENT_CSR -out $CLIENT_CERT -days $DAYS_VALID -passin pass:$PASSWORD echo "" echo "10. Generate password file..." echo "$PASSWORD" > $PWD_NAME rm .srl echo "" echo "####### Done. #######" echo "Following files were generated" echo "Server password file: $PWD_NAME" echo "Server java keystore: $SERVER_KEYSTORE" echo "Server java truststore: $SERVER_TRUSTSTORE" echo "Signed Client cert: $CLIENT_CERT" echo "Client RSA private key: $CLIENT_KEY" echo "Client PEM truststore: $CA_CERT_NAME"
执行:cd /usr/local/kafka/ssl/ && ./ssl.sh
(2) 把ssl证书拷贝到各组件的机器上
(3) 修改各组件配置文件
vim /usr/local/kafka/config/server.properties
host.name=server.kafka.xxx.com listeners=PLAINTEXT://:9091,SSL://:9092 ssl.keystore.location=/usr/local/kafka/ssl/secrets/server.keystore.jks ssl.keystore.password=xxxx ssl.key.password=xxxx ssl.truststore.location=/usr/local/kafka/ssl/secrets/server.truststore.jks ssl.truststore.password=xxxx
重启kafka
vim /usr/local/logstash/bin/logstash_template.conf
security_protocol => "SSL" ssl_keystore_password => "xxxx" ssl_keystore_location => "/usr/local/kafka/ssl/secrets/server.keystore.jks" ssl_keystore_password => "xxxx" ssl_truststore_password => "xxxx" ssl_truststore_location => "/usr/local/kafka/ssl/secrets/server.truststore.jks"
重启logstash
vim /usr/local/filebeat/filebeat.yml
#输出到kafka output.kafka: enable: true hosts: ["server.kafka.xxx.com:9092"] topic: \'test\' ssl.certificate_authorities: ["/usr/local/filebeat/ssl/ca.crt"] ssl.certificate: "/usr/local/filebeat/ssl/client.crt" ssl.key: "/usr/local/filebeat/ssl/client.key"
重启filebeat
(4) 注意:记得进行域名解析或者绑定主机hosts
至此,elk平台详解已完成 ,真是一个漫长的过程,在此总结,希望给正在研究苦恼的朋友有所帮助,同时也给自己留下笔记,方便日后查阅。