Connecting Airflow DAGs to Kaldea Jobs
In Kaldea, a Job allows you to schedule periodic updates of data tables.
Jobs in Kaldea can be connected to Airflow, so that Airflow tasks trigger the downstream tasks you created in Kaldea. Using this feature, you can create dependencies between Airflow tasks and Kaldea-triggered tasks.
Requirements
- Airflow version 1.10.3 or later
Initial setup
Google SSO users cannot currently configure airflow-kaldea. If you are a Google SSO user, please ask an admin from your organization with local login credentials to set up this connection instead.
Install the package in Airflow:
$ pip install airflow-kaldea
In the Airflow interface, open the Connections page and click + to create a new connection.

Set up your Airflow Connection with the following values:
- Connection ID: kaldea_default
- Connection Type: HTTP
- Host: https://api.kaldea.com/
- Login: your-kaldea-userid-here
- Password: your-kaldea-password-here
Connect Airflow to Kaldea Job
A how-to guide on connecting an Airflow DAG to a Kaldea Job.
In Kaldea
Step 1: Create a new job
- Navigate to Job.
- Click Create a Job.
- Give your Job a title by clicking ‘Untitled’. For this guide, I’m going to title it “Connect Airflow to Job”.
- Click ‘Add a description’ to add more information.
- Go even further by adding labels. To add labels, click + Add label. I’m going to add “Airflow”.
- Select an “Update table” task card from the list of tables. Simply check the checkbox next to the table you would like to create an Airflow connection and dependency to. Tables with refresh queries are marked with a green indicator, and tables without refresh queries with a red indicator. For Airflow-triggered tasks this doesn’t matter, because refresh queries are only needed for tables triggered by Kaldea.
- Newly added tasks are, by default, triggered by Kaldea. Change the trigger type to Airflow for this task card.
- Finally, click Register to register this job.
Step 2: Create a schedule for the job
- Click the 🗓️ icon to create a schedule for this job.
- Configure the timezone. For this demo, I’m going to select Seoul.
- Configure the frequency and time.
- Finally, click Create to finish this step.
Step 3: Copy code
- Click on the task card to reveal its details.
- From the task details, click the 📋 icon to copy the example code to your clipboard.
In Airflow
Step 4: Paste code
- Navigate to Airflow and paste the copied code. Your code will resemble something like this:

Example code:

```python
from airflow.models import DAG
from airflow_kaldea.operators.kaldea_job_operator import KaldeaJobOperator

default_args = {}

dag = DAG(
    dag_id='data_dag',
    default_args=default_args,
    schedule_interval='0 * * * *',  # Job schedule time
)

kaldea_job = KaldeaJobOperator(
    task_id='{{table_name}}_kaldea_job',  # Target table name
    kaldea_job_id='{{kaldea_job_id}}',  # Job Id
    kaldea_task_id='{{kaldea_task_id}}',  # Task Id
    dag=dag,
)
```
Make sure that the schedule you configured in Kaldea corresponds to the actual schedule_interval of the Airflow DAG you are linking to; otherwise, the integration will not work.
- Add the necessary dependencies between your Airflow task and the added KaldeaJobOperator, e.g.:
airflow_dim_total_users >> kaldea_job_dim_total_users
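In Airflow, `a >> b` means that task `b` runs only after task `a` completes. As a toy illustration of those semantics (a hypothetical `Task` class, not Airflow's actual API), the dependency line above expresses roughly this:

```python
class Task:
    """Toy stand-in for an Airflow operator, only to illustrate `>>` semantics."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # tasks that run after this one succeeds

    def __rshift__(self, other):
        # `self >> other`: schedule `other` downstream of `self`.
        self.downstream.append(other)
        return other  # returning `other` allows chaining: a >> b >> c

# Hypothetical names mirroring the example dependency above.
airflow_dim_total_users = Task("airflow_dim_total_users")
kaldea_job_dim_total_users = Task("dim_total_users_kaldea_job")

airflow_dim_total_users >> kaldea_job_dim_total_users
print(airflow_dim_total_users.downstream[0].task_id)  # dim_total_users_kaldea_job
```

In the real DAG, this ordering guarantees the Kaldea job is only triggered after the upstream Airflow task has refreshed the source data.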
In Kaldea
Step 5: Activate the job
- Finally, go back to Kaldea and activate the job by pressing the activation toggle.