
bifabrik

Microsoft Fabric ETL toolbox

Warehouse table destination

When loading data to a Fabric Warehouse, bifabrik can assist by handling the connection setup and by providing full and incremental load options similar to the lakehouse table destination.

Connecting to a Fabric warehouse

While Fabric notebooks can easily connect to lakehouses, things are not so straightforward when connecting to a Fabric warehouse (a.k.a. Synapse Data Warehouse). We cannot attach a warehouse to a notebook, and we have to use T-SQL to write any data.

bifabrik uses pyodbc with service principal authentication to connect to warehouses. For this to work, we need to set a few things up.
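
To give an idea of what this looks like under the hood, here is a minimal sketch of a pyodbc connection with service principal authentication. This is only an illustration, not bifabrik's actual code; the server address, client ID and secret below are placeholders, and in practice the secret is fetched from Key Vault rather than hard-coded.

import pyodbc

# placeholder values - bifabrik reads the client secret from Azure Key Vault
server = 'dxtxxxxxxbue.datawarehouse.fabric.microsoft.com'
database = 'DW1'
client_id = '56712345-1234-7890-abcd-abcd12344d14'
client_secret = '<client secret retrieved from Key Vault>'

# ODBC Driver 18 supports service principal authentication directly
connection_string = (
    'Driver={ODBC Driver 18 for SQL Server};'
    f'Server={server};Database={database};'
    'Authentication=ActiveDirectoryServicePrincipal;'
    f'UID={client_id};PWD={client_secret};'
    'Encrypt=yes;'
)

with pyodbc.connect(connection_string) as connection:
    cursor = connection.cursor()
    cursor.execute('SELECT 1')
    print(cursor.fetchone())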

Allow service principal access

In the Power BI admin portal, under Developer settings, enable “Service principals can access Fabric APIs”.


Register an app and give it permissions

Next, head over to Azure and create an app registration. Also create a secret for its authentication.

Save the app’s client secret value in Azure Key Vault. (bifabrik doesn’t support direct input for credentials, just to be on the safe side.)

Make sure that this app has read and write permissions on the Fabric warehouse - either by giving the app the Contributor (or higher) role in the workspace, or by setting permissions directly on the warehouse.

Note that you probably cannot give a service principal access to your personal workspace (the one called “My workspace” in the UI), so place your warehouse in some other workspace.

Configure destination warehouse in bifabrik

Your configuration can look like this:

import bifabrik as bif

# client ID of the app registration
bif.config.security.servicePrincipalClientId = '56712345-1234-7890-abcd-abcd12344d14'

# key vault where you saved the app's client secret
bif.config.security.keyVaultUrl = 'https://kv-contoso.vault.azure.net/'

# name of the key vault secret that contains the app's client secret
bif.config.security.servicePrincipalClientSecretKVSecretName = 'contoso-clientSecret'

# name of the destination warehouse (you don't say!)
bif.config.destinationStorage.destinationWarehouseName = 'DW1'

# SQL connection string of the warehouse - can be found in the settings of the warehouse
bif.config.destinationStorage.destinationWarehouseConnectionString = 'dxtxxxxxxbue.datawarehouse.fabric.microsoft.com'

You can save this configuration to a file for later use - see the configuration page to learn more.
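
As a quick sketch of how that could look (assuming helper methods along the lines of saveToFile / loadFromFile - check the configuration page for the exact names and supported paths):

import bifabrik as bif

# assumed method names - see the configuration docs for the exact API
bif.config.saveToFile('Files/bifabrik_config.json')

# ...later, in another session, load the saved settings back
bif.config.loadFromFile('Files/bifabrik_config.json')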

Set the default lakehouse for the notebook

Why do we need a lakehouse here? To get better performance when loading data into the warehouse. The source data is first written to a temporary delta table in the lakehouse before being inserted into the warehouse using T-SQL (see Ingest data into your Warehouse using Transact-SQL).

So, if you haven’t already, set a default lakehouse for your notebook, where the temporary tables will be stored.


From lakehouse to warehouse

Let’s say we have data in a lakehouse and want to aggregate it and save it into a warehouse. Here is one way to do that, using Spark SQL for the aggregation.

import bifabrik as bif

bif.config.security.servicePrincipalClientId = '56712345-1234-7890-abcd-abcd12344d14'
bif.config.security.keyVaultUrl = 'https://kv-contoso.vault.azure.net/'
bif.config.security.servicePrincipalClientSecretKVSecretName = 'contoso-clientSecret'
bif.config.destinationStorage.destinationWarehouseName = 'WH_GOLD'
bif.config.destinationStorage.destinationWarehouseConnectionString = 'dxtxxxxxxbue.datawarehouse.fabric.microsoft.com'

bif.fromSql('''
SELECT countryOrRegion `CountryOrRegion`
,YEAR(date) `Year` 
,COUNT(*) `PublicHolidayCount`
FROM LH_SILVER.publicholidays
GROUP BY countryOrRegion
,YEAR(date)
''').toWarehouseTable('HolidayCountsYearly').run()

With this, a WH_GOLD.dbo.HolidayCountsYearly table will be created (if it doesn’t exist yet) and filled with the results of the Spark SQL query. By default, this is a full load, overwriting any previous data in the table.

We can also change the destination schema name and add an identity column:

bif.fromSql('''
SELECT countryOrRegion `CountryOrRegion`
,COUNT(*) `PublicHolidayCount`
FROM LH_SILVER.publicholidays
WHERE YEAR(date) = 2024
GROUP BY countryOrRegion
''').toWarehouseTable(targetTableName = 'HolidayCounts2024', targetSchemaName = 'pbi') \
.identityColumnPattern('{tablename}Id') \
.run()

The pbi schema will also be created automatically if it doesn’t exist yet.

To show other configuration options, use help() on the table destination:

tableDestination = bif.fromSql('''
SELECT * FROM LH_SILVER.publicholidays
''') \
.toWarehouseTable(targetTableName = 'HolidayCounts2024', targetSchemaName = 'pbi')

help(tableDestination)

When creating the warehouse table, bifabrik maps the Spark dataframe types to SQL types similarly to a SQL analytics endpoint over a lakehouse, although in a simplified fashion. For example, strings are stored as VARCHAR(8000) and dates as DATETIME2(6).
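
For example, with a source query like the one below (the query and target table name are just illustrative), the string column would be stored as VARCHAR(8000) and the date column as DATETIME2(6):

bif.fromSql('''
SELECT countryOrRegion `CountryOrRegion` -- string -> VARCHAR(8000)
,MAX(date) `LastHolidayDate`             -- date -> DATETIME2(6)
FROM LH_SILVER.publicholidays
GROUP BY countryOrRegion
''').toWarehouseTable('HolidayLastDates').run()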

Incremental load

Incremental load has the same options as the lakehouse table destination, where it is described in detail. Here is a brief recap.

append

Appends the incoming data to the table - here also using a watermark column to filter the incoming data:

bif.fromCsv('Files/CsvFiles/fact_append_*.csv').toWarehouseTable('TransactionsTable') \
    .increment('append').watermarkColumn('Date').run()

merge

bif.fromCsv('Files/CsvFiles/scd_source_*.csv').toWarehouseTable('Dimension1') \
    .increment('merge').mergeKeyColumns(['Code']).identityColumnPattern('{tablename}ID').run()

snapshot

bif.fromCsv('Files/CsvFiles/fact_append_*.csv').toWarehouseTable('snapshot1') \
    .increment('snapshot').snapshotKeyColumns(['Date', 'Code']) \
    .run()

Identity column

The identity column can be configured the same way as for the lakehouse table destination.
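
For example, to add an auto-generated key column named after the table (the CSV path and table name below are illustrative):

# creates a DimProductID identity column in the DimProduct table
bif.fromCsv('Files/CsvFiles/dim_product_*.csv').toWarehouseTable('DimProduct') \
    .identityColumnPattern('{tablename}ID') \
    .run()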

Adding the N/A (Unknown) record

When loading dimensions for a dimensional model, it’s common practice to add an N/A (or “Unknown”) record into your dimensions so that you can link facts to that one if your lookups fail.

To accommodate this, bifabrik has the addNARecord option (true / false, false by default). If enabled, it adds a record to the table that has -1 in the identity column, 0 in other numeric columns and “N/A” in string columns.

If the table already has a “-1” record, this will not add another one. Also, this option is only available when you have the identityColumnPattern configured.

bif.fromSql('''
 SELECT Variable_code, COUNT(*) ResponseCount
 FROM SurveyData
 GROUP BY Variable_code
''').toWarehouseTable('SurveyDataAgg') \
.identityColumnPattern('{tablename}ID') \
.addNARecord(True) \
.increment('merge') \
.mergeKeyColumns(['Variable_code']) \
.run()

Adding new columns

Sometimes, we need to add new columns to an existing table. If the table is a full-load (overwrite), this is no problem - when there is a change in the target schema, the table gets recreated with the new schema.

If a different increment method is used and the target table already exists, bifabrik will compare the structure of the target table against the incoming dataset and add any columns that are present in the source but missing in the target table.

This column adding feature is enabled by default. If you want, you can disable it like this:

import bifabrik as bif

# configure warehouse connection...

# disable adding columns for the whole session
bif.config.destinationTable.canAddNewColumns = False

# disable adding columns for a specific pipeline
bif.fromSql('''
SELECT countryOrRegion `CountryOrRegion`
,COUNT(*) `PublicHolidayCount`
FROM LH_SILVER.publicholidays
WHERE YEAR(date) = 2024
GROUP BY countryOrRegion
''').toWarehouseTable(targetTableName = 'HolidayCounts20242') \
.canAddNewColumns(False) \
.run()

Insert timestamp

You can append a column with the current timestamp to newly inserted records - just set the insertDateColumn on the table destination.

If configured, this column will be added at the end of the table.

bif.fromCsv('Files/CsvFiles/annual-enterprise-survey-2021.csv').toWarehouseTable('AnnualSurvey') \
    .insertDateColumn('InsertTimestamp').identityColumnPattern('{tablename}ID').run()

Warehouse to warehouse transformations

Besides this destination, there is also a Fabric warehouse T-SQL data source. You can use this to run warehouse to warehouse transformations from notebooks.
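
A minimal sketch of such a transformation, assuming the warehouse source is exposed as fromWarehouseSql (see the warehouse source documentation for the exact method name; the query and table names are illustrative):

import bifabrik as bif

# configure the warehouse connection as shown above...

# read from the warehouse with T-SQL and write the result back to a warehouse table
bif.fromWarehouseSql('''
SELECT CountryOrRegion, SUM(PublicHolidayCount) AS TotalHolidayCount
FROM dbo.HolidayCountsYearly
GROUP BY CountryOrRegion
''').toWarehouseTable('HolidayCountsTotal').run()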
