This project deploys a solution for a recommendation service on GCP, using the WALS algorithm in TensorFlow. Components include:
%%bash
pip install --upgrade pip sh # the 'sh' package is needed to execute shell scripts later
import os
PROJECT = 'my-project' # REPLACE WITH YOUR PROJECT ID
REGION = 'us-central1' # REPLACE WITH YOUR REGION e.g. us-central1
# do not change these
os.environ['PROJECT'] = PROJECT
os.environ['BUCKET'] = 'recserve_' + PROJECT
os.environ['REGION'] = REGION
%%bash
gcloud config set project $PROJECT
gcloud config set compute/region $REGION
%%bash
# create GCS bucket named recserve_PROJECT if it doesn't already exist
exists=$(gsutil ls -d | grep -w gs://${BUCKET}/)
if [ -n "$exists" ]; then
echo "Not creating recserve_bucket since it already exists."
else
echo "Creating recserve_bucket"
gsutil mb -l ${REGION} gs://${BUCKET}
fi
In IAM, change permissions for "Compute Engine default service account" from Editor to Owner. This is required so you can create and deploy App Engine versions from within Cloud Datalab. Note: the alternative is to run all app engine commands directly in Cloud Shell instead of from within Cloud Datalab.
If you have not already created an App Engine instance, do so now by uncommenting and running the code below.
# %%bash
# run app engine creation commands
# gcloud app create --region ${REGION} # see: https://cloud.google.com/compute/docs/regions-zones/
# gcloud app update --no-split-health-checks
This tutorial comes with a sample Google Analytics data set, containing page tracking events from the Austrian news site Kurier.at. The schema file `ga_sessions_sample_schema.json` is located in the data folder of the tutorial code, and the data file `ga_sessions_sample.json.gz` is located in a public Cloud Storage bucket associated with this tutorial. To upload this data set to BigQuery:
%%bash
gsutil -m cp gs://cloud-training-demos/courses/machine_learning/deepdive/10_recommendation/endtoend/data/ga_sessions_sample.json.gz gs://${BUCKET}/data/ga_sessions_sample.json.gz
gsutil -m cp gs://cloud-training-demos/courses/machine_learning/deepdive/10_recommendation/endtoend/data/recommendation_events.csv data/recommendation_events.csv
gsutil -m cp gs://cloud-training-demos/courses/machine_learning/deepdive/10_recommendation/endtoend/data/recommendation_events.csv gs://${BUCKET}/data/recommendation_events.csv
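If you would like a quick look at the training events just copied into the local data folder, the pandas preview below is a minimal sketch; the column names clientId, contentId, and timeOnPage mirror the training-data query used later in this notebook, and the header-less CSV layout is an assumption.
import pandas as pd
# Preview the sample training events. Column names follow the BigQuery training
# query (clientId, contentId, timeOnPage); a header-less CSV is assumed here.
events = pd.read_csv('data/recommendation_events.csv', header=None,
                     names=['clientId', 'contentId', 'timeOnPage'])
print(events.shape)
events.head()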
Note: Ingesting the 400K rows of sample data usually takes 5-7 minutes.
%%bash
# create BigQuery dataset if it doesn't already exist
exists=$(bq ls -d | grep -w GA360_test)
if [ -n "$exists" ]; then
echo "Not creating GA360_test since it already exists."
else
echo "Creating GA360_test dataset."
bq --project_id=${PROJECT} mk GA360_test
fi
# create the schema and load our sample Google Analytics session data
bq load --source_format=NEWLINE_DELIMITED_JSON \
GA360_test.ga_sessions_sample \
gs://${BUCKET}/data/ga_sessions_sample.json.gz \
data/ga_sessions_sample_schema.json # can't load schema files from GCS
%%bash
cd wals_ml_engine
echo "creating distributable package"
python setup.py sdist
echo "copying ML package to bucket"
gsutil cp dist/wals_ml_engine-0.1.tar.gz gs://${BUCKET}/code/
%%bash
# view the ML train local script before running
cat wals_ml_engine/mltrain.sh
%%bash
cd wals_ml_engine
# train locally with optimized hyperparams
./mltrain.sh local ../data/recommendation_events.csv --data-type web_views --use-optimized
# Options if we wanted to train on CMLE. We will do this with Cloud Composer later
# train on ML Engine with optimized hyperparams
# ./mltrain.sh train ../data/recommendation_events.csv --data-type web_views --use-optimized
# tune hyperparams on ML Engine:
# ./mltrain.sh tune ../data/recommendation_events.csv --data-type web_views
This will take a couple of minutes and create a job directory under wals_ml_engine/jobs (e.g. "wals_ml_local_20180102_012345/model") containing the model files saved as numpy arrays.
ls wals_ml_engine/jobs
If there are multiple models, take the most recent (tail -1):
%%bash
export JOB_MODEL=$(find wals_ml_engine/jobs -name "model" | tail -1)
gsutil cp ${JOB_MODEL}/* gs://${BUCKET}/model/
echo "Recommendation model file numpy arrays in bucket:"
gsutil ls gs://${BUCKET}/model/
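For intuition on what those arrays are: WALS factorizes the ratings matrix into user (row) and item (column) factor matrices, and a user's predicted scores are the dot products of their factor vector with every item factor vector. The sketch below is illustrative only; the file names row.npy and col.npy are assumptions about how the trainer saves its factors, and mapping internal indices back to real user and article IDs is left to the serving app.
import glob, os
import numpy as np
# Find the most recent local model directory (mirrors the `find ... | tail -1` above).
model_dir = sorted(glob.glob('wals_ml_engine/jobs/*/model'))[-1]
# Assumed file names for the saved user/item factor arrays.
user_factors = np.load(os.path.join(model_dir, 'row.npy'))  # shape (num_users, k)
item_factors = np.load(os.path.join(model_dir, 'col.npy'))  # shape (num_items, k)
# Score every item for one user and keep the top-N as recommendations.
user_idx, num_recs = 0, 5
scores = user_factors[user_idx].dot(item_factors.T)
print(np.argsort(scores)[::-1][:num_recs])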
%%bash
cd scripts
cat prepare_deploy_api.sh
%%bash
printf "\nCopy and run the deploy script generated below:\n"
cd scripts
./prepare_deploy_api.sh # Prepare config file for the API.
This will output something like:
To deploy: gcloud endpoints services deploy /var/folders/1m/r3slmhp92074pzdhhfjvnw0m00dhhl/T/tmp.n6QVl5hO.yaml
Be sure to __replace [FILE_NAME] below__ with the file name output above before running.
%%bash
#gcloud endpoints services deploy [REPLACE_WITH_TEMP_FILE_NAME.yaml]
gcloud endpoints services deploy /tmp/tmp.2QluUYw9AH.yaml
%%bash
# view the app deployment script
cat scripts/prepare_deploy_app.sh
%%bash
# prepare to deploy
cd scripts
./prepare_deploy_app.sh
You can ignore the script output "ERROR: (gcloud.app.create) The project [...] already contains an App Engine application. You can deploy your application using gcloud app deploy." This is expected.
The script will output something like:
To deploy: gcloud -q app deploy app/app_template.yaml_deploy.yaml
%%bash
gcloud -q app deploy app/app_template.yaml_deploy.yaml
Deploying the app will take 7-10 minutes. While you wait, consider starting on Part Two below and completing the Cloud Composer DAG file.
Lastly, you can test the recommendation model API by submitting a query request. Note the example userId and the desired numRecs passed as URL parameters for the model input.
%%bash
cd scripts
./query_api.sh # Query the API.
#./generate_traffic.sh # Send traffic to the API.
If the call is successful, you will see the article IDs recommended for that specific user by the WALS ML model:
(Example: curl "https://qwiklabs-gcp-12345.appspot.com/recommendation?userId=5448543647176335931&numRecs=5"
{"articles":["299824032","1701682","299935287","299959410","298157062"]} )
Part One is done! You have successfully created the back-end architecture for serving your ML recommendation system. But we're not done yet: we still need to automatically retrain and redeploy our model once new data comes in. For that we will use Cloud Composer and Apache Airflow.
In this section you will complete a partially written training.py DAG file and copy it to the DAGS folder in your Composer instance.
AIRFLOW_BUCKET = 'us-central1-mlcomposer-6e2cceb4-bucket' # REPLACE WITH AIRFLOW BUCKET NAME
os.environ['AIRFLOW_BUCKET'] = AIRFLOW_BUCKET
Apache Airflow orchestrates tasks out to other services through a DAG (Directed Acyclic Graph) file, which specifies which services to call, what to do, and when to run these tasks. DAG files are written in Python and are loaded automatically into Airflow once they are present in the dags/ folder of your Cloud Composer bucket.
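Before tackling the full training DAG, here is a toy sketch (not part of the solution) just to illustrate the shape of a DAG file: operators become tasks, each task is attached to the DAG, and set_upstream declares the execution order.
import datetime
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# A toy DAG with two tasks that runs once a day; t2 only runs after t1 succeeds.
toy_dag = DAG('toy_example_v1',
              default_args={'owner': 'airflow',
                            'start_date': airflow.utils.dates.days_ago(1)},
              schedule_interval=datetime.timedelta(days=1))
t1 = BashOperator(task_id='print_date', bash_command='date', dag=toy_dag)
t2 = BashOperator(task_id='say_hello', bash_command='echo hello', dag=toy_dag)
t2.set_upstream(t1)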
Your task is to complete the partially written DAG file below which will enable the automatic retraining and redeployment of our WALS recommendation model.
Complete the #TODOs in the Airflow DAG file below and execute the code block to save the file.
%%writefile airflow/dags/training.py
# Copyright 2018 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""DAG definition for recserv model training."""
import airflow
from airflow import DAG
# Reference for all available airflow operators:
# https://github.com/apache/incubator-airflow/tree/master/airflow/contrib/operators
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.hooks.base_hook import BaseHook
# from airflow.contrib.operators.mlengine_operator import MLEngineTrainingOperator
# the mlengine_operator above currently doesn't support a custom MasterType, so we import our own plugins:
# custom plugins
from airflow.operators.app_engine_admin_plugin import AppEngineVersionOperator
from airflow.operators.ml_engine_plugin import MLEngineTrainingOperator
import datetime
def _get_project_id():
"""Get project ID from default GCP connection."""
extras = BaseHook.get_connection('google_cloud_default').extra_dejson
key = 'extra__google_cloud_platform__project'
if key in extras:
project_id = extras[key]
else:
raise ValueError('Must configure project_id in google_cloud_default '
'connection from Airflow Console')
return project_id
PROJECT_ID = _get_project_id()
# Data set constants, used in BigQuery tasks. You can change these
# to conform to your data.
# TODO: Specify your BigQuery dataset name and table name
DATASET = 'GA360_test'
TABLE_NAME = 'ga_sessions_sample'
ARTICLE_CUSTOM_DIMENSION = '10'
# TODO: Confirm bucket name and region
# GCS bucket names and region, can also be changed.
BUCKET = 'gs://recserve_' + PROJECT_ID
REGION = 'us-east1'
# The code package name comes from the model code in the wals_ml_engine
# directory of the solution code base.
PACKAGE_URI = BUCKET + '/code/wals_ml_engine-0.1.tar.gz'
JOB_DIR = BUCKET + '/jobs'
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 5,
'retry_delay': datetime.timedelta(minutes=5)
}
# Default schedule interval using cronjob syntax - can be customized here
# or in the Airflow console.
# TODO: Specify a schedule interval in CRON syntax to run once a day at 2100 hours (9pm)
# Reference: https://airflow.apache.org/scheduler.html
schedule_interval = '00 21 * * *'
# TODO: Title your DAG to be recommendations_training_v1
dag = DAG('recommendations_training_v1',
default_args=default_args,
schedule_interval=schedule_interval)
dag.doc_md = __doc__
#
#
# Task Definition
#
#
# BigQuery training data query
bql='''
#legacySql
SELECT
fullVisitorId as clientId,
ArticleID as contentId,
(nextTime - hits.time) as timeOnPage,
FROM(
SELECT
fullVisitorId,
hits.time,
MAX(IF(hits.customDimensions.index={0},
hits.customDimensions.value,NULL)) WITHIN hits AS ArticleID,
LEAD(hits.time, 1) OVER (PARTITION BY fullVisitorId, visitNumber
ORDER BY hits.time ASC) as nextTime
FROM [{1}.{2}.{3}]
WHERE hits.type = "PAGE"
) HAVING timeOnPage is not null and contentId is not null;
'''
bql = bql.format(ARTICLE_CUSTOM_DIMENSION, PROJECT_ID, DATASET, TABLE_NAME)
# TODO: Complete the BigQueryOperator task to truncate the table if it already exists before writing
# Reference: https://airflow.apache.org/integration.html#bigqueryoperator
t1 = BigQueryOperator(
task_id='bq_rec_training_data',
bql=bql,
destination_dataset_table='%s.recommendation_events' % DATASET,
write_disposition='WRITE_TRUNCATE', # specify to truncate on writes
dag=dag)
# BigQuery training data export to GCS
# TODO: Fill in the missing operator name for task #2 which
# takes a BigQuery dataset and table as input and exports it to GCS as a CSV
training_file = BUCKET + '/data/recommendation_events.csv'
t2 = BigQueryToCloudStorageOperator(
task_id='bq_export_op',
source_project_dataset_table='%s.recommendation_events' % DATASET,
destination_cloud_storage_uris=[training_file],
export_format='CSV',
dag=dag
)
# ML Engine training job
job_id = 'recserve_{0}'.format(datetime.datetime.now().strftime('%Y%m%d%H%M'))
job_dir = BUCKET + '/jobs/' + job_id
output_dir = BUCKET
training_args = ['--job-dir', job_dir,
'--train-files', training_file,
'--output-dir', output_dir,
'--data-type', 'web_views',
'--use-optimized']
# TODO: Fill in the missing operator name for task #3 which will
# start a new training job to Cloud ML Engine
# Reference: https://airflow.apache.org/integration.html#cloud-ml-engine
# https://cloud.google.com/ml-engine/docs/tensorflow/machine-types
t3 = MLEngineTrainingOperator(
task_id='ml_engine_training_op',
project_id=PROJECT_ID,
job_id=job_id,
package_uris=[PACKAGE_URI],
training_python_module='trainer.task',
training_args=training_args,
region=REGION,
scale_tier='CUSTOM',
master_type='complex_model_m_gpu',
dag=dag
)
# App Engine deploy new version
t4 = AppEngineVersionOperator(
task_id='app_engine_deploy_version',
project_id=PROJECT_ID,
service_id='default',
region=REGION,
service_spec=None,
dag=dag
)
# TODO: Be sure to set_upstream dependencies for all tasks
t2.set_upstream(t1)
t3.set_upstream(t2)
t4.set_upstream(t3)
%%bash
gsutil cp airflow/dags/training.py gs://${AIRFLOW_BUCKET}/dags # overwrite if it exists
gsutil cp -r airflow/plugins gs://${AIRFLOW_BUCKET} # copy custom plugins
Navigate to your Cloud Composer instance
Trigger a manual run of your DAG for testing
Ensure your DAG runs successfully (all nodes are outlined in dark green and a 'success' tag shows)
DAG not executing successfully? Follow the steps below to troubleshoot.
Click on the name of a DAG to view a run (ex: recommendations_training_v1)
You have made it to the end of the end-to-end recommendation system lab. You have successfully set up an automated workflow to retrain and redeploy your recommendation model.
Looking to solidify your Cloud Composer skills even more? Complete the optional challenges below.
Use either the BigQueryCheckOperator or the BigQueryValueCheckOperator to create a new task in your DAG that ensures the SQL query for training data is returning valid results before it is passed to Cloud ML Engine for training.
Hint: Check for COUNT() = 0 or other health check
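A rough sketch of what such a check task could look like if added to training.py: BigQueryCheckOperator fails the run when the first row of its query is falsy, so a plain COUNT(*) works as a health check (the task name and its placement between t1 and t2 are just suggestions).
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator

# Fails the DAG run if the freshly built training table has zero rows (bool(0) is False).
bq_check = BigQueryCheckOperator(
    task_id='bq_check_training_rows',
    sql='SELECT COUNT(*) FROM [%s.recommendation_events]' % DATASET,
    dag=dag)

# Slot the check between the training-data query and the GCS export.
bq_check.set_upstream(t1)
t2.set_upstream(bq_check)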
Create a Cloud Function to automatically trigger your DAG when a new recommendation_events.csv file is loaded into your Google Cloud Storage Bucket.
Hint: Check the composer_gcf_trigger.ipynb lab for inspiration
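A heavily simplified sketch of such a Cloud Function is below. The webserver URL and IAP client ID are placeholders you would look up for your own Composer environment, and the function assumes a GCS object-finalize trigger on your bucket:
import google.auth.transport.requests
import google.oauth2.id_token
import requests

# Placeholders: look these up for your own Composer environment.
AIRFLOW_WEB_SERVER = 'https://YOUR_TENANT_PROJECT.appspot.com'  # Composer Airflow webserver URL
IAP_CLIENT_ID = 'YOUR_IAP_CLIENT_ID.apps.googleusercontent.com'
DAG_NAME = 'recommendations_training_v1'

def trigger_dag(data, context):
    """Background Cloud Function triggered by a GCS object-finalize event."""
    # Only react to new training data files.
    if not data['name'].endswith('recommendation_events.csv'):
        return
    # Fetch an OpenID Connect token for the IAP-protected Airflow webserver.
    auth_req = google.auth.transport.requests.Request()
    token = google.oauth2.id_token.fetch_id_token(auth_req, IAP_CLIENT_ID)
    # Ask the Airflow experimental REST API to start a new DAG run.
    url = '%s/api/experimental/dags/%s/dag_runs' % (AIRFLOW_WEB_SERVER, DAG_NAME)
    resp = requests.post(url,
                         json={'conf': {'source_object': data['name']}},
                         headers={'Authorization': 'Bearer %s' % token})
    resp.raise_for_status()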
Modify the BigQuery query in the DAG to train only on a portion of the data available in the dataset, using a WHERE clause that filters on date. Next, parameterize the WHERE clause to be based on when the Airflow DAG is run.
Hint: Make use of prebuilt Airflow macros, like the ones below:
# these can be constants or can be dynamic based on Airflow macros
max_query_date = '2018-02-01' # {{ macros.ds_add(ds, -7) }}
min_query_date = '2018-01-01' # {{ macros.ds_add(ds, -1) }}
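One hedged way to make this dynamic: bql is a templated field on BigQueryOperator, so Jinja macros left inside the query string are rendered at each run. The fragment below only sketches the idea; the GA360 export's date column is a YYYYMMDD string, so the macro output may need reformatting (e.g. with macros.ds_format) before comparing.
# Rendered by Airflow at run time because 'bql' is a templated field.
min_query_date = "{{ macros.ds_add(ds, -7) }}"  # seven days before the DAG run date
max_query_date = "{{ macros.ds_add(ds, -1) }}"  # the day before the DAG run date

# Hypothetical filter fragment to embed in the training query's inner SELECT.
date_filter = "AND date BETWEEN '%s' AND '%s'" % (min_query_date, max_query_date)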