This article is also available in English

Im modernen Data Engineering ist ein Wandel weg von der Entwicklung monolithischer Datenpipelines und hin zu modularen Datenprodukten eingetreten.

Datenprodukt-Komponenten
Datenprodukt-Komponenten

Ein Datenprodukt ist das zu liefernde Ergebnis, das alles in Verbindung mit einem Geschäftskonzept zur Erfüllung der Bedürfnisse eines Datenverbrauchers enthält:

Ein Datenprodukt wird üblicherweise in einem Git-Datenspeicher verwaltet. Databricks ist eine der beliebtesten modernen Datenplattformen. Wie können wir ein professionelles Datenprodukt mit Databricks entwickeln?

Ein als Databricks Asset Bundle implementierter Workflow-Auftrag
Ein als Databricks Asset Bundle implementierter Workflow-Auftrag

In diesem Artikel werden wir Data Contracts und die neuen Databricks Asset Bundles nutzen, die hervorragend miteinander kombiniert werden können, um Datenprodukte zu implementieren. Der gesamte Quellcode dieses Beispielprojekts ist auf GitHub verfügbar.

Definieren des Data Contract

Bevor wir mit der Implementierung beginnen, erörtern und definieren wir die geschäftlichen Anforderungen. Was benötigt unser Datenverbraucher von uns, wie stellt sich sein Use Case dar, was erwartet er als Datenmodell. Und wir müssen sicherstellen, dass wir von derselben Semantik und denselben Qualitätserwartungen ausgehen und dieselben Service-Level erwarten.

Wir nennen diesen Ansatz Contract-First. Wir beginnen mit der Gestaltung der Schnittstelle des gelieferten Datenmodells und seiner Metadaten als Data Contract. Wir nutzen den Data Contract, um die Implementierung voranzutreiben.

In unserem Beispiel möchte der Geschäftsführer eines E-Commerce-Unternehmens wissen, ob ein Problem mit Artikeln besteht, die über einen längeren Zeitraum nicht verkauft werden, d. h. Artikel ohne Umsatz in den letzten drei Monaten – sogenannte Ladenhüter.

Wir definieren in Zusammenarbeit mit dem Datenverbraucher einen Data Contract als YAML unter Verwendung der Data Contract Specification.

dataContractSpecification: 0.9.3
id: urn:datacontract:fulfillment:stock-last-sales
info:
  title: Last Sales
  version: 1.0.0
  description: |
    The data model contains all articles that are in stock.
    For every article the last sale timestamp is defined.
  owner: Fulfillment
  contact:
    name: John Doe (Data Product Owner)
    url: https://teams.microsoft.com/l/channel/19%3Ad7X0bzOrUrZ-QAGu0nTxlWACe5HOQ-8Joql71A_00000%40thread.tacv2/General?groupId=4d213734-d5a1-4130-8024-00000000&tenantId=b000e7de-5c4d-41a2-9e67-00000000
servers:
  development:
    type: databricks
    host: dbc-abcdefgh-1234.cloud.databricks.com
    catalog: acme
    schema: stock-last-sales
terms:
  usage: >
    Data can be used for reports, analytics and machine learning use cases.
    Order may be linked and joined by other tables
  limitations: >
    Not suitable for real-time use cases.
  billing: free
  noticePeriod: P3M
models:
  articles:
    description: One record per article that is currently in stock
    type: table
    fields:
      sku:
        description: The article number (stock keeping unit)
        type: string
        primary: true
        pattern: ^[A-Za-z0-9]{8,14}$
        minLength: 8
        maxLength: 14
        example: "96385074"
      quantity:
        description: The total amount of articles that are currently in stock in all warehouses.
        type: long
        minimum: 1
        required: true
      last_sale_timestamp:
        description: The business timestamp in UTC when there was the last sale for this article. Null means that the article was never sold.
        type: timestamp
      processing_timestamp:
        description: The technical timestamp in UTC when this row was updated
        type: timestamp
        required: true
servicelevels:
  availability:
    percentage: 99.9%
  retention:
    period: 1 year
  freshness:
    threshold: 25 hours
    timestampField: articles.processing_timestamp
  frequency:
    description: Data is updated once a day
    type: batch
    cron: 0 0 * * *
