Analyzing Apache Web Server Logs with Apache Spark

In this post we will analyze Apache web server log files: we extract the HTTP status code from each log line and count the total number of responses for each status code.

First we need to import the required packages. PySpark is the Python interface for Apache Spark. PySpark isn't on the Python path by default, so we use the findspark package to add it at runtime.

We also import re, Python's regular expression package, which we will use to extract the required information from the Apache web log lines.

import findspark
findspark.init() 
import re
from pyspark.sql import SparkSession

Parsing the Log Lines

The following code provides a function that extracts the status code from a log line using regular expressions. The pattern expects the combined log format, in which each line ends with a quoted referer and a quoted user agent. For more about the log line format of the Apache web server, see https://httpd.apache.org/docs/1.3/logs.html

apache_web_log_parts = [
    r'(?P<host>\S+)',
    r'\S+',
    r'(?P<user>\S+)',
    r'\[(?P<time>.+)\]',
    r'"(?P<request>.+)"',
    r'(?P<status>[0-9]+)',
    r'(?P<size>\S+)',
    r'"(?P<referer>.*)"',
    r'"(?P<agent>.*)"'
]
apache_web_log_pattern = re.compile(r'\s+'.join(apache_web_log_parts)+r'\s*\Z')

def extract_status_code_from_apache_log_line(line):
    """Return the HTTP status code of a log line as a string, or None if the line does not match."""
    match = apache_web_log_pattern.match(line)
    if match:
        return match.group("status")
    return None
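
As a quick check of the pattern above, the following sketch runs it against one line in the combined log format and one line in the shorter common log format. The sample lines and the names sample_line and common_line are assumptions made for illustration and are not taken from a real log; the pattern is repeated so that the snippet runs on its own.

```python
import re

# Same pattern pieces as above, repeated so this snippet is self-contained.
parts = [
    r'(?P<host>\S+)',
    r'\S+',
    r'(?P<user>\S+)',
    r'\[(?P<time>.+)\]',
    r'"(?P<request>.+)"',
    r'(?P<status>[0-9]+)',
    r'(?P<size>\S+)',
    r'"(?P<referer>.*)"',
    r'"(?P<agent>.*)"',
]
pattern = re.compile(r'\s+'.join(parts) + r'\s*\Z')

# Illustrative line in combined log format (assumed sample, not real data).
sample_line = (
    '127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] '
    '"GET /apache_pb.gif HTTP/1.0" 200 2326 '
    '"http://www.example.com/start.html" "Mozilla/4.08 [en] (Win98; I ;Nav)"'
)

match = pattern.match(sample_line)
fields = match.groupdict() if match else {}
status = fields.get("status")
print(status)

# The same request in common log format has no referer or agent,
# so the pattern does not match it.
common_line = (
    '127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] '
    '"GET /apache_pb.gif HTTP/1.0" 200 2326'
)
unmatched = pattern.match(common_line)
print(unmatched)
```

The second case matters in practice: any line that does not follow the combined format produces no match, so the extraction function returns None for it.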

Reading Log Files into an RDD

Apache Spark offers several data structures; one of them is the RDD (Resilient Distributed Dataset). The following code reads the Apache web log from a text file into an RDD for further processing. Each line of the log file is parsed with the extract_status_code_from_apache_log_line function using map. We then cache the RDD, which improves performance because the data is accessed more than once.

After that we map each HTTP status code to a tuple that contains the status code and the number 1 as its second element. This number is what gets summed when we reduce by key.

We add all these up to see how many times each status code appears in the log.

In the final step, we convert the RDD to a DataFrame and write the result to a CSV file.

if __name__ == "__main__":
    spark = SparkSession.builder.master("local[*]") \
                    .appName('ApacheWebLogProcessor') \
                    .getOrCreate()
    # Read the Apache web log file into an RDD and extract the status code of each line.
    # Lines that do not match the pattern yield None, so we drop them before counting.
    apache_web_log_requests = (spark.sparkContext.textFile(r"apache_logs.txt")
                                 .map(extract_status_code_from_apache_log_line)
                                 .filter(lambda status: status is not None)
                                 .cache())
    print(apache_web_log_requests.take(10))
    # Map each status code to a (status code, 1) tuple, then sum the ones per key
    # to see how many times each status code appears.
    apache_web_log_request_counts = apache_web_log_requests.map(lambda r : (r, 1)).reduceByKey(lambda x, y : x + y)
    print(apache_web_log_request_counts.take(10))
    # Convert the counts to a DataFrame and write them out as CSV.
    # Spark creates a directory with this name; repartition(1) keeps the output in a single part file.
    df_apache_web_log_request_counts = spark.createDataFrame(apache_web_log_request_counts)
    df_apache_web_log_request_counts.repartition(1).write.csv(r"apache_logs_requests.csv")
    print('Done')
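
To see what the map and reduceByKey steps compute without starting Spark, here is a minimal plain-Python sketch. The helper reduce_by_key and the sample list of status codes are hypothetical and only imitate the semantics on a local list; Spark applies the same combining function, but across the partitions of the RDD.

```python
def reduce_by_key(pairs, func):
    """Local stand-in for RDD.reduceByKey: combine the values of equal keys with func."""
    result = {}
    for key, value in pairs:
        if key in result:
            result[key] = func(result[key], value)
        else:
            result[key] = value
    return result


# Hypothetical parser output; None stands for a line that did not match the pattern.
statuses = ["200", "404", "200", None, "500", "200"]

# Drop unparsed lines, then pair every status code with the number 1.
pairs = [(status, 1) for status in statuses if status is not None]

# Summing the ones per key gives the number of occurrences of each status code.
counts = reduce_by_key(pairs, lambda x, y: x + y)
print(counts)
```

Because addition is associative and commutative, the partial sums can be computed per partition and merged afterwards, which is why this combining function is a safe choice for reduceByKey.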

Normally, in production environments, these logs go through a streaming platform and usually do not reside in text files on the system. In the next steps I will demonstrate how to read these logs from Kafka instead of text files, which brings this closer to a real-world system.

Here is the full code. In the next blog post we will improve it by ingesting data from Kafka streams.

import findspark
findspark.init() 
import re
from pyspark.sql import SparkSession
apache_web_log_parts = [
    r'(?P<host>\S+)',
    r'\S+',
    r'(?P<user>\S+)',
    r'\[(?P<time>.+)\]',
    r'"(?P<request>.+)"',
    r'(?P<status>[0-9]+)',
    r'(?P<size>\S+)',
    r'"(?P<referer>.*)"',
    r'"(?P<agent>.*)"'
]
apache_web_log_pattern = re.compile(r'\s+'.join(apache_web_log_parts)+r'\s*\Z')

def extract_status_code_from_apache_log_line(line):
    """Return the HTTP status code of a log line as a string, or None if the line does not match."""
    match = apache_web_log_pattern.match(line)
    if match:
        return match.group("status")
    return None

if __name__ == "__main__":
    spark = SparkSession.builder.master("local[*]") \
                    .appName('ApacheWebLogProcessor') \
                    .getOrCreate()
    # Read the Apache web log file into an RDD and extract the status code of each line.
    # Lines that do not match the pattern yield None, so we drop them before counting.
    apache_web_log_requests = (spark.sparkContext.textFile(r"apache_logs.txt")
                                 .map(extract_status_code_from_apache_log_line)
                                 .filter(lambda status: status is not None)
                                 .cache())
    print(apache_web_log_requests.take(10))
    # Map each status code to a (status code, 1) tuple, then sum the ones per key
    # to see how many times each status code appears.
    apache_web_log_request_counts = apache_web_log_requests.map(lambda r : (r, 1)).reduceByKey(lambda x, y : x + y)
    print(apache_web_log_request_counts.take(10))
    # Convert the counts to a DataFrame and write them out as CSV.
    # Spark creates a directory with this name; repartition(1) keeps the output in a single part file.
    df_apache_web_log_request_counts = spark.createDataFrame(apache_web_log_request_counts)
    df_apache_web_log_request_counts.repartition(1).write.csv(r"apache_logs_requests.csv")
    print('Done')
