Docker安装ELK并实现JSON格式日志分析的方法
elk是什么
elk是elastic公司提供的一套完整的日志收集以及前端展示的解决方案,是三个产品的首字母缩写,分别是elasticsearch、logstash和kibana。
其中logstash负责对日志进行处理,如日志的过滤、日志的格式化等;elasticsearch具有强大的文本搜索能力,因此作为日志的存储容器;而kibana负责前端的展示。
elk搭建架构如下图:
加入了filebeat用于从不同的客户端收集日志,然后传递到logstash统一处理。
elk的搭建
因为elk是三个产品,可以选择依次安装这三个产品。
这里选择使用docker安装elk。
docker安装elk也可以选择分别下载这三个产品的镜像并运行,但是本次使用直接下载elk的三合一镜像来安装。
因此首先要保证已经有了docker的运行环境,docker运行环境的搭建请查看:https://blog.csdn.net/qq13112...
拉取镜像
有了docker环境之后,在服务器运行命令:
docker pull sebp/elk
这个命令是在从docker仓库下载elk三合一的镜像,总大小为2个多g,如果发现下载速度过慢,可以将docker仓库源地址替换为国内源地址。
下载完成之后,查看镜像:
docker images
logstash配置
在/usr/config/logstash
目录下新建beats-input.conf,用于日志的输入:
input { beats { port => 5044 } }
新建output.conf,用于日志由logstash到elasticsearch的输出:
output { elasticsearch { hosts => ["localhost"] manage_template => false index => "%{[@metadata][beat]}" } }
其中的index
为输出到elasticsearch后的index
。
运行容器
有了镜像之后直接启动即可:
docker run -d -p 5044:5044 -p 5601:5601 -p 9203:9200 -p 9303:9300 -v /var/data/elk:/var/lib/elasticsearch -v /usr/config/logstash:/etc/logstash/conf.d --name=elk sebp/elk
-d的意思是后台运行容器;
-p的意思是宿主机端口:容器端口,即将容器中使用的端口映射到宿主机上的某个端口,elasticsearch的默认端口是9200和9300,由于我的机器上已经运行了3台elasticsearch实例,因此此处将映射端口进行了修改;
-v的意思是宿主机的文件|文件夹:容器的文件|文件夹,此处将容器中elasticsearch 的数据挂载到宿主机的/var/data/elk
上,以防容器重启后数据的丢失;并且将logstash的配置文件挂载到宿主机的/usr/config/logstash
目录。
--name的意思是给容器命名,命名是为了之后操作容器更加方便。
如果你之前搭建过elasticsearch的话,会发现搭建的过程中有各种错误,但是使用docker搭建elk的过程中并没有出现那些错误。
运行后查看容器:
docker ps
查看容器日志:
docker logs -f elk
进入容器:
docker exec -it elk /bin/bash
修改配置后重启容器:
docker restart elk
查看kinaba
浏览器输入http://my_host:5601/
即可看到kinaba界面。此时elasticsearch中还没有数据,需要安装filebeat采集数据到elk中。
filebeat搭建
filebeat用于采集数据并上报到logstash或者elasticsearch,在需要采集日志的服务器上下载filebeat并解压即可使用
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.2.1-linux-x86_64.tar.gz
tar -zxvf filebeat-6.2.1-linux-x86_64.tar.gz
修改配置文件
进入filebeat,修改filebeat.yml。
filebeat.prospectors: - type: log #需要设置为true配置才能生效 enabled: true path: #配置需要采集的日志路径 - /var/log/*.log #可以打一个tag以后分类使用 tag: ["my_tag"] #对应elasticsearch的type document_type: my_type setup.kibana: #此处为kibana的ip及端口,即kibana:5601 host: "" output.logstash: #此处为logstash的ip及端口,即logstash:5044 host: [""] #需要设置为true,否则不生效 enabled: true #如果想直接从filebeat采集数据到elasticsearch,则可以配置output.elasticsearch的相关配置
运行filebeat
运行:
./filebeat -e -c filebeat.yml -d "publish"
此时可以看到filebeat会将配置的path下的log发送到logstash;然后在elk中,logstash处理完数据之后就会发送到elasticsearch。但我们想做的是通过elk进行数据分析,因此导入到elasticsearch的数据必须是json格式的。
这是之前我的单条日志的格式:
2019-10-22 10:44:03.441 info rmjk.interceptors.ipinterceptor line:248 - {"clienttype":"1","decode":"0fbd93a286533d071","eatype":2,"eaid":191970823383420928,"ip":"xx.xx.xx.xx","model":"honor stf-al10","ostype":"9","path":"/applicationenter","result":5,"session":"ef0a5c4bca424194b29e2ff31632ee5c","timestamp":1571712242326,"uid":"130605789659402240","v":"2.2.4"}
导入之后不好分析,之后又想到使用logstash的filter中的grok来处理日志使之变成json格式之后再导入到elasticsearch中,但是由于我的日志中的参数是不固定的,发现难度太大了,于是转而使用logback,将日志直接格式化成json之后,再由filebeat发送。
logback配置
我的项目是spring boot,在项目中加入依赖:
<dependency> <groupid>net.logstash.logback</groupid> <artifactid>logstash-logback-encoder</artifactid> <version>5.2</version> </dependency>
然后在项目中的resource目录下加入logback.xml:
<?xml version="1.0" encoding="utf-8"?> <configuration> <!-- 说明: 1、日志级别及文件 日志记录采用分级记录,级别与日志文件名相对应,不同级别的日志信息记录到不同的日志文件中 例如:error级别记录到log_error_xxx.log或log_error.log(该文件为当前记录的日志文件),而log_error_xxx.log为归档日志, 日志文件按日期记录,同一天内,若日志文件大小等于或大于2m,则按0、1、2...顺序分别命名 例如log-level-2013-12-21.0.log 其它级别的日志也是如此。 2、文件路径 若开发、测试用,在eclipse中运行项目,则到eclipse的安装路径查找logs文件夹,以相对路径../logs。 若部署到tomcat下,则在tomcat下的logs文件中 3、appender fileerror对应error级别,文件名以log-error-xxx.log形式命名 filewarn对应warn级别,文件名以log-warn-xxx.log形式命名 fileinfo对应info级别,文件名以log-info-xxx.log形式命名 filedebug对应debug级别,文件名以log-debug-xxx.log形式命名 stdout将日志信息输出到控制上,为方便开发测试使用 --> <contextname>service</contextname> <property name="log_path" value="logs"/> <!--设置系统日志目录--> <property name="appdir" value="doctor"/> <!-- 日志记录器,日期滚动记录 --> <appender name="fileerror" class="ch.qos.logback.core.rolling.rollingfileappender"> <!-- 正在记录的日志文件的路径及文件名 --> <file>${log_path}/${appdir}/log_error.log</file> <!-- 日志记录器的滚动策略,按日期,按大小记录 --> <rollingpolicy class="ch.qos.logback.core.rolling.timebasedrollingpolicy"> <!-- 归档的日志文件的路径,例如今天是2013-12-21日志,当前写的日志文件路径为file节点指定,可以将此文件与file指定文件路径设置为不同路径,从而将当前日志文件或归档日志文件置不同的目录。 而2013-12-21的日志文件在由filenamepattern指定。%d{yyyy-mm-dd}指定日期格式,%i指定索引 --> <filenamepattern>${log_path}/${appdir}/error/log-error-%d{yyyy-mm-dd}.%i.log</filenamepattern> <!-- 除按日志记录之外,还配置了日志文件不能超过2m,若超过2m,日志文件会以索引0开始, 命名日志文件,例如log-error-2013-12-21.0.log --> <timebasedfilenamingandtriggeringpolicy class="ch.qos.logback.core.rolling.sizeandtimebasedfnatp"> <maxfilesize>2mb</maxfilesize> </timebasedfilenamingandtriggeringpolicy> </rollingpolicy> <!-- 追加方式记录日志 --> <append>true</append> <!-- 日志文件的格式 --> <encoder class="ch.qos.logback.classic.encoder.patternlayoutencoder"> <pattern>%d{yyyy-mm-dd hh:mm:ss.sss} %-5level %logger line:%-3l - %msg%n</pattern> <charset>utf-8</charset> </encoder> <!-- 此日志文件只记录info级别的 --> <filter class="ch.qos.logback.classic.filter.levelfilter"> <level>error</level> <onmatch>accept</onmatch> <onmismatch>deny</onmismatch> </filter> </appender> <!-- 日志记录器,日期滚动记录 --> <appender name="filewarn" class="ch.qos.logback.core.rolling.rollingfileappender"> <!-- 正在记录的日志文件的路径及文件名 --> <file>${log_path}/${appdir}/log_warn.log</file> <!-- 日志记录器的滚动策略,按日期,按大小记录 --> <rollingpolicy class="ch.qos.logback.core.rolling.timebasedrollingpolicy"> <!-- 归档的日志文件的路径,例如今天是2013-12-21日志,当前写的日志文件路径为file节点指定,可以将此文件与file指定文件路径设置为不同路径,从而将当前日志文件或归档日志文件置不同的目录。 而2013-12-21的日志文件在由filenamepattern指定。%d{yyyy-mm-dd}指定日期格式,%i指定索引 --> <filenamepattern>${log_path}/${appdir}/warn/log-warn-%d{yyyy-mm-dd}.%i.log</filenamepattern> <!-- 除按日志记录之外,还配置了日志文件不能超过2m,若超过2m,日志文件会以索引0开始, 命名日志文件,例如log-error-2013-12-21.0.log --> <timebasedfilenamingandtriggeringpolicy class="ch.qos.logback.core.rolling.sizeandtimebasedfnatp"> <maxfilesize>2mb</maxfilesize> </timebasedfilenamingandtriggeringpolicy> </rollingpolicy> <!-- 追加方式记录日志 --> <append>true</append> <!-- 日志文件的格式 --> <encoder class="ch.qos.logback.classic.encoder.patternlayoutencoder"> <pattern>%d{yyyy-mm-dd hh:mm:ss.sss} %-5level %logger line:%-3l - %msg%n</pattern> <charset>utf-8</charset> </encoder> <!-- 此日志文件只记录info级别的 --> <filter class="ch.qos.logback.classic.filter.levelfilter"> <level>warn</level> <onmatch>accept</onmatch> <onmismatch>deny</onmismatch> </filter> </appender> <!-- 日志记录器,日期滚动记录 --> <appender name="fileinfo" class="ch.qos.logback.core.rolling.rollingfileappender"> <!-- 正在记录的日志文件的路径及文件名 --> <file>${log_path}/${appdir}/log_info.log</file> <!-- 日志记录器的滚动策略,按日期,按大小记录 --> <rollingpolicy class="ch.qos.logback.core.rolling.timebasedrollingpolicy"> <!-- 归档的日志文件的路径,例如今天是2013-12-21日志,当前写的日志文件路径为file节点指定,可以将此文件与file指定文件路径设置为不同路径,从而将当前日志文件或归档日志文件置不同的目录。 而2013-12-21的日志文件在由filenamepattern指定。%d{yyyy-mm-dd}指定日期格式,%i指定索引 --> <filenamepattern>${log_path}/${appdir}/info/log-info-%d{yyyy-mm-dd}.%i.log</filenamepattern> <!-- 除按日志记录之外,还配置了日志文件不能超过2m,若超过2m,日志文件会以索引0开始, 命名日志文件,例如log-error-2013-12-21.0.log --> <timebasedfilenamingandtriggeringpolicy class="ch.qos.logback.core.rolling.sizeandtimebasedfnatp"> <maxfilesize>2mb</maxfilesize> </timebasedfilenamingandtriggeringpolicy> </rollingpolicy> <!-- 追加方式记录日志 --> <append>true</append> <!-- 日志文件的格式 --> <encoder class="ch.qos.logback.classic.encoder.patternlayoutencoder"> <pattern>%d{yyyy-mm-dd hh:mm:ss.sss} %-5level %logger line:%-3l - %msg%n</pattern> <charset>utf-8</charset> </encoder> <!-- 此日志文件只记录info级别的 --> <filter class="ch.qos.logback.classic.filter.levelfilter"> <level>info</level> <onmatch>accept</onmatch> <onmismatch>deny</onmismatch> </filter> </appender> <appender name="jsonlog" class="ch.qos.logback.core.rolling.rollingfileappender"> <!-- 正在记录的日志文件的路径及文件名 --> <file>${log_path}/${appdir}/log_ipinterceptor.log</file> <rollingpolicy class="ch.qos.logback.core.rolling.timebasedrollingpolicy"> <filenamepattern>${log_path}/${appdir}/log_ipinterceptor.%d{yyyy-mm-dd}.log</filenamepattern> </rollingpolicy> <encoder class="net.logstash.logback.encoder.loggingeventcompositejsonencoder"> <jsonfactorydecorator class="net.logstash.logback.decorate.characterescapesjsonfactorydecorator"> <escape> <targetcharactercode>10</targetcharactercode> <escapesequence>\u2028</escapesequence> </escape> </jsonfactorydecorator> <providers> <pattern> <pattern> { "timestamp":"%date{iso8601}", "uid":"%mdc{uid}", "requestip":"%mdc{ip}", "id":"%mdc{id}", "clienttype":"%mdc{clienttype}", "v":"%mdc{v}", "decode":"%mdc{decode}", "dataid":"%mdc{dataid}", "datatype":"%mdc{datatype}", "vid":"%mdc{vid}", "did":"%mdc{did}", "cid":"%mdc{cid}", "tagid":"%mdc{tagid}" } </pattern> </pattern> </providers> </encoder> </appender> <!-- 彩色日志 --> <!-- 彩色日志依赖的渲染类 --> <conversionrule conversionword="clr" converterclass="org.springframework.boot.logging.logback.colorconverter"/> <conversionrule conversionword="wex" converterclass="org.springframework.boot.logging.logback.whitespacethrowableproxyconverter"/> <conversionrule conversionword="wex" converterclass="org.springframework.boot.logging.logback.extendedwhitespacethrowableproxyconverter"/> <!-- 彩色日志格式 --> <property name="console_log_pattern" value="${console_log_pattern:-%clr(%d{yyyy-mm-dd hh:mm:ss.sss}){faint} %clr(${log_level_pattern:-%5p}) %clr(${pid:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${log_exception_conversion_word:-%wex}}"/> <appender name="stdout" class="ch.qos.logback.core.consoleappender"> <!--encoder 默认配置为patternlayoutencoder--> <encoder> <pattern>${console_log_pattern}</pattern> <charset>utf-8</charset> </encoder> <!--此日志appender是为开发使用,只配置最底级别,控制台输出的日志级别是大于或等于此级别的日志信息--> <filter class="ch.qos.logback.classic.filter.thresholdfilter"> <level>debug</level> </filter> </appender> <!-- 指定项目中某个包,当有日志操作行为时的日志记录级别 --> <!-- rmjk.dao.mappe为根包,也就是只要是发生在这个根包下面的所有日志操作行为的权限都是debug --> <!-- 级别依次为【从高到低】:fatal > error > warn > info > debug > trace --> <logger name="rmjk.dao.mapper" level="debug"/> <logger name="rmjk.service" level="debug"/> <!--显示日志--> <logger name="org.springframework.jdbc.core" additivity="false" level="debug"> <appender-ref ref="stdout"/> <appender-ref ref="fileinfo"/> </logger> <!-- 打印json日志 --> <logger name="ipinterceptor" level="info" additivity="false"> <appender-ref ref="jsonlog"/> </logger> <!-- 生产环境下,将此级别配置为适合的级别,以免日志文件太多或影响程序性能 --> <root level="info"> <appender-ref ref="fileerror"/> <appender-ref ref="filewarn"/> <appender-ref ref="fileinfo"/> <!-- 生产环境将请stdout,testfile去掉 --> <appender-ref ref="stdout"/> </root> </configuration>
其中的关键为:
<logger name="ipinterceptor" level="info" additivity="false"> <appender-ref ref="jsonlog"/> </logger>
在需要打印的文件中引入slf4j:
private static final logger log = loggerfactory.getlogger("ipinterceptor");
mdc中放入需要打印的信息:
mdc.put("ip", ipaddress); mdc.put("path", servletpath); mdc.put("uid", parammap.get("uid") == null ? "" : parammap.get("uid").tostring());
此时如果使用了log.info("msg")
的话,打印的内容会输入到日志的message中,日志格式如下:
修改logstash配置
修改/usr/config/logstash
目录下的beats-input.conf:
input { beats { port => 5044 codec => "json" } }
只加了一句codec => "json"
,但是logstash会按照json格式来解析输入的内容。
因为修改了配置,重启elk:
docker restart elk
这样,当我们的日志生成完毕之后,使用filebeat导入到elk中,就可以通过kibana来进行日志分析了。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。