
In this post, we will assume we have been given a task: analyze clickstream events and generate a report of the top viewed items. We will use a sample dataset from Kaggle as input data. You can download the sample dataset from the following link:
https://www.kaggle.com/retailrocket/ecommerce-dataset?select=events.csv
The dataset contains multiple CSV files, but we will focus only on the events file. The requirement is to filter out only the "view" events and, based on those, find the most viewed "itemid".
First, we will import the required packages and initialize Spark using the findspark package.
import findspark
findspark.init()
from pyspark.sql import SparkSession
In the main method, we will create the Spark session and set our application name. Here master is set to local[*], which means we are running Spark locally; the value inside the brackets specifies the number of worker threads. You can pass a specific number, or use * to run with as many worker threads as there are logical cores on your machine.
spark = SparkSession.builder.master("local[*]") \
    .appName('ClickStreamEventAnalysis') \
    .getOrCreate()
After creating the session, we can read our data into a DataFrame. We set the header option to true because the file has a header row, and we set the inferSchema option to true so Spark infers the schema automatically. Schema inference should not be used in production environments; instead, an explicit schema should be defined for the dataset (see the sketch after the snippet below). For demo purposes, though, we can use this option.
eventsDataFrame = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("dataset/events.csv")
We can then create a temporary view from the DataFrame and query it, counting views per item and ordering by the count in descending order to get the top items in our dataset.
eventsDataFrame.createOrReplaceTempView("events_data")
top_items_sql = spark.sql("""select itemid, count(*) as total_viewed
                             from events_data
                             where event = 'view'
                             group by itemid
                             order by count(*) desc
                             limit 10""")
top_items_sql.show()
In the next posts, we will improve the code to make it closer to a production setup by adding a streaming source and using window functions over the time series. Here is the full code:
import findspark
findspark.init()
from pyspark.sql import SparkSession
if __name__ == "__main__":
    spark = SparkSession.builder.master("local[*]") \
        .appName('ClickStreamEventAnalysis') \
        .getOrCreate()

    # read the events.csv file and create a data frame
    eventsDataFrame = spark.read.format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load("dataset/events.csv")

    # register a temporary view so we can query the data with SQL
    eventsDataFrame.createOrReplaceTempView("events_data")

    # count views per item and keep the ten most viewed
    top_items_sql = spark.sql("""select itemid, count(*) as total_viewed
                                 from events_data
                                 where event = 'view'
                                 group by itemid
                                 order by count(*) desc
                                 limit 10""")

    top_items_sql.show()

    spark.stop()