les patterns :

Nous allons voir comment utiliser les patterns, qui permettent de rendre plus lisible le fichier de configuration de logstash, évite les erreurs.
Nous prendrons un fichier de pattern fournit par "mesimeris" sur GitHub.
Je place le fichier "patterns" dans /etc/logstash/patterns, voici à quoi il ressemble :

# NOTE: These patterns take into account the additional log-line information passed to the logstash listener from rsyslog. YMMV.

DHCPD ((%{SYSLOGTIMESTAMP:timestamp})\s*(%{HOSTNAME:hostname})\s*dhcpd\S+\s*(%{WORD:dhcp_action})?.*[for|on] (%{IPV4:dhcp_client_ip})?.*[from|to] (%{COMMONMAC:dhcp_client_mac})?.*via (%{USERNAME:interface}))
#IPTABLES ((%{SYSLOGTIMESTAMP:nf_timestamp})\s*(%{HOSTNAME:nf_host})\s*kernel\S+\s*(%{WORD:nf_action})?.*IN=(%{USERNAME:nf_in_interface})?.*OUT=(%{USERNAME:nf_out_interface})?.*MAC=(%{COMMONMAC:nf_dst_mac}):(%{COMMONMAC:nf_src_mac})?.*SRC=(%{IPV4:nf_src_ip}).*DST=(%{IPV4:nf_dst_ip}).*PROTO=(%{WORD:nf_protocol}).?*SPT=(%{INT:nf_src_port}?.*DPT=%{INT:nf_dst_port}?.*))
#Gestion de IPv4 et v6
IPTABLES ((%{SYSLOGTIMESTAMP:nf_timestamp})\s*(%{HOSTNAME:nf_host})\s*kernel\S+\s*(%{WORD:nf_action})?.*IN=(%{USERNAME:nf_in_interface})?.*OUT=(%{USERNAME:nf_out_interface})?.*MAC=(%{COMMONMAC:nf_dst_mac}):(%{COMMONMAC:nf_src_mac})?.*SRC=(%{IP:nf_src_ip}).*DST=(%{IP:nf_dst_ip}).*PROTO=(%{WORD:nf_protocol}).?*SPT=(%{INT:nf_src_port}?.*DPT=%{INT:nf_dst_port}?.*))
DNS ((%{MONTHDAY:day})-(%{MONTH:month})-(%{YEAR:year}) (%{TIME:timestamp}) client (%{IPV4:dns_client_ip})#(%{NONNEGINT:dns_uuid})?.*query: (%{HOSTNAME:dns_dest}) (%{WORD:dns_type}) (%{WORD:dns_record})?.*(%{IPV4:dns_server}))
PGSQL ((%{SYSLOGTIMESTAMP:pgsql_timestamp}) (%{HOSTNAME:pgsql_hostname})?.*SAST >(%{WORD:pgsql_severity}):  (%{GREEDYDATA:pgsql_message}))
SQUID ((%{DATA:squid_timestamp_unix}) (%{INT:squid_time_elapsed_ms}) (%{IPV4:squid_client_ip}) (%{DATA:squid_action})/(%{INT:squid_reply_code}) (%{INT:squid_request_size}) (%{WORD:squid_method}) (%{URIPROTO:squid_request_protocol})://(%{DATA:squid_request_hostname}) - (%{WORD:squid_heirarchy})/(%{IPV4:squid_dest_ip}) (%{WORD:squid_content_type})/(%{WORD:squid_content_object}))
RADIUSINFO ((%{SYSLOGTIMESTAMP:radius_timestamp}) (%{YEAR}) : (%{WORD:radius_severity}): (%{GREEDYDATA:radius_messsage}))
RADIUSERROR ((%{SYSLOGTIMESTAMP:radius_timestamp}) (%{YEAR}) : (%{WORD:radius_severity}): (%{DATA:radius_module}): (%{GREEDYDATA:radius_messsage}))
RADIUSAUTHOKMAC ((%{SYSLOGTIMESTAMP:radius_timestamp}) (%{YEAR}) : (%{WORD:radius_severity}): (%{GREEDYDATA:radius_login_status}) \[(%{USERNAME:radius_user})/(%{USERNAME:radius_password})\] \(from client (%{GREEDYDATA:radius_from_client}) port (%{INT:radius_nas_port}) (%{WORD}) (%{GREEDYDATA:radius_mac})\))
RADIUSAUTHOK ((%{SYSLOGTIMESTAMP:radius_timestamp}) (%{YEAR}) : (%{WORD:radius_severity}): (%{GREEDYDATA:radius_login_status}) \[(%{USERNAME:radius_user})/(%{USERNAME:radius_password})\] \(from client (%{GREEDYDATA:radius_from_client}) port (%{INT:radius_nas_port}))
RADIUSAUTHFAIL ((%{SYSLOGTIMESTAMP:radius_timestamp}) (%{YEAR}) : (%{WORD:radius_severity}): (%{GREEDYDATA:radius_login_status}): \[(%{USERNAME:radius_user})/(%{USERNAME:radius_password})\] \(from client (%{GREEDYDATA:radius_from_client}) port (%{INT:radius_nas_port}))

Il suffit d'ajouter la directive "patterns_dir" dans le filtre "grok" :

grok {
    patterns_dir => ["/etc/logstash/patterns"]
    match => { "message" => "%{IPTABLES}"}
    add_tag => [ "iptables", "iptables-drop"]
}

Donc on charge les patterns, on regarde dans le message si la regex IPTABLES match.

La Géo-localisation :

Cela peut être intéressant pour les logs Apache, Iptables, fail2ban, ... enfin tout ce qui une adresse IP.
Pour cela il faut une base de Géo-localisation, Max Mind, pour les tests j'ai utilisé GeoLiteCity.dat.
Copier "GeoLiteCity.dat" dans /etc/logstash/.

Dans votre fichier de configuration logstash, il suffit d'ajouter les lignes suivantes dans la section "filter" :

geoip {
      source => "clientip"
      database => "/etc/logstash/GeoLiteCity.dat"
   }

Dans cette exemple, on indique que le champs contenant l'adresse IP à Géo-localiser est "clientip" (le parser de mes logs apache) et on fournit le chemin vers la database.

Status d'Elasticsearch :

Status du cluster :

{
  "cluster_name" : "elasticsearch",
  "status" : "yellow",
  "timed_out" : false,
  "number_of_nodes" : 2,
  "number_of_data_nodes" : 1,
  "active_primary_shards" : 26,
  "active_shards" : 26,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 26
}

Status d'un indice :

{"cluster_name":"elastic search",
"status":"yellow",
"timed_out":false,
"number_of_nodes":2,
"number_of_data_nodes":1,
"active_primary_shards":5,
"active_shards":5,
"relocating_shards":0,
"initializing_shards":0,
"unassigned_shards":5
}

On s'aperçoit que le status est Yellow (les trois status sont : green, yellow et red), après recherche c'est le fait qu'il y ait des "unassigned_shards" qui découle de ma configuration de base à savoir une réplication de configuré mais avec un seul host dans le cluster. Deux solutions soit on ajoute un host dans le cluster soit on désactive la réplication.
Dans un premier temps nous allons désactiver la réplication.

Pour cela il y a deux choses à faire :

curl -XPUT 'localhost:9200/_settings' -d '  {  "index" : { "number_of_replicas" : 0 } }'
{"acknowledged":true}
--SNiP--
vi /etc/elasticsearch/elasticsearch.yml
# Note, that for development on a local machine, with small indices, it usually
# makes sense to "disable" the distributed features:
#
#index.number_of_shards: 1
index.number_of_replicas: 0
#Decommenter la ligne ci-dessus

--SNiP--
service elasticsearch restart

Vérification (il y a quelques secondes/minutes avant de passer Green suivant le nombre d'indices) :

{
  "cluster_name" : "elasticsearch",
  "status" : "green",
  "timed_out" : false,
  "number_of_nodes" : 2,
  "number_of_data_nodes" : 1,
  "active_primary_shards" : 26,
  "active_shards" : 26,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 0
}

Si vous voulez voir l'état de votre/vos cluster(s) en WebUI il y a Elasticsearch-HQ.