To deploy a Spark ML pipeline model for serving (online prediction), you can take any of the following three approaches.
- Convert the model to MLeap format (see here) and serve it in a single container without Apache Spark.
This approach is lightweight, but you cannot always take it, because not all Spark models can be serialized into MLeap format (for example, a LightGBM model built with the SynapseML library).
You can use a pre-built MLeap serving image with the Spring Boot framework and then quickly serve the MLeap pipeline on this container. See here for this example. (A serialization sketch appears after this list.)
- With a single-instance Apache Spark installation, provide Spark ML serving by interacting with a Spark session.
I’ll discuss this approach in this post. (See here for source code.)
- Use Spark structured streaming (see here) with HTTP input/output streams in Apache Spark.
You can use the open-source SynapseML (formerly MMLSpark) library, with which you can serve a Spark ML pipeline model in HTTP-triggered structured streaming with load balancing. (You don’t need to manually implement HTTP-triggered streaming.)
See here for details about SynapseML model serving capability.
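For completeness, the MLeap export in the first approach looks roughly like the following. This is only a sketch outside this post’s deployment flow; it assumes the mleap PyPI package is installed and that every stage in the fitted pipeline is MLeap-serializable.

import mleap.pyspark  # noqa: F401 -- adds serializeToBundle() to Spark models
from mleap.pyspark.spark_support import SimpleSparkSerializer  # noqa: F401

# "model" is a fitted PipelineModel and "df" is a DataFrame it can transform;
# MLeap requires a sample transformed DataFrame to infer the schema.
model.serializeToBundle("jar:file:/tmp/flight_model.zip", model.transform(df))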
Note : See here for SparkML-flavor MLflow model serving (deployment) in Databricks.
Currently, MLflow model deployment in Azure Machine Learning doesn’t support Spark-flavor models.
In this post, I’ll show you how to implement the second approach with Azure Machine Learning.
As you’ll see in this post, you can quickly deploy a Spark ML model and provide online prediction on servers with Apache Spark installed.
1. Train your model (Databricks)
First of all, train and create a model on Apache Spark.
In this example, I will run the following script and create a Spark ML model (flight_model) on Azure Databricks.
In this script, I predict flight delays over 15 minutes (the ARR_DEL15 column) using flight information and weather information, such as airport (departure/destination), carrier, wind speed (departure/destination), visibility (departure/destination), and so on.
# Read dataset from Azure Blob
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType
df = (sqlContext.read.format("csv").
    option("header", "true").
    option("nullValue", "NA").
    option("inferSchema", True).
    load("abfss://container01@demostore01.dfs.core.windows.net/flight_weather.csv"))

# Set ARR_DEL15 = 1 if it's canceled.
from pyspark.sql.functions import when
df = df.withColumn("ARR_DEL15", when(df["CANCELLED"] == 1, 1).otherwise(df["ARR_DEL15"]))

# Remove flights if it's diverted.
df = df.filter(df["DIVERTED"] == 0)

# Select required columns
df = df.select(
    "ARR_DEL15",
    "MONTH",
    "DAY_OF_WEEK",
    "UNIQUE_CARRIER",
    "ORIGIN",
    "DEST",
    "CRS_DEP_TIME",
    "CRS_ARR_TIME",
    "RelativeHumidityOrigin",
    "AltimeterOrigin",
    "DryBulbCelsiusOrigin",
    "WindSpeedOrigin",
    "VisibilityOrigin",
    "DewPointCelsiusOrigin",
    "RelativeHumidityDest",
    "AltimeterDest",
    "DryBulbCelsiusDest",
    "WindSpeedDest",
    "VisibilityDest",
    "DewPointCelsiusDest")

# Drop rows with null value
df = df.dropna()

# Transformation : convert categorical values (carrier code, airport code, ...) to index values
from pyspark.ml.feature import StringIndexer
uniqueCarrierIndexer = StringIndexer(inputCol="UNIQUE_CARRIER", outputCol="Indexed_UNIQUE_CARRIER").fit(df)
originIndexer = StringIndexer(inputCol="ORIGIN", outputCol="Indexed_ORIGIN").fit(df)
destIndexer = StringIndexer(inputCol="DEST", outputCol="Indexed_DEST").fit(df)
arrDel15Indexer = StringIndexer(inputCol="ARR_DEL15", outputCol="Indexed_ARR_DEL15").fit(df)

# Transformation : assemble feature columns as a vector column
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
    inputCols = [
        "MONTH",
        "DAY_OF_WEEK",
        "Indexed_UNIQUE_CARRIER",
        "Indexed_ORIGIN",
        "Indexed_DEST",
        "CRS_DEP_TIME",
        "CRS_ARR_TIME",
        "RelativeHumidityOrigin",
        "AltimeterOrigin",
        "DryBulbCelsiusOrigin",
        "WindSpeedOrigin",
        "VisibilityOrigin",
        "DewPointCelsiusOrigin",
        "RelativeHumidityDest",
        "AltimeterDest",
        "DryBulbCelsiusDest",
        "WindSpeedDest",
        "VisibilityDest",
        "DewPointCelsiusDest"],
    outputCol = "features")

# Generate decision tree classifier
from pyspark.ml.classification import DecisionTreeClassifier
classifier = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="ARR_DEL15",
    maxDepth=15,
    maxBins=500)

# Create pipeline and train
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[uniqueCarrierIndexer, originIndexer, destIndexer, arrDel15Indexer, assembler, classifier])
model = pipeline.fit(df)

# Save pipeline
model.write().overwrite().save("/mnt/testblob/flight_model")
The generated model (/mnt/testblob/flight_model) is in Spark ML pipeline model format, which includes not only the model parameters but also the entire pipeline, such as the data transformations.
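Because the whole pipeline is saved, you can load the artifact back and inspect its stages. The following is a minimal sketch, assuming it runs in the same Databricks workspace where /mnt/testblob/flight_model was saved.

from pyspark.ml import PipelineModel

loaded_model = PipelineModel.load("/mnt/testblob/flight_model")
# Prints the StringIndexer stages, the VectorAssembler, and the fitted
# DecisionTreeClassificationModel in pipeline order.
for stage in loaded_model.stages:
    print(type(stage).__name__)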
2. Register model (AML)
Register this generated model (/mnt/testblob/flight_model) into Azure Machine Learning (Azure ML) with the Azure ML CLI command as follows.
az ml model create --name flight_delay_model \
  --version 1 \
  --path /mnt/testblob/flight_model \
  --resource-group $my_resource_group \
  --workspace-name $my_workspace
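If you prefer the Python SDK v2 to the CLI, the same registration can be written roughly as follows. This is a sketch; subscription_id, resource_group, and workspace are placeholders for your own values, and the model folder must be reachable as a local path from where this code runs.

from azure.ai.ml import MLClient
from azure.ai.ml.entities import Model
from azure.ai.ml.constants import AssetTypes
from azure.identity import DefaultAzureCredential

# Connect to the workspace (all three identifiers are placeholders)
ml_client = MLClient(DefaultAzureCredential(), subscription_id, resource_group, workspace)

# Register the saved pipeline folder as a custom model asset
model = Model(
    name="flight_delay_model",
    version="1",
    path="/mnt/testblob/flight_model",  # path to the saved pipeline folder
    type=AssetTypes.CUSTOM_MODEL,
)
ml_client.models.create_or_update(model)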
3. Build container image with Apache Spark installation (AML)
Let’s build a docker container image for online prediction.
In this example, I’ll install and configure Apache Spark (which runs on the Java runtime), a Python environment, and PySpark settings, because the program code will be written in Python later.
In Azure Machine Learning, you can configure these environment settings with YAML as follows.
As you can see below, the image is built from the AML minimal inference image (mcr.microsoft.com/azureml/minimal-ubuntu20.04-py38-cpu-inference:latest).
./env_sparkml_serving.yml
$schema: https://azuremlschemas.azureedge.net/latest/environment.schema.json
name: sparkml-serving-env
build:
  path: docker-context
description: environment for SparkML serving
./docker-context/Dockerfile
FROM mcr.microsoft.com/azureml/minimal-ubuntu20.04-py38-cpu-inference:latest
USER root:root

# Install Java
RUN mkdir -p /usr/share/man/man1
RUN apt-get update -y && \
    apt-get install -y openjdk-8-jdk
ENV JAVA_HOME='/usr/lib/jvm/java-8-openjdk-amd64'

# Install Apache Spark
RUN wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz && \
    tar xzf spark-3.1.2-bin-hadoop3.2.tgz -C /opt && \
    mv /opt/spark-3.1.2-bin-hadoop3.2 /opt/spark && \
    rm spark-3.1.2-bin-hadoop3.2.tgz
ENV SPARK_HOME=/opt/spark
ENV PYSPARK_PYTHON=python
ENV PYTHONPATH=$PYTHONPATH:$SPARK_HOME/python

# Install additional packages
WORKDIR /
COPY requirements.txt .
RUN pip install -r requirements.txt && rm requirements.txt
./docker-context/requirements.txt
azureml-defaults
numpy
pyspark
By running the following command (Azure ML CLI), you can create an AML environment, which builds and registers the container image for use.
az ml environment create --file env_sparkml_serving.yml \
  --resource-group $my_resource_group \
  --workspace-name $my_workspace
Note : You can also register this image in a public container repository, such as Docker Hub or Azure Container Registry (ACR), and use it in Azure ML.
Note : In Azure ML, you can also use the built-in image (called a curated environment) “AzureML-PySpark-MmlSpark-0.15”, in which Apache Spark and PySpark are already installed and configured.
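To check that the Java runtime and PySpark are wired up correctly in the built image, you could run a quick smoke test like the following inside the container. This is a hypothetical verification step, not part of the deployment flow.

# Start a local Spark session inside the container and print the version.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("smoke-test").getOrCreate()
print("Spark version :", spark.version)
# A trivial DataFrame round-trip confirms that Python workers can run.
print(spark.range(5).count())
spark.stop()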
4. Create serving script (AML)
In order to provide inference, here we create the following entry script (serving script) written in Python.
In Azure Machine Learning, init() is executed when your web service starts, and run() is invoked on each inference request.
As you can see below, we start the Spark app and build a Spark session in init(), then run inference with this Spark context.
./script/inference.py
import os
import json
from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel

def init():
    global spark
    global loaded_model
    spark = SparkSession.builder.appName("flight_delay_serving").getOrCreate()
    model_path = os.path.join(os.getenv("AZUREML_MODEL_DIR"), "flight_model")
    loaded_model = PipelineModel.load(model_path)

def run(raw_data):
    try:
        input_list = json.loads(raw_data)["data"]
        sc = spark.sparkContext
        input_rdd = sc.parallelize(input_list)
        input_df = input_rdd.toDF()
        pred_df = loaded_model.transform(input_df)
        pred_list = pred_df.collect()
        pred_array = [int(x["prediction"]) for x in pred_list]
        return pred_array
    except Exception as e:
        result = str(e)
        return "Internal Exception : " + result
Note : In order to use additional packages, specify config() when building the session.
For instance, when you use Azure storage, you can add the package configuration as follows. Note that spark.jars.packages takes a single comma-separated list of coordinates, so both packages go into one setting. (See here for details about settings.)

spark = (SparkSession.builder
    .appName("flight_delay_serving")
    .config(
        "spark.jars.packages",
        "org.apache.hadoop:hadoop-azure:3.2.0,com.microsoft.azure:azure-storage:8.6.3")
    .getOrCreate())
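Before deploying, you can exercise this entry script locally on a machine with Spark installed. The following is a minimal sketch, assuming a copy of the model folder sits under the hypothetical path /tmp/models/flight_model and that you run it from the ./script directory.

import json
import os

# Point AZUREML_MODEL_DIR at a local folder containing "flight_model" (hypothetical path)
os.environ["AZUREML_MODEL_DIR"] = "/tmp/models"

import inference  # the entry script above (./script/inference.py)

inference.init()
request_body = json.dumps({"data": [{
    "MONTH": 1, "DAY_OF_WEEK": 1, "UNIQUE_CARRIER": "AA",
    "ORIGIN": "ABQ", "DEST": "DFW", "CRS_DEP_TIME": 9, "CRS_ARR_TIME": 12,
    "RelativeHumidityOrigin": 23.0, "AltimeterOrigin": 30.55,
    "DryBulbCelsiusOrigin": 9.4, "WindSpeedOrigin": 3.0,
    "VisibilityOrigin": 10.0, "DewPointCelsiusOrigin": -10.6,
    "RelativeHumidityDest": 35.0, "AltimeterDest": 30.6,
    "DryBulbCelsiusDest": 7.2, "WindSpeedDest": 7.0,
    "VisibilityDest": 10.0, "DewPointCelsiusDest": -7.2}]})
print(inference.run(request_body))  # prints a list such as [0]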
5. Deploy model for serving (AML)
Now let’s deploy this Spark ML model serving on an Azure ML managed endpoint.
First, create a managed online endpoint with the following CLI command.
az ml online-endpoint create --file managed_endpoint.yml \
  --resource-group $my_resource_group \
  --workspace-name $my_workspace
./managed_endpoint.yml
$schema: https://azuremlschemas.azureedge.net/latest/managedOnlineEndpoint.schema.json
name: sparkml-test01
auth_mode: key
Now let’s put it all together and deploy on this endpoint by running the following command.
az ml online-deployment create --file managed_deployment.yml \
  --resource-group $my_resource_group \
  --workspace-name $my_workspace \
  --all-traffic
./managed_deployment.yml
$schema: https://azuremlschemas.azureedge.net/latest/managedOnlineDeployment.schema.json
name: sparkml-deployment-v1
endpoint_name: sparkml-test01
model: azureml:flight_delay_model@latest
code_configuration:
  code: ./script
  scoring_script: inference.py
environment: azureml:sparkml-serving-env@latest
instance_type: Standard_DS3_v2
instance_count: 1
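If the deployment fails or behaves unexpectedly (for instance, if the Spark session cannot start inside the container), you can pull the scoring container logs. The following is a sketch with the Python SDK v2, assuming ml_client is the MLClient object from the registration step.

# Fetch recent log lines from the scoring container of this deployment
logs = ml_client.online_deployments.get_logs(
    name="sparkml-deployment-v1",
    endpoint_name="sparkml-test01",
    lines=100,
)
print(logs)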
6. Test your web service (Python)
After the online prediction (web service) is successfully deployed, you can test the web service as follows.
Note : See here for how to get endpoint_url and authorization_key in the code below.
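For reference, one way to retrieve these two values programmatically is through the Python SDK v2. This is a sketch, again assuming ml_client is the MLClient object from the registration step.

# Look up the scoring URI and the primary auth key for the endpoint
endpoint = ml_client.online_endpoints.get(name="sparkml-test01")
keys = ml_client.online_endpoints.get_keys(name="sparkml-test01")
endpoint_url = endpoint.scoring_uri
authorization_key = keys.primary_key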
import requests

endpoint_url = "https://sparkml-test01.eastus.inference.ml.azure.com/score"
authorization_key = "dcSSJx9Oc6NDlkQCN3BV3sWDQQtOKg3n"
headers = {
    "Content-Type": "application/json",
    "Authorization": ("Bearer " + authorization_key),
}
input_data = """{
  "data": [
    {
      "MONTH": 1,
      "DAY_OF_WEEK": 1,
      "UNIQUE_CARRIER": "AA",
      "ORIGIN": "ABQ",
      "DEST": "DFW",
      "CRS_DEP_TIME": 9,
      "CRS_ARR_TIME": 12,
      "RelativeHumidityOrigin": 23.0,
      "AltimeterOrigin": 30.55,
      "DryBulbCelsiusOrigin": 9.4,
      "WindSpeedOrigin": 3.0,
      "VisibilityOrigin": 10.0,
      "DewPointCelsiusOrigin": -10.6,
      "RelativeHumidityDest": 35.0,
      "AltimeterDest": 30.6,
      "DryBulbCelsiusDest": 7.2,
      "WindSpeedDest": 7.0,
      "VisibilityDest": 10.0,
      "DewPointCelsiusDest": -7.2
    },
    {
      "MONTH": 1,
      "DAY_OF_WEEK": 1,
      "UNIQUE_CARRIER": "AA",
      "ORIGIN": "BNA",
      "DEST": "DFW",
      "CRS_DEP_TIME": 12,
      "CRS_ARR_TIME": 15,
      "RelativeHumidityOrigin": 78.5,
      "AltimeterOrigin": 30.05,
      "DryBulbCelsiusOrigin": 10.8,
      "WindSpeedOrigin": 1.5,
      "VisibilityOrigin": 8.0,
      "DewPointCelsiusOrigin": 7.1,
      "RelativeHumidityDest": 86.0,
      "AltimeterDest": 29.86,
      "DryBulbCelsiusDest": 9.4,
      "WindSpeedDest": 18.0,
      "VisibilityDest": 6.0,
      "DewPointCelsiusDest": 7.2
    }
  ]
}"""
http_res = requests.post(
    endpoint_url,
    input_data,
    headers=headers)
print("Predicted : ", http_res.text)
Result :
Predicted : [0, 0]
You can download and run the notebook (entire source code) from here.
[Change Logs]
Aug 2022 : Updated for AML CLI and Python SDK v2 (not using an AML curated environment)