What is Dataform?

Dataform, in a nutshell, lets you orchestrate SQL workflows to transform raw data into reliable datasets for analytics. It also provides a collaborative platform for managing those workflows, testing data quality, and documenting datasets.

Google acquired Dataform in 2020 to integrate it with BigQuery, and since then it has been developed exclusively for GCP. With its latest major release, 3.0, Dataform dropped support for data warehouses and databases other than BigQuery, becoming an integrated part of the GCP platform. You can access it directly from the BigQuery service.

This tight integration with BigQuery makes Dataform a compelling option over other tools on the market for companies that already store their data in BigQuery.

How should I orchestrate Dataform?

If you use dbt, you need either an orchestrator such as Airflow or dbt Cloud to execute the queries in your dbt models. This is a major concern: running your own infrastructure costs time and money, while dbt Cloud is quite expensive. Neither option is optimal.

Dataform, on the contrary, is integrated into GCP (and it is free), so it has direct access to GCP orchestration tools. Dataform also offers built-in orchestration functionality to schedule your pipelines. However, this feature is not customizable enough for most cases, for one main reason: it cannot run pipelines with dynamic variables such as a date.

In the real world, most pipelines are incremental, since every day you ingest the new data from your sources. The straightforward approach is to use CURRENT_DATE() in the SQL query. However, if a run fails, re-executing it later means running a different query, because the date may have changed. The recommended approach is to use a variable such as date: when re-executing, the pipeline ingests the data for the date passed as a variable rather than a time relative to the current date.
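
As a minimal illustration, using the when/incremental helpers and the dataform.projectConfig.vars.date compilation variable that appear later in this article, the incremental filter of a model can be pinned to the variable instead of CURRENT_DATE():

config { type: "incremental" }

SELECT *
FROM ${ref("finops_lz")}
-- Fragile: CURRENT_DATE() makes re-runs non-reproducible
-- WHERE DATE(_PARTITIONTIME) = CURRENT_DATE()
-- Reproducible: pin the partition to the date compilation variable
${when(incremental(), `WHERE DATE(_PARTITIONTIME) = "${dataform.projectConfig.vars.date}"`)}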

For this reason, these built-in automated workflows are not suitable in most cases. However, for projects that do not need dynamic variables, this approach has advantages:

  • Really simple and integrated inside Dataform

  • Scheduled queries on a cron interval

  • Can be fully automated with IaC (Terraform), as sketched below

For a simple project, this is all we need, and having orchestration set up that easily is really convenient.
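
For those simpler projects, the scheduled runs themselves can also be managed with Terraform. Here is a hedged sketch, assuming the google-beta provider's google_dataform_repository_release_config and google_dataform_repository_workflow_config resources and the Dataform repository resource created later in this article (field names may vary across provider versions, so check the provider documentation):

resource "google_dataform_repository_release_config" "daily" {
  provider = google-beta

  project    = module.project.project_id
  region     = "europe-west1"
  repository = google_dataform_repository.dataform_repository.name

  name          = "daily"
  git_commitish = "main"

  # Recompile the repository shortly before each run
  cron_schedule = "30 7 * * *"
  time_zone     = "Europe/Paris"
}

resource "google_dataform_repository_workflow_config" "daily" {
  provider = google-beta

  project    = module.project.project_id
  region     = "europe-west1"
  repository = google_dataform_repository.dataform_repository.name

  name           = "daily"
  release_config = google_dataform_repository_release_config.daily.id

  # Execute the latest compilation every day at 8 am
  cron_schedule = "0 8 * * *"
  time_zone     = "Europe/Paris"
}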

Other options to orchestrate Dataform are Airflow (or any other orchestrator) and Google Workflows. The first option is ideal but, as discussed above, expensive in both time and money. The second is the one we recommend, and it is the one explained in this article.

Orchestrating Dataform

There are several ways to execute Dataform. You can check a very informative comparison below from the article Mastering Dataform Execution in GCP: A Practical Guide with CI/CD Example.

 A comparison table of possibilities to run Dataform, detailing four methods: Manual Execution, Workflow Configuration, Cloud Composer, and Dataform API. It includes categories for recommendations, restrictions, event/scheduled driven capability, CI/CD usability, and pricing, highlighting differences such as the need for Python skills for Cloud Composer, the flexibility of the Dataform API, and the free pricing for Manual Execution and Workflow Configuration.
Possibilities to run Dataform

Ideally, the best approach is to use an orchestrator, because Dataform does not provide a UI where you can easily see every day's run with its results, which complicates debugging which dates a model has run for and can lead to issues. An orchestrator also lets you run custom code before, between, or after your SQL pipelines.

Yet, we recommend leveraging Dataform's integration with GCP and orchestrating it with Google Workflows. This allows you to automate the whole deployment with Terraform using a few simple scripts. Besides, Google provides documentation and a ready-made workflow for this. This solution strikes a balance between simplicity, cost, features, and maintainability. The resulting architecture is shown below.

 An architectural diagram of a production data platform showing a workflow where Cloud Scheduler triggers Cloud Workflows on a recurrent basis, which in turn triggers a Dataform pipeline with a date variable, resulting in SQL transformations in BigQuery.
Production Dataform Architecture

Demo: FinOps data product

To show the Dataform production environment, we will build a simple data product related to FinOps. It will consist of two models and a source. The source is a Google Cloud billing data table that contains the cost of all resources used in our Google Cloud organization. You can check the code for the Dataform repository here.

 A screenshot of the Google Cloud Platform interface showing the BigQuery module. The image highlights a selected table 'finops_lz', under a dataset, with details such as the table ID, creation time, and last modified time displayed on the right. The table is partitioned by day with EU data location and is part of the 'mydemo-dataform-dev' project. An arrow points to the 'finops_lz' table in the Explorer tab.

Bootstrapping the Dataform repository

You can check the official guidelines for the CLI here.

  • Before installing the Dataform CLI, install NPM.

  • Make sure you are inside the Git folder of the repository you created with your provider (GitLab, GitHub, etc.)

# Install dataform
npm i -g @dataform/cli@^3.0.0-beta.4

# Initialize the repository in the current directory
dataform init . my

This will create a folder structure with a workflow_settings.yaml file such as the following.

  • You can remove the default dataset entry to use the one you will set in your models

  • Set the project you want to use to store your models

  • Set the location of your BigQuery datasets

  • Set the defaultAssertionDataset to a dataset where you want to store your tests. Example: bqdts_finops_tests

dataformCoreVersion: 3.0.0-beta.4
defaultProject: my-project
defaultLocation: EU
defaultDataset: dataform
defaultAssertionDataset: dataform_assertions
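
Because the models below reference a date compilation variable (dataform.projectConfig.vars.date), you can also declare a default value for it in workflow_settings.yaml so local compilations work without passing it explicitly. A minimal sketch; the default value is an arbitrary placeholder, and the Google Workflow we deploy later overrides it at run time:

vars:
  date: "2024-01-01" # placeholder default; overridden by the orchestrator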

The final structure is the following. The folders dm, int, sources, and stg are created manually to host the models for each layer.

A snapshot of a file directory structure, likely from a development environment or version control system like Git. It shows folders named 'definitions' with subfolders 'dm', 'int', 'sources', and 'stg', and 'includes'. Also listed are files: a JSON file for credentials (.df-credentials.json), a Git ignore file (.gitignore), a README file (README.md), and a YAML file for workflow settings (workflow_settings.yaml).

We will then build int_billing_data.sqlx and net_cost_grouped.sqlx.

Declaring sources and models

We declare the source as follows.

config {
    type: "declaration",
    database: dataform.projectConfig.defaultProject,
    schema: "bqdts_finops_lz",
    name: "finops_lz",
}

You can then reference the source in other models with the ref function: ${ref("finops_lz")}

Then, we build the models. Here are a few best practices we recommend:

  • Set the tag of the stage (int, stg, dm) so that it is possible to run a stage using the tag.

  • Set the type. It can be incremental, table, or view.

  • Set the schema where the model will be written

  • Set the partition and clustering if needed

  • Set the assertions; these are data quality tests. You can see the documentation here.

  • In pre_operations, delete the partition you are about to insert so that incremental models behave like dbt incremental models and you don’t duplicate data in your final table.

Here is an example of the model int_billing_data.sqlx. The final model net_cost_grouped.sqlx uses the same columns but adds up cost and discount to get the net_cost (a sketch of it follows the example below). This way, we have a simple, production-ready FinOps data product.

config {
  tags: ["int"],
  type: "incremental",
  schema: "bqdts_finops_int",
  bigquery: {
    partitionBy: "export_date",
    clusterBy: ["project_id", "service_name", "service_description"],
  },
  assertions: {
    nonNull: ["export_date"],
    rowConditions: [
      `service_name is not null`
    ]
  }
}

pre_operations {
  ${when(incremental(), `DELETE FROM ${self()} WHERE export_date = "${dataform.projectConfig.vars.date}"`) }
}

SELECT 
    CAST(export_time as DATE) as export_date,
    project.id as project_id,
    service.description as service_name,
    sku.description as service_description,
    currency as currency,
    SUM(cost) as cost,
    IFNULL(SUM(
        (
            SELECT sum(credit.amount)
            FROM UNNEST(credits) credit
            WHERE credit.type != 'PROMOTION'
        )
    ), 0) as discount
FROM ${ref("finops_lz")}
${when(incremental(), `WHERE DATE(_PARTITIONTIME) = "${dataform.projectConfig.vars.date}"`) }
GROUP BY export_date, project_id, service_name, service_description, currency
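
And here is a minimal sketch of what net_cost_grouped.sqlx could look like based on the description above; the exact config and the net cost formula are assumptions to adapt to your needs.

config {
  tags: ["dm"],
  type: "incremental",
  schema: "bqdts_finops_dm",
  bigquery: {
    partitionBy: "export_date",
  },
}

pre_operations {
  ${when(incremental(), `DELETE FROM ${self()} WHERE export_date = "${dataform.projectConfig.vars.date}"`) }
}

SELECT
    export_date,
    project_id,
    service_name,
    service_description,
    currency,
    -- credits are negative amounts, so adding the discount yields the net cost
    SUM(cost) + SUM(discount) AS net_cost
FROM ${ref("int_billing_data")}
${when(incremental(), `WHERE export_date = "${dataform.projectConfig.vars.date}"`) }
GROUP BY export_date, project_id, service_name, service_description, currency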

Then, we are ready to deploy our infrastructure in Google Cloud to run our data product.

Deploying Dataform with Terraform

The repository has a top level in which shared resources are created, and a module inside the dataform folder. This module contains all the resources that need to be deployed per environment (dev, stg, prd). You can find all the code shown in this section in the following repository.

An image displaying a Terraform repository structure with a directory named 'dataform'. Inside it are Terraform files related to different configurations such as 'bigquery.tf', 'dataform.tf', 'iam_sa.tf', 'project.tf', 'sa.tf', 'scheduler.tf', 'variables.tf', and 'workflows.tf'. On the left, outside the 'dataform' directory, are other Terraform files including 'folders.tf', 'iam_user.tf', 'locals.tf', 'main.tf', 'projects.tf', 'secrets.tf', and 'variables.tf'. An arrow is pointing to the 'dataform' directory, indicating a focus on this part of the repository.
Terraform repository structure

To build the desired architecture, we need the following components.

1. Project and folder infrastructure

Starting with the basics, we will create a folder with three projects: operations, development, and production. Ideally, we would also have a staging project, but we omit it for this demo. The top folder and the operations project are created at the repository's top level; the development and production projects are created inside the dataform module.

An image depicting a project and folder structure within a development environment. There is a main folder named 'dataform-demo' with a numeric identifier, under which there are three projects: 'mydemo-dataform-dev', 'mydemo-dataform-ops', and 'mydemo-dataform-prd', each with their own distinct alphanumeric identifiers.
Project and folder structure

2. External repository

Dataform connects to an external Git repository to read the queries and other Dataform metadata. You can read the official documentation here. In a nutshell, you need to store a token that grants access to the repository in a secret. This is done in the ops project.

resource "google_secret_manager_secret" "gitlab_token" {
  project   = module.project_ops_demo.project_id
  secret_id = "gitlab_token"

  replication {
    user_managed {
      replicas {
        location = "europe-west1"
      }
    }
  }
}

resource "google_secret_manager_secret_version" "gitlab_token" {
  secret = google_secret_manager_secret.gitlab_token.id

  secret_data = var.dataform_demo_gitlab_token
}

3. BigQuery datasets

If they are not yet provisioned, you will need all the BigQuery datasets in which Dataform will create its tables. Dataform can create the datasets by itself, but it is a best practice to provision them with Terraform. We suggest a landing zone (lz), staging (stg), intermediate (int), and data mart (dm) layout. A dataset for Dataform tests (assertions) is also needed.

locals {
  bq_datasets = toset(["bqdts_finops_lz", "bqdts_finops_stg", "bqdts_finops_int", "bqdts_finops_dm", "bqdts_finops_tests"])
}

resource "google_bigquery_dataset" "bq_datasets" {
  for_each      = local.bq_datasets
  dataset_id    = each.value
  friendly_name = each.value
  location      = "EU"
  project       = module.project.project_id
}

4. Service accounts & IAM

You need two service accounts for dev and two for production. One is responsible for executing the Google Workflow that triggers the Dataform pipeline. The other is the account the Dataform pipeline itself runs with when executing jobs in BigQuery.

resource "google_service_account" "dataform_runner" {
  account_id   = "dataform-${var.env}-runner"
  display_name = "Dataform ${var.env} runner"
  project      = module.project.project_id
  description  = "Default account which Dataform pipelines will run with"
}

resource "google_service_account" "workflow_dataform" {
  project      = module.project.project_id
  account_id   = "${var.env}-dataform-workflow-runner"
  display_name = "${var.env} Dataform Workflow Runner"
  description  = "Runs dataform workflows in ${var.env}"
}

The permissions needed are the following (a Terraform sketch follows the list).

  • Secret accessor for the Dataform default SA, so that it can read the repository token secret

  • SA token creator on the Dataform runner SA for the Dataform default SA, so that it can run workflows as the runner SA

  • BigQuery jobUser and dataEditor for the Dataform runner SA

  • Workflows invoker and Dataform editor for the Workflow runner SA

  • SA token creator on the Dataform runner SA for the Workflow runner SA, so that the workflow can trigger the Dataform pipeline
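
Here is a hedged Terraform sketch of these bindings; the Dataform default SA address and the project-number output are assumptions to adapt to your modules, and the secret binding is created alongside the secret in the ops project.

locals {
  # Assumption: the Dataform default SA is the project's Dataform service agent
  dataform_default_sa = "service-${module.project.number}@gcp-sa-dataform.iam.gserviceaccount.com"
}

# BigQuery jobUser and dataEditor for the Dataform runner SA
resource "google_project_iam_member" "dataform_runner_bq" {
  for_each = toset(["roles/bigquery.jobUser", "roles/bigquery.dataEditor"])
  project  = module.project.project_id
  role     = each.value
  member   = "serviceAccount:${google_service_account.dataform_runner.email}"
}

# Workflows invoker and Dataform editor for the Workflow runner SA
resource "google_project_iam_member" "workflow_runner" {
  for_each = toset(["roles/workflows.invoker", "roles/dataform.editor"])
  project  = module.project.project_id
  role     = each.value
  member   = "serviceAccount:${google_service_account.workflow_dataform.email}"
}

# Secret accessor for the Dataform default SA on the Git token secret (ops project)
resource "google_secret_manager_secret_iam_member" "dataform_secret_accessor" {
  project   = module.project_ops_demo.project_id
  secret_id = google_secret_manager_secret.gitlab_token.secret_id
  role      = "roles/secretmanager.secretAccessor"
  member    = "serviceAccount:${local.dataform_default_sa}"
}

# Token creator on the Dataform runner SA for the Dataform default SA and the Workflow runner SA
resource "google_service_account_iam_member" "dataform_runner_token_creator" {
  for_each = toset([
    "serviceAccount:${local.dataform_default_sa}",
    "serviceAccount:${google_service_account.workflow_dataform.email}",
  ])
  service_account_id = google_service_account.dataform_runner.name
  role               = "roles/iam.serviceAccountTokenCreator"
  member             = each.value
}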

5. Dataform repository

This resource is connected to the Git repository where your SQL files are stored. You will also be able to see this repository in the Dataform UI, with its runs and the result of each.

 The image is a screenshot of the Dataform UI within Google Cloud Platform, showing workflow execution logs. The logs are organized with columns for state, start time, duration, source type, source, and contents, plus a column indicating the environment (dev, in this case). Each entry is associated with a Git source type, suggesting integration with version control. This UI is part of the 'dataform_demo' project workspace.
Dataform UI for workflow execution logs

resource "google_dataform_repository" "dataform_repository" {
  provider = google-beta

  name         = "dataform_demo"
  display_name = "dataform_demo"

  project = module.project.project_id
  region  = "europe-west1"

  service_account = google_service_account.dataform_runner.email
  git_remote_settings {
    url                                 = "https://gitlab.com/demos2261901/dataform-demo.git"
    default_branch                      = "main"
    authentication_token_secret_version = var.gitlab_secret.id
  }

  workspace_compilation_overrides {
    default_database = module.project.project_id
  }
}

6. Workflow that triggers Dataform

The date variable is a Dataform compilation variable used to run the pipeline incrementally. This way, the SQL queries know which data they need to retrieve and process. It defaults to yesterday's date in the format YYYY-MM-DD.

All the variables in the init step can be overridden when triggering the workflow (see the example invocation after the step-by-step overview below). Here is a description of each of them:

  • date: Date to use in your incremental queries

  • project_id: Overwrites the default project.

  • git_commitish: Selects the git branch to use.

  • dataform_tags: Runs only a selection of tags.

  • dataform_targets: Runs only a selection of actions (models).

  • include_dependencies / include_dependents: Runs dependencies/dependents actions of selected tags/actions.

  • dataform_service_account: Dataform will perform all operations on BigQuery using this service account. If it is an empty string, the default Dataform SA specified in your repository settings is used.

  • fully_refresh_incremental_tables: Recreates incremental tables from scratch.

  • wait_for_dataform_status_check: If true, the workflow will wait until Dataform transformations finish and return an error if Dataform does as well. If false, it will send the request to Dataform to execute the transformations and return success independently of Dataform’s result.

  • compile_only: Compiles the Dataform code without executing it. This can be useful in a CI/CD pipeline to check that there are no Dataform compilation errors before merging or deploying.

locals {
  branch = var.env == "prd" ? "main" : var.env
}

resource "google_workflows_workflow" "dataform" {
  project         = module.project.project_id
  name            = "dataform_workflow"
  region          = "europe-west1"
  description     = "Trigger dataform workflow"
  service_account = google_service_account.workflow_dataform.id
  call_log_level  = "LOG_ERRORS_ONLY"

  source_contents = <<-EOF
main:
    params: [args]
    steps:
    - init:
        assign:
          - date: $${default(map.get(args, "date"), text.substring(time.format(sys.now() - 86400), 0, 10))} # Yesterday's date in format YYYY-MM-DD
          - project_id: ${module.project.project_id}
          - git_commitish: $${default(map.get(args, "git_commitish"), "${local.branch}")}
          - dataform_region: europe-west1
          - dataform_repository_name: ${google_dataform_repository.dataform_repository.name}
          - dataform_repository_id: ${google_dataform_repository.dataform_repository.id}
          - dataform_api_version: v1beta1
          - dataform_tags: $${default(map.get(args, "dataform_tags"), [])}
          - dataform_targets: $${default(map.get(args, "dataform_targets"), [])}
          - dataform_service_account: ${google_service_account.dataform_runner.email}
          - include_dependencies: $${default(map.get(args, "include_dependencies"), false)}
          - include_dependents: $${default(map.get(args, "include_dependents"), false)}
          - fully_refresh_incremental_tables: $${default(map.get(args, "fully_refresh_incremental_tables"), false)}
          - wait_for_dataform_status_check: $${default(map.get(args, "wait_for_dataform_status_check"), true)}
          - compile_only: $${default(map.get(args, "compile_only"), false)}
    - createCompilationResult:
        try:
            call: http.post
            args:
                url: $${"https://dataform.googleapis.com/" + dataform_api_version + "/" + dataform_repository_id + "/compilationResults"}
                auth:
                    type: OAuth2
                body:
                    git_commitish: $${git_commitish}
                    codeCompilationConfig:
                        vars: { "date": "$${date}" }
                        defaultDatabase: $${project_id}
            result: compilationResult
        retry:
            maxRetries: 2
            interval: 10s
    - earlyStopBeforeDataformWorkflowInvocation:
        switch:
            - condition: $${"compilationErrors" in compilationResult.body}
              raise:
                  message: $${"Error while compiling Dataform repository :" + " " +  compilationResult.body.name}
                  compilationErrors: $${compilationResult.body.compilationErrors}
            - condition: $${compile_only}
              return: "Dataform compilation successfully done. No errors found."
    - createWorkflowInvocation:
        call: http.post
        args:
            url: $${"https://dataform.googleapis.com/" + dataform_api_version + "/" + dataform_repository_id + "/workflowInvocations"}
            auth:
                type: OAuth2
            body:
                compilationResult: $${compilationResult.body.name}
                invocationConfig:
                    includedTags:
                    - $${dataform_tags}
                    includedTargets:
                    - $${dataform_targets}
                    transitiveDependenciesIncluded: $${include_dependencies}
                    transitiveDependentsIncluded: $${include_dependents}
                    fullyRefreshIncrementalTablesEnabled: $${fully_refresh_incremental_tables}
                    serviceAccount: $${dataform_service_account}
        result: workflowInvocation
    - earlyStopBeforeDataformStatusCheck:
        switch:
            - condition: $${not wait_for_dataform_status_check}
              return: $${"Dataform workflow invocation successfully created :" + " " + workflowInvocation.body.name}
    - getInvocationResult:
        call: http.get
        args:
            url:  $${"https://dataform.googleapis.com/" + dataform_api_version + "/" + workflowInvocation.body.name}
            auth:
                type: OAuth2
        result: invocationResult
    - waitForResult:
        call: sys.sleep
        args:
            seconds: 10
        next: checkInvocationResult
    - checkInvocationResult:
        switch:
            - condition: $${invocationResult.body.state == "RUNNING"}
              next: getInvocationResult
            - condition: $${invocationResult.body.state == "SUCCEEDED"}
              return: $${"Dataform workflow invocation finished with status 'succeeded' :" + " " + invocationResult.body.name}
            - condition: $${invocationResult.body.state == "CANCELLED" or invocationResult.body.state == "FAILED" or invocationResult.body.state == "CANCELING"}
              steps:
                - raiseException:
                    raise: $${"Error while running Dataform workflow :" + " " +  invocationResult.body.name + " " + invocationResult.body.state}

Here’s an overview of what each step of the pipeline does:

  1. createCompilationResult: This initial step compiles the Dataform source code from the designated Git branch.

  2. earlyStopBeforeDataformWorkflowInvocation: If any errors are detected during the compilation phase, the workflow halts, displaying the encountered errors. Also, if compile_only is set to true, the workflow stops here.

  3. createWorkflowInvocation: Triggers the execution of the compiled Dataform code.

  4. earlyStopBeforeDataformStatusCheck: If you do not care about the Dataform transformation outputs, this step will stop the workflow execution before the status check.

  5. getInvocationResult: This step retrieves the status of the executed Dataform invocation. The possible results include “RUNNING”, “SUCCEEDED”, “FAILED”, or “CANCELLED”.

  6. waitForResult + checkInvocationResult: Polls the Dataform status obtained from the previous step every 10 seconds until it changes from “RUNNING”, then reports the final state of the Dataform process.
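
For ad-hoc runs such as backfills, the same workflow can be triggered manually with custom arguments. A hedged example using the gcloud CLI (the date, tags, and project are illustrative values):

# Backfill the "int" layer and its dependents for a given date
gcloud workflows run dataform_workflow \
  --project=mydemo-dataform-prd \
  --location=europe-west1 \
  --data='{"date": "2024-01-15", "dataform_tags": ["int"], "include_dependents": true}'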

7. Scheduler

To run the pipeline automatically, we trigger the workflow every day at 8 am.

resource "google_cloud_scheduler_job" "daily_dataform" {
  project          = module.project.project_id
  name             = "daily_dataform"
  region           = "europe-west1"
  description      = "Gets today's date and trigger dataform daily workflow"
  schedule         = "0 8 * * *"
  time_zone        = "Europe/Paris"
  attempt_deadline = "320s"

  http_target {
    http_method = "POST"
    uri         = "https://workflowexecutions.googleapis.com/v1/${google_workflows_workflow.dataform.id}/executions"
    body = base64encode(jsonencode({
      "argument" : jsonencode({}),
      "callLogLevel" : "CALL_LOG_LEVEL_UNSPECIFIED"
    }))

    oauth_token {
      service_account_email = google_service_account.workflow_dataform.email
      scope                 = "https://www.googleapis.com/auth/cloud-platform"
    }
  }
}
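
To test the whole chain without waiting for the next scheduled run, the Cloud Scheduler job can be forced to run immediately; a hedged example with the gcloud CLI:

# Trigger the scheduler job now; it will in turn start the Google Workflow
gcloud scheduler jobs run daily_dataform \
  --project=mydemo-dataform-prd \
  --location=europe-west1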

Infrastructure deployed

We now have a Cloud Scheduler job that triggers the workflow once a day at 8:00, as we can see below (the first execution was a test).

 The image shows a user interface from Google Cloud Platform for the workflow named 'dataform_workflow' within the 'mydemo-dataform-prd' project. The view is focused on the 'EXECUTIONS' tab, displaying a list of workflow executions with their status (Succeeded), execution ID, workflow revision ID, start time, end time, and duration. The log shows two successful executions of the workflow, with actions available on the rightmost side of each entry. This is the Google Workflow configured to trigger Dataform.
Google Workflow that triggers Dataform

The Dataform workflow invocation is triggered right after the Google Workflow runs.

 The image shows the Google Cloud Platform interface for a project named 'dataform_demo', highlighting a section for 'WORKFLOW EXECUTION LOGS'. It displays logs for Dataform workflow executions with columns for state, start time, duration, source type, source, and contents. There are two entries in the log; one with a duration of 2 minutes and 4 seconds and the other with 21 seconds, both sourced from Git commits in the main branch with the actions labeled as "All actions".
Dataform workflow execution logs

We see the tables populated in the BigQuery production project.

 The image is a screenshot of the Google Cloud Platform's BigQuery module within a production project. It shows the BigQuery interface with an 'Explorer' sidebar where several datasets are listed, one of which is expanded to show tables, including the selected table 'net_cost_grouped'. The main pane displays the 'PREVIEW' tab for the selected table, showing a data excerpt with columns such as 'service_name', 'sku_description', 'currency', and 'net_cost'. This table is a partitioned table, as indicated by the label. There's also a section labeled 'Job history' at the bottom, suggesting the interface allows for monitoring query execution history.
BigQuery production project

Conclusion

Because Dataform is integrated into Google Cloud, it is really easy to use alongside other services, which makes it a great fit for organizations with data in BigQuery. As we have seen, automating its deployment with Terraform is fairly easy, and its maintenance requires little to no time thanks to these integrations.

While there are other tools on the market that perform similar tasks, Dataform is definitely a strong choice for orchestrating SQL workflows on Google Cloud.

Thank you

If you enjoyed reading this article, stay tuned as we regularly publish technical articles on data stack technologies, concepts, and how to put it all together to have a smooth flow of data end to end. Follow Astrafy on LinkedIn to be notified of the next article.
If you are looking for support on your Google Cloud data & ML use cases, feel free to reach out to us at sales@astrafy.io.