在这篇博文中,您将学习如何使用 Logstash 将特定事件写入特定索引。 Logstash 是一个免费且开放的服务器端数据处理管道,它从多个来源摄取数据,对其进行转换,然后将其发送到您最喜欢的“stash”,在此示例设置中,它是一个 弹性搜索.
Logstash 可以配置为根据条件将特定事件写入特定索引。 跟随以了解如何操作。
Logstash:将特定事件写入特定索引
那么,如何配置 Logstash 将特定事件写入特定的 Elasticsearch 索引? 好吧,如果您要将事件转发到 Logstash,那么在转发到存储此类 Elasticsearch 以进行索引之前,您肯定必须有一些过滤器来解析或提取特定字段。
在我们之前的指南中,我们介绍了使用 Logstash 处理各种日志的各种教程;
如何使用 Logstash 解析 SSH 日志
在 ELK Stack 上处理和可视化 ModSecurity 日志
在 ELK Stack 上可视化 WordPress 用户活动日志
为了演示如何使用 Logstash 将特定事件写入特定索引,我们将考虑有关如何使用 Logstash 解析 SSH 日志的指南。
考虑下面解析 SSH 身份验证事件的 Logstash 配置文件;
- 无效用户登录失败
May 20 20:18:59 elk sshd[1831]: Failed password for invalid user admin from 192.168.59.1 port 41150 ssh2
- 接受密码
May 20 20:20:32 elk sshd[1863]: Accepted password for root from 192.168.59.1 port 41174 ssh2
- 有效用户的密码失败
May 20 20:22:05 elk sshd[1967]: Failed password for kifarunix from 192.168.59.1 port 41190 ssh2
input { beats { port => 5044 } } filter { # parses Successful login events grok { match => { "message" => "%{SYSLOGTIMESTAMP:timestamp}s+%{IPORHOST:dst_host}s+%{WORD:syslog_program}[d+]:s+(?<status>Accepteds+password)s+fors+%{USER:auth_user}s+froms+%{SYSLOGHOST:src_host}.*" } add_field => { "activity" => "SSH Logins" } add_tag => "successful_login" } # parses Failed login events for valid users grok { match => { "message" => "%{SYSLOGTIMESTAMP:timestamp}s+%{IPORHOST:dst_host}s+%{WORD:syslog_program}[d+]:s+(?<status>Faileds+password)s+fors+%{USER:auth_user}s+froms+%{SYSLOGHOST:src_host}.*" } add_field => { "activity" => "SSH Logins" } add_tag => "failed_login" } # parses Failed login events for invalid users grok { match => { "message" => "%{SYSLOGTIMESTAMP:timestamp}s+%{IPORHOST:dst_host}s+%{WORD:syslog_program}[d+]:s+(?<status>Faileds+password)s+fors+invalids+users+%{USER:auth_user}s+froms+%{SYSLOGHOST:src_host}.*" } add_field => { "activity" => "SSH Logins" } add_tag => "invalid_users" } } output { elasticsearch { hosts => ["localhost:9200"] manage_template => false index => "ssh_auth-%{+YYYY.MM}" } }
根据 Logstash 输出配置部分;
output { elasticsearch { hosts => ["localhost:9200"] manage_template => false index => "ssh_auth-%{+YYYY.MM}" } }
所有事件都发送到一个索引,称为 ssh_auth-%{+YYYY.MM}
.
Logstash:将特定事件写入特定索引
现在,我们想编写事件 有效用户登录失败, 无效用户的登录失败事件, 和 成功登录事件 到个别指数。
Logstash 支持可以帮助解决此问题的使用条件。
为了轻松演示如何使用 Logstash 将特定事件写入特定索引,我们将配置每个 grok 模式正则表达式,为事件添加一个标签,将事件彼此分开。
例如,对于成功的登录事件,一个标签, 成功登录, 将被添加到事件中,同样适用于 有效用户登录失败 和 无效用户的登录失败事件 事件增加了 登录失败 和 invalid_users 分别标记。
因此,我们将使用 if 语句修改我们的 Logstash 输出,如下所示;
output { if "successful_login" in { elasticsearch { hosts => ["localhost:9200"] index => "ssh-success-logins-%{+YYYY.MM}" } } else if "failed_login" in { elasticsearch { hosts => ["localhost:9200"] index => "ssh-failed-logins-%{+YYYY.MM}" } } else if "invalid_users" in { elasticsearch { hosts => ["localhost:9200"] index => "ssh-invalid-users-%{+YYYY.MM}" } } else { elasticsearch { hosts => ["localhost:9200"] index => "other-ssh-events-%{+YYYY.MM}" } } }
这将导致 Logstash 写入 成功登录事件 到 Elasticsearch 索引调用 ssh-success-logins-*, 登录失败事件 索引调用 ssh-failed-logins-*, 无效的用户登录事件 索引调用 ssh-无效用户-* 和 任何其他事件 到 其他 ssh 事件-* 指数。
检查 Logstash 配置语法
Save Logstash 配置文件并使用命令运行语法检查;
/usr/share/logstash/bin/logstash --path.settings /etc/logstash -t
Configuration OK [2021-05-20T21:50:10,111][INFO ][logstash.runner ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
如果你得到 配置OK 那么你就可以走了。
Logstash 现在已准备好接收来自 Filebeat 的事件。
假设您已经设置了 Filebeat 并执行了 ssh 身份验证事件,那么您导航到 Kibana 并验证您是否已创建 Elasticsearch 索引;
您现在可以继续创建 Kibana 索引, Kibana > 索引模式 > 创建索引模式.
来自发现;
好了,您已经将不同的事件写入了不同的 Elasticsearch 索引。
这标志着 Logstash 教程的结束:将特定事件写入特定索引。
其他相关教程
配置 Filebeat-Logstash SSL/TLS 连接的简单方法
如何调试 Logstash Grok 过滤器
将 Wazuh Manager 与 ELK Stack 集成