1. ELK part 6 - Tìm hiểu về logstash.
Tìm hiểu về logstash.
Nơi chứa các tài liệu tham khảo của dịch vụ Cloud365.
Ở bài trước mình đã giới thiệu về filebeat một agent được sử dụng phổ biến cho việc thu thập các bản tin nhật ký từ các máy trạm, hôm nay mình sẽ giới thiệu về Logstash, công cụ sẽ trực tiếp nhận các bản tin được đẩy về từ filebeat sau đó xử lý và đẩy vào Elasticsearch.
Logstash là một công cụ mã nguồn mở thu thập dữ liệu có khả năng liên hợp theo thời gian thực. Logstash có thể hợp nhất dữ liệu từ các nguồn khác nhau và chuẩn hóa dữ liệu ở phần xử lý tiếp theo. Loại bỏ và đồng hóa tất cả dữ liệu đó trong một số use case cần phân tích và thể hiện trên biểu đồ.
Logstash có 3 thành phần chính cũng chính là 3 bước xử lý chính của logstash đó là:
Đường ống xử lý sự kiện của Logstash có ba giai đoạn: input → filter → output. Các đầu vào tạo ra các sự kiện, bộ lọc sửa đổi chúng và các đầu ra sẽ chuyển chúng tới nơi khác. Đầu vào và đầu ra hỗ trợ codec cho phép bạn mã hóa hoặc giải mã dữ liệu khi nó vào hoặc thoát khỏi đường dẫn mà không phải sử dụng bộ lọc riêng biệt.
Ví dụ về file input:
input {
beats {
port => 5044
ssl => false
}
}
Ví dụ về file filter:
filter {
grok {
match => {
"message" => [
"%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?:? %{SSH_INVALID_USER:message}"
]
}
patterns_dir => "/etc/logstash/patterns/sshd"
named_captures_only => true
remove_tag => ["_grokparsefailure"]
break_on_match => true
add_tag => [ "SSH", "SSH_INVALID_USER" ]
add_field => { "event_type" => "SSH_INVALID_USER" }
overwrite => "message"
}
}
# Grok Filter for SSH Failed Password
filter{
grok {
match => {
"message" => [
"%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?:? %{SSH_FAILED_PASSWORD:message}"
]
}
patterns_dir => "/etc/logstash/patterns/sshd"
named_captures_only => true
remove_tag => ["_grokparsefailure"]
break_on_match => true
add_tag => [ "SSH", "SSH_FAILED_PASSWORD" ]
add_field => { "event_type" => "SSH_FAILED_PASSWORD" }
overwrite => "message"
}
}
filter {
# Grok Filter for SSH Password Accepted
grok {
match => {
"message" => [
"%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?:? %{SSH_ACCEPTED_PASSWORD}"
]
}
patterns_dir => "/etc/logstash/patterns/sshd"
named_captures_only => true
remove_tag => ["_grokparsefailure"]
break_on_match => true
add_tag => [ "SSH", "SSH_ACCEPTED_PASSWORD" ]
add_field => { "event_type" => "SSH_ACCEPTED_PASSWORD" }
}
}
Ví dụ về file output:
output {
elasticsearch {
hosts => ["localhost:9200"]
sniffing => true
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
}
}
Như vậy ở bài này mình đã tổng quan về logstash và cách hoạt động của nó, ở series này mình đã giới thiệu đa số các thành phần cần sử dụng để triển khai ELk stack, ở bài lần sau mình sẽ hướng dẫn mọi người cách setup biểu đồ trên Kibana để có cái nhìn trực quan về những sự kiện thu thập được từ client.
Thực hiện bởi cloud365.vn
Chuỗi bài giới thiệu về giải pháp ELK
Tìm hiểu về logstash.
Tìm hiểu về filebeat.
Cài đặt ELK sử dụng kafka làm cache
Tài liệu về ELK stack
Tài liệu về ELK stack
Tài liệu về ELK stack