Log4j2 Integration with Logstash

Preface

How Log4j2 hooks into the ELK stack:

  1. Log4j2 pushes log events to Logstash over a TCP socket.
  2. Logstash filters the events and stores them in ElasticSearch.

Environment:

  • Deployment path: /opt/logstash
  • Log4j2 integration method: Socket
  • Host address: 192.168.2.8
  • Logstash: the grok filter plugin is recommended

grok is an extremely powerful Logstash filter plugin: it parses arbitrary text with regular expressions, turning unstructured log data into a structured, easily queryable form.
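Under the hood, a grok pattern expands into a regular expression with named capture groups. A rough Python sketch of that expansion (the pattern definitions below are simplified stand-ins for the official grok patterns, not the real definitions):

```python
import re

# Simplified stand-ins for the built-in grok patterns of the same names.
TIMESTAMP_ISO8601 = r"\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}(?:[.,]\d+)?"
LOGLEVEL = r"TRACE|DEBUG|INFO|WARN|ERROR|FATAL"

# Rough equivalent of the grok expression:
#   %{TIMESTAMP_ISO8601:logDate}:%{LOGLEVEL:logLevel} %{GREEDYDATA:msg}
pattern = re.compile(
    rf"(?P<logDate>{TIMESTAMP_ISO8601}):(?P<logLevel>{LOGLEVEL})\s+(?P<msg>.*)"
)

line = "2024-05-01 12:00:00,123:INFO   service started"
fields = pattern.match(line).groupdict()
print(fields["logLevel"])  # INFO
print(fields["msg"])       # service started
```

This is why grok handles "any" text: each `%{NAME:field}` is just a named sub-regex, and the captured groups become event fields.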

Resources: online grok tester · official pattern library · plugin installation guide

Install the plugin

# List installed plugins
/opt/logstash/bin/logstash-plugin list
# Online install
/opt/logstash/bin/logstash-plugin install logstash-filter-grok
# Update
/opt/logstash/bin/logstash-plugin update logstash-filter-grok
# Offline install: unpack a release and point the Gemfile at the local path
mkdir /opt/logstash/plugins
cd /opt/logstash/plugins
wget https://github.com/logstash-plugins/logstash-filter-grok/archive/refs/tags/v4.4.2.tar.gz
tar -zxvf v4.4.2.tar.gz
vim /opt/logstash/Gemfile
# Add the line:
# gem "logstash-filter-grok", :path => "path"

Configuration

logstash

Configure the pipeline file

Complete example

cat > /opt/logstash/config/app-log4j2.conf <<EOF
input {
  tcp {
    host => "192.168.2.8"
    port => 9699
    codec => json {
      charset => "UTF-8"
    }
  }
  stdin {}
}
filter {
  json {
    source => "message"
  }
}
output {
  elasticsearch {
    #action => "index"
    #manage_template => false
    hosts => "192.168.2.8:9200"
    index => "app-logstash"
    #document_type => "logstash"
  }
  stdout { codec => rubydebug }
}
EOF
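Before wiring up the Java side, it helps to picture what the tcp input receives: the Socket appender effectively serializes each event and writes it over TCP, one document per line. A minimal self-contained Python sketch, with a throwaway local sink standing in for Logstash and a made-up event shape (field names here are illustrative, not Log4j2's actual JSON layout):

```python
import json
import socket
import threading

def start_sink(received, host="127.0.0.1"):
    """Tiny TCP sink standing in for the Logstash tcp input."""
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.bind((host, 0))          # ephemeral port
    srv.listen(1)
    port = srv.getsockname()[1]

    def accept():
        conn, _ = srv.accept()
        data = b""
        while True:
            chunk = conn.recv(4096)
            if not chunk:
                break
            data += chunk
        conn.close()
        srv.close()
        # Like the json codec: one JSON document per line.
        for line in data.decode("utf-8").splitlines():
            received.append(json.loads(line))

    t = threading.Thread(target=accept)
    t.start()
    return port, t

received = []
port, t = start_sink(received)

# What the appender-side send boils down to: serialize and write over TCP.
event = {"level": "INFO", "logger": "com.skytech.Demo", "message": "hello"}
client = socket.create_connection(("127.0.0.1", port))
client.sendall((json.dumps(event) + "\n").encode("utf-8"))
client.close()
t.join(timeout=5)

print(received[0]["message"])  # hello
```

The same idea lets you smoke-test the real pipeline with `nc` against 192.168.2.8:9699 before the application is deployed.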

Configure pipelines.yml (optional)

vim /opt/logstash/config/pipelines.yml

- pipeline.id: app-log4j2-processing
  path.config: "/opt/logstash/config/app-log4j2.conf"

Configure logstash.yml (optional)

path.config: "/opt/logstash/config/pipelines.yml"

Start Logstash

# If pipelines.yml / logstash.yml have not been configured, pass the pipeline config on the command line
/opt/logstash/bin/logstash -f /opt/logstash/config/app-log4j2.conf

Application project

log4j2.yml

Configuration:
  status: warn
  Appenders:
    Console:
      name: CONSOLE
      target: SYSTEM_OUT
      PatternLayout:
        pattern: "%clr{%d{yyyy-MM-dd HH:mm:ss,SSS}}{faint}:%clr{%-6p} %clr{${sys:PID}}{magenta} [thread-%T] %clr{[%t]}{faint} %clr{%-40.40c{1.}:}{cyan} %m%n%xwEx"
    Socket:
      - name: LOGSTASH
        host: 192.168.2.8
        port: 9699
        protocol: TCP
        thresholdFilter:
          level: INFO
          # levels at or above INFO pass
          onMatch: ACCEPT
          # levels below INFO are dropped
          onMismatch: DENY
        PatternLayout:
          pattern: "%d{yyyy-MM-dd HH:mm:ss,SSS}:%-6p ${sys:PID} [thread-%T] [%t] %-40.40c{1.}: %m%n%xwEx"
  Loggers:
    Root:
      level: INFO
      AppenderRef:
        - ref: CONSOLE
    Logger:
      - name: com.skytech
        additivity: false
        level: DEBUG
        AppenderRef:
          - ref: CONSOLE
          - ref: LOGSTASH

Asynchronous logging

Dependency

<dependency>
  <groupId>com.lmax</groupId>
  <artifactId>disruptor</artifactId>
  <version>3.4.2</version>
</dependency>

Spring Boot: configure log4j2.component.properties

Log4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector

https://logging.apache.org/log4j/2.x/manual/async.html
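With the all-async selector enabled, application threads only enqueue events; a background thread does the formatting and socket I/O. That hand-off can be pictured with Python's stdlib queue-based logging (an analogy only, not Log4j2's actual Disruptor mechanism):

```python
import logging
import queue
from logging.handlers import QueueHandler, QueueListener

# Analogy: Log4j2's AsyncLoggerContextSelector moves formatting and I/O to a
# background thread via the LMAX Disruptor ring buffer. Python's
# QueueHandler/QueueListener implements the same producer/consumer hand-off
# with a stdlib queue.
log_queue = queue.Queue()
records = []

class ListHandler(logging.Handler):
    """Sink that records messages; stands in for a slow socket appender."""
    def emit(self, record):
        records.append(record.getMessage())

listener = QueueListener(log_queue, ListHandler())
listener.start()  # background consumer thread

logger = logging.getLogger("async-demo")
logger.setLevel(logging.INFO)
logger.propagate = False
logger.addHandler(QueueHandler(log_queue))  # producer side: enqueue only

logger.info("processed asynchronously")  # returns as soon as it is enqueued
listener.stop()  # drains remaining queue entries before stopping

print(records)  # ['processed asynchronously']
```

The payoff is the same in both worlds: the caller is never blocked on downstream I/O, at the cost of possible event loss if the process dies before the queue drains.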

app-log4j2.conf configuration

input {
  tcp {
    host => "192.168.2.8"
    port => 9699
    codec => "plain"
  }
}
filter {
  if "c.s.s.l.s.f.StorageApiLogFileServiceImpl" in [message] {
    grok {
      match => {
        "message" => "%{TIMESTAMP_ISO8601:logDate}:%{LOGLEVEL:logLevel}%{SPACE}%{INT:pid}%{SPACE}\[%{WORD}-%{INT:thread}\]%{SPACE}\[%{WORD}-%{WORD}-%{INT}-%{WORD}-%{INT}\]%{SPACE}(?<targetClass>.*):%{SPACE}接口日志信息:(?<logData>.*)"
      }
    }
    json {
      source => "logData"
      target => "logDataJson"
      remove_field => ["server","timestamp"]
    }
    mutate {
      add_field => {
        "id" => "%{[logDataJson][id]}"
        "optType" => "%{[logDataJson][optType]}"
        "optTime" => "%{[logDataJson][optTime]}"
        "trackId" => "%{[logDataJson][trackId]}"
        "spendTime" => "%{[logDataJson][spendTime]}"
        "optUser" => "%{[logDataJson][optUser]}"
        "optUserName" => "%{[logDataJson][optUserName]}"
        "otherValue" => "%{[logDataJson][otherValue]}"

        "optName" => "%{[logDataJson][optName]}"
        "optDesc" => "%{[logDataJson][optDesc]}"
        "apiMapping" => "%{[logDataJson][apiMapping]}"
        "optMethod" => "%{[logDataJson][optMethod]}"
        "sourceIp" => "%{[logDataJson][sourceIp]}"
        "successFlag" => "%{[logDataJson][successFlag]}"
        "requestHeaders" => "%{[logDataJson][requestHeaders]}"
        "requestParams" => "%{[logDataJson][requestParams]}"
        "requestBody" => "%{[logDataJson][requestBody]}"
        "responseHeaders" => "%{[logDataJson][responseHeaders]}"
        "responseBody" => "%{[logDataJson][responseBody]}"
      }
    }
    if [logDataJson][methodParams] {
      mutate {
        # opt
        add_field => {
          "methodParams" => "%{[logDataJson][methodParams]}"
          "methodResponse" => "%{[logDataJson][methodResponse]}"
        }
      }
    }
    if [logDataJson][authProcess] {
      mutate {
        # sso
        add_field => {
          "authType" => "%{[logDataJson][authType]}"
          "authProcess" => "%{[logDataJson][authProcess]}"
          "authUser" => "%{[logDataJson][authUser]}"
          "authStatus" => "%{[logDataJson][authStatus]}"
        }
      }
    }
    mutate {
      remove_field => ["logData","tags","_source","message","logDataJson","event"]
    }
  }
  if [optTime] {
    date {
      match => ["optTime", "yyyy-MM-dd HH:mm:ss"]
      target => "@timestamp"
    }
  }
}
output {
  stdout { codec => rubydebug }
  if "接口日志" in [optType] {
    elasticsearch {
      hosts => "192.168.2.8:9200"
      index => "app-log-api-%{+YYYY.MM.dd}"
    }
  }
  if "操作日志" in [optType] {
    elasticsearch {
      hosts => "192.168.2.8:9200"
      index => "app-log-opt-%{+YYYY.MM.dd}"
    }
  }
  if "认证日志" in [optType] {
    elasticsearch {
      hosts => "192.168.2.8:9200"
      index => "app-log-sso-%{+YYYY.MM.dd}"
    }
  }
}
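The filter chain above (grok, then json, then mutate) and the optType-based index routing can be traced end-to-end in Python. The sample line and field subset below are made up for illustration; the real parsing stays in the grok pattern:

```python
import json
import re

# Hypothetical log line in the shape the app's
# StorageApiLogFileServiceImpl emits: metadata, then a JSON payload
# after the "接口日志信息:" marker.
LINE = ('2024-05-01 12:00:00,123:INFO   1234 [thread-1] '
        '[http-nio-8080-exec-1] c.s.s.l.s.f.StorageApiLogFileServiceImpl: '
        '接口日志信息:{"optType": "接口日志", "optUser": "admin", "spendTime": 12}')

# Stand-in for the grok step: capture the trailing JSON as "logData".
m = re.search(r"接口日志信息:(?P<logData>.*)$", LINE)

# The json filter step: parse logData into a nested object.
log_data = json.loads(m.group("logData"))

# The mutate step: copy selected keys up to top-level event fields.
event = {k: log_data[k] for k in ("optType", "optUser", "spendTime")}

# The output section: route by optType to a dated index.
index_by_type = {"接口日志": "app-log-api",
                 "操作日志": "app-log-opt",
                 "认证日志": "app-log-sso"}
index = index_by_type[event["optType"]] + "-%{+YYYY.MM.dd}"
print(index)  # app-log-api-%{+YYYY.MM.dd}
```

Splitting per-type events into separate dated indices keeps mappings clean and makes retention policies per log type trivial.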

Corresponding log4j2.yml configuration

Configuration:
  Appenders:
    Socket:
      - name: LOGSTASH
        host: 192.168.2.8
        port: 9699
        protocol: TCP
        thresholdFilter:
          level: INFO
          # levels at or above INFO pass
          onMatch: ACCEPT
          # levels below INFO are dropped
          onMismatch: DENY
        PatternLayout:
          pattern: "%d{yyyy-MM-dd HH:mm:ss,SSS}:%-6p ${sys:PID} [thread-%T] [%t] %c{1.}: %m%n%xwEx"

app-kafka.conf configuration


input {
  kafka {
    group_id => "logstash"
    # topics_pattern must match the topic configured in filebeat
    topics_pattern => "TOPIC_LOG_API"
    # Kafka cluster address(es); separate multiple brokers with commas
    bootstrap_servers => "192.168.2.8:9092"
    auto_offset_reset => "latest"
    codec => "json"
  }
}

Restart script

cat > /opt/script/restart-logstash << EOF
#!/bin/bash
pid=\$(ps -ef | grep /opt/logstash | grep -v grep | awk '{print \$2}')
if [ -z "\$pid" ]
then
  echo '/opt/logstash not started'
else
  kill -9 \$pid
  echo '/opt/logstash shut down'
fi
cd /opt/
nohup /opt/logstash/bin/logstash -f /opt/logstash/config/app-log4j2.conf > nohup-logstash.log 2>&1 &

echo '/opt/logstash/bin/logstash -f /opt/logstash/config/app-log4j2.conf started'
tail -f nohup-logstash.log
EOF
chmod +x /opt/script/*
cat /opt/script/restart-logstash

Note: when a script is written out via a cat heredoc with an unquoted delimiter, every literal $ in the script body must be escaped as \$, or the shell will expand it at write time.

Original post: https://github.com/maxzhao-it/blog/post/907423512/