Getting started
Great Expectations (GX) is a framework for describing data using expressive tests and then validating that the data meets test criteria. Great Expectations maintains the Great Expectations Airflow Provider to give users a convenient method for running validations directly from their DAGs. The Great Expectations Airflow Provider has three Operators to choose from, which vary in the amount of configuration they require and the flexibility they provide.
- GXValidateDataFrameOperator
- GXValidateBatchOperator
- GXValidateCheckpointOperator
Operator use cases
When deciding which Operator best fits your use case, consider the location of the data you are validating, whether or not you need external alerts or actions to be triggered by the Operator, and what Data Context you will use. When picking a Data Context, consider whether or not you need to view how results change over time.
- If your data is in memory as a Spark or Pandas DataFrame, we recommend using the GXValidateDataFrameOperator. This option requires only a DataFrame and your Expectations to create a validation result.
- If your data is not in memory, we recommend configuring GX to connect to it by defining a BatchDefinition with the GXValidateBatchOperator. This option requires a BatchDefinition and your Expectations to create a validation result.
- If you want to trigger actions based on validation results, use the GXValidateCheckpointOperator. This option supports all features of GX Core, so it requires the most configuration: you must define a Checkpoint, BatchDefinition, ExpectationSuite, and ValidationDefinition to get validation results.
The Operators vary in which Data Contexts they support. All 3 Operators support Ephemeral and GX Cloud Data Contexts. Only the GXValidateCheckpointOperator supports the File Data Context.
- If the results are used only within the Airflow DAG by other tasks, we recommend using an Ephemeral Data Context. The serialized Validation Result will be available within the DAG as the task result, but will not persist externally for viewing the results across multiple runs. All 3 Operators support the Ephemeral Data Context.
- To persist and view results outside of Airflow, we recommend using a Cloud Data Context. Validation Results are automatically visible in the GX Cloud UI when using a Cloud Data Context, and the task result contains a link to the stored validation result. All 3 Operators support the Cloud Data Context.
- If you want to manage Validation Results yourself, use a File Data Context. With this option, Validation Results can be viewed in Data Docs. Only the GXValidateCheckpointOperator supports the File Data Context.
Prerequisites
- Python version 3.10 to 3.13
- Great Expectations version 1.7.0+
- Apache Airflow® version 2.1.0+
Assumed knowledge
To get the most out of this getting started guide, make sure you have an understanding of:
- The basics of Great Expectations. See Try GX Core.
- Airflow fundamentals, such as writing DAGs and defining tasks. See Get started with Apache Airflow.
- Airflow Operators. See Operators 101.
- Airflow connections. See Managing your Connections in Apache Airflow.
Install the provider and dependencies
- Install the provider, choosing the installation option that matches your data backend:
  - athena
  - azure
  - bigquery
  - gcp
  - mssql
  - postgresql
  - s3
  - snowflake
  - spark
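As a sketch, assuming the provider's PyPI package name `airflow-provider-great-expectations` and that the backend options above are pip extras (verify both against the provider's own documentation):

```shell
# Install the provider with the extra for your backend;
# the [postgresql] extra shown here is one example.
pip install "airflow-provider-great-expectations[postgresql]"
```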
Configure an Operator
After deciding which Operator best fits your use case, follow the Operator-specific instructions below to configure it.
Data Frame Operator
- Import the Operator.
- Instantiate the Operator with required and optional parameters.
```python
from __future__ import annotations

from typing import TYPE_CHECKING

from great_expectations_provider.operators.validate_dataframe import (
    GXValidateDataFrameOperator,
)

if TYPE_CHECKING:
    from pandas import DataFrame
    from great_expectations import ExpectationSuite
    from great_expectations.data_context import AbstractDataContext
    from great_expectations.expectations import Expectation


def configure_dataframe() -> DataFrame:
    # Airflow best practice is to not import heavy dependencies at the top level
    import pandas as pd

    return pd.read_csv("/path/to/my/data")


def configure_expectations(context: AbstractDataContext) -> Expectation | ExpectationSuite:
    from great_expectations import ExpectationSuite

    return context.suites.add_or_update(
        ExpectationSuite(
            name="example expectation suite",
            expectations=[...],
        )
    )


my_data_frame_operator = GXValidateDataFrameOperator(
    task_id="my_data_frame_operator",
    configure_dataframe=configure_dataframe,
    configure_expectations=configure_expectations,
)
```

- `task_id`: alphanumeric name used in the Airflow UI and GX Cloud.
- `configure_dataframe`: function that returns a DataFrame to pass data to the Operator.
- `configure_expectations`: function that returns either a single Expectation or an Expectation Suite to validate against your data.
- `result_format` (optional): accepts `BOOLEAN_ONLY`, `BASIC`, `SUMMARY`, or `COMPLETE` to set the verbosity of returned Validation Results. Defaults to `SUMMARY`.
- `context_type` (optional): accepts `ephemeral` or `cloud` to set the Data Context used by the Operator. Defaults to `ephemeral`, which does not persist results between runs. To save and view Validation Results in GX Cloud, use `cloud` and complete the additional Cloud Data Context configuration below.
For more details, explore this end-to-end code sample.
- If you use a Cloud Data Context, create a free GX Cloud account to get your Cloud credentials and then set the following Airflow variables:
  - `GX_CLOUD_ACCESS_TOKEN`
  - `GX_CLOUD_ORGANIZATION_ID`
  - `GX_CLOUD_WORKSPACE_ID`
The GXValidateDataFrameOperator creates an XCom that contains a link to view your results in the GX Cloud UI.
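For example, one common approach is to expose these credentials as environment variables in the Airflow workers' environment (placeholder values shown); deployments vary, so use your deployment's mechanism for managing secrets:

```shell
# Placeholder values — substitute the credentials from your GX Cloud account.
export GX_CLOUD_ACCESS_TOKEN="<your-access-token>"
export GX_CLOUD_ORGANIZATION_ID="<your-organization-id>"
export GX_CLOUD_WORKSPACE_ID="<your-workspace-id>"
```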
Batch Operator
- Import the Operator.
- Instantiate the Operator with required and optional parameters.
```python
from __future__ import annotations

from typing import TYPE_CHECKING

from great_expectations_provider.operators.validate_batch import (
    GXValidateBatchOperator,
)

if TYPE_CHECKING:
    from great_expectations import ExpectationSuite
    from great_expectations.data_context import AbstractDataContext
    from great_expectations.expectations import Expectation
    from great_expectations.core.batch_definition import BatchDefinition


def configure_expectations(context: AbstractDataContext) -> Expectation | ExpectationSuite:
    from great_expectations import ExpectationSuite

    return context.suites.add_or_update(
        ExpectationSuite(
            name="example expectation suite",
            expectations=[...],
        )
    )


def configure_postgres_batch_definition(context: AbstractDataContext) -> BatchDefinition:
    """Take a GX Context and return a BatchDefinition that loads a batch
    of data from a PostgreSQL table."""
    return (
        context.data_sources.add_postgres(
            name="example data source",
            connection_string="${POSTGRES_CONNECTION_STRING}",  # load credentials from the environment
        )
        .add_table_asset(
            name="example table asset",
            table_name="my_table",
        )
        .add_batch_definition_whole_table(
            name="example batch definition",
        )
    )


my_batch_operator = GXValidateBatchOperator(
    task_id="my_postgres_batch_operator",
    configure_batch_definition=configure_postgres_batch_definition,
    configure_expectations=configure_expectations,
)
```

- `task_id`: alphanumeric name used in the Airflow UI and GX Cloud.
- `configure_batch_definition`: function that returns a BatchDefinition to configure GX to read your data.
- `configure_expectations`: function that returns either a single Expectation or an Expectation Suite to validate against your data.
- `batch_parameters` (optional): dictionary that specifies a time-based Batch of data to validate your Expectations against. Defaults to the first valid Batch found, which is the most recent Batch (with default ascending sort) or the oldest Batch if the Batch Definition has been configured to sort descending.
- `result_format` (optional): accepts `BOOLEAN_ONLY`, `BASIC`, `SUMMARY`, or `COMPLETE` to set the verbosity of returned Validation Results. Defaults to `SUMMARY`.
- `context_type` (optional): accepts `ephemeral` or `cloud` to set the Data Context used by the Operator. Defaults to `ephemeral`, which does not persist results between runs. To save and view Validation Results in GX Cloud, use `cloud` and complete the additional Cloud Data Context configuration below.
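As an illustrative sketch of `batch_parameters`, assuming a monthly Batch Definition (the keys must match the date parts the Batch Definition captures; the values here are hypothetical):

```python
# Hypothetical example: select the Batch for January 2024 instead of the
# default (the most recent Batch). For a monthly Batch Definition, the
# expected keys are "year" and "month".
batch_parameters = {"year": 2024, "month": 1}

# The Operator would receive the dictionary at instantiation, e.g.:
# GXValidateBatchOperator(..., batch_parameters=batch_parameters)
```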
For more details, explore this end-to-end code sample.
- If you use a Cloud Data Context, create a free GX Cloud account to get your Cloud credentials and then set the following Airflow variables:
  - `GX_CLOUD_ACCESS_TOKEN`
  - `GX_CLOUD_ORGANIZATION_ID`
  - `GX_CLOUD_WORKSPACE_ID`
When using GX Cloud, you can define your configuration in code like the example above, or you can use the Cloud UI to create and edit your configuration. GX Cloud automatically creates a Batch Definition and Expectation Suite for you when you configure a Data Asset. You can find the names of these default resources by inspecting your GX configuration with the Python API. The GXValidateBatchOperator creates an XCom that contains a link to view your results in the GX Cloud UI.
Checkpoint Operator
- Import the Operator.
- Instantiate the Operator with required and optional parameters.
```python
from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING

from great_expectations_provider.operators.validate_checkpoint import (
    GXValidateCheckpointOperator,
)

if TYPE_CHECKING:
    from great_expectations import Checkpoint
    from great_expectations.data_context import AbstractDataContext


def configure_csv_checkpoint(context: AbstractDataContext) -> Checkpoint:
    """Take a GX Context and return a Checkpoint that can load CSV files
    from the data directory, validate them against an ExpectationSuite,
    and run Actions."""
    # Import GX objects locally per Airflow best practices;
    # gxe provides the Expectation classes for the expectations list below.
    import great_expectations.expectations as gxe
    from great_expectations import Checkpoint, ExpectationSuite, ValidationDefinition

    # Set up the Data Source, Data Asset, and Batch Definition
    batch_definition = (
        context.data_sources.add_pandas_filesystem(
            name="Load Datasource",
            base_directory=Path("/path/to/my/data"),
        )
        .add_csv_asset("Load Asset")
        .add_batch_definition_monthly(
            name="Load Batch Definition",
            regex=r"yellow_tripdata_sample_(?P<year>\d{4})-(?P<month>\d{2}).csv",
        )
    )

    # Set up the Expectation Suite
    expectation_suite = context.suites.add(
        ExpectationSuite(
            name="Load ExpectationSuite",
            expectations=[...],
        )
    )

    # Set up the Validation Definition
    validation_definition = context.validation_definitions.add(
        ValidationDefinition(
            name="Load Validation Definition",
            data=batch_definition,
            suite=expectation_suite,
        )
    )

    # Set up the Checkpoint
    checkpoint = context.checkpoints.add(
        Checkpoint(
            name="Load Checkpoint",
            validation_definitions=[validation_definition],
            actions=[],
        )
    )
    return checkpoint


my_checkpoint_operator = GXValidateCheckpointOperator(
    task_id="my_checkpoint_operator",
    configure_checkpoint=configure_csv_checkpoint,
)
```

- `task_id`: alphanumeric name used in the Airflow UI and GX Cloud.
- `configure_checkpoint`: function that returns a Checkpoint, which orchestrates a ValidationDefinition, BatchDefinition, and ExpectationSuite. The Checkpoint can also specify a Result Format and trigger actions based on Validation Results.
- `batch_parameters` (optional): dictionary that specifies a time-based Batch of data to validate your Expectations against. Defaults to the first valid Batch found, which is the most recent Batch (with default ascending sort) or the oldest Batch if the Batch Definition has been configured to sort descending.
- `context_type` (optional): accepts `ephemeral`, `cloud`, or `file` to set the Data Context used by the Operator. Defaults to `ephemeral`, which does not persist results between runs. To save and view Validation Results in GX Cloud, use `cloud` and complete the additional Cloud Data Context configuration below. To manage Validation Results yourself, use `file` and complete the additional File Data Context configuration below.
- `configure_file_data_context` (optional): function that returns a FileDataContext. Applicable only when using a File Data Context. See the additional File Data Context configuration below for more information.
For more details, explore this end-to-end code sample.
- If you use a Cloud Data Context, create a free GX Cloud account to get your Cloud credentials and then set the following Airflow variables:
  - `GX_CLOUD_ACCESS_TOKEN`
  - `GX_CLOUD_ORGANIZATION_ID`
  - `GX_CLOUD_WORKSPACE_ID`
When using GX Cloud, you can define your configuration in code like the example above, or you can use the Cloud UI to create and edit your configuration. GX Cloud automatically creates a Checkpoint for you when you configure a Data Asset. You can find the name of this Checkpoint in the Cloud UI by selecting your Data Asset, navigating to the Validations tab, clicking the code snippet button in the upper right, and selecting Generate Snippet. The GXValidateCheckpointOperator creates an XCom that contains a link to view your results in the GX Cloud UI.
- If you use a File Data Context, pass the `configure_file_data_context` parameter. This takes a function that returns a FileDataContext. By default, GX writes results to the configuration directory. If you retrieve your FileDataContext from a remote location, you can yield the FileDataContext in the `configure_file_data_context` function and write the directory back to the remote after control is returned to the generator.
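A minimal sketch of such a function, assuming GX Core's `great_expectations.get_context(mode="file")` API; the project directory path is hypothetical:

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from great_expectations.data_context import FileDataContext


def configure_file_data_context() -> "FileDataContext":
    # Import lazily, per Airflow best practices for heavy dependencies.
    import great_expectations as gx

    # mode="file" loads (or initializes) a File Data Context rooted at the
    # hypothetical directory below; GX writes results under this directory.
    return gx.get_context(mode="file", project_root_dir="/path/to/gx/project")
```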
Manage Data Source credentials with Airflow Connections
The Great Expectations Airflow Provider includes functions to retrieve connection credentials from other Airflow provider Connections. The following external Connections are supported:
Supported Connection Types
| Connection Type | API Function | External Provider Documentation |
|---|---|---|
| Amazon Redshift | `build_redshift_connection_string(conn_id, schema=None)` | Amazon Provider |
| MySQL | `build_mysql_connection_string(conn_id, schema=None)` | MySQL Provider |
| Microsoft SQL Server | `build_mssql_connection_string(conn_id, schema=None)` | Microsoft SQL Server Provider |
| PostgreSQL | `build_postgres_connection_string(conn_id, schema=None)` | PostgreSQL Provider |
| Snowflake | `build_snowflake_connection_string(conn_id, schema=None)` | Snowflake Provider |
| Snowflake (Key-based Auth) | `build_snowflake_key_connection(conn_id, schema=None)` | Snowflake Provider |
| Google Cloud BigQuery | `build_gcpbigquery_connection_string(conn_id, schema=None)` | Google Cloud Provider |
| SQLite | `build_sqlite_connection_string(conn_id)` | SQLite Provider |
| AWS Athena | `build_aws_connection_string(conn_id, schema=None, database=None, s3_path=None, region=None)` | Amazon Provider |
Usage
To use these functions, first install the Airflow Provider that maintains the Connection you need, and use the Airflow UI to configure the Connection with your credentials. Then, import the function you need from `great_expectations_provider.common.external_connections` and use it within your `configure_batch_definition` or `configure_checkpoint` function.
```python
from __future__ import annotations

from typing import TYPE_CHECKING

from great_expectations_provider.common.external_connections import (
    build_postgres_connection_string,
)

if TYPE_CHECKING:
    from great_expectations.data_context import AbstractDataContext
    from great_expectations.core.batch_definition import BatchDefinition


def configure_postgres_batch_definition(
    context: AbstractDataContext,
) -> BatchDefinition:
    task_id = "example_task"
    table_name = "example_table"
    postgres_conn_id = "example_conn_id"
    return (
        context.data_sources.add_postgres(
            name=task_id,
            # Build the connection string from the Airflow Connection
            connection_string=build_postgres_connection_string(
                conn_id=postgres_conn_id
            ),
        )
        .add_table_asset(
            name=task_id,
            table_name=table_name,
        )
        .add_batch_definition_whole_table(task_id)
    )
```
Add the configured Operator to a DAG
After configuring an Operator, add it to a DAG. Explore our example DAGs, which have sample tasks that demonstrate Operator functionality.
Note that the shape of the Validation Results depends on both the Operator type and Result Format configuration.
- GXValidateDataFrameOperator and GXValidateBatchOperator return a serialized ExpectationSuiteValidationResult.
- GXValidateCheckpointOperator returns a CheckpointResult.
- The fields included in either result depend on the Result Format verbosity.
Run the DAG
Trigger the DAG manually or run it on a schedule to start validating your data against your Expectations.