Workflow Orchestration

Google Cloud
Cloud Composer v2

A managed workflow orchestration service built on Apache Airflow, an open-source platform for orchestrating data pipelines programmatically in Python.

Availability: Regional
Standard: Python 3 / Airflow 2
Author: Michaël Bettan

Overview & Key Capabilities

Cloud Composer v2 provides a managed environment for Apache Airflow, making it a great fit for hybrid and multi-cloud architectures. It allows you to author, schedule, and monitor data pipelines natively within Airflow.

Use Cases

  • Orchestration of data ingestion, including hybrid use cases
  • Data processing
  • Orchestration of jobs that call proprietary API endpoints
  • Custom code tasks run directly in Airflow
  • ML / AI pipelines

Billing Components

  • Web server core hours
  • Database (Cloud SQL) core hours
  • Web server and database storage
  • Network egress
  • GKE nodes for workers and the scheduler
  • Cloud Storage bucket (DAGs)
  • Cloud Monitoring logs usage

Architecture & Components

An Environment is the fundamental instance construct of Cloud Composer. It is deployed automatically across two distinct Google Cloud projects: your Customer project and a Google-managed tenant project.

Customer Project
Where you create your environments. A single customer project can host multiple environments. Hosts the GKE cluster and the Cloud Storage bucket.
Google-managed Tenant Project
Provides unified access control and an additional layer of data security. Each environment has its own tenant project containing Cloud SQL, Identity-Aware Proxy, and App Engine Flexible.
Airflow Web Server
Runs on App Engine Flexible and serves the Airflow UI.
Airflow Database
Hosted on Cloud SQL; acts as the backend database storing all Airflow metadata.
Environment's Bucket
A Cloud Storage bucket used to store DAGs, plugins, data dependencies, and Airflow logs.
Airflow Scheduler
Controls the scheduling of DAG runs and individual tasks, distributing tasks to Airflow workers through the Redis queue. Runs as a GKE deployment.
Airflow Workers
Execute individual tasks from DAGs by taking them from the Redis queue. Run as GKE deployments.
Redis Queue
Holds a queue of individual tasks. Runs as a GKE StatefulSet application so that messages persist across container restarts.
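
The scheduler-queue-worker hand-off above can be sketched with Python's standard library. This is an illustrative stand-in (a plain `queue.Queue` and threads instead of Redis and Celery workers on GKE), not Composer's actual implementation:

```python
# Illustrative stand-in for the scheduler -> queue -> worker hand-off.
# Composer uses a Redis queue and workers on GKE; here queue.Queue
# plays the queue's role and threads play the workers'.
import queue
import threading

task_queue: "queue.Queue[str]" = queue.Queue()
completed: list[str] = []
lock = threading.Lock()

def scheduler(tasks):
    """Push ready tasks onto the shared queue, like the Airflow scheduler."""
    for task_id in tasks:
        task_queue.put(task_id)

def worker():
    """Pull tasks off the queue and run them, like an Airflow worker."""
    while True:
        try:
            task_id = task_queue.get(timeout=0.1)
        except queue.Empty:
            return
        with lock:
            completed.append(task_id)
        task_queue.task_done()

scheduler(["extract", "transform", "load"])
threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(completed))  # each task ran exactly once
```

Because the queue (not the scheduler) hands tasks out, any idle worker can pick up the next task, which is why workers scale horizontally as GKE deployments.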

VPC Peering Note

For each Private IP environment, Cloud Composer creates one VPC peering connection for the tenant project network.


Airflow DAGs & Operators

Airflow DAGs (Directed Acyclic Graphs)

A DAG is Airflow's representation of a workflow pipeline. Acyclic means the workflow moves in one direction and never forms a closed loop.
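
The acyclicity requirement can be made concrete with a short, self-contained sketch: Kahn's algorithm produces a valid run order exactly when the graph has no closed loop (the task names below are illustrative):

```python
# Why "acyclic" matters: Kahn's algorithm yields a complete run order
# only if the task graph has no closed loop.
from collections import deque

def topological_order(edges, nodes):
    """Return a run order for `nodes`, or None if the graph has a cycle."""
    indegree = {n: 0 for n in nodes}
    children = {n: [] for n in nodes}
    for upstream, downstream in edges:
        children[upstream].append(downstream)
        indegree[downstream] += 1
    ready = deque(n for n in nodes if indegree[n] == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for child in children[node]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    # If a cycle exists, some nodes never reach indegree 0.
    return order if len(order) == len(nodes) else None

tasks = ["export", "move", "cleanse", "ingest", "notify"]
pipeline = [("export", "move"), ("move", "cleanse"),
            ("cleanse", "ingest"), ("ingest", "notify")]
print(topological_order(pipeline, tasks))
# Adding a back edge closes the loop and invalidates the graph:
print(topological_order(pipeline + [("notify", "export")], tasks))
```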

Example Pipeline Flow:

  1. Export: export BigQuery data
  2. Move: move to a GCS folder
  3. Cleanse: cleanse the data
  4. Ingest: ingest into Snowflake
  5. Notify: send a Slack message
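
The five steps above could be wired together as a dependency chain. Airflow declares dependencies with `task_a >> task_b`; the stub `Task` class below mimics that syntax so the sketch runs without an Airflow installation, and the operator names in the comments are only what a real DAG might use:

```python
# Sketch of how the five pipeline steps would be chained in a DAG.
# `Task` is a stand-in for an Airflow operator, mimicking the `>>` syntax.
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        """`a >> b` means b runs after a, as in Airflow."""
        self.downstream.append(other)
        return other

export = Task("export_bigquery_data")    # BigQueryToCloudStorageOperator
move = Task("move_to_gcs_folder")        # GoogleCloudStorageToGoogleCloudStorageOperator
cleanse = Task("cleanse_data")           # PythonOperator
ingest = Task("ingest_into_snowflake")   # e.g. a Snowflake transfer operator
notify = Task("send_slack_message")      # e.g. a Slack notification operator

export >> move >> cleanse >> ingest >> notify

# Walk the chain to confirm the declared order.
order, task = [], export
while task:
    order.append(task.task_id)
    task = task.downstream[0] if task.downstream else None
print(order)
```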

Airflow Operators are the core building blocks of DAGs. Operators invoke the tasks you want to complete and are usually atomic in nature (one operator per task).

DummyOperator
Used primarily for visual clarity in the UI (e.g., establishing start or end points). Renamed EmptyOperator in newer Airflow 2 releases.
BashOperator & PythonOperator
Execute Bash commands or Python callables, respectively, within the task context.
GoogleCloudStorageToBigQueryOperator
Transfers and loads data originating from Cloud Storage directly into BigQuery tables.
BigQueryToCloudStorageOperator
Exports data from BigQuery to GCS. Pay attention to the different export formats (Avro, Parquet) and compression options (Snappy, Deflate) for cost optimization.
GoogleCloudStorageToGoogleCloudStorageOperator
Efficiently copies files between different GCS buckets.
EmailOperator & Others
Sends emails. Many other operators exist for interacting with services such as Dataproc, Dataflow, and Pub/Sub.

Failure & ETL Patterns

Failure and Retry Mechanisms
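
Airflow tasks accept `retries`, `retry_delay`, and `on_failure_callback` parameters for failure handling. The pure-Python stand-in below (no Airflow install assumed) mirrors that retry-then-callback shape:

```python
# Stand-in for Airflow's failure handling: retry a task a fixed number
# of times, then fire a failure callback before re-raising.
def run_with_retries(task_fn, retries=2, on_failure_callback=None):
    """Run task_fn; retry up to `retries` times, then fire the callback."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return task_fn()
        except Exception as exc:
            if attempts > retries:
                if on_failure_callback:
                    on_failure_callback({"exception": exc, "try_number": attempts})
                raise

failures = []
calls = {"n": 0}

def flaky():
    """Fails twice, then succeeds -- a typical transient error."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient error")
    return "ok"

# Succeeds on the third attempt (two retries), so the callback never fires.
print(run_with_retries(flaky, retries=2, on_failure_callback=failures.append))
print(failures)  # []
```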

Push-based ETL

Cloud Functions + Airflow

  • Data ingestion is initiated by an external event (e.g., a new file in Cloud Storage, a Pub/Sub message).
  • The event triggers a Cloud Function, which triggers the specific Airflow DAG execution.
  • Reactive approach: Ideal for near real-time data processing dependent on the arrival of new data.
  • Cloud Function acts as a lightweight intermediary, decoupling the event source from the Airflow execution.
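
A minimal sketch of this pattern, assuming Airflow 2's stable REST API (`POST /api/v1/dags/{dag_id}/dagRuns`). The HTTP call is injected so the example runs offline, and names like `composer-web-url` and `ingest_pipeline` are placeholders:

```python
# Sketch of a push-based trigger: a Cloud Function-style handler that,
# on a GCS event, triggers a DAG run via Airflow 2's stable REST API.
import json

def make_gcs_handler(dag_id, airflow_url, post):
    """Return a handler suitable as a Cloud Function entry point.

    `post` is an injected, authenticated HTTP client call."""
    def handle(event):
        # Pass the event details to the DAG run as its `conf`.
        payload = {"conf": {"bucket": event["bucket"], "name": event["name"]}}
        endpoint = f"{airflow_url}/api/v1/dags/{dag_id}/dagRuns"
        return post(endpoint, json.dumps(payload))
    return handle

# A fake `post` stands in for the real HTTP client.
sent = []
def fake_post(url, body):
    sent.append((url, body))
    return 200

handler = make_gcs_handler("ingest_pipeline", "https://composer-web-url", fake_post)
handler({"bucket": "landing-zone", "name": "orders/2024-01-01.csv"})
print(sent[0][0])
```

Injecting the HTTP call keeps the Cloud Function a thin intermediary, which is the decoupling the bullet points above describe.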

Pull-based ETL

Scheduled Airflow DAGs

  • Initiated on a predefined schedule (e.g., cron expression) directly within Airflow.
  • Airflow Scheduler autonomously checks for scheduled DAG runs and executes them at designated times.
  • Proactive approach: Best suited for batch processing of data at regular intervals where volume is predictable.
  • Simpler to manage than push-based systems but less responsive to immediate data changes.
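
The scheduler's pull-based loop reduces to a simple check: has the last run time plus the interval passed? A minimal sketch, using a plain `timedelta` in place of cron expressions (DAG names illustrative):

```python
# Stand-in for the scheduler's pull-based check: fire any DAG whose
# next run time has arrived.
from datetime import datetime, timedelta

def due_dags(dags, now):
    """Return the dag_ids whose next scheduled run time has arrived."""
    return [dag_id for dag_id, (last_run, interval) in dags.items()
            if now >= last_run + interval]

dags = {
    "hourly_batch": (datetime(2024, 1, 1, 11, 0), timedelta(hours=1)),
    "daily_report": (datetime(2024, 1, 1, 0, 0), timedelta(days=1)),
}
# At noon, the hourly DAG is due; the daily DAG is not.
print(due_dags(dags, datetime(2024, 1, 1, 12, 0)))
```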

Logging, Monitoring & Security

Logging & Monitoring

  • Operational Logs (System): e.g., scheduler logs. Viewable via the Logs Viewer in the Cloud Console (Cloud Monitoring/Logging).
  • Task Logs: each DAG task has an associated log, accessible via the Airflow UI or directly in the logs folder of the environment's GCS bucket.
  • System and task health can be monitored natively from the Environment UI.

Security & IAM Roles

  • Composer Administrator: Full control of all Composer resources.
  • Environment and Storage Object Administrator: Administrative control over environments and GCS bucket objects.
  • Environment User and Storage Object Viewer: View-only access to environments and underlying storage objects.
  • Composer User: Basic user access for Airflow operations.

Self-Assessment Questions

Q1. In the Cloud Composer architecture, where are your DAGs, plugins, and Airflow logs physically stored?

In the Environment's Cloud Storage bucket, which is hosted within your Customer project.

Q2. Which architectural component is responsible for storing Airflow metadata?

The Airflow Database, which is hosted on Cloud SQL in the Google-managed tenant project.

Q3. You need to process data in near real-time whenever a new file lands in Cloud Storage. Which ETL pattern should you use?

Push-based ETL using a Cloud Function triggered by the GCS event to launch the Airflow DAG reactively.

Q4. How does Airflow ensure that individual tasks persist across container restarts?

Tasks are held in a Redis queue running as a GKE StatefulSet application.

Q5. You want to execute a custom Python function if a specific DAG task fails. How can you implement this?

By using the `on_failure_callback` parameter designed to handle custom logic upon task failure.