The mlflow.spark
module provides an API for logging and loading Spark MLlib models. This module exports Spark MLlib models with the following flavors:
Allows models to be loaded as Spark Transformers for scoring in a Spark session. Models with this flavor can be loaded as PySpark PipelineModel objects in Python. This is the main flavor and is always produced.
mlflow.pyfunc
Supports deployment outside of Spark by instantiating a SparkContext and reading input data as a Spark DataFrame prior to scoring. Also supports deployment in Spark as a Spark UDF. Models with this flavor can be loaded as Python functions for performing inference. This flavor is always produced.
Note
Autologging is known to be compatible with the following package versions: 3.3.0
<= pyspark
<= 4.0.0
. Autologging may not succeed when used with package versions outside of this range.
Enables (or disables) and configures logging of Spark datasource paths, versions (if applicable), and formats when they are read. This method is not threadsafe and assumes a SparkSession already exists with the mlflow-spark JAR attached. It should be called on the Spark driver, not on the executors (i.e. do not call this method within a function parallelized by Spark). The mlflow-spark JAR used must match the Scala version of Spark. Please see the Maven Repository for available versions. This API requires Spark 3.0 or above.
Datasource information is cached in memory and logged to all subsequent MLflow runs, including the active MLflow run (if one exists when the data is read). Note that autologging of Spark ML (MLlib) models is not currently supported via this API. Datasource autologging is best-effort, meaning that if Spark is under heavy load or MLflow logging fails for any reason (e.g., if the MLflow server is unavailable), logging may be dropped.
For any unexpected issues with autologging, check Spark driver and executor logs in addition to stderr & stdout generated from your MLflow code - datasource information is pulled from Spark, so logs relevant to debugging may show up amongst the Spark logs.
Note
Spark datasource autologging only supports logging to MLflow runs in a single thread
import mlflow.spark import os import shutil from pyspark.sql import SparkSession # Create and persist some dummy data # Note: the 2.12 in 'org.mlflow:mlflow-spark_2.12:2.16.2' below indicates the Scala # version, please match this with that of Spark. The 2.16.2 indicates the mlflow version. # Note: On environments like Databricks with pre-created SparkSessions, # ensure the org.mlflow:mlflow-spark_2.12:2.16.2 is attached as a library to # your cluster spark = ( SparkSession.builder.config( "spark.jars.packages", "org.mlflow:mlflow-spark_2.12:2.16.2", ) .master("local[*]") .getOrCreate() ) df = spark.createDataFrame( [(4, "spark i j k"), (5, "l m n"), (6, "spark hadoop spark"), (7, "apache hadoop")], ["id", "text"], ) import tempfile tempdir = tempfile.mkdtemp() df.write.csv(os.path.join(tempdir, "my-data-path"), header=True) # Enable Spark datasource autologging. mlflow.spark.autolog() loaded_df = spark.read.csv( os.path.join(tempdir, "my-data-path"), header=True, inferSchema=True ) # Call toPandas() to trigger a read of the Spark datasource. Datasource info # (path and format) is logged to the current active run, or the # next-created MLflow run if no run is currently active with mlflow.start_run() as active_run: pandas_df = loaded_df.toPandas()
disable â If True
, disables the Spark datasource autologging integration. If False
, enables the Spark datasource autologging integration.
silent â If True
, suppress all event logs and warnings from MLflow during Spark datasource autologging. If False
, show all events and warnings during Spark datasource autologging.
The default Conda environment for MLflow Models produced by calls to save_model()
and log_model()
. This Conda environment contains the current version of PySpark that is installed on the callerâs system. dev
versions of PySpark are replaced with stable versions in the resulting Conda environment (e.g., if you are running PySpark version 2.4.5.dev0
, invoking this method produces a Conda environment with a dependency on PySpark version 2.4.5
).
A list of default pip requirements for MLflow Models produced by this flavor. Calls to save_model()
and log_model()
produce a pip environment that, at minimum, contains these requirements.
Load the Spark MLlib model from the path.
model_uri â
The location, in URI format, of the MLflow model, for example:
/Users/me/path/to/local/model
relative/path/to/local/model
s3://my_bucket/path/to/model
runs:/<mlflow_run_id>/run-relative/path/to/model
models:/<model_name>/<model_version>
models:/<model_name>/<stage>
For more information about supported URI schemes, see Referencing Artifacts.
dfs_tmpdir â Temporary directory path on Distributed (Hadoop) File System (DFS) or local filesystem if running in local mode. The model is loaded from this destination. Defaults to /tmp/mlflow
.
dst_path â The local filesystem path to which to download the model artifact. This directory must already exist. If unspecified, a local output path will be created.
pyspark.ml.pipeline.PipelineModel
import mlflow model = mlflow.spark.load_model("spark-model") # Prepare test documents, which are unlabeled (id, text) tuples. test = spark.createDataFrame( [(4, "spark i j k"), (5, "l m n"), (6, "spark hadoop spark"), (7, "apache hadoop")], ["id", "text"], ) # Make predictions on test documents prediction = model.transform(test)
Log a Spark MLlib model as an MLflow artifact for the current run. This uses the MLlib persistence format and produces an MLflow Model with the Spark flavor.
Note: If no run is active, it will instantiate a run to obtain a run_id.
spark_model â
Spark model to be saved - MLflow can only save descendants of pyspark.ml.Model or pyspark.ml.Transformer which implement MLReadable and MLWritable.
Note
The provided Spark modelâs transform method must generate one column named with âpredictionâ, the column is used as MLflow pyfunc model output. Most Spark models generate the output column with âpredictionâ name that contains prediction labels by default. To set probability column as the output column for probabilistic classification models, you need to set âprobabilityColâ param to âpredictionâ and set âpredictionColâ param to ââ. (e.g. model.setProbabilityCol(âpredictionâ).setPredictionCol(ââ))
artifact_path â Run relative artifact path.
conda_env â
Either a dictionary representation of a Conda environment or the path to a conda environment yaml file. If provided, this describes the environment this model should be run in. At a minimum, it should specify the dependencies contained in get_default_conda_env(). If None
, a conda environment with pip requirements inferred by mlflow.models.infer_pip_requirements()
is added to the model. If the requirement inference fails, it falls back to using get_default_pip_requirements. pip requirements from conda_env
are written to a pip requirements.txt
file and the full conda environment is written to conda.yaml
. The following is an example dictionary representation of a conda environment:
{ "name": "mlflow-env", "channels": ["conda-forge"], "dependencies": [ "python=3.8.15", { "pip": [ "pyspark==x.y.z" ], }, ], }
code_paths â
A list of local filesystem paths to Python file dependencies (or directories containing file dependencies). These files are prepended to the system path when the model is loaded. Files declared as dependencies for a given model should have relative imports declared from a common root path if multiple files are defined with import dependencies between them to avoid import errors when loading the model.
For a detailed explanation of code_paths
functionality, recommended usage patterns and limitations, see the code_paths usage guide.
dfs_tmpdir â Temporary directory path on Distributed (Hadoop) File System (DFS) or local filesystem if running in local mode. The model is written in this destination and then copied into the modelâs artifact directory. This is necessary as Spark ML models read from and write to DFS if running on a cluster. If this operation completes successfully, all temporary files created on the DFS are removed. Defaults to /tmp/mlflow
. For models defined in pyspark.ml.connect module, this param is ignored.
registered_model_name â If given, create a model version under registered_model_name
, also creating a registered model if one with the given name does not exist.
signature â
A Model Signature object that describes the input and output Schema of the model. The model signature can be inferred using infer_signature function of mlflow.models.signature. Note if your Spark model contains Spark ML vector type input or output column, you should create SparkMLVector
vector type for the column, infer_signature function can also infer SparkMLVector
vector type correctly from Spark Dataframe input / output. When loading a Spark ML model with SparkMLVector
vector type input as MLflow pyfunc model, it accepts Array[double]
type input. MLflow internally converts the array into Spark ML vector and then invoke Spark model for inference. Similarly, if the model has vector type output, MLflow internally converts Spark ML vector output data into Array[double]
type inference result.
from mlflow.models import infer_signature from pyspark.sql.functions import col from pyspark.ml.classification import LogisticRegression from pyspark.ml.functions import array_to_vector import pandas as pd import mlflow train_df = spark.createDataFrame( [([3.0, 4.0], 0), ([5.0, 6.0], 1)], schema="features array<double>, label long" ).select(array_to_vector("features").alias("features"), col("label")) lor = LogisticRegression(maxIter=2) lor.setPredictionCol("").setProbabilityCol("prediction") lor_model = lor.fit(train_df) test_df = train_df.select("features") prediction_df = lor_model.transform(train_df).select("prediction") signature = infer_signature(test_df, prediction_df) with mlflow.start_run() as run: model_info = mlflow.spark.log_model( lor_model, "model", signature=signature, ) # The following signature is outputted: # inputs: # ['features': SparkML vector (required)] # outputs: # ['prediction': SparkML vector (required)] print(model_info.signature) loaded = mlflow.pyfunc.load_model(model_info.model_uri) test_dataset = pd.DataFrame({"features": [[1.0, 2.0]]}) # `loaded.predict` accepts `Array[double]` type input column, # and generates `Array[double]` type output column. print(loaded.predict(test_dataset))
input_example â one or several instances of valid model input. The input example is used as a hint of what data to feed the model. It will be converted to a Pandas DataFrame and then serialized to json using the Pandas split-oriented format, or a numpy array where the example will be serialized to json by converting it to a list. Bytes are base64-encoded. When the signature
parameter is None
, the input example is used to infer a model signature.
await_registration_for â Number of seconds to wait for the model version to finish being created and is in READY
status. By default, the function waits for five minutes. Specify 0 or None to skip waiting.
pip_requirements â Either an iterable of pip requirement strings (e.g. ["pyspark", "-r requirements.txt", "-c constraints.txt"]
) or the string path to a pip requirements file on the local filesystem (e.g. "requirements.txt"
). If provided, this describes the environment this model should be run in. If None
, a default list of requirements is inferred by mlflow.models.infer_pip_requirements()
from the current software environment. If the requirement inference fails, it falls back to using get_default_pip_requirements. Both requirements and constraints are automatically parsed and written to requirements.txt
and constraints.txt
files, respectively, and stored as part of the model. Requirements are also written to the pip
section of the modelâs conda environment (conda.yaml
) file.
extra_pip_requirements â
Either an iterable of pip requirement strings (e.g. ["pandas", "-r requirements.txt", "-c constraints.txt"]
) or the string path to a pip requirements file on the local filesystem (e.g. "requirements.txt"
). If provided, this describes additional pip requirements that are appended to a default set of pip requirements generated automatically based on the userâs current software environment. Both requirements and constraints are automatically parsed and written to requirements.txt
and constraints.txt
files, respectively, and stored as part of the model. Requirements are also written to the pip
section of the modelâs conda environment (conda.yaml
) file.
Warning
The following arguments canât be specified at the same time:
conda_env
pip_requirements
extra_pip_requirements
This example demonstrates how to specify pip requirements using pip_requirements
and extra_pip_requirements
.
metadata â Custom metadata dictionary passed to the model and stored in the MLmodel file.
A ModelInfo
instance that contains the metadata of the logged model.
from pyspark.ml import Pipeline from pyspark.ml.classification import LogisticRegression from pyspark.ml.feature import HashingTF, Tokenizer training = spark.createDataFrame( [ (0, "a b c d e spark", 1.0), (1, "b d", 0.0), (2, "spark f g h", 1.0), (3, "hadoop mapreduce", 0.0), ], ["id", "text", "label"], ) tokenizer = Tokenizer(inputCol="text", outputCol="words") hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features") lr = LogisticRegression(maxIter=10, regParam=0.001) pipeline = Pipeline(stages=[tokenizer, hashingTF, lr]) model = pipeline.fit(training) mlflow.spark.log_model(model, "spark-model")
Save a Spark MLlib Model to a local path.
By default, this function saves models using the Spark MLlib persistence mechanism.
spark_model â Spark model to be saved - MLflow can only save descendants of pyspark.ml.Model or pyspark.ml.Transformer which implement MLReadable and MLWritable.
path â Local path where the model is to be saved.
mlflow_model â MLflow model config this flavor is being added to.
conda_env â
Either a dictionary representation of a Conda environment or the path to a conda environment yaml file. If provided, this describes the environment this model should be run in. At a minimum, it should specify the dependencies contained in get_default_conda_env(). If None
, a conda environment with pip requirements inferred by mlflow.models.infer_pip_requirements()
is added to the model. If the requirement inference fails, it falls back to using get_default_pip_requirements. pip requirements from conda_env
are written to a pip requirements.txt
file and the full conda environment is written to conda.yaml
. The following is an example dictionary representation of a conda environment:
{ "name": "mlflow-env", "channels": ["conda-forge"], "dependencies": [ "python=3.8.15", { "pip": [ "pyspark==x.y.z" ], }, ], }
code_paths â
A list of local filesystem paths to Python file dependencies (or directories containing file dependencies). These files are prepended to the system path when the model is loaded. Files declared as dependencies for a given model should have relative imports declared from a common root path if multiple files are defined with import dependencies between them to avoid import errors when loading the model.
For a detailed explanation of code_paths
functionality, recommended usage patterns and limitations, see the code_paths usage guide.
dfs_tmpdir â Temporary directory path on Distributed (Hadoop) File System (DFS) or local filesystem if running in local mode. The model is be written in this destination and then copied to the requested local path. This is necessary as Spark ML models read from and write to DFS if running on a cluster. All temporary files created on the DFS are removed if this operation completes successfully. Defaults to /tmp/mlflow
.
signature â See the document of argument signature
in mlflow.spark.log_model()
.
input_example â one or several instances of valid model input. The input example is used as a hint of what data to feed the model. It will be converted to a Pandas DataFrame and then serialized to json using the Pandas split-oriented format, or a numpy array where the example will be serialized to json by converting it to a list. Bytes are base64-encoded. When the signature
parameter is None
, the input example is used to infer a model signature.
pip_requirements â Either an iterable of pip requirement strings (e.g. ["pyspark", "-r requirements.txt", "-c constraints.txt"]
) or the string path to a pip requirements file on the local filesystem (e.g. "requirements.txt"
). If provided, this describes the environment this model should be run in. If None
, a default list of requirements is inferred by mlflow.models.infer_pip_requirements()
from the current software environment. If the requirement inference fails, it falls back to using get_default_pip_requirements. Both requirements and constraints are automatically parsed and written to requirements.txt
and constraints.txt
files, respectively, and stored as part of the model. Requirements are also written to the pip
section of the modelâs conda environment (conda.yaml
) file.
extra_pip_requirements â
Either an iterable of pip requirement strings (e.g. ["pandas", "-r requirements.txt", "-c constraints.txt"]
) or the string path to a pip requirements file on the local filesystem (e.g. "requirements.txt"
). If provided, this describes additional pip requirements that are appended to a default set of pip requirements generated automatically based on the userâs current software environment. Both requirements and constraints are automatically parsed and written to requirements.txt
and constraints.txt
files, respectively, and stored as part of the model. Requirements are also written to the pip
section of the modelâs conda environment (conda.yaml
) file.
Warning
The following arguments canât be specified at the same time:
conda_env
pip_requirements
extra_pip_requirements
This example demonstrates how to specify pip requirements using pip_requirements
and extra_pip_requirements
.
metadata â Custom metadata dictionary passed to the model and stored in the MLmodel file.
from mlflow import spark from pyspark.ml.pipeline import PipelineModel # your pyspark.ml.pipeline.PipelineModel type model = ... mlflow.spark.save_model(model, "spark-model")
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4