Mananged elements
Experimental
#

This guide provides an intro to Dagster managed elements. Managed elements allow you to centralize the configuration for your data stack, specifying configuration in Python code. You can check-in and version your config with version control or programatically generate config for more complex use-cases.

Currently, the following integrations support managed elements:


Getting started#

In order to use managed elements, you must first install the dagster-managed-elements package. This includes the base APIs used by each implementation, as well as making the managed elements CLI available, available as dagster-me.

Every managed element is controlled by a ManagedElementReconciler implementation. Typically, all of the reconcilers in your project are colocated in a single Python module, which the dagster-me CLI will use to discover reconcilers.

Using the dagster-me CLI#

The dagster-me CLI allows you to check the synchronization status of your managed elements, as well as to apply any config changes.

Checking the status of your managed elements#

To load all managed element reconcilers in a module and check their status, you can run:

dagster-me check --module my_python_module.my_submodule

This will print out a diff of the current state and the desired state for each reconciler.

You may also specify a working directory to search for the module in:

dagster-me check --module my_python_module.my_submodule --working-directory ./my_project

Additionally, you can specify one or more Python object paths to select a specific reconciler:

dagster-me check --module my_python_module.my_submodule:my_reconciler

Applying changes#

To apply changes to your managed elements, you can instead run the apply command:

dagster-me apply --module my_python_module.my_submodule

Using managed elements to configure Airbyte#

As an alternative to configuring Airbyte using its UI, you may create and update Airbyte sources, destinations, and connections using managed elements.

Defining a reconciler#

The first step is to create a reconciler, which is pointed at a specific Airbyte instance using an Airbyte resource. It is provided with a list of connections to reconcile, which we will set up next.

from dagster_airbyte import AirbyteManagedElementReconciler, airbyte_resource

airbyte_instance = airbyte_resource.configured(
    {
        "host": "localhost",
        "port": "8000",
        # If using basic auth, include username and password:
        "username": "airbyte",
        "password": {"env": "AIRBYTE_PASSWORD"},
    }
)

airbyte_reconciler = AirbyteManagedElementReconciler(
    airbyte=airbyte_instance,
    connections=[],
)

Defining sources and destinations#

Next, we will define our sources and destinations. These are defined using the AirbyteSource and AirbyteDestination classes, respectively.

from dagster_airbyte import AirbyteSource, AirbyteDestination

cereals_csv_source = AirbyteSource(
    name="cereals-csv",
    source_type="File",
    source_configuration={
        "url": "https://docs.dagster.io/assets/cereal.csv",
        "format": "csv",
        "provider": {"storage": "HTTPS", "user_agent": False},
        "dataset_name": "cereals",
    },
)

local_json_destination = AirbyteDestination(
    name="local-json",
    destination_type="Local JSON",
    destination_configuration={"destination_path": "/local/cereals_out.json"},
)

Defining a connection#

Finally, we will define a connection between our source and destination. This is defined using the AirbyteConnection class.

from dagster_airbyte import AirbyteConnection, AirbyteSyncMode

cereals_connection = AirbyteConnection(
    name="download-cereals",
    source=cereals_csv_source,
    destination=local_json_destination,
    stream_config={"cereals": AirbyteSyncMode.FULL_REFRESH_OVERWRITE},
)

We'll supply our new connection to the reconciler we defined earlier:

airbyte_reconciler = AirbyteManagedElementReconciler(
    airbyte=airbyte_instance,
    connections=[cereals_connection],
)

Applying changes#

Next, we'll use the dagster-me CLI to sanity check our reconciler and apply any changes:

dagster-me check --module my_python_module.my_submodule:reconciler

Found 1 managed elements, checking...
+ cereals-csv:
  + url: https://docs.dagster.io/assets/cereal.csv
  + format: csv
  + dataset_name: cereals
  + provider:
    + user_agent: False
    + storage: HTTPS
+ local-json:
  + destination_path: /local/cereals_out.json
+ download-cereals:
  + source: cereals-csv
  + destination: local-json
  + normalize data: None
  + streams:
    + cereals: FULL_REFRESH_OVERWRITE

Since this looks good, we'll apply the changes:

dagster-me apply --module my_python_module.my_submodule:reconciler

Now, we should see our new connection in the Airbyte UI!