This article is also available in English

Im modernen Data Engineering ist ein Wandel weg von der Entwicklung monolithischer Datenpipelines und hin zu modularen Datenprodukten eingetreten.

Datenprodukt-Komponenten
Datenprodukt-Komponenten

Ein Datenprodukt ist das zu liefernde Ergebnis, das alles in Verbindung mit einem Geschäftskonzept zur Erfüllung der Bedürfnisse eines Datenverbrauchers enthält:

Ein Datenprodukt wird üblicherweise in einem Git-Datenspeicher verwaltet. Databricks ist eine der beliebtesten modernen Datenplattformen. Wie können wir ein professionelles Datenprodukt mit Databricks entwickeln?

Ein als Databricks Asset Bundle implementierter Workflow-Auftrag
Ein als Databricks Asset Bundle implementierter Workflow-Auftrag

In diesem Artikel werden wir Data Contracts und die neuen Databricks Asset Bundles nutzen, die hervorragend miteinander kombiniert werden können, um Datenprodukte zu implementieren. Der gesamte Quellcode dieses Beispielprojekts ist auf GitHub verfügbar.

Definieren des Data Contract

Bevor wir mit der Implementierung beginnen, erörtern und definieren wir die geschäftlichen Anforderungen. Was benötigt unser Datenverbraucher von uns, wie stellt sich sein Use Case dar, was erwartet er als Datenmodell. Und wir müssen sicherstellen, dass wir von derselben Semantik und denselben Qualitätserwartungen ausgehen und dieselben Service-Level erwarten.

Wir nennen diesen Ansatz Contract-First. Wir beginnen mit der Gestaltung der Schnittstelle des gelieferten Datenmodells und seiner Metadaten als Data Contract. Wir nutzen den Data Contract, um die Implementierung voranzutreiben.

In unserem Beispiel möchte der Geschäftsführer eines E-Commerce-Unternehmens wissen, ob ein Problem mit Artikeln besteht, die über einen längeren Zeitraum nicht verkauft werden, d. h. Artikel ohne Umsatz in den letzten drei Monaten – sogenannte Ladenhüter.

Wir definieren in Zusammenarbeit mit dem Datenverbraucher einen Data Contract als YAML unter Verwendung der Data Contract Specification.

dataContractSpecification: 0.9.3

id: urn:datacontract:fulfillment:stock-last-sales

info:

  title: Last Sales

  version: 1.0.0

  description: |

    The data model contains all articles that are in stock.

    For every article the last sale timestamp is defined.

  owner: Fulfillment

  contact:

    name: John Doe (Data Product Owner)

    url: https://teams.microsoft.com/l/channel/19%3Ad7X0bzOrUrZ-QAGu0nTxlWACe5HOQ-8Joql71A_00000%40thread.tacv2/General?groupId=4d213734-d5a1-4130-8024-00000000&tenantId=b000e7de-5c4d-41a2-9e67-00000000

servers:

  development:

    type: databricks

    host: dbc-abcdefgh-1234.cloud.databricks.com

    catalog: acme

    schema: stock-last-sales

terms:

  usage: >

    Data can be used for reports, analytics and machine learning use cases.

    Order may be linked and joined by other tables

  limitations: >

    Not suitable for real-time use cases.

  billing: free

  noticePeriod: P3M

models:

  articles:

    description: One record per article that is currently in stock

    type: table

    fields:

      sku:

        description: The article number (stock keeping unit)

        type: string

        primary: true

        pattern: ^[A-Za-z0-9]{8,14}$

        minLength: 8

        maxLength: 14

        example: "96385074"

      quantity:

        description: The total amount of articles that are currently in stock in all warehouses.

        type: long

        minimum: 1

        required: true

      last_sale_timestamp:

        description: The business timestamp in UTC when there was the last sale for this article. Null means that the article was never sold.

        type: timestamp

      processing_timestamp:

        description: The technical timestamp in UTC when this row was updated

        type: timestamp

        required: true

servicelevels:

  availability:

    percentage: 99.9%

  retention:

    period: 1 year

  freshness:

    threshold: 25 hours

    timestampField: articles.processing_timestamp

  frequency:

    description: Data is updated once a day

    type: batch

    cron: 0 0 * * *

