Von FHEM über Fluent in Elasticsearch

Von FHEM über Fluent in Elasticsearch

Im 3. Teil wird es nun für die FHEM Leute endlich spannend.

Dazu gibt es ein Voraussetzung: FHEM schreibt seine Readings in eine MySQL Datenbank (Modul DbLog). Und natürlich ELK und Fluent aus den Teilen ☛ Elasticsearch in Docker und ☛ Fluent an Elasticsearch.

Die Datenbank etwas verändern

Damit die Daten permanent aus MySQL in Elasticsearch landen, muss die Datenbank von FHEM etwas verändert werden. Dazu kann man entweder PHPmyadmin nutzen oder den Commandprompt.

Letztlich geht es darum, der Tabelle "history" eine neue Spalte zu spendieren, die eine eindeutige und fortlaufende Nummer enthält, damit die Datenübertragung danach filtern kann (d.h. nur neuere Einträge). Mit dem Datum geht des leider nicht so zuverlässig, da immer nur Blöcke einer festen Anzahl an Zeilen gelesen werden und die nächste Abfrage mit dem höchsten Wert der letzten Abfrage fortsetzt. Mit dem Zeitstempel geht das jedoch nicht zuverlässig, da auf "größer als" gefiltert wird.

Ich habe dazu einen Spalte "uid" vom Typ "serial" hinzu gefügt. Bitte nichts weiter als "not null" angeben, da der Typ bereits einige Constraints selbst definiert.

Das Lesen übernimmt ein Fluent Plugin Fluent Plugin SQL🖹. Das haben im vorherigen Teil bereits in unser Fluent-Docker-Image eingebaut.

1alter table history add columne uid serial not null; 

Der Befehl dauert einige Zeit, da eben alle Zeilen um einen neuen Wert ereitert werden. War aber bei mir am Ende doch nach einigen Minuten durch, ich hatte mit Stunden gerechnet.

Jetzt weitern wir die fluentd.conf um einige Zeilen (zusätzlich zu denen aus ☛ Fluent an Elasticsearch!), diese oberhalb einfügen:

 1<source>
 2  @type sql
 3
 4  host 192.168.1.151
 5  port 3306
 6  database fhem
 7  adapter mysql2
 8  username fhemusername
 9  password fhempassword
10
11  tag_prefix fhem
12
13  select_interval 44s
14  select_limit 10000
15
16  state_file /log/sql_state
17
18  <table>
19    table history
20    tag history
21    update_column uid
22    time_column TIMESTAMP
23  </table>
24</source>
25
26<match fhem.**>
27	  @type elasticsearch
28	  host 192.168.1.2
29	  port 9200
30	  logstash_format false
31	  logstash_prefix fhem
32	  logstash_dateformat %Y%m
33	  include_timestamp true 
34	  include_tag_key true
35	  tag_key @tag
36	  index_name fhem
37	  type_name fhem
38	  flush_interval 5s
39	  time_key TIMESTAMP
40	  id_key uid
41	  pipeline FHEM
42</match>

Im Detail:

Als 1. wird eine weitere (!) Source definiert, d.h. die bestehende mit Port 24224 bleibt unverändert.

Diese ist vom Type "sql".

host 192.168.1.151
port 3306
database fhem
adapter mysql2
username fhemusername
password fhempassword
# klassische Dinge: Host und Port vom MySQL Server, Datenbankname dort, User und Passwort
# und der "adapter" damit der Plugin auch mit mysql arbeitet: mysql2 heißt der

tag_prefix fhem
# TAG für die Daten innerhalb vom fluentd

select_interval 44s
select_limit 10000
# wie oft wir Daten aus der Datenbank abfragen, hier alle 44 Sekunden 
# (extra ein krummer Wert um mit einem DbLog FHEM Modul mit z.B. 60s Takt nicht in Resonanz zu kommen)
# 10000 Zeilen pro Abfrage, damit beim ersten Datenabgleich jemals alles fertig wird

state_file /log/sql_state
# dort merkt sich das Plugin die letzte höchste ID (also unsere UID)
# falls ihr mal alles neu machen wollt, diese Datei löschen

<table>
  table history
  tag history
  update_column uid
  time_column TIMESTAMP
