Monitoring big data with orchestrators: Airflow and Datadog

Giseung Beak
Chartboost Engineering
3 min read · Feb 9, 2023


The characteristics of big data are often described using the five V’s: volume, veracity, velocity, variety, and value. Data volume has grown immensely as more users interact with a wider variety of platform services, increasing both the quantity and the size of the data we collect. Imagine 100 petabytes of data riddled with noise, anomalies, and bias. With traditional tooling, it is very difficult to identify inaccurate data and ensure its quality. To maintain high veracity across large volumes of data, the data team at Chartboost uses the Apache Airflow workflow management platform together with the Datadog observability service.

Datadog can be integrated with Apache Airflow via its webhook. DatadogHook, part of the Datadog provider package, is a hook that lets a task send any measurable metric, or post events, through the Datadog API.
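As a minimal sketch of what the hook exposes (assuming an Airflow connection named datadog_default that holds your Datadog API and application keys), a task can push a single metric in a few lines. The metric name and tags below are purely illustrative:

from airflow.providers.datadog.hooks.datadog import DatadogHook

def push_sample_metric():
    # 'datadog_default' is an assumed connection ID; point it at your own Datadog credentials.
    hook = DatadogHook(datadog_conn_id='datadog_default')

    # Send one gauge datapoint; the metric name and tags are illustrative only.
    hook.send_metric(
        metric_name='data_team.bigquery.long_running_queries',
        datapoint=3,
        tags=['env:prod', 'team:data'],
        type_='gauge',
    )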

[Architecture diagram]

Maintenance is a crucial aspect of data engineering. As queries are generated and run daily or even hourly, costs add up and the execution time of jobs that create or modify tables in the data warehouse grows. We designed an Airflow DAG that identifies any BigQuery queries running longer than a predefined threshold. Our threshold is 70 minutes, but the value can vary depending on your environment and goals. Once long-running queries are detected, the DAG uses the DatadogHook to post events to Datadog, and engineers are notified via email or Slack when Datadog receives those events.
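The sketch below shows one way such a check could look, assuming a 70-minute threshold, connection IDs named google_cloud_default and datadog_default, and an EU-region INFORMATION_SCHEMA lookup; adjust all of these to your environment:

from airflow.providers.datadog.hooks.datadog import DatadogHook
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

THRESHOLD_MINUTES = 70  # the threshold described above; tune to your workload

def alert_on_long_running_queries(**kwargs):
    bq_hook = BigQueryHook(gcp_conn_id='google_cloud_default', use_legacy_sql=False)
    sql = """
    SELECT job_id, TIMESTAMP_DIFF(end_time, start_time, MINUTE) AS num_minutes
    FROM `region-eu`.INFORMATION_SCHEMA.JOBS_BY_ORGANIZATION
    -- only inspect jobs created in the last day
    WHERE creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
      AND TIMESTAMP_DIFF(end_time, start_time, MINUTE) > {threshold}
    """.format(threshold=THRESHOLD_MINUTES)
    slow_jobs = bq_hook.get_pandas_df(sql=sql)

    if not slow_jobs.empty:
        # Post an event that a Datadog monitor can route to email or Slack.
        datadog_hook = DatadogHook(datadog_conn_id='datadog_default')
        datadog_hook.post_event(
            title='Long-running BigQuery queries detected',
            text=slow_jobs.to_string(index=False),
            alert_type='warning',
            tags=['team:data', 'source:airflow'],
        )

From there, a Datadog monitor watching for these events can notify the right email address or Slack channel.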

[Datadog screenshot]

Furthermore, data validation plays a significant role in data engineering. The data team at Chartboost works across internal teams, so each BigQuery table is built for a different business purpose with its own logic. This can make it an arduous task for any engineer who needs to investigate issues or debug an entire query.

Fortunately, we have developed a tool that audits data programmatically. One of our tables contains multiple numeric columns describing campaign activity. We aggregate these columns into key figures and confirm that the numbers always match; if they do not, the data team is notified via email, Slack, and PagerDuty. A simplified version of this check is sketched below.
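As a rough illustration of that audit pattern (the table and column names here are hypothetical, not Chartboost’s actual schema), a task can aggregate two figures that should agree and fail loudly when they do not, letting task callbacks or a Datadog monitor fan the alert out to email, Slack, and PagerDuty:

from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

def audit_campaign_totals(**kwargs):
    bq_hook = BigQueryHook(gcp_conn_id='google_cloud_default', use_legacy_sql=False)
    # Hypothetical table and columns: two aggregates that should always match.
    df = bq_hook.get_pandas_df(
        sql="""
        SELECT SUM(impressions_billed) AS billed,
               SUM(impressions_delivered) AS delivered
        FROM campaign_activity
        """
    )

    billed, delivered = df['billed'][0], df['delivered'][0]
    if billed != delivered:
        # Failing the task triggers the on-failure notification path.
        raise AirflowException(
            'Audit mismatch: billed={b}, delivered={d}'.format(b=billed, d=delivered)
        )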

The full DAG below ties these pieces together, using Apache Airflow along with Datadog and Google Cloud services.

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.email import EmailOperator
from airflow.operators.python import PythonOperator
from airflow.providers.datadog.hooks.datadog import DatadogHook
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from cbutils.utils import send_task_success  # internal Chartboost callback helper


def email_body(ti):
    """Build an HTML table of long-running BigQuery jobs and share it via XCom."""
    bq_hook = BigQueryHook(gcp_conn_id='google_cloud_default')
    sql_query = """
    WITH test AS (
        SELECT job_id, start_time, end_time
        FROM `region-eu`.INFORMATION_SCHEMA.JOBS_BY_ORGANIZATION
        -- only inspect jobs created in the last day
        WHERE creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
          AND TIMESTAMP_DIFF(end_time, start_time, MINUTE) > 45
    )

    SELECT job_id, TIMESTAMP_DIFF(end_time, start_time, MINUTE) AS num_minutes
    FROM test
    """

    bq_df = bq_hook.get_pandas_df(sql=sql_query, dialect='standard')
    html = bq_df.to_html(index=False)
    ti.xcom_push(key='html', value=html)


def revenue_stats(**kwargs):
    """Aggregate revenue and expense for the run date and post the result to Datadog."""
    bq_hook = BigQueryHook(gcp_conn_id='google_cloud_default')
    run_date = kwargs['ds']  # the DAG run's logical date as YYYY-MM-DD
    sql_query = """
    SELECT SUM(revenue) AS money_earned, SUM(expense) AS money_spent
    FROM finance_test
    WHERE dt = "{date}"
    """.format(date=run_date)

    bq_df = bq_hook.get_pandas_df(sql=sql_query, dialect='standard')

    money_earned = bq_df['money_earned'][0]
    money_spent = bq_df['money_spent'][0]

    # Post the aggregates as a Datadog event; a Datadog monitor on these
    # events handles the downstream notification.
    datadog_hook = DatadogHook(datadog_conn_id='datadog_default')
    datadog_hook.post_event(
        title="Daily revenue audit",
        text="money_earned={earned}, money_spent={spent}".format(
            earned=money_earned, spent=money_spent
        ),
    )


with DAG(
    dag_id="datadog_test",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    tags=["P1", "test"],
) as dag:

    task_revenue_audit_to_datadog = PythonOperator(
        task_id="send_alert_to_datadog",
        python_callable=revenue_stats,
    )

    task_generate_email_body = PythonOperator(
        task_id="generate_email_body",
        python_callable=email_body,
    )

    task_send_alert_to_email_slack = EmailOperator(
        task_id="send_alert_to_email_slack",
        to=["test@chartboost.com", "test@chartboost.slack.com"],
        subject="[Data Alert] Long running queries",
        html_content="{{ task_instance.xcom_pull(task_ids='generate_email_body', key='html') }}",
    )

    all_tasks = DummyOperator(
        task_id='all_task',
        on_success_callback=send_task_success,
        trigger_rule='none_failed_min_one_success',
    )

    task_revenue_audit_to_datadog >> task_generate_email_body >> task_send_alert_to_email_slack >> all_tasks


With so much data coming from a massive number of users, managing its complexity without a modern technology stack is nearly impossible. Orchestrators such as Apache Airflow offer plenty of benefits: they help ensure the accuracy of massive datasets, integrate with cloud-scale applications like Datadog, and make it easy to set up alerting. Only two sample solutions were introduced in this post, but the data team at Chartboost has implemented many more with a modern tech stack.

Giseung Beak (pronounced “beck”) is a data engineer at Chartboost. He holds a Master of Science in Data Analytics from Fordham University. Prior to Chartboost, he worked at Vueling, OppenheimerFunds (acquired by Invesco), and Selerity (acquired by ION Investment Group).
