Dagster & Sling
Sling provides an easy-to-use YAML configuration layer for loading data from files, replicating data between databases, exporting custom SQL queries to cloud storage, and much more.
How it works
The Dagster integration allows you to derive Dagster assets from a replication configuration file. The typical pattern for building an ELT pipeline with Sling has three steps:
- Define a Sling replication.yaml file that specifies the source and target connections, as well as which streams to sync from.
- Create a SlingResource and pass a list of SlingConnectionResource objects, one per connection, to its connections parameter. Make sure each connection resource uses the same name that the connection has in the Sling configuration.
- Use the @dagster_sling.sling_assets decorator to define an asset that runs the Sling replication job, yielding from the SlingResource.replicate method to run the sync.
We'll walk you through each of these steps in this guide.
Prerequisites
To follow the steps in this guide:
- Familiarize yourself with Sling's replication configuration if you've never worked with Sling before. The replication configuration is a YAML file that specifies the source and target connections, as well as which streams to sync from. The dagster-sling integration uses this configuration to build assets for both sources and destinations.
- Install the following libraries:

```shell
pip install dagster dagster-sling
```

Refer to the Dagster installation guide for more info.
Step 1: Set up a Sling replication configuration
Dagster's Sling integration is built around Sling's replication configuration. You can either provide a path to an existing replication.yaml file or construct a dictionary that represents the configuration in Python. This configuration is passed to the Sling CLI to run the replication job.
replication.yaml
This example creates a replication configuration in a replication.yaml file:

```yaml
# replication.yaml

source: MY_POSTGRES
target: MY_SNOWFLAKE

defaults:
  mode: full-refresh
  object: '{stream_schema}_{stream_table}'

streams:
  public.accounts:
  public.users:
  public.finance_departments:
    object: 'departments'
```
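Every stream inherits the settings under defaults unless it sets its own keys. The sketch below approximates that merge in plain Python, assuming that a bare stream entry means "use the defaults unchanged" and that stream-level keys override default keys one by one. The helper effective_stream_config is hypothetical and not part of Sling or dagster-sling.

```python
from typing import Any, Dict

# The replication.yaml above, written as the equivalent Python dict.
REPLICATION: Dict[str, Any] = {
    "source": "MY_POSTGRES",
    "target": "MY_SNOWFLAKE",
    "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
    "streams": {
        "public.accounts": None,
        "public.users": None,
        "public.finance_departments": {"object": "departments"},
    },
}


def effective_stream_config(config: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Hypothetical helper: merge `defaults` into each stream's own settings.

    Assumption: an empty stream entry (None here, a bare key in YAML) inherits
    every default, and a key set on the stream wins over the default value.
    """
    defaults = config.get("defaults", {})
    merged: Dict[str, Dict[str, Any]] = {}
    for stream, overrides in config["streams"].items():
        merged[stream] = {**defaults, **(overrides or {})}
    return merged


for stream, settings in effective_stream_config(REPLICATION).items():
    print(stream, settings)
```

Under this assumption, public.finance_departments keeps mode: full-refresh from the defaults but replaces the object template with 'departments'.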
Python
This example creates a replication configuration using Python:
```python
replication_config = {
    "source": "MY_POSTGRES",
    "target": "MY_DUCKDB",
    "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
    "streams": {
        "public.accounts": None,
        "public.users": None,
        "public.finance_departments": {"object": "departments"},
    },
}
```
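The default object value uses Sling's {stream_schema} and {stream_table} variables to name each target table. As a rough illustration of how those names work out for replication_config, the sketch below assumes every stream name has the form schema.table and that the variables are substituted literally; the real Sling CLI may apply its own name normalization. The helper resolve_target_objects is hypothetical.

```python
from typing import Any, Dict

replication_config: Dict[str, Any] = {
    "source": "MY_POSTGRES",
    "target": "MY_DUCKDB",
    "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
    "streams": {
        "public.accounts": None,
        "public.users": None,
        "public.finance_departments": {"object": "departments"},
    },
}


def resolve_target_objects(config: Dict[str, Any]) -> Dict[str, str]:
    """Hypothetical helper: approximate the target object name for each stream."""
    defaults = config.get("defaults", {})
    resolved: Dict[str, str] = {}
    for stream, overrides in config["streams"].items():
        # A stream-level `object` takes precedence over the default template.
        template = (overrides or {}).get("object") or defaults.get("object")
        if template is None:
            raise ValueError(f"no object configured for stream {stream!r}")
        # Assumption: stream names are always "<schema>.<table>".
        schema, table = stream.split(".", 1)
        resolved[stream] = template.format(stream_schema=schema, stream_table=table)
    return resolved


print(resolve_target_objects(replication_config))
```

Under these assumptions, public.accounts lands in public_accounts, while public.finance_departments ignores the template and lands in departments.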
Step 2: Create a Sling resource
Next, you'll create a SlingResource object that contains references to the connections specified in the replication configuration:
```python
from dagster_sling import SlingConnectionResource, SlingResource

from dagster import EnvVar

sling_resource = SlingResource(
    connections=[
        # Using a connection string from an environment variable
        SlingConnectionResource(
            name="MY_POSTGRES",
            type="postgres",
            connection_string=EnvVar("POSTGRES_CONNECTION_STRING"),
        ),
        # Using a hard-coded connection string
        SlingConnectionResource(
            name="MY_DUCKDB",
            type="duckdb",
            connection_string="duckdb:///var/tmp/duckdb.db",
        ),
        # Using a keyword-argument constructor
        SlingConnectionResource(
            name="MY_SNOWFLAKE",
            type="snowflake",
            host=EnvVar("SNOWFLAKE_HOST"),
            user=EnvVar("SNOWFLAKE_USER"),
            role="REPORTING",
        ),
    ]
)
```
A SlingResource takes a connections parameter, where each SlingConnectionResource represents a connection to a source or target database. You can provide as many connections to the SlingResource as needed.
The name parameter in the SlingConnectionResource should match the source and target keys in the replication configuration.
You can pass a connection string or arbitrary keyword arguments to the SlingConnectionResource to specify the connection details. Refer to Sling's connections reference for the specific connection types and parameters.
Step 3: Define the Sling assets
Next, define a Sling asset using the @dagster_sling.sling_assets decorator. Dagster will read the replication configuration to produce assets.

Each stream will render two assets, one for the source stream and one for the target destination. You can override how assets are named by passing in a custom DagsterSlingTranslator object.
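To make the source/target split concrete, the sketch below maps each stream to a pair of asset keys. The naming scheme (the lowercased connection name followed by the stream's schema and table for the source, and the lowercased connection name followed by the resolved object name for the target) is a hypothetical illustration of what a custom translator could produce. It is not necessarily what DagsterSlingTranslator does by default, and asset_keys_for_streams is not a dagster-sling function. The actual sling_assets definition for this step comes next.

```python
from typing import Dict, List, Tuple

AssetKeyPath = List[str]


def asset_keys_for_streams(
    streams_to_objects: Dict[str, str], source_name: str, target_name: str
) -> Dict[str, Tuple[AssetKeyPath, AssetKeyPath]]:
    """Hypothetical naming scheme: one source key and one target key per stream."""
    keys: Dict[str, Tuple[AssetKeyPath, AssetKeyPath]] = {}
    for stream, target_object in streams_to_objects.items():
        source_key = [source_name.lower(), *stream.split(".")]
        target_key = [target_name.lower(), *target_object.split(".")]
        keys[stream] = (source_key, target_key)
    return keys


# Target objects as resolved from the example replication configuration.
resolved = {
    "public.accounts": "public_accounts",
    "public.users": "public_users",
    "public.finance_departments": "departments",
}

for stream, (src, tgt) in asset_keys_for_streams(resolved, "MY_POSTGRES", "MY_DUCKDB").items():
    print(stream, "->", src, "feeds", tgt)
```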
```python
from dagster_sling import SlingResource, sling_assets

from dagster import AssetExecutionContext, Definitions, file_relative_path

replication_config = file_relative_path(__file__, "../sling_replication.yaml")
sling_resource = SlingResource(connections=[...])  # Add connections here


@sling_assets(replication_config=replication_config)
def my_assets(context: AssetExecutionContext, sling: SlingResource):
    # Run the replication and yield the events it produces for each asset
    yield from sling.replicate(context=context)
    # Forward Sling's raw log output to the Dagster run logs
    for row in sling.stream_raw_logs():
        context.log.info(row)
```
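The asset function is a generator: yield from sling.replicate(...) re-yields every event the replication produces, and the loop over stream_raw_logs() only runs once that inner generator is exhausted. The plain-Python stand-in below shows that ordering without Dagster; FakeSling, its method bodies, and the event strings are all hypothetical and only mimic the shape of the real calls.

```python
from typing import Iterator, List


class FakeSling:
    """Hypothetical stand-in for SlingResource, used only to show generator flow."""

    def __init__(self) -> None:
        self._logs: List[str] = []

    def replicate(self, streams: List[str]) -> Iterator[str]:
        for stream in streams:
            self._logs.append(f"synced {stream}")
            yield f"materialized {stream}"  # stands in for one event per asset

    def stream_raw_logs(self) -> Iterator[str]:
        yield from self._logs


def my_assets(sling: FakeSling, log: List[str]) -> Iterator[str]:
    # Forward every event from the replication, as the real asset body does.
    yield from sling.replicate(["public.accounts", "public.users"])
    # Runs only after the replication generator is exhausted.
    for row in sling.stream_raw_logs():
        log.append(row)


captured: List[str] = []
events = list(my_assets(FakeSling(), captured))
print(events)
print(captured)
```

Dropping yield from (for example, calling sling.replicate(...) without iterating it) would leave the generator unconsumed, so in this stand-in no events would be emitted and no logs recorded.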
Step 4: Create the Definitions object
The last step is to include the Sling assets and resource in a Definitions object. This enables Dagster tools to load everything we've defined:
```python
defs = Definitions(
    assets=[
        my_assets,
    ],
    resources={
        "sling": sling_resource,
    },
)
```
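The resource reaches the asset by name: the "sling" key in resources lines up with the sling parameter of my_assets. A rough pre-flight check of that correspondence, using only inspect from the standard library, could look like the sketch below. unbound_resource_params is a hypothetical helper, and it assumes every parameter other than context is a resource. It inspects a plain, undecorated function with the same parameter names, since the decorated asset is no longer an ordinary function.

```python
import inspect
from typing import Any, Callable, Dict, Set


def unbound_resource_params(fn: Callable[..., Any], resources: Dict[str, Any]) -> Set[str]:
    """Hypothetical helper: parameters (besides `context`) with no matching resource key."""
    params = set(inspect.signature(fn).parameters) - {"context"}
    return params - set(resources)


def my_assets(context, sling):  # same parameter names as the decorated asset above
    ...


print(unbound_resource_params(my_assets, {"sling": object()}))  # set()
print(unbound_resource_params(my_assets, {"sling_resource": object()}))  # {'sling'}
```

Renaming either the resource key or the parameter without the other is exactly the mismatch this check reports.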
That's it! You should now be able to view your assets in the Dagster UI and run the replication job.