制作rsyslog服务将日志输入到kafka

有一些服务需要将日志打到 UDP 端口。其实也可以打到本机的 rsyslog 服务,但是我的服务已经上容器了,既不想每个容器都封装 rsyslog 服务,也不想挂载,就想着把 rsyslog 封装成一个独立的服务发布,谁都可以调用。

那么问题来了, rsyslog将日志怎么处理,怎么区分?

  • 我这是比较固定的日志格式, 不用做过多处理,
  • rsyslog 将收到的日志直接转发到 kafka 上,本地不留数据。
  • rsyslog启多个端口,通过端口来区别放到哪个kafka topic上。
  • 也可以通过local级别来区分,我这里没做。

首先是一个简单的 Dockerfile

1
2
3
4
5
6
7
8
9
10
11
12
# Image that runs rsyslogd in the foreground and forwards received syslog
# traffic to Kafka through the omkafka output module.
FROM centos:7
# MAINTAINER is deprecated; a LABEL carries the same metadata.
LABEL maintainer="Shining-YS"
# Install rsyslog, the Kafka output module and its runtime libraries, then
# drop the yum cache in the same layer to keep the image small.
RUN yum install -y rsyslog rsyslog-kafka lz4 libfastjson libestr \
    && yum clean all

COPY rsyslog.conf /etc/rsyslog.conf
COPY tokafka.conf /etc/rsyslog.d/tokafka.conf
COPY rsyslog /etc/sysconfig/rsyslog

# Declare every listener configured in tokafka.conf, for both protocols
# (a bare "EXPOSE 514" only declares 514/tcp).
EXPOSE 514/tcp 514/udp 7510/tcp 7510/udp 7511/tcp 7511/udp 7512/tcp 7512/udp

# Debug variant: -d turns on debug output, -n keeps rsyslogd in the foreground.
#CMD ["/usr/sbin/rsyslogd","-dn"]
# -n: do not fork, so rsyslogd stays the container's foreground process.
CMD ["/usr/sbin/rsyslogd","-n"]

这其中包括几个文件,我就直接列出来了

rsyslog.conf 主要是开启514端口,TCP和UDP

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# rsyslog configuration file
#
# Main configuration for the rsyslog-to-kafka container. Network listeners are
# NOT defined here: they live in /etc/rsyslog.d/tokafka.conf, which is pulled
# in by the $IncludeConfig directive below.

# For more information see /usr/share/doc/rsyslog-*/rsyslog_conf.html
# If you experience problems, see http://www.rsyslog.com/doc/troubleshoot.html

#### MODULES ####

# The imjournal module below is now used as a message source instead of imuxsock.
# NOTE(review): a container normally has no systemd journal; confirm that
# imjournal is actually needed in this image.
$ModLoad imuxsock # provides support for local system logging (e.g. via logger command)
$ModLoad imjournal # provides access to the systemd journal
#$ModLoad imklog # reads kernel messages (the same are read from journald)
#$ModLoad immark # provides --MARK-- message capability

# Provides UDP syslog reception
# Disabled on purpose: tokafka.conf loads imudp itself and binds port 514 to
# the "tokafka" ruleset. A second legacy listener on the same port would
# conflict with it and would feed the file rules below instead of Kafka.
#$ModLoad imudp
#$UDPServerRun 514

# Provides TCP syslog reception
# Disabled on purpose for the same reason as the UDP listener above.
#$ModLoad imtcp
#$InputTCPServerRun 514


#### GLOBAL DIRECTIVES ####

# Where to place auxiliary files
$WorkDirectory /var/lib/rsyslog

# Use default timestamp format
$ActionFileDefaultTemplate RSYSLOG_TraditionalFileFormat

# File syncing capability is disabled by default. This feature is usually not required,
# not useful and an extreme performance hit
#$ActionFileEnableSync on