examples:

  - type: csv

    data: |

      sku,quantity,last_sale_timestamp,processing_timestamp

      1234567890123,5,2024-02-25T16:16:30.171798,2024-03-25T16:16:30.171807

      2345678901234,10,,2024-03-25T15:16:30.171811

      3456789012345,15,2024-03-02T12:16:30.171814,2024-03-25T14:16:30.171816

      4567890123456,20,,2024-03-25T13:16:30.171817

      5678901234567,25,2024-03-08T08:16:30.171819,2024-03-25T12:16:30.171821

      6789012345678,30,,2024-03-25T11:16:30.171823

      7890123456789,35,2024-03-14T04:16:30.171824,2024-03-25T10:16:30.171826

      8901234567890,40,,2024-03-25T09:16:30.171830

      9012345678901,45,2024-03-20T00:16:30.171833,2024-03-25T08:16:30.171835

      0123456789012,50,,2024-03-25T07:16:30.171837

quality:

  type: SodaCL

  specification:

    checks for articles:

      - row_count > 1000
datacontract.yaml

Der Datensatz wird alle derzeit im Bestand befindlichen Artikel beinhalten und umfasst last_sale_timestamp, das für den Geschäftsführer relevanteste Attribut. Der Geschäftsführer kann in seinem BI-Tool (wie etwa PowerBI, Redash etc.) nach Artikeln mit einem last_sale_timestamp älter als drei Monate filtern. Bedingungs- und Service-Level-Attribute weisen darauf hin, dass der Datensatz täglich um Mitternacht aktualisiert wird.

Erstellen des Databricks Asset Bundle

Nun ist es an der Zeit, ein Datenprodukt zu entwickeln, das diesen Data Contract implementiert. Databricks fügte vor Kurzem das Konzept der Databricks Asset Bundles hinzu, die sich perfekt eignen, um Datenprodukte zu strukturieren und zu entwickeln. Zum Zeitpunkt der Abfassung im März 2024 sind diese öffentlich als Vorschauversion verfügbar, d. h., sie sind bereit für die Nutzung in der Produktion.

Databricks Asset Bundles beinhalten alle Infrastruktur- und Codedateien, um tatsächlich Daten in Databricks umzuwandeln:

Die Databricks CLI bündelt diese Assets und setzt sie in der Databricks-Plattform ein, intern wird Terraform verwendet. Asset Bundles sind gut in die Databricks-Plattform integriert, d. h., es ist nicht möglich, Code oder Aufträge direkt in Databricks zu bearbeiten, was eine strenge Versionskontrolle der gesamten Code- und Pipeline-Konfiguration ermöglicht.

Bundles sind extrem nützlich, wenn mehrere Umgebungen bestehen, wie etwa Entwicklung, Staging und Produktion. Sie können dasselbe Bundle für mehrere Ziele mit verschiedenen Konfigurationen einsetzen.

Um ein Bundle zu erstellen, muss es in einem neuen Bundle initiiert werden:

databricks bundle init

Wir verwenden diese Konfiguration:

Werfen wir hinsichtlich der Bundle-Struktur einen kurzen Blick auf die relevantesten Dateien:

Anmerkung: Wir empfehlen, ein internes Bundle beizubehalten, das die Namenskonventionen, allgemeinen Vorschriften, Best Practices und Integrationen des Unternehmens enthält.

Mit Asset Bundles können wir unseren Code auch lokal in unserer bevorzugten IDE schreiben, wie etwa VS Code (unter Verwendung der Databricks-Erweiterung für Visual Studio Code), PyCharm oder IntelliJ IDEA (unter Verwendung von Databricks Connect).

Databricks Asset Bundle in IntelliJ IDEA
Databricks Asset Bundle in IntelliJ IDEA

Um eine lokale Python-Umgebung einzurichten, können wir venv verwenden und die Entwicklungsabhängigkeiten installieren:

python3.10 -m venv .venv
source .venv/bin/activate
pip install -r requirements-dev.txt

Generierung der Unity-Catalog-Tabelle

Wie organisieren wir die Daten für unser Datenprodukt? In diesem Beispiel verwenden wir Unity Catalog, um Speicher als verwaltete Tabellen zu handhaben. Auf Isolationsebene entscheiden wir, dass ein Datenprodukt ein Schema in Unity Catalog repräsentieren könnte.

Wir können den Data Contract YAML nutzen, um Infrastruktur-Code zu generieren:

Das Modell definiert die Tabellenstruktur des Zieldatenmodells. Mit dem Data Contract CLI tool, können wir den SQL-DDL-Code für die Anweisung CREATE TABLE generieren.

datacontract export --format sql datacontract.yaml

-- Data Contract: urn:datacontract:fulfillment:stock-last-sales
-- SQL Dialect: databricks
CREATE OR REPLACE TABLE acme.stock_last_sales.articles (
  sku STRING primary key,
  quantity BIGINT not null,
  last_sale_timestamp TIMESTAMP,
  processing_timestamp TIMESTAMP not null
);