</table>
# unsere FHEM Datentabelle "history"
# der fluentd interne Tag, heißt dann fhem.history (vgl. oben tag_prefix)
# die Spalte "uid" auf die wir filtern
# und der Name der Spalte mit dem Zeitstempel

Diesesmal haben wir einen Match auf einen Tag fhem.*. Das ist auch der Grund, weshalb dieser neue Teil oberhalb des bestehenden eingefügt werden muss. Fluent wählt den Match aus, der in der Datei zu erst trifft und überspringt die anderen. D.h. der *.** Match würde ansonsten unsere Daten abgreifen. Type ist "elasticsearch", klar.

host 192.168.1.2
port 9200
# wir gehabt, Host und Port vom ELK

logstash_format false
# diesesmal KEIN Logstash

include_timestamp true 
include_tag_key true
# Zeit und Tag wollen wir

tag_key @tag
# hier ein anderer Key für den Tag "@tag" - könnte auch "tag" oder eben wieder "@log_name" sein

index_name fhem
type_name fhem
# wie der Index in ELK heißen soll

time_key TIMESTAMP
# Name der Keys mit der Zeit, eben der TIMESTAMP Spalte aus der Tabelle
id_key uid
# und noch der Name der UID Spalte, so werden doppelte Einträge in ELK vermieden

pipeline FHEM
# dazu kommen wir später, wem das zu komplex ist, kann die Zeile auch weg lassen

flush_interval 5s
# wie lange fluentd die Daten erstmal im Speicher hält bevor sie an EL gehen

Bevor wir diese Konfig aktivieren, müssen wir über Pipelines in ELK reden. Das ist nämlich diese Option "pipeline FHEM".

Die Daten aus FHEM sind Strings. Auch die Zahlen. Und das ist ELK nicht egal. Mit Strings macht ELK keine Zahlenakrobatik. Aber genau die wollen wir.

Dazu gibt es u.a. die Idee der Pipelines in ELK. Dort kann man Daten während des Einlesens bearbeiten. Sind sie jedoch einmal gelesen und gespeichert, wird das schwieriger - und langsamer. Geht aber auch, erkläre ich allerdings nicht hier.

Zu den Pipelines. Diese "FHEM" Pipeline erzeugen wir jetzt.

  1. Dazu Kibana (http://euer-host:5601) öffnen.
  2. Unten Mitte Rechts gibt es den Punkt "Stack Management", auswählen.
  3. Links im Menü gibt es dann "Ingest" und "Ingest Pipelines", letztere auswählen.
  4. Rechts "Create Pipeline".
  5. Als Name dann "FHEM" (groß bzw. passend zur pipeline Option).
  6. Dann auf "Add Processor".
  7. Als Typ "Convert" wählen.
  8. In Field dann "VALUE" (groß, weil in MySQL History das so heißt).
  9. Type auf "Double".
  10. Target field dann z.B. "VALUE_NUM".
  11. "Ignore missing" aktivieren.
  12. "Ignore failure" aktivieren ebenfalls (wichtig!).
  13. Unten auf "Add".
  14. Zuletzt auf "Create pipeline".

Das war es eigentlich schon. Es wurde eine Pipeline "FHEM" erzeugt, die den Inhalt der Spalte "VALUE" in eine neue Spalte "VALUE_NUM" als Double-Zahl abspeichert. Falls dies nicht geht, z.B. weil Strings, ist ihr das egal.

Jetzt kann Fluent mit dem neuen Konfig-File gestartet werden. Ggf. erst das Image bauen:

1DOCKER_BUILDKIT=1 docker build -t myfluentd:latest --rm -f Dockerfile .

Nach eingen Minuten sollten dann die ersten Daten im Kibana auftauchen und dieses File "sql_state" auch da sein.

Im "Stack Management" muss unter "Index Management" der Index "FHEM" aufgetaucht sein. Weiter unten unter "Kibana" -> "Index Patterns" jetzt noch ein Pattern (Create Pattern Index) mit dem Pattern "fhem*" und dem Timestamp Field "@timestamp" erzeugen.

Und schon kann es unter "Analytics" -> "Discovery" in die Werte von FHEM gehen. Naja, vermutlich sind erst nach einigen Stunden ausreichend Daten da und nach Tagen alle. Aber einfach schonmal spielen geht recht schnell.