How to write Logstash configuration files

Logstash configuration files

Enable an HTTP input and write the collected logs into Elasticsearch.
input {
  http {
    host => "0.0.0.0"
    port => 7881      # listening port
    codec => json     # parse the request body as JSON
    add_field => {    # add a marathon:base-marathon field to every received event
      "marathon" => "base-marathon"
    }
  }
}

# Route each event to the matching Elasticsearch index based on its oam_type field
# (an alternative single-output version is sketched after this config).
output {
  if [oam_type] == "hadoop" {
    elasticsearch {
      hosts => ["192.168.5.27:9250","192.168.5.28:9250","192.168.5.29:9250"]
      index => "logstash-cd-hadoop-%{+YYYY.MM.dd}"
      flush_size => 10000
      idle_flush_time => 60
      template_overwrite => true
    }
  } else if [oam_type] == "kafka" {
    elasticsearch {
      hosts => ["192.168.5.27:9250","192.168.5.28:9250","192.168.5.29:9250"]
      index => "logstash-cd-kafka-%{+YYYY.MM.dd}"
      flush_size => 10000
      idle_flush_time => 60
      template_overwrite => true
    }
  } else if [oam_type] == "es" {
    elasticsearch {
      hosts => ["192.168.5.27:9250","192.168.5.28:9250","192.168.5.29:9250"]
      index => "logstash-cd-es-%{+YYYY.MM.dd}"
      flush_size => 10000
      idle_flush_time => 60
      template_overwrite => true
    }
  } else {
    elasticsearch {
      hosts => ["192.168.5.27:9250","192.168.5.28:9250","192.168.5.29:9250"]
      index => "logstash-cd-marathon-%{+YYYY.MM.dd}"
      flush_size => 10000
      idle_flush_time => 60
      template_overwrite => true
    }
  }
}
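Since the four branches above differ only in the index name, the same routing can also be expressed with a single output that interpolates the field into the index name. This is only a sketch, under the assumption that oam_type, when present, only ever takes the values hadoop, kafka, or es; events without the field are first given the marathon default so the index name never contains a literal %{oam_type}:

filter {
  if ![oam_type] {
    mutate { add_field => { "oam_type" => "marathon" } }
  }
}

output {
  elasticsearch {
    hosts => ["192.168.5.27:9250","192.168.5.28:9250","192.168.5.29:9250"]
    index => "logstash-cd-%{oam_type}-%{+YYYY.MM.dd}"
    flush_size => 10000
    idle_flush_time => 60
    template_overwrite => true
  }
}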
Consuming nginx logs from Kafka and writing them to HDFS
input {
  kafka {
    zk_connect => "10.10.110.122:2181,10.10.110.123:2181,10.10.110.124:2181/kafka"
    group_id => "logstash-kafka-hdfs"
    topic_id => "prd_nginx_access"
    codec => plain
    reset_beginning => false   # boolean (optional), default: false
    consumer_threads => 1      # number (optional), default: 1
    decorate_events => false   # boolean (optional), default: false
  }
}

# Normalize the timestamp: map the twelve three-letter month names to two-digit numbers
# (a more compact alternative is sketched after this config).
filter {
  grok {
    match => {
      "message" => "^(?<hostname>.+?)\s(?<modulname>.+?)\s(?<remote_addr>.+?)\s\-\s(?<remote_user>.+?)\s\[(?<Day>.+?)/(?<Month>.+?)/(?<Year>.+?):(?<Hour>.+?):"
    }
  }
  if [Month] == "Jan" {
    mutate {
      update => ["Month","01"]
    }
  } else if [Month] == "Feb" {
    mutate {
      update => ["Month","02"]
    }
  } else if [Month] == "Mar" {
    mutate {
      update => ["Month","03"]
    }
  } else if [Month] == "Apr" {
    mutate {
      update => ["Month","04"]
    }
  } else if [Month] == "May" {
    mutate {
      update => ["Month","05"]
    }
  } else if [Month] == "Jun" {
    mutate {
      update => ["Month","06"]
    }
  } else if [Month] == "Jul" {
    mutate {
      update => ["Month","07"]
    }
  } else if [Month] == "Aug" {
    mutate {
      update => ["Month","08"]
    }
  } else if [Month] == "Sep" {
    mutate {
      update => ["Month","09"]
    }
  } else if [Month] == "Oct" {
    mutate {
      update => ["Month","10"]
    }
  } else if [Month] == "Nov" {
    mutate {
      update => ["Month","11"]
    }
  } else if [Month] == "Dec" {
    mutate {
      update => ["Month","12"]
    }
  }
}




output {
  if [modulname] {
    webhdfs {
      workers => 1
      host => "namenode-master.host.com"
      port => 14000
      user => "hadoop"
      path => "/Data/Logs/domain=%{modulname}/dt=%{Year}%{Month}%{Day}/hour=%{Hour}/%{modulname}_%{Year}%{Month}%{Day}%{Hour}.log"
      flush_size => 5000
      compression => "gzip"
      idle_flush_time => 6
      retry_interval => 3
      retry_times => 3
      codec => line {
        format => "%{message}"
      }
    }
  } else {
    file {
      path => "/home/logs/supervisor/logstash_prd_kafka_hdfs_error.log"
      codec => line { format => "custom format: %{message}" }
    }
  }
  stdout { codec => rubydebug }
}
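The twelve mutate branches above can be collapsed into a single ruby filter with a lookup table. A minimal sketch, assuming the Logstash 2.x event API (event['field'] access) that the rest of this config targets:

filter {
  ruby {
    # build the month lookup table once, when the filter is initialized
    init => "@months = { 'Jan' => '01', 'Feb' => '02', 'Mar' => '03', 'Apr' => '04', 'May' => '05', 'Jun' => '06', 'Jul' => '07', 'Aug' => '08', 'Sep' => '09', 'Oct' => '10', 'Nov' => '11', 'Dec' => '12' }"
    # replace the three-letter month name with its two-digit number
    code => "event['Month'] = @months[event['Month']] if @months.key?(event['Month'])"
  }
}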
Collecting nginx logs into Kafka

The logs are plain text; without a plain codec, Logstash would write each event to Kafka as one big JSON string, so the output below forwards only the raw message (a comparison sketch follows the config).

input {
  file {
    path => ["/home/nginx/logs/accesslog/**/*.log"]
    exclude => ["/home/nginx/logs/accesslog/11.test.com/*.log","/home/nginx/logs/accesslog/2.test.com/*.log","/home/nginx/logs/accesslog/3.test.com/*.log"]
    sincedb_path => "/home/optools/logstash/sincedb"
    start_position => "beginning"
    discover_interval => 10
    close_older => 3600
    ignore_older => 86400
    sincedb_write_interval => 5
    stat_interval => 1
  }
}

output {
  kafka {
    bootstrap_servers => "kafka00:9092,kafka01:9092,kafka02:9092"
    topic_id => "prd_nginx_access"
    compression_type => "gzip"
    codec => plain {
      format => "%{message}"
    }
  }
}
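For comparison only (this block is not part of the original config): with a json codec instead of the plain codec above, each Kafka message becomes the whole event serialized as JSON rather than just the raw log line, which is the "one big JSON string" behaviour mentioned above.

output {
  kafka {
    bootstrap_servers => "kafka00:9092,kafka01:9092,kafka02:9092"
    topic_id => "prd_nginx_access"
    # each message now looks roughly like:
    # {"message":"<raw log line>","@version":"1","@timestamp":"...","path":"...","host":"..."}
    codec => json
  }
}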
Handling JSON logs
input {
  file {
    path => ["/home/logs/v4-weblog/*.log"]
    sincedb_path => "/home/logstash/conf/sincedb"
    start_position => "beginning"
    codec => "json"   # pass JSON downstream; if the backend expects plain text, use codec => "plain"
    discover_interval => 10
    close_older => 3600
    ignore_older => 86400
    sincedb_write_interval => 5
    stat_interval => 1
  }
}

output {
  kafka {
    topic_id => "weblogv4_mx_wandafilm"
    bootstrap_servers => "192.168.5.30:9092,192.168.5.38:9092,192.168.5.48:9092"
    codec => plain {
      format => "%{message}"
    }
  }

  #stdout{
  #  codec => rubydebug
  #}
}
A filter example: parsing SNMP trap events
input {
  kafka {
    zk_connect => "192.168.5.18:2181,192.168.5.28:2181,192.168.5.30:2181,192.168.5.38:2181,192.168.5.48:2181"
    group_id => "huawei_hard_monitor"
    topic_id => "huawei_hard_monitor"
    codec => json
    reset_beginning => false   # boolean (optional), default: false
    consumer_threads => 1      # number (optional), default: 1
    decorate_events => false   # boolean (optional), default: false
  }
}

filter {
  # split the trap description into separate fields
  grok {
    match => {
      "SNMPv2-SMI::enterprises.2011.23.2.1" => "^Location:(?<Location>.*?); Time:(?<Time>.*?); Sensor:(?<Sensor>.*?); Severity:(?<Severity>.*?); Code:(?<Code>.*?); Description:(?<Description>.*?)$"
    }
  }
  # rename the OID fields so the field names no longer contain ':' or '.'
  mutate {
    rename => ["SNMPv2-MIB::sysUpTime.0", "SNMPv2-MIB--sysUpTime-0"]
    rename => ["SNMPv2-MIB::snmpTrapOID.0", "SNMPv2-MIB--snmpTrapOID-0"]
    rename => ["SNMPv2-SMI::enterprises.2011.23.2.1", "SNMPv2-SMI--enterprises_2011_23_2_1"]
  }
}

output {
  elasticsearch {
    workers => 4
    hosts => ["192.168.5.27:9250","192.168.5.28:9250","192.168.5.29:9250"]
    index => "logstash-huawei_hard_monitor-%{+YYYY.MM.dd}"
    flush_size => 50000
    idle_flush_time => 30
    template_overwrite => true
  }
  # stdout { codec => rubydebug }
}
Collecting nginx logs

Re-encode escaped Chinese characters: \x is rewritten as Xx and \\x as XXx. Tag each event, and write nginx access and error logs to separate indices.

input {
  kafka {
    zk_connect => "192.168.5.30:2181,192.168.5.38:2181,192.168.5.48:2181,192.168.5.18:2181,192.168.5.28:2181"
    group_id => "logstash-docker-nginx"
    topic_id => "test_for_docker"
    codec => json
    reset_beginning => false   # boolean (optional), default: false
    consumer_threads => 4      # number (optional), default: 1
    decorate_events => false   # boolean (optional), default: false
  }
}


filter {
  # rewrite the \x escape sequences nginx produces for non-ASCII (Chinese) characters
  # so the line can still be parsed as JSON below
  ruby {
    code => "
      event['log'] = event['log'].gsub('\x','Xx')
      event['log'] = event['log'].gsub('\\x','XXx')
    "
  }

  # access log lines are JSON and contain http_cookie; everything else is treated as an error log
  if "http_cookie" in [log] {
    mutate { add_tag => "nginx-access" }
    json {
      source => "log"
    }
    mutate {
      convert => [
        "status", "integer",
        "body_bytes_sent", "integer",
        "upstream_response_time", "float",
        "request_time", "float"
      ]
      remove_field => "log"
    }
    geoip {
      source => "ip"
    }
    date {
      match => ["time_local", "ISO8601"]
      locale => "en"
    }
  } else {
    mutate { add_tag => "nginx-error" }
  }
}

output {
  if "nginx-access" in [tags] {
    elasticsearch {
      workers => 4
      hosts => ["192.168.5.27:9250","192.168.5.28:9250","192.168.5.29:9250"]
      index => "logstash-nginxaccess-%{+YYYY.MM.dd}"
      flush_size => 10240
      idle_flush_time => 30
      template_overwrite => true
    }
  } else if "nginx-error" in [tags] {
    elasticsearch {
      workers => 4
      hosts => ["192.168.5.27:9250","192.168.5.28:9250","192.168.5.29:9250"]
      index => "logstash-nginxerror-%{+YYYY.MM.dd}"
      flush_size => 100
      idle_flush_time => 5
      template_overwrite => true
    }
  }

  # stdout { codec => rubydebug }
}
Converting timestamps
input {
  kafka {
    zk_connect => "192.168.5.18:2181,192.168.5.28:2181,192.168.5.30:2181,192.168.5.38:2181,192.168.5.48:2181"
    group_id => "es-hdfs"
    topic_id => "logdata-es"
    codec => json
    reset_beginning => false   # boolean (optional), default: false
    consumer_threads => 1      # number (optional), default: 1
    decorate_events => false   # boolean (optional), default: false
  }
}

filter {
  date {
    match => [ "time" , "yyyy-MM-dd HH:mm:ss" ]
    locale => "zh"
    timezone => "-00:00:00"
    target => "@timestamp"
  }
}

output {
  elasticsearch {
    hosts => ["192.168.5.27:9250","192.168.5.28:9250","192.168.5.29:9250"]
    index => "logstash-%{app}-%{+YYYY.MM.dd}"
    #document_type => "%{type}"
    flush_size => 3840
    idle_flush_time => 10
    template_overwrite => true
  }
  # stdout { codec => rubydebug }
}
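If the time field holds local time rather than UTC, a named time zone is usually clearer than a fixed offset. A minimal sketch, assuming the source timestamps are Beijing time (that assumption is mine, not stated above):

filter {
  date {
    match => [ "time", "yyyy-MM-dd HH:mm:ss" ]
    timezone => "Asia/Shanghai"   # interpret "time" as UTC+8; @timestamp is stored in UTC
    target => "@timestamp"
  }
}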

A complete example of collecting nginx JSON logs

First, configure the nginx log format (size, status, and responsetime are left unquoted so they are emitted as JSON numbers):

log_format main '{"@timestamp":"$time_iso8601",'
                '"@source":"$server_addr",'
                '"hostname":"$hostname",'
                '"remote_user":"$remote_user",'
                '"ip":"$http_x_forwarded_for",'
                '"client":"$remote_addr",'
                '"request_method":"$request_method",'
                '"scheme":"$scheme",'
                '"domain":"$server_name",'
                '"referer":"$http_referer",'
                '"request":"$request_uri",'
                '"requesturl":"$request",'
                '"args":"$args",'
                '"size":$body_bytes_sent,'
                '"status": $status,'
                '"responsetime":$request_time,'
                '"upstreamtime":"$upstream_response_time",'
                '"upstreamaddr":"$upstream_addr",'
                '"http_user_agent":"$http_user_agent",'
                '"http_cookie":"$http_cookie",'
                '"https":"$https",'
                '"request_body":"$request_body",'
                '"http_x_clientid":"$http_x_clientid"'
                '}';

Logstash then collects the nginx logs and fixes the \x escaping. (Logstash 2.4 or later is required for this regex handling.)

input {
  file {
    path => ["/home/nginx/logs/accesslog/**/*.log"]
    codec => json
    sincedb_path => "/home/logstash/sincedb"
    start_position => "beginning"
    discover_interval => 30
    close_older => 3600
    ignore_older => 86400
    sincedb_write_interval => 10
    stat_interval => 1
  }
}

filter {
  # nginx writes non-ASCII (e.g. Chinese) bytes as \xHH escapes, which breaks JSON parsing;
  # map the quote/control bytes to JSON-safe escapes and restore every other byte from its
  # hex code, so the json filter below can parse the line
  ruby {
    code => "if event.get('message').include?('\x') then
      event.set('message', event.get('message').gsub(/\\x([0-9A-F]{2})/) {
        case $1
        when '22'
          '\\"'
        when '0D'
          '\\r'
        when '0A'
          '\\n'
        when '27'
          '\\\''
        when '5C'
          '\\\\'
        else
          $1.hex.chr
        end
      })
    end"
  }
  json {
    source => "message"
  }

  mutate {
    remove_field => ["message"]
  }
}

output {
  kafka {
    bootstrap_servers => "192.168.0.2:9092,192.168.0.3:9092,192.168.0.4:9092"
    topic_id => "mtopic_name"
    compression_type => "gzip"
  }
  #stdout { codec => rubydebug }
}
