Dieser Artikel richtet sich an Personen, die bisher keine Erfahrung mit Hadoop haben und sich einen Einstieg basierend auf praktischen Übungen wünschen. Neben der Erläuterung der Kernkomponenten, werden mithilfe eines Docker-Image grundlegende Konzepte gezeigt.
Einleitung
Dieser Artikel basiert auf dem Vortrag “Hadoop – Taming the Elephant (With a Whale)”. Er behandelt die Grundlagen von Hadoop und bleibt dabei praxisnah und zielgerichtet. Der Aufbau des Artikels gleicht in weiten Teilen dem Aufbau des Vortrags, dessen Folien online verfügbar sind (siehe Abbildung 1).
Was ist Hadoop eigentlich?
Hadoop ist ein System zur Datenspeicherung und -analyse, das seit 2006 entwickelt wird und seit 2008 zu Apaches Kernprojekten gehört. Heutzutage wird Hadoop oft in Enterprise-Systemen für die Arbeit mit Big Data eingesetzt. Die Tochter von Doug Cutting, einem der Mitbegründer des Projektes, nannte ihren gelben Plüschelefant “Hadoop”. Daher rühren sowohl der Projektname als auch das gelbe Elefantenlogo [1]. Hadoop besteht aus fünf Kernkomponenten: Hadoop Common, HDFS, MapReduce, YARN und Ozone. Hadoop Common beinhaltet die grundlegenden Bestandteile der Applikation. HDFS, das “Hadoop Distributed File System”, dient zur Speicherung von Daten. MapReduce stellt ein Programmiermodell zur Verarbeitung großer Datenmengen dar. YARN, kurz für “Yet Another Resource Negotiator”, ist das Cluster-Ressource-Management-System von Hadoop. Ozone ist erst seit Ende 2018 in einer Beta-Version Teil des Kerns, es handelt sich hierbei um einen Objektspeicher. Dieser Artikel befasst sich hauptsächlich mit HDFS und MapReduce.
Hadoop in Docker starten
Ein Vorgehen für einen einfachen und schnellen Start in die Thematik, das viele Möglichkeiten für praktische Fingerübungen bietet, ist die Verwendung eines Docker-Image. So werden umfangreiche Installationen vermieden und die Übungen können schnell in Angriff genommen werden. Auch für die ersten Schritte in Hadoop kann ein Docker-Image eingesetzt werden. In Produktion bildet Hadoop in Docker die Ausnahme. Die Basis für die Beispiele in diesem Artikel ist das Docker-Image sequenceiq/hadoop-docker:2.7.0
, in dem alle für Hadoop notwendigen Bestandteile als Single-Node-Installation in einem einzigen Docker-Container gestartet werden. Mit dem Befehl in Listing 1 wird das Docker-Image gestartet.
Beim Starten des Image werden die Ports für die Web-Benutzeroberfläche (50070), das Tracken von Jobs (8088) sowie der Port zum Herunterladen von Ergebnissen (50075) weitergeleitet. Dies ermöglicht später einen Zugriff auf diese Dienste via “localhost”.
Nach dem Ausführen des run-Befehls wird man direkt zur Bash des gestarteten Containers weitergeleitet. Dies ist der Indikator für den erfolgreichen Start des Docker-Containers. Um zu kontrollieren, ob alle Hadoop-Services erfolgreich gestartet wurden, kann ein Beispiel-MapReduce-Job verwendet werden, der von Apache bereits in Hadoop inkludiert wurde. Zum Starten des Jobs, der in Java geschrieben ist, wird der Befehl (siehe Listing 2) in der Bash des Docker-Containers ausgeführt.
Es gibt zwei Wege zur Überprüfung der Ergebnisse. Zum einen können die Ergebnisse mithilfe der Web-UI kontrolliert werden. Hierfür wird in einem Web-Browser die URL “localhost:50070” geöffnet. In der Navigationsleiste ist der Punkt “Utilities” zu finden, der den Punkt “Browse HDFS” beinhaltet. Hier kann das gesamte HDFS erkundet werden. Unter anderem auch die zuvor entstandenen Ergebnisse. Diese sind unter “/user/root/output” zu finden. Es handelt sich um einen Ordner mit zwei Dateien: einem SUCCESS-Deskriptor – eine leere Datei, die anzeigt, dass die Durchführung des MapReduce-Jobs erfolgreich war – sowie einer Datei “part-r-00000”, die die entstandenen Ergebnisse enthält (siehe Abbildung 2).
Zum anderen können die Ergebnisse über die Kommandozeile angesehen werden. Gibt man den Befehl (siehe Listing 3) in der Bash des Docker-Containers ein, so werden die Ergebnisse des Jobs ausgegeben.
MapReduce
Ein guter Anfang, um erste praktische Erfahrungen mit Hadoop zu machen, ist, einen MapReduce-Job mithilfe von Java und Maven zu entwickeln. Aber was ist MapReduce? MapReduce ist, wie bereits erwähnt, eine der fünf Kernkomponenten von Hadoop. Es handelt sich um ein Programmiermodell zur Verarbeitung großer Datenmengen und gliedert sich in zwei Teilphasen: die Map-Phase und die Reduce-Phase [2]. Beide Phasen akzeptieren Schlüssel-Wert-Paare als Eingabe und liefern diese als Ausgabe. In der Map-Phase wird der Datensatz zerlegt, es werden ein neuer Schlüssel extrahiert und ein passender Wert zugeordnet. Wert und Schlüssel richten sich hierbei nach dem Ziel des Jobs. Soll beispielsweise aus einem Haustier-Datensatz mit Identifikationsnummern, Tierarten und Tiernamen die Anzahl der verschiedenen Tierarten ermittelt werden, so ist es denkbar, die Tierart als Schlüssel und die Identifikationsnummer als Wert zu extrahieren, wie in Abbildung 3 dargestellt.
In der Reduce-Phase werden die vom Mapper zerlegten Datensätze zur Erstellung des Ergebnisses wieder zusammengefügt. Für das Haustier-Beispiel würde das bedeuten, die Identifikationsnummern für eine bestimmte Tierart zu zählen und so die Anzahl der Haustiere dieser Tierart zu ermitteln (siehe Abbildung 4).
Optional kann einem MapReduce-Job ein sogenannter “Combiner” zugefügt werden, der die Netzwerklast zwischen Mapper und Reducer reduziert. In den meisten Fällen kann der bereits vorhandene Reducer ebenfalls als Combiner eingesetzt werden. Für die nachfolgenden Beispiele ist ein Combiner nicht vonnöten, da keine Multi-Node-Installation von Hadoop vorliegt und die Netzwerkauslastung kein Problem darstellt.
YARN
Seit Hadoop 2, also seit 2013, gehört YARN zum Kern von Hadoop. Neben der Zuweisung der Systemressourcen für verschiedene Anwendungen kümmert sich YARN außerdem um die Verteilung von Aufgaben auf den Knoten des Clusters [3]. Durch die Entkopplung von Ressourcenmanagement und Planung der MapReduce-Jobs gelang es mithilfe von YARN, die vorherige Implementierung von MapReduce zu verbessern. YARN ermöglicht außerdem die Integration von Alternativen zu MapReduce.
HDFS
Hadoop speichert die Daten im HDFS, dem “Hadoop Distributed File System”. Das HDFS besteht aus zwei Node-Typen, die in einem Master-Worker-Muster zusammenarbeiten. Zum einen gibt es die DataNodes, die für die Speicherung der Daten zuständig sind. Zum anderen gibt es einen NameNode, der die Metadaten des HDFS beinhaltet und “weiß”, wo welche Daten abgelegt sind. Die Datenspeicherung selbst erfolgt in sogenannten Blöcken. Eine Datei wird beim Speichern in einen oder mehrere Blöcke zerlegt, die einzelnen Blöcke können auf verschiedenen DataNodes abgelegt werden [4]. Das Dateisystem von Hadoop ist darauf ausgelegt, effizient wenige große Datensätze zu verarbeiten.
MapReduce mit Java
Zur Realisierung eines MapReduce-Jobs in Java werden zwei Abhängigkeiten via Maven eingebunden: “hadoop-common” und “hadoop-mapreduce-client-core”. Die nachfolgenden Implementierungen basieren auf einem CSV-Datensatz, in dem es unter anderem die Information “Category” gibt. Das Ergebnis des Map-Reduce-Jobs soll das Zählen des Auftretens der einzelnen Kategorien sein.
Im vorigen Abschnitt wurde bereits erwähnt, dass MapReduce-Jobs aus einem Mapper und einem Reducer bestehen. Beide werden mithilfe des von Hadoop bereitgestellten API umgesetzt.
Als Erstes wird der Mapper entwickelt, der von der Hadoop-Klasse Mapper
erbt. Die map
-Methode muss überschrieben werden. Beim Erben von der Basisklasse Mapper
müssen die Variablentypen von Schlüssel und Wert der Eingabe und Ausgabe spezifiziert werden. Die map
-Methode hat die Parameter key
und value
, wobei dieser bei einer CSV-Datei die gesamte Zeile darstellt, sowie einen context
. Der Parameter context
dient zur Weitergabe der Ergebnisse. Im folgenden Beispiel wird die Kategorie einer Zeile separiert und gemeinsam mit der Zeilen-ID (key
) im context
abgelegt. Hadoop verwendet eigene Datentypen, so wird Text
statt String
und LongWritable
statt Long
verwendet (siehe Listing 4).
Auf dieser Basis wird der Reducer implementiert. Er wird die Ergebnisse des Mappers aufnehmen und die Vorkommen einzelner Kategorien zählen. Ein Reducer muss von der Hadoop-Klasse Reducer
erben und die reduce
-Methode überschreiben. Die reduce
-Methode hat als Parameter einen key
, ein Iterable
values
und den context
. Als key
erhält der Reducer die Kategorie, die im Mapper extrahiert wurde, die zugehörigen Zeilen-IDs werden ihm als Iterable
übergeben. Die Aufgabe des Reducer liegt im Zählen der IDs und somit im Reduzieren der Ergebnisse des Mappers. Bei Iteration über die IDs wird das Auftreten der einzelnen Kategorien gezählt. Das Ergebnis wird als Wert vom Typ IntWritable
zu der jeweiligen Kategorie gespeichert (siehe Listing 5).
Neben einem Mapper und einem Reducer benötigt die Java-Implementierung des MapReduce-Jobs einen Einstiegspunkt. Dieser beinhaltet einige zusätzliche Informationen zum MapReduce-Job, wie einen Namen, die Angabe der Klasse des Mappers und des Reducer sowie die Informationen darüber, wo die Eingabedaten liegen und wohin die Ausgabedaten geschrieben werden sollen (siehe Listing 6).
Für die Ausführung des MapReduce-Jobs wird ein jar
benötigt, dieses kann mithilfe von Maven erstellt werden. Nach Ausführen des Maven-Befehls liegt das jar
im target-Ordner und kann von dort in den Docker-Container verschoben werden.
Zum Starten des MapReduce-Jobs dient der gleiche Befehl wie zum Starten des MapReduce-Jobs von Apache, lediglich die Argumente des Befehls müssen angepasst werden (siehe Listing 7).
Wie im Beispiel zuvor können die Ergebnisse via Web-UI oder Kommandozeile betrachtet werden.
MapReduce ohne Java
MapReduce-Jobs können auch in anderen Sprachen als Java implementiert werden. Eine Möglichkeit dafür bietet das Hadoop-Streaming-API. Es arbeitet mit der Unix-Standardeingabe und -ausgabe, dadurch ist die einzige Anforderung an die Programmiersprache, damit umgehen zu können. Das in Java vorhandene, nützliche Feature, dass die Werte für einen Schlüssel dem Reducer als Iterable
übergeben werden, geht hierbei leider verloren, sodass es die Aufgabe des Programmierers ist, die Werte den richtigen Schlüsseln zuzuordnen. Glücklicherweise übergibt das Hadoop-Streaming-API dem Reducer die Schlüssel in geordneter Reihenfolge, es genügt also, den vorangegangenen Schlüssel zwischenzuspeichern. Durch die Verwendung von Standardeingabe und -ausgabe ergibt sich der Vorteil, dass Mapper und Reducer mithilfe von Pipes in der Kommandozeile getestet werden können. Für MapReduce-Jobs, die mit dem Streaming-API implementiert werden, ist es nicht nötig, einen dedizierten Startpunkt zu implementieren. Es ist ausreichend, einen Mapper und einen Reducer zu schreiben, die mit folgendem Befehl (Listing 8) über das in Java implementierte Hadoop-Streaming-API gestartet werden können.
In diesem Beispiel werden ein in Python implementierter Mapper und Reducer aufgerufen, die mit den gleichen Eingabedaten wie die Java-Realisierung arbeiten. Ein dediziertes Verzeichnis speichert die Ausgabedaten. Bei der Verwendung des Hadoop-Streaming-API sollte daran gedacht werden, die Dateien mithilfe von chmod
ausführbar zu machen. Der Mapper geht Zeile für Zeile die Standardeingabe durch, teilt die CSV-Zeile am Separator und schreibt die erhaltene Kategorie, die in der zweiten Spalte der CSV-Zeile steht, sowie eine selbst zugewiesene ID in die Standardausgabe (Listing 9).
Der Reducer ist ähnlich aufgebaut. Er verarbeitet Zeile für Zeile die Standardeingabe, die der Ausgabe des Mappers entspricht. Die Besonderheit des Reducer besteht darin, dass der vorherige Zeilenschlüssel gespeichert werden muss, um nur die zu einem Schlüssel gehörenden Werte zu zählen. Das Ergebnis des Reducer wird in die Standardausgabe geschrieben (siehe Listing 10).
Ergebnisse über HTTP erhalten
Die in den beiden vorangegangenen Abschnitten entstandenen Ergebnisse sollen möglicherweise in einer UI dargestellt werden. Wie kann man die Ergebnisse von außen abfragen? Auf diese Frage liefert HttpFS eine Antwort. Diese Abkürzung steht für “HDFS over HTTP”. Es ermöglicht, alle Operationen, die man auf dem HDFS ausführen kann, über HTTP auszuführen. Für die Erstellung einer UI, die die Ergebnisse veranschaulicht, genügen zwei Operationen des HDFS: das Browsen durch Ordnerstrukturen und das Öffnen von Dateien. Hier werden diese als Beispiele mithilfe von “curl” gezeigt. Das Browsen von Ordnerstrukturen erfolgt durch die Verwendung des Öffnungsmodus LISTSTATUS
(siehe Listing 11).
Die Antwort wird in JSON übertragen und enthält ein Array mit Dateien innerhalb des Ordners. Es werden unter anderem der Dateiname (pathSuffix
) und der Typ der Datei (type
) übertragen. Eine Datei kann vom Typ FILE
oder DIRECTORY
sein. Ein DIRECTORY
kann erneut durchsucht werden, während ein FILE
wie folgt geöffnet werden kann (siehe Listing 12).
Resümee
Trotz des großen Umfangs von Hadoop ist es möglich, erste Schritte mit “dem Elefanten” am heimischen Rechner zu wagen. Dabei hilft der “Wal” und komprimiert den grauen beziehungsweise gelben Riesen in einen handlichen Container. Die eigentlichen Stärken von Hadoop, wie die Verarbeitung großer, unstrukturierter Datenmengen, können auf diese Weise zwar nicht demonstriert werden. Die Grundlagen von MapReduce und HDFS können allerdings leicht zugänglich weitergegeben werden.
Quellen
-
Tom White (2015): Hadoop: The Definitive Guide (4th Edition). O’REILLY. ↩
-
Jeffrey Dean, Sanjay Ghemawat (2004): MapReduce: Simplified Data Processing on Large Clusters. https:/static. googleusercontent.com/media/research.google.com/en/ archive/mapreduce–osdi04.pdf. ↩
-
Alex Holmes (2015): Hadoop In Practice (2nd Edition). Manning. ↩