基于ELK收集Nginx的日志
前言
前面又讲到,如何从0搭建ELK服务,如果还不太清楚的可以参考: https://lupf.cn/articles/2020/04/23/1587622994136.html
关于日志规范,可以参考 :https://lupf.cn/articles/2020/06/04/1591273110824.html
Nginx日志采集
第一步,规范Nginx日志
这里就直接讲配置了,就不涉及到软件的安装了,如果不知道可以参考上面的文章搭建
这里就直接讲配置了,就不涉及到软件的安装了,如果不知道可以参考上面的文章搭建
这里就直接讲配置了,就不涉及到软件的安装了,如果不知道可以参考上面的文章搭建
-
配置
log_format main '[$time_iso8601] ' # 通用日志格式的本地时间 '[$hostname] ' # 主机名 '[$remote_addr] ' # 客户端地址 '[$remote_port] ' # 客户端端口 '[$connection] ' # 连接序列号 '[$server_addr] ' # 接受请求的服务器的地址 '[$server_port] ' # 接受请求的服务器的端口 '[$request_id] ' # 请求的唯一ID '[$request_method] ' # 请求方式 '[$request_uri] ' # 完整的原始请求URI '[$request_time] ' # 请求以毫秒为单位的处理时间,以毫秒为单位(1.3.9,1.2.6;从客户端读取第一个字节以来经过的时间 '[$request_length] ' # 请求长度 '[$request_body] ' # 请求主体 '[-] ' #'[$bytes_received] ' #从客户端收到的字节数 '[$content_type] ' # 内容类型 '[$content_length] ' # 内容长度 '[$remote_user] ' # 基本身份验证随附的用户名 '[$request] ' # 完整的原始请求行 '[$scheme] ' # 请求方案 http或https '[$msec] ' # 请求以秒为单位的时间戳 '[-] ' #'[$protocol] ' # 与客户端通信的协议: TCP或UDP '[-] ' #'[$session_time] ' # 会话持续时间 '[$upstream_addr] ' # 与上游的连接地址 '[$upstream_response_time] ' # 从上游接受响应的耗时 '[$upstream_connect_time] ' # 与上游建立连接的时间 '[$upstream_bytes_sent] ' # 发送到上游服务器的字节数 '[$upstream_bytes_received] ' # 发送到上游服务器的字节数 '[$upstream_status] ' # 从上游获取的响应状态 '[$status] ' # 响应状态 '[$body_bytes_sent] ' # 发送给客户端的字节数,不计算响应头 '[$bytes_sent] ' # 发送给客户端的字节数 '[$http_user_agent] ' '[$http_referer] ' '[$http_host] ' '[$http_x_forwarded_for] ' '[$http_Authorization] ' '[$cookie_uid] '; # 日志保存的位置 access_log logs/busi_log/access.log main;
日志输出的样例,该日志即根据上面的配置输出的日志
[2020-09-23T00:11:00+08:00] [lvs-webserver1] [192.168.1.28] [56899] [6534884] [192.168.1.207] [443] [e3094fa308646dccdf170c95abcff40d] [GET] [/r/refs?service=git-upload-pack] [0.008] [253] [-] [-] [-] [-] [-] [GET /r/refs?service=git-upload-pack HTTP/1.1] [https] [1600791060.059] [-] [-] [192.168.1.202:10101] [0.008] [0.005] [335] [514] [401] [401] [328] [589] [git/2.26.0] [-] [git.tongkabao.com.cn] [-] [-] [-] [2020-09-23T00:11:00+08:00] [lvs-webserver1] [192.168.1.28] [56900] [6534887] [192.168.1.207] [443] [ecde0d3f923432367266b2aea7eb5562] [GET] [/r/refs?service=git-upload-pack] [0.004] [252] [-] [-] [-] [-] [-] [GET /r/refs?service=git-upload-pack HTTP/1.1] [https] [1600791060.066] [-] [-] [192.168.1.202:10101] [0.004] [0.000] [334] [513] [401] [401] [327] [588] [git/2.26.0] [-] [git.tongkabao.com.cn] [-] [-] [-] [2020-09-23T00:11:00+08:00] [lvs-webserver1] [192.168.1.28] [56905] [6534898] [192.168.1.207] [443] [ab7443972fd6eade17833a4e0070eec1] [GET] [/r/refs?service=git-upload-pack] [0.004] [288] [-] [-] [-] [-] [jenkins] [GET /r/refs?service=git-upload-pack HTTP/1.1] [https] [1600791060.077] [-] [-] [192.168.1.202:10101] [0.004] [0.000] [370] [514] [401] [401] [328] [589] [git/2.26.0] [-] [git.tongkabao.com.cn] [-] [Basic amVua2luczo=] [-]
第二步,FileBeat采集N给inx日志
在nginx的http节点下配置
FileBeat有提供Nginx的收集模块,但是我们也完全可以把Nginx的日志单纯当作一个普通的日志去采集
-
配置 filebeat.yml
#============== Filebeat prospectors =========== filebeat.prospectors: - type: log paths: - /usr/local/openresty/nginx/logs/busi_log/access.log document_type: "nginx-access-log" fileset: module: nginx name: access fields: module: nginx name: access log_topic: nginx-log service: kt # 自定义的服务名称 evn: dev # 自定义的环境 用户将不同环境的日志采集到不同的kafka topic中 multiline: pattern: '^\[' #指定多行匹配的表达式 以[开头的标识一行新的数据 negate: true # 是否匹配到 match: after # 没有匹配上正则合并到上一行末尾 max_lines: 1000 # 最大未匹配上的行数 timeout: 2s # 指定时间没有新的日志 就不等待后面的日志输入 # 以下为error的采集 暂时关闭 #- type: log # paths: # - /usr/local/openresty/nginx/logs/error.log # document_type: "nginx-error-log" # fileset: # module: nginx # name: error # fields: # module: nginx # name: error # log_topic: nginx-log # evn: dev # # multiline: # pattern: '^\[' #指定多行匹配的表达式 以[开头的标识一行新的数据 # negate: true # 是否匹配到 # match: after # 没有匹配上正则合并到上一行末尾 # max_lines: 1000 # 最大未匹配上的行数 # timeout: 2s # 指定时间没有新的日志 就不等待后面的日志输入 ## 定义输出的方式 output.kafka: enabled: true # 启动 hosts: ["kafka地址1:9092","kafka地址1:9093"] #kafka的ip和port topic: '%{[fields.log_topic]}-%{[fields.evn]}' #指定输出到的topicname 也就是上面定义的变量 partition.hash: # 分区规则 hash reachable_only: true compression: gzip # 数据压缩 max_message_bytes: 1000000 # 最大的消息字节数 required_acks: 1 # kafka ack的方式 0:丢出去就好了 1:有一个响应就好了 -1:所有节点响应才算成功 logging.to_files: true #
-
注意点1
日志的地址一定要正确
-
注意点2
fields节点相当于自定义参数,是为了后续保存到不同的topic index做准备的
-
注意点3
kafka地址
-
注意点4
Kafka中需要提前创建好对应的topic
-
logStash配置
-
配置
# 添加以下配置 # 输入从kafka input { kafka { topics_pattern => "nginx-log-.*" # 也可以模糊匹配 如:err-log-* 这样就可以匹配到 err-log-product err-log-user bootstrap_servers => "ip:9092" # kafka的ip 端口 codec => json # 数据格式 consumer_threads => 1 # 对应partition的数量 decorate_events => true #auto_offset_rest => "latest" # 默认值就是这个 group_id => "nginx-logs-group" # kafka的消费组 client_id => "logstash-1-1" } } filter { ruby { code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))" } if "nginx-log" in [fields][log_topic] { grok { match => ["message","\[%{NOTSPACE:currentDataTime}\] \[%{DATA:hostname}\] \[%{NOTSPACE:remoteAddr}\] \[%{NOTSPACE:remote.port}\] \[%{NOTSPACE:connection}\] \[%{NOTSPACE:server.addr}\] \[%{NOTSPACE:server.port}\] \[%{DATA:request.id}\] \[%{DATA:request.method}\] \[%{DATA:request.uri}\] \[%{DATA:request.time}\] \[%{DATA:request.length}\] \[%{DATA:request.body}\] \[%{DATA:bytesReceived}\] \[%{DATA:content.type}\] \[%{DATA:content.length}\] \[%{DATA:remote.user}\] \[%{DATA:request.info}\] \[%{DATA:scheme}\] \[%{DATA:msec}\] \[%{DATA:protocol}\] \[%{DATA:sessionTime}\] \[%{DATA:upstream.addr}\] \[%{DATA:upstream.responseTime}\] \[%{DATA:upstream.connectTime}\] \[%{DATA:upstream.bytesSent}\] \[%{DATA:upstream.bytesReceived}\] \[%{DATA:upstream.status}\] \[%{DATA:status}\] \[%{DATA:bodyBytesSent}\] \[%{DATA:bytesSent}\] \[%{DATA:http.user_agent}\] \[%{DATA:http.referer}\] \[%{DATA:http.host}\] \[%{DATA:http.x_forwarded_for}\] \[%{DATA:http.Authorization}\] \[%{DATA:cookie.uid}\] "] } # 根据条件添加数据 if "iPhone;" in [http.user_agent] { mutate { add_field => { "deviceOS" => "iphone OS" } } }else if "Android" in [http.user_agent] { mutate { add_field => { "deviceOS" => "Android" } } }else if "Windows" in [http.user_agent] { mutate { add_field => { "deviceOS" => "Windows" } } }else if "Macintosh;" in [http.user_agent] { mutate { add_field => { "deviceOS" => "Mac OS" } } }else { mutate { add_field => { "deviceOS" => "unkown" } } } # 根据geoip 分析用户所处的地区 geoip { source => [remoteAddr] target => "geoip" database => "/usr/local/logstash-6.8.9/GeoLite2-City.mmdb" #add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ] #add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ] } # 类型转换 mutate { convert => [ "bodyBytesSent", "integer" ] convert => [ "bytesSent", "integer" ] convert => [ "remote.port", "integer" ] convert => [ "request.length", "integer" ] convert => [ "server.port", "integer" ] convert => [ "status", "integer" ] convert => [ "upstream.bytesReceived", "integer" ] convert => [ "upstream.bytesSent", "integer" ] convert => [ "upstream.status", "integer" ] convert => [ "msec", "float" ] convert => [ "request.time", "float" ] convert => [ "upstream.connectTime", "float" ] convert => [ "upstream.responseTime", "float" ] #convert => [ "[geoip][coordinates]", "float" ] } } # } } ## 输出方式 ## 到控制台 当测试无误之后,可以将此输出关闭 output { stdout { codec => rubydebug } } output { if "nginx-log" in [fields][log_topic] { elasticsearch { hosts => ["ip:9200"] # es的ip 端口 # 用户名 密码 user => "用户名" password => "密码" # 索引名称(index) # 索引格式:all-log-应用名称-按天分组 # [fields][logbiz]为filebeat中定义的变量 index => "logstash-%{[fields][module]}-%{[fields][name]}-%{[fields][evn]}-%{index_time}" # 是否嗅探集群 # 通过嗅探机制解析es集群的负载均衡发送日志 sniffing => true # logstash默认自带一个mapping模版,进行模版覆盖 template_overwrite => true } } }
-
注意点1
kafka的地址
-
注意点2
kafka中的topic,不能填错了
-
注意点3;GEO
可以按以下地址下载
链接:https://pan.baidu.com/s/1aMZdF8uyCu46eaJwrH-S_Q
提取码:4aj7 -
注意点4;添加字段和转换字段
-
注意点5;ES的index得根据自己的场景规划好
-
ELK采集Nginx,完。。。