Collaborative filtering on Google Analytics data

This notebook demonstrates how to implement collaborative filtering with the WALS (weighted alternating least squares) matrix factorization approach.

In [1]:
import os
PROJECT = "cloud-training-demos" # REPLACE WITH YOUR PROJECT ID
BUCKET = "cloud-training-demos-ml" # REPLACE WITH YOUR BUCKET NAME
REGION = "us-central1" # REPLACE WITH YOUR BUCKET REGION e.g. us-central1

# Do not change these
os.environ["PROJECT"] = PROJECT
os.environ["BUCKET"] = BUCKET
os.environ["REGION"] = REGION
os.environ["TFVERSION"] = "1.13"
In [2]:
%%bash
gcloud config set project $PROJECT
gcloud config set compute/region $REGION
Updated property [core/project].
Updated property [compute/region].
In [3]:
import tensorflow as tf
print(tf.__version__)
1.13.1

Create raw dataset

For collaborative filtering, we don't need to know anything about either the users or the content. Essentially, all we need to know is the userId, the itemId, and the rating that a particular user gave to a particular item.

In this case, we are working with newspaper articles. The company doesn't ask its users to rate the articles, so we use the time spent on each page as a proxy for a rating.

Normally, we would also add a time filter to this ("latest 7 days"), but our dataset is itself limited to a few days.
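
In the query below, the time spent on a page is derived with the LEAD() window function: for each hit, the session_duration is the next hit's timestamp minus the current hit's timestamp for the same visitor. Here is a toy pandas sketch of the same idea (the hit times are made up, purely for illustration):

```python
import pandas as pd

# Toy hits for one visitor: content IDs and hit timestamps in milliseconds (made-up values).
hits = pd.DataFrame({
    "visitorId": ["A", "A", "A"],
    "contentId": ["100074831", "100170790", "100510126"],
    "hit_time":  [0, 44652, 90000],
})

# Time on page = next hit's timestamp minus this hit's timestamp
# (the pandas equivalent of LEAD(hits.time, 1) OVER (PARTITION BY fullVisitorId ORDER BY hits.time)).
hits["session_duration"] = hits.groupby("visitorId")["hit_time"].shift(-1) - hits["hit_time"]
print(hits)  # the last page viewed has no following hit, so its duration is NaN and is filtered out
```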

In [4]:
from google.cloud import bigquery
bq = bigquery.Client(project = PROJECT)

sql = """
#standardSQL
WITH CTE_visitor_page_content AS (
    SELECT
        fullVisitorID,
        (SELECT MAX(IF(index=10, value, NULL)) FROM UNNEST(hits.customDimensions)) AS latestContentId,  
        (LEAD(hits.time, 1) OVER (PARTITION BY fullVisitorId ORDER BY hits.time ASC) - hits.time) AS session_duration 
    FROM
        `cloud-training-demos.GA360_test.ga_sessions_sample`,   
        UNNEST(hits) AS hits
    WHERE 
        # only include hits on pages
        hits.type = "PAGE"

    GROUP BY   
        fullVisitorId,
        latestContentId,
        hits.time )

-- Aggregate web stats
SELECT   
    fullVisitorID as visitorId,
    latestContentId as contentId,
    SUM(session_duration) AS session_duration
FROM
    CTE_visitor_page_content
WHERE
    latestContentId IS NOT NULL 
GROUP BY
    fullVisitorID, 
    latestContentId
HAVING 
    session_duration > 0
ORDER BY 
    latestContentId 
"""

df = bq.query(sql).to_dataframe()
df.head()
Out[4]:
visitorId contentId session_duration
0 7337153711992174438 100074831 44652
1 5190801220865459604 100170790 1214205
2 2293633612703952721 100510126 47744
3 5874973374932455844 100510126 32109
4 1173698801255170595 100676857 10512
In [5]:
stats = df.describe()
stats
Out[5]:
session_duration
count 2.795940e+05
mean 1.269843e+05
std 2.344354e+05
min 1.000000e+00
25% 1.698800e+04
50% 5.771350e+04
75% 1.291545e+05
max 7.690598e+06
In [6]:
df[["session_duration"]].plot(kind="hist", logy=True, bins=100, figsize=[8,5])
Out[6]:
<matplotlib.axes._subplots.AxesSubplot at 0x7fb2540afe48>
In [7]:
# The rating is the session_duration scaled to be in the range 0-1.  This will help with training.
median = stats.loc["50%", "session_duration"]
df["rating"] = 0.3 * df["session_duration"] / median
df.loc[df["rating"] > 1, "rating"] = 1
df[["rating"]].plot(kind="hist", logy=True, bins=100, figsize=[8,5])
Out[7]:
<matplotlib.axes._subplots.AxesSubplot at 0x7fb245056470>
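
As a quick arithmetic check of the scaling, using the numbers from the cells above: the median session duration is about 57,713 ms, so the first row's 44,652 ms becomes 0.3 * 44652 / 57713.5 ≈ 0.232, which is the value that shows up as the first rating in collab_raw.csv below.

```python
# Recompute the rating for the first row by hand, using the median printed by describe() above.
median_duration = 57713.5   # stats.loc["50%", "session_duration"]
duration = 44652            # session_duration of the first row shown earlier
rating = min(0.3 * duration / median_duration, 1.0)
print(rating)               # ~0.2321, matching the first line of collab_raw.csv
```
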
In [8]:
del df["session_duration"]
In [9]:
%%bash
rm -rf data
mkdir data
In [10]:
df.to_csv(path_or_buf = "data/collab_raw.csv", index = False, header = False)
In [11]:
!head data/collab_raw.csv
7337153711992174438,100074831,0.2321051400452234
5190801220865459604,100170790,1.0
2293633612703952721,100510126,0.2481776360816793
5874973374932455844,100510126,0.16690549004998828
1173698801255170595,100676857,0.05464232805149575
883397426232997550,10083328,0.9487035095774818
1808867070685560283,100906145,1.0
7615995624631762562,100906145,0.48418654214351925
5519169380728479914,100915139,0.20026163722525925
3427736932800080345,100950628,0.558924688331153

Create dataset for WALS

The raw dataset (above) won't work for WALS:

  1. The userId and itemId have to be consecutive integers 0, 1, 2, ..., so we need to create a mapping from visitorId (in the raw data) to userId, and from contentId (in the raw data) to itemId.
  2. We will need to save these mappings to files, because at prediction time we'll need to map between the contentId in the table above and the itemId (and likewise between visitorId and userId).
  3. We'll need two files: a "rows" dataset where all the items for a particular user are listed, and a "columns" dataset where all the users for a particular item are listed.

Mapping

In [12]:
import pandas as pd
import numpy as np
def create_mapping(values, filename):
    with open(filename, 'w') as ofp:
        value_to_id = {value:idx for idx, value in enumerate(values.unique())}
        for value, idx in value_to_id.items():
            ofp.write("{},{}\n".format(value, idx))
    return value_to_id

df = pd.read_csv(filepath_or_buffer = "data/collab_raw.csv",
                 header = None,
                 names = ["visitorId", "contentId", "rating"],
                 dtype = {"visitorId": str, "contentId": str, "rating": np.float64})
df.to_csv(path_or_buf = "data/collab_raw.csv", index = False, header = False)
user_mapping = create_mapping(df["visitorId"], "data/users.csv")
item_mapping = create_mapping(df["contentId"], "data/items.csv")
In [13]:
!head -3 data/*.csv
==> data/collab_raw.csv <==
7337153711992174438,100074831,0.2321051400452234
5190801220865459604,100170790,1.0
2293633612703952721,100510126,0.2481776360816793

==> data/items.csv <==
727741,5272
179038175,626
299458287,4513

==> data/users.csv <==
6319375062712956077,33748
7933447845885715412,47057
5774017011910110015,76528
In [14]:
df["userId"] = df["visitorId"].map(user_mapping.get)
df["itemId"] = df["contentId"].map(item_mapping.get)
In [15]:
mapped_df = df[["userId", "itemId", "rating"]]
mapped_df.to_csv(path_or_buf = "data/collab_mapped.csv", index = False, header = False)
mapped_df.head()
Out[15]:
userId itemId rating
0 0 0 0.232105
1 1 1 1.000000
2 2 2 0.248178
3 3 2 0.166905
4 4 3 0.054642

Creating rows and columns datasets

In [16]:
import pandas as pd
import numpy as np
mapped_df = pd.read_csv(filepath_or_buffer = "data/collab_mapped.csv", header = None, names = ["userId", "itemId", "rating"])
mapped_df.head()
Out[16]:
userId itemId rating
0 0 0 0.232105
1 1 1 1.000000
2 2 2 0.248178
3 3 2 0.166905
4 4 3 0.054642
In [17]:
NITEMS = np.max(mapped_df["itemId"]) + 1
NUSERS = np.max(mapped_df["userId"]) + 1
mapped_df["rating"] = np.round(mapped_df["rating"].values, 2)
print("{} items, {} users, {} interactions".format( NITEMS, NUSERS, len(mapped_df) ))
5721 items, 82902 users, 279594 interactions
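
With roughly 83,000 users, 5,700 items, and only 280,000 interactions, the ratings matrix is extremely sparse, which is exactly the regime WALS is meant for. A quick check of the fill rate:

```python
# Fraction of the (users x items) matrix that actually contains a rating.
sparsity = len(mapped_df) / float(NUSERS * NITEMS)
print("{:.4%} of the matrix is filled".format(sparsity))  # roughly 0.06%
```
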
In [18]:
grouped_by_items = mapped_df.groupby("itemId")
count = 0
for item, grouped in grouped_by_items:
    print(item, grouped["userId"].values, grouped["rating"].values)
    count = count + 1
    if count > 5:
        break
0 [0] [0.23]
1 [1] [1.]
2 [2 3] [0.25 0.17]
3 [4] [0.05]
4 [5] [0.95]
5 [6 7] [1.   0.48]
In [19]:
import tensorflow as tf
grouped_by_items = mapped_df.groupby("itemId")
with tf.python_io.TFRecordWriter("data/users_for_item") as ofp:
    for item, grouped in grouped_by_items:
        example = tf.train.Example(features = tf.train.Features(feature = {
            "key": tf.train.Feature(int64_list = tf.train.Int64List(value = [item])),
            "indices": tf.train.Feature(int64_list = tf.train.Int64List(value = grouped["userId"].values)),
            "values": tf.train.Feature(float_list = tf.train.FloatList(value = grouped["rating"].values))
        }))
        ofp.write(example.SerializeToString())
In [20]:
grouped_by_users = mapped_df.groupby("userId")
with tf.python_io.TFRecordWriter("data/items_for_user") as ofp:
    for user, grouped in grouped_by_users:
        example = tf.train.Example(features = tf.train.Features(feature = {
            "key": tf.train.Feature(int64_list = tf.train.Int64List(value = [user])),
            "indices": tf.train.Feature(int64_list = tf.train.Int64List(value = grouped["itemId"].values)),
            "values": tf.train.Feature(float_list = tf.train.FloatList(value = grouped["rating"].values))
        }))
        ofp.write(example.SerializeToString())
In [21]:
!ls -lrt data
total 31908
-rw-r--r-- 1 jupyter jupyter 13152765 Jul 31 20:41 collab_raw.csv
-rw-r--r-- 1 jupyter jupyter  2134511 Jul 31 20:41 users.csv
-rw-r--r-- 1 jupyter jupyter    82947 Jul 31 20:41 items.csv
-rw-r--r-- 1 jupyter jupyter  7812739 Jul 31 20:41 collab_mapped.csv
-rw-r--r-- 1 jupyter jupyter  2252828 Jul 31 20:41 users_for_item
-rw-r--r-- 1 jupyter jupyter  7217822 Jul 31 20:41 items_for_user

To summarize, we created the following data files from collab_raw.csv:

  1. ```collab_mapped.csv``` is essentially the same data as ```collab_raw.csv```, except that the business-specific ```visitorId``` and ```contentId``` have been mapped to ```userId``` and ```itemId```, which are enumerated 0, 1, 2, .... The mappings themselves are stored in ```items.csv``` and ```users.csv``` so that they can be used during inference.
  2. ```users_for_item``` contains all the users/ratings for each item, in tf.train.Example format.
  3. ```items_for_user``` contains all the items/ratings for each user, in tf.train.Example format. (A quick way to spot-check these files is shown right after this list.)
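
If you want to spot-check what was written, you can read a record back and parse it. This is purely a verification snippet; it isn't needed for training:

```python
import tensorflow as tf

# Read the first record of items_for_user and decode it (verification only).
record = next(tf.python_io.tf_record_iterator("data/items_for_user"))
example = tf.train.Example.FromString(record)
print(example.features.feature["key"].int64_list.value)      # the userId
print(example.features.feature["indices"].int64_list.value)  # the itemIds that user interacted with
print(example.features.feature["values"].float_list.value)   # the corresponding ratings
```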

Train with WALS

Once you have the dataset, do matrix factorization with WALS using the WALSMatrixFactorization estimator in tf.contrib.factorization. This is an Estimator-style model, so it should be relatively familiar.

As usual, we write an input_fn to provide the data to the model, and then create the Estimator to do train_and_evaluate. Because it is in contrib and hasn't moved over to tf.estimator yet, we use tf.contrib.learn.Experiment to handle the training loop.

In [22]:
import os
import tensorflow as tf
from tensorflow.python.lib.io import file_io
from tensorflow.contrib.factorization import WALSMatrixFactorization
  
def read_dataset(mode, args):
    def decode_example(protos, vocab_size):
        features = {
            "key": tf.FixedLenFeature(shape = [1], dtype = tf.int64),
            "indices": tf.VarLenFeature(dtype = tf.int64),
            "values": tf.VarLenFeature(dtype = tf.float32)}
        parsed_features = tf.parse_single_example(serialized = protos, features = features)
        values = tf.sparse_merge(sp_ids = parsed_features["indices"], sp_values = parsed_features["values"], vocab_size = vocab_size)
        # Save key to remap after batching
        # This is a temporary workaround to assign correct row numbers in each batch.
        # You can ignore details of this part and remap_keys().
        key = parsed_features["key"]
        decoded_sparse_tensor = tf.SparseTensor(indices = tf.concat(values = [values.indices, [key]], axis = 0), 
                                                values = tf.concat(values = [values.values, [0.0]], axis = 0), 
                                                dense_shape = values.dense_shape)
        return decoded_sparse_tensor
  
  
    def remap_keys(sparse_tensor):
        # Current indices of our SparseTensor that we need to fix
        bad_indices = sparse_tensor.indices # shape = (current_batch_size * (number_of_items/users[i] + 1), 2)
        # Current values of our SparseTensor that we need to fix
        bad_values = sparse_tensor.values # shape = (current_batch_size * (number_of_items/users[i] + 1),)

        # Since the batch is ordered, the last value for a batch index is the user
        # Find where the batch index changes to extract the user rows
        # 1 where user, else 0
        user_mask = tf.concat(values = [bad_indices[1:,0] - bad_indices[:-1,0], tf.constant(value = [1], dtype = tf.int64)], axis = 0) # shape = (current_batch_size * (number_of_items/users[i] + 1),)

        # Mask out the user rows from the values
        good_values = tf.boolean_mask(tensor = bad_values, mask = tf.equal(x = user_mask, y = 0)) # shape = (current_batch_size * number_of_items/users[i],)
        item_indices = tf.boolean_mask(tensor = bad_indices, mask = tf.equal(x = user_mask, y = 0)) # shape = (current_batch_size * number_of_items/users[i],)
        user_indices = tf.boolean_mask(tensor = bad_indices, mask = tf.equal(x = user_mask, y = 1))[:, 1] # shape = (current_batch_size,)

        good_user_indices = tf.gather(params = user_indices, indices = item_indices[:,0]) # shape = (current_batch_size * number_of_items/users[i],)

        # User and item indices are rank 1; expand dims to make them rank 2 so they can be concatenated
        good_user_indices_expanded = tf.expand_dims(input = good_user_indices, axis = -1) # shape = (current_batch_size * number_of_items/users[i], 1)
        good_item_indices_expanded = tf.expand_dims(input = item_indices[:, 1], axis = -1) # shape = (current_batch_size * number_of_items/users[i], 1)
        good_indices = tf.concat(values = [good_user_indices_expanded, good_item_indices_expanded], axis = 1) # shape = (current_batch_size * number_of_items/users[i], 2)

        remapped_sparse_tensor = tf.SparseTensor(indices = good_indices, values = good_values, dense_shape = sparse_tensor.dense_shape)
        return remapped_sparse_tensor

    
    def parse_tfrecords(filename, vocab_size):
        if mode == tf.estimator.ModeKeys.TRAIN:
            num_epochs = None # indefinitely
        else:
            num_epochs = 1 # end-of-input after this

        files = tf.gfile.Glob(filename = os.path.join(args["input_path"], filename))

        # Create dataset from file list
        dataset = tf.data.TFRecordDataset(files)
        dataset = dataset.map(map_func = lambda x: decode_example(x, vocab_size))
        dataset = dataset.repeat(count = num_epochs)
        dataset = dataset.batch(batch_size = args["batch_size"])
        dataset = dataset.map(map_func = lambda x: remap_keys(x))
        return dataset.make_one_shot_iterator().get_next()
  
    def _input_fn():
        features = {
            WALSMatrixFactorization.INPUT_ROWS: parse_tfrecords("items_for_user", args["nitems"]),
            WALSMatrixFactorization.INPUT_COLS: parse_tfrecords("users_for_item", args["nusers"]),
            WALSMatrixFactorization.PROJECT_ROW: tf.constant(True)
        }
        return features, None

    return _input_fn

This code is helpful in developing the input function. You don't need it in production.

In [24]:
def try_out():
    with tf.Session() as sess:
        fn = read_dataset(
            mode = tf.estimator.ModeKeys.EVAL, 
            args = {"input_path": "data", "batch_size": 4, "nitems": NITEMS, "nusers": NUSERS})
        feats, _ = fn()
        
        print(feats["input_rows"].eval())
        print(feats["input_rows"].eval())

try_out()
SparseTensorValue(indices=array([[   0,    0],
       [   0, 3522],
       [   0, 3583],
       [   1,    1],
       [   1, 2359],
       [   1, 3133],
       [   1, 4864],
       [   1, 4901],
       [   1, 4906],
       [   1, 5667],
       [   2,    2],
       [   3,    2],
       [   3, 1467]]), values=array([0.23, 0.05, 0.18, 1.  , 0.11, 0.55, 0.3 , 0.72, 0.46, 0.3 , 0.25,
       0.17, 0.13], dtype=float32), dense_shape=array([   4, 5721]))
SparseTensorValue(indices=array([[   4,    3],
       [   5,    4],
       [   5, 5042],
       [   5, 5525],
       [   5, 5553],
       [   6,    5],
       [   7,    5]]), values=array([0.05, 0.95, 0.63, 1.  , 0.16, 1.  , 0.48], dtype=float32), dense_shape=array([   4, 5721]))
In [25]:
def find_top_k(user, item_factors, k):
    all_items = tf.matmul(a = tf.expand_dims(input = user, axis = 0), b = tf.transpose(a = item_factors))
    topk = tf.nn.top_k(input = all_items, k = k)
    return tf.cast(x = topk.indices, dtype = tf.int64)
    
def batch_predict(args):
    import numpy as np
    with tf.Session() as sess:
        estimator = tf.contrib.factorization.WALSMatrixFactorization(
            num_rows = args["nusers"], 
            num_cols = args["nitems"],
            embedding_dimension = args["n_embeds"],
            model_dir = args["output_dir"])
        
        # This is how you would get the row factors for out-of-vocab user data
        # row_factors = list(estimator.get_projections(input_fn=read_dataset(tf.estimator.ModeKeys.EVAL, args)))
        # user_factors = tf.convert_to_tensor(np.array(row_factors))

        # But for in-vocab data, the row factors are already in the checkpoint
        user_factors = tf.convert_to_tensor(value = estimator.get_row_factors()[0]) # (nusers, nembeds)
        # In either case, we have to assume the catalog doesn't change, so col_factors are read in
        item_factors = tf.convert_to_tensor(value = estimator.get_col_factors()[0]) # (nitems, nembeds)

        # For each user, find the top K items
        topk = tf.squeeze(input = tf.map_fn(fn = lambda user: find_top_k(user, item_factors, args["topk"]), elems = user_factors, dtype = tf.int64))
        with file_io.FileIO(os.path.join(args["output_dir"], "batch_pred.txt"), mode = 'w') as f:
            for best_items_for_user in topk.eval():
                f.write(",".join(str(x) for x in best_items_for_user) + '\n')

def train_and_evaluate(args):
    train_steps = int(0.5 + (1.0 * args["num_epochs"] * args["nusers"]) / args["batch_size"])
    steps_in_epoch = int(0.5 + args["nusers"] / args["batch_size"])
    print("Will train for {} steps, evaluating once every {} steps".format(train_steps, steps_in_epoch))
    def experiment_fn(output_dir):
        return tf.contrib.learn.Experiment(
            tf.contrib.factorization.WALSMatrixFactorization(
                num_rows = args["nusers"], 
                num_cols = args["nitems"],
                embedding_dimension = args["n_embeds"],
                model_dir = args["output_dir"]),
            train_input_fn = read_dataset(tf.estimator.ModeKeys.TRAIN, args),
            eval_input_fn = read_dataset(tf.estimator.ModeKeys.EVAL, args),
            train_steps = train_steps,
            eval_steps = 1,
            min_eval_frequency = steps_in_epoch
        )

    from tensorflow.contrib.learn.python.learn import learn_runner
    learn_runner.run(experiment_fn = experiment_fn, output_dir = args["output_dir"])
    
    batch_predict(args)
In [26]:
import shutil
shutil.rmtree(path = "wals_trained", ignore_errors=True)
train_and_evaluate({
    "output_dir": "wals_trained",
    "input_path": "data/",
    "num_epochs": 0.05,
    "nitems": NITEMS,
    "nusers": NUSERS,

    "batch_size": 512,
    "n_embeds": 10,
    "topk": 3
  })
Will train for 8 steps, evaluating once every 162 steps
WARNING:tensorflow:From <ipython-input-25-4ad1e7c785ce>:49: run (from tensorflow.contrib.learn.python.learn.learn_runner) is deprecated and will be removed in a future version.
Instructions for updating:
Use tf.estimator.train_and_evaluate.
WARNING:tensorflow:From /home/jupyter/.local/lib/python3.5/site-packages/tensorflow/contrib/learn/python/learn/estimators/estimator.py:1179: BaseEstimator.__init__ (from tensorflow.contrib.learn.python.learn.estimators.estimator) is deprecated and will be removed in a future version.
Instructions for updating:
Please replace uses of any Estimator from tf.contrib.learn with an Estimator from tf.estimator.*
WARNING:tensorflow:From /home/jupyter/.local/lib/python3.5/site-packages/tensorflow/contrib/learn/python/learn/estimators/estimator.py:427: RunConfig.__init__ (from tensorflow.contrib.learn.python.learn.estimators.run_config) is deprecated and will be removed in a future version.
Instructions for updating:
When switching to tf.estimator.Estimator, use tf.estimator.RunConfig instead.
INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_tf_config': gpu_options {
  per_process_gpu_memory_fraction: 1.0
}
, '_task_id': 0, '_is_chief': True, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7fb220516da0>, '_save_checkpoints_steps': None, '_task_type': None, '_tf_random_seed': None, '_save_summary_steps': 100, '_num_ps_replicas': 0, '_keep_checkpoint_max': 5, '_log_step_count_steps': 100, '_keep_checkpoint_every_n_hours': 10000, '_environment': 'local', '_eval_distribute': None, '_session_config': None, '_train_distribute': None, '_evaluation_master': '', '_num_worker_replicas': 0, '_device_fn': None, '_master': '', '_protocol': None, '_save_checkpoints_secs': 600, '_model_dir': 'wals_trained'}
WARNING:tensorflow:From <ipython-input-25-4ad1e7c785ce>:45: Experiment.__init__ (from tensorflow.contrib.learn.python.learn.experiment) is deprecated and will be removed in a future version.
Instructions for updating:
Please switch to tf.estimator.train_and_evaluate. You will also have to convert to a tf.estimator.Estimator.
WARNING:tensorflow:From /home/jupyter/.local/lib/python3.5/site-packages/tensorflow/contrib/learn/python/learn/monitors.py:279: BaseMonitor.__init__ (from tensorflow.contrib.learn.python.learn.monitors) is deprecated and will be removed after 2016-12-05.
Instructions for updating:
Monitors are deprecated. Please use tf.train.SessionRunHook.
WARNING:tensorflow:From /home/jupyter/.local/lib/python3.5/site-packages/tensorflow/python/framework/op_def_library.py:263: colocate_with (from tensorflow.python.framework.ops) is deprecated and will be removed in a future version.
Instructions for updating:
Colocations handled automatically by placer.
WARNING:tensorflow:From /home/jupyter/.local/lib/python3.5/site-packages/tensorflow/contrib/factorization/python/ops/wals.py:315: ModelFnOps.__new__ (from tensorflow.contrib.learn.python.learn.estimators.model_fn) is deprecated and will be removed in a future version.
Instructions for updating:
When switching to tf.estimator.Estimator, use tf.estimator.EstimatorSpec. You can use the `estimator_spec` method to create an equivalent one.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Saving checkpoints for 0 into wals_trained/model.ckpt.
INFO:tensorflow:SweepHook running init op.
INFO:tensorflow:SweepHook running prep ops for the row sweep.
INFO:tensorflow:Next fit step starting.
INFO:tensorflow:loss = 96509.96, step = 1
INFO:tensorflow:Next fit step starting.
INFO:tensorflow:Next fit step starting.
INFO:tensorflow:Next fit step starting.
INFO:tensorflow:Next fit step starting.
INFO:tensorflow:Next fit step starting.
INFO:tensorflow:Next fit step starting.
INFO:tensorflow:Next fit step starting.
INFO:tensorflow:Saving checkpoints for 8 into wals_trained/model.ckpt.
INFO:tensorflow:Loss for final step: 110142.75.
WARNING:tensorflow:From /home/jupyter/.local/lib/python3.5/site-packages/tensorflow/python/ops/metrics_impl.py:363: to_float (from tensorflow.python.ops.math_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use tf.cast instead.
INFO:tensorflow:Starting evaluation at 2019-07-31T20:43:12Z
INFO:tensorflow:Graph was finalized.
WARNING:tensorflow:From /home/jupyter/.local/lib/python3.5/site-packages/tensorflow/python/training/saver.py:1266: checkpoint_exists (from tensorflow.python.training.checkpoint_management) is deprecated and will be removed in a future version.
Instructions for updating:
Use standard file APIs to check for files with this prefix.
INFO:tensorflow:Restoring parameters from wals_trained/model.ckpt-8
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [1/1]
INFO:tensorflow:Finished evaluation at 2019-07-31-20:43:12
INFO:tensorflow:Saving dict for global step 8: global_step = 8, loss = 96509.96
INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_tf_config': gpu_options {
  per_process_gpu_memory_fraction: 1.0
}
, '_task_id': 0, '_is_chief': True, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7fb2207064e0>, '_save_checkpoints_steps': None, '_task_type': None, '_tf_random_seed': None, '_save_summary_steps': 100, '_num_ps_replicas': 0, '_keep_checkpoint_max': 5, '_log_step_count_steps': 100, '_keep_checkpoint_every_n_hours': 10000, '_environment': 'local', '_eval_distribute': None, '_session_config': None, '_train_distribute': None, '_evaluation_master': '', '_num_worker_replicas': 0, '_device_fn': None, '_master': '', '_protocol': None, '_save_checkpoints_secs': 600, '_model_dir': 'wals_trained'}
In [27]:
!ls wals_trained
batch_pred.txt			  model.ckpt-0.index
checkpoint			  model.ckpt-0.meta
eval				  model.ckpt-8.data-00000-of-00001
events.out.tfevents.1564605788.r  model.ckpt-8.index
graph.pbtxt			  model.ckpt-8.meta
model.ckpt-0.data-00000-of-00001
In [28]:
!head wals_trained/batch_pred.txt
284,5609,36
284,2754,42
284,3168,534
2621,5528,2694
4409,5295,343
5161,3267,3369
5479,1335,55
5479,1335,55
4414,284,5572
284,241,2359
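
Each line of batch_pred.txt lists the top-K itemIds for one user, in userId order. To turn these back into article contentIds, invert the mapping that was saved in data/items.csv, for example:

```python
import pandas as pd

# items.csv was written as "contentId,itemId"; invert it so we can translate predictions back.
items = pd.read_csv("data/items.csv", header = None, names = ["contentId", "itemId"], dtype = {"contentId": str})
itemid_to_content = dict(zip(items["itemId"], items["contentId"]))

# Translate the recommendations for the first user back to contentIds.
with open("wals_trained/batch_pred.txt") as f:
    first_user_items = [int(x) for x in f.readline().strip().split(",")]
print([itemid_to_content[i] for i in first_user_items])
```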

Run as a Python module

Let's run it as a Python module for just a few steps.
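
The walsmodel package itself isn't shown in this notebook. As a rough, hypothetical sketch (the real package defines its own flags), walsmodel/task.py might be little more than an argparse wrapper around the train_and_evaluate() function above, with model.py holding read_dataset(), train_and_evaluate(), and batch_predict():

```python
# walsmodel/task.py -- hypothetical sketch only; the actual package defines the real flags.
import argparse
from . import model  # assumed to contain read_dataset(), train_and_evaluate(), batch_predict()

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--output_dir", required = True)
    parser.add_argument("--input_path", required = True)
    parser.add_argument("--num_epochs", type = float, default = 1.0)
    parser.add_argument("--nitems", type = int, required = True)
    parser.add_argument("--nusers", type = int, required = True)
    parser.add_argument("--batch_size", type = int, default = 512)
    parser.add_argument("--n_embeds", type = int, default = 10)
    parser.add_argument("--topk", type = int, default = 3)
    parser.add_argument("--job-dir", dest = "job_dir", default = "junk")  # supplied by AI Platform; unused here
    args = vars(parser.parse_args())
    model.train_and_evaluate(args)
```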

In [33]:
os.environ["NITEMS"] = str(NITEMS)
os.environ["NUSERS"] = str(NUSERS)
In [34]:
%%bash
rm -rf wals.tar.gz wals_trained
gcloud ai-platform local train \
    --module-name=walsmodel.task \
    --package-path=${PWD}/walsmodel \
    -- \
    --output_dir=${PWD}/wals_trained \
    --input_path=${PWD}/data \
    --num_epochs=0.01 --nitems=${NITEMS} --nusers=${NUSERS} \
    --job-dir=./tmp
Will train for 2 steps, evaluating once every 162 steps
WARNING: The `gcloud ml-engine` commands have been renamed and will soon be removed. Please use `gcloud ai-platform` instead.
WARNING: Logging before flag parsing goes to stderr.
W0731 20:45:02.831105 139726482318784 deprecation_wrapper.py:119] From walsmodel/model.py:27: The name tf.logging.set_verbosity is deprecated. Please use tf.compat.v1.logging.set_verbosity instead.

W0731 20:45:02.831403 139726482318784 deprecation_wrapper.py:119] From walsmodel/model.py:27: The name tf.logging.INFO is deprecated. Please use tf.compat.v1.logging.INFO instead.

W0731 20:45:02.832739 139726482318784 deprecation.py:323] From walsmodel/model.py:163: run (from tensorflow.contrib.learn.python.learn.learn_runner) is deprecated and will be removed in a future version.
Instructions for updating:
Use tf.estimator.train_and_evaluate.
W0731 20:45:02.832900 139726482318784 deprecation.py:323] From /usr/local/lib/python2.7/dist-packages/tensorflow/contrib/learn/python/learn/estimators/estimator.py:1179: __init__ (from tensorflow.contrib.learn.python.learn.estimators.estimator) is deprecated and will be removed in a future version.
Instructions for updating:
Please replace uses of any Estimator from tf.contrib.learn with an Estimator from tf.estimator.*
W0731 20:45:02.832978 139726482318784 deprecation.py:323] From /usr/local/lib/python2.7/dist-packages/tensorflow/contrib/learn/python/learn/estimators/estimator.py:427: __init__ (from tensorflow.contrib.learn.python.learn.estimators.run_config) is deprecated and will be removed in a future version.
Instructions for updating:
When switching to tf.estimator.Estimator, use tf.estimator.RunConfig instead.
I0731 20:45:02.833267 139726482318784 estimator.py:428] Using default config.
I0731 20:45:02.833632 139726482318784 estimator.py:456] Using config: {'_save_checkpoints_secs': 600, '_num_ps_replicas': 0, '_keep_checkpoint_max': 5, '_task_type': None, '_train_distribute': None, '_is_chief': True, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f1442928cd0>, '_tf_config': gpu_options {
  per_process_gpu_memory_fraction: 1.0
}
, '_protocol': None, '_save_checkpoints_steps': None, '_keep_checkpoint_every_n_hours': 10000, '_session_config': None, '_model_dir': '/home/jupyter/training-data-analyst/courses/machine_learning/deepdive/10_recommend/wals_trained/', '_tf_random_seed': None, '_master': '', '_device_fn': None, '_num_worker_replicas': 0, '_task_id': 0, '_log_step_count_steps': 100, '_experimental_max_worker_delay_secs': None, '_evaluation_master': '', '_eval_distribute': None, '_environment': u'cloud', '_save_summary_steps': 100}
W0731 20:45:02.834263 139726482318784 deprecation.py:323] From walsmodel/model.py:159: __init__ (from tensorflow.contrib.learn.python.learn.experiment) is deprecated and will be removed in a future version.
Instructions for updating:
Please switch to tf.estimator.train_and_evaluate. You will also have to convert to a tf.estimator.Estimator.
W0731 20:45:02.834763 139726482318784 deprecation.py:323] From /usr/local/lib/python2.7/dist-packages/tensorflow/contrib/learn/python/learn/monitors.py:279: __init__ (from tensorflow.contrib.learn.python.learn.monitors) is deprecated and will be removed after 2016-12-05.
Instructions for updating:
Monitors are deprecated. Please use tf.train.SessionRunHook.
W0731 20:45:03.079253 139726482318784 deprecation_wrapper.py:119] From walsmodel/model.py:90: The name tf.gfile.Glob is deprecated. Please use tf.io.gfile.glob instead.

W0731 20:45:03.183283 139726482318784 deprecation_wrapper.py:119] From walsmodel/model.py:42: The name tf.FixedLenFeature is deprecated. Please use tf.io.FixedLenFeature instead.

W0731 20:45:03.183634 139726482318784 deprecation_wrapper.py:119] From walsmodel/model.py:43: The name tf.VarLenFeature is deprecated. Please use tf.io.VarLenFeature instead.

W0731 20:45:03.183834 139726482318784 deprecation_wrapper.py:119] From walsmodel/model.py:45: The name tf.parse_single_example is deprecated. Please use tf.io.parse_single_example instead.

W0731 20:45:03.248234 139726482318784 deprecation.py:323] From walsmodel/model.py:46: sparse_merge (from tensorflow.python.ops.sparse_ops) is deprecated and will be removed in a future version.
Instructions for updating:
No similar op available at this time.
W0731 20:45:03.344137 139726482318784 deprecation.py:323] From /usr/local/lib/python2.7/dist-packages/tensorflow/python/ops/array_ops.py:1354: where (from tensorflow.python.ops.array_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where
W0731 20:45:03.405380 139726482318784 deprecation.py:323] From walsmodel/model.py:98: make_one_shot_iterator (from tensorflow.python.data.ops.dataset_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `for ... in dataset:` to iterate over a dataset. If using `tf.estimator`, return the `Dataset` object directly from your input function. As a last resort, you can use `tf.compat.v1.data.make_one_shot_iterator(dataset)`.
W0731 20:45:03.584995 139726482318784 deprecation.py:323] From /usr/local/lib/python2.7/dist-packages/tensorflow/python/training/training_util.py:236: initialized_value (from tensorflow.python.ops.variables) is deprecated and will be removed in a future version.
Instructions for updating:
Use Variable.read_value. Variables in 2.X are initialized automatically both in eager and graph (inside tf.defun) contexts.
W0731 20:45:04.254898 139726482318784 deprecation.py:323] From /usr/local/lib/python2.7/dist-packages/tensorflow/contrib/factorization/python/ops/wals.py:315: __new__ (from tensorflow.contrib.learn.python.learn.estimators.model_fn) is deprecated and will be removed in a future version.
Instructions for updating:
When switching to tf.estimator.Estimator, use tf.estimator.EstimatorSpec. You can use the `estimator_spec` method to create an equivalent one.
I0731 20:45:04.255398 139726482318784 basic_session_run_hooks.py:541] Create CheckpointSaverHook.
I0731 20:45:04.667010 139726482318784 monitored_session.py:240] Graph was finalized.
2019-07-31 20:45:04.667537: I tensorflow/core/platform/cpu_feature_guard.cc:145] This TensorFlow binary is optimized with Intel(R) MKL-DNN to use the following CPU instructions in performance critical operations:  AVX2 FMA
To enable them in non-MKL-DNN operations, rebuild TensorFlow with the appropriate compiler flags.
2019-07-31 20:45:05.229729: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 2300000000 Hz
2019-07-31 20:45:05.231372: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x55a93e5e12b0 executing computations on platform Host. Devices:
2019-07-31 20:45:05.231418: I tensorflow/compiler/xla/service/service.cc:175]   StreamExecutor device (0): <undefined>, <undefined>
2019-07-31 20:45:05.233992: I tensorflow/core/common_runtime/process_util.cc:115] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.
2019-07-31 20:45:05.885688: W tensorflow/compiler/jit/mark_for_compilation_pass.cc:1412] (One-time warning): Not using XLA:CPU for cluster because envvar TF_XLA_FLAGS=--tf_xla_cpu_global_jit was not set.  If you want XLA:CPU, either set that envvar, or use experimental_jit_scope to enable XLA:CPU.  To confirm that XLA is active, pass --vmodule=xla_compilation_cache=1 (as a proper command-line flag, not via TF_XLA_FLAGS) or set the envvar XLA_FLAGS=--xla_hlo_profile.
I0731 20:45:06.246146 139726482318784 session_manager.py:500] Running local_init_op.
I0731 20:45:06.287065 139726482318784 session_manager.py:502] Done running local_init_op.
I0731 20:45:06.788110 139726482318784 basic_session_run_hooks.py:606] Saving checkpoints for 0 into /home/jupyter/training-data-analyst/courses/machine_learning/deepdive/10_recommend/wals_trained/model.ckpt.
I0731 20:45:07.044361 139726482318784 wals.py:77] SweepHook running init op.
I0731 20:45:07.117719 139726482318784 wals.py:85] SweepHook running prep ops for the row sweep.
I0731 20:45:07.555083 139726482318784 wals.py:90] Next fit step starting.
I0731 20:45:08.939277 139726482318784 basic_session_run_hooks.py:262] loss = 96596.016, step = 1
I0731 20:45:08.940334 139726482318784 wals.py:90] Next fit step starting.
I0731 20:45:09.295263 139726482318784 basic_session_run_hooks.py:606] Saving checkpoints for 2 into /home/jupyter/training-data-analyst/courses/machine_learning/deepdive/10_recommend/wals_trained/model.ckpt.
I0731 20:45:09.418965 139726482318784 estimator.py:525] Loss for final step: 164339.45.
I0731 20:45:09.904969 139726482318784 evaluation.py:255] Starting evaluation at 2019-07-31T20:45:09Z
I0731 20:45:09.976733 139726482318784 monitored_session.py:240] Graph was finalized.
W0731 20:45:09.977391 139726482318784 deprecation.py:323] From /usr/local/lib/python2.7/dist-packages/tensorflow/python/training/saver.py:1276: checkpoint_exists (from tensorflow.python.training.checkpoint_management) is deprecated and will be removed in a future version.
Instructions for updating:
Use standard file APIs to check for files with this prefix.
I0731 20:45:09.978878 139726482318784 saver.py:1280] Restoring parameters from /home/jupyter/training-data-analyst/courses/machine_learning/deepdive/10_recommend/wals_trained/model.ckpt-2
I0731 20:45:10.079866 139726482318784 session_manager.py:500] Running local_init_op.
I0731 20:45:10.110157 139726482318784 session_manager.py:502] Done running local_init_op.
I0731 20:45:10.525317 139726482318784 evaluation.py:167] Evaluation [1/1]
I0731 20:45:10.600009 139726482318784 evaluation.py:275] Finished evaluation at 2019-07-31-20:45:10
I0731 20:45:10.600291 139726482318784 estimator.py:347] Saving dict for global step 2: global_step = 2, loss = 96596.016
W0731 20:45:10.696297 139726482318784 deprecation_wrapper.py:119] From walsmodel/model.py:122: The name tf.Session is deprecated. Please use tf.compat.v1.Session instead.

I0731 20:45:10.697436 139726482318784 estimator.py:428] Using default config.
I0731 20:45:10.697824 139726482318784 estimator.py:456] Using config: {'_save_checkpoints_secs': 600, '_num_ps_replicas': 0, '_keep_checkpoint_max': 5, '_task_type': None, '_train_distribute': None, '_is_chief': True, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f140269e190>, '_tf_config': gpu_options {
  per_process_gpu_memory_fraction: 1.0
}
, '_protocol': None, '_save_checkpoints_steps': None, '_keep_checkpoint_every_n_hours': 10000, '_session_config': None, '_model_dir': '/home/jupyter/training-data-analyst/courses/machine_learning/deepdive/10_recommend/wals_trained/', '_tf_random_seed': None, '_master': '', '_device_fn': None, '_num_worker_replicas': 0, '_task_id': 0, '_log_step_count_steps': 100, '_experimental_max_worker_delay_secs': None, '_evaluation_master': '', '_eval_distribute': None, '_environment': u'cloud', '_save_summary_steps': 100}

Run on Cloud

In [ ]:
%%bash
gsutil -m cp data/* gs://${BUCKET}/wals/data
In [ ]:
%%bash
OUTDIR=gs://${BUCKET}/wals/model_trained
JOBNAME=wals_$(date -u +%y%m%d_%H%M%S)
echo $OUTDIR $REGION $JOBNAME
gsutil -m rm -rf $OUTDIR
gcloud ai-platform jobs submit training $JOBNAME \
    --region=$REGION \
    --module-name=walsmodel.task \
    --package-path=${PWD}/walsmodel \
    --job-dir=$OUTDIR \
    --staging-bucket=gs://$BUCKET \
    --scale-tier=BASIC_GPU \
    --runtime-version=$TFVERSION \
    -- \
    --output_dir=$OUTDIR \
    --input_path=gs://${BUCKET}/wals/data \
    --num_epochs=10 --nitems=${NITEMS} --nusers=${NUSERS}

This took 10 minutes for me.

Get row and column factors

Once you have a trained WALS model, you can get row and column factors (user and item embeddings) from the checkpoint file. We'll look at how to use these in the section on building a recommendation system using deep neural networks.

In [36]:
def get_factors(args):
    with tf.Session() as sess:
        estimator = tf.contrib.factorization.WALSMatrixFactorization(
            num_rows = args["nusers"], 
            num_cols = args["nitems"],
            embedding_dimension = args["n_embeds"],
            model_dir = args["output_dir"])
        
        row_factors = estimator.get_row_factors()[0]
        col_factors = estimator.get_col_factors()[0]
    return row_factors, col_factors
In [41]:
args = {
    "output_dir": "gs://{}/wals/model_trained".format(BUCKET),
    "nitems": NITEMS,
    "nusers": NUSERS,
    "n_embeds": 10
  }

user_embeddings, item_embeddings = get_factors(args)
print(user_embeddings[:3])
print(item_embeddings[:3])
INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_environment': 'local', '_is_chief': True, '_keep_checkpoint_every_n_hours': 10000, '_num_worker_replicas': 0, '_session_config': None, '_task_type': None, '_eval_distribute': None, '_tf_config': gpu_options {
  per_process_gpu_memory_fraction: 1.0
}
, '_master': '', '_log_step_count_steps': 100, '_model_dir': 'gs://qwiklabs-gcp-cbc8684b07fc2dbd-bucket/wals/model_trained', '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f4bd8302f28>, '_device_fn': None, '_keep_checkpoint_max': 5, '_task_id': 0, '_evaluation_master': '', '_save_checkpoints_steps': None, '_protocol': None, '_train_distribute': None, '_save_checkpoints_secs': 600, '_save_summary_steps': 100, '_tf_random_seed': None, '_num_ps_replicas': 0}
[[ 3.3451824e-06 -1.1986867e-05  4.8447573e-06 -1.5209486e-05
  -1.7004859e-07  1.1976428e-05  9.8887876e-06  7.2386983e-06
  -7.0237149e-07 -7.9796819e-06]
 [-2.5300323e-03  1.4055537e-03 -9.8291773e-04 -4.2533795e-03
  -1.4166030e-03 -1.9530674e-03  8.5932651e-04 -1.5276540e-03
   2.1342330e-03  1.2041229e-03]
 [ 9.5228699e-21  5.5453966e-21  2.2947056e-21 -5.8859543e-21
   7.7516509e-21 -2.7640896e-20  2.3587296e-20 -3.9876822e-21
   1.7312470e-20  2.5409211e-20]]
[[-1.2125404e-06 -8.6304914e-05  4.4657736e-05 -6.8423047e-05
   5.8551927e-06  9.7241784e-05  6.6776753e-05  1.6673854e-05
  -1.2708440e-05 -5.1148414e-05]
 [-1.1353870e-01  5.9097271e-02 -4.6105500e-02 -1.5460028e-01
  -1.9166643e-02 -7.3236257e-02  3.5582058e-02 -5.6805085e-02
   7.5831160e-02  7.5306065e-02]
 [ 7.1989548e-20  4.4574543e-20  6.5149121e-21 -4.6291777e-20
   8.8196718e-20 -2.3245078e-19  1.9459292e-19  4.0191465e-20
   1.6273659e-19  2.2836562e-19]]

You can visualize the embedding vectors using dimensionality reduction techniques such as PCA.

In [42]:
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn.decomposition import PCA

pca = PCA(n_components = 3)
pca.fit(user_embeddings)
user_embeddings_pca = pca.transform(user_embeddings)

fig = plt.figure(figsize = (8,8))
ax = fig.add_subplot(111, projection = "3d")
xs, ys, zs = user_embeddings_pca[::150].T
ax.scatter(xs, ys, zs)
Out[42]:
<mpl_toolkits.mplot3d.art3d.Path3DCollection at 0x7f4b9406c438>
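
Before reading too much into the scatter plot, it's worth checking how much of the variance the three principal components actually capture:

```python
# Fraction of variance explained by each of the three principal components.
print(pca.explained_variance_ratio_)
print("total: {:.2%}".format(pca.explained_variance_ratio_.sum()))
```
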
# Copyright 2018 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.