Analyzing an Event Stream Dataset Using Spark

In this post, we will assume we have been given a task to analyze click stream events and generate a report of the top viewed items. We will use a sample dataset from Kaggle as input data; you can download it from the following link:

https://www.kaggle.com/retailrocket/ecommerce-dataset?select=events.csv

The dataset contains multiple CSV files; we will focus only on the events file. The requirement is to keep only the “view” events and, based on those, find the most viewed “itemid” values.

First, we import the required packages and initialize Spark using the findspark package.

import findspark
findspark.init()
from pyspark.sql import SparkSession

In the main method, we create the Spark session and set our application name. Here master is set to local[*], which means Spark runs locally, and the value inside the brackets specifies the number of worker threads. You can put a specific number there; with * Spark runs locally with as many worker threads as there are logical cores on your machine.

    spark = SparkSession.builder.master("local[*]") \
                    .appName('ClickStreamEventAnalysis') \
                    .getOrCreate()

After creating the session, we can read our data into a data frame. We set the header option to true because the file has a header row, and we set the inferSchema option to true so that the schema is inferred automatically. Inferring the schema is not recommended in production; instead, a schema should be defined explicitly for the dataset. For demo purposes, however, this option is fine.

    eventsDataFrame = spark.read.format("csv") \
                        .option("header", "true") \
                        .option("inferSchema", "true") \
                        .load("dataset/events.csv")

We can then create a temp view from the data frame and query against it to get the top items in our dataset.

    eventsDataFrame.createOrReplaceTempView("events_data")
    top_items_sql = spark.sql("""select itemid, count(*) as total_viewed
                                from events_data
                                where event = 'view'
                                group by itemid
                                order by total_viewed desc
                                limit 10""")
    top_items_sql.show()

In the next posts, we will make the code more production-like by adding a streaming option and using window functions over the time series. Here is the full code:

import findspark
findspark.init()
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.master("local[*]") \
                    .appName('ClickStreamEventAnalysis') \
                    .getOrCreate()
    # read the events.csv file and create a data frame
    eventsDataFrame = spark.read.format("csv") \
                        .option("header", "true") \
                        .option("inferSchema", "true") \
                        .load("dataset/events.csv")
    eventsDataFrame.createOrReplaceTempView("events_data")
    top_items_sql = spark.sql("""select itemid, count(*) as total_viewed
                                from events_data
                                where event = 'view'
                                group by itemid
                                order by total_viewed desc
                                limit 10""")
    top_items_sql.show()
