This page shows you how to create and run an Apache Airflow workflow in Cloud Composer that completes the following tasks:
- Creates a Cloud Dataproc cluster
- Runs an Apache Hadoop wordcount job on the cluster, and outputs its results to Cloud Storage
- Deletes the cluster
This page also shows you how to access your Cloud Composer environment through the Google Cloud Platform Console, Cloud SDK, and Airflow web interface.
Before you begin
- Sign in to your Google Account. If you don't already have one, sign up for a new account.
- Select or create a Google Cloud Platform project.
- Enable the Cloud Composer, Cloud Dataproc, and Cloud Storage APIs.
- To run gcloud composer commands in a local terminal window, install the Cloud SDK on your client machine. You can also run the commands in Cloud Shell, in which the Cloud SDK is preinstalled.
- Create a Cloud Composer environment and wait until environment creation completes. To create the environment, open the Create Environment page in the GCP Console.
It takes up to one hour to deploy the Airflow web interface and complete the environment creation process. Environment creation is complete when a green checkmark displays to the left of the environment name on the Environments page in the GCP Console.
- In your project, create a Cloud Storage bucket of any storage class and region. This bucket stores the results of the Hadoop wordcount job. You can create the bucket in the GCP Console, with the gsutil tool, or programmatically, as sketched below.
- Note the bucket path, such as gs://my-bucket, to use later in a variable definition.
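If you prefer to create the bucket from code, the following is a minimal sketch using the google-cloud-storage Python client library. The project ID, bucket name, and region are placeholders, and the location keyword assumes a reasonably recent release of the library; the GCP Console or gsutil work just as well.

```python
# Minimal sketch: create the quickstart results bucket programmatically.
# Assumes the google-cloud-storage library; names below are placeholders.
from google.cloud import storage

client = storage.Client(project='my-project-id')  # your GCP project ID
# The 'location' keyword assumes a recent google-cloud-storage release.
bucket = client.create_bucket('my-bucket', location='us-central1')
print('Created bucket: gs://{}'.format(bucket.name))
```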
Viewing environment information
In the GCP Console, open the Environments page.
Click the name of the environment to see its details.
The Environment details page provides information, such as the Airflow web interface URL, Google Kubernetes Engine cluster ID, name of the Cloud Storage bucket, and path for the /dags folder.

In Airflow, a DAG is a collection of organized tasks that you want to schedule and run. DAGs, also called workflows, are defined in standard Python files. Cloud Composer schedules only the DAGs in the /dags folder. The /dags folder is in the Cloud Storage bucket that Cloud Composer creates automatically when you create your environment.

Note the zone in which you created your environment, to use later in a variable definition.
Setting Airflow variables
Airflow variables are an Airflow-specific concept that is distinct from
environment variables. In this step, you'll set the following
three Airflow variables:
gcp_project, gcs_bucket, and gce_zone.
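Airflow variables are stored in the environment's Airflow metadata database, so DAG code running in the environment can read them directly. A minimal sketch of that, assuming the Airflow 1.x Python API:

```python
# Minimal sketch: reading this quickstart's Airflow variables from DAG code.
# Assumes Airflow 1.x; the keys are the ones set in this step.
from airflow.models import Variable

project_id = Variable.get('gcp_project')
gcs_bucket = Variable.get('gcs_bucket')
gce_zone = Variable.get('gce_zone')
```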
Using gcloud to set variables
To set Airflow variables using the gcloud command-line tool,
use the gcloud composer environments run command with the
variables sub-command. This gcloud composer command executes the
Airflow CLI sub-command variables; gcloud passes the arguments that
follow the -- separator through to the Airflow sub-command.
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION variables -- \
--set KEY VALUE
where:

- ENVIRONMENT_NAME is the name of the environment.
- LOCATION is the Compute Engine region where the environment is located. The gcloud composer command requires including the --location flag or setting the default location before running the gcloud command.
- KEY and VALUE specify the variable and its value to set. Include two dashes surrounded by spaces ( -- ) between the left-side gcloud command with gcloud-related arguments and the right-side Airflow sub-command-related arguments. Also include a space between the KEY and VALUE arguments.
To set the three variables, run the gcloud composer command once for
each row in the following table:
| KEY | VALUE | Details |
|---|---|---|
| gcp_project | your project-id | The Google Cloud Platform project you're using for this quickstart. |
| gcs_bucket | gs://my-bucket | The Cloud Storage bucket you created for this quickstart. |
| gce_zone | your Compute Engine zone, such as us-central1-a | The Compute Engine zone where your Cloud Dataproc cluster will be created. See Available regions & zones. |
For example:
gcloud composer environments run test-environment \
    --location us-central1 variables -- \
    --set gcp_project my-project-id
Output similar to the following displays:
kubeconfig entry generated for us-central1-test-environment-a3099dde-gke.
[2018-12-11 05:49:17,222] {settings.py:176} INFO - setting.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800
[2018-12-11 05:49:19,069] {default_celery.py:80} WARNING - You have configured a result_backend of redis://airflow-redis-service:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2018-12-11 05:49:19,081] {__init__.py:51} INFO - Using executor CeleryExecutor
Using gcloud to view a variable
To see the value of a variable, run the Airflow CLI sub-command variables
with the get argument or use the Airflow UI.
For example:
gcloud composer environments run test-environment \
    --location us-central1 variables -- \
    --get gcs_bucket
Viewing the sample workflow
Cloud Composer workflows are composed of
DAGs (Directed Acyclic Graphs).
The code shown in quickstart.py is the workflow code.
To orchestrate the three workflow tasks, the DAG imports the following operators:

- DataprocClusterCreateOperator: Creates a Cloud Dataproc cluster.
- DataProcHadoopOperator: Submits a Hadoop wordcount job and writes results to a Cloud Storage bucket.
- DataprocClusterDeleteOperator: Deletes the cluster to avoid incurring ongoing Compute Engine charges.
The tasks run sequentially.
The name of the DAG is composer_sample_quickstart, and the DAG runs once each day.
Because the start_date that is passed in to default_dag_args is
set to yesterday, Cloud Composer schedules the workflow to start
immediately after the DAG uploads.
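For reference, here is a condensed sketch of the shape of quickstart.py, assuming the Airflow 1.x contrib import paths. The cluster sizing, machine types, and input file are illustrative, so treat this as a sketch rather than the exact file:

```python
# Condensed sketch of the quickstart DAG (Airflow 1.x import paths assumed).
import datetime

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output path for the job: a timestamped folder under wordcount/ in the
# bucket stored in the gcs_bucket Airflow variable set earlier.
output_file = (
    models.Variable.get('gcs_bucket') + '/wordcount/' +
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S'))

# The Hadoop examples jar ships on Dataproc clusters; input file is illustrative.
WORDCOUNT_JAR = 'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]

default_dag_args = {
    # start_date of yesterday lets Cloud Composer start the workflow
    # as soon as the DAG is uploaded.
    'start_date': datetime.datetime.combine(
        datetime.datetime.today() - datetime.timedelta(1),
        datetime.datetime.min.time()),
    # Project ID comes from the gcp_project Airflow variable.
    'project_id': models.Variable.get('gcp_project'),
}

with models.DAG(
        'composer_sample_quickstart',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    # Task 1: create a Cloud Dataproc cluster (sizing is illustrative).
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        cluster_name='quickstart-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Task 2: run the Hadoop wordcount example on the cluster.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='quickstart-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Task 3: delete the cluster even if the Hadoop job fails.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='quickstart-cluster-{{ ds_nodash }}',
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # The three tasks run sequentially.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
```

Note the trigger_rule on the delete task: the cluster is removed even when the Hadoop job fails, which is what keeps the cluster ephemeral.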
Uploading the DAG to Cloud Storage
To upload the DAG:
- Copy and save quickstart.py on your local client machine.
- Upload your local copy of quickstart.py to the dags/ folder in the Cloud Storage bucket for your Cloud Composer environment.
For example:
gcloud composer environments storage dags import \
    --environment my-environment --location us-central1 \
    --source test-dags/quickstart.py
Cloud Composer adds the DAG to Airflow and schedules the DAG automatically. DAG changes occur within 3-5 minutes. You can see task status in the Airflow web interface.
Using the Airflow UI
To access the Airflow web interface using the GCP Console:
Open the Environments page.
In the Airflow webserver column for the environment, click the new window icon. The Airflow web UI opens in a new browser window.
For information about the Airflow UI, see Accessing the web interface.
Viewing Variables
The variables you set earlier are
persisted in your environment. You can view the variables by selecting
Admin > Variables from the Airflow UI menu bar.
Exploring DAG runs
When you upload your DAG file to the dags folder in
Cloud Storage, Cloud Composer parses the file. If no errors are
found, the name of the workflow appears in the DAG listing, and the workflow is
queued to run immediately.
Click composer_sample_quickstart to open the DAG details page. This page includes
a graphical representation of workflow tasks and dependencies.
Now, in the toolbar, click Graph View and then hold the pointer over the graphic for each task to see its status. Note that the border around each task also indicates the status (green border = running; red = failed; and so on).
To run the workflow again from the Graph View:
- In the Airflow UI Graph View, click the create_dataproc_cluster graphic.
- Click Clear to reset the three tasks, and then click OK to confirm.
- Click create_dataproc_cluster again in Graph View.
- Select Run to re-queue the workflow.
You can also check the status and results of the composer_sample_quickstart workflow
by going to the following GCP Console pages:
Cloud Dataproc Clusters to monitor cluster creation and deletion. Note that the cluster created by the workflow is ephemeral: it only exists for the duration of the workflow and is deleted as part of the last workflow task.
Cloud Dataproc Jobs to view or monitor the Apache Hadoop wordcount job. Click the Job ID to see job log output.
Cloud Storage Browser to see the results of the wordcount in the wordcount folder in the Cloud Storage bucket you created for this quickstart. You can also list the results programmatically, as sketched after this list.
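As an alternative to the Cloud Storage Browser, here is a minimal sketch that lists the wordcount output with the google-cloud-storage Python client library. The project ID and bucket name are placeholders:

```python
# Minimal sketch: list the Hadoop wordcount output objects.
# Assumes the google-cloud-storage library; names are placeholders.
from google.cloud import storage

client = storage.Client(project='my-project-id')
bucket = client.bucket('my-bucket')

# The DAG writes results under a timestamped path in the wordcount/ folder.
for blob in bucket.list_blobs(prefix='wordcount/'):
    print('gs://{}/{}'.format(bucket.name, blob.name))
```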
Clean up
To avoid incurring charges to your GCP account for the resources used in this quickstart:
- (Optional) To save your data, download the data from the Cloud Storage bucket for the Cloud Composer environment and the storage bucket you created for this quickstart.
- Delete the Cloud Storage bucket you created for this quickstart (a programmatic option is sketched after this list).
- Delete the Cloud Storage bucket for the environment.
- Delete the Cloud Composer environment. Note that deleting your environment does not delete the storage bucket for the environment.
- (Optional) Delete the project.
- In the GCP Console, go to the Projects page.
- In the project list, select the project you want to delete and click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
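If you'd rather script the bucket cleanup, here is a minimal sketch using the google-cloud-storage Python client library. The project ID and bucket name are placeholders:

```python
# Minimal sketch: delete the quickstart results bucket programmatically.
# Assumes the google-cloud-storage library; names are placeholders.
from google.cloud import storage

client = storage.Client(project='my-project-id')

# force=True deletes the contained objects first; it raises an error for
# buckets with many objects, in which case delete objects in batches first.
client.bucket('my-bucket').delete(force=True)
```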
What's next
- Read the Apache Airflow documentation.
- See the template-based dataproc_operator.py on GitHub.
- Learn about Cloud Composer features.