examples:
  - type: csv
    data: |
      sku,quantity,last_sale_timestamp,processing_timestamp
      1234567890123,5,2024-02-25T16:16:30.171798,2024-03-25T16:16:30.171807
      2345678901234,10,,2024-03-25T15:16:30.171811
      3456789012345,15,2024-03-02T12:16:30.171814,2024-03-25T14:16:30.171816
      4567890123456,20,,2024-03-25T13:16:30.171817
      5678901234567,25,2024-03-08T08:16:30.171819,2024-03-25T12:16:30.171821
      6789012345678,30,,2024-03-25T11:16:30.171823
      7890123456789,35,2024-03-14T04:16:30.171824,2024-03-25T10:16:30.171826
      8901234567890,40,,2024-03-25T09:16:30.171830
      9012345678901,45,2024-03-20T00:16:30.171833,2024-03-25T08:16:30.171835
      0123456789012,50,,2024-03-25T07:16:30.171837
quality:
  type: SodaCL
  specification:
    checks for articles:
      - row_count > 1000
datacontract.yaml

Der Datensatz wird alle derzeit im Bestand befindlichen Artikel beinhalten und umfasst last_sale_timestamp, das für den Geschäftsführer relevanteste Attribut. Der Geschäftsführer kann in seinem BI-Tool (wie etwa PowerBI, Redash etc.) nach Artikeln mit einem last_sale_timestamp älter als drei Monate filtern. Bedingungs- und Service-Level-Attribute weisen darauf hin, dass der Datensatz täglich um Mitternacht aktualisiert wird.

Erstellen des Databricks Asset Bundle

Nun ist es an der Zeit, ein Datenprodukt zu entwickeln, das diesen Data Contract implementiert. Databricks fügte vor Kurzem das Konzept der Databricks Asset Bundles hinzu, die sich perfekt eignen, um Datenprodukte zu strukturieren und zu entwickeln. Zum Zeitpunkt der Abfassung im März 2024 sind diese öffentlich als Vorschauversion verfügbar, d. h., sie sind bereit für die Nutzung in der Produktion.

Databricks Asset Bundles beinhalten alle Infrastruktur- und Codedateien, um tatsächlich Daten in Databricks umzuwandeln:

Die Databricks CLI bündelt diese Assets und setzt sie in der Databricks-Plattform ein, intern wird Terraform verwendet. Asset Bundles sind gut in die Databricks-Plattform integriert, d. h., es ist nicht möglich, Code oder Aufträge direkt in Databricks zu bearbeiten, was eine strenge Versionskontrolle der gesamten Code- und Pipeline-Konfiguration ermöglicht.

Bundles sind extrem nützlich, wenn mehrere Umgebungen bestehen, wie etwa Entwicklung, Staging und Produktion. Sie können dasselbe Bundle für mehrere Ziele mit verschiedenen Konfigurationen einsetzen.

Um ein Bundle zu erstellen, muss es in einem neuen Bundle initiiert werden:

databricks bundle init

Wir verwenden diese Konfiguration:

Werfen wir hinsichtlich der Bundle-Struktur einen kurzen Blick auf die relevantesten Dateien:

Anmerkung: Wir empfehlen, ein internes Bundle beizubehalten, das die Namenskonventionen, allgemeinen Vorschriften, Best Practices und Integrationen des Unternehmens enthält.

Mit Asset Bundles können wir unseren Code auch lokal in unserer bevorzugten IDE schreiben, wie etwa VS Code (unter Verwendung der Databricks-Erweiterung für Visual Studio Code), PyCharm oder IntelliJ IDEA (unter Verwendung von Databricks Connect).

Databricks Asset Bundle in IntelliJ IDEA
Databricks Asset Bundle in IntelliJ IDEA

Um eine lokale Python-Umgebung einzurichten, können wir venv verwenden und die Entwicklungsabhängigkeiten installieren:

python3.10 -m venv .venv
source .venv/bin/activate
pip install -r requirements-dev.txt

Generierung der Unity-Catalog-Tabelle

Wie organisieren wir die Daten für unser Datenprodukt? In diesem Beispiel verwenden wir Unity Catalog, um Speicher als verwaltete Tabellen zu handhaben. Auf Isolationsebene entscheiden wir, dass ein Datenprodukt ein Schema in Unity Catalog repräsentieren könnte.

Wir können den Data Contract YAML nutzen, um Infrastruktur-Code zu generieren:

Das Modell definiert die Tabellenstruktur des Zieldatenmodells. Mit dem Data Contract CLI tool, können wir den SQL-DDL-Code für die Anweisung CREATE TABLE generieren.

datacontract export --format sql datacontract.yaml

-- Data Contract: urn:datacontract:fulfillment:stock-last-sales
-- SQL Dialect: databricks
CREATE OR REPLACE TABLE acme.stock_last_sales.articles (
  sku STRING primary key,
  quantity BIGINT not null,
  last_sale_timestamp TIMESTAMP,
  processing_timestamp TIMESTAMP not null
);

