2c. Loading large datasets progressively with the tf.data.Dataset

In this notebook, we continue reading the same small dataset, but refactor our ML pipeline in two small, but significant, ways:

  1. Refactor the input to read data from disk progressively.

  2. Refactor the feature creation so that it is not one-to-one with inputs. </ol>
    The Pandas function in the previous notebook first read the whole data into memory -- on a large dataset, this won't be an option.

In [1]:
from google.cloud import bigquery
import tensorflow as tf
import numpy as np
import shutil
print(tf.__version__)
1.15.2

1. Refactor the input

Read data created in Lab1a, but this time make it more general, so that we can later handle large datasets. We use the Dataset API for this. It ensures that, as data gets delivered to the model in mini-batches, it is loaded from disk only when needed.

In [2]:
CSV_COLUMNS = ['fare_amount', 'pickuplon','pickuplat','dropofflon','dropofflat','passengers', 'key']
DEFAULTS = [[0.0], [-74.0], [40.0], [-74.0], [40.7], [1.0], ['nokey']]

def read_dataset(filename, mode, batch_size = 512):
  def decode_csv(row):
    columns = tf.decode_csv(row, record_defaults = DEFAULTS)
    features = dict(zip(CSV_COLUMNS, columns))
    features.pop('key') # discard, not a real feature
    label = features.pop('fare_amount') # remove label from features and store
    return features, label

  # Create list of file names that match "glob" pattern (i.e. data_file_*.csv)
  filenames_dataset = tf.data.Dataset.list_files(filename, shuffle=False)
  # Read lines from text files
  textlines_dataset = filenames_dataset.flat_map(tf.data.TextLineDataset)
  # Parse text lines as comma-separated values (CSV)
  dataset = textlines_dataset.map(decode_csv)

  # Note:
  # use tf.data.Dataset.flat_map to apply one to many transformations (here: filename -> text lines)
  # use tf.data.Dataset.map      to apply one to one  transformations (here: text line -> feature list)

  if mode == tf.estimator.ModeKeys.TRAIN:
      num_epochs = None # loop indefinitely
      dataset = dataset.shuffle(buffer_size = 10 * batch_size, seed=2)
  else:
      num_epochs = 1 # end-of-input after this

  dataset = dataset.repeat(num_epochs).batch(batch_size)

  return dataset

def get_train_input_fn():
  return read_dataset('./taxi-train.csv', mode = tf.estimator.ModeKeys.TRAIN)

def get_valid_input_fn():
  return read_dataset('./taxi-valid.csv', mode = tf.estimator.ModeKeys.EVAL)

2. Refactor the way features are created.

For now, pass these through (same as previous lab). However, refactoring this way will enable us to break the one-to-one relationship between inputs and features.

In [3]:
INPUT_COLUMNS = [
    tf.feature_column.numeric_column('pickuplon'),
    tf.feature_column.numeric_column('pickuplat'),
    tf.feature_column.numeric_column('dropofflat'),
    tf.feature_column.numeric_column('dropofflon'),
    tf.feature_column.numeric_column('passengers'),
]

def add_more_features(feats):
  # Nothing to add (yet!)
  return feats

feature_cols = add_more_features(INPUT_COLUMNS)

Create and train the model

Note that we train for num_steps * batch_size examples.

In [ ]:
tf.logging.set_verbosity(tf.logging.INFO)
OUTDIR = 'taxi_trained'
shutil.rmtree(OUTDIR, ignore_errors = True) # start fresh each time
model = tf.estimator.LinearRegressor(
      feature_columns = feature_cols, model_dir = OUTDIR)
model.train(input_fn = get_train_input_fn, steps = 200)
INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_model_dir': 'taxi_trained', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f8237b45690>, '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}
WARNING:tensorflow:From /opt/conda/lib/python3.7/site-packages/tensorflow_core/python/training/training_util.py:236: Variable.initialized_value (from tensorflow.python.ops.variables) is deprecated and will be removed in a future version.
Instructions for updating:
Use Variable.read_value. Variables in 2.X are initialized automatically both in eager and graph (inside tf.defun) contexts.
WARNING:tensorflow:From /opt/conda/lib/python3.7/site-packages/tensorflow_core/python/autograph/converters/directives.py:119: The name tf.decode_csv is deprecated. Please use tf.io.decode_csv instead.

WARNING:tensorflow:From /opt/conda/lib/python3.7/site-packages/tensorflow_core/python/data/util/random_seed.py:58: where (from tensorflow.python.ops.array_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where
INFO:tensorflow:Calling model_fn.
WARNING:tensorflow:From /opt/conda/lib/python3.7/site-packages/tensorflow_core/python/feature_column/feature_column_v2.py:305: Layer.add_variable (from tensorflow.python.keras.engine.base_layer) is deprecated and will be removed in a future version.
Instructions for updating:
Please use `layer.add_weight` method instead.
WARNING:tensorflow:From /opt/conda/lib/python3.7/site-packages/tensorflow_core/python/ops/resource_variable_ops.py:1630: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version.
Instructions for updating:
If using Keras pass *_constraint arguments to layers.
WARNING:tensorflow:From /opt/conda/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/canned/linear.py:308: to_float (from tensorflow.python.ops.math_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.cast` instead.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Saving checkpoints for 0 into taxi_trained/model.ckpt.
INFO:tensorflow:loss = 100053.76, step = 1
INFO:tensorflow:global_step/sec: 24.7391
INFO:tensorflow:loss = 37355.76, step = 101 (4.044 sec)

Evaluate model

As before, evaluate on the validation data. We'll do the third refactoring (to move the evaluation into the training loop) in the next lab.

In [ ]:
metrics = model.evaluate(input_fn = get_valid_input_fn, steps = None)
print('RMSE on dataset = {}'.format(np.sqrt(metrics['average_loss'])))

Challenge Exercise

Create a neural network that is capable of finding the volume of a cylinder given the radius of its base (r) and its height (h). Assume that the radius and height of the cylinder are both in the range 0.5 to 2.0. Unlike in the challenge exercise for b_estimator.ipynb, assume that your measurements of r, h and V are all rounded off to the nearest 0.1. Simulate the necessary training dataset. This time, you will need a lot more data to get a good predictor.

Hint (highlight to see):

Create random values for r and h and compute V. Then, round off r, h and V (i.e., the volume is computed from the true value of r and h; it's only your measurement that is rounded off). Your dataset will consist of the round values of r, h and V. Do this for both the training and evaluation datasets.

Now modify the "noise" so that instead of just rounding off the value, there is up to a 10% error (uniformly distributed) in the measurement followed by rounding off.

Copyright 2017 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License