Artikel: Datenströme einfach zusammenführen mit "Telegraf"

28.09.2023 - Felix Wende

Unser Fachartikel Datenströme einfach zusammenführen mit “Telegraf” wurde in der Ausgabe 3/2023 des JavaSPEKTRUM Magazins veröffentlicht.


Durch unsere Erfahrungen aus verschiedenen Projekten im Bereich der Datenakquirierung, -verarbeitung und -weiterleitung, stellen wir mit Telegraf eine Möglichkeit vor, wie Zeitreihendaten aus mehreren Quellen einfach angebunden und zusammengeführt werden können.


Telegraf


Telegraf (GitHub) ist eine unabhängige Anwendung, die für die gängigen Betriebssysteme zur Verfügung gestellt wird. Über eine einzelne Konfigurationsdatei (TOML-Format) und die über 300 Plugins können komplexe Verarbeitungspipelines in wenigen Zeilen umgesetzt werden. Eine Telegraf-Konfigurationsdatei setzt sich grundsätzlich aus folgenden fünf Blöcken zusammen:


  • Agent: Der Agent bildet die Laufzeitumgebung und hier werden allgemeine Parameter definiert, beispielsweise in welchen Intervallen Daten eingelesen oder ausgegeben werden, die Größe des Pufferspeichers oder die Konfiguration des Loggings.
  • Inputs: Input-Plugins sind dafür zuständig, Daten aus verschiedenen Quellen anzubinden. Für viele Anwendungsfälle existieren bereits Plugins. Es stehen aber auch generische Plugins zur Verfügung, die individuell auf die eigene Aufgabenstellung angepasst werden können.
  • Prozessoren: Prozessoren ermöglichen es, Daten schon an zentraler Stelle verschiedenen Verarbeitungsschritten zu unterziehen.
  • Aggregatoren: Anhand der Aggregatoren wird deutlich, dass Telegraf für die Verarbeitung von Zeitreihendaten entwickelt wurde. Hier können Rohdaten auf aggregiert werden und so an die Outputs weitergeleitet werden.
  • Outputs: Die Output-Plugins sind das letzte Glied in der Kette und definieren, wohin die Daten geschickt werden. Beispielsweise können die Daten in eine Datei geschrieben oder an eine Datenbank geschickt werden.

Für alle Bestandteile außer dem Agent gilt, dass beliebig viele Plugins genutzt werden können, unter der Voraussetzung, dass mindestens ein Input und ein Output vorhanden ist. Grundsätzlich durchlaufen die Daten die Bestandteile in der Reihenfolge:


Inputs --> Prozessoren --> Aggregatoren --> Outputs

Es besteht jedoch auch die Möglichkeit, durch entsprechendes Routing individuelle Verarbeitungspipelines zu definieren. Dies kommt z.B. dann zum Einsatz, wenn Daten aus mehreren Inputs unterschiedlich verarbeitet werden sollen.


Hands-on


Anhand eines kleinen Beispiels soll die Funktionsweise und Konfiguration von Telegraf erläutert werden.


Problemstellung


Angenommen es gibt zwei APIs und es soll die Anzahl der eingehenden Anfragen an zentraler Stelle gesammelt werden, dafür nutzen sie jedoch unterschiedliche Formate, um die Auslastung zu erfassen. Die erste API schreibt pro Anfrage eine Zeile in eine Log-Datei (CSV) mit Uhrzeit, API-ID und Erfolg/Misserfolg (1/0):


timestamp,api_id,request
2023-02-06 14:45:00,api_1,1
2023-02-06 14:45:03,api_1,0
2023-02-06 14:45:05,api_1,1

Die zweite API verschickt jede Minuten die aggregierten Statistiken in folgendem JSON-Format:


{
    "timestamp": "2023-02-06 14:45:00",
    "api_id": "api_2",
    "successful_requests": 103,
    "failed_requests": 4
}

Die Daten der beiden APIs sollen in zweiterem Format abgespeichert werden, weshalb insbesondere die Daten aus API-1 noch entsprechend verarbeitet werden müssen.