Das Data Contract CLI tool ist ebenfalls als eine Python Library datacontract-cli verfügbar. Wir fügen es zu requirements-dev.txt hinzu und verwenden es direkt in einem Databricks-Notebook, um die Tabelle in Unity Catalog zu erstellen:

Nutzung der Data Contract CLI zur Erstellung der Unity-Catalog-Tabelle
Nutzung der Data Contract CLI zur Erstellung der Unity-Catalog-Tabelle

Die Unity-Catalog-Tabelle ist eine verwaltete Tabelle, die intern zur effizienten Speicherung das Delta-Format nutzt.

Die erstellte Tabelle in Unity Catalog

Entwickeln des Umwandlungscodes

Jetzt schreiben wir die Hauptumwandlungslogik. Mit Python-basierten Databricks Asset Bundles können wir unsere Daten-Pipelines entwickeln als:

In diesem Datenprodukt werden wir einfache Python-Dateien für unsere Hauptumwandlungslogik schreiben, die als Wheel-Pakete eingesetzt werden.

Unsere Umwandlung nimmt alle verfügbaren Bestände, die wir von einem Eingabe-Port wie etwa dem Betriebssystem erhalten, das die aktuellen Bestandsdaten verwaltet, und fügt den Datenframe mit dem Zeitstempel des letzten Verkaufs für jede SKU per Left-Join hinzu. Die Umsatzinformationen sind ebenfalls ein Eingabe-Port, z. B. ein weiteres vorgelagertes Datenprodukt, das von dem Kassenteam bereitgestellt wird. Wir speichern den resultierenden Datenframe in der zuvor generierten Tabellenstruktur.

Umwandlungscode als reiner Python-Code
Umwandlungscode als reiner Python-Code

Dank dieser Option bleibt der Code wiederverwendbar, ist leicht mit Unit-Tests zu testen und wir können ihn auf unseren lokalen Computern ausführen. Als professionelle Datentechniker stellen wir sicher, dass die Funktion calculate_last_sales wie erwartet ausgeführt wird, indem sie gute Unit-Tests schreibt.

Unit-Tests
Unit-Tests

Wir aktualisieren die Auftragskonfiguration, um den Python-Code als python_wheel_task auszuführen, und konfigurieren das Steuerprogramm und den entsprechenden Rechencluster.

