In Kaldea, a Job lets you schedule periodic updates of data tables.

Jobs in Kaldea can be connected to Airflow, so that Airflow tasks trigger downstream tasks created in Kaldea. Using this feature, you can create dependencies between Airflow tasks and Kaldea tasks.

Requirements

  • Airflow version 1.10.3 or later

Initial setup

Google SSO users cannot currently configure airflow-kaldea. If you sign in with Google SSO, please ask an admin in your organization with local login credentials to set up this connection instead.

Install the package in Airflow:

$ pip install airflow-kaldea

In the Airflow UI, open the Connections page and click + to create a new connection, then configure it as follows:

  • Connection ID: kaldea_default
  • Connection Type: HTTP
  • Host: https://api.kaldea.com/
  • Login: your-kaldea-userid-here
  • Password: your-kaldea-password-here
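If you prefer to configure this from the command line, the same connection can be created with the Airflow CLI. A sketch only, using the Airflow 1.10.x flag names; on Airflow 2.x the syntax changed to `airflow connections add kaldea_default --conn-type http ...`, and the login/password placeholders below should be replaced with your own credentials:

```shell
# Airflow 1.10.x CLI syntax (Airflow 2.x uses `airflow connections add`)
airflow connections --add \
    --conn_id kaldea_default \
    --conn_type http \
    --conn_host https://api.kaldea.com/ \
    --conn_login your-kaldea-userid-here \
    --conn_password your-kaldea-password-here
```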

Connect Airflow to Kaldea Job

A how-to guide on connecting an Airflow DAG to a Kaldea Job.

5-minute guide

In Kaldea

Step 1: Create a new job

  1. Navigate to Job.
  2. Click Create a Job.
  3. Give your Job a title by clicking ‘Untitled’.

    For this guide, I’m going to title it “Connect Airflow to Job”.
  4. Click ‘Add a description’ to add more information.
  5. Go further by adding labels. To add labels, click + Add label.

    I’m going to add Airflow.
  6. Select an “Update table” task card from the list of tables. Simply check the checkbox next to the table you would like to connect to Airflow.

    Tables with refresh queries show a green indicator; tables without refresh queries show a red indicator. For Airflow-triggered tasks this doesn’t matter, because refresh queries are only needed for tasks triggered by Kaldea.

  7. Newly added tasks are triggered by Kaldea by default. Let’s change the trigger type to Airflow for this task card.
  8. Finally, click Register to register this job.

Step 2: Create schedule for job

  1. Click the 🗓️ icon to create a schedule for this job.

  2. Configure the timezone.
    For this demo, I’m going to select Seoul as the timezone.

  3. Configure frequency and time.

  4. Finally, click Create to finish this step.
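The frequency and time you pick here must match the cron expression you will later set as the DAG’s `schedule_interval` (Step 4). As a sanity check, an hourly schedule at minute 0 corresponds to the cron expression `0 * * * *`, which fires at the top of every hour. A minimal standard-library sketch of what that means (the helper function is purely illustrative, not part of Kaldea or Airflow):

```python
from datetime import datetime, timedelta

def next_hourly_run(now: datetime) -> datetime:
    """Next fire time for the cron expression '0 * * * *' (top of every hour)."""
    top_of_hour = now.replace(minute=0, second=0, microsecond=0)
    return top_of_hour + timedelta(hours=1)

print(next_hourly_run(datetime(2024, 1, 1, 10, 25)))  # 2024-01-01 11:00:00
```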

Step 3: Copy code

  1. Click on the task card to reveal details.

  2. From the task details, click the 📋 icon to copy the example code to your clipboard.

In Airflow

Step 4: Paste code

  1. Navigate to Airflow and paste the copied code.
    Your code will look something like this:

    Example code
    from airflow.models import DAG
    from airflow_kaldea.operators.kaldea_job_operator import KaldeaJobOperator
    
    default_args = {}
    
    dag = DAG(
        dag_id='data_dag',
        default_args=default_args,
        schedule_interval='0 * * * *',  # Job schedule time
    )
    
    kaldea_job = KaldeaJobOperator(
        task_id='{{table_name}}_kaldea_job',  # Target table name
        kaldea_job_id='{{kaldea_job_id}}',  # Job Id
        kaldea_task_id='{{kaldea_task_id}}',  # Task Id
        dag=dag,
    )
    

    Make sure that this schedule matches the actual schedule_interval of the Airflow DAG you are linking to. Otherwise the integration will not work.

  2. Add the necessary dependencies between your Airflow task and the added KaldeaJobOperator, e.g. airflow_dim_total_users >> kaldea_job_dim_total_users
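Airflow’s `>>` dependency syntax is just operator overloading: task objects implement `__rshift__`, so `a >> b` records that b runs after a. A minimal stand-in to illustrate what the example dependency above does, using a toy `Task` class rather than real Airflow operators:

```python
class Task:
    """Toy stand-in for an Airflow operator, for illustration only."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # `a >> b` means "run b after a completes"
        self.downstream.append(other)
        return other  # returning `other` allows chaining: a >> b >> c

airflow_dim_total_users = Task("airflow_dim_total_users")
kaldea_job_dim_total_users = Task("kaldea_job_dim_total_users")

# The Kaldea task runs only after the upstream Airflow task completes
airflow_dim_total_users >> kaldea_job_dim_total_users
print(airflow_dim_total_users.downstream[0].task_id)  # kaldea_job_dim_total_users
```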

In Kaldea

Step 5: Activate Job

  1. Finally, go back to Kaldea and activate the job by pressing the toggle.