Project Idea: We will build a recommendation system for Netflix users based on their viewing history. We will use Apache Kafka to stream viewing data to a machine learning model, which then recommends titles similar to the ones a user has already watched.
Here are the steps we will follow:
- Install Kafka and ZooKeeper on your machine
- Download and extract the Netflix dataset
- Create a Kafka producer to stream the data
- Create a Kafka consumer to receive the streaming data
- Implement a machine learning algorithm to make recommendations
- Build a Flask web application to display recommendations
Let’s get started!
First, we need to install Kafka and ZooKeeper on our local machine. To do this, we can follow the instructions in the Apache Kafka documentation.
Next, we need to download and extract the Netflix dataset. You can download the dataset from Kaggle.
Once you have downloaded the dataset, extract it so that netflix_titles.csv sits in the folder you will run the code from.
Now we are ready to create a Kafka producer to stream the data from the dataset. We will use the kafka-python library to create the producer.
First, let’s install the library:
!pip install kafka-python
Now let’s write the code for the Kafka producer:
from kafka import KafkaProducer
import json
import time

# Serialize each message value to UTF-8 encoded JSON
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: json.dumps(x).encode('utf-8'))

with open('netflix_titles.csv', encoding='utf-8') as f:
    next(f)  # Skip the header row
    for line in f:
        producer.send('netflix', line.rstrip('\n'))
        time.sleep(0.1)  # Add a delay to simulate streaming

producer.flush()  # Make sure buffered messages are sent before exiting
This code reads in the Netflix dataset file and sends each line to the Kafka topic ‘netflix’. We also add a delay of 0.1 seconds between each message to simulate streaming.
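Sending the raw CSV line means the consumer has to split the fields itself, and naive splitting breaks on commas inside quoted descriptions. One possible variation, sketched below, parses each row with csv.DictReader and serializes it as a JSON object. The helper name rows_to_messages is hypothetical, and the sample row is made-up data standing in for the real file.

```python
import csv
import io
import json


def rows_to_messages(csv_file):
    """Yield one UTF-8 encoded JSON object per CSV row.

    csv_file is any iterable of text lines, such as an open file.
    DictReader consumes the header row, so no manual skip is needed.
    """
    for row in csv.DictReader(csv_file):
        yield json.dumps(row).encode("utf-8")


# Small in-memory sample standing in for netflix_titles.csv (illustrative data).
sample = io.StringIO(
    'show_id,title,description\n'
    's1,"Example Show","A quiet, slow drama"\n'
)
messages = list(rows_to_messages(sample))
print(messages[0])
```

Because these messages are already bytes, they could be sent with a KafkaProducer created without a value_serializer, for example producer.send('netflix', message).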
Next, we need to create a Kafka consumer to receive the streaming data from the producer. We will use the kafka-python library again to create the consumer.
Now let’s write the code for the Kafka consumer:
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer('netflix',
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',
                         value_deserializer=lambda x: json.loads(x.decode('utf-8')))

# Block and print each message as it arrives
for message in consumer:
    print(message.value)
This code creates a Kafka consumer that listens to the ‘netflix’ topic and prints each received message to the console.
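With the lambda deserializer, a single malformed message raises an exception and stops the loop. A minimal sketch of a more forgiving deserializer follows; the name safe_json_deserializer is hypothetical, and returning None for bad input is just one possible policy.

```python
import json


def safe_json_deserializer(raw):
    """Decode a Kafka message value, returning None instead of raising."""
    if raw is None:
        return None
    try:
        return json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None


print(safe_json_deserializer(b'"s1,Example Show"'))
print(safe_json_deserializer(b"not json"))
```

It could be passed as value_deserializer=safe_json_deserializer when creating the KafkaConsumer, with the loop skipping any message whose value is None.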
Now that we are receiving streaming data from the Kafka producer, we can implement a machine learning algorithm to make recommendations based on the user’s viewing history.
For this project, we will use a content-based recommendation system. This means that we will recommend titles whose content is similar to the titles the user has already watched, rather than relying on what other users watched. We will use the plot description of each title to measure that similarity.
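To make the idea concrete before we reach for any libraries, here is a small pure-Python sketch of TF-IDF weighting and cosine similarity on three invented descriptions. It uses the smoothed IDF formula that scikit-learn applies by default, but the whitespace tokenizer is a simplification, so the numbers will not match TfidfVectorizer exactly. The function names are hypothetical.

```python
import math
from collections import Counter


def tfidf_vectors(docs):
    """Return one {term: weight} dict per document, L2-normalised."""
    tokenized = [doc.lower().split() for doc in docs]
    n_docs = len(tokenized)
    # Document frequency: number of documents containing each term.
    df = Counter(term for tokens in tokenized for term in set(tokens))
    vectors = []
    for tokens in tokenized:
        counts = Counter(tokens)
        # Term count times smoothed IDF: ln((1 + n) / (1 + df)) + 1.
        vec = {t: c * (math.log((1 + n_docs) / (1 + df[t])) + 1)
               for t, c in counts.items()}
        norm = math.sqrt(sum(w * w for w in vec.values())) or 1.0
        vectors.append({t: w / norm for t, w in vec.items()})
    return vectors


def cosine(a, b):
    """Cosine similarity of two L2-normalised sparse vectors."""
    return sum(w * b.get(t, 0.0) for t, w in a.items())


# Invented descriptions, for illustration only.
docs = [
    "detective hunts a serial killer",
    "detective investigates a killer in a small town",
    "family bakes cakes for a competition",
]
vecs = tfidf_vectors(docs)
print(cosine(vecs[0], vecs[1]) > cosine(vecs[0], vecs[2]))
```

The two crime descriptions share the rarer words "detective" and "killer", so they score as more similar to each other than either does to the baking description.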
First, let’s install the necessary libraries:
!pip install pandas numpy scikit-learn nltk
Now let’s write the code for the machine learning algorithm:
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

nltk.download('stopwords')
# Newer NLTK releases may need nltk.download('punkt_tab') instead
nltk.download('punkt')

# Load the Netflix dataset
netflix_df = pd.read_csv('netflix_titles.csv')

# Replace missing values with empty strings
netflix_df = netflix_df.fillna('')

# Combine the 'title' and 'description' columns into a single column
netflix_df['combined'] = netflix_df['title'] + ' ' + netflix_df['description']

# Tokenize the text, lowercase it, and remove stopwords
stop_words = set(stopwords.words('english'))
netflix_df['combined'] = netflix_df['combined'].apply(
    lambda x: ' '.join(word.lower() for word in word_tokenize(x)
                       if word.lower() not in stop_words))

# Compute TF-IDF vectors for each title
tfidf = TfidfVectorizer()
tfidf_matrix = tfidf.fit_transform(netflix_df['combined'])

# Compute cosine similarity between titles
cosine_sim = cosine_similarity(tfidf_matrix, tfidf_matrix)


# Define a function to get recommendations for a given title
def get_recommendations(title, cosine_sim=cosine_sim, df=netflix_df):
    # Get the index of the title in the dataframe
    # (raises IndexError if the title is not in the dataset)
    idx = df[df['title'] == title].index[0]
    # Pair every title index with its similarity to the given title
    sim_scores = list(enumerate(cosine_sim[idx]))
    # Sort the titles by similarity score, highest first
    sim_scores = sorted(sim_scores, key=lambda x: x[1], reverse=True)
    # Get the top 10 most similar titles, skipping the title itself
    sim_scores = sim_scores[1:11]
    # Get the indices of those titles
    titles_indices = [i[0] for i in sim_scores]
    # Return the top 10 most similar titles
    return df.iloc[titles_indices]['title'].tolist()
This code loads the Netflix dataset and pre-processes the text data by tokenizing the text and removing stop words. We then compute TF-IDF vectors for each title and compute cosine similarity between titles. Finally, we define a function to get recommendations for a given title based on cosine similarity.
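Note that get_recommendations() works from a single title, while a viewing history usually contains several. One way to extend it, sketched below under that assumption, is to sum each candidate’s similarity to every watched title and rank by the total. The function name recommend_from_history and the toy similarity matrix are hypothetical.

```python
def recommend_from_history(history, titles, sim, top_n=10):
    """Rank unwatched titles by their summed similarity to the watched ones.

    history: titles the user has watched (unknown titles are ignored)
    titles:  list of all titles, in the same order as the rows of sim
    sim:     square similarity matrix (list of lists or a NumPy array)
    """
    index = {title: i for i, title in enumerate(titles)}
    watched = {index[t] for t in history if t in index}
    if not watched:
        return []
    scores = []
    for j, title in enumerate(titles):
        if j in watched:
            continue  # never recommend something already watched
        scores.append((sum(sim[i][j] for i in watched), title))
    scores.sort(key=lambda pair: pair[0], reverse=True)
    return [title for _, title in scores[:top_n]]


# Toy data, for illustration only.
titles = ["A", "B", "C", "D"]
sim = [
    [1.0, 0.8, 0.1, 0.3],
    [0.8, 1.0, 0.2, 0.6],
    [0.1, 0.2, 1.0, 0.4],
    [0.3, 0.6, 0.4, 1.0],
]
print(recommend_from_history(["A", "B"], titles, sim, top_n=2))
```

With the objects defined above, a call might look like recommend_from_history(history, netflix_df['title'].tolist(), cosine_sim).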
Now that we have a machine learning algorithm to make recommendations, we can build a Flask web application to display the recommendations to the user.
First, let’s install the necessary libraries:
!pip install flask
Now let’s write the code for the Flask web application:
from flask import Flask, render_template, request

# get_recommendations() must be defined in, or imported into, this module

app = Flask(__name__)


# Home page
@app.route('/')
def home():
    return render_template('index.html')


# Recommendation page
@app.route('/recommend', methods=['POST'])
def recommend():
    # Get the user's viewing history from the submitted form
    history = request.form['history']
    # Make recommendations based on the user's viewing history
    recommendations = get_recommendations(history)
    # Render the recommendations page with the recommendations
    return render_template('recommendations.html', recommendations=recommendations)


if __name__ == '__main__':
    app.run(debug=True)
This code defines a Flask web application with two routes: the home page and the recommendation page. The home page displays a form where the user can enter their viewing history. When the user submits the form, the application makes recommendations based on the user’s viewing history using the get_recommendations() function we defined earlier. The recommendations are then displayed on the recommendations page.
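The route passes the form text straight to get_recommendations(), which looks for an exact title match and raises an IndexError when there is none. A small helper like the one sketched below could resolve what the user typed to a known title first. Both function names are hypothetical, and matching on trimmed, case-folded text is an assumption about what counts as "the same" title.

```python
def build_title_lookup(titles):
    """Map a normalised (trimmed, case-folded) title to its original spelling."""
    return {title.strip().casefold(): title for title in titles}


def resolve_title(user_input, lookup):
    """Return the catalogue title matching user_input, or None if there is none."""
    return lookup.get(user_input.strip().casefold())


lookup = build_title_lookup(["Stranger Things", "The Crown"])
print(resolve_title("  stranger things ", lookup))
print(resolve_title("Unknown Show", lookup))
```

The route could then show a "title not found" message when resolve_title() returns None instead of failing with a server error.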
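Now that we have a working Flask web application, we can also connect the recommender to Apache Kafka, so that it produces recommendations as viewing-history messages arrive. We will use the confluent-kafka-python library for this.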
First, let’s install the necessary libraries:
!pip install confluent-kafka
Now let’s write the code to publish messages to a Kafka topic:
from confluent_kafka import Producer


def delivery_report(err, msg):
    # Called once per message to report delivery success or failure
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


p = Producer({'bootstrap.servers': 'localhost:9092'})


def publish_message(topic_name, value):
    try:
        p.produce(topic_name, value.encode('utf-8'), callback=delivery_report)
        p.flush()  # Wait for delivery so the callback fires
    except Exception as e:
        print('Exception in publishing message')
        print(str(e))
This code defines a Producer object that publishes messages to a Kafka topic using the publish_message() function.
Next, let’s write the code to consume messages from the Kafka topic and make recommendations based on the user’s viewing history:
from confluent_kafka import Consumer, KafkaError

c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
})
c.subscribe(['viewing-history'])

try:
    while True:
        msg = c.poll(1.0)  # Wait up to one second for a message
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print('End of partition reached {}'.format(msg.topic()))
            else:
                print('Error while consuming message: {}'.format(msg.error()))
        else:
            history = msg.value().decode('utf-8').strip()
            print('Received message: {}'.format(history))
            recommendations = get_recommendations(history)
            publish_message('recommendations', ', '.join(recommendations))
except KeyboardInterrupt:
    pass
finally:
    c.close()  # Leave the consumer group cleanly
This code defines a Consumer object that subscribes to the viewing-history topic and consumes messages. When a message is received, the application makes recommendations based on the user’s viewing history using the get_recommendations() function, and publishes the recommendations to the recommendations topic using the publish_message() function.
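Joining titles with a comma and a space becomes ambiguous as soon as a title itself contains a comma, and an unknown title makes get_recommendations() raise an IndexError inside the loop. The sketch below pulls the per-message work into a function that publishes JSON instead and falls back to an empty list. The name handle_message and the payload shape are assumptions, and the stand-in functions replace get_recommendations() and publish_message() so the example runs without a broker.

```python
import json


def handle_message(raw_value, recommend, publish):
    """Process one viewing-history message and publish the result as JSON.

    raw_value: message bytes containing a single title
    recommend: callable mapping a title to a list of titles
    publish:   callable taking (topic_name, text)
    """
    title = raw_value.decode("utf-8").strip()
    try:
        recommendations = recommend(title)
    except IndexError:
        # get_recommendations raises IndexError for a title not in the dataset.
        recommendations = []
    payload = json.dumps({"title": title, "recommendations": recommendations})
    publish("recommendations", payload)
    return payload


# Stand-ins for get_recommendations and publish_message (illustrative only).
sent = []


def fake_recommend(title):
    return ["Title One, Part Two", "Title Three"]


def fake_publish(topic, text):
    sent.append((topic, text))


handle_message(b"Some Title\n", fake_recommend, fake_publish)
print(sent[0])
```

Inside the consumer loop, the call would become handle_message(msg.value(), get_recommendations, publish_message).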
Now that the recommender is connected to Apache Kafka, we can test it by sending messages to the viewing-history topic and receiving recommendations from the recommendations topic.
To send messages to the viewing-history topic, we can use the kafka-console-producer command:
$ kafka-console-producer --bootstrap-server localhost:9092 --topic viewing-history
This command opens a console where we can enter messages to send to the viewing-history topic.
To receive recommendations from the recommendations topic, we can use the kafka-console-consumer command:
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic recommendations --from-beginning
This command opens a console where we can see the recommendations sent by the application.
In this tutorial, we built a machine learning project using Apache Kafka and Python to make personalized recommendations to users based on their viewing history.
We started by streaming the Netflix dataset through Kafka, then cleaned and pre-processed it and built a machine learning algorithm that computes cosine similarity between titles and makes recommendations from a title the user has watched.
We then built a Flask web application to display the recommendations to the user, and connected the recommender to Apache Kafka to enable real-time message processing. We also discussed how to test the application by sending messages to Kafka topics.
This project demonstrates how Apache Kafka can be used to build machine learning applications that process a stream of data and make personalized recommendations in real time. The project can be extended to handle more complex datasets and algorithms, and can be deployed on cloud platforms for scalable and reliable performance.
I hope you found this tutorial helpful in building your own machine learning projects using Apache Kafka and Python. Happy coding!