logstash out file to HDFS

logstash out file to HDFS

logstash 直接把文件内容写入 hdfs 中, 并支持 hdfs 压缩格式。
logstash 需要安装第三方插件,webhdfs插件,通过hdfs的web接口写入。
http://namenode00:50070/webhdfs/v1/ 接口

安装

可以在官网找到相应的版本, 我们用的是2.3.1,下载地址:

https://www.elastic.co/downloads/past-releases  

webhdfs插件地址

github地址:
  git clone  https://github.com/heqin5136/logstash-output-webhdfs-discontinued.git

官网地址及使用说明:
  https://www.elastic.co/guide/en/logstash/current/plugins-outputs-webhdfs.html

插件安装方式:

logstash 安装在 /home/mtime/logstash-2.3.1
git clone  https://github.com/heqin5136/logstash-output-webhdfs-discontinued.git
cd logstash-output-webhdfs-discontinued
/home/mtime/logstash-2.3.1/bin/plugin install logstash-output-webhdfs-discontinued

检查hdfs的webhds接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
   curl -i  "http://namenode:50070/webhdfs/v1/?user.name=hadoop&op=LISTSTATUS"   

HTTP/1.1 200 OK
Cache-Control: no-cache
Expires: Thu, 13 Jul 2017 04:53:39 GMT
Date: Thu, 13 Jul 2017 04:53:39 GMT
Pragma: no-cache
Expires: Thu, 13 Jul 2017 04:53:39 GMT
Date: Thu, 13 Jul 2017 04:53:39 GMT
Pragma: no-cache
Content-Type: application/json
Set-Cookie: hadoop.auth="u=hadoop&p=hadoop&t=simple&e=1499957619679&s=KSxdSAtjXAllhn73vh1MAurG9Bk="; Path=/; Expires=Thu, 13-Jul-2017 14:53:39 GMT; HttpOnly
Transfer-Encoding: chunked
Server: Jetty(6.1.26)

注释: active namenode 返回是200 ,standby namenode 返回是403.

配置

添加 logstash 一个配置文件

vim /home/mtime/logstash-2.3.1/conf/hdfs.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
input {
kafka {
zk_connect => "192.168.51.191:2181,192.168.51.192:2181,192.168.51.193:2181" ## kafka zk 地址
group_id => 'hdfs' # 消费者组
topic_id => 'tracks' # topic 名字
consumer_threads => 1
codec => 'json'
}
}

filter { ## 为解决 插入hdfs时间相差8小时,
date {
match => [ "time" , "yyyy-MM-dd HH:mm:ss" ]
locale => "zh"
timezone => "-00:00:00"
target => "@timestamp"
}
}

output {
webhdfs {
workers => 1
host => "namenode"
standby_host => "standbynamenode"
port => 14000
user => "loguser"
path => "/Service-Data/%{+YYYY}-%{+MM}-%{+dd}/%{app}/logstash-%{+HH}.log"
flush_size => 10000
idle_flush_time => 10
compression => "gzip"
retry_interval => 3
codec => 'json' # 解决 写入hdfs文件是json格式,否则内容为 %{message}
}
# stdout { codec => rubydebug } # 打开日志
}

关于hdfs部分配置,可以在 plugins-outputs-webhdfs 官网找到。

启动 logstart

cd /home/mtime/logstash-2.3.1/bin/
./logstash -f ../conf/hdfs.conf    # 为前台启动 

问题总结

  • 新版logstash已经支持webhdfs插件,可以直接安装啦。
1
./logstash-plugin install logstash-output-webhdfs
  • 报错处理
1
[WARN ][logstash.outputs.webhdfs ] Failed to flush outgoing items {:outgoing_count=>160, :exception=>"WebHDFS::IOError",

我将hdfs端口 由原来的50070 改为 14000 端口,就在不报错了。
官方提供的例子中用的就是50070端口,一直没有尝试14000端口。

还有:

1
because this file lease is currently owned by DFSClient

hadoop 租约问题,后期正常就没有了。
执行recoverLease来释放文件的锁

1
$ hdfs debug recoverLease -path /logstash/2017/02/10/go-03.log

还有:

1
:message=>"webhdfs write caused an exception: {\"RemoteException\":{\"message\":\"Failed to APPEND_FILE

当一个进程在读写这个文件的时候,另一个进程应该是不能同时写入的。
我们由原来3个logstash同时消费,改为了1个logstash消费,不在报错了。
这个应该也可以通过有话写入hdfs参数来解决。

还有:

1
Max write retries reached. Exception: initialize: name or service not known {:level=>:error}

losgstash 需要能解析所有 hadoop 集群所有节点的主机名。

感觉文章还可以的话,帮忙点点下面的广告哦! 谢谢支持!

感谢您的支持!