Umsetzung


Der Agent wird nur dahingehen angepasst, dass der hostname nicht jedem ausgegebenen Datenpunkt beigefügt wird.


[agent]
  omit_hostname = true

Das Input-Plugin für API-1 liest regelmäßig die Datei log.csv aus und parsed neue Einträge entsprechend dem definierten Format. Über name_override werden die Daten aus diesem Input-Plugin als "datasource_file" markiert, sodass die nachfolgenden Schritte darauf angepasst werden können.


[[inputs.tail]]
  name_override = "datasource_file"
  tagexclude = ["path"]
  files = ['log.csv']
  data_format = "csv"
  csv_column_names = ["timestamp", "api_id", "request"]
  csv_tag_columns = ["api_id"]
  csv_timestamp_column = "timestamp"
  csv_timestamp_format = "2006-01-02 15:04:05"
  csv_timezone = "Local"

Das Input-Plugin für API-2 hört auf localhost:8080/telegraf und erwartet das entsprechende JSON-Format. Diese Daten werden als "datasource_http" bezeichnet.


[[inputs.http_listener_v2]]
  name_override = "datasource_http"
  service_address = ":8080"
  methods = ["POST"]
  data_format = "json"
  tag_keys = ["api_id"]
  json_time_key = "timestamp"
  json_time_format = "2006-01-02 15:04:05"
  json_timezone = "Local"

Der rename-Prozessor wird nur auf die Daten von API-1 angewandt (siehe namepass), da sie jedoch zunächst keine Felder request_1 und request_0 haben, überspringen sie diesen Prozess ohne Veränderungen. Der converter-Prozessor wird auf beide Datenquellen angewandt und wandelt die Felder successful_requests und failed_requests in Integer um. Daten von API-1 bleiben jedoch auch hier zunächst unverändert, da sie keine Felder mit diesen Bezeichnungen besitzen.


[[processors.rename]]
  order = 1
  namepass = ["datasource_file"]
  [[processors.rename.replace]]
    field = "request_1"
    dest = "successful_requests"
  [[processors.rename.replace]]
    field = "request_0"
    dest = "failed_requests"

[[processors.converter]]
  order = 2
  [processors.converter.fields]
    integer = ["successful_requests", "failed_requests"]

Das Aggregator-Plugin gruppiert jetzt die Daten aus API-1 zu 1-Minuten-Intervallen zusammen, indem die Anzahl der einzelnen Werte (1/0) in der Spalte request ermittelt wird. So entstehen zwei neue Felder, und zwar request_1 und request_0. Die Rohdaten werden durch drop_original = true nicht weitergeleitet.


[[aggregators.valuecounter]]
  namepass = ["datasource_file"]
  period = "1m"
  drop_original = true
  fields = ["request"]

Die entstandenen Aggregate durchlaufen erneut die beiden Prozessor-Plugins und werden somit auf das endgültige, gewünschte Format gebracht.


Die Ausgabe erfolgt über das file-Output-Plubin im influx-Format:


[[outputs.file]]
  name_override = "api_usage"
  files = ["metrics.out"]
  data_format = "influx"
api_usage,api_id=api_1 successful_requests=2i,failed_requests=1i 1675691100000000000
api_usage,api_id=api_2 successful_requests=103i,failed_requests=4i 1675691100000000000

Zusammenfassend können sich die Datenströme wie folgt darstellen lassen:


API-1
--> inputs.tail
--> (processors.rename)
--> (processors.convert)
--> aggregators.valuecounter
--> processor.rename
--> processor.convert
--> outputs.file

API-2
--> inputs.http_listener_v2
--> processors.convert
--> outputs.file


Integrationsmöglichkeiten


Da mit Telegraf die Sammlung von Zeitreihendaten einfach und schnell umgesetzt werden kann, bietet es sich an eine Zeitreihendatenbank (z.B.InfluxDB, Timescale) und eine Visualisierungsmöglichkeit (z.B. Chronograf, Grafana) anzuschließen.

trinnovative signed trinnovative signed

Mehr zum Blogthema erfahren?