Unit Testing Azure Data Factory Pipelines in CI/CD with Python

In a previous DataTalks post, Wouter Pardon explained how to implement CI/CD for Azure Data Factory and SQL Server, focusing on making deployments easier and improving the quality of data pipelines. In this post, he dives deeper into unit testing for Azure Data Factory (ADF) and explains how Azure DevOps can help you run your tests automatically and create test reports that give useful insights into your pipelines.

Why Unit Testing is Critical in ADF Pipelines

ADF pipelines are complex workflows that involve multiple interconnected activities, dependencies, and runtime parameters. Without thorough testing, small issues can ripple through the pipeline, leading to costly failures. Here’s why unit tests matter:

  • Catch Errors Early:
    Testing individual pipeline activities in isolation ensures issues are identified before deployment.
  • Boost Developer Confidence:
    Unit tests validate pipeline configurations, making it safer to merge changes into shared branches.
  • Automate Validation:
    As part of CI pipelines, unit tests act as safeguards, ensuring that only properly validated pipelines are deployed.

How Unit Tests Fit into CI/CD

In a typical CI/CD process, unit testing plays a critical role during the Continuous Integration stage:

  • Feature Branch:
    Developers implement new features.

  • Pull Requests:
    Before merging into the Development (Dev) branch, CI pipelines run all tests to identify and address issues early.

  • Development Branch:
    Once features are integrated, they’re validated together before moving to the Main branch.

  • Main Branch:
    Tested and validated pipelines are deployed to production.

Unit Testing ADF: A Real-Life Example

Let’s walk through a real-world scenario: copying customer order files from Azure Blob Storage to SQL Server. We’ll use the Python data_factory_testing_framework library to validate the pipeline configuration.

Example - Pipeline Overview

In this example, we have two pipelines set up. The first pipeline’s job is to list all the order files in the order folder of a landing Azure Blob Storage container, ensuring that all files are identified and ready for processing. Once that’s completed, the pipeline from figure 2 kicks in for each file to perform the next set of tasks. This setup streamlines the workflow by breaking the process down into manageable steps and ensures that each file is handled individually.

Fig.1 - Start pipeline

In figure 2, we have a copy activity that moves the landing file into a bronze table. Additionally, the activity takes care of the table’s structure: if the table doesn’t exist in the database, it’s created. If it already exists, the table is truncated to clear old data and prepare it for the new batch.

Fig.2 - Copy pipeline
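
In the test code below, the start pipeline from figure 1 is referenced by the name ExecutePipelineActivity and the copy pipeline from figure 2 by the name CopyTest. As a minimal sketch (using the test_framework fixture defined below), both can be loaded by name like this:

# Pipeline names as they are used in the tests later in this post
start_pipeline = test_framework.get_pipeline_by_name("ExecutePipelineActivity")
copy_pipeline = test_framework.get_pipeline_by_name("CopyTest")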

Example - Test Suite

In this section, we’ll break down the steps involved in implementing the unit tests for this example. We’ll go through each phase of the testing process, from setting up the testing environment to running the tests and interpreting the results. Understanding these steps will help you validate the data flow and ensure the robustness of the pipelines.

1. Setting Up the Test Framework

This code snippet sets up a basic unit test using pytest for testing Azure Data Factory (ADF) pipelines with the data_factory_testing_framework.

The pytest.fixture decorator creates a fixture named test_framework, which initializes a TestFramework object. This object is configured with the following parameters:

  • framework_type: Specifies that the framework is for Data Factory.
  • root_folder_path: Defines the path to the root folder of the ADF code, which is where the pipeline artifacts are stored.

This setup allows for easy integration of tests within the ADF environment, helping you validate the functionality of pipelines by checking for expected behaviors and outputs.

				
import pytest
from data_factory_testing_framework import TestFramework, TestFrameworkType


@pytest.fixture
def test_framework() -> TestFramework:
    return TestFramework(
        framework_type=TestFrameworkType.DataFactory,
        root_folder_path='PATH',
    )

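The tests in the following sections also rely on PipelineRunState, RunParameter, and RunParameterType to simulate the pipeline state during a test run. As a minimal sketch, and assuming a recent version of data_factory_testing_framework where these classes live in its state module, the matching import looks like this (adjust it to the version you have installed):

# Assumption: recent releases of the library expose these classes in the state module
from data_factory_testing_framework.state import (
    PipelineRunState,
    RunParameter,
    RunParameterType,
)
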
2. Testing the File Listing Activity

After setting up our basic test framework, we can now start with testing the first activity of the start pipeline from figure 1, the GetAllOrderFiles activity. The test_list_files function is a unit test for the GetAllOrderFiles activity in an Azure Data Factory pipeline. It uses the TestFramework to get the pipeline and sets up a state with the necessary parameters, such as the database name.

The purpose of this test is to validate the configuration of the GetAllOrderFiles activity. It checks several key properties:

  • Dataset Configuration: It asserts that the dataset associated with the activity references the correct dataset (‘LandingFolder’), is of type DatasetReference, and points to the correct folder (‘orders’).
  • Store Settings: It confirms that the store settings for reading data from Azure Blob Storage are correctly configured with the type set to AzureBlobStorageReadSettings.

By asserting these properties, this test ensures that the GetAllOrderFiles activity is correctly set up to list all order files in the specified Azure Blob Storage folder, which is crucial for the correct execution of the pipeline.

				
def test_list_files(test_framework: TestFramework):
    pipeline = test_framework.get_pipeline_by_name("ExecutePipelineActivity")
    state = PipelineRunState(
        parameters=[
            RunParameter(RunParameterType.Global, "DatabaseName", "dev"),
        ]
    )

    list_activity = pipeline.get_activity_by_name('GetAllOrderFiles')
    list_activity.evaluate(state)

    # Assert that we list all order files in the orders folder on our Azure Blob Storage
    assert list_activity.type_properties['dataset']['referenceName'] == 'LandingFolder'
    assert list_activity.type_properties['dataset']['type'] == 'DatasetReference'
    assert list_activity.type_properties['dataset']['parameters']['folder'] == 'orders'
    assert list_activity.type_properties['storeSettings']['type'] == 'AzureBlobStorageReadSettings'

3. Testing the For Each Activity

In this function, test_for_each_order_file, we validate the configuration and behavior of the ForEachFile activity within an Azure Data Factory (ADF) pipeline. This activity iterates over each file returned by the GetAllOrderFiles activity, executing a sub-pipeline for each file. The test asserts:

  • Dependencies: It checks that ForEachFile depends on the GetAllOrderFiles activity.
  • Items in the Loop: It validates that the ForEachFile items expression evaluates to the list of files returned by GetAllOrderFiles.
  • Nested Activities: The test verifies that ForEachFile contains a nested ExecutePipeline activity (ExecuteCopyTest) for processing each file individually.
  • Pipeline Reference: It ensures that the nested pipeline (CopyTest) is correctly referenced, and the parameters include the folder parameter set to ‘orders’.
				
def test_for_each_order_file(test_framework: TestFramework):
    pipeline = test_framework.get_pipeline_by_name("ExecutePipelineActivity")
    state = PipelineRunState(
        parameters=[
            RunParameter(RunParameterType.Global, "DatabaseName", "dev"),
        ]
    )

    # Simulate the output of the GetAllOrderFiles activity
    state.add_activity_result(
        activity_name="GetAllOrderFiles",
        status="Succeeded",
        output={
            "childItems": [
                {
                    "name": "customer_orders.csv",
                    "type": "File"
                },
                {
                    "name": "customer_orders3.csv",
                    "type": "File"
                }
            ],
            "effectiveIntegrationRuntime": "AutoResolveIntegrationRuntime (West Europe)",
            "executionDuration": 9,
            "durationInQueue": {
                "integrationRuntimeQueue": 0
            },
            "billingReference": {
                "activityType": "PipelineActivity",
                "billableDuration": [
                    {
                        "meterType": "AzureIR",
                        "duration": 0.016666666666666666,
                        "unit": "Hours"
                    }
                ]
            }
        }
    )

    for_each_activity = pipeline.get_activity_by_name('ForEachFile')
    for_each_activity.evaluate(state)

    # The ForEach loop should depend on the file listing activity
    assert len(for_each_activity.depends_on) == 1
    assert for_each_activity.depends_on.pop().activity == 'GetAllOrderFiles'

    # The items expression should evaluate to the files returned by GetAllOrderFiles
    assert for_each_activity.type_properties['items'].result == [
        {
            "name": "customer_orders.csv",
            "type": "File"
        },
        {
            "name": "customer_orders3.csv",
            "type": "File"
        }
    ]

    # The loop should contain a single ExecutePipeline activity that calls the CopyTest pipeline
    assert len(for_each_activity.type_properties["activities"]) == 1
    assert for_each_activity.type_properties["activities"][0]["name"] == "ExecuteCopyTest"
    assert for_each_activity.type_properties["activities"][0]["type"] == "ExecutePipeline"
    assert for_each_activity.type_properties["activities"][0]["typeProperties"]['pipeline']['referenceName'] == "CopyTest"
    assert for_each_activity.type_properties["activities"][0]["typeProperties"]['parameters']['folder'] == 'orders'

4. Testing the Copy Activity

For our last test, we validate the Copy activity of the CopyTest pipeline. The test_copy_customer__orders_csv_from_blob_to_sql function is parametrized over both file names returned in the previous step, so it runs once per order file. It validates the configuration and behavior of the CopyTest pipeline, specifically the copy of files from an Azure Blob Storage container to a SQL Server database, and ensures that the data transfer is correctly configured. The test asserts:

  • Source Configuration:
    • The test asserts that the source is a DelimitedTextSource and uses `AzureBlobStorageReadSettings` with the recursive property set to `True`. This configuration ensures that all files within the folder are read.
    • It also checks that the file format settings are correctly configured for reading delimited text files.
  • Sink Configuration:
    • The test ensures the sink is a `SqlServerSink` with a pre-copy script to truncate the destination table (`bronze.{table_name}`). This prevents duplicate entries in the SQL table.
    • It asserts that the `writeBehavior` is set to `insert`, `sqlWriterUseTableLock` is `False`, and the `tableOption` is set to `autoCreate`.
  • Translator Configuration:
    • The test verifies that the translator is of type `TabularTranslator` with the `treatBooleanAsNumber` setting set to `False`. This ensures correct data type conversion when copying data to SQL Server.
				
@pytest.mark.parametrize(
    'file_name', ['customer_orders.csv', 'customer_orders3.csv']
)
def test_copy_customer__orders_csv_from_blob_to_sql(test_framework: TestFramework, file_name: str):
    '''
    Unit test for a csv copy from Azure Blob Storage to a SQL Server database
    '''
    pipeline = test_framework.get_pipeline_by_name("CopyTest")
    copy_blob_activity = pipeline.get_activity_by_name('copy_from_landing_to_bronze')
    state = PipelineRunState(
        parameters=[
            RunParameter(RunParameterType.Pipeline, "folder", "orders"),
            RunParameter(RunParameterType.Pipeline, "fileName", file_name),
            RunParameter(RunParameterType.Global, "DatabaseName", "dev"),
        ]
    )

    copy_blob_activity.evaluate(state)

    source = copy_blob_activity.type_properties['source']
    sink = copy_blob_activity.type_properties['sink']
    translator = copy_blob_activity.type_properties['translator']

    # The destination table name is derived from the file name
    table_name = file_name.replace('.csv', '')

    # We assert that the source reads delimited text files from Azure Blob Storage
    assert source['type'] == 'DelimitedTextSource'
    assert source['storeSettings']['type'] == 'AzureBlobStorageReadSettings'
    assert source['storeSettings']['recursive'] == True
    assert source['formatSettings']['type'] == 'DelimitedTextReadSettings'

    # We assert that the sink is a SQL Server sink and that a truncate script runs before the copy starts
    assert sink['type'] == 'SqlServerSink'
    assert sink['preCopyScript'].result == f'truncate table bronze.{table_name}'
    assert sink['writeBehavior'] == 'insert'
    assert sink['sqlWriterUseTableLock'] == False
    assert sink['tableOption'] == 'autoCreate'

    # We assert that the translator does not treat booleans as numbers
    assert translator['type'] == 'TabularTranslator'
    assert translator['typeConversionSettings']['treatBooleanAsNumber'] == False

Putting it all together in CI/CD

Once all of our tests are created, the next step is to add them to the ADF repository. To run them automatically, however, we need to adapt our CI pipeline as follows:

				
- task: UsePythonVersion@0
  inputs:
    versionSpec: "3.9"
    addToPath: true

- script: |
    python -m venv .venv
    source .venv/bin/activate
    python -m pip install --upgrade pip
    pip install data_factory_testing_framework
    pip install pytest-azurepipelines
    python -m pytest -vv $(Build.Repository.LocalPath)/build/tests.py
  displayName: "pytest"

After creating our pull request, we can see that the tests are executed automatically as part of the CI pipeline. The pytest-azurepipelines plugin publishes the results to Azure DevOps, where they are compiled into a test report that provides a clear overview of the outcomes. This report, shown in figure 3, highlights which tests passed and which failed, helping us identify issues early in the development process.

Key takeaways

Unit testing Azure Data Factory (ADF) pipelines is a crucial step in building reliable and efficient data workflows. By integrating tests into a CI/CD framework, you can identify issues early, validate configurations, and ensure that only properly tested pipelines are deployed. Automated tests not only improve developer confidence but also streamline the deployment process by catching errors before they reach production.

Using tools like pytest and data_factory_testing_framework, we demonstrated how to test key ADF activities such as file listing, iteration, and data copying. These tests validate configurations, ensure data integrity, and guarantee that pipelines function as intended. Embedding these tests into CI pipelines with Azure DevOps provides automated reporting, offering clear insights into pipeline stability and highlighting any areas needing improvement.

By making unit testing an integral part of your CI/CD process, you can save time, enhance workflow resilience, and deliver high-quality data pipelines with confidence.
