Ausgewählte Prozessoren

In diesem Abschnitt befinden sich einige ausgewählte Prozessoren, die besondere Eigenschaften aufweisen und daher detaillierter beschrieben werden.

JSONataTransformJSON

Der JSONataTransformJSON-Prozessor dient dazu, mithilfe eines JSONata-Skriptes JSON zu transformieren.

Auf eingehendem Content in JSON können Queries bzw. Transformationen ausgeführt werden.

Der Prozessor hat in IGUASU folgende extra Funktionen:

  • $nfUuid() erzeugt eine UUID vom Typ 4 (Pseudozufallsgenerator). Die UUID wird mit einem kryptografisch starken Pseudozufallszahlengenerator erzeugt.

  • Attribute aus dem Eingangs-FlowFile können (neben dem Eingangs-Content) gelesen/verarbeitet werden: $nfGetAttribute(<name>)

  • Neben bzw. auch anstelle des Ausgangs-Contents können Ergebnisse in Attribute geschrieben werden: $nfSetAttribute(<name>,<value>

  • Die NiFi Expression Language kann verwendet werden: $nfEl(<expression>)

  • Es kann ein Lookup-Service verwendet werden, wenn dieser am Prozessor definiert wurde: $nfLookup(<key>)

Der JSONataTransformJSON-Prozessor hat einen spezifischen Editor, der das einfache Bearbeiten des gesamten Skriptes erlaubt.

Sie können Ihre Transformation schnell ausprobieren, indem Sie den "Test/run"-Button verwenden

Mehr allgemeine Information über JSONata:

Wie dieser Prozessor JSONata-Transformationen implementiert, unterscheidet sich leicht von try.jsonata.org!

Die erweiterten Funktionen werden nun im Detail erläutert.

Einfache Transformation

Eingangsnachricht (auch für die weiteren Beispiele; von jsonata.org übernommen):

{
    "FirstName": "Fred",
    "Surname": "Smith",
    "Age": 28,
    "Address": {
        "Street": "Hursley Park",
        "City": "Winchester",
        "Postcode": "SO21 2JN"
    }
}

Überführung einiger dieser Daten in eine andere Form der Adresse:

{
    "name": FirstName & " " & Surname,
    "mobile": Phone[type = "mobile"].number,
    "address": Address.City
}

Ergebnis:

{
    "name": "Fred Smith",
    "mobile": "077 7700 1234",
    "address": "Winchester"
}

Ergebnis in Attribute schreiben

Wollen Sie die gleichen Ergebnisse in Attribute statt in den Ausgangs-Content setzen, können Sie die folgende Funktion verwenden:

  • nfSetAttribute(<name>,<value>)

Zusätzlich können Sie am Prozessor deaktivieren, dass das Ergebnis des Skriptes in den Ausgang geschrieben wird:

Write Output

false

Der Content wird also unangetastet gelassen. In diesem Fall ist das sinnvoll, da nur die Attribute erstellt werden sollen.

Das Skript sieht nun so aus:

$nfSetAttribute("name", FirstName & " " & Surname) &
$nfSetAttribute("mobile", Phone[type = "mobile"].number) &
$nfSetAttribute("city", Address.City)

Im Ergebnis erscheinen dann die Attribute:

name

Fred Smith

mobile

077 7700 1234

city

Winchester

Es gibt am Prozessor auch noch ein Property, um das gesamte Ergebnis der Transformation in ein Attribut zu schreiben:

Write to Attribute

<name of attribute>

Attribute lesen

Wollen Sie im ersten Fall beispielsweise auf das Attribut filename zugreifen, um dieses als ID in das Ergebnis zu setzen, sieht das Skript folgendermaßen aus:

{
    "id": $nfGetAttribute("filename"),
    "name": FirstName & " " & Surname,
    "mobile": Phone[type = "mobile"].number,
    "address": Address.City
}

Nutzung der NiFi Expression Language

Wollen Sie die NiFi Expression Language innerhalb eines JSONata verwenden, geht dies über die entsprechende Funktion nfEl(<expression>).

Im folgenden Beispiel wird die NiFi Expression Language verwendet, um mit einer Regular Expression zu prüfen, ob der Name korrekt ist (also nur entsprechende Zeichen enthält).

{
    "name": FirstName & " " & Surname,
    "isValidName": $nfEl("${literal('" & FirstName & " " & Surname & "'):matches('^[\\p{L} \\p{Nd}_]+$')}"),
    "mobile": Phone[type = "mobile"].number,
    "address": Address.City
}

Die Funktion kann zusätzlich zu der Expression eine beliebige Anzahl von name/value-Paaren aufnehmen. Diese werden der Expression Language für die Ausführung als temporäre Attribute bereitgestellt. Das heißt, dass sie im Gegensatz zu $nfSetAttribute(<name>,<value>) nicht über die Ausführung hinaus gesetzt werden. Das kann beispielsweise genutzt werden, um Werte aus dem Input für die $nfEl()-Ausführung als Attribute mitzugeben.

Anstelle des Literals im letzten Beispiel könnte also auch Folgendes geschrieben werden:

{
  ...
  "isValidName": $nfEl("${name:matches('^[\\p{L} \\p{Nd}_]+$')}", "name", FirstName & " " & Surname )
  ...
}

TransformXml

Der TransformXml-Prozessor dient dazu, mithilfe eines XSLT-Skriptes XML zu transformieren.

Der Prozessor hat in IGUASU folgende Besonderheiten:

  • die neueste Version des Saxon XSLT-Prozessors mit XSLT 3.0/XPath 3.1 wird unterstützt

  • der lizenzierte Saxon EE inkl. seiner erweiterten Features wird mitgeliefert

  • das XSLT-Skript kann direkt in einem Property gespeichert werden (neben den Varianten des externen Files bzw. des Lookup-Service) - dies erleichtert die Verwendung und das Deployment

  • die direkte Verarbeitung von JSON durch fn:json-to-xml() bzw. fn:xml-to-json() wird durch die Möglichkeit der Einbettung des eingehenden JSON in einen XML-Root-Tag erleichtert

  • Ergebnis-Dokumente (xsl:result-document) können verwendet werden, um:

    • Relationen/Ausgänge des Moduls zu erstellen

    • Attribute im success/failure-Ausgang zu erstellen (hierzu muss der Name des href mit a: starten)

  • Nutzung der NiFi Expression Language in XPath-Ausdrücken

    • dazu wird der Namespace xmlns:nf="http://nifi.org" deklariert

    • die aufzurufende Methode heißt el() - z.B. <xsl:value-of select="nf:el('${UUID()}')"/>

Der TransformXml-Prozessor hat einen spezifischen Editor, der das einfache Bearbeiten des gesamten Skriptes erlaubt.

Die Funktionen werden folgend im Detail erläutert.

Nutzung von Ergebnis-Dokumenten

Voraussetzung:

Support result documents

true

Im XSLT sieht das dann so aus:

<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="3.0">
  <xsl:output method="xml" name="xml" indent="yes"/>
  <xsl:output method="text" name="text"/>
  <xsl:template match="/">
    <xsl:result-document href="relationOne" format="xml">
      <resultOne><xsl:copy-of select="/"/></resultOne>
    </xsl:result-document>
    <xsl:result-document href="relationTwo" format="text">
      number of nodes: <xsl:value-of select="count(//*)"/>
    </xsl:result-document>
    <xsl:result-document href="a:attributeOne" format="text">something</xsl:result-document>
    </xsl:template>
</xsl:stylesheet>

Die Ergebnisse der result-documents von relationOne und relationTwo werden in die entsprechenden Relationen (Ausgänge) des Prozessors geschrieben. Diese werden verfügbar, indem Sie im Skript die result-document-Tags erstellen und das Skript dann speichern.

Das Ergebnis des result-document von a:attributeOne wird aufgrund des Prefix a: als Attribut in die success/failure-Relation geschrieben.

NiFi Expression Language

Die NiFi Expression Language kann sowohl bei der Übergabe von Parametern über Dynamic Properties als auch innerhalb von XPath-Ausdrücken verwendet werden.

EL in Parametern

Durch das Hinzufügen eines beliebigen Dynamic Properties (über den -Button) wird der Inhalt dieses Properties als Parameter (xsl:param) an das XSLT-Skript übergeben. Innerhalb des Wertes darf die Expression Language verwendet werden. Diese kann dabei z.B. auch auf die Attribute des herein gehenden FlowFiles zugreifen:

testParam

the filename is ${filename}

Dies kann dann im XSLT verwendet werden:

<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="3.0">
  <xsl:output method="text"/>
  <xsl:param name="testParam"/>
  <xsl:template match="/">
    <xsl:value-of select="$testParam"/>
  </xsl:template>
</xsl:stylesheet>

Da der filename in NiFi typischerweise eine UUID ist, kommt als Ergebnis heraus:

the filename is 8ec0e87a-56dc-425f-b4c5-1de7f515ddea

EL in XPath Ausdrücken*

Um die NiFi Expression Language innerhalb von XPath zu verwenden, muss dies zunächst durch das entsprechende Property eingeschaltet werden:

Allow NiFi EL in XPath

true

Im XSLT-Skript muss noch der Namespace gesetzt werden (xmlns:nf="http://nifi.org"). Dann kann die Funktion (nf:el()) überall dort, wo XPath-Ausdrücke erlaubt sind, aufgerufen werden:

<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="3.0"
xmlns:nf="http://nifi.org">
  <xsl:output method="text"/>
  <xsl:template match="/">
    <xsl:value-of select="nf:el('${UUID()}')"/>
  </xsl:template>
</xsl:stylesheet>

Das Ergebnis ist:

2560fc8c-3581-4732-8862-6bb191eb0dcc

JSON-Verarbeitung

Um JSON direkt lesen zu können, muss das entsprechende Property gesetzt sein:

Surround input with <xml> tag

true

Dadurch wird das eingehende JSON zu einem XML, auf welches dann die XPath-3.0-Funktion angewendet werden kann:

Eingangs-JSON:

{
  "name": "Harry",
  "age": 23,
  "address": {
    "city": "London"
  }
}

XSLT Skript:

<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
                xmlns:fn="http://www.w3.org/2005/xpath-functions"
                exclude-result-prefixes="fn" version="3.0">
  <xsl:output indent="yes"/>
  <xsl:template match="/">
    <xsl:copy-of select="fn:json-to-xml(.)"/>
  </xsl:template>
</xsl:stylesheet>

Ergebnis:

<map xmlns="http://www.w3.org/2005/xpath-functions">
   <string key="name">Harry</string>
   <number key="age">23</number>
   <map key="address">
      <string key="city">London</string>
   </map>
</map>

Um aus einer solchen XML-Struktur wieder JSON zu machen, können Sie fn:xml-to-json() nutzen.

ListenBPCFlowStarter

Der ListenBPCFlowStarter-Prozessor ermöglicht die nahtlose Verknüpfung von IGUASU mit einer Virtimo Business-Process-Center-Instanz. Hierbei werden die anfangs erwähnten BPC-Services in IGUASU genutzt, um anhand der Konfigurationen eine Verbindung aufzubauen. Der ListenBPCFlowStarter fungiert im Anschluss als Listener und Startpunkt eines Flows, an den die Daten und der Input des BPC-Nutzers übergeben wird.

Der Prozessor hat in IGUASU folgende Besonderheiten:

  • Der gewählte BPC Listener Base Path wird im BPC unter den IGUASU-Einstellungen als ID angezeigt.

  • Zur besseren Unterscheidung der hinterlegten ListenBPCFlowStarter-Prozessoren werden zudem im BPC der Flow Starter Name und die Flow Starter Desc. angezeigt.

  • Durch den Einsatz von unterschiedlichen HybridRESTServerController-Services können die ListenBPCFlowStarter-Prozessoren im BPC in verschiedene Komponenten gruppiert werden.

Weitere Informationen zur Verbindung von IGAUSU und BPC finden sie im BPC Anbindung Tutorial.

PutBPCProcessLog

Der PutBPCProcessLog-Prozessor ermöglicht das Erstellen von BPC Process Logs, die vom IGUASU-Prozessor an die gewünschte BPC-Instanz übermittelt werden. Für diesen Zweck müssen einerseits ein BPC Controller, in dem die BPC-URL und der angelegte API Key enthalten sind, und andererseits der gewünschte BPC Logger ausgewählt werden. Zusätzlich wird die Möglichkeit angeboten, den Input Type zu ändern und dadurch zu bestimmen, ob der Inhalt des FlowFiles oder die Datei in der BPC Entries JSON-Property geloggt werden soll.

Dies ist das Format der erwarteten LOG-Daten:

{
  "entries": [
    {
      "parent": {
        "process-id": "448f1867-b4ef-4fb8-9db6-cf0f26acc384",
        ...
      },
      "childs": [
        {
          "process-id": "448f1867-b4ef-4fb8-9db6-cf0f26acc384",
          "child-id": "a18b56f8-5312-3c81-779d-4c08bd4ee29f",
          ...
        }
      ]
    }
  ]
}

Der Prozessor hat in IGUASU folgende Besonderheiten:

  • In den Auswahlmöglichkeiten der Property Choose BPC Logger sind die Logger hinterlegt, die zuvor im BPC erstellt wurden. Falls die BPC-Instanz nicht erreichbar sein sollte, kann zudem die ID des Loggers angegeben werden.

Weitere Informationen zur Verbindung von IGAUSU und BPC finden sie im BPC Anbindung Tutorial und in der BPC Dokumentation.

PutBPCAuditLog

Der PutBPCAuditLog-Prozessor dient dazu, Daten in das BPC Audit Log zu schreiben. Zur Konfiguration können dabei das BPC Audit Level, Audit Originator und die Action gewählt werden. Zur Verknüpfung des IGUASU-Flows mit einer BPC-Instanz wird hierbei der HybridRESTClientController-Service genutzt, in dem die BPC-URL und der generierte BPC API Key hinterlegt sind.

Der Prozessor hat in IGUASU folgende Besonderheiten:

  • Es kann die NiFi Expression Language verwendet werden, um die benötigten Log-Informationen aus den FlowFiles auszulesen.

Weitere Informationen zur Verbindung von IGAUSU und BPC finden sie im BPC Anbindung Tutorial und in der BPC Dokumentation.

Metro-Prozessoren

Um Zwischenergebnisse der verarbeiteten FlowFiles zu erstellen, die im späteren Verlauf eines Datenflusses erneut benutzt werden sollen, können Metro-Prozessoren eingesetzt werden. Hierbei wird zwischen den GetMetro-, PutMetro-, MergeMetro- und ExitMetro-Prozessoren unterschieden, die im Folgenden beschrieben sind.

Ein Anwendungsbeispiel der beschriebenen Prozessoren befindet sich zudem im Abschnitt How-Tos unter Metro.

PutMetro

FlowFiles können mit dem PutMetro-Prozessor zwischengespeichert werden.
Für die Konfiguration des Prozessors wird ein MetroLineController-Service benötigt, über den die Kommunikation der Metro-Prozessoren erfolgt. PutMetro-Prozessoren sind zudem optisch anders dargestellt als andere Prozessoren und ermöglichen dadurch eine bessere Übersicht über den Prozess des Zwischenspeicherns.

GetMetro

Die gespeicherten FlowFiles können im späteren Verlauf des Datenflusses durch einen GetMetro-Prozessor abgerufen werden. Hierbei ist wichtig, den gleichen Metro-Controller auszuwählen, der beim Speichern der Daten im PutMetro-Prozessor genutzt wurde.
Zusätzlich muss das Korrelationsattribut correlation hinterlegt werden, das über einen dynamischen Property erstellt werden kann. Als Wert bietet sich hierbei eine individuelle ID an, die genutzt werden kann, um FlowFiles innerhalb der Metro-Verbindung zu unterscheiden.

Im Folgenden ist ein Beispiel der Zwischenspeicherung und des späteren Abrufs der Daten dargestellt:

GetMetro

Wurden die zwischengespeicherten FlowFiles bereits durch einen GetMetro-Prozessor abgerufen, sind diese nicht mehr verfügbar. Dadurch können Fehler bei weiteren Zugriffsversuchen entstehen.

ExitMetro

Werden mehrere PutMetro-Prozessoren zur Zwischenspeicherung genutzt, können zudem ExitMetro-Prozessoren genutzt werden, um alle in der Metro vorhandenen Flowfiles zu erhalten. Hierbei werden keine Korrelationsattribute benötigt, da die Abfrage sich nicht auf einzelne FlowFiles, sondern auf alle bezieht.

Ein Anwendungsfall für den ExitMetro-Prozessor ist, wenn FlowFiles, die von vielen verschiedenen Prozessoren weitergeleitet werden, in einem gemeinsamen Prozess behandelt werden müssen (z.B. bei der Fehlerbehandlung). Die einfachste Lösung wäre, die passenden Relations (z.B. Fehler-Relationen) aller Prozessoren zu einem einzelnen Prozessor zu führen, welcher den übergeordneten Prozess übernimmt und einleitet.

Dies kann jedoch gerade in komplexen Flows dazu führen, dass zu viele lange Prozesse in die Länge gezogen werden und der Flow überläuft. Verwenden Sie zum Umgehen dieses Problems z.B. mehrere PutMetro-Prozessoren und einen ExitMetro-Prozessor, der die FlowFiles an den Prozessor übergibt, welcher wiederum die Fehlerbehandlung übernimmt.

ExitMetro
Abbildung 1. Die gestrichelten Linien in der Grafik veranschaulichen, wie FlowFiles von den PutMetros zum ExitMetro übertragen werden - sind aber im Diagramm nicht sichtbar.

MergeMetro

Über den MergeMetro-Prozessor kann eine FlowFile mit einer oder mehreren FlowFiles zusammengeführt werden, die von PutMetro-Prozessoren zwischengespeichert wurden. Die FlowFiles werden anhand eines in der Eigenschaft „Correlation Attribute Name“ definierten Attributs abgeglichen.

MergeMetro

Um XMLs zusammenzuführen, ist es am einfachsten, wenn die XML-Inhalte keine XML-Deklaration enthalten. Die gewünschte Deklaration für das zusammengeführte XML kann über die Property „Header“ hinzugefügt werden.

Um JSON-Objekte in einem JSON-Array zusammenzuführen, können Sie die Properties "Header", "Footer" und "Demarcator" auf [, ] bzw. , setzen.

Merge-Prozessoren

Um unabhängige oder zuvor getrennte FlowFiles zu vereinen stehen unterschiedliche Prozessoren zur Verfügung.

MergeContent-Prozessor Beispiel

Hierbei werden je nach Prozessor unterschiedliche Strategien und Formate angeboten, die je nach individueller Anforderung angepasst werden können. In diesem Abschnitt befindet sich eine Übersicht über einige Prozessoren, die zum Vereinen von FlowFiles genutzt werden können

MergeContent und MergeRecord

Die beiden Prozessoren MergeContent und MergeRecord verfügen über viele Einstellungsmöglichkeiten, die zum Zusammenführen von FlowFiles genutzt werden können. Viele der Optionen sind in beiden Prozessoren verfügbar, wobei es kleine Unterschiede gibt. Beispielsweise können bei der Record-orientierten Verarbeitung vom MergeRecord-Prozessor zusätzlich ein Reader und ein Writer definiert werden, wodurch beim Mergen ebenfalls eine Conversion erfolgen könnte. In Folgenden sind die Konfigurationsmöglichkeiten und die Funktionsweise der Optionen beschrieben:

  • Merge Strategy
    Mit der Merge Strategy kann definiert werden, nach welchen Kriterien die FlowFiles kombiniert werden sollen. Hierbei stehen zwei unterschiedliche Vorgehen zur Verfügung, die für diesen Zweck ausgewählt werden können.

    • Bin-Packing Algorithm
      Der Bin-Packing Algorithmus ist die Strategie die Standardmäßig zunächst ausgewählt ist. Hierbei werden FlowFiles in einzelnen Behälter (Bins) gesammelt, bis die definierten Schwellenwerte erreicht sind. Mit dem Parameter Minimum Number of Entries kann dadurch definiert werden, wie viele FlowFiles vorhanden sein müssen, damit diese in ein FlowFile kombiniert werden. Wenn die Konfiguration Maximum Group Size nicht definiert ist, ist die Größe der dadurch generierten Bins nicht eingeschränkt und jedes Mal wenn der Minimum Number of Entries Schwellenwert erreicht ist, werden alle FlowFiles in der Queue des Prozessors in ein FlowFile integriert.

      Neben der quantitativen Definition der gewünschten Größe der Bins ist es ebenfalls möglich, das Zusammenführen von FlowFiles abhängig von der Zeit durchzuführen. Mit der Option Max Bin Age kann ein positiver Integer Wert als Dauer oder eine Zeiteinheit in Sekunden, Minuten oder Stunden definiert werden, nach der die Kombination der FlowFiles erfolgt.

      Darüber hinaus kann ein Correlation Attribute Name festgelegt werden, anhand FlowFiles in der Queue passend zueinander gruppiert werden. Dadurch ist es möglich, unabhängige FlowFiles aus der Kombination auszuschließen und durch die Vereinigung thematisch zusammenhängende FlowFiles zu erstellen. Hierbei ist allerdings zu beachten, dass nur ein Korrelations-Attribut festgelegt werden kann. Alle FlowFiles die das definierte Attribut nicht besitzen, werden nicht verarbeitet und bleiben daher in der Queue.

    • Defragment
      Die zweite angebotene Strategie um einzelne FlowFiles zu kombinieren ist die Defragment-Strategie, bei der spezifische Attribute zum Zusammenführen genutzt werden. Waren die einzelnen FlowFiles zuvor bereits in nur einem FlowFile enthalten und wurden im Laufe des Flows durch einen Split-Prozessor beispielsweise getrennt, befinden sich individuelle Attribute die bei dieser Strategie genutzt werden. Die durch das Splitten generierten Attribute fragment.identifier , fragment.count und fragment.index werden verwendet, um die zugehörigen FlowFiles erneut zu vereinen. Diese Strategie lässt sich gut zusammen mit -Split Prozessoren verwenden (z.B. SplitJson), welche die benötigten Attribute dann automatisch hinzufügen.

  • Merge Format
    Mit der Merge Format Einstellung kann festgelegt werden, in welchem Format die Zusammenführung der einzelnen FlowFiles erfolgen soll. Standardmäßig ist diese Option auf Binary Concatenation gesetzt, wodurch verschieden FlowFiles in ein einzelnes kombiniert werden. Optional können die Daten ebenfalls im ZIP, TAR oder in anderen Formaten verknüpft werden.

  • Attribute Strategy
    Zusätzlich steht in den Prozessoren die Möglichkeit zur Verfügung, eine Strategie für die vorhandenen Attribute auszuwählen. Hierbei stehen zwei Optionen zur Verfügung, die im Folgenden beschrieben sind:

    • Keep All Unique Attributes
      Mit dieser Strategie werden alle einzigartigen Attribute beim Ergebnis-FlowFile beibehalten. Sollten mehrere einzelne FlowFiles vor der Verarbeitung ein Attribut teilen, dass zudem denselben Wert hat, dann wird das Attribut ebenfalls übernommen, wenn nicht alle FlowFiles das Attribut haben.

      Sollten die FlowFiles unterschiedliche Werte für ein gemeinsames Attribut haben, so wird dieses Attribut nicht übernommen.
    • Keep Only Common Attributes
      Attribute, die behalten werden sollen, müssen bei allen einzelnen FlowFiles die kombiniert werden vorhanden sein. Sollte das Attribut bei einem der FlowFiles fehlen, wird das Attribut nicht im Ergebnis integriert.