By Nirajkanth Ravichandran
Introduction
In the ever-evolving landscape of data science and machine learning, the ability to efficiently process and analyze large datasets is crucial. Dask, a flexible parallel computing library in Python, provides a powerful solution to big data challenges by allowing users to build parallel algorithms with familiar APIs. When combined with Kubernetes, an open-source container orchestration platform, Dask can scale these capabilities even further. This article delves into the integration of Dask-SQL with a Dask Kubernetes cluster for machine learning tasks, showcasing the process through the example of training a classifier model on the popular Iris dataset.
The code I employed for the experiment is available in this notebook.
Setting Up the Kubernetes Cluster
Before diving into Dask-SQL and machine learning, it’s essential to have a functional Kubernetes cluster. If you’re not familiar with setting up a cluster, you can refer to my previous article on how to configure a Kubernetes cluster on your local machine.
Introducing Dask-SQL
Dask-SQL is an extension of Dask that allows SQL queries to be executed directly on Dask dataframes. This seamless integration between Dask and SQL empowers data scientists with SQL’s expressive querying capabilities while leveraging Dask’s parallel processing abilities. The integration of Dask-SQL within a Kubernetes-based Dask cluster opens doors to processing vast datasets efficiently and performing complex analytical tasks.
Machine Learning with Dask-SQL on Kubernetes
Let’s walk through a practical application of Dask-SQL for machine learning within the Kubernetes environment. We’ll use the Iris dataset for simplicity, but the same approach can be extended to larger datasets.
To begin our exploration, let’s create a Dask cluster on Kubernetes using the KubeCluster class, as demonstrated in the code below. Please be aware that if any packages are missing on the scheduler or workers, you can install them at container startup via extra pip/conda packages.
from dask_kubernetes.operator import KubeCluster

# Spin up a Dask cluster on Kubernetes with two workers and memory limits,
# installing the extra packages into each container at startup
cluster = KubeCluster(
    name="daskmlcluster",
    image="ghcr.io/dask/dask:2023.7.0-py3.10",
    n_workers=2,
    resources={"requests": {"memory": "0.5Gi"}, "limits": {"memory": "1.5Gi"}},
    env={
        "EXTRA_PIP_PACKAGES": "dask-sql==2023.8.0 dask-ml==2023.3.24 scikit-learn==1.3.0"
    },
)
cluster
from dask.distributed import Client
# Connect Dask to the cluster
client = Client(cluster)
client
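Before running any queries, it can be worth confirming that the workers have actually joined the cluster. The snippet below is a minimal sketch; the timeout value is an arbitrary choice.
# Block until at least two workers have registered (timeout in seconds)
client.wait_for_workers(n_workers=2, timeout=300)
# The dashboard link is handy for watching tasks execute later on
print(client.dashboard_link)
# The cluster can also be scaled on demand, e.g. cluster.scale(4)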
Data Loading and Exploration
Before loading the data, we need to instantiate the Context class from Dask-SQL, which provides an environment in which SQL queries can be executed against Dask dataframes.
from dask_sql import Context
c = Context() # Python equivalent to a SQL database
Read the data from the internet and register it as a table named iris in the Dask-SQL Context that we created above.
c.sql("""CREATE OR REPLACE TABLE iris WITH (
location = 'https://datahub.io/machine-learning/iris/r/iris.csv',
persist = True)"""
)
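As a side note, if your data already lives in a Dask dataframe, it can be registered with the context directly instead of going through CREATE OR REPLACE TABLE. A small sketch, where the table name iris_df is an arbitrary choice:
import dask.dataframe as dd
# Read the same CSV with Dask and register the resulting dataframe
df = dd.read_csv('https://datahub.io/machine-learning/iris/r/iris.csv')
c.create_table('iris_df', df)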
Let’s verify whether the data registration is successful.
c.sql("""SHOW TABLES FROM root""").compute()
As we can see, the data has been registered successfully under the name ‘iris’.
Let’s visualize how the data looks.
c.sql("""SELECT * From iris""").compute()
Given that the ‘class’ column is a categorical variable, it’s crucial to examine its distinct values. You can accomplish this by using the following code:
c.sql("""SELECT DISTINCT From iris""").compute()
Model Training
Dask-SQL provides support for Scikit-Learn-based machine learning models. In this experiment, I am utilizing the GradientBoostingClassifier from the Scikit-Learn library. Since the ‘class’ column of the Iris dataset is categorical, it must be transformed into a numerical format; this can be done with a simple CASE expression inside the model-training query, as follows.
c.sql("""
CREATE MODEL sql_model WITH (
model_class = 'sklearn.ensemble.GradientBoostingClassifier',
wrap_predict = True,
target_column = 'target'
) AS (
SELECT sepallength, sepalwidth, petallength, petalwidth,
CASE
WHEN class = 'Iris-setosa' THEN 0
WHEN class = 'Iris-versicolor' THEN 1
WHEN class = 'Iris-virginica' THEN 2
END AS target
FROM iris
LIMIT 100
)
""")
This will train the model and store it within the Dask-SQL context under the name ‘sql_model’. We can verify this as follows.
c.sql("""SHOW MODELS""").compute()
Hyperparameter Tuning
Following the initial model training, let’s employ Dask’s parallel computing capabilities for hyperparameter tuning. Dask’s distributed computing executes the hyperparameter search across the Kubernetes cluster; by parallelizing this process, you speed up the search for optimal hyperparameters and enhance the model’s performance.
query = """
CREATE EXPERIMENT my_exp WITH (
model_class = 'sklearn.ensemble.GradientBoostingClassifier',
experiment_class = 'sklearn.model_selection.GridSearchCV',
tune_parameters = (n_estimators = ARRAY [16, 32, 2],
learning_rate = ARRAY [0.1,0.01,0.001],
max_depth = ARRAY [3,4,5,10]
),
target_column = 'target'
) AS (
SELECT sepallength, sepalwidth, petallength, petalwidth,
CASE
WHEN class = 'Iris-setosa' THEN 0
WHEN class = 'Iris-versicolor' THEN 1
WHEN class = 'Iris-virginica' THEN 2
END AS target
FROM iris
LIMIT 100
)
"""
result1 = c.sql(query)
The provided code snippet will train multiple models over the specified parameter grid and save the best-performing model under the identifier ‘my_exp’.
Additionally, you can inspect all the models that were trained during the hyperparameter search, as demonstrated in the following code snippet.
sorted_r = result1.sort_values(by='mean_test_score', ascending=False)
sorted_r.compute().head()
We can observe the Dask dashboard, which illustrates the distributed training occurring across the cluster.
Let’s verify the stored models:
c.sql("""SHOW MODELS""").compute()
Now we can see that two models have been registered in the Dask-SQL context: the model without hyperparameter tuning (sql_model) and the model with hyperparameter tuning (my_exp).
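If you want to reuse a trained model outside the Dask-SQL context, dask-sql also offers an EXPORT MODEL statement. A sketch, assuming pickle output to the local filesystem (the file name best_model.pkl is an arbitrary choice):
# Serialize the tuned model to disk so it can be loaded outside Dask-SQL
c.sql("""
EXPORT MODEL my_exp WITH (
    format = 'pickle',
    location = 'best_model.pkl'
)
""")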
Model Prediction
Once the model is trained and tuned, predictions can be made from the stored model as follows.
c.sql("""
SELECT * FROM PREDICT (
MODEL my_exp,
SELECT sepallength, sepalwidth, petallength, petalwidth,
CASE
WHEN class = 'Iris-setosa' THEN 0
WHEN class = 'Iris-versicolor' THEN 1
WHEN class = 'Iris-virginica' THEN 2
END AS actual
FROM iris
OFFSET 50
)
""").compute()
Here are the predictions. Clearly, there is room for further improvement through various optimization techniques. However, the primary focus of this article is to showcase Dask-SQL machine learning on a Kubernetes cluster, so we will skip delving into those techniques here.
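As a quick sanity check, the predictions can also be compared against the encoded labels in plain Python. This sketch assumes the prediction column is named ‘target’, matching the target_column used at training time; verify the column name against your actual output.
# Pull the prediction result into pandas
pred_df = c.sql("""
SELECT * FROM PREDICT (
    MODEL my_exp,
    SELECT sepallength, sepalwidth, petallength, petalwidth,
    CASE
        WHEN class = 'Iris-setosa' THEN 0
        WHEN class = 'Iris-versicolor' THEN 1
        WHEN class = 'Iris-virginica' THEN 2
    END AS actual
    FROM iris
    OFFSET 50
)
""").compute()
# Fraction of rows where the predicted label matches the encoded class
accuracy = (pred_df['target'] == pred_df['actual']).mean()
print(f"Accuracy on the selected rows: {accuracy:.3f}")
# Release the Kubernetes resources when finished
client.close()
cluster.close()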
Conclusion
In this article, we demonstrated the integration of Dask-SQL with a Kubernetes-driven Dask cluster, showcasing its strengths for efficient and scalable machine learning workflows. Combining Dask’s parallel processing with SQL querying via Dask-SQL enables streamlined data analysis, as exemplified by training a Gradient Boosting Classifier on the Iris dataset. Leveraging Dask’s dynamic scaling and Kubernetes’ resource management, this combination empowers data scientists to handle extensive datasets adeptly. While optimization techniques are pertinent for further enhancement, the focus of this article has been on illustrating the potential of Dask-SQL machine learning within the Kubernetes context, offering a pathway to effective, parallelized, and scalable data analysis and machine learning.
Visit Axiata Digital Labs to find out more about our products and services.
References
1. https://dask-sql.readthedocs.io/en/latest/machine_learning.html
2. https://kubernetes.dask.org/en/latest/
3. https://datahub.io/machine-learning/iris