This article is also available in English
Im modernen Data Engineering ist ein Wandel weg von der Entwicklung monolithischer Datenpipelines und hin zu modularen Datenprodukten eingetreten.
Ein Datenprodukt ist das zu liefernde Ergebnis, das alles in Verbindung mit einem Geschäftskonzept zur Erfüllung der Bedürfnisse eines Datenverbrauchers enthält:
- Tabellen zur Speicherung der Daten
- Code zur Umwandlung von Daten
- Tests, um zu überprüfen und zu überwachen, ob die Daten korrekt sind
- Ausgabe-Ports, um den Zugriff auf die Daten zu ermöglichen
- Eingabe-Ports, um Daten aus Quellsystemen aufzunehmen oder auf andere Datenprodukte zuzugreifen
- Data Contracts zur Beschreibung der API
- Dokumentation
- Meta-Informationen wie etwa Eigentum
Ein Datenprodukt wird üblicherweise in einem Git-Datenspeicher verwaltet. Databricks ist eine der beliebtesten modernen Datenplattformen. Wie können wir ein professionelles Datenprodukt mit Databricks entwickeln?
In diesem Artikel werden wir Data Contracts und die neuen Databricks Asset Bundles nutzen, die hervorragend miteinander kombiniert werden können, um Datenprodukte zu implementieren. Der gesamte Quellcode dieses Beispielprojekts ist auf GitHub verfügbar.
Definieren des Data Contract
Bevor wir mit der Implementierung beginnen, erörtern und definieren wir die geschäftlichen Anforderungen. Was benötigt unser Datenverbraucher von uns, wie stellt sich sein Use Case dar, was erwartet er als Datenmodell. Und wir müssen sicherstellen, dass wir von derselben Semantik und denselben Qualitätserwartungen ausgehen und dieselben Service-Level erwarten.
Wir nennen diesen Ansatz Contract-First. Wir beginnen mit der Gestaltung der Schnittstelle des gelieferten Datenmodells und seiner Metadaten als Data Contract. Wir nutzen den Data Contract, um die Implementierung voranzutreiben.
In unserem Beispiel möchte der Geschäftsführer eines E-Commerce-Unternehmens wissen, ob ein Problem mit Artikeln besteht, die über einen längeren Zeitraum nicht verkauft werden, d. h. Artikel ohne Umsatz in den letzten drei Monaten – sogenannte Ladenhüter.
Wir definieren in Zusammenarbeit mit dem Datenverbraucher einen Data Contract als YAML unter Verwendung der Data Contract Specification.
dataContractSpecification: 0.9.3
id: urn:datacontract:fulfillment:stock-last-sales
info:
title: Last Sales
version: 1.0.0
description: |
The data model contains all articles that are in stock.
For every article the last sale timestamp is defined.
owner: Fulfillment
contact:
name: John Doe (Data Product Owner)
url: https://teams.microsoft.com/l/channel/19%3Ad7X0bzOrUrZ-QAGu0nTxlWACe5HOQ-8Joql71A_00000%40thread.tacv2/General?groupId=4d213734-d5a1-4130-8024-00000000&tenantId=b000e7de-5c4d-41a2-9e67-00000000
servers:
development:
type: databricks
host: dbc-abcdefgh-1234.cloud.databricks.com
catalog: acme
schema: stock-last-sales
terms:
usage: >
Data can be used for reports, analytics and machine learning use cases.
Order may be linked and joined by other tables
limitations: >
Not suitable for real-time use cases.
billing: free
noticePeriod: P3M
models:
articles:
description: One record per article that is currently in stock
type: table
fields:
sku:
description: The article number (stock keeping unit)
type: string
primary: true
pattern: ^[A-Za-z0-9]{8,14}$
minLength: 8
maxLength: 14
example: "96385074"
quantity:
description: The total amount of articles that are currently in stock in all warehouses.
type: long
minimum: 1
required: true
last_sale_timestamp:
description: The business timestamp in UTC when there was the last sale for this article. Null means that the article was never sold.
type: timestamp
processing_timestamp:
description: The technical timestamp in UTC when this row was updated
type: timestamp
required: true
servicelevels:
availability:
percentage: 99.9%
retention:
period: 1 year
freshness:
threshold: 25 hours
timestampField: articles.processing_timestamp
frequency:
description: Data is updated once a day
type: batch
cron: 0 0 * * *
examples:
- type: csv
data: |
sku,quantity,last_sale_timestamp,processing_timestamp
1234567890123,5,2024-02-25T16:16:30.171798,2024-03-25T16:16:30.171807
2345678901234,10,,2024-03-25T15:16:30.171811
3456789012345,15,2024-03-02T12:16:30.171814,2024-03-25T14:16:30.171816
4567890123456,20,,2024-03-25T13:16:30.171817
5678901234567,25,2024-03-08T08:16:30.171819,2024-03-25T12:16:30.171821
6789012345678,30,,2024-03-25T11:16:30.171823
7890123456789,35,2024-03-14T04:16:30.171824,2024-03-25T10:16:30.171826
8901234567890,40,,2024-03-25T09:16:30.171830
9012345678901,45,2024-03-20T00:16:30.171833,2024-03-25T08:16:30.171835
0123456789012,50,,2024-03-25T07:16:30.171837
quality:
type: SodaCL
specification:
checks for articles:
- row_count > 1000
Der Datensatz wird alle derzeit im Bestand befindlichen Artikel beinhalten und umfasst last_sale_timestamp
, das für den Geschäftsführer relevanteste Attribut. Der Geschäftsführer kann in seinem BI-Tool (wie etwa PowerBI, Redash etc.) nach Artikeln mit einem last_sale_timestamp
älter als drei Monate filtern. Bedingungs- und Service-Level-Attribute weisen darauf hin, dass der Datensatz täglich um Mitternacht aktualisiert wird.
Erstellen des Databricks Asset Bundle
Nun ist es an der Zeit, ein Datenprodukt zu entwickeln, das diesen Data Contract implementiert. Databricks fügte vor Kurzem das Konzept der Databricks Asset Bundles hinzu, die sich perfekt eignen, um Datenprodukte zu strukturieren und zu entwickeln. Zum Zeitpunkt der Abfassung im März 2024 sind diese öffentlich als Vorschauversion verfügbar, d. h., sie sind bereit für die Nutzung in der Produktion.
Databricks Asset Bundles beinhalten alle Infrastruktur- und Codedateien, um tatsächlich Daten in Databricks umzuwandeln:
- Infrastruktur-Ressourcen
- Arbeitsbereich-Konfiguration
- Quelldateien wie Notebooks und Python-Skripte
- Unit-Tests
Die Databricks CLI bündelt diese Assets und setzt sie in der Databricks-Plattform ein, intern wird Terraform verwendet. Asset Bundles sind gut in die Databricks-Plattform integriert, d. h., es ist nicht möglich, Code oder Aufträge direkt in Databricks zu bearbeiten, was eine strenge Versionskontrolle der gesamten Code- und Pipeline-Konfiguration ermöglicht.
Bundles sind extrem nützlich, wenn mehrere Umgebungen bestehen, wie etwa Entwicklung, Staging und Produktion. Sie können dasselbe Bundle für mehrere Ziele mit verschiedenen Konfigurationen einsetzen.
Um ein Bundle zu erstellen, muss es in einem neuen Bundle initiiert werden:
databricks bundle init
Wir verwenden diese Konfiguration:
- Zu verwendete Vorlage: default-python
- Eindeutiger Name für dieses Projekt: stock_last_sales
- Einfügen eines Stub-(Sample-)Notebooks in
stock_last_sales/src
: yes - Einfügen einer Stub (Sample) Delta Live Tables Pipeline in
stock_last_sales/src
: no - Einfügen eines Stub-(Sample-)Python-Pakets in
stock_last_sales/src
: yes
Werfen wir hinsichtlich der Bundle-Struktur einen kurzen Blick auf die relevantesten Dateien:
- databricks.yml Die Bundle-Konfiguration und Einsatzziele
- src/ Der Ordner für den Umwandlungscode
- tests/ Der Ordner für die Platzierung der Unit-Tests
- resources/ Die Auftragsdefinition für die Definition des Workflows
Anmerkung: Wir empfehlen, ein internes Bundle beizubehalten, das die Namenskonventionen, allgemeinen Vorschriften, Best Practices und Integrationen des Unternehmens enthält.
Mit Asset Bundles können wir unseren Code auch lokal in unserer bevorzugten IDE schreiben, wie etwa VS Code (unter Verwendung der Databricks-Erweiterung für Visual Studio Code), PyCharm oder IntelliJ IDEA (unter Verwendung von Databricks Connect).
Um eine lokale Python-Umgebung einzurichten, können wir venv verwenden und die Entwicklungsabhängigkeiten installieren:
python3.10 -m venv .venv
source .venv/bin/activate
pip install -r requirements-dev.txt
Generierung der Unity-Catalog-Tabelle
Wie organisieren wir die Daten für unser Datenprodukt? In diesem Beispiel verwenden wir Unity Catalog, um Speicher als verwaltete Tabellen zu handhaben. Auf Isolationsebene entscheiden wir, dass ein Datenprodukt ein Schema in Unity Catalog repräsentieren könnte.
Wir können den Data Contract YAML nutzen, um Infrastruktur-Code zu generieren:
Das Modell definiert die Tabellenstruktur des Zieldatenmodells. Mit dem Data Contract CLI tool, können wir den SQL-DDL-Code für die Anweisung CREATE TABLE generieren.
datacontract export --format sql datacontract.yaml
-- Data Contract: urn:datacontract:fulfillment:stock-last-sales
-- SQL Dialect: databricks
CREATE OR REPLACE TABLE acme.stock_last_sales.articles (
sku STRING primary key,
quantity BIGINT not null,
last_sale_timestamp TIMESTAMP,
processing_timestamp TIMESTAMP not null
);
Das Data Contract CLI tool ist ebenfalls als eine Python Library datacontract-cli verfügbar. Wir fügen es zu requirements-dev.txt hinzu und verwenden es direkt in einem Databricks-Notebook, um die Tabelle in Unity Catalog zu erstellen:
Die Unity-Catalog-Tabelle ist eine verwaltete Tabelle, die intern zur effizienten Speicherung das Delta-Format nutzt.
Entwickeln des Umwandlungscodes
Jetzt schreiben wir die Hauptumwandlungslogik. Mit Python-basierten Databricks Asset Bundles können wir unsere Daten-Pipelines entwickeln als:
- Databricks-Notebooks,
- Delta-Live-Tabellen oder
- Python-Dateien
In diesem Datenprodukt werden wir einfache Python-Dateien für unsere Hauptumwandlungslogik schreiben, die als Wheel-Pakete eingesetzt werden.
Unsere Umwandlung nimmt alle verfügbaren Bestände, die wir von einem Eingabe-Port wie etwa dem Betriebssystem erhalten, das die aktuellen Bestandsdaten verwaltet, und fügt den Datenframe mit dem Zeitstempel des letzten Verkaufs für jede SKU per Left-Join hinzu. Die Umsatzinformationen sind ebenfalls ein Eingabe-Port, z. B. ein weiteres vorgelagertes Datenprodukt, das von dem Kassenteam bereitgestellt wird. Wir speichern den resultierenden Datenframe in der zuvor generierten Tabellenstruktur.
Dank dieser Option bleibt der Code wiederverwendbar, ist leicht mit Unit-Tests zu testen und wir können ihn auf unseren lokalen Computern ausführen. Als professionelle Datentechniker stellen wir sicher, dass die Funktion calculate_last_sales
wie erwartet ausgeführt wird, indem sie gute Unit-Tests schreibt.
Wir aktualisieren die Auftragskonfiguration, um den Python-Code als python_wheel_task
auszuführen, und konfigurieren das Steuerprogramm und den entsprechenden Rechencluster.
# The main job for stock_last_sales. resources:
jobs:
stock_last_sales_job:
name: stock_last_sales_job
schedule:
# Run every day at midnight
quartz_cron_expression: '0 0 0 * * ?'
timezone_id: Europe/Amsterdam
tasks:
- task_key: create_unity_catalog_table
job_cluster_key: job_cluster
notebook_task:
notebook_path: ../src/create_unity_catalog_table.ipynb
libraries:
- pypi:
package: datacontract-cli
- task_key: main_task
depends_on:
- task_key: create_unity_catalog_table
job_cluster_key: job_cluster
python_wheel_task:
package_name: stock_last_sales
entry_point: main
libraries:
- whl: ../dist/*.whl
job_clusters:
- job_cluster_key: job_cluster
new_cluster:
spark_version: 13.3.x-scala2.12
node_type_id: i3.xlarge
autoscale:
min_workers: 1
max_workers: 4
Wenn wir uns sicher sind, können wir das Bundle in unseren Databricks-dev-Instanzen (derzeit manuell) einsetzen:
databricks bundle deploy
Wir lösen eine manuelle Ausführung unseres Workflows aus:
databricks bundle run stock_last_sales_job
In Databricks können wir sehen, dass der Workflow erfolgreich ausgeführt wurde:
Die Daten sind jetzt in der zuvor erstellten Tabelle enthalten.
Testen des Data Contract
Unsere Aufgabe ist noch nicht ganz beendet. Woher wissen wir, dass die Daten korrekt sind? Während die Unit-Tests uns Sicherheit hinsichtlich des Umwandlungscodes geben, benötigen wir auch eine Abnahmeprüfung, um zu überprüfen, ob wir den vereinbarten Data Contract korrekt implementiert haben.
Für diese Prüfung können wir das Data Contract CLI tool verwenden:
export DATACONTRACT_DATABRICKS_HTTP_PATH=/sql/1.0/warehouses/b053xxxxxxx
export DATACONTRACT_DATABRICKS_TOKEN=dapia1926f7c64b7595880909xxxxxxxxxx
datacontract test datacontract.yaml
Das datacontract Tool verwendet alle Schema- und Formatinformationen aus _model_, den Qualitätsattributen und den Metadaten und vergleicht diese mit dem tatsächlichen Datensatz. Es liest die Verbindungsdetails aus dem Serverabschnitt und verbindet sich mit Databricks, führt alle Prüfungen aus und bietet eine umfassende Übersicht.
Wir möchten diesen Test mit jeder Pipelineausführung durchführen und erstellen daher erneut eine Notebook-Aufgabe für den Test:
Einsatz mit CI/CD
Für einen automatischen Test wird das Asset Bundle auf Databricks bereitgestellt, und nach der einmaligen Ausführung des Auftrags richten wir eine CI/CD-Pipeline in GitHub unter Verwendung einer GitHub Aktion ein.
name: "Deploy Databricks Assets Bundle"
on:
workflow_dispatch:
push:
branches: [ "main" ]
jobs:
test:
name: "Run unit tests"
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{matrix.python-version}}
uses: actions/setup-python@v5
with:
python-version: "3.10"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements-dev.txt
- name: Test with pytest
run: pytest
env:
DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }}
DATABRICKS_TOKEN: ${{ secrets.SP_TOKEN }}
DATABRICKS_CLUSTER_ID: ${{ secrets.DATABRICKS_CLUSTER_ID }}
deploy:
name: "Deploy bundle to DEV"
runs-on: ubuntu-latest
needs:
- test
steps:
- uses: actions/checkout@v4
- uses: databricks/setup-cli@main
- run: databricks bundle deploy
working-directory: .
env:
DATABRICKS_TOKEN: ${{ secrets.SP_TOKEN }}
DATABRICKS_BUNDLE_ENV: dev
run_pipieline:
name: "Run pipeline"
runs-on: ubuntu-latest
needs:
- deploy
steps:
- uses: actions/checkout@v4
- uses: databricks/setup-cli@main
- run: databricks bundle run stock_last_sales_job --refresh-all
working-directory: .
env:
DATABRICKS_TOKEN: ${{ secrets.SP_TOKEN }}
DATABRICKS_BUNDLE_ENV: dev
Jedes Mal, wenn wir den Code aktualisieren, wird das Asset Bundle automatisch auf Databricks bereitgestellt.
Metadaten veröffentlichen
Damit andere die Datenprodukte finden, verstehen und ihnen vertrauen können, wollen wir sie in einem Datenproduktverzeichnis registrieren.
In diesem Beispiel verwenden wir den Data Mesh Manager – eine Plattform für das Registrieren, Verwalten und Entdecken von Datenprodukten, Data Contracts und globalen Vorschriften.
Wir erstellen erneut eine Notebook-Aufgabe (oder Python-Codeaufgabe), um die Metadaten im Data Mesh Manager zu veröffentlichen, und fügen dann die Aufgabe zu unserem Workflow hinzu. Wir können Databricks Secrets verwenden, um den API-Schlüssel in Databricks bereitzustellen.
databricks secrets create-scope datamesh_manager
databricks secrets put-secret datamesh_manager api_key
Zusammenfassung
Jetzt kann sich der Geschäftsführer über ein BI-Tool (wie etwa PowerBI, Tableau, Redash oder in Databricks) mit dieser Tabelle verbinden und seine Frage bezüglich des Artikels beantworten.
Databricks Asset Bundles sind perfekt geeignet, um professionelle Datenprodukte auf Databricks zu entwickeln, da es alle Ressourcen und Konfigurationen (Code, Tests, Speicher, Berechnen, Steuerungsprogramm, Metadaten etc.) bündelt, die benötigt werden, um den Datenverbrauchern Datensätze in hoher Qualität zu liefern.
Es ist einfach, Data Contracts zur Definition der Anforderungen und die Data Contract CLI zur Automatisierung von Abnahmeprüfungen zu integrieren.
Der Quellcode für das Beispielprojekt ist auf GitHub verfügbar.