# Include all config files in /etc/rsyslog.d/
$IncludeConfig /etc/rsyslog.d/*.conf

# Turn off message reception via local log socket;
# local messages are retrieved through imjournal now.
$OmitLocalLogging on

# File to store the position in the journal
$IMJournalStateFile imjournal.state


#### RULES ####

# NOTE(review): these rules belong to the default ruleset. Inputs that name
# their own ruleset (as the listeners in tokafka.conf do) should not be
# processed by them -- confirm against the rsyslog ruleset documentation.

# Log all kernel messages to the console.
# Logging much else clutters up the screen.
#kern.* /dev/console

# Log anything (except mail) of level info or higher.
# Don't log private authentication messages!
*.info;mail.none;authpriv.none;cron.none /var/log/messages

# The authpriv file has restricted access.
authpriv.* /var/log/secure

# Log all the mail messages in one place.
mail.* -/var/log/maillog


# Log cron stuff
cron.* /var/log/cron

# Everybody gets emergency messages
*.emerg :omusrmsg:*

# Save news errors of level crit and higher in a special file.
uucp,news.crit /var/log/spooler

# Save boot messages also to boot.log
local7.* /var/log/boot.log


# ### begin forwarding rule ###
# The statement between the begin ... end define a SINGLE forwarding
# rule. They belong together, do NOT split them. If you create multiple
# forwarding rules, duplicate the whole block!
# Remote Logging (we use TCP for reliable delivery)
#
# An on-disk queue is created for this action. If the remote host is
# down, messages are spooled to disk and sent when it is up again.
#$ActionQueueFileName fwdRule1 # unique name prefix for spool files
#$ActionQueueMaxDiskSpace 1g # 1gb space limit (use as much as possible)
#$ActionQueueSaveOnShutdown on # save messages to disk on shutdown
#$ActionQueueType LinkedList # run asynchronously
#$ActionResumeRetryCount -1 # infinite retries if host is down
# remote host is: name/ip:port, e.g. 192.168.0.1:514, port optional
#*.* @@remote-host:514
# ### end of the forwarding rule ###

rsyslog 文件

1
2
3
4
5
# Options for rsyslogd
# Syslogd options are deprecated since rsyslog v3.
# If you want to use them, switch to compatibility mode 2 by "-c 2"
# See rsyslogd(8) for more details
# NOTE(review): the Dockerfile's CMD starts /usr/sbin/rsyslogd with only "-n",
# so this variable is presumably not consumed inside the container -- confirm.
SYSLOGD_OPTIONS="-r -m 0"

tokafka.conf 通过端口将日志写入到 kafka 的不同 topic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# tokafka.conf -- route syslog traffic to Kafka, one topic per listening port.
#
# Each input is bound to a dedicated ruleset, and each ruleset publishes to
# the Kafka topic that carries the same suffix as the ruleset name.
#
# NOTE(review): the third broker is spelled "hkafka3" while the others are
# "kafka1"/"kafka2"; left unchanged here, but confirm it is not a typo.

# Kafka output module plus the UDP/TCP syslog input modules.
module(load="omkafka")
module(load="imudp")
module(load="imtcp")

# Standard syslog port 514 (UDP and TCP) -> topic shining_test1.
input(type="imudp" port="514" ruleset="tokafka")
input(type="imtcp" port="514" ruleset="tokafka")

ruleset(name="tokafka") {
    # Forward to Kafka
    action(type="omkafka" topic="shining_test1" broker="kafka1:9092,kafka2:9092,hkafka3:9092" partitions.number="5")
    # Write to a local file instead (useful while debugging)
    #action(type="omfile" file="/tmp/shining_test1.log")
}

ruleset(name="shining_test2") {
    action(type="omkafka" topic="shining_test2" broker="kafka1:9092,kafka2:9092,hkafka3:9092" partitions.number="5")
}

# The topic now mirrors the ruleset name. It previously reused
# "shining_test2", which merged the traffic of ports 7511 and 7512 into one
# topic and defeated the per-port separation.
ruleset(name="shining_test3") {
    action(type="omkafka" topic="shining_test3" broker="kafka1:9092,kafka2:9092,hkafka3:9092" partitions.number="5")
}

# The topic now mirrors the ruleset name (it was "shining_test3"). Adjust if
# the cluster uses a different topic for port 7510.
ruleset(name="shining_test4") {
    action(type="omkafka" topic="shining_test4" broker="kafka1:9092,kafka2:9092,hkafka3:9092" partitions.number="5")
}


# Extra ports (UDP and TCP), each bound to its own ruleset/topic.
input(type="imudp" port="7510" ruleset="shining_test4")
input(type="imtcp" port="7510" ruleset="shining_test4")
input(type="imudp" port="7511" ruleset="shining_test2")
input(type="imtcp" port="7511" ruleset="shining_test2")
input(type="imudp" port="7512" ruleset="shining_test3")
input(type="imtcp" port="7512" ruleset="shining_test3")

其中 omkafka 模块参数说明,可以参考:
https://rsyslog.readthedocs.io/en/latest/configuration/modules/omkafka.html

For Example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# Load the omkafka (Kafka output) and imfile (text-file input) modules
module(load="omkafka")
module(load="imfile")

# Output line format for nginx access logs: hostname, tag and message,
# joined by the literal separator "<-+>"
template(
    name="nginxAccessTemplate"
    type="string"
    string="%hostname%<-+>%syslogtag%<-+>%msg%\n"
)

# Ruleset that ships every message it receives to Kafka
ruleset(name="nginx-kafka") {
    action(
        type="omkafka"

        # Destination and message format
        broker="localhost:9092"
        topic="test_nginx"
        partitions.number="4"
        template="nginxAccessTemplate"

        # Settings handed through to the Kafka client library
        confParam=["compression.codec=snappy", "queue.buffering.max.messages=400000"]

        # Action queue: type, worker and batching parameters
        queue.type="LinkedList"
        queue.workerThreads="4"
        queue.dequeuebatchsize="4096"
        queue.timeoutenqueue="0"

        # Action queue: size limits and thresholds
        queue.size="360000"
        queue.highwatermark="216000"
        queue.discardmark="350000"

        # Action queue: on-disk spool files
        queue.spoolDirectory="/tmp"
        queue.filename="test_nginx_kafka"
        queue.maxfilesize="10M"
        queue.maxdiskspace="2G"
        queue.saveonshutdown="on"
    )
}

# Message source: read the nginx access log and hand it to the ruleset above
input(
    type="imfile"
    File="/usr/local/nginx/logs/access.log"
    Tag="nginx,aws"
    Ruleset="nginx-kafka"
)

docker 镜像编译

1
# Build the image from the current directory and tag it rsyslog-to-kafka:latest
docker build -t rsyslog-to-kafka:latest .

启动容器,还需要映射UDP端口

1
# Start the container and publish every listener on both TCP and UDP.
# (-p publishes TCP by default; UDP needs an explicit "/udp" suffix.)
# The stray spaces in the container name and in the port mappings were
# removed: "rsyslog-to -kafka" and "7510: 7510" are not valid arguments.
docker run --name rsyslog-to-kafka \
  -p 514:514 -p 514:514/udp \
  -p 7510:7510 -p 7510:7510/udp \
  -p 7511:7511 -p 7511:7511/udp \
  -p 7512:7512 -p 7512:7512/udp \
  rsyslog-to-kafka:latest
感谢您的支持!