init(Apache Spark)

In a previous post I wrote about how I've started down the Data Science path, kicking off with an exploration of sentiment analysis for political tweets. This is a topic which I will come back to in the future, not least because the nltk corpus I've made use of for au-pol-sentiment is based on British political tweets. While Australia and Britain share a common political heritage, I'm not completely confident that our political discourse is quite covered by that corpus.

In the meantime, another aspect of Data Science in practice is the use of an ecosystem called Apache Spark. Leaving aside my 20+ years of muscle memory spelling it as SPARC, this is an analytics and Machine Learning engine, described on its homepage as "a unified analytics engine for large-scale data processing".

My experience is that when I want or need to learn a new toolkit or utility, the best way to do so is to solve a specific problem with it. One problem (ok, not really a problem, more a set of questions) I have is that for all the data I've gathered from my solar inverter since 2013, I'm dependent on pvoutput.org to find per-year and per-month averages, maxima and minima. So with a data science-focused job interview (with Oracle Labs) approaching, I decided to get stuck in and get started with Apache Spark.

The first issue I faced was implementing the ETL pipeline. I have two types of files to load - the first contains the output from solarmonj, the second has the output from my solar inverter script.

Here's the schema for the first format:

Field name       Units
Timestamp        seconds-since-epoch
Temperature      float (degrees C)
energyNow        float (Watts)
energyToday      float (Watt-hours)
powerGenerated   float (Hertz)
voltageDC        float (Volts)
current          float (Amps)
energyTotal      float (Watt-hours)
voltageAC        float (Volts)


The second format comes from jfy-monitor, and has this schema:

Field name        Units
Timestamp         ISO8601-like ("yyyy-MM-dd'T'HH:mm:ss")
Temperature       float (degrees C)
PowerGenerated    float (Watts)
VoltageDC         float (Volts)
Current           float (Amps)
EnergyGenerated   float (Watt-hours)
VoltageAC         float (Volts)

There are two other salient pieces of information about these files. The first is that the energyTotal and EnergyGenerated fields are running totals of the amount of energy generated on that particular day. The second is that in the first version of the schema, energyTotal needs to be multiplied by 1000 to get the actual kWh value.

With that knowledge in hand, let's dive into the code.

The first step is to start up a Spark session:

from pyspark.sql.functions import date_format
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Basic Spark session configuration
sc = SparkContext("local", "PV Inverter Analysis")
spark = SparkSession(sc)

# We don't need most of this output
log4j = sc._jvm.org.apache.log4j
log4j.LogManager.getRootLogger().setLevel(log4j.Level.ERROR)

I observed while prototyping this in the pyspark REPL environment that if I didn't turn the logging output right down, then I'd see squillions of INFO messages.
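
A simpler alternative, which I believe works on reasonably recent Spark releases, is to use the SparkContext API directly rather than reaching into the JVM:

# Equivalent log-level setting via the public PySpark API
sc.setLogLevel("ERROR")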

The second step is to generate a list of files. As you might have guessed, I've got a year/month/day hierarchy - but the older files have a csv extension. To get those files (and since I want to be able to process any given year or year+month combination), I need to use some globbing:

import glob
import os

def generateFiles(topdir, year, month):
    """Construct per-year dicts of lists of files"""
    allfiles = {}
    kkey = ""
    patterns = []
    months = []
    # Since some of our data dirs have months as bare numbers and
    # others have a prepended 0, let's match them correctly.
    if month:
        if month < 10:
            months = [month, "0" + str(month)]
        else:
            months = [month]
    if year and month:
        patterns = ["{year}/{monthp}/**".format(year=year, monthp=monthp)
                    for monthp in months]
        kkey = year
    elif year:
        patterns = ["{year}/*/**".format(year=year)]
        kkey = year

    if patterns:
        globs = []
        for pat in patterns:
            globs.extend(glob.glob(os.path.join(topdir, pat)))
        allfiles[kkey] = globs
    else:
        # No year given, so process everything; allYears is a list of
        # all the years in the dataset, defined elsewhere in the script.
        for yy in allYears:
            allfiles[yy] = glob.glob(os.path.join(topdir,
                                                  "{yy}/*/*".format(yy=yy)))
    return allfiles

To load in each file, I turned to the tried-and-true Python standard library module csv, and rather than having separate v1 and v2 processing functions, I kept things DRY and used an input argument to determine which set of fields to unpack:

import csv
from datetime import datetime

def importCSV(fname, isOld):
    output = []
    if isOld:
        multiplier = 1000.0
    else:
        multiplier = 1.0

    # Read the whole file up front so the handle is closed promptly
    with open(fname) as csvfile:
        csvreader = csv.reader(csvfile.readlines())
    for row in csvreader:
        try:
            if isOld:
                (tstamp, temp, _enow, _etoday, powergen, vdc,
                 current, energen, vac) = row
            else:
                (tstamp, temp, powergen, vdc, current, energen, vac) = row
        except ValueError as _ve:
            # print("failed at {row} of {fname}".format(row=row, fname=fname))
            continue

        if "e" in temp:
            # invalid line, skip it
            continue

        if isOld:
            isostamp = datetime.fromtimestamp(int(tstamp))
        else:
            isostamp = datetime.fromisoformat(tstamp)

        output.append({
            "timestamp": isostamp,
            "Temperature": float(temp),
            "PowerGenerated": float(powergen),
            "VoltageDC": float(vdc),
            "Current": float(current),
            "EnergyGenerated": float(energen) * multiplier,
            "VoltageAC": float(vac)})
    return output

Now we get to the Apache Spark part. Having got a dict of per-year lists of anonymous dicts, I can turn them into a Resilient Distributed Dataset (RDD) and thence a DataFrame. I chose the DataFrame model rather than a Row because that matches up nicely with my existing data format. For other applications (such as when I extend my Twitter Sentiment Analysis project with the streaming API) I'll use the Row datatype instead.

def now():
    """Returns an ISO8601-formatted (without microseconds) timestamp"""
    return datetime.now().strftime("%Y-%m-%dT%H:%M:%S")

# qyear and qmonth come from the command-line arguments (-y and -m)
allFiles = generateFiles("data", qyear, qmonth)

print(now(), "Importing data files")

rdds = {}
for k in allFiles:
    rddyear = []
    for fn in allFiles[k]:
        if fn.endswith(".csv"):
            rddyear.extend(importCSV(fn, True))
        else:
            rddyear.extend(importCSV(fn, False))
    rdds[k] = rddyear

print(now(), "All data files imported")

allFrames = {}
for year in allYears:
    rdd = sc.parallelize(rdds[year])
    allFrames[year] = rdd.toDF()
    newFrame = "new" + str(year)
    # Extend the schema for our convenience
    allFrames[newFrame] = allFrames[year].withColumn(
        "DateOnly", date_format('timestamp', "yyyyMMdd")
    ).withColumn("TimeOnly", date_format('timestamp', "HHmmss"))
    allFrames[newFrame].createOrReplaceTempView("view{year}".format(
        year=year))

print(now(), "Data transformed from RDDs into DataFrames")
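
As the deprecation warning in the run output below suggests, the same conversion can also be written with pyspark.sql.Row instead of bare dicts. A minimal sketch of that variant (reusing the rdds dict built above) might look like this:

from pyspark.sql import Row

# Sketch only: build Row objects rather than plain dicts, so that
# createDataFrame can infer the schema without the deprecation warning.
rowRDD = sc.parallelize([Row(**record) for record in rdds[year]])
allFrames[year] = spark.createDataFrame(rowRDD)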

The reason I chose to extend the frames with two extra columns is that when I search for the record dates (minimum and maximum), I want a quick SELECT that I can aggregate on.

ymdquery = "SELECT DISTINCT DateOnly from {view} WHERE DateOnly "
ymdquery += "LIKE '{yyyymm}%' ORDER BY DateOnly ASC"

for year in allYears:
    view = "view{year}".format(year=year)
    frame = allFrames["new" + str(year)]
    # allMonths holds the month numbers 1-12
    for mon in allMonths:
        if mon < 10:
            yyyymm = str(year) + "0" + str(mon)
        else:
            yyyymm = str(year) + str(mon)

        _dates = spark.sql(ymdquery.format(
            view=view, yyyymm=yyyymm)).collect()
        days = [n.asDict()["DateOnly"] for n in _dates]

        _monthMax = frame.filter(
            frame.DateOnly.isin(days)).agg(
                {"EnergyGenerated": "max"}).collect()[0]
        monthMax = _monthMax.asDict()["max(EnergyGenerated)"]

I keep track of each day's maximum, and update my minval and minDay as required. All this information is then stored in a per-month dict, and then in a per-year dict.

The last stage is to print out the record dates, monthly and yearly totals, averages and other values.
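
Neither that bookkeeping nor the reporting survives in the excerpts above, so here's a small, runnable sketch of the general shape it takes. The names are placeholders rather than the ones used in the repo, and the sample values are the January 2018 records from the run below:

# Illustrative sketch only: given each day's maximum of the running
# EnergyGenerated total, work out the record days, total and average.
dayTotals = {"20180102": 10560.0, "20180131": 16780.0}

maxDay = max(dayTotals, key=dayTotals.get)
minDay = min(dayTotals, key=dayTotals.get)
monthTotal = sum(dayTotals.values())
monthAverage = monthTotal / len(dayTotals)

print("Record dates: Max on {0} ({1:.2f}), Min on {2} ({3:.2f})".format(
    maxDay, dayTotals[maxDay], minDay, dayTotals[minDay]))
print("Month total: {0:.2f}, average daily generation: {1:.2f}".format(
    monthTotal, monthAverage))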

Running this utility on my 4-core Ubuntu system at home, I get what I believe are ok timings for whole-year investigations, and reasonable timings if I check a specific month.

When I run the utility for January 2018, the output looks like this:

(v-3.7-linux) flerken:solar-spark $ SPARK_LOCAL_IP=0.0.0.0 time -f "%E"  spark-submit --executor-memory 2G --driver-memory 2G solar-spark.py  -y 2018 -m 1
19/10/15 12:23:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark default log4j profile: org/apache/spark/log4j-defaults.properties

[Most INFO-level output elided]

19/10/15 12:23:58 INFO Utils: Successfully started service 'SparkUI' on port 4040.
19/10/15 12:23:58 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://0.0.0.0:4040

2019-10-15T12:23:59 Importing data files
2019-10-15T12:23:59 All data files imported
/space/jmcp/web/v-3.7-linux/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/session.py:366: UserWarning: Using RDD of dict to inferSchema is deprecated. Use pyspark.sql.Row instead
2019-10-15T12:24:01 Data transformed from RDDs into DataFrames
2019-10-15T12:24:01 Analysing 2018
2019-10-15T12:24:01          January
2019-10-15T12:24:15 All data analysed
2019-10-15T12:24:15 2018 total generation: 436130.00 KW/h
2019-10-15T12:24:15         January total:               436130.00 KW/h
2019-10-15T12:24:15         Record dates for January:    Max on 20180131 (16780.00 KW/h), Min on 20180102 (10560.00 KW/h)
2019-10-15T12:24:15         Average daily generation  14068.71 KW/h
2019-10-15T12:24:15 ----------------

0:18.76

While that processing is going on, you can see a dashboard with useful information about the application at http://localhost:4040:

[Screenshots: Executors, Jobs, and Details of a query]

You can find the code for this project in my GitHub repo solar-spark.