Welcome to spark_sklearn’s documentation!

Contents:

class spark_sklearn.Converter(sc)

Bases: object

Class for converting between scikit-learn and Spark ML models

Parameters: sc – SparkContext
toPandas(df)

This is similar to the Spark DataFrame built-in toPandas() method, but it handles MLlib Vector columns differently. It converts MLlib Vectors into rows of scipy.sparse.csr_matrix, which is generally friendlier for PyData tools like scikit-learn.

Note

Experimental: This will likely be replaced in later releases with improved APIs.

Parameters: df – Spark DataFrame
Returns: Pandas dataframe
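
To make the CSR row representation concrete, the sketch below builds the three arrays that a 1 x n scipy.sparse.csr_matrix stores for a single sparse vector. The helper name sparse_vector_to_csr_row is hypothetical and not part of spark_sklearn; it only illustrates the layout and makes no claim about how toPandas is implemented.

```python
def sparse_vector_to_csr_row(size, indices, values):
    """Return the (data, indices, indptr, shape) fields of a 1 x size CSR row.

    Hypothetical helper for illustration only; not part of spark_sklearn.
    """
    if len(indices) != len(values):
        raise ValueError("indices and values must have the same length")
    # CSR keeps column indices in increasing order within each row.
    pairs = sorted(zip(indices, values))
    col_indices = [i for i, _ in pairs]
    data = [float(v) for _, v in pairs]
    if col_indices and (col_indices[0] < 0 or col_indices[-1] >= size):
        raise ValueError("index out of range")
    # A single row spans data[0:len(data)], so indptr has exactly two entries.
    indptr = [0, len(data)]
    return data, col_indices, indptr, (1, size)


data, cols, indptr, shape = sparse_vector_to_csr_row(5, [3, 0], [2.5, 1.0])
print(data, cols, indptr, shape)
```

With SciPy installed, the same fields can be passed to scipy.sparse.csr_matrix((data, cols, indptr), shape=shape) to obtain the actual matrix object.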
toSKLearn(model)

Convert a Spark MLlib model from the Pipelines API (spark.ml) to a scikit-learn model. Currently supported models:

  • pyspark.ml.classification.LogisticRegressionModel
  • pyspark.ml.regression.LinearRegressionModel

Parameters: model – Spark ML model
Returns: scikit-learn model with equivalent predictive behavior. Currently, parameters or arguments for training are not copied.
toSpark(model)

Convert a scikit-learn model to a Spark ML model from the Pipelines API (spark.ml). Currently supported models:

  • sklearn.linear_model.LogisticRegression (binary classification only, not multiclass)
  • sklearn.linear_model.LinearRegression

Parameters: model – scikit-learn model
Returns: Spark ML model with equivalent predictive behavior. Currently, parameters or arguments for training are not copied.
class spark_sklearn.CSRVectorUDT

Bases: pyspark.sql.types.UserDefinedType

SQL user-defined type (UDT) for scipy.sparse.csr_matrix (vectors only, not matrices).

Note

Experimental

deserialize(datum)

Converts a SQL datum into a user-type object.

classmethod module()

The Python module of the UDT.

serialize(obj)

Converts a user-type object into a SQL datum.

classmethod sqlType()

Underlying SQL storage type for this UDT.

class spark_sklearn.GridSearchCV(sc, estimator, param_grid, scoring=None, fit_params=None, n_jobs=1, iid=True, refit=True, cv=3, verbose=0, pre_dispatch='2*n_jobs', error_score='raise', return_train_score=True)

Bases: spark_sklearn.base_search.SparkBaseSearchCV

Exhaustive search over specified parameter values for an estimator.

Important members are fit, predict.

GridSearchCV implements a “fit” and a “score” method. It also implements “predict”, “predict_proba”, “decision_function”, “transform” and “inverse_transform” if they are implemented in the estimator used.

The parameters of the estimator used to apply these methods are optimized by cross-validated grid-search over a parameter grid.

estimator : estimator object.
This is assumed to implement the scikit-learn estimator interface. Either estimator needs to provide a score function, or scoring must be passed.
param_grid : dict or list of dictionaries
Dictionary with parameters names (string) as keys and lists of parameter settings to try as values, or a list of such dictionaries, in which case the grids spanned by each dictionary in the list are explored. This enables searching over any sequence of parameter settings.
scoring : string, callable or None, default=None
A string (see model evaluation documentation) or a scorer callable object / function with signature scorer(estimator, X, y). If None, the score method of the estimator is used.
fit_params : dict, optional
Parameters to pass to the fit method.
n_jobs : int, default=1
Number of jobs to run in parallel.
pre_dispatch : int, or string, optional
Not used; exists for scikit-learn compatibility.
iid : boolean, default=True
If True, the data is assumed to be identically distributed across the folds, and the loss minimized is the total loss per sample, and not the mean loss across the folds.
cv : int, cross-validation generator or an iterable, optional

Determines the cross-validation splitting strategy. Possible inputs for cv are:

  • None, to use the default 3-fold cross validation,
  • integer, to specify the number of folds in a (Stratified)KFold,
  • An object to be used as a cross-validation generator.
  • An iterable yielding train, test splits.

For integer/None inputs, if the estimator is a classifier and y is either binary or multiclass, StratifiedKFold is used. In all other cases, KFold is used.

refit : boolean, default=True
Refit the best estimator with the entire dataset. If “False”, it is impossible to make predictions using this GridSearchCV instance after fitting.
verbose : integer
Controls the verbosity: the higher, the more messages.
error_score : ‘raise’ (default) or numeric
Value to assign to the score if an error occurs in estimator fitting. If set to ‘raise’, the error is raised. If a numeric value is given, FitFailedWarning is raised. This parameter does not affect the refit step, which will always raise the error.
return_train_score : boolean, default=True
If 'False', the cv_results_ attribute will not include training scores.
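
As a rough illustration of what param_grid describes, the sketch below expands a dict (or a list of dicts) into the individual parameter settings that an exhaustive search would try. The helper expand_param_grid is hypothetical and written in plain Python; it is not the code used by GridSearchCV.

```python
from itertools import product


def expand_param_grid(param_grid):
    """Yield one dict per parameter setting (hypothetical helper)."""
    # A single dict is treated as a list containing one grid.
    grids = [param_grid] if isinstance(param_grid, dict) else list(param_grid)
    for grid in grids:
        # Sort names so the enumeration order is deterministic.
        names = sorted(grid)
        for combo in product(*(grid[name] for name in names)):
            yield dict(zip(names, combo))


single = list(expand_param_grid({'kernel': ('linear', 'rbf'), 'C': [1, 10]}))
multi = list(expand_param_grid([
    {'kernel': ['linear'], 'C': [1, 10]},
    {'kernel': ['rbf'], 'C': [1, 10], 'gamma': [0.1, 0.2]},
]))
print(len(single), len(multi))
```

Each setting is fitted once per cross-validation fold, so the number of fits grows with the product of the list lengths in each grid.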
>>> from sklearn import svm, datasets
>>> from spark_sklearn.grid_search import GridSearchCV
>>> from spark_sklearn.util import createLocalSparkSession
>>> sc = createLocalSparkSession().sparkContext
>>> iris = datasets.load_iris()
>>> parameters = {'kernel':('linear', 'rbf'), 'C':[1, 10]}
>>> svr = svm.SVC(gamma='auto')
>>> clf = GridSearchCV(sc, svr, parameters)
>>> clf.fit(iris.data, iris.target)
...                             # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
GridSearchCV(cv=3, error_score=...,
       estimator=SVC(C=1.0, cache_size=..., class_weight=..., coef0=...,
                     decision_function_shape=..., degree=..., gamma=...,
                     kernel='rbf', max_iter=-1, probability=False,
                     random_state=None, shrinking=True, tol=...,
                     verbose=False),
       fit_params={}, iid=..., n_jobs=1,
       param_grid=..., pre_dispatch=..., refit=...,
       scoring=..., verbose=...)
>>> sorted(clf.cv_results_.keys())
...                             # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
['mean_fit_time', 'mean_score_time', 'mean_test_score',...
 'mean_train_score', 'param_C', 'param_kernel', 'params',...
 'rank_test_score', 'split0_test_score',...
 'split0_train_score', 'split1_test_score', 'split1_train_score',...
 'split2_test_score', 'split2_train_score',...
 'std_fit_time', 'std_score_time', 'std_test_score', 'std_train_score'...]
cv_results_ : dict of numpy (masked) ndarrays

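A dict with keys as column headers and values as columns, which can be imported into a pandas DataFrame. For instance, the table below

+--------------+-------------+--------------+-------------------+-------+
| param_kernel | param_gamma | param_degree | split0_test_score | rank… |
+--------------+-------------+--------------+-------------------+-------+
| 'poly'       | --          | 2            | 0.8               | 2     |
| 'poly'       | --          | 3            | 0.7               | 4     |
| 'rbf'        | 0.1         | --           | 0.8               | 3     |
| 'rbf'        | 0.2         | --           | 0.9               | 1     |
+--------------+-------------+--------------+-------------------+-------+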

will be represented by a cv_results_ dict of:

{
'param_kernel': masked_array(data = ['poly', 'poly', 'rbf', 'rbf'],
                             mask = [False False False False]...),
'param_gamma': masked_array(data = [-- -- 0.1 0.2],
                            mask = [ True  True False False]...),
'param_degree': masked_array(data = [2.0 3.0 -- --],
                             mask = [False False  True  True]...),
'split0_test_score'  : [0.8, 0.7, 0.8, 0.9],
'split1_test_score'  : [0.82, 0.5, 0.7, 0.78],
'mean_test_score'    : [0.81, 0.60, 0.75, 0.82],
'std_test_score'     : [0.02, 0.01, 0.03, 0.03],
'rank_test_score'    : [2, 4, 3, 1],
'split0_train_score' : [0.8, 0.9, 0.7],
'split1_train_score' : [0.82, 0.5, 0.7],
'mean_train_score'   : [0.81, 0.7, 0.7],
'std_train_score'    : [0.03, 0.03, 0.04],
'mean_fit_time'      : [0.73, 0.63, 0.43, 0.49],
'std_fit_time'       : [0.01, 0.02, 0.01, 0.01],
'mean_score_time'    : [0.007, 0.06, 0.04, 0.04],
'std_score_time'     : [0.001, 0.002, 0.003, 0.005],
'params'             : [{'kernel': 'poly', 'degree': 2}, ...],
}

Note that the key 'params' stores a list of parameter-setting dicts, one for each parameter candidate. The mean_fit_time, std_fit_time, mean_score_time and std_score_time values are all in seconds.
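
Because cv_results_ is column-oriented, it can help to view it row by row. The sketch below uses a small hand-written dict whose values are taken from the example table above (plain lists rather than masked arrays) and picks the best candidate by rank. The helper to_rows is hypothetical.

```python
# Hand-written stand-in for a fitted search's cv_results_ attribute.
cv_results = {
    'mean_test_score': [0.81, 0.60, 0.75, 0.82],
    'rank_test_score': [2, 4, 3, 1],
    'params': [
        {'kernel': 'poly', 'degree': 2},
        {'kernel': 'poly', 'degree': 3},
        {'kernel': 'rbf', 'gamma': 0.1},
        {'kernel': 'rbf', 'gamma': 0.2},
    ],
}


def to_rows(results):
    """Turn a dict of equal-length columns into a list of per-candidate dicts."""
    n_candidates = len(results['params'])
    return [{key: column[i] for key, column in results.items()}
            for i in range(n_candidates)]


rows = to_rows(cv_results)
# Rank 1 marks the best candidate, so the smallest rank wins.
best_index = min(range(len(rows)), key=lambda i: rows[i]['rank_test_score'])
print(best_index, rows[best_index]['params'], rows[best_index]['mean_test_score'])
```

On a real fitted search object, pandas.DataFrame(search.cv_results_) gives the same tabular view directly.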

best_estimator_ : estimator
Estimator that was chosen by the search, i.e. estimator which gave highest score (or smallest loss if specified) on the left out data. Not available if refit=False.
best_score_ : float
Score of best_estimator on the left out data.
best_params_ : dict
Parameter setting that gave the best results on the hold out data.
best_index_ : int
The index (of the cv_results_ arrays) which corresponds to the best candidate parameter setting. The dict at search.cv_results_['params'][search.best_index_] gives the parameter setting for the best model, that gives the highest mean score (search.best_score_).
scorer_ : function
Scorer function used on the held out data to choose the best parameters for the model.
n_splits_ : int
The number of cross-validation splits (folds/iterations).

The parameters selected are those that maximize the score on the left-out data, unless an explicit score is passed, in which case it is used instead.

ParameterGrid:
generates all the combinations of a hyperparameter grid.
sklearn.model_selection.train_test_split():
utility function to split the data into a development set usable for fitting a GridSearchCV instance and an evaluation set for its final evaluation.
sklearn.metrics.make_scorer():
Make a scorer from a performance metric or loss function.
fit(X, y=None, groups=None)

Run fit with all sets of parameters.

X : array-like, shape = [n_samples, n_features]
Training vector, where n_samples is the number of samples and n_features is the number of features.
y : array-like, shape = [n_samples] or [n_samples, n_output], optional
Target relative to X for classification or regression; None for unsupervised learning.
groups : array-like, with shape (n_samples,), optional
Group labels for the samples used while splitting the dataset into train/test set.
class spark_sklearn.RandomizedSearchCV(sc, estimator, param_distributions, n_iter=10, scoring=None, fit_params=None, n_jobs=1, iid=True, refit=True, cv=None, verbose=0, pre_dispatch='2*n_jobs', random_state=None, error_score='raise')

Bases: spark_sklearn.base_search.SparkBaseSearchCV

Randomized search on hyper parameters.

RandomizedSearchCV implements a “fit” and a “score” method. It also implements “predict”, “predict_proba”, “decision_function”, “transform” and “inverse_transform” if they are implemented in the estimator used.

The parameters of the estimator used to apply these methods are optimized by cross-validated search over parameter settings.

In contrast to GridSearchCV, not all combinations of parameter values are tried out, but rather a fixed number of parameter settings is sampled from the specified distributions. The number of parameter settings that are tried is given by n_iter.

If all parameters are presented as a list, sampling without replacement is performed. If at least one parameter is given as a distribution, sampling with replacement is used. It is highly recommended to use continuous distributions for continuous parameters.
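
The two sampling modes can be sketched in plain Python as follows. UniformDist and sample_settings are hypothetical names introduced for illustration; the rvs signature is simplified (it takes a random.Random instance), and capping n_iter at the grid size is an assumption of this sketch rather than documented library behavior.

```python
import random
from itertools import product


class UniformDist:
    """Hypothetical stand-in for a distribution object with an rvs method."""

    def __init__(self, low, high):
        self.low = low
        self.high = high

    def rvs(self, random_state):
        # Simplified signature: random_state is a random.Random instance here.
        return random_state.uniform(self.low, self.high)


def sample_settings(param_distributions, n_iter, seed=0):
    """Draw n_iter parameter settings (illustrative helper, not library code)."""
    rng = random.Random(seed)
    names = sorted(param_distributions)
    has_dist = any(hasattr(param_distributions[n], "rvs") for n in names)
    if not has_dist:
        # Only lists: enumerate the finite grid and sample without replacement.
        grid = list(product(*(param_distributions[n] for n in names)))
        chosen = rng.sample(grid, min(n_iter, len(grid)))
        return [dict(zip(names, combo)) for combo in chosen]
    # At least one distribution: every draw is independent (with replacement).
    settings = []
    for _ in range(n_iter):
        setting = {}
        for name in names:
            spec = param_distributions[name]
            if hasattr(spec, "rvs"):
                setting[name] = spec.rvs(rng)
            else:
                setting[name] = rng.choice(list(spec))
        settings.append(setting)
    return settings


from_lists = sample_settings({"kernel": ["linear", "rbf"], "C": [1, 10, 100]}, n_iter=4)
mixed = sample_settings({"kernel": ["linear", "rbf"], "C": UniformDist(0.1, 10.0)}, n_iter=4)
print(from_lists)
print(mixed)
```

In the list-only case no setting can repeat, whereas the mixed case may in principle draw the same kernel several times with different values of C.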

estimator : estimator object.
An object of that type is instantiated for each grid point. This is assumed to implement the scikit-learn estimator interface. Either estimator needs to provide a score function, or scoring must be passed.
param_distributions : dict
Dictionary with parameter names (string) as keys and distributions or lists of parameters to try as values. Distributions must provide an rvs method for sampling (such as those from scipy.stats.distributions). If a list is given, it is sampled uniformly.
n_iter : int, default=10
Number of parameter settings that are sampled. n_iter trades off runtime vs quality of the solution.
scoring : string, callable or None, default=None
A string (see model evaluation documentation) or a scorer callable object / function with signature scorer(estimator, X, y). If None, the score method of the estimator is used.
fit_params : dict, optional
Parameters to pass to the fit method.
n_jobs : int, default=1
Number of jobs to run in parallel.
pre_dispatch : int, or string, optional
Not used; exists for scikit-learn compatibility.
iid : boolean, default=True
If True, the data is assumed to be identically distributed across the folds, and the loss minimized is the total loss per sample, and not the mean loss across the folds.
cv : int, cross-validation generator or an iterable, optional

Determines the cross-validation splitting strategy. Possible inputs for cv are:

  • None, to use the default 3-fold cross-validation,
  • integer, to specify the number of folds.
  • An object to be used as a cross-validation generator.
  • An iterable yielding train/test splits.

For integer/None inputs, if the estimator is a classifier and y is either binary or multiclass, sklearn.model_selection.StratifiedKFold is used. In all other cases, sklearn.model_selection.KFold is used.

refit : boolean, default=True
Refit the best estimator with the entire dataset. If “False”, it is impossible to make predictions using this RandomizedSearchCV instance after fitting.
verbose : integer
Controls the verbosity: the higher, the more messages.
random_state : int, RandomState instance or None, optional, default=None
Pseudo random number generator state used for random uniform sampling from lists of possible values instead of scipy.stats distributions. If int, random_state is the seed used by the random number generator; If RandomState instance, random_state is the random number generator; If None, the random number generator is the RandomState instance used by np.random.
error_score : ‘raise’ (default) or numeric
Value to assign to the score if an error occurs in estimator fitting. If set to ‘raise’, the error is raised. If a numeric value is given, FitFailedWarning is raised. This parameter does not affect the refit step, which will always raise the error.
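
The effect of the iid flag on the reported mean score can be illustrated with a small calculation. The sketch below assumes that iid=True weights each fold's score by the number of test samples in that fold, while iid=False takes the plain mean over folds; mean_test_score is a hypothetical helper, not library code.

```python
def mean_test_score(fold_scores, fold_sizes, iid=True):
    """Combine per-fold scores for one candidate (hypothetical helper)."""
    if len(fold_scores) != len(fold_sizes):
        raise ValueError("need one test-set size per fold score")
    if iid:
        # Weight each fold by its number of test samples.
        total = sum(fold_sizes)
        return sum(s * n for s, n in zip(fold_scores, fold_sizes)) / total
    # Plain average: every fold counts equally regardless of size.
    return sum(fold_scores) / len(fold_scores)


scores = [0.90, 0.60]
sizes = [30, 10]
weighted = mean_test_score(scores, sizes, iid=True)
unweighted = mean_test_score(scores, sizes, iid=False)
print(round(weighted, 3), round(unweighted, 3))
```

When all folds have the same size the two results coincide, so the flag only matters for unevenly sized folds.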
cv_results_ : dict of numpy (masked) ndarrays

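A dict with keys as column headers and values as columns, which can be imported into a pandas DataFrame. For instance, the table below

+--------------+-------------+--------------+-------------------+--------+
| param_kernel | param_gamma | param_degree | split0_test_score | rank_… |
+--------------+-------------+--------------+-------------------+--------+
| 'poly'       | --          | 2            | 0.8               | 2      |
| 'poly'       | --          | 3            | 0.7               | 4      |
| 'rbf'        | 0.1         | --           | 0.8               | 3      |
| 'rbf'        | 0.2         | --           | 0.9               | 1      |
+--------------+-------------+--------------+-------------------+--------+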

will be represented by a cv_results_ dict of:

{
'param_kernel': masked_array(data = ['poly', 'poly', 'rbf', 'rbf'],
                             mask = [False False False False]...),
'param_gamma': masked_array(data = [-- -- 0.1 0.2],
                            mask = [ True  True False False]...),
'param_degree': masked_array(data = [2.0 3.0 -- --],
                             mask = [False False  True  True]...),
'split0_test_score'  : [0.8, 0.7, 0.8, 0.9],
'split1_test_score'  : [0.82, 0.5, 0.7, 0.78],
'mean_test_score'    : [0.81, 0.60, 0.75, 0.82],
'std_test_score'     : [0.02, 0.01, 0.03, 0.03],
'rank_test_score'    : [2, 4, 3, 1],
'split0_train_score' : [0.8, 0.9, 0.7],
'split1_train_score' : [0.82, 0.5, 0.7],
'mean_train_score'   : [0.81, 0.7, 0.7],
'std_train_score'    : [0.03, 0.03, 0.04],
'mean_fit_time'      : [0.73, 0.63, 0.43, 0.49],
'std_fit_time'       : [0.01, 0.02, 0.01, 0.01],
'mean_score_time'    : [0.007, 0.06, 0.04, 0.04],
'std_score_time'     : [0.001, 0.002, 0.003, 0.005],
'params'             : [{'kernel': 'poly', 'degree': 2}, ...],
}

Note that the key 'params' stores a list of parameter-setting dicts, one for each parameter candidate. The mean_fit_time, std_fit_time, mean_score_time and std_score_time values are all in seconds.

best_estimator_ : estimator
Estimator that was chosen by the search, i.e. estimator which gave highest score (or smallest loss if specified) on the left out data. Not available if refit=False.
best_score_ : float
Score of best_estimator on the left out data.
best_params_ : dict
Parameter setting that gave the best results on the hold out data.
best_index_ : int
The index (of the cv_results_ arrays) which corresponds to the best candidate parameter setting. The dict at search.cv_results_['params'][search.best_index_] gives the parameter setting for the best model, that gives the highest mean score (search.best_score_).

The parameters selected are those that maximize the score of the held-out data, according to the scoring parameter.

GridSearchCV:
Does exhaustive search over a grid of parameters.
ParameterSampler:
A generator over parameter settings, constructed from param_distributions.
fit(X, y=None, groups=None)

Run fit on the estimator with randomly drawn parameters.

X : array-like, shape = [n_samples, n_features]
Training vector, where n_samples is the number of samples and n_features is the number of features.
y : array-like, shape = [n_samples] or [n_samples, n_output], optional
Target relative to X for classification or regression; None for unsupervised learning.
groups : array-like, with shape (n_samples,), optional
Group labels for the samples used while splitting the dataset into train/test set.
spark_sklearn.gapply(grouped_data, func, schema, *cols)

Applies the function func to data grouped by key. In particular, given a dataframe grouped by some set of key columns key1, key2, …, keyn, this method collects all rows that share the same key values into a single Pandas dataframe and by default invokes func((key1, key2, ..., keyn), values). The number and order of the key arguments are determined by the columns on which this instance’s parent DataFrame was grouped, and values is a pandas.DataFrame of the columns selected by cols, in that order.

If there is only one key then the key tuple is automatically unpacked, with func(key, values) called.

func is expected to return a pandas.DataFrame matching the specified schema, which should be of type StructType (the output columns take their names and order from it).

If spark.conf.get("spark.sql.retainGroupColumns") is not u'true', then func is called with an empty key tuple (note it is set to u'true' by default).

If no cols are specified, then all grouped columns will be offered, in the order of the columns in the original dataframe. In either case, the Pandas columns will be named according to the DataFrame column names.

The order of the rows passed in as Pandas rows is not guaranteed to be stable relative to the original row order.
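
The calling convention described above can be mimicked locally without Spark. In the sketch below, local_gapply is a hypothetical pure-Python analogue: rows are dicts, values is a list of dicts rather than a pandas.DataFrame, and the key columns are prepended to each returned row. It is meant only to show the grouping and key-unpacking rules, not how gapply is implemented.

```python
from collections import OrderedDict


def local_gapply(rows, key_cols, func, value_cols):
    """Group dict rows by key_cols and apply func(key, values) per group.

    Hypothetical local analogue of gapply, for illustration only.
    """
    groups = OrderedDict()
    for row in rows:
        key = tuple(row[c] for c in key_cols)
        groups.setdefault(key, []).append({c: row[c] for c in value_cols})
    out = []
    for key, values in groups.items():
        # A single key is unpacked, mirroring the func(key, values) rule.
        arg = key[0] if len(key) == 1 else key
        for result in func(arg, values):
            # Key columns are prepended to every row returned for the group.
            merged = dict(zip(key_cols, key))
            merged.update(result)
            out.append(merged)
    return out


def total_earnings(course, values):
    # Returns one output row per group.
    return [{"total": sum(v["earnings"] for v in values)}]


rows = [
    {"course": "dotNET", "year": 2012, "earnings": 10000},
    {"course": "Java", "year": 2012, "earnings": 20000},
    {"course": "dotNET", "year": 2012, "earnings": 5000},
]
result = local_gapply(rows, ["course"], total_earnings, ["earnings"])
print(result)
```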

Note:

Users must ensure that the grouped values for every group fit entirely in memory.

Note:

This method is only available if Pandas is installed.

Parameters:
  • grouped_data – data grouped by key
  • func – a two argument function, which may be either a lambda or named function
  • schema – the return schema for func, a StructType
  • cols – list of column names (string only)
Raises:
  • ValueError – if "*" is in cols
  • ValueError – if cols contains duplicates
  • ValueError – if schema is not a StructType
  • ImportError – if pandas module is not installed
  • ImportError – if pandas version is too old (less than 0.7.1)
Returns:

the new DataFrame with the original key columns replicated for each returned value in each group’s resulting pandas dataframe, the schema being the original key schema prepended to schema, where all the resulting groups’ rows are concatenated. Of course, if retaining group columns is disabled, then the output will exactly match schema since no keys can be prepended.

>>> import pandas as pd
>>> from pyspark.sql import Row, SparkSession
>>> from pyspark.sql.types import LongType, StructType
>>> from spark_sklearn.group_apply import gapply
>>> from spark_sklearn.util import createLocalSparkSession
>>> spark = createLocalSparkSession()
>>> df = (spark
...     .createDataFrame([Row(course="dotNET", year=2012, earnings=10000),
...                       Row(course="Java",   year=2012, earnings=20000),
...                       Row(course="dotNET", year=2012, earnings=5000),
...                       Row(course="dotNET", year=2013, earnings=48000),
...                       Row(course="Java",   year=2013, earnings=30000)])
...     .select("course", "year", "earnings"))
>>> def yearlyMedian(_, vals):
...     all_years = set(vals['year'])
...     # Note that interpolation is performed, so we need to cast back to int.
...     yearly_median = [(year, int(vals['earnings'][vals['year'] == year].median()))
...                      for year in all_years]
...     return pd.DataFrame.from_records(yearly_median)
>>> newSchema = StructType().add("year", LongType()).add("median_earnings", LongType())
>>> gapply(df.groupBy("course"), yearlyMedian, newSchema).orderBy("median_earnings").show()
+------+----+---------------+
|course|year|median_earnings|
+------+----+---------------+
|dotNET|2012|           7500|
|  Java|2012|          20000|
|  Java|2013|          30000|
|dotNET|2013|          48000|
+------+----+---------------+
<BLANKLINE>
>>> def twoKeyYearlyMedian(_, vals):
...     return pd.DataFrame.from_records([(int(vals["earnings"].median()),)])
>>> newSchema = StructType([df.schema["earnings"]])
>>> gapply(df.groupBy("course", "year"), twoKeyYearlyMedian, newSchema, "earnings").orderBy(
...     "earnings").show()
+------+----+--------+
|course|year|earnings|
+------+----+--------+
|dotNET|2012|    7500|
|  Java|2012|   20000|
|  Java|2013|   30000|
|dotNET|2013|   48000|
+------+----+--------+
<BLANKLINE>
>>> spark.stop(); SparkSession._instantiatedContext = None

Keyed Models

The use case that this addresses is where a client has a dataset with many keys, distributed such that all the rows sharing a key value can be contained completely in memory on a single machine.

This assumption is particularly enabling because clients may wish to apply more intricate single-machine models (such as a scikit-learn estimator) to every user.

The API provided here generalizes the scikit-learn estimator interface to the Spark ML one; in particular, it allows clients to train their scikit-learn estimators in parallel over a grouped and aggregated dataframe.

>>> from sklearn.linear_model import LinearRegression
>>> from sklearn.cluster import KMeans
>>> from pyspark.ml.linalg import Vectors, Matrices, MatrixUDT
>>> from pyspark.sql.functions import udf
>>> from pyspark.sql import SparkSession
>>> from spark_sklearn.util import createLocalSparkSession
>>> from spark_sklearn.keyed_models import KeyedEstimator
>>> spark = createLocalSparkSession()
>>> df = spark.createDataFrame([(user,
...                              Vectors.dense([i, i ** 2, i ** 3]),
...                              0.0 + user + i + 2 * i ** 2 + 3 * i ** 3)
...                             for user in range(3) for i in range(5)])
>>> df = df.toDF("key", "features", "y")
>>> df.where("5 < y and y < 10").sort("key", "y").show()
+---+-------------+---+
|key|     features|  y|
+---+-------------+---+
|  0|[1.0,1.0,1.0]|6.0|
|  1|[1.0,1.0,1.0]|7.0|
|  2|[1.0,1.0,1.0]|8.0|
+---+-------------+---+
<BLANKLINE>
>>> km = KeyedEstimator(sklearnEstimator=LinearRegression(), yCol="y").fit(df)
>>> def printFloat(x):
...     rounded = round(x, 2)
...     return "{:.2f}".format(0 if rounded == 0 else rounded)
...
>>> def printModel(model):
...     coef = "[" + ", ".join(map(printFloat, model.coef_)) + "]"
...     intercept = printFloat(model.intercept_)
...     return "intercept: {} coef: {}".format(intercept, coef)
...
>>> km.keyedModels.columns
['key', 'estimator']
>>> printedModels = udf(printModel)("estimator").alias("linear fit")
>>> km.keyedModels.select("key", printedModels).sort("key").show(truncate=False)
+---+----------------------------------------+
|key|linear fit                              |
+---+----------------------------------------+
|0  |intercept: 0.00 coef: [1.00, 2.00, 3.00]|
|1  |intercept: 1.00 coef: [1.00, 2.00, 3.00]|
|2  |intercept: 2.00 coef: [1.00, 2.00, 3.00]|
+---+----------------------------------------+
<BLANKLINE>

Now that we have generated a linear model for each key, we can apply it to keyed test data. In the following, we only show one point for simplicity, but the test data can contain multiple points for multiple different keys.

>>> input = spark.createDataFrame([(0, Vectors.dense(3, 1, -1))]).toDF("key", "features")
>>> km.transform(input).withColumn("output", udf(printFloat)("output")).show()
+---+--------------+------+
|key|      features|output|
+---+--------------+------+
|  0|[3.0,1.0,-1.0]|  2.00|
+---+--------------+------+
<BLANKLINE>

Suppose we wanted to perform key-based clustering. The most common use case would require just fitting our model.

>>> km = KeyedEstimator(sklearnEstimator=KMeans(random_state=0, n_clusters=2)).fit(df)
>>> def getCentroids(kmeans):
...     n_clusters, n_features = kmeans.cluster_centers_.shape
...     return Matrices.dense(n_clusters, n_features, kmeans.cluster_centers_.reshape(-1))
...
>>> centroids = udf(getCentroids, MatrixUDT())("estimator").alias("centroids")
>>> km.keyedModels.select("key", centroids).sort("key").show()
+---+--------------------+
|key|           centroids|
+---+--------------------+
|  0|4.0   64.0  3.5  ...|
|  1|4.0   64.0  3.5  ...|
|  2|4.0   64.0  3.5  ...|
+---+--------------------+
<BLANKLINE>

Usually, this is all we want. In the case of KMeans, we can also predict cluster labels, since the scikit-learn estimator provides this functionality. Note this is not the case for some other clusterers, such as DBSCAN.

>>> km.transform(input).withColumnRenamed("output", "cluster label").show()
+---+--------------+-------------+
|key|      features|cluster label|
+---+--------------+-------------+
|  0|[3.0,1.0,-1.0]|            1|
+---+--------------+-------------+
<BLANKLINE>
>>> spark.stop(); SparkSession._instantiatedContext = None # clear hidden SparkContext for reuse
class spark_sklearn.keyed_models.KeyedEstimator(sklearnEstimator=None, keyCols=['key'], xCol='features', outputCol='output', yCol=None, estimatorType=None)

Bases: pyspark.ml.base.Estimator

A KeyedEstimator provides an interface for training per-key scikit-learn estimators.

The KeyedEstimator can be part of any Spark ML pipeline provided the columns are appropriately matched.

Currently, the class provides a generalization for scikit-learn transformers, clusterers, and predictors. Because these scikit-learn estimators all derive from the same base type (yielding the same API), yet have different expectations for what methods should be called and with what arguments, this class enumerates three different types of behavior:

  1. "transformer"

    Examples: sklearn.decomposition.PCA, sklearn.cluster.KMeans

    In this case, the estimator will aggregate all the input features for a given key into an NxD data matrix, where N is the number of rows with the given key and D is the feature space dimensionality; let this matrix be X.

    For each such key and data matrix pair, a clone of the parameter estimator is fitted with estimator.fit(X), inducing a mapping between keys and fitted estimators: this produces a fitted transformer KeyedModel, whose Spark ML transform() method generates an output column by applying each key’s fitted scikit-learn estimator’s own transform method.

    The output column type for transformers will always be a DenseVector.

  2. "clusterer"

    Examples: sklearn.cluster.DBSCAN, sklearn.cluster.KMeans

    As before, the data will be aggregated into a design matrix X, and estimator.fit(X) will be called for each key group.

    The difference between a "transformer" and "clusterer" lies in their prediction behavior: a clusterer will call estimator.predict() whereas a transformer refers to the transform method.

    The output column type for clusterers will always be of LongType.

  3. "predictor"

    Examples: sklearn.svm.LinearSVC, sklearn.linear_model.ElasticNet

    Here, the estimator will likewise aggregate input features into the data matrix X. In addition, the label column will be aggregated in a collated manner, generating a vector y for each key. The estimator clone will be fitted with estimator.fit(X, y).

    A predictor KeyedModel transforms its input dataframe by generating an output column with the output of the estimator’s predict method.

    The output column type for predictors will be the same as the label column, which must be an AtomicType (else a TypeError will be thrown at fit()-time).

The input column should be numeric or a vector (else a TypeError will be thrown at fit()-time). Don’t use “estimator” as a column name.

  • In certain cases, a scikit-learn estimator may support both "transformer" and "clusterer" interfaces. sklearn.cluster.KMeans, for instance, supports both the cluster-labelling operation predict() and a transformation into cluster-mean-distance space. Such ambiguity is resolved by preferring clustering. It may be overridden by manually setting the estimatorType to "transformer" in the KeyedEstimator constructor.
  • Key-based grouping only occurs during training. During the transformation/prediction phase of computation, the output is unaggregated: the number of rows inputted as test data will be equal to the number of rows outputted.
  • spark.conf.get("spark.sql.retainGroupColumns") is assumed to be u"true". This is the case by default for Spark 1.4+. This is necessary for both the keyed estimator and the keyed model.
  • Estimators trained, persisted, and loaded across different scikit-learn versions are not guaranteed to work.

For all instances, the ordered list of keyCols determines the set of groups to which each sklearnEstimator is applied.

For every unique keyCols value, the remaining columns are aggregated and used to train the scikit-learn estimator.

estimatorType inference is conducted as follows: if yCol is specified, then this is assumed to be of "predictor" type, else a "transformer" or a "clusterer", depending on the estimator having the transform() or fit_predict() attributes, with "clusterer" being chosen in case both attributes are present.
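
The inference rule above can be sketched with a few attribute checks. The function infer_estimator_type and the Fake* classes are hypothetical stand-ins, and raising AttributeError when neither attribute is present is an assumption of this sketch, not documented behavior.

```python
def infer_estimator_type(estimator, yCol=None):
    """Sketch of the inference rule described above (hypothetical helper)."""
    if yCol is not None:
        return "predictor"
    if hasattr(estimator, "fit_predict"):
        # Preferred when both fit_predict and transform are available.
        return "clusterer"
    if hasattr(estimator, "transform"):
        return "transformer"
    # Assumption of this sketch: reject estimators with neither attribute.
    raise AttributeError("estimator has neither transform() nor fit_predict()")


class FakePCA:
    """Toy transformer: has transform() only."""

    def fit(self, X):
        return self

    def transform(self, X):
        return X


class FakeKMeans(FakePCA):
    """Toy clusterer: has both transform() and fit_predict()."""

    def fit_predict(self, X):
        return [0 for _ in X]


class FakeRegressor:
    """Toy predictor: has fit(X, y) and predict()."""

    def fit(self, X, y):
        return self

    def predict(self, X):
        return [0.0 for _ in X]


print(infer_estimator_type(FakePCA()))
print(infer_estimator_type(FakeKMeans()))
print(infer_estimator_type(FakeRegressor(), yCol="y"))
```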

Parameters:
  • sklearnEstimator – An instance of a scikit-learn estimator, with parameters configured as desired for each user.
  • keyCols – Key column names list used to group data to which models are applied, where order implies lexicographical importance.
  • xCol – Name of column of input features used for training and transformation/prediction.
  • yCol – Specifies name of label column for regression or classification pipelines. Required for predictors, must be unspecified or None for transformers.
  • estimatorType – Identifies the type of scikit-learn estimator being used, which changes the interface the sklearnEstimator is expected to have. This parameter’s value is inferred using reflection by default, but may be manually overridden.
Raises:
  • ValueError – if sklearnEstimator is None.
  • ValueError – if sklearnEstimator does not derive from sklearn.base.BaseEstimator.
  • ValueError – if keyCols is empty.
  • ValueError – if any column has the name "estimator"
  • AttributeError – if reflection checks indicate that parameter estimator is not equipped with a fit() method.
fit(dataset, params=None)

Fits a model to the input dataset with optional parameters.

Parameters:
  • dataset – input dataset, which is an instance of pyspark.sql.DataFrame
  • params – an optional param map that overrides embedded params. If a list/tuple of param maps is given, this calls fit on each param map and returns a list of models.
Returns:

fitted model(s)

New in version 1.3.0.

sklearnEstimatorType
Returns: the estimator type of this keyed estimator
class spark_sklearn.keyed_models.KeyedModel(sklearnEstimator=None, keyCols=None, xCol=None, outputCol=None, yCol=None, estimatorType=None, keyedSklearnEstimators=None, outputType=None)

Bases: pyspark.ml.base.Model

Represents a Spark ML Model, generated by a fitted KeyedEstimator.

Wraps fitted scikit-learn estimators. At transformation time, the input for each key is transformed using that key's model. See the KeyedEstimator documentation for details.

If no estimator is present for a given key at transformation time, the prediction is null.

The constructor is used by KeyedEstimator to generate a KeyedModel; it is not intended for external use.
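
The per-key lookup, including the null result for unseen keys, can be pictured with a plain dictionary of fitted models. ConstantModel and keyed_predict below are hypothetical stand-ins used only to illustrate the behavior; they are not part of spark_sklearn.

```python
class ConstantModel:
    """Toy fitted model that always predicts the same value (hypothetical)."""

    def __init__(self, value):
        self.value = value

    def predict(self, X):
        return [self.value for _ in X]


def keyed_predict(models, rows):
    """Apply the model registered for each row's key; None if there is none."""
    out = []
    for key, features in rows:
        model = models.get(key)
        # Unseen keys yield a null prediction instead of raising an error.
        prediction = None if model is None else model.predict([features])[0]
        out.append((key, features, prediction))
    return out


models = {0: ConstantModel(1.5), 1: ConstantModel(-2.0)}
rows = [(0, [3.0, 1.0]), (2, [0.0, 0.0]), (1, [1.0, 1.0])]
predictions = keyed_predict(models, rows)
print(predictions)
```

The output has exactly one entry per input row, which matches the unaggregated behavior of the transformation phase.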

keyedModels
Returns: the keyedSklearnEstimators param, a DataFrame with columns keyCols (where each key is unique) and the column "estimator" containing the fitted scikit-learn estimator as a SparkSklearnEstimator.
sklearnEstimatorType
Returns: the estimator type of this keyed model
transform(dataset, params=None)

Transforms the input dataset with optional parameters.

Parameters:
  • dataset – input dataset, which is an instance of pyspark.sql.DataFrame
  • params – an optional param map that overrides embedded params.
Returns:

transformed dataset

New in version 1.3.0.

class spark_sklearn.keyed_models.SparkSklearnEstimator(estimator)

Bases: object

SparkSklearnEstimator is a wrapper for containing scikit-learn estimators in dataframes - any estimators need to be stored inside the wrapper class to be properly serialized/deserialized in dataframe operations.

Note any method called on the estimator this object wraps may be called on the wrapper instead.

Initializes with the parameter estimator.

Parameters: estimator – scikit-learn estimator to contain.
estimator
Returns: the underlying estimator
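
The idea that methods called on the wrapper reach the wrapped estimator can be sketched with attribute delegation. EstimatorWrapper and MeanModel below are hypothetical; this is one common way to build such a wrapper and is not taken from the SparkSklearnEstimator source.

```python
class EstimatorWrapper:
    """Hypothetical delegating wrapper; not the actual SparkSklearnEstimator."""

    def __init__(self, estimator):
        self._estimator = estimator

    @property
    def estimator(self):
        return self._estimator

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails on the wrapper.
        if name.startswith("__") or name == "_estimator":
            # Avoid infinite recursion before _estimator has been set.
            raise AttributeError(name)
        return getattr(self._estimator, name)


class MeanModel:
    """Toy estimator standing in for a scikit-learn one (hypothetical)."""

    def fit(self, X, y):
        self.mean_ = sum(y) / len(y)
        return self

    def predict(self, X):
        return [self.mean_ for _ in X]


wrapped = EstimatorWrapper(MeanModel().fit([[0], [1]], [1.0, 3.0]))
print(wrapped.predict([[5]]))
print(wrapped.mean_)
```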