Das Data Contract CLI tool ist ebenfalls als eine Python Library datacontract-cli verfügbar. Wir fügen es zu requirements-dev.txt hinzu und verwenden es direkt in einem Databricks-Notebook, um die Tabelle in Unity Catalog zu erstellen:

Nutzung der Data Contract CLI zur Erstellung der Unity-Catalog-Tabelle
Nutzung der Data Contract CLI zur Erstellung der Unity-Catalog-Tabelle

Die Unity-Catalog-Tabelle ist eine verwaltete Tabelle, die intern zur effizienten Speicherung das Delta-Format nutzt.

Die erstellte Tabelle in Unity Catalog

Entwickeln des Umwandlungscodes

Jetzt schreiben wir die Hauptumwandlungslogik. Mit Python-basierten Databricks Asset Bundles können wir unsere Daten-Pipelines entwickeln als:

In diesem Datenprodukt werden wir einfache Python-Dateien für unsere Hauptumwandlungslogik schreiben, die als Wheel-Pakete eingesetzt werden.

Unsere Umwandlung nimmt alle verfügbaren Bestände, die wir von einem Eingabe-Port wie etwa dem Betriebssystem erhalten, das die aktuellen Bestandsdaten verwaltet, und fügt den Datenframe mit dem Zeitstempel des letzten Verkaufs für jede SKU per Left-Join hinzu. Die Umsatzinformationen sind ebenfalls ein Eingabe-Port, z. B. ein weiteres vorgelagertes Datenprodukt, das von dem Kassenteam bereitgestellt wird. Wir speichern den resultierenden Datenframe in der zuvor generierten Tabellenstruktur.

Umwandlungscode als reiner Python-Code
Umwandlungscode als reiner Python-Code

Dank dieser Option bleibt der Code wiederverwendbar, ist leicht mit Unit-Tests zu testen und wir können ihn auf unseren lokalen Computern ausführen. Als professionelle Datentechniker stellen wir sicher, dass die Funktion calculate_last_sales wie erwartet ausgeführt wird, indem sie gute Unit-Tests schreibt.

Unit-Tests
Unit-Tests

Wir aktualisieren die Auftragskonfiguration, um den Python-Code als python_wheel_task auszuführen, und konfigurieren das Steuerprogramm und den entsprechenden Rechencluster.

