View on GitHub

bifabrik

Microsoft Fabric ETL toolbox

Lakehouse table destination

If you just want to do a full load into a lakehouse table, go ahead:

import bifabrik as bif

bif.fromCsv('Files/CsvFiles/annual-enterprise-survey-2021.csv') \
  .toTable('Survey2021').run()

# alternatively, you can use
# .toLakehouseTable('Survey2021').run()

This writes a parquet (delta) file into the Tables/ directory in the lakehouse. Fabric then makes a table out of it.

If you want to save data to a different lakehouse / workspace, check out cross-lakehouse pipelines

Incremental load

The increment setting of the destination task has these options

overwrite

This is the default. It just overwrites any data in the table, including the table schema.

The example seen above is equivalent to

bif.fromCsv('Files/CsvFiles/annual-enterprise-survey-2021.csv') \
  .toTable('Survey2021') \
  .increment('overwrite') \
  .run()

append

Appends the incoming data to the table

bif.fromCsv('CsvFiles/orders_240301.csv') \
  .toTable('FactOrder') \
  .increment('append') \
  .run()

You may also want to use watermark to filter the data before appending.

bif.fromSql('''

SELECT orderno, inset_timestamp, value
FROM LH_Stage.vNewOrders

''').toTable('FactOrder') \
  .increment('append').watermarkColumn('inset_timestamp') \
  .run()

The watermarkColumn configuration is a general feature of the table destination, not just for the append increment

When watermarkColumn is specified, bifabrik checks the maximum value of that column in the destination table and then filters the incoming dataset to only load the rows where {watermarkColumn} > {MAX(watermarkColumn) from target table}.

This is done before applying the increment logic.

merge

This is basically the slowly changing dimenstion type 1 option. To get this working, you need to configure the mergeKeyColumns array (the business key based on which to merge).

bif.fromSql('''

SELECT Variable_Code
,AVG(`Value`) AS AvgValue
FROM LakeHouse1.Survey2021
GROUP BY Variable_Code

''').toTable('SCDTest1') \
  .increment('merge') \
  .mergeKeyColumns(['Variable_Code']) \
  .run()

Internally, Spark SQL MERGE statement is used to insrt new rows and update rows that get matched using the merge key. The key can have multiple columns specified in the array.

For larger tables, the SparkSQL MERGE implementation can run into memory issues. Therefore, if both the destination table and source dataset cross a certain threshold, bifabrik wiil, by default, use a step-by-step method of merging. First, it will identify the records that will be affected by the merge. These are then copied to a temporary table and the merge is performed there so that the whole table doesn’t need to be part of the merge operation. Finally, the data is copied back to the original destination table. This approach has larger overhead, but it can handle large tables.

To check or change the settings, use the following properties

bif.config.destinationTable.largeTableMethodEnabled # default True
bif.config.destinationTable.largeTableMethodSourceThresholdGB # default 0.2
bif.config.destinationTable.largeTableMethodDestinationThresholdGB # default 20

SCD 2

Slowly chaning dimension type 2 tracks the history of changes in a table, creating a new row each time an attribute changes. Here we can see an example of changes in name. The RowStart, RowEnd, CurrentRow, as well as branch_history_SID (identity column can be configured through identityColumnPattern) are generated by bifabrik.

image

A basic configuration can look like this

import bifabrik as bif

(
bif
    .fromSql('''SELECT id, name, place, street, city, zip FROM Bronze.source_branch''')
    .toTable('branch_history')
    .increment('scd2')
    .mergeKeyColumns(['id'])
    .run()
)

You need to specify the mergeKeyColumns similarly to the merge increment.

There are a few other configuration options you could use:

A more complex setup could look like this:

import bifabrik as bif

(
bif
    .fromSql('''SELECT id, name, place, street, city, zip FROM Bronze.source_branch''')
    .toTable('branch_history2')
    .increment('scd2')
    .mergeKeyColumns(['id'])
    .identityColumnPattern('{tablename}_SID')
    .addNARecord(True)
    .scd2ExcludeColumns(['Audit_ID'])
    .scd2SoftDelete(True)
    .run()
)

The RowEnd timestamp is set to 9999-12-31 00:00 for active rows. When a new version of the record is loaded, The RowEnd will be set to the same timestamp as the RowStart of the new row. Thus, when writing a lookup JOIN to find the record active at time X, it could look like this:

