Elastic Drama (1?)
In the article ☛ Elasticsearch in Docker and the following ones, I got Elasticsearch installed.
Where would we be if things had simply continued that smoothly afterwards?
Today I wanted to "just quickly" import the CSV files from the OBD2 logger. It seemed like a good fit. Now 12 hours have passed and somehow something works. But every addition first reset everything back to zero.
Perhaps also because I wanted a pipeline that reworks the data a bit:
- amps and volts should be combined into a new watts column
- the somewhat magic numbers for kWh and SOC converted into proper units instead of the carmaker's fancy in-house values
- finally: the logged coordinates as a coordinate field (because otherwise ELK is too dumb for them)
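The first two steps can be sketched as an ingest pipeline created from Python. The pipeline id `leafspy-csv-pipline` and the field names (`Pack Amps`, `Pack Volts`, `Pack Watts`, `Gids`, `kWh`) come from the importer script below; the Painless sources and the conversion factor are only placeholder assumptions for illustration, not the real magic numbers:

```python
# Painless source for the watts step: multiply the raw amp and volt columns.
watts_script = """
double amps = Double.parseDouble(ctx['Pack Amps']);
double volts = Double.parseDouble(ctx['Pack Volts']);
ctx['Pack Watts'] = amps * volts;
"""

pipeline_body = {
    "description": "Derive Pack Watts and kWh from the raw LeafSpy columns",
    "processors": [
        {"script": {"lang": "painless", "source": watts_script}},
        # 0.0775 is a placeholder factor -- the real magic number depends on the logger
        {"script": {"lang": "painless",
                    "source": "ctx['kWh'] = Double.parseDouble(ctx['Gids']) * 0.0775;"}},
    ],
}

# With an elasticsearch-py 7.x client this would be registered like so:
# es.ingest.put_pipeline(id="leafspy-csv-pipline", body=pipeline_body)
```

The actual pipeline on my cluster also handles cSOC and the coordinates; this is just the shape of the thing.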
The first two actually went relatively easily with a "Painless" script. The name has to be irony, though.
By the third one at the latest, the irony was complete! Unfortunately the data sits in the CSV in DMS format. Presumably no program in the world wants it like that, but that doesn't bother the logger.
The conversion itself isn't that hard. But Painless wouldn't be Painless if it were simple: just splitting strings on spaces? Nope!
A longer conversation with ChatGPT, and with Google too, then revealed a need for regexes!!!
So instead of

String[] result = value.split(" ");

it has to be this:

String[] result = / /.split(value);

Yes, once you know it, it's totally easy!
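In Python the same conversion is much less dramatic. A minimal sketch, assuming the coordinates come as space-separated degrees/minutes/seconds plus a hemisphere letter (the exact LeafSpy format may differ):

```python
def dms_to_decimal(dms: str) -> float:
    """Convert 'deg min sec [N|S|E|W]' to decimal degrees."""
    parts = dms.split(" ")  # in Painless this needs the regex form: / /.split(value)
    deg, minutes, seconds = (float(p) for p in parts[:3])
    decimal = deg + minutes / 60 + seconds / 3600
    # a trailing S or W hemisphere letter flips the sign
    if parts[-1] in ("S", "W"):
        decimal = -decimal
    return decimal

print(dms_to_decimal("52 30 15.5 N"))
```

In the actual Painless version the same arithmetic happens after the regex split, field by field in the ingest pipeline.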
ChatGPT then unfortunately swapped the order of long and lat in the result, and according to Kibana I was suddenly driving through East Africa. Which might actually have been the better way to spend the Sunday than with problems like these.
And because this surely won't be all, the post is called "Drama 1". Hey, I can do the irony-in-names thing too.
Here is the script "importer.py" for importing a CSV file:
#!/usr/bin/python3
# https://stackoverflow.com/questions/71197482/how-to-load-1000-lines-of-a-csv-into-elasticsearch-as-1000-different-documents-u

import csv, sys
from elasticsearch import helpers, Elasticsearch, RequestsHttpConnection

if len(sys.argv) < 2:
    print("No file given!")
    sys.exit(1)

index = "leafspy-csv"
pipeline = "leafspy-csv-pipline"

es = Elasticsearch(
    hosts=[{
        'host': '192.168.1.2',
        'port': 9200}],
    use_ssl=False,
    verify_certs=False,
    connection_class=RequestsHttpConnection
)

# Index mapping: explicit field types; the 96 CP (cell pair) columns are
# generated below instead of being spelled out one by one.
mapping = {
    "mappings": {
        "properties": {
            "Speed1": {"type": "long"},
            "Speed2": {"type": "long"},
            "12v Bat Amps": {"type": "keyword"},
            "12v Bat Volts": {"type": "double"},
            "A/C Comp(0_1MPa)": {"type": "long"},
            "A/C Pwr(250w)": {"type": "long"},
            "AHr": {"type": "long"},
            "Ambient": {"type": "double"},
            "Aux Pwr(100w)": {"type": "long"},
            "Avg CP mV": {"type": "long"},
            "BLevel": {"type": "long"},
            "BMS": {"type": "long"},
            "CP mV Diff": {"type": "long"},
            "Charge Mode": {"type": "long"},
            "Date/Time": {"type": "keyword"},
            "Debug": {"type": "text"},
            "Elv": {"type": "long"},
            "Est Pwr A/C(50w)": {"type": "long"},
            "Est Pwr Htr(250w)": {"type": "long"},
            "GPS Status": {"type": "keyword"},
            "Gear": {"type": "long"},
            "Gids": {"type": "long"},
            "HVolt1": {"type": "double"},
            "HVolt2": {"type": "double"},
            "Hx": {"type": "double"},
            "Inverter 2 Temp": {"type": "long"},
            "Inverter 4 Temp": {"type": "long"},
            "Judgment Value": {"type": "long"},
            "L1/L2": {"type": "long"},
            "Lat": {"type": "keyword"},
            "Long": {"type": "keyword"},
            "Max CP mV": {"type": "long"},
            "Min CP mV": {"type": "long"},
            "Motor Pwr(w)": {"type": "long"},
            "Motor Temp": {"type": "long"},
            "OBC": {"type": "long"},
            "OBC Out Pwr": {"type": "long"},
            "Odo(km)": {"type": "long"},
            "Pack Amps": {"type": "double"},
            "Pack T1 C": {"type": "double"},
            "Pack T1 F": {"type": "double"},
            "Pack T2 C": {"type": "double"},
            "Pack T2 F": {"type": "double"},
            "Pack T3 C": {"type": "keyword"},
            "Pack T3 F": {"type": "keyword"},
            "Pack T4 C": {"type": "double"},
            "Pack T4 F": {"type": "double"},
            "Pack Volts": {"type": "double"},
            "Plug State": {"type": "long"},
            "Power SW": {"type": "long"},
            "QC": {"type": "long"},
            "RPM": {"type": "long"},
            "RegenWh": {"type": "long"},
            "SOC": {"type": "long"},
            "SOH": {"type": "double"},
            "Speed": {"type": "double"},
            "TP-FL": {"type": "double"},
            "TP-FR": {"type": "double"},
            "TP-RL": {"type": "double"},
            "TP-RR": {"type": "double"},
            "Torque Nm": {"type": "double"},
            "VIN": {"type": "keyword"},
            "Wiper Status": {"type": "long"},
            "epoch time": {"type": "date", "format": "epoch_second"},
            # fields created by the ingest pipeline:
            "Pack Watts": {"type": "double"},
            "kWh": {"type": "double"},
            "cSOC": {"type": "double"},
            "location": {"type": "geo_point"}
        }
    }
}

# CP1..CP96: one reading per cell pair, all of type long
for n in range(1, 97):
    mapping["mappings"]["properties"]["CP" + str(n)] = {"type": "long"}
res = es.indices.create(index=index, ignore=400, body=mapping)
#print(res)
#quit()

upload_list = []  # list of items for upload

# Load all csv data
with open(sys.argv[1], newline='') as csvfile:

    data_list = []

    csv_data = csv.reader(csvfile)
    for row in csv_data:
        data_list.append(row)

    # separate out the headers from the main data
    headers = data_list[0]
    # drop headers from data_list
    data_list.pop(0)

    for item in data_list:  # iterate over each row/item in the csv

        item_dict = {}

        # match a column header to the row data for an item
        i = 0
        for header in headers:
            item_dict[header.strip()] = item[i]
            i = i + 1

        # epoch time doubles as the document id, so a re-import overwrites
        # instead of duplicating
        item_dict["_id"] = item_dict["epoch time"]
        item_dict["pipeline"] = pipeline
        # add the transformed item/row to a list of dicts
        upload_list += [item_dict]

# using helper library's Bulk API to index list of Elasticsearch docs
try:
    resp = helpers.bulk(
        es,
        upload_list,
        index=index
    )
    msg = "helpers.bulk() RESPONSE: " + str(resp)
    print(msg)  # print the response returned by Elasticsearch
except Exception as err:
    msg = "Elasticsearch helpers.bulk() ERROR: " + str(err)
    print(msg)
    sys.exit(1)
And here is the Dockerfile to pack the thing into an image, because you really don't want to mess up your own system with all the pip stuff (which turned out to be a good call: I had to change the required packages several times):
FROM debian:bullseye

RUN ln -snf /usr/share/zoneinfo/CET /etc/localtime && echo 'CET' >/etc/timezone
RUN apt-get update
RUN DEBIAN_FRONTEND=noninteractive apt-get install -y python3 python3-pip && apt-get autoclean

RUN pip install elasticsearch==7.17.8 requests

COPY --chmod=700 entry.sh /entry.sh
COPY --chmod=700 importer.py /importer.py

CMD /entry.sh
And because the Dockerfile references it, the entry.sh:
#!/bin/sh

chmod +x /importer.py

# import every LeafSpy log file mounted under /data; quote the variable in
# case a file name ever contains spaces
for N in /data/Log_*
do
    /importer.py "$N"
done
To start it:

docker run -it --rm -v /LeafSpy/LOG_FILES:/data mein-elastic-csv:latest