# The main job for stock_last_sales. resources:
  jobs:
    stock_last_sales_job:
      name: stock_last_sales_job

      schedule:
        # Run every day at midnight
        quartz_cron_expression: '0 0 0 * * ?'
        timezone_id: Europe/Amsterdam

      tasks:
        - task_key: create_unity_catalog_table
          job_cluster_key: job_cluster
          notebook_task:
            notebook_path: ../src/create_unity_catalog_table.ipynb
          libraries:
            - pypi:
                package: datacontract-cli

        - task_key: main_task
          depends_on:
            - task_key: create_unity_catalog_table
          job_cluster_key: job_cluster
          python_wheel_task:
            package_name: stock_last_sales
            entry_point: main
          libraries:
            - whl: ../dist/*.whl

      job_clusters:
        - job_cluster_key: job_cluster
          new_cluster:
            spark_version: 13.3.x-scala2.12
            node_type_id: i3.xlarge
            autoscale:
                min_workers: 1
                max_workers: 4
_resources/stock_last_sales_job.yml

Wenn wir uns sicher sind, können wir das Bundle in unseren Databricks-dev-Instanzen (derzeit manuell) einsetzen:

databricks bundle deploy

Wir lösen eine manuelle Ausführung unseres Workflows aus:

databricks bundle run stock_last_sales_job

In Databricks können wir sehen, dass der Workflow erfolgreich ausgeführt wurde:

Ein Job in Databricks
Ein Job in Databricks

Die Daten sind jetzt in der zuvor erstellten Tabelle enthalten.

Das Ergebnis unserer Daten-Pipeline
Das Ergebnis unserer Daten-Pipeline

Testen des Data Contract

Unsere Aufgabe ist noch nicht ganz beendet. Woher wissen wir, dass die Daten korrekt sind? Während die Unit-Tests uns Sicherheit hinsichtlich des Umwandlungscodes geben, benötigen wir auch eine Abnahmeprüfung, um zu überprüfen, ob wir den vereinbarten Data Contract korrekt implementiert haben.

Für diese Prüfung können wir das Data Contract CLI tool verwenden:

export DATACONTRACT_DATABRICKS_HTTP_PATH=/sql/1.0/warehouses/b053xxxxxxx
export DATACONTRACT_DATABRICKS_TOKEN=dapia1926f7c64b7595880909xxxxxxxxxx
datacontract test datacontract.yaml

Das datacontract Tool verwendet alle Schema- und Formatinformationen aus _model_, den Qualitätsattributen und den Metadaten und vergleicht diese mit dem tatsächlichen Datensatz. Es liest die Verbindungsdetails aus dem Serverabschnitt und verbindet sich mit Databricks, führt alle Prüfungen aus und bietet eine umfassende Übersicht.

Testergebnisse Data Contract
Testergebnisse Data Contract

Wir möchten diesen Test mit jeder Pipelineausführung durchführen und erstellen daher erneut eine Notebook-Aufgabe für den Test:

Data Contract Test als Notebook
Data Contract Test als Notebook

Einsatz mit CI/CD

Für einen automatischen Test wird das Asset Bundle auf Databricks bereitgestellt, und nach der einmaligen Ausführung des Auftrags richten wir eine CI/CD-Pipeline in GitHub unter Verwendung einer GitHub Aktion ein.

name: "Deploy Databricks Assets Bundle"
on:
  workflow_dispatch:
  push:
    branches: [ "main" ]

jobs:
  test:
    name: "Run unit tests"
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python ${{matrix.python-version}}
        uses: actions/setup-python@v5
        with:
          python-version: "3.10"
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements-dev.txt
      - name: Test with pytest
        run: pytest
        env:
          DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }}
          DATABRICKS_TOKEN: ${{ secrets.SP_TOKEN }}
          DATABRICKS_CLUSTER_ID: ${{ secrets.DATABRICKS_CLUSTER_ID }}

  deploy:
    name: "Deploy bundle to DEV"
    runs-on: ubuntu-latest

    needs:
      - test

    steps:
      - uses: actions/checkout@v4
      - uses: databricks/setup-cli@main
      - run: databricks bundle deploy
        working-directory: .
        env:
          DATABRICKS_TOKEN: ${{ secrets.SP_TOKEN }}
          DATABRICKS_BUNDLE_ENV: dev
  run_pipieline:
    name: "Run pipeline"
    runs-on: ubuntu-latest

    needs:
      - deploy

    steps:
      - uses: actions/checkout@v4
      - uses: databricks/setup-cli@main
      - run: databricks bundle run stock_last_sales_job --refresh-all
        working-directory: .
        env:
          DATABRICKS_TOKEN: ${{ secrets.SP_TOKEN }}
          DATABRICKS_BUNDLE_ENV: dev
.github/workflows/deploy.yml

Jedes Mal, wenn wir den Code aktualisieren, wird das Asset Bundle automatisch auf Databricks bereitgestellt.

CI/CD-Workflow in GitHub
CI/CD-Workflow in GitHub

Metadaten veröffentlichen

Damit andere die Datenprodukte finden, verstehen und ihnen vertrauen können, wollen wir sie in einem Datenproduktverzeichnis registrieren.

In diesem Beispiel verwenden wir den Data Mesh Manager – eine Plattform für das Registrieren, Verwalten und Entdecken von Datenprodukten, Data Contracts und globalen Vorschriften.

Metadaten in Data Mesh Manager
Metadaten in Data Mesh Manager

Wir erstellen erneut eine Notebook-Aufgabe (oder Python-Codeaufgabe), um die Metadaten im Data Mesh Manager zu veröffentlichen, und fügen dann die Aufgabe zu unserem Workflow hinzu. Wir können Databricks Secrets verwenden, um den API-Schlüssel in Databricks bereitzustellen.

databricks secrets create-scope datamesh_manager
databricks secrets put-secret datamesh_manager api_key
Metadaten veröffentlichen
Metadaten veröffentlichen

Zusammenfassung

Jetzt kann sich der Geschäftsführer über ein BI-Tool (wie etwa PowerBI, Tableau, Redash oder in Databricks) mit dieser Tabelle verbinden und seine Frage bezüglich des Artikels beantworten.

Analysen in Databricks
Analysen in Databricks

Databricks Asset Bundles sind perfekt geeignet, um professionelle Datenprodukte auf Databricks zu entwickeln, da es alle Ressourcen und Konfigurationen (Code, Tests, Speicher, Berechnen, Steuerungsprogramm, Metadaten etc.) bündelt, die benötigt werden, um den Datenverbrauchern Datensätze in hoher Qualität zu liefern.

Es ist einfach, Data Contracts zur Definition der Anforderungen und die Data Contract CLI zur Automatisierung von Abnahmeprüfungen zu integrieren.

Der Quellcode für das Beispielprojekt ist auf GitHub verfügbar.

Mehr erfahren