# The main job for stock_last_sales. resources:

  jobs:

    stock_last_sales_job:

      name: stock_last_sales_job


      schedule:

        # Run every day at midnight

        quartz_cron_expression: '0 0 0 * * ?'

        timezone_id: Europe/Amsterdam


      tasks:

        - task_key: create_unity_catalog_table

          job_cluster_key: job_cluster

          notebook_task:

            notebook_path: ../src/create_unity_catalog_table.ipynb

          libraries:

            - pypi:

                package: datacontract-cli


        - task_key: main_task

          depends_on:

            - task_key: create_unity_catalog_table

          job_cluster_key: job_cluster

          python_wheel_task:

            package_name: stock_last_sales

            entry_point: main

          libraries:

            - whl: ../dist/*.whl


      job_clusters:

        - job_cluster_key: job_cluster

          new_cluster:

            spark_version: 13.3.x-scala2.12

            node_type_id: i3.xlarge

            autoscale:

                min_workers: 1

                max_workers: 4
_resources/stock_last_sales_job.yml

Wenn wir uns sicher sind, können wir das Bundle in unseren Databricks-dev-Instanzen (derzeit manuell) einsetzen:

databricks bundle deploy

Wir lösen eine manuelle Ausführung unseres Workflows aus:

databricks bundle run stock_last_sales_job

In Databricks können wir sehen, dass der Workflow erfolgreich ausgeführt wurde:

Ein Job in Databricks
Ein Job in Databricks

Die Daten sind jetzt in der zuvor erstellten Tabelle enthalten.

Das Ergebnis unserer Daten-Pipeline
Das Ergebnis unserer Daten-Pipeline

Testen des Data Contract

Unsere Aufgabe ist noch nicht ganz beendet. Woher wissen wir, dass die Daten korrekt sind? Während die Unit-Tests uns Sicherheit hinsichtlich des Umwandlungscodes geben, benötigen wir auch eine Abnahmeprüfung, um zu überprüfen, ob wir den vereinbarten Data Contract korrekt implementiert haben.

Für diese Prüfung können wir das Data Contract CLI tool verwenden:

export DATACONTRACT_DATABRICKS_HTTP_PATH=/sql/1.0/warehouses/b053xxxxxxx
export DATACONTRACT_DATABRICKS_TOKEN=dapia1926f7c64b7595880909xxxxxxxxxx
datacontract test datacontract.yaml

Das datacontract Tool verwendet alle Schema- und Formatinformationen aus _model_, den Qualitätsattributen und den Metadaten und vergleicht diese mit dem tatsächlichen Datensatz. Es liest die Verbindungsdetails aus dem Serverabschnitt und verbindet sich mit Databricks, führt alle Prüfungen aus und bietet eine umfassende Übersicht.

Testergebnisse Data Contract
Testergebnisse Data Contract

Wir möchten diesen Test mit jeder Pipelineausführung durchführen und erstellen daher erneut eine Notebook-Aufgabe für den Test:

Data Contract Test als Notebook
Data Contract Test als Notebook

Einsatz mit CI/CD

Für einen automatischen Test wird das Asset Bundle auf Databricks bereitgestellt, und nach der einmaligen Ausführung des Auftrags richten wir eine CI/CD-Pipeline in GitHub unter Verwendung einer GitHub Aktion ein.

name: "Deploy Databricks Assets Bundle"

on:

  workflow_dispatch:

  push:

    branches: [ "main" ]



jobs:

  test:

    name: "Run unit tests"

    runs-on: ubuntu-latest

    steps:

      - uses: actions/checkout@v4

      - name: Set up Python ${{matrix.python-version}}

        uses: actions/setup-python@v5

        with:

          python-version: "3.10"

      - name: Install dependencies

        run: |

          python -m pip install --upgrade pip

          pip install -r requirements-dev.txt

      - name: Test with pytest

        run: pytest

        env:

          DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }}

          DATABRICKS_TOKEN: ${{ secrets.SP_TOKEN }}

          DATABRICKS_CLUSTER_ID: ${{ secrets.DATABRICKS_CLUSTER_ID }}


  deploy:

    name: "Deploy bundle to DEV"

    runs-on: ubuntu-latest


    needs:

      - test


    steps:

      - uses: actions/checkout@v4

      - uses: databricks/setup-cli@main

      - run: databricks bundle deploy

        working-directory: .

        env:

          DATABRICKS_TOKEN: ${{ secrets.SP_TOKEN }}

          DATABRICKS_BUNDLE_ENV: dev

  run_pipieline:

    name: "Run pipeline"

    runs-on: ubuntu-latest


    needs:

      - deploy


    steps:

      - uses: actions/checkout@v4

      - uses: databricks/setup-cli@main

      - run: databricks bundle run stock_last_sales_job --refresh-all

        working-directory: .

        env:

          DATABRICKS_TOKEN: ${{ secrets.SP_TOKEN }}

          DATABRICKS_BUNDLE_ENV: dev
.github/workflows/deploy.yml

Jedes Mal, wenn wir den Code aktualisieren, wird das Asset Bundle automatisch auf Databricks bereitgestellt.

CI/CD-Workflow in GitHub
CI/CD-Workflow in GitHub

Metadaten veröffentlichen

Damit andere die Datenprodukte finden, verstehen und ihnen vertrauen können, wollen wir sie in einem Datenproduktverzeichnis registrieren.

In diesem Beispiel verwenden wir den Data Mesh Manager – eine Plattform für das Registrieren, Verwalten und Entdecken von Datenprodukten, Data Contracts und globalen Vorschriften.

Metadaten in Data Mesh Manager
Metadaten in Data Mesh Manager

Wir erstellen erneut eine Notebook-Aufgabe (oder Python-Codeaufgabe), um die Metadaten im Data Mesh Manager zu veröffentlichen, und fügen dann die Aufgabe zu unserem Workflow hinzu. Wir können Databricks Secrets verwenden, um den API-Schlüssel in Databricks bereitzustellen.

databricks secrets create-scope datamesh_manager
databricks secrets put-secret datamesh_manager api_key
Metadaten veröffentlichen
Metadaten veröffentlichen

Zusammenfassung

Jetzt kann sich der Geschäftsführer über ein BI-Tool (wie etwa PowerBI, Tableau, Redash oder in Databricks) mit dieser Tabelle verbinden und seine Frage bezüglich des Artikels beantworten.

Analysen in Databricks
Analysen in Databricks

Databricks Asset Bundles sind perfekt geeignet, um professionelle Datenprodukte auf Databricks zu entwickeln, da es alle Ressourcen und Konfigurationen (Code, Tests, Speicher, Berechnen, Steuerungsprogramm, Metadaten etc.) bündelt, die benötigt werden, um den Datenverbrauchern Datensätze in hoher Qualität zu liefern.

Es ist einfach, Data Contracts zur Definition der Anforderungen und die Data Contract CLI zur Automatisierung von Abnahmeprüfungen zu integrieren.

Der Quellcode für das Beispielprojekt ist auf GitHub verfügbar.

Mehr erfahren