RowStart <= X AND RowEnd > X

snapshot

For this option, you need to configure the snapshotKeyColumns array to the column or columnss that specify the snapshot.

bif.fromCsv('CsvFiles/fact_append_pt1.csv').toTable('snapshot1') \
.increment('snapshot').snapshotKeyColumns(['Date', 'Code']) \
.run()

Here, the snapshot is “replaced”. That is, all the rows from the target table that match one of the snapshot keys in the source table get deleted, before the new rows are inserted.

This solves the deleted rows issue - if some rows get deleted in the source, you can remove them in the lakehouse by reloading the corresponding snapshot. The merge increment, by comparison, does not remove rows that were deleted in the source.

Identity column

You can configure the identityColumnPattern to add an auto-increment column to the table. The name of the column can contain the name of the table / lakehouse.

import bifabrik as bif

bif \
  .fromCsv('CsvFiles/orders_pt2.csv') \
  .delimiter(';') \
  .toTable('DimOrder') \
  .increment('merge') \
  .mergeKeyColumns(['Code']) \
  .identityColumnPattern('{tablename}ID') \
  .run()

The values are 1, 2, 3…, each new record gets a new number.

If configured, the column will be placed at the beginning of the table.

Supported placeholders:
{tablename}     : the delta table name
{lakehousename} : the lakehouse name

Adding the N/A (Unknown) record

When loading dimensions for a dimensional model, it’s common practice to add an N/A (or “Unknown”) record into your dimensions so that you can link facts to that one if your lookups fail.

To accommodate this, bifabrik has the addNARecord option (ture / false, false by default). If enabled, it adds a record to the table that has -1 in the identity column, 0 in other numeric columns and “N/A” in string columns.

If the table already has a “-1” record, this will not add another one. Also, this option is only available when you have the identityColumnPattern configured.

bif.fromCsv('Files/CsvFiles/annual-enterprise-survey-*.csv') \
.toTable('SurveyData') \
.identityColumnPattern('{tablename}ID') \
.addNARecord(True) \
.run()

The result can look like this

image

Insert timestamp

You can append a column with the current timestamp to newly inserted columns - just set the insertDateColumn on the table destination.

If configured, this will be added at the end of the table.

bif.fromCsv('CsvFiles/dimBranch.csv') \
    .toTable('DimBranch') \
    .insertDateColumn('InsertTimestamp').run()

Fixing invalid column names

The characters (space),;{}()\n\t= are invalid in delta tables. By default, bifabrik replace these with and underscore (_) so that these invalid characters in column names don’t stop you from loading your data.

You may want to just remove the invalid characters or replace them with something else. In that case, use the invalidCharactersInColumnNamesReplacement setting.

bif.fromCsv('CsvFiles/dim_branch.csv') \
    .toTable('DimBranch') \
    .invalidCharactersInColumnNamesReplacement('').run()

Adding new columns

Sometimes, we need to add new columns to an existing table. If the table is a full-load (overwrite), this is no problem - the table gets overwritten including the schema.

If there is a different increment menthod and the target table already exists, bifabrik will compare the structure of the target table against the incoming dataset. After this

This column adding feature is enabled by default. If you want, you can disable it like this:

import bifabrik as bif

# disable adding columns for the whole session
bif.config.destinationTable.canAddNewColumns = False

# disable adding columns for a specific pipeline
bif.fromSql('''
  SELECT ShipmentId
  ,CountryCode
  ,FullName
  ,SalesType
  ,SalesTypeShortcut
  ,WarehouseId, 'ABC' AS NewColumn1 
  FROM DimBranchZ LIMIT 3
''').toTable('SchemaMerge1').increment('append') \
    .canAddNewColumns(False).run()

Learn more about configuration

Also, some of these table utilities can come in handy.

Everything all at once

An example combining different aspects of the configuration

import bifabrik as bif

(
bif
  .fromCsv('CsvFiles/fact_append_pt2.csv')
  .toTable('snapshotTable1')
  .increment('snapshot')
  .snapshotKeyColumns(['Date', 'Code'])
  .identityColumnPattern('{tablename}ID')
  .insertDateColumn('RowStartDate')
  .run()
)

You can also save your configuration preferences to a JSON file and then apply it to all tables loaded in one session - read more about configuration

Back