Welcome to the Deep Learning Pipelines Python API docs!

Note that most of the Python API docs are currently stubs. The APIs are designed to match the Scala APIs as closely as reasonable, so please refer to the Scala API docs for more details on both the algorithms and APIs (particularly DataFrame schema).

Contents:

Module contents

class sparkdl.TFImageTransformer(channelOrder, inputCol=None, outputCol=None, graph=None, inputTensor='sparkdl_image_input:0', outputTensor=None, outputMode='vector')[source]

Bases: pyspark.ml.base.Transformer, sparkdl.param.shared_params.HasInputCol, sparkdl.param.shared_params.HasOutputCol, sparkdl.param.image_params.HasOutputMode

Applies a TensorFlow graph to the image column in a DataFrame.

Restrictions of the current API:

  • Does not use minibatches, which is a major low-hanging fruit for performance.
  • Only one output node can be specified.
  • The output is expected to be an image or a 1-d vector.
  • All images in the DataFrame are expected to be of the same numerical data type (i.e. the dtype of the values in the numpy array representation is the same).

We assume all graphs have a “minibatch” dimension (i.e. an unknown leading dimension) in the tensor shapes.

Note

The input TensorFlow graph should have its variables converted to constants ("constantified"), since a new session is created inside this transformer.

__init__(self, channelOrder, inputCol=None, outputCol=None, graph=None, inputTensor=IMAGE_INPUT_TENSOR_NAME, outputTensor=None, outputMode="vector")

Parameters:channelOrder – str, specifies the ordering of the color channels; one of RGB, BGR, or L (grayscale).
channelOrder = Param(parent='undefined', name='channelOrder', doc='String specifying the expected color channel order, can be one of L,RGB,BGR')
getGraph()[source]
getInputTensor()[source]
getOutputTensor()[source]
graph = Param(parent='undefined', name='graph', doc='A TensorFlow computation graph')
inputTensor = Param(parent='undefined', name='inputTensor', doc='A TensorFlow tensor object or name representing the input image')
outputTensor = Param(parent='undefined', name='outputTensor', doc='A TensorFlow tensor object or name representing the output')
setGraph(value)[source]
setInputTensor(value)[source]
setOutputTensor(value)[source]
setParams(self, channelOrder=None, inputCol=None, outputCol=None, graph=None, inputTensor='sparkdl_image_input:0', outputTensor=None, outputMode='vector')[source]
class sparkdl.TFInputGraph(graph_def, input_tensor_name_from_signature, output_tensor_name_from_signature)[source]

Bases: object

An opaque object containing a TensorFlow graph. This object can be serialized.

Note

We recommend constructing this object using one of the class constructor methods.

When the graph contains serving signatures in which a set of well-known names are associated with their corresponding raw tensor names in the graph, we extract and store them here. For example, the TensorFlow saved model may contain the following structure, so that end users can retrieve the input tensor via well_known_input_sig and the output tensor via well_known_output_sig without knowing the actual tensor names a priori.

sigdef: {'well_known_prediction_signature':
inputs { key: "well_known_input_sig"
  value {
    name: "tnsrIn:0"
    dtype: DT_DOUBLE
    tensor_shape { dim { size: -1 } dim { size: 17 } }
    }
  }
outputs { key: "well_known_output_sig"
  value {
    name: "tnsrOut:0"
    dtype: DT_DOUBLE
    tensor_shape { dim { size: -1 } }
    }
}}

In this case, the class will internally store the mapping from signature names to tensor names.

{'well_known_input_sig': 'tnsrIn:0'}
{'well_known_output_sig': 'tnsrOut:0'}
param graph_def:
 tf.GraphDef, a serializable object containing the topology and computation units of the TensorFlow graph. The graph object is prepared for inference, i.e. the variables are converted to constants and operations like BatchNormalization are converted to be independent of input batch.
param input_tensor_name_from_signature:
 dict, signature key names mapped to tensor names. Please see the example above.
param output_tensor_name_from_signature:
 dict, signature key names mapped to tensor names. Please see the example above.
classmethod fromCheckpoint(checkpoint_dir, feed_names, fetch_names)[source]

Construct a TFInputGraph object from a checkpoint, ignoring the embedded signature_def, if there is any.

Parameters:
  • checkpoint_dir – str, name of the directory containing the TensorFlow graph training checkpoint.
  • feed_names – list, names of the input tensors.
  • fetch_names – list, names of the output tensors.
classmethod fromCheckpointWithSignature(checkpoint_dir, signature_def_key)[source]

Construct a TFInputGraph object from a checkpoint, using the embedded signature_def. Throws an error if we cannot find an entry with the signature_def_key inside the signature_def.

Parameters:
  • checkpoint_dir – str, name of the directory containing the TensorFlow graph training checkpoint.
  • signature_def_key – str, key (name) of the signature_def to use. It should be in the list of signature_def structures saved with the checkpoint.
classmethod fromGraph(graph, sess, feed_names, fetch_names)[source]

Construct a TFInputGraph from an in-memory tf.Graph object. The graph may contain variables that are maintained in the provided session; thus we need an active session in which the graph's variables are initialized or restored. We do not close the session. As a result, this constructor can be used inside a standard TensorFlow session context.

with tf.Session() as sess:
    graph = import_my_tensorflow_graph(...)
    input = TFInputGraph.fromGraph(graph, sess, ...)
Parameters:
  • graph – a tf.Graph object containing the topology and computation units of the TensorFlow graph.
  • sess – a tf.Session in which the graph's variables are initialized or restored.
  • feed_names – list, names of the input tensors.
  • fetch_names – list, names of the output tensors.
classmethod fromGraphDef(graph_def, feed_names, fetch_names)[source]

Construct a TFInputGraph from a tf.GraphDef object.

Parameters:
  • graph_def – tf.GraphDef, a serializable object containing the topology and computation units of the TensorFlow graph.
  • feed_names – list, names of the input tensors.
  • fetch_names – list, names of the output tensors.
classmethod fromSavedModel(saved_model_dir, tag_set, feed_names, fetch_names)[source]

Construct a TFInputGraph object from a saved model (tf.SavedModel) directory, ignoring the embedded signature_def, if there is any.

Parameters:
  • saved_model_dir – str, name of the directory containing the TensorFlow SavedModel.
  • tag_set – str, the tag set identifying the meta_graph of the saved model that we are interested in using.
  • feed_names – list, names of the input tensors.
  • fetch_names – list, names of the output tensors.
classmethod fromSavedModelWithSignature(saved_model_dir, tag_set, signature_def_key)[source]

Construct a TFInputGraph object from a saved model (tf.SavedModel) directory, using the embedded signature_def. Throws an error if we cannot find an entry with the signature_def_key inside the signature_def.

Parameters:
  • saved_model_dir – str, name of the directory containing the TensorFlow SavedModel.
  • tag_set – str, the tag set identifying the meta_graph of the saved model that we are interested in using.
  • signature_def_key – str, key (name) of the signature_def to use. It should be in the list of signature_def structures saved with the TensorFlow SavedModel.
translateInputMapping(input_mapping)[source]

When the meta_graph contains signature_def, we expect users to provide input and output mapping with respect to the tensor reference keys embedded in the signature_def.

This function translates the input_mapping into the canonical format, which maps input DataFrame column names to tensor names.

Parameters:input_mapping – dict, mapping DataFrame column names to tensor reference names defined in the signature_def.
translateOutputMapping(output_mapping)[source]

When the meta_graph contains signature_def, we expect users to provide input and output mapping with respect to the tensor reference keys embedded in the signature_def.

This function translates the output_mapping into the canonical format, which maps tensor names to output DataFrame column names.

Parameters:output_mapping – dict, mapping tensor reference names defined in the signature_def to output DataFrame column names.
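The translation both methods perform can be sketched in plain Python. The sketch below is illustrative only, using the example signature-to-tensor mappings from the sigdef shown earlier; the helper names are hypothetical, not part of the sparkdl API:

```python
# Example signature-key -> tensor-name mappings, as stored internally
# (taken from the sigdef example above).
input_sig_to_tensor = {'well_known_input_sig': 'tnsrIn:0'}
output_sig_to_tensor = {'well_known_output_sig': 'tnsrOut:0'}

def translate_input_mapping(input_mapping, sig_to_tensor):
    """Turn {DataFrame column: signature key} into {column: tensor name}."""
    return {col: sig_to_tensor[sig_key] for col, sig_key in input_mapping.items()}

def translate_output_mapping(output_mapping, sig_to_tensor):
    """Turn {signature key: DataFrame column} into {tensor name: column}."""
    return {sig_to_tensor[sig_key]: col for sig_key, col in output_mapping.items()}

print(translate_input_mapping({'features': 'well_known_input_sig'}, input_sig_to_tensor))
# {'features': 'tnsrIn:0'}
print(translate_output_mapping({'well_known_output_sig': 'preds'}, output_sig_to_tensor))
# {'tnsrOut:0': 'preds'}
```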
class sparkdl.TFTransformer(self, tfInputGraph=None, inputMapping=None, outputMapping=None, tfHParms=None)[source]

Bases: pyspark.ml.base.Transformer, sparkdl.param.shared_params.HasTFInputGraph, sparkdl.param.shared_params.HasTFHParams, sparkdl.param.shared_params.HasInputMapping, sparkdl.param.shared_params.HasOutputMapping

Applies the TensorFlow graph to the array column in a DataFrame.

Restrictions of the current API:

We assume that:

  • All the inputs of the graph have a "minibatch" dimension (i.e. an unknown leading dimension) in the tensor shapes.
  • The input DataFrame has an array column where all elements have the same length.
  • The transformer is expected to work on blocks of data at the same time.
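These assumptions can be illustrated with plain numpy (a conceptual sketch of the batching, not the transformer's actual implementation): rows of a fixed-length array column are stacked into a block whose leading dimension is the minibatch size.

```python
import numpy as np

# Each row of a (hypothetical) DataFrame array column has the same length (3).
rows = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]]

# Stack the rows into one block; the leading (minibatch) dimension is the
# row count, matching a graph input declared with shape (None, 3).
batch = np.stack([np.asarray(r, dtype=np.float32) for r in rows])
print(batch.shape)  # (3, 3)
```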
setParams(self, tfInputGraph=None, inputMapping=None, outputMapping=None, tfHParms=None)[source]
class sparkdl.DeepImagePredictor(inputCol=None, outputCol=None, modelName=None, decodePredictions=False, topK=5)[source]

Bases: pyspark.ml.base.Transformer, sparkdl.param.shared_params.HasInputCol, sparkdl.param.shared_params.HasOutputCol

Applies the model specified by its popular name to the image column in a DataFrame. The input image column should be a 3-channel SpImage. The output is an MLlib Vector.

__init__(self, inputCol=None, outputCol=None, modelName=None, decodePredictions=False, topK=5)
decodePredictions = Param(parent='undefined', name='decodePredictions', doc='If true, output predictions in the (class, description, probability) format')
getModelName()[source]
modelName = Param(parent='undefined', name='modelName', doc='A deep learning model name')
setModelName(value)[source]
setParams(self, inputCol=None, outputCol=None, modelName=None, decodePredictions=False, topK=5)[source]
topK = Param(parent='undefined', name='topK', doc='How many classes to return if decodePredictions is True')
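When decodePredictions is true, the raw probability vector is decoded into (class, description, probability) triples. A plain-Python sketch of that top-K decoding follows; the labels here are made up for illustration, whereas the real transformer decodes against the model's actual class list (e.g. ImageNet):

```python
# Hypothetical (class id, description) labels and a probability vector.
labels = [('n01', 'cat'), ('n02', 'dog'), ('n03', 'fish'), ('n04', 'bird')]
probs = [0.1, 0.6, 0.05, 0.25]

def decode_predictions(probs, labels, topK=5):
    """Return the topK (class, description, probability) triples, best first."""
    ranked = sorted(zip(labels, probs), key=lambda pair: pair[1], reverse=True)
    return [(cls, desc, p) for (cls, desc), p in ranked[:topK]]

print(decode_predictions(probs, labels, topK=2))
# [('n02', 'dog', 0.6), ('n04', 'bird', 0.25)]
```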
class sparkdl.DeepImageFeaturizer(inputCol=None, outputCol=None, modelName=None, scaleHint='SCALE_AREA_AVERAGING')[source]

Bases: pyspark.ml.wrapper.JavaTransformer

Applies the model specified by its popular name, with its prediction layer(s) chopped off, to the image column in a DataFrame. The output is an MLlib Vector so that DeepImageFeaturizer can be used in an MLlib Pipeline. The input image column should follow the ImageSchema format.

__init__(self, inputCol=None, outputCol=None, modelName=None, scaleHint="SCALE_AREA_AVERAGING")
getInputCol()[source]
getModelName()[source]
getOutputCol()[source]
getScaleHint()[source]
inputCol = Param(parent='undefined', name='inputCol', doc='input column name.')
modelName = Param(parent='undefined', name='modelName', doc='A deep learning model name')
outputCol = Param(parent='undefined', name='outputCol', doc='output column name.')
scaleHint = Param(parent='undefined', name='scaleHint', doc='Hint as to which algorithm to use for image resizing')
setInputCol(value)[source]
setModelName(value)[source]
setOutputCol(value)[source]
setParams(self, inputCol=None, outputCol=None, modelName=None, scaleHint='SCALE_AREA_AVERAGING')[source]
setScaleHint(value)[source]
class sparkdl.KerasImageFileTransformer(inputCol=None, outputCol=None, modelFile=None, imageLoader=None, outputMode='vector')[source]

Bases: pyspark.ml.base.Transformer, sparkdl.param.shared_params.HasInputCol, sparkdl.param.shared_params.HasOutputCol, sparkdl.param.image_params.CanLoadImage, sparkdl.param.shared_params.HasKerasModel, sparkdl.param.image_params.HasOutputMode

Applies the TensorFlow-backed Keras model (specified by a file name) to images (specified by the URI in the inputCol column) in the DataFrame.

Restrictions of the current API:
  • see TFImageTransformer.
  • Only supports TensorFlow-backed Keras models (no Theano).
__init__(self, inputCol=None, outputCol=None, modelFile=None, imageLoader=None, outputMode="vector")
setParams(self, inputCol=None, outputCol=None, modelFile=None, imageLoader=None, outputMode='vector')[source]
class sparkdl.KerasTransformer(self, inputCol=None, outputCol=None, modelFile=None)[source]

Bases: pyspark.ml.base.Transformer, sparkdl.param.shared_params.HasInputCol, sparkdl.param.shared_params.HasOutputCol, sparkdl.param.shared_params.HasKerasModel

Applies a TensorFlow-backed Keras model (specified by a file name) to a column of arrays (where each array corresponds to a Tensor) in a DataFrame. Produces an output column of arrays.

Restrictions of the current API:
  • See TFTransformer
  • Only supports Keras models with a single input tensor & a single output tensor, where the input & output tensors must have at most 2 dimensions.
  • Only supports TensorFlow-backed Keras models (no Theano or CNTK).
setParams(self, inputCol=None, outputCol=None, modelFile=None)[source]
sparkdl.imageInputPlaceholder(nChannels=None)[source]

Inserts a TensorFlow placeholder for input images.

class sparkdl.KerasImageFileEstimator(inputCol=None, outputCol=None, outputMode='vector', labelCol=None, modelFile=None, imageLoader=None, kerasOptimizer=None, kerasLoss=None, kerasFitParams=None)[source]

Bases: pyspark.ml.base.Estimator, sparkdl.param.shared_params.HasInputCol, sparkdl.param.shared_params.HasOutputCol, sparkdl.param.shared_params.HasLabelCol, sparkdl.param.shared_params.HasKerasModel, sparkdl.param.shared_params.HasKerasOptimizer, sparkdl.param.shared_params.HasKerasLoss, sparkdl.param.image_params.CanLoadImage, sparkdl.param.image_params.HasOutputMode

Builds an Estimator from a Keras model.

First, create a model and save it to the file system.

from keras.applications.resnet50 import ResNet50
model = ResNet50(weights=None)
model.save("path_to_my_model.h5")

Then, create an image loading function that reads image data from a URI, preprocesses it, and returns a numerical tensor.

def load_image_and_process(uri):
    import numpy as np
    import PIL.Image
    from keras.applications.imagenet_utils import preprocess_input

    original_image = PIL.Image.open(uri).convert('RGB')
    resized_image = original_image.resize((224, 224), PIL.Image.ANTIALIAS)
    image_array = np.array(resized_image).astype(np.float32)
    image_tensor = preprocess_input(image_array[np.newaxis, :])
    return image_tensor

Assume the image URIs live in the following DataFrame.

from pyspark.sql import Row
from pyspark.ml.feature import StringIndexer, OneHotEncoder

original_dataset = spark.createDataFrame([
    Row(imageUri="image1_uri", imageLabel="image1_label"),
    Row(imageUri="image2_uri", imageLabel="image2_label"),
    # and more rows ...
])
stringIndexer = StringIndexer(inputCol="imageLabel", outputCol="categoryIndex")
indexed_dataset = stringIndexer.fit(original_dataset).transform(original_dataset)
encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
image_dataset = encoder.transform(indexed_dataset)

We can then create a Keras estimator that takes our saved model file and train the model using Spark.

estimator = KerasImageFileEstimator(inputCol="imageUri",
                                    outputCol="name_of_result_column",
                                    labelCol="categoryVec",
                                    imageLoader=load_image_and_process,
                                    kerasOptimizer="adam",
                                    kerasLoss="categorical_crossentropy",
                                    kerasFitParams={"epochs": 5, "batch_size": 64},
                                    modelFile="path_to_my_model.h5")

transformers = estimator.fit(image_dataset)
__init__(self, inputCol=None, outputCol=None, outputMode="vector", labelCol=None, modelFile=None, imageLoader=None, kerasOptimizer=None, kerasLoss=None, kerasFitParams=None)
fitMultiple(dataset, paramMaps)[source]

Fits len(paramMaps) models in parallel, one in each Spark task.

Parameters:
  • dataset – input dataset, which is an instance of pyspark.sql.DataFrame. The column inputCol should be of type sparkdl.image.imageIO.imgSchema.
  • paramMaps – non-empty list or tuple of ParamMaps (dict values).
Returns:an iterable which contains one model for each param map. Each call to next(modelIterator) will return (index, model), where the model was fit using paramMaps[index]. index values may not be sequential.

Warning

This serializes each model into HDF5-format bytes and sends them to the driver. If the model files are large, the driver might run out of memory. As we cannot assume the existence of a sufficiently large (and writable) file system, users are advised not to train too many models in a single Spark job.
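The iterator contract described above (one (index, model) pair per call to next, indices possibly out of order) can be consumed as sketched below. fit_multiple_sketch and fit_one are hypothetical stand-ins, not the sparkdl implementation; they only illustrate why collecting models by index is the safe pattern:

```python
def fit_multiple_sketch(param_maps, fit_one):
    """Yield (index, model) pairs; in Spark the order reflects task completion."""
    for index, param_map in enumerate(param_maps):
        yield index, fit_one(param_map)

# Collect models by index so out-of-order completion does not matter.
param_maps = [{'lr': 0.1}, {'lr': 0.01}]
models = [None] * len(param_maps)
for index, model in fit_multiple_sketch(param_maps, fit_one=lambda pm: ('model', pm['lr'])):
    models[index] = model
print(models)  # [('model', 0.1), ('model', 0.01)]
```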

setParams(self, inputCol=None, outputCol=None, outputMode='vector', labelCol=None, modelFile=None, imageLoader=None, kerasOptimizer=None, kerasLoss=None, kerasFitParams=None)[source]
class sparkdl.HorovodRunner(np)[source]

Bases: object

HorovodRunner runs distributed deep learning training jobs using Horovod.

On Databricks Runtime 5.0 ML and above, it launches the Horovod job as a distributed Spark job. It makes running Horovod easy on Databricks by managing the cluster setup and integrating with Spark. Check out Databricks documentation to view end-to-end examples and performance tuning tips.

The open-source version only runs the job locally inside the same Python process, which is for local development only.

Note

Horovod is a distributed training framework developed by Uber.

Parameters:np – number of parallel processes to use for the Horovod job. This argument only takes effect on Databricks Runtime 5.0 ML and above. It is ignored in the open-source version. On Databricks, each process will take an available task slot, which maps to a GPU on a GPU cluster or a CPU core on a CPU cluster. Accepted values are:

  • If <0, this will spawn -np subprocesses on the driver node to run Horovod locally. Training stdout and stderr messages go to the notebook cell output, and are also available in driver logs in case the cell output is truncated. This is useful for debugging and we recommend testing your code under this mode first. However, be careful of heavy use of the Spark driver on a shared Databricks cluster. Note that np < -1 is only supported on Databricks Runtime 5.5 ML and above.
  • If >0, this will launch a Spark job with np tasks starting all together and run the Horovod job on the task nodes. It will wait until np task slots are available to launch the job. If np is greater than the total number of task slots on the cluster, the job will fail. As of Databricks Runtime 5.4 ML, training stdout and stderr messages go to the notebook cell output. In the event that the cell output is truncated, full logs are available in stderr stream of task 0 under the 2nd spark job started by HorovodRunner, which you can find in the Spark UI.
  • If 0, this will use all task slots on the cluster to launch the job.
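The three cases above can be summarized as a small helper. This is illustrative only; HorovodRunner itself does not expose such a function:

```python
def horovod_mode(np):
    """Describe how HorovodRunner would schedule a job for a given np value."""
    if np < 0:
        # Negative np: run Horovod locally with -np subprocesses on the driver.
        return 'local: spawn %d subprocesses on the driver node' % -np
    if np > 0:
        # Positive np: launch a Spark job with np tasks on the cluster.
        return 'distributed: Spark job with %d tasks' % np
    # np == 0: use every available task slot.
    return 'distributed: use all task slots on the cluster'

print(horovod_mode(-4))  # local: spawn 4 subprocesses on the driver node
print(horovod_mode(8))   # distributed: Spark job with 8 tasks
print(horovod_mode(0))   # distributed: use all task slots on the cluster
```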
run(main, **kwargs)[source]

Runs a Horovod training job invoking main(**kwargs).

The open-source version only invokes main(**kwargs) inside the same Python process. On Databricks Runtime 5.0 ML and above, it will launch the Horovod job based on the documented behavior of np. Both the main function and the keyword arguments are serialized using cloudpickle and distributed to cluster workers.

Parameters:
  • main – a Python function that contains the Horovod training code. The expected signature is def main(**kwargs) or a compatible form. Because the function gets pickled and distributed to workers, make any global state changes (e.g., setting the logging level) inside the function, and be aware of pickling limitations. Avoid referencing large objects in the function, which might result in large pickled data, making the job slow to start.
  • kwargs – keyword arguments passed to the main function at invocation time.
Returns:

return value of the main function. With np>=0, this returns the value from the rank 0 process. Note that the returned value should be serializable using cloudpickle.
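In the open-source version, run() simply invokes main in the same process. A minimal sketch of that behavior follows; the class and training function below are hypothetical illustrations, not the Databricks code path, which pickles main with cloudpickle and distributes it to workers:

```python
class LocalHorovodRunnerSketch:
    """Mimics open-source HorovodRunner behavior: run main in-process."""

    def __init__(self, np):
        self.np = np  # ignored in the open-source version

    def run(self, main, **kwargs):
        # Invoke the training function directly and return its result,
        # as the rank-0 return value would be returned on Databricks.
        return main(**kwargs)

def train(learning_rate):
    # Placeholder for real Horovod training code.
    return {'learning_rate': learning_rate, 'status': 'done'}

hr = LocalHorovodRunnerSketch(np=2)
print(hr.run(train, learning_rate=0.01))
# {'learning_rate': 0.01, 'status': 'done'}
```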
