Von FHEM über Fluent in Elasticsearch
- Elasticsearch🖹 und Kibana (ELK) in Docker: ☛ Elasticsearch in Docker
- Einbindung von Fluentd🖹 an Elasticsearch: ☛ Fluent an Elasticsearch
- Ablegen von FHEM🖹 Daten in Elasticsearch: ☛ Von FHEM über Fluent in Elasticsearch
Von FHEM über Fluent in Elasticsearch
Im 3. Teil wird es nun für die FHEM Leute endlich spannend.
Dazu gibt es ein Voraussetzung: FHEM schreibt seine Readings in eine MySQL Datenbank (Modul DbLog). Und natürlich ELK und Fluent aus den Teilen ☛ Elasticsearch in Docker und ☛ Fluent an Elasticsearch.
Die Datenbank etwas verändern
Damit die Daten permanent aus MySQL in Elasticsearch landen, muss die Datenbank von FHEM etwas verändert werden. Dazu kann man entweder PHPmyadmin nutzen oder den Commandprompt.
Letztlich geht es darum, der Tabelle "history" eine neue Spalte zu spendieren, die eine eindeutige und fortlaufende Nummer enthält, damit die Datenübertragung danach filtern kann (d.h. nur neuere Einträge). Mit dem Datum geht des leider nicht so zuverlässig, da immer nur Blöcke einer festen Anzahl an Zeilen gelesen werden und die nächste Abfrage mit dem höchsten Wert der letzten Abfrage fortsetzt. Mit dem Zeitstempel geht das jedoch nicht zuverlässig, da auf "größer als" gefiltert wird.
Ich habe dazu einen Spalte "uid" vom Typ "serial" hinzu gefügt. Bitte nichts weiter als "not null" angeben, da der Typ bereits einige Constraints selbst definiert.
Das Lesen übernimmt ein Fluent Plugin Fluent Plugin SQL🖹. Das haben im vorherigen Teil bereits in unser Fluent-Docker-Image eingebaut.
1alter table history add columne uid serial not null;
Der Befehl dauert einige Zeit, da eben alle Zeilen um einen neuen Wert ereitert werden. War aber bei mir am Ende doch nach einigen Minuten durch, ich hatte mit Stunden gerechnet.
Jetzt weitern wir die fluentd.conf um einige Zeilen (zusätzlich zu denen aus ☛ Fluent an Elasticsearch!), diese oberhalb einfügen:
1<source>
2 @type sql
3
4 host 192.168.1.151
5 port 3306
6 database fhem
7 adapter mysql2
8 username fhemusername
9 password fhempassword
10
11 tag_prefix fhem
12
13 select_interval 44s
14 select_limit 10000
15
16 state_file /log/sql_state
17
18 <table>
19 table history
20 tag history
21 update_column uid
22 time_column TIMESTAMP
23 </table>
24</source>
25
26<match fhem.**>
27 @type elasticsearch
28 host 192.168.1.2
29 port 9200
30 logstash_format false
31 logstash_prefix fhem
32 logstash_dateformat %Y%m
33 include_timestamp true
34 include_tag_key true
35 tag_key @tag
36 index_name fhem
37 type_name fhem
38 flush_interval 5s
39 time_key TIMESTAMP
40 id_key uid
41 pipeline FHEM
42</match>
Im Detail:
Als 1. wird eine weitere (!) Source definiert, d.h. die bestehende mit Port 24224 bleibt unverändert.
Diese ist vom Type "sql".
host 192.168.1.151
port 3306
database fhem
adapter mysql2
username fhemusername
password fhempassword
# klassische Dinge: Host und Port vom MySQL Server, Datenbankname dort, User und Passwort
# und der "adapter" damit der Plugin auch mit mysql arbeitet: mysql2 heißt der
tag_prefix fhem
# TAG für die Daten innerhalb vom fluentd
select_interval 44s
select_limit 10000
# wie oft wir Daten aus der Datenbank abfragen, hier alle 44 Sekunden
# (extra ein krummer Wert um mit einem DbLog FHEM Modul mit z.B. 60s Takt nicht in Resonanz zu kommen)
# 10000 Zeilen pro Abfrage, damit beim ersten Datenabgleich jemals alles fertig wird
state_file /log/sql_state
# dort merkt sich das Plugin die letzte höchste ID (also unsere UID)
# falls ihr mal alles neu machen wollt, diese Datei löschen
<table>
table history
tag history
update_column uid
time_column TIMESTAMP
</table>
# unsere FHEM Datentabelle "history"
# der fluentd interne Tag, heißt dann fhem.history (vgl. oben tag_prefix)
# die Spalte "uid" auf die wir filtern
# und der Name der Spalte mit dem Zeitstempel
Diesesmal haben wir einen Match auf einen Tag fhem.*
. Das ist auch der Grund, weshalb dieser neue Teil oberhalb des bestehenden eingefügt werden muss. Fluent wählt den Match aus, der in der Datei zu erst trifft und überspringt die anderen. D.h. der *.**
Match würde ansonsten unsere Daten abgreifen. Type ist "elasticsearch", klar.
host 192.168.1.2
port 9200
# wir gehabt, Host und Port vom ELK
logstash_format false
# diesesmal KEIN Logstash
include_timestamp true
include_tag_key true
# Zeit und Tag wollen wir
tag_key @tag
# hier ein anderer Key für den Tag "@tag" - könnte auch "tag" oder eben wieder "@log_name" sein
index_name fhem
type_name fhem
# wie der Index in ELK heißen soll
time_key TIMESTAMP
# Name der Keys mit der Zeit, eben der TIMESTAMP Spalte aus der Tabelle
id_key uid
# und noch der Name der UID Spalte, so werden doppelte Einträge in ELK vermieden
pipeline FHEM
# dazu kommen wir später, wem das zu komplex ist, kann die Zeile auch weg lassen
flush_interval 5s
# wie lange fluentd die Daten erstmal im Speicher hält bevor sie an EL gehen
Bevor wir diese Konfig aktivieren, müssen wir über Pipelines in ELK reden. Das ist nämlich diese Option "pipeline FHEM".
Die Daten aus FHEM sind Strings. Auch die Zahlen. Und das ist ELK nicht egal. Mit Strings macht ELK keine Zahlenakrobatik. Aber genau die wollen wir.
Dazu gibt es u.a. die Idee der Pipelines in ELK. Dort kann man Daten während des Einlesens bearbeiten. Sind sie jedoch einmal gelesen und gespeichert, wird das schwieriger - und langsamer. Geht aber auch, erkläre ich allerdings nicht hier.
Zu den Pipelines. Diese "FHEM" Pipeline erzeugen wir jetzt.
- Dazu Kibana (http://euer-host:5601) öffnen.
- Unten Mitte Rechts gibt es den Punkt "Stack Management", auswählen.
- Links im Menü gibt es dann "Ingest" und "Ingest Pipelines", letztere auswählen.
- Rechts "Create Pipeline".
- Als Name dann "FHEM" (groß bzw. passend zur pipeline Option).
- Dann auf "Add Processor".
- Als Typ "Convert" wählen.
- In Field dann "VALUE" (groß, weil in MySQL History das so heißt).
- Type auf "Double".
- Target field dann z.B. "VALUE_NUM".
- "Ignore missing" aktivieren.
- "Ignore failure" aktivieren ebenfalls (wichtig!).
- Unten auf "Add".
- Zuletzt auf "Create pipeline".
Das war es eigentlich schon. Es wurde eine Pipeline "FHEM" erzeugt, die den Inhalt der Spalte "VALUE" in eine neue Spalte "VALUE_NUM" als Double-Zahl abspeichert. Falls dies nicht geht, z.B. weil Strings, ist ihr das egal.
Jetzt kann Fluent mit dem neuen Konfig-File gestartet werden. Ggf. erst das Image bauen:
1DOCKER_BUILDKIT=1 docker build -t myfluentd:latest --rm -f Dockerfile .
Nach eingen Minuten sollten dann die ersten Daten im Kibana auftauchen und dieses File "sql_state" auch da sein.
Im "Stack Management" muss unter "Index Management" der Index "FHEM" aufgetaucht sein. Weiter unten unter "Kibana" -> "Index Patterns" jetzt noch ein Pattern (Create Pattern Index) mit dem Pattern "fhem*" und dem Timestamp Field "@timestamp" erzeugen.
Und schon kann es unter "Analytics" -> "Discovery" in die Werte von FHEM gehen. Naja, vermutlich sind erst nach einigen Stunden ausreichend Daten da und nach Tagen alle. Aber einfach schonmal spielen geht recht schnell.