Welcome to spark_sklearn’s documentation!¶
Contents:
-
class
spark_sklearn.
Converter
(sc)¶ Bases:
object
Class for converting between scikit-learn and Spark ML models
Parameters: sc – SparkContext -
toPandas
(df)¶ This is similar to the Spark DataFrame built-in toPandas() method, but it handles MLlib Vector columns differently. It converts MLlib Vectors into rows of scipy.sparse.csr_matrix, which is generally friendlier for PyData tools like scikit-learn.
Note
Experimental: This will likely be replaced in later releases with improved APIs.
Parameters: df – Spark DataFrame Returns: Pandas dataframe
-
toSKLearn
(model)¶ Convert a Spark MLlib model from the Pipelines API (spark.ml) to a scikit-learn model. Currently supported models: - pyspark.ml.classification.LogisticRegressionModel - pyspark.ml.regression.LinearRegressionModel
Parameters: model – Spark ML model Returns: scikit-learn model with equivalent predictive behavior. Currently, parameters or arguments for training are not copied.
-
toSpark
(model)¶ Convert a scikit-learn model to a Spark ML model from the Pipelines API (spark.ml). Currently supported models: - sklearn.linear_model.LogisticRegression (binary classification only, not multiclass) - sklearn.linear_model.LinearRegression
Parameters: model – scikit-learn model Returns: Spark ML model with equivalent predictive behavior. Currently, parameters or arguments for training are not copied.
-
-
class
spark_sklearn.
CSRVectorUDT
¶ Bases:
pyspark.sql.types.UserDefinedType
SQL user-defined type (UDT) for scipy.sparse.csr_matrix (vectors only, not matrices).
Note
Experimental
-
deserialize
(datum)¶ Converts a SQL datum into a user-type object.
-
classmethod
module
()¶ The Python module of the UDT.
-
serialize
(obj)¶ Converts the a user-type object into a SQL datum.
-
classmethod
sqlType
()¶ Underlying SQL storage type for this UDT.
-
-
class
spark_sklearn.
GridSearchCV
(sc, estimator, param_grid, scoring=None, fit_params=None, n_jobs=1, iid=True, refit=True, cv=3, verbose=0, pre_dispatch='2*n_jobs', error_score='raise', return_train_score=True)¶ Bases:
spark_sklearn.base_search.SparkBaseSearchCV
Exhaustive search over specified parameter values for an estimator.
Important members are fit, predict.
GridSearchCV implements a “fit” and a “score” method. It also implements “predict”, “predict_proba”, “decision_function”, “transform” and “inverse_transform” if they are implemented in the estimator used.
The parameters of the estimator used to apply these methods are optimized by cross-validated grid-search over a parameter grid.
- estimator : estimator object.
- This is assumed to implement the scikit-learn estimator interface.
Either estimator needs to provide a
score
function, orscoring
must be passed. - param_grid : dict or list of dictionaries
- Dictionary with parameters names (string) as keys and lists of parameter settings to try as values, or a list of such dictionaries, in which case the grids spanned by each dictionary in the list are explored. This enables searching over any sequence of parameter settings.
- scoring : string, callable or None, default=None
- A string (see model evaluation documentation) or
a scorer callable object / function with signature
scorer(estimator, X, y)
. IfNone
, thescore
method of the estimator is used. - fit_params : dict, optional
- Parameters to pass to the fit method.
- n_jobs : int, default=1
- Number of jobs to run in parallel.
- pre_dispatch : int, or string, optional
- Not used; exists for scikit-learn compatibility.
- iid : boolean, default=True
- If True, the data is assumed to be identically distributed across the folds, and the loss minimized is the total loss per sample, and not the mean loss across the folds.
- cv : int, cross-validation generator or an iterable, optional
Determines the cross-validation splitting strategy. Possible inputs for cv are:
- None, to use the default 3-fold cross validation,
- integer, to specify the number of folds in a (Stratified)KFold,
- An object to be used as a cross-validation generator.
- An iterable yielding train, test splits.
For integer/None inputs, if the estimator is a classifier and
y
is either binary or multiclass,StratifiedKFold
is used. In all other cases,KFold
is used.- refit : boolean, default=True
- Refit the best estimator with the entire dataset. If “False”, it is impossible to make predictions using this GridSearchCV instance after fitting.
- verbose : integer
- Controls the verbosity: the higher, the more messages.
- error_score : ‘raise’ (default) or numeric
- Value to assign to the score if an error occurs in estimator fitting. If set to ‘raise’, the error is raised. If a numeric value is given, FitFailedWarning is raised. This parameter does not affect the refit step, which will always raise the error.
- return_train_score : boolean, default=True
- If
'False'
, thecv_results_
attribute will not include training scores.
>>> from sklearn import svm, datasets >>> from spark_sklearn.grid_search import GridSearchCV >>> from spark_sklearn.util import createLocalSparkSession >>> sc = createLocalSparkSession().sparkContext >>> iris = datasets.load_iris() >>> parameters = {'kernel':('linear', 'rbf'), 'C':[1, 10]} >>> svr = svm.SVC(gamma='auto') >>> clf = GridSearchCV(sc, svr, parameters) >>> clf.fit(iris.data, iris.target) ... # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS GridSearchCV(cv=3, error_score=..., estimator=SVC(C=1.0, cache_size=..., class_weight=..., coef0=..., decision_function_shape=..., degree=..., gamma=..., kernel='rbf', max_iter=-1, probability=False, random_state=None, shrinking=True, tol=..., verbose=False), fit_params={}, iid=..., n_jobs=1, param_grid=..., pre_dispatch=..., refit=..., scoring=..., verbose=...) >>> sorted(clf.cv_results_.keys()) ... # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS ['mean_fit_time', 'mean_score_time', 'mean_test_score',... 'mean_train_score', 'param_C', 'param_kernel', 'params',... 'rank_test_score', 'split0_test_score',... 'split0_train_score', 'split1_test_score', 'split1_train_score',... 'split2_test_score', 'split2_train_score',... 'std_fit_time', 'std_score_time', 'std_test_score', 'std_train_score'...]
- cv_results_ : dict of numpy (masked) ndarrays
A dict with keys as column headers and values as columns, that can be imported into a pandas
DataFrame
. For instance the below given tableparam_kernel param_gamma param_degree split0_test_score … rank….. ‘poly’ – 2 0.8 … 2 ‘poly’ – 3 0.7 … 4 ‘rbf’ 0.1 – 0.8 … 3 ‘rbf’ 0.2 – 0.9 … 1 will be represented by a
cv_results_
dict of:{ 'param_kernel': masked_array(data = ['poly', 'poly', 'rbf', 'rbf'], mask = [False False False False]...) 'param_gamma': masked_array(data = [-- -- 0.1 0.2], mask = [ True True False False]...), 'param_degree': masked_array(data = [2.0 3.0 -- --], mask = [False False True True]...), 'split0_test_score' : [0.8, 0.7, 0.8, 0.9], 'split1_test_score' : [0.82, 0.5, 0.7, 0.78], 'mean_test_score' : [0.81, 0.60, 0.75, 0.82], 'std_test_score' : [0.02, 0.01, 0.03, 0.03], 'rank_test_score' : [2, 4, 3, 1], 'split0_train_score' : [0.8, 0.9, 0.7], 'split1_train_score' : [0.82, 0.5, 0.7], 'mean_train_score' : [0.81, 0.7, 0.7], 'std_train_score' : [0.03, 0.03, 0.04], 'mean_fit_time' : [0.73, 0.63, 0.43, 0.49], 'std_fit_time' : [0.01, 0.02, 0.01, 0.01], 'mean_score_time' : [0.007, 0.06, 0.04, 0.04], 'std_score_time' : [0.001, 0.002, 0.003, 0.005], 'params' : [{'kernel': 'poly', 'degree': 2}, ...], }
NOTE that the key
'params'
is used to store a list of parameter settings dict for all the parameter candidates. Themean_fit_time
,std_fit_time
,mean_score_time
andstd_score_time
are all in seconds.- best_estimator_ : estimator
- Estimator that was chosen by the search, i.e. estimator which gave highest score (or smallest loss if specified) on the left out data. Not available if refit=False.
- best_score_ : float
- Score of best_estimator on the left out data.
- best_params_ : dict
- Parameter setting that gave the best results on the hold out data.
- best_index_ : int
- The index (of the
cv_results_
arrays) which corresponds to the best candidate parameter setting. The dict atsearch.cv_results_['params'][search.best_index_]
gives the parameter setting for the best model, that gives the highest mean score (search.best_score_
). - scorer_ : function
- Scorer function used on the held out data to choose the best parameters for the model.
- n_splits_ : int
- The number of cross-validation splits (folds/iterations).
The parameters selected are those that maximize the score of the left out data, unless an explicit score is passed in which case it is used instead.
ParameterGrid
:- generates all the combinations of a hyperparameter grid.
sklearn.model_selection.train_test_split()
:- utility function to split the data into a development set usable for fitting a GridSearchCV instance and an evaluation set for its final evaluation.
sklearn.metrics.make_scorer()
:- Make a scorer from a performance metric or loss function.
-
fit
(X, y=None, groups=None)¶ Run fit with all sets of parameters.
- X : array-like, shape = [n_samples, n_features]
- Training vector, where n_samples is the number of samples and n_features is the number of features.
- y : array-like, shape = [n_samples] or [n_samples, n_output], optional
- Target relative to X for classification or regression; None for unsupervised learning.
- groups : array-like, with shape (n_samples,), optional
- Group labels for the samples used while splitting the dataset into train/test set.
-
class
spark_sklearn.
RandomizedSearchCV
(sc, estimator, param_distributions, n_iter=10, scoring=None, fit_params=None, n_jobs=1, iid=True, refit=True, cv=None, verbose=0, pre_dispatch='2*n_jobs', random_state=None, error_score='raise')¶ Bases:
spark_sklearn.base_search.SparkBaseSearchCV
Randomized search on hyper parameters.
RandomizedSearchCV implements a “fit” and a “score” method. It also implements “predict”, “predict_proba”, “decision_function”, “transform” and “inverse_transform” if they are implemented in the estimator used.
The parameters of the estimator used to apply these methods are optimized by cross-validated search over parameter settings.
In contrast to GridSearchCV, not all combinations of parameter values are tried out, but rather a fixed number of parameter settings is sampled from the specified distributions. The number of parameter settings that are tried is given by n_iter.
If all parameters are presented as a list, sampling without replacement is performed. If at least one parameter is given as a distribution, sampling with replacement is used. It is highly recommended to use continuous distributions for continuous parameters.
- estimator : estimator object.
- A object of that type is instantiated for each grid point.
This is assumed to implement the scikit-learn estimator interface.
Either estimator needs to provide a
score
function, orscoring
must be passed. - param_distributions : dict
- Dictionary with parameters names (string) as keys and distributions
or lists of parameters to try. Distributions must provide a
rvs
method for sampling (such as those from scipy.stats.distributions). If a list is given, it is sampled uniformly. - n_iter : int, default=10
- Number of parameter settings that are sampled. n_iter trades off runtime vs quality of the solution.
- scoring : string, callable or None, default=None
- A string (see model evaluation documentation) or
a scorer callable object / function with signature
scorer(estimator, X, y)
. IfNone
, thescore
method of the estimator is used. - fit_params : dict, optional
- Parameters to pass to the fit method.
- n_jobs : int, default=1
- Number of jobs to run in parallel.
- pre_dispatch : int, or string, optional
- Not used; exists for scikit-learn compatibility.
- iid : boolean, default=True
- If True, the data is assumed to be identically distributed across the folds, and the loss minimized is the total loss per sample, and not the mean loss across the folds.
- cv : int, cross-validation generator or an iterable, optional
Determines the cross-validation splitting strategy. Possible inputs for cv are:
- None, to use the default 3-fold cross-validation,
- integer, to specify the number of folds.
- An object to be used as a cross-validation generator.
- An iterable yielding train/test splits.
For integer/None inputs, if the estimator is a classifier and
y
is either binary or multiclass,sklearn.model_selection.StratifiedKFold
is used. In all other cases,sklearn.model_selection.KFold
is used.- refit : boolean, default=True
- Refit the best estimator with the entire dataset. If “False”, it is impossible to make predictions using this RandomizedSearchCV instance after fitting.
- verbose : integer
- Controls the verbosity: the higher, the more messages.
- random_state : int, RandomState instance or None, optional, default=None
- Pseudo random number generator state used for random uniform sampling from lists of possible values instead of scipy.stats distributions. If int, random_state is the seed used by the random number generator; If RandomState instance, random_state is the random number generator; If None, the random number generator is the RandomState instance used by np.random.
- error_score : ‘raise’ (default) or numeric
- Value to assign to the score if an error occurs in estimator fitting. If set to ‘raise’, the error is raised. If a numeric value is given, FitFailedWarning is raised. This parameter does not affect the refit step, which will always raise the error.
- cv_results_ : dict of numpy (masked) ndarrays
A dict with keys as column headers and values as columns, that can be imported into a pandas
DataFrame
. For instance the below given tableparam_kernel param_gamma param_degree split0_test_score … rank_…. ‘poly’ – 2 0.8 … 2 ‘poly’ – 3 0.7 … 4 ‘rbf’ 0.1 – 0.8 … 3 ‘rbf’ 0.2 – 0.9 … 1 will be represented by a
cv_results_
dict of:{ 'param_kernel': masked_array(data = ['poly', 'poly', 'rbf', 'rbf'], mask = [False False False False]...) 'param_gamma': masked_array(data = [-- -- 0.1 0.2], mask = [ True True False False]...), 'param_degree': masked_array(data = [2.0 3.0 -- --], mask = [False False True True]...), 'split0_test_score' : [0.8, 0.7, 0.8, 0.9], 'split1_test_score' : [0.82, 0.5, 0.7, 0.78], 'mean_test_score' : [0.81, 0.60, 0.75, 0.82], 'std_test_score' : [0.02, 0.01, 0.03, 0.03], 'rank_test_score' : [2, 4, 3, 1], 'split0_train_score' : [0.8, 0.9, 0.7], 'split1_train_score' : [0.82, 0.5, 0.7], 'mean_train_score' : [0.81, 0.7, 0.7], 'std_train_score' : [0.03, 0.03, 0.04], 'mean_fit_time' : [0.73, 0.63, 0.43, 0.49], 'std_fit_time' : [0.01, 0.02, 0.01, 0.01], 'mean_score_time' : [0.007, 0.06, 0.04, 0.04], 'std_score_time' : [0.001, 0.002, 0.003, 0.005], 'params' : [{'kernel': 'poly', 'degree': 2}, ...], }
NOTE that the key
'params'
is used to store a list of parameter settings dict for all the parameter candidates. Themean_fit_time
,std_fit_time
,mean_score_time
andstd_score_time
are all in seconds.- best_estimator_ : estimator
- Estimator that was chosen by the search, i.e. estimator which gave highest score (or smallest loss if specified) on the left out data. Not available if refit=False.
- best_score_ : float
- Score of best_estimator on the left out data.
- best_params_ : dict
- Parameter setting that gave the best results on the hold out data.
- best_index_ : int
- The index (of the
cv_results_
arrays) which corresponds to the best candidate parameter setting. The dict atsearch.cv_results_['params'][search.best_index_]
gives the parameter setting for the best model, that gives the highest mean score (search.best_score_
).
The parameters selected are those that maximize the score of the held-out data, according to the scoring parameter.
GridSearchCV
:- Does exhaustive search over a grid of parameters.
ParameterSampler
:- A generator over parameter settings, constructed from param_distributions.
-
fit
(X, y=None, groups=None)¶ Run fit on the estimator with randomly drawn parameters.
- X : array-like, shape = [n_samples, n_features]
- Training vector, where n_samples in the number of samples and n_features is the number of features.
- y : array-like, shape = [n_samples] or [n_samples, n_output], optional
- Target relative to X for classification or regression; None for unsupervised learning.
- groups : array-like, with shape (n_samples,), optional
- Group labels for the samples used while splitting the dataset into train/test set.
-
spark_sklearn.
gapply
(grouped_data, func, schema, *cols)¶ Applies the function
func
to data grouped by key. In particular, given a dataframe grouped by some set of key columns key1, key2, …, keyn, this method groups all the values for each row with the same key columns into a single Pandas dataframe and by default invokesfunc((key1, key2, ..., keyn), values)
where the number and order of the key arguments is determined by columns on which this instance’s parentDataFrame
was grouped andvalues
is apandas.DataFrame
of columns selected bycols
, in that order.If there is only one key then the key tuple is automatically unpacked, with
func(key, values)
called.func
is expected to return apandas.DataFrame
of the specified schemaschema
, which should be of typeStructType
(output columns are of this name and order).If
spark.conf.get("spark.sql.retainGroupColumns")
is notu'true'
, thenfunc
is called with an empty key tuple (note it is set tou'true'
by default).If no
cols
are specified, then all grouped columns will be offered, in the order of the columns in the original dataframe. In either case, the Pandas columns will be named according to the DataFrame column names.The order of the rows passed in as Pandas rows is not guaranteed to be stable relative to the original row order.
Note: Users must ensure that the grouped values for every group must fit entirely in memory.
Note: This method is only available if Pandas is installed.
Parameters: - grouped_data – data grouped by key
- func – a two argument function, which may be either a lambda or named function
- schema – the return schema for
func
, aStructType
- cols – list of column names (string only)
Raises: - ValueError – if
"*"
is incols
- ValueError – if
cols
contains duplicates - ValueError – if
schema
is not aStructType
- ImportError – if
pandas
module is not installed - ImportError – if
pandas
version is too old (less than 0.7.1)
Returns: the new
DataFrame
with the original key columns replicated for each returned value in each group’s resulting pandas dataframe, the schema being the original key schema prepended toschema
, where all the resulting groups’ rows are concatenated. Of course, if retaining group columns is disabled, then the output will exactly matchschema
since no keys can be prepended.>>> import pandas as pd >>> from pyspark.sql import SparkSession >>> from spark_sklearn.group_apply import gapply >>> from spark_sklearn.util import createLocalSparkSession >>> spark = createLocalSparkSession() >>> df = (spark ... .createDataFrame([Row(course="dotNET", year=2012, earnings=10000), ... Row(course="Java", year=2012, earnings=20000), ... Row(course="dotNET", year=2012, earnings=5000), ... Row(course="dotNET", year=2013, earnings=48000), ... Row(course="Java", year=2013, earnings=30000)]) ... .select("course", "year", "earnings")) >>> def yearlyMedian(_, vals): ... all_years = set(vals['year']) ... # Note that interpolation is performed, so we need to cast back to int. ... yearly_median = [(year, int(vals['earnings'][vals['year'] == year].median())) ... for year in all_years] ... return pd.DataFrame.from_records(yearly_median) >>> newSchema = StructType().add("year", LongType()).add("median_earnings", LongType()) >>> gapply(df.groupBy("course"), yearlyMedian, newSchema).orderBy("median_earnings").show() +------+----+---------------+ |course|year|median_earnings| +------+----+---------------+ |dotNET|2012| 7500| | Java|2012| 20000| | Java|2013| 30000| |dotNET|2013| 48000| +------+----+---------------+ <BLANKLINE> >>> def twoKeyYearlyMedian(_, vals): ... return pd.DataFrame.from_records([(int(vals["earnings"].median()),)]) >>> newSchema = StructType([df.schema["earnings"]]) >>> gapply(df.groupBy("course", "year"), twoKeyYearlyMedian, newSchema, "earnings").orderBy( ... "earnings").show() +------+----+--------+ |course|year|earnings| +------+----+--------+ |dotNET|2012| 7500| | Java|2012| 20000| | Java|2013| 30000| |dotNET|2013| 48000| +------+----+--------+ <BLANKLINE> >>> spark.stop(); SparkSession._instantiatedContext = None
Keyed Models¶
The use case that this addresses is where a client has a dataset with many keys - the distribution of which is such that the total number of rows for with a shared key value can be contained completely in memory on a single machine.
This assumption is particularly enabling because clients may wish to apply more intricate single-machine models (such as a scikit-learn estimator) to every user.
The API provided here generalizes the scikit-learn estimator interface to the Spark ML one; in particular, it allows clients to train their scikit-learn estimators in parallel over a grouped and aggregated dataframe.
>>> from sklearn.linear_model import LinearRegression
>>> from sklearn.cluster import KMeans
>>> from pyspark.ml.linalg import Vectors, Matrices, MatrixUDT
>>> from pyspark.sql.functions import udf
>>> from pyspark.sql import SparkSession
>>> from spark_sklearn.util import createLocalSparkSession
>>> from spark_sklearn.keyed_models import KeyedEstimator
>>> spark = createLocalSparkSession()
>>> df = spark.createDataFrame([(user,
... Vectors.dense([i, i ** 2, i ** 3]),
... 0.0 + user + i + 2 * i ** 2 + 3 * i ** 3)
... for user in range(3) for i in range(5)])
>>> df = df.toDF("key", "features", "y")
>>> df.where("5 < y and y < 10").sort("key", "y").show()
+---+-------------+---+
|key| features| y|
+---+-------------+---+
| 0|[1.0,1.0,1.0]|6.0|
| 1|[1.0,1.0,1.0]|7.0|
| 2|[1.0,1.0,1.0]|8.0|
+---+-------------+---+
<BLANKLINE>
>>> km = KeyedEstimator(sklearnEstimator=LinearRegression(), yCol="y").fit(df)
>>> def printFloat(x):
... rounded = round(x, 2)
... return "{:.2f}".format(0 if rounded == 0 else rounded)
...
>>> def printModel(model):
... coef = "[" + ", ".join(map(printFloat, model.coef_)) + "]"
... intercept = printFloat(model.intercept_)
... return "intercept: {} coef: {}".format(intercept, coef)
...
>>> km.keyedModels.columns
['key', 'estimator']
>>> printedModels = udf(printModel)("estimator").alias("linear fit")
>>> km.keyedModels.select("key", printedModels).sort("key").show(truncate=False)
+---+----------------------------------------+
|key|linear fit |
+---+----------------------------------------+
|0 |intercept: 0.00 coef: [1.00, 2.00, 3.00]|
|1 |intercept: 1.00 coef: [1.00, 2.00, 3.00]|
|2 |intercept: 2.00 coef: [1.00, 2.00, 3.00]|
+---+----------------------------------------+
<BLANKLINE>
Now that we have generated a linear model for each key, we can apply it to keyed test data. In the following, we only show one point for simplicity, but the test data can contain multiple points for multiple different keys.
>>> input = spark.createDataFrame([(0, Vectors.dense(3, 1, -1))]).toDF("key", "features")
>>> km.transform(input).withColumn("output", udf(printFloat)("output")).show()
+---+--------------+------+
|key| features|output|
+---+--------------+------+
| 0|[3.0,1.0,-1.0]| 2.00|
+---+--------------+------+
<BLANKLINE>
Suppose we wanted to perform key-based clustering. The most common use case would require just fitting our model.
>>> km = KeyedEstimator(sklearnEstimator=KMeans(random_state=0, n_clusters=2)).fit(df)
>>> def getCentroids(kmeans):
... n_clusters, n_features = kmeans.cluster_centers_.shape
... return Matrices.dense(n_clusters, n_features, kmeans.cluster_centers_.reshape(-1))
...
>>> centroids = udf(getCentroids, MatrixUDT())("estimator").alias("centroids")
>>> km.keyedModels.select("key", centroids).sort("key").show()
+---+--------------------+
|key| centroids|
+---+--------------------+
| 0|4.0 64.0 3.5 ...|
| 1|4.0 64.0 3.5 ...|
| 2|4.0 64.0 3.5 ...|
+---+--------------------+
<BLANKLINE>
Usually, this is all we want. In the case of KMeans
, we can also predict cluster labels,
since the scikit-learn estimator provides this functionality. Note this is not the case for
some other clusterers, such as DBSCAN
.
>>> km.transform(input).withColumnRenamed("output", "cluster label").show()
+---+--------------+-------------+
|key| features|cluster label|
+---+--------------+-------------+
| 0|[3.0,1.0,-1.0]| 1|
+---+--------------+-------------+
<BLANKLINE>
>>> spark.stop(); SparkSession._instantiatedContext = None # clear hidden SparkContext for reuse
-
class
spark_sklearn.keyed_models.
KeyedEstimator
(sklearnEstimator=None, keyCols=['key'], xCol='features', outputCol='output', yCol=None, estimatorType=None)¶ Bases:
pyspark.ml.base.Estimator
A
KeyedEstimator
provides an interface for training per-key scikit-learn estimators.The
KeyedEstimator
can be part of any Spark ML pipeline provided the columns are appropriately matched.Currently, the class provides a generalization for scikit-learn transformers, clusterers, and predictors. Because these scikit-learn estimators all derive from the same base type (yielding the same API), yet have different expectations for what methods should be called and with what arguments, this class enumerates three different types of behavior:
"transformer"
Examples:
sklearn.decomposition.PCA
,sklearn.cluster.KMeans
In this case, the estimator will aggregate the all input features for a given key into a NxD data matrix, where N is the number of rows with the given key and D is the feature space dimensionality; let this matrix be
X
.For each such key and data matrix pair, a clone of the parameter estimator is fitted with
estimator.fit(X)
, inducing a mapping between keys and fitted estimators: this produces a fitted transformerKeyedModel
, whose Spark MLtransform()
method generates an output column by applying each key’s fitted scikit-learn estimator’s owntransform
method.The output column type for transformers will always be a
DenseVector
."clusterer"
Examples:
sklearn.cluster.DBSCAN
,sklearn.cluster.KMeans
As before, the data will be aggregated into a design matrix
X
, andestimator.fit(X)
will be called for each key group.The difference between a
"transformer"
and"clusterer"
lies in their prediction behavior: a clusterer will callestimator.predict()
whereas a transformer refers to thetransform
method.The output column type for clusterers will always be of
LongType
."predictor"
Examples:
sklearn.svm.LinearSVC
,sklearn.linear_model.ElasticNet
Here, the estimator will likewise aggregate input features into the data matrix
X
. In addition, the label column will be aggregated in a collated manner, generating a vectory
for each key. The estimator clone will be fitted withestimator.fit(X, y)
.A predictor
KeyedModel
transforms its input dataframe by generating an output column with the output of the estimator’spredict
method.The output column type for predictors will be the same as the label column (which must be an
AtomicType
(else aTypeError
will be thrown atfit()
-time).
The input column should be numeric or a vector (else a
TypeError
will be thrown atfit()
-time). Don’t use “estimator” as a column name.- In certain cases, a scikit-learn estimator may support both
"transformer"
and"clusterer"
interfaces.sklearn.cluster.KMeans
, for instance, supports both the cluster-labelling operationpredict()
and a transformation into cluster-mean-distance space. Such ambiguity is resolved by prefering clustering. It may be overriden by manually specifying theestimatorType
totransformer
in theKeyedEstimator
constructor. - Key-based grouping only occurs during training. During the transformation/prediction phase of computation, the output is unaggregated: the number of rows inputted as test data will be equal to the number of rows outputted.
spark.conf.get("spark.sql.retainGroupColumns")
assumed to beu"true"
. This is the case by default for Spark 1.4+. This is necessary for both the keyed estimator and the keyed model.- Estimators trained, persisted, and loaded across different scikit-learn versions are not guaranteed to work.
For all instances, the ordered list of
keyCols
determine the set of groups which eachsklearnEstimator
is applied to.For every unique
keyCols
value, the remaining columns are aggregated and used to train the scikit-learn estimator.estimatorType
inference is conducted as follows: ifyCol
is specified, then this is assumed to be of"predictor"
type, else a"transformer"
or a"clusterer"
, depending on the estimator having thetransform()
orfit_predict()
attributes, with"clusterer"
being chosen in case both attributes are present.Parameters: - sklearnEstimator – An instance of a scikit-learn estimator, with parameters configured as desired for each user.
- keyCols – Key column names list used to group data to which models are applied, where order implies lexicographical importance.
- xCol – Name of column of input features used for training and transformation/prediction.
- yCol – Specifies name of label column for regression or classification pipelines.
Required for predictors, must be unspecified or
None
for transformers. - estimatorType – Identifies the type of scikit-learn estimator being used, which
changes the interface the
sklearnEstimator
is expected to have. This parameter’s value is inferred using reflection by default, but may be manually overriden.
Raises: - ValueError – if
sklearnEstimator
isNone
. - ValueError – if
sklearnEstimator
does not derive fromsklearn.base.BaseEstimator
. - ValueError – if
keyCols
is empty. - ValueError – if any column has the name
"estimator"
- AttributeError – if reflection checks indicate that parameter estimator is not equipped
with a
fit()
method.
-
fit
(dataset, params=None)¶ Fits a model to the input dataset with optional parameters.
Parameters: - dataset – input dataset, which is an instance of
pyspark.sql.DataFrame
- params – an optional param map that overrides embedded params. If a list/tuple of param maps is given, this calls fit on each param map and returns a list of models.
Returns: fitted model(s)
New in version 1.3.0.
- dataset – input dataset, which is an instance of
-
sklearnEstimatorType
¶ Returns: the estimator type of this keyed estimator
-
class
spark_sklearn.keyed_models.
KeyedModel
(sklearnEstimator=None, keyCols=None, xCol=None, outputCol=None, yCol=None, estimatorType=None, keyedSklearnEstimators=None, outputType=None)¶ Bases:
pyspark.ml.base.Model
Represents a Spark ML Model, generated by a fitted
KeyedEstimator
.Wraps fitted scikit-learn estimators - at transformation time transforms the input for each key using a key-specific model. See
KeyedEstimator
documentation for details.If no estimator is present for a given key at transformation time, the prediction is null.
The constructor is used by
KeyedEstimator
to generate aKeyedModel
; it is not intended for external use.-
keyedModels
¶ Returns: Returns the keyedSklearnEstimators
param, aDataFrame
with columnskeyCols
(where each key is unique) and the column"estimator"
containing the fitted scikit-learn estimator as aSparkSklearnEstimator
.
-
sklearnEstimatorType
¶ Returns: the estimator type of this keyed model
-
transform
(dataset, params=None)¶ Transforms the input dataset with optional parameters.
Parameters: - dataset – input dataset, which is an instance of
pyspark.sql.DataFrame
- params – an optional param map that overrides embedded params.
Returns: transformed dataset
New in version 1.3.0.
- dataset – input dataset, which is an instance of
-
-
class
spark_sklearn.keyed_models.
SparkSklearnEstimator
(estimator)¶ Bases:
object
SparkSklearnEstimator
is a wrapper for containing scikit-learn estimators in dataframes - any estimators need to be stored inside the wrapper class to be properly serialized/deserialized in dataframe operations.Note any method called on the estimator this object wraps may be called on the wrapper instead.
Initializes with the parameter estimator.
Param: estimator: scikit-learn estimator to contain. -
estimator
¶ Returns: the underlying estimator
-