This notebook demonstrates how to implement a WALS matrix refactorization approach to do collaborative filtering.
import os
PROJECT = "cloud-training-demos" # REPLACE WITH YOUR PROJECT ID
BUCKET = "cloud-training-demos-ml" # REPLACE WITH YOUR BUCKET NAME
REGION = "us-central1" # REPLACE WITH YOUR BUCKET REGION e.g. us-central1
# Do not change these
os.environ["PROJECT"] = PROJECT
os.environ["BUCKET"] = BUCKET
os.environ["REGION"] = REGION
os.environ["TFVERSION"] = "1.13"
%%bash
gcloud config set project $PROJECT
gcloud config set compute/region $REGION
import tensorflow as tf
print(tf.__version__)
For collaborative filtering, we don't need to know anything about either the users or the content. Essentially, all we need to know is userId, itemId, and rating that the particular user gave the particular item.
In this case, we are working with newspaper articles. The company doesn't ask their users to rate the articles. However, we can use the time-spent on the page as a proxy for rating.
Normally, we would also add a time filter to this ("latest 7 days"), but our dataset is itself limited to a few days.
from google.cloud import bigquery
bq = bigquery.Client(project = PROJECT)
sql = """
#standardSQL
WITH CTE_visitor_page_content AS (
SELECT
fullVisitorID,
(SELECT MAX(IF(index=10, value, NULL)) FROM UNNEST(hits.customDimensions)) AS latestContentId,
(LEAD(hits.time, 1) OVER (PARTITION BY fullVisitorId ORDER BY hits.time ASC) - hits.time) AS session_duration
FROM
`cloud-training-demos.GA360_test.ga_sessions_sample`,
UNNEST(hits) AS hits
WHERE
# only include hits on pages
hits.type = "PAGE"
GROUP BY
fullVisitorId,
latestContentId,
hits.time )
-- Aggregate web stats
SELECT
fullVisitorID as visitorId,
latestContentId as contentId,
SUM(session_duration) AS session_duration
FROM
CTE_visitor_page_content
WHERE
latestContentId IS NOT NULL
GROUP BY
fullVisitorID,
latestContentId
HAVING
session_duration > 0
ORDER BY
latestContentId
"""
df = bq.query(sql).to_dataframe()
df.head()
stats = df.describe()
stats
df[["session_duration"]].plot(kind="hist", logy=True, bins=100, figsize=[8,5])
# The rating is the session_duration scaled to be in the range 0-1. This will help with training.
median = stats.loc["50%", "session_duration"]
df["rating"] = 0.3 * df["session_duration"] / median
df.loc[df["rating"] > 1, "rating"] = 1
df[["rating"]].plot(kind="hist", logy=True, bins=100, figsize=[8,5])
del df["session_duration"]
%%bash
rm -rf data
mkdir data
df.to_csv(path_or_buf = "data/collab_raw.csv", index = False, header = False)
!head data/collab_raw.csv
The raw dataset (above) won't work for WALS:
import pandas as pd
import numpy as np
def create_mapping(values, filename):
with open(filename, 'w') as ofp:
value_to_id = {value:idx for idx, value in enumerate(values.unique())}
for value, idx in value_to_id.items():
ofp.write("{},{}\n".format(value, idx))
return value_to_id
df = pd.read_csv(filepath_or_buffer = "data/collab_raw.csv",
header = None,
names = ["visitorId", "contentId", "rating"],
dtype = {"visitorId": str, "contentId": str, "rating": np.float})
df.to_csv(path_or_buf = "data/collab_raw.csv", index = False, header = False)
user_mapping = create_mapping(df["visitorId"], "data/users.csv")
item_mapping = create_mapping(df["contentId"], "data/items.csv")
!head -3 data/*.csv
df["userId"] = df["visitorId"].map(user_mapping.get)
df["itemId"] = df["contentId"].map(item_mapping.get)
mapped_df = df[["userId", "itemId", "rating"]]
mapped_df.to_csv(path_or_buf = "data/collab_mapped.csv", index = False, header = False)
mapped_df.head()
import pandas as pd
import numpy as np
mapped_df = pd.read_csv(filepath_or_buffer = "data/collab_mapped.csv", header = None, names = ["userId", "itemId", "rating"])
mapped_df.head()
NITEMS = np.max(mapped_df["itemId"]) + 1
NUSERS = np.max(mapped_df["userId"]) + 1
mapped_df["rating"] = np.round(mapped_df["rating"].values, 2)
print("{} items, {} users, {} interactions".format( NITEMS, NUSERS, len(mapped_df) ))
grouped_by_items = mapped_df.groupby("itemId")
iter = 0
for item, grouped in grouped_by_items:
print(item, grouped["userId"].values, grouped["rating"].values)
iter = iter + 1
if iter > 5:
break
import tensorflow as tf
grouped_by_items = mapped_df.groupby("itemId")
with tf.python_io.TFRecordWriter("data/users_for_item") as ofp:
for item, grouped in grouped_by_items:
example = tf.train.Example(features = tf.train.Features(feature = {
"key": tf.train.Feature(int64_list = tf.train.Int64List(value = [item])),
"indices": tf.train.Feature(int64_list = tf.train.Int64List(value = grouped["userId"].values)),
"values": tf.train.Feature(float_list = tf.train.FloatList(value = grouped["rating"].values))
}))
ofp.write(example.SerializeToString())
grouped_by_users = mapped_df.groupby("userId")
with tf.python_io.TFRecordWriter("data/items_for_user") as ofp:
for user, grouped in grouped_by_users:
example = tf.train.Example(features = tf.train.Features(feature = {
"key": tf.train.Feature(int64_list = tf.train.Int64List(value = [user])),
"indices": tf.train.Feature(int64_list = tf.train.Int64List(value = grouped["itemId"].values)),
"values": tf.train.Feature(float_list = tf.train.FloatList(value = grouped["rating"].values))
}))
ofp.write(example.SerializeToString())
!ls -lrt data
To summarize, we created the following data files from collab_raw.csv:
Once you have the dataset, do matrix factorization with WALS using the WALSMatrixFactorization in the contrib directory. This is an estimator model, so it should be relatively familiar.
As usual, we write an input_fn to provide the data to the model, and then create the Estimator to do train_and_evaluate. Because it is in contrib and hasn't moved over to tf.estimator yet, we use tf.contrib.learn.Experiment to handle the training loop.
import os
import tensorflow as tf
from tensorflow.python.lib.io import file_io
from tensorflow.contrib.factorization import WALSMatrixFactorization
def read_dataset(mode, args):
def decode_example(protos, vocab_size):
features = {
"key": tf.FixedLenFeature(shape = [1], dtype = tf.int64),
"indices": tf.VarLenFeature(dtype = tf.int64),
"values": tf.VarLenFeature(dtype = tf.float32)}
parsed_features = tf.parse_single_example(serialized = protos, features = features)
values = tf.sparse_merge(sp_ids = parsed_features["indices"], sp_values = parsed_features["values"], vocab_size = vocab_size)
# Save key to remap after batching
# This is a temporary workaround to assign correct row numbers in each batch.
# You can ignore details of this part and remap_keys().
key = parsed_features["key"]
decoded_sparse_tensor = tf.SparseTensor(indices = tf.concat(values = [values.indices, [key]], axis = 0),
values = tf.concat(values = [values.values, [0.0]], axis = 0),
dense_shape = values.dense_shape)
return decoded_sparse_tensor
def remap_keys(sparse_tensor):
# Current indices of our SparseTensor that we need to fix
bad_indices = sparse_tensor.indices # shape = (current_batch_size * (number_of_items/users[i] + 1), 2)
# Current values of our SparseTensor that we need to fix
bad_values = sparse_tensor.values # shape = (current_batch_size * (number_of_items/users[i] + 1),)
# Since batch is ordered, the last value for a batch index is the user
# Find where the batch index chages to extract the user rows
# 1 where user, else 0
user_mask = tf.concat(values = [bad_indices[1:,0] - bad_indices[:-1,0], tf.constant(value = [1], dtype = tf.int64)], axis = 0) # shape = (current_batch_size * (number_of_items/users[i] + 1), 2)
# Mask out the user rows from the values
good_values = tf.boolean_mask(tensor = bad_values, mask = tf.equal(x = user_mask, y = 0)) # shape = (current_batch_size * number_of_items/users[i],)
item_indices = tf.boolean_mask(tensor = bad_indices, mask = tf.equal(x = user_mask, y = 0)) # shape = (current_batch_size * number_of_items/users[i],)
user_indices = tf.boolean_mask(tensor = bad_indices, mask = tf.equal(x = user_mask, y = 1))[:, 1] # shape = (current_batch_size,)
good_user_indices = tf.gather(params = user_indices, indices = item_indices[:,0]) # shape = (current_batch_size * number_of_items/users[i],)
# User and item indices are rank 1, need to make rank 1 to concat
good_user_indices_expanded = tf.expand_dims(input = good_user_indices, axis = -1) # shape = (current_batch_size * number_of_items/users[i], 1)
good_item_indices_expanded = tf.expand_dims(input = item_indices[:, 1], axis = -1) # shape = (current_batch_size * number_of_items/users[i], 1)
good_indices = tf.concat(values = [good_user_indices_expanded, good_item_indices_expanded], axis = 1) # shape = (current_batch_size * number_of_items/users[i], 2)
remapped_sparse_tensor = tf.SparseTensor(indices = good_indices, values = good_values, dense_shape = sparse_tensor.dense_shape)
return remapped_sparse_tensor
def parse_tfrecords(filename, vocab_size):
if mode == tf.estimator.ModeKeys.TRAIN:
num_epochs = None # indefinitely
else:
num_epochs = 1 # end-of-input after this
files = tf.gfile.Glob(filename = os.path.join(args["input_path"], filename))
# Create dataset from file list
dataset = tf.data.TFRecordDataset(files)
dataset = dataset.map(map_func = lambda x: decode_example(x, vocab_size))
dataset = dataset.repeat(count = num_epochs)
dataset = dataset.batch(batch_size = args["batch_size"])
dataset = dataset.map(map_func = lambda x: remap_keys(x))
return dataset.make_one_shot_iterator().get_next()
def _input_fn():
features = {
WALSMatrixFactorization.INPUT_ROWS: parse_tfrecords("items_for_user", args["nitems"]),
WALSMatrixFactorization.INPUT_COLS: parse_tfrecords("users_for_item", args["nusers"]),
WALSMatrixFactorization.PROJECT_ROW: tf.constant(True)
}
return features, None
return _input_fn
This code is helpful in developing the input function. You don't need it in production.
def try_out():
with tf.Session() as sess:
fn = read_dataset(
mode = tf.estimator.ModeKeys.EVAL,
args = {"input_path": "data", "batch_size": 4, "nitems": NITEMS, "nusers": NUSERS})
feats, _ = fn()
print(feats["input_rows"].eval())
print(feats["input_rows"].eval())
try_out()
def find_top_k(user, item_factors, k):
all_items = tf.matmul(a = tf.expand_dims(input = user, axis = 0), b = tf.transpose(a = item_factors))
topk = tf.nn.top_k(input = all_items, k = k)
return tf.cast(x = topk.indices, dtype = tf.int64)
def batch_predict(args):
import numpy as np
with tf.Session() as sess:
estimator = tf.contrib.factorization.WALSMatrixFactorization(
num_rows = args["nusers"],
num_cols = args["nitems"],
embedding_dimension = args["n_embeds"],
model_dir = args["output_dir"])
# This is how you would get the row factors for out-of-vocab user data
# row_factors = list(estimator.get_projections(input_fn=read_dataset(tf.estimator.ModeKeys.EVAL, args)))
# user_factors = tf.convert_to_tensor(np.array(row_factors))
# But for in-vocab data, the row factors are already in the checkpoint
user_factors = tf.convert_to_tensor(value = estimator.get_row_factors()[0]) # (nusers, nembeds)
# In either case, we have to assume catalog doesn"t change, so col_factors are read in
item_factors = tf.convert_to_tensor(value = estimator.get_col_factors()[0])# (nitems, nembeds)
# For each user, find the top K items
topk = tf.squeeze(input = tf.map_fn(fn = lambda user: find_top_k(user, item_factors, args["topk"]), elems = user_factors, dtype = tf.int64))
with file_io.FileIO(os.path.join(args["output_dir"], "batch_pred.txt"), mode = 'w') as f:
for best_items_for_user in topk.eval():
f.write(",".join(str(x) for x in best_items_for_user) + '\n')
def train_and_evaluate(args):
train_steps = int(0.5 + (1.0 * args["num_epochs"] * args["nusers"]) / args["batch_size"])
steps_in_epoch = int(0.5 + args["nusers"] / args["batch_size"])
print("Will train for {} steps, evaluating once every {} steps".format(train_steps, steps_in_epoch))
def experiment_fn(output_dir):
return tf.contrib.learn.Experiment(
tf.contrib.factorization.WALSMatrixFactorization(
num_rows = args["nusers"],
num_cols = args["nitems"],
embedding_dimension = args["n_embeds"],
model_dir = args["output_dir"]),
train_input_fn = read_dataset(tf.estimator.ModeKeys.TRAIN, args),
eval_input_fn = read_dataset(tf.estimator.ModeKeys.EVAL, args),
train_steps = train_steps,
eval_steps = 1,
min_eval_frequency = steps_in_epoch
)
from tensorflow.contrib.learn.python.learn import learn_runner
learn_runner.run(experiment_fn = experiment_fn, output_dir = args["output_dir"])
batch_predict(args)
import shutil
shutil.rmtree(path = "wals_trained", ignore_errors=True)
train_and_evaluate({
"output_dir": "wals_trained",
"input_path": "data/",
"num_epochs": 0.05,
"nitems": NITEMS,
"nusers": NUSERS,
"batch_size": 512,
"n_embeds": 10,
"topk": 3
})
!ls wals_trained
!head wals_trained/batch_pred.txt
Let's run it as Python module for just a few steps.
os.environ["NITEMS"] = str(NITEMS)
os.environ["NUSERS"] = str(NUSERS)
%%bash
rm -rf wals.tar.gz wals_trained
gcloud ai-platform local train \
--module-name=walsmodel.task \
--package-path=${PWD}/walsmodel \
-- \
--output_dir=${PWD}/wals_trained \
--input_path=${PWD}/data \
--num_epochs=0.01 --nitems=${NITEMS} --nusers=${NUSERS} \
--job-dir=./tmp
%%bash
gsutil -m cp data/* gs://${BUCKET}/wals/data
%%bash
OUTDIR=gs://${BUCKET}/wals/model_trained
JOBNAME=wals_$(date -u +%y%m%d_%H%M%S)
echo $OUTDIR $REGION $JOBNAME
gsutil -m rm -rf $OUTDIR
gcloud ai-platform jobs submit training $JOBNAME \
--region=$REGION \
--module-name=walsmodel.task \
--package-path=${PWD}/walsmodel \
--job-dir=$OUTDIR \
--staging-bucket=gs://$BUCKET \
--scale-tier=BASIC_GPU \
--runtime-version=$TFVERSION \
-- \
--output_dir=$OUTDIR \
--input_path=gs://${BUCKET}/wals/data \
--num_epochs=10 --nitems=${NITEMS} --nusers=${NUSERS}
This took 10 minutes for me.
Once you have a trained WALS model, you can get row and column factors (user and item embeddings) from the checkpoint file. We'll look at how to use these in the section on building a recommendation system using deep neural networks.
def get_factors(args):
with tf.Session() as sess:
estimator = tf.contrib.factorization.WALSMatrixFactorization(
num_rows = args["nusers"],
num_cols = args["nitems"],
embedding_dimension = args["n_embeds"],
model_dir = args["output_dir"])
row_factors = estimator.get_row_factors()[0]
col_factors = estimator.get_col_factors()[0]
return row_factors, col_factors
args = {
"output_dir": "gs://{}/wals/model_trained".format(BUCKET),
"nitems": NITEMS,
"nusers": NUSERS,
"n_embeds": 10
}
user_embeddings, item_embeddings = get_factors(args)
print(user_embeddings[:3])
print(item_embeddings[:3])
You can visualize the embedding vectors using dimensional reduction techniques such as PCA.
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn.decomposition import PCA
pca = PCA(n_components = 3)
pca.fit(user_embeddings)
user_embeddings_pca = pca.transform(user_embeddings)
fig = plt.figure(figsize = (8,8))
ax = fig.add_subplot(111, projection = "3d")
xs, ys, zs = user_embeddings_pca[::150].T
ax.scatter(xs, ys, zs)
# Copyright 2018 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.