Unser Fachartikel Datenströme einfach zusammenführen mit “Telegraf” wurde in der Ausgabe 3/2023 des JavaSPEKTRUM Magazins veröffentlicht.
Durch unsere Erfahrungen aus verschiedenen Projekten im Bereich der Datenakquirierung, -verarbeitung und -weiterleitung, stellen wir mit Telegraf eine Möglichkeit vor, wie Zeitreihendaten aus mehreren Quellen einfach angebunden und zusammengeführt werden können.
Telegraf (GitHub) ist eine unabhängige Anwendung, die für die gängigen Betriebssysteme zur Verfügung gestellt wird. Über eine einzelne Konfigurationsdatei (TOML-Format) und die über 300 Plugins können komplexe Verarbeitungspipelines in wenigen Zeilen umgesetzt werden. Eine Telegraf-Konfigurationsdatei setzt sich grundsätzlich aus folgenden fünf Blöcken zusammen:
Für alle Bestandteile außer dem Agent gilt, dass beliebig viele Plugins genutzt werden können, unter der Voraussetzung, dass mindestens ein Input und ein Output vorhanden ist. Grundsätzlich durchlaufen die Daten die Bestandteile in der Reihenfolge:
Inputs --> Prozessoren --> Aggregatoren --> Outputs
Es besteht jedoch auch die Möglichkeit, durch entsprechendes Routing individuelle Verarbeitungspipelines zu definieren. Dies kommt z.B. dann zum Einsatz, wenn Daten aus mehreren Inputs unterschiedlich verarbeitet werden sollen.
Anhand eines kleinen Beispiels soll die Funktionsweise und Konfiguration von Telegraf erläutert werden.
Angenommen es gibt zwei APIs und es soll die Anzahl der eingehenden Anfragen an zentraler Stelle gesammelt werden, dafür nutzen sie jedoch unterschiedliche Formate, um die Auslastung zu erfassen. Die erste API schreibt pro Anfrage eine Zeile in eine Log-Datei (CSV) mit Uhrzeit, API-ID und Erfolg/Misserfolg (1/0):
timestamp,api_id,request
2023-02-06 14:45:00,api_1,1
2023-02-06 14:45:03,api_1,0
2023-02-06 14:45:05,api_1,1
Die zweite API verschickt jede Minuten die aggregierten Statistiken in folgendem JSON-Format:
{
"timestamp": "2023-02-06 14:45:00",
"api_id": "api_2",
"successful_requests": 103,
"failed_requests": 4
}
Die Daten der beiden APIs sollen in zweiterem Format abgespeichert werden, weshalb insbesondere die Daten aus API-1 noch entsprechend verarbeitet werden müssen.
Der Agent wird nur dahingehen angepasst, dass der hostname
nicht jedem ausgegebenen Datenpunkt beigefügt wird.
[agent]
omit_hostname = true
Das Input-Plugin für API-1 liest regelmäßig die Datei log.csv
aus und parsed neue Einträge entsprechend dem definierten Format. Über name_override
werden die Daten aus diesem Input-Plugin als "datasource_file"
markiert, sodass die nachfolgenden Schritte darauf angepasst werden können.
[[inputs.tail]]
name_override = "datasource_file"
tagexclude = ["path"]
files = ['log.csv']
data_format = "csv"
csv_column_names = ["timestamp", "api_id", "request"]
csv_tag_columns = ["api_id"]
csv_timestamp_column = "timestamp"
csv_timestamp_format = "2006-01-02 15:04:05"
csv_timezone = "Local"
Das Input-Plugin für API-2 hört auf localhost:8080/telegraf
und erwartet das entsprechende JSON-Format. Diese Daten werden als "datasource_http"
bezeichnet.
[[inputs.http_listener_v2]]
name_override = "datasource_http"
service_address = ":8080"
methods = ["POST"]
data_format = "json"
tag_keys = ["api_id"]
json_time_key = "timestamp"
json_time_format = "2006-01-02 15:04:05"
json_timezone = "Local"
Der rename
-Prozessor wird nur auf die Daten von API-1 angewandt (siehe namepass
), da sie jedoch zunächst keine Felder request_1
und request_0
haben, überspringen sie diesen Prozess ohne Veränderungen.
Der converter
-Prozessor wird auf beide Datenquellen angewandt und wandelt die Felder successful_requests
und failed_requests
in Integer um. Daten von API-1 bleiben jedoch auch hier zunächst unverändert, da sie keine Felder mit diesen Bezeichnungen besitzen.
[[processors.rename]]
order = 1
namepass = ["datasource_file"]
[[processors.rename.replace]]
field = "request_1"
dest = "successful_requests"
[[processors.rename.replace]]
field = "request_0"
dest = "failed_requests"
[[processors.converter]]
order = 2
[processors.converter.fields]
integer = ["successful_requests", "failed_requests"]
Das Aggregator-Plugin gruppiert jetzt die Daten aus API-1 zu 1-Minuten-Intervallen zusammen, indem die Anzahl der einzelnen Werte (1/0) in der Spalte request
ermittelt wird. So entstehen zwei neue Felder, und zwar request_1
und request_0
. Die Rohdaten werden durch drop_original = true
nicht weitergeleitet.
[[aggregators.valuecounter]]
namepass = ["datasource_file"]
period = "1m"
drop_original = true
fields = ["request"]
Die entstandenen Aggregate durchlaufen erneut die beiden Prozessor-Plugins und werden somit auf das endgültige, gewünschte Format gebracht.
Die Ausgabe erfolgt über das file
-Output-Plubin im influx
-Format:
[[outputs.file]]
name_override = "api_usage"
files = ["metrics.out"]
data_format = "influx"
api_usage,api_id=api_1 successful_requests=2i,failed_requests=1i 1675691100000000000
api_usage,api_id=api_2 successful_requests=103i,failed_requests=4i 1675691100000000000
Zusammenfassend können sich die Datenströme wie folgt darstellen lassen:
API-1
--> inputs.tail
--> (processors.rename)
--> (processors.convert)
--> aggregators.valuecounter
--> processor.rename
--> processor.convert
--> outputs.file
API-2
--> inputs.http_listener_v2
--> processors.convert
--> outputs.file
Da mit Telegraf die Sammlung von Zeitreihendaten einfach und schnell umgesetzt werden kann, bietet es sich an eine Zeitreihendatenbank (z.B.InfluxDB, Timescale) und eine Visualisierungsmöglichkeit (z.B. Chronograf, Grafana) anzuschließen.