Common Table Expressions and an ORM

I solved a gnarly performance problem last week and I'd like to share with you what I learnt while doing it.

We received an alert from one of our automated monitoring systems that a particular query being run via an API was taking too long. And by "too long" I mean minutes. Since database engines are optimised to return data to you in milliseconds, this seemed wildly wrong.

The first step when you are checking a query for performance is to use the query analyzer. This is a very powerful tool in any database, and helps you see just what the engine is going to do for any particular bit of SQL. The term you're looking for in the documentation is EXPLAIN.
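
If you'd like to poke at a query plan without a database server to hand, here is a minimal sketch using the sqlite3 module from the Python standard library. SQLite is an assumption made purely for illustration (it was not the database involved in this story), the readings table and its index are hypothetical, and SQLite spells the command EXPLAIN QUERY PLAN rather than plain EXPLAIN.

```python
import sqlite3

# Throwaway in-memory database with a hypothetical table and index.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE readings (id INTEGER PRIMARY KEY, value INTEGER)")
conn.execute("CREATE INDEX readings_value_idx ON readings (value)")

# Ask the engine how it intends to run a range query, without running it.
plan = conn.execute(
    "EXPLAIN QUERY PLAN SELECT id FROM readings WHERE value BETWEEN ? AND ?",
    (10, 20),
).fetchall()

for step in plan:
    # The last column of each plan row is the human-readable description.
    print(step[-1])
```

The wording of the plan differs between engines and versions, so treat the output as something to read rather than something to parse.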

We thought the query was fairly simple. It followed this general form:

SELECT columns, (SELECT COUNT(specific column) FROM tablename WHERE conditions)
FROM tablename
WHERE conditions
ORDER BY ordering criteria;

and gave us not just results with specific conditions, but the count of rows which met those conditions too.

You will notice that I did not write "first set of conditions" and "second set of conditions". This is because the selection criteria were in fact the same. That was the first clue.

A second clue was that one of these selection conditions was a range check - is the column's value between A and B? [We actually had two, but having just one was sufficient to show the problem]. This was expressed as

columnA >= smallervalue AND
columnA <= largervalue

So you'll see that for each row we had two comparisons to do. I tried making that condition singular, by using just >= smallervalue (I also tried <= largervalue - no difference in timing) and while that did make the query faster, it did not reflect what we needed to be able to do, so that was out.

Back to the query planner. It wasn't happy. It turns out that the most efficient plan it could come up with was using a Bitmap Index Scan. Twice. For the same query - but throwing the results of the first query away before running the whole thing again.

I knew that there had to be a better way (tm) - and there was. Here's the first thing I learned: the SQL standard has a thing called a Common Table Expression or CTE. This is a way of creating a named temporary result set that exists just for that query.

With this knowledge, I could now re-write the query in a considerably more efficient fashion. (It took me several days to figure this out - I'm no SQL guru!)

WITH better_query AS
    (SELECT columnname
    FROM tablename
    WHERE conditions)
SELECT columns, count(better_query.columnname)
FROM tablename
GROUP BY grouping criteria
ORDER BY ordering criteria;

Excellent! ... except that that was giving me all the rows in tablename which matched from better_query rather than the specific rows which met conditions. To solve this niggle I needed a join.

WITH better_query AS
    (SELECT columnname
    FROM tablename
    WHERE conditions)
SELECT columns, count(BQ.columnname)
FROM tablename TN
JOIN better_query BQ on BQ.columnname = TN.columnname
GROUP BY grouping criteria
ORDER BY ordering criteria;

The query planner was a lot happier. Happier to the tune of a 20x improvement.

There might have been a cheer or two.
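
To make the shape of that rewrite concrete, here is a small runnable sketch using Python's bundled sqlite3 module. It is not the production query: SQLite, the ten sample rows and the 30 to 60 range are all assumptions for illustration, and the table and column names are simply the placeholders used above. It shows the filter being written once in the CTE and then reused both for the join and for the count.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tablename (columnname INTEGER, columnA INTEGER)")
# Hypothetical sample data: columnA is ten times columnname.
conn.executemany(
    "INSERT INTO tablename VALUES (?, ?)",
    [(i, i * 10) for i in range(1, 11)],
)

query = """
WITH better_query AS
    (SELECT columnname
     FROM tablename
     WHERE columnA >= :smallervalue AND columnA <= :largervalue)
SELECT TN.columnname, TN.columnA,
       (SELECT COUNT(*) FROM better_query) AS matches
FROM tablename TN
JOIN better_query BQ ON BQ.columnname = TN.columnname
ORDER BY TN.columnname
"""

# The range check is stated once, inside the CTE.
rows = conn.execute(query, {"smallervalue": 30, "largervalue": 60}).fetchall()
for row in rows:
    print(row)
```

Each returned row carries the matching columns plus the total number of matches, which is the "results and the count of results" pattern described earlier.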

So how do we put this into our application codebase?

While a major reason $employer hired me was my Python skills, $employer isn't a Python shop by any stretch of the imagination. We're a Java shop, and I'm cross-pollinating :-).

Part of that process has been learning how we abstract database specificities out of the code - we use an Object-Relational Mapping (aka ORM) to cope with using more than one vendor's database. There are a few options for Java ORMs and the one that we use is jOOQ. This is the first time I've used an ORM (of course I've read about them, but years ago) so this too was something new that I've learned. Heading to the documentation I read a lot, getting my head around the concepts and syntax.

It took me several days of research and hacking (stackoverflow, github, come on down!) to work out the correct syntax.

Here's the gist of what we needed:

public class QueryClass {

    private final String withClauseName = "betterQuery";
    private final String withClauseField = "fieldName";

    ....

    CommonTableExpression<Record1<Integer>> withClause = DSL.name(withClauseName)
        .fields(withClauseField)
        .as(select(columnName)
            .from(tableName)
            .where(conditions)
        );

    Field<Integer> results = DSL.field(context.select(
        count(DSL.field(columnName)))
        .from(withClause)
        .limit(1)
    ).as("results");

    /*
     * append results field to array of column names in the select, then ...
     */

    List<Typename> resultSet = DSLcontext.with(withClause)
        .select(columns)
        .from(tableName)
        .join(withClause)
            .on(withClause.field(withClauseField, typename.class)
                .eq(tablename.columnName))
        .groupBy(columnName)
        .orderBy(columnName)
    ....
};

No, it's not simple - but using jOOQ does mean that we can keep our code pretty close to how we would write bare SQL (in this case that would still be rather gnarly), and that is definitely a win.

Acknowledgement

Since this is another SQL-based post, I'd like to acknowledge the massive knowledge and confidence boost I got from reading Julia Evans (aka b0rk)'s SQL zine Become a SELECT star. She has a beautiful style and breaks complex technical concepts down into easily understandable chunks with great illustrations. Reading that zine advanced my database confidence immensely.




A brief introduction to the Python Database API

As popular as NoSQL databases currently are, there is still an immense amount of data in the world for which the relational database is the most appropriate way to store, access and manipulate it. For those of us who like to use Python rather than JDBC, we are fortunate to have bindings for the major databases which implement the Python Database API, aka PEP 249.

This standard is important because it allows you to write your application in a way that promotes cross-platform (cross-database engine) portability. While you do still need to be aware of differences between the databases (for example, Oracle's VARCHAR2 vs PostgreSQL's VARCHAR), the essential tasks that you need to accomplish when making use of a database are abstracted from you.
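
One quick way to see both the common ground and the differences is to look at the three module-level globals that the standard requires every binding to provide. The sketch below uses the standard library's sqlite3 module only because it is always available; describe_driver is a hypothetical helper name, not part of any binding.

```python
import sqlite3


def describe_driver(module):
    """Return the three module-level globals that PEP 249 requires."""
    return {
        "apilevel": module.apilevel,          # DB-API version supported
        "threadsafety": module.threadsafety,  # 0 to 3, per the standard
        "paramstyle": module.paramstyle,      # placeholder style for parameters
    }


info = describe_driver(sqlite3)
print(info)
```

The same function should work unchanged on cx_Oracle or psycopg2 once they are imported. As far as I know they report "named" and "pyformat" respectively for paramstyle, which is why the INSERT statements later in this post are spelled differently for each database.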

In this brief introduction I will show you how to install two popular Python database binding packages, connect to a database and run queries.

Installing the bits you need

I assume that you've got Python installed on your system already, and that the database you want to connect to is set up and running. What you need now are the bindings for that particular database. For the Oracle database, this is a package known as cx_Oracle. For PostgreSQL, I suggest psycopg2 though there are other bindings available.

You will definitely need to install the pre-packaged database client libraries on your host (or in your container) as well. For Oracle, look for your platform in the Oracle Instant Client download pages. For PostgreSQL on Linux you will need libpq-dev as supplied by your package manager. On macOS, please find the appropriate option from the PostgreSQL MacOSX download page.

After you've installed these pieces, you can then pip install --user cx_Oracle or pip install --user psycopg2. My personal preference is to install bindings in a venv. For Solaris 11 you can get the cx_Oracle bits as well as the Oracle Instant Client by uttering pkg install cx_Oracle. [On a personal note, I made that possible - see the History tab on the Solaris Userland cx_Oracle github].

Getting a connection to the database

Now that you have the correct packages installed, you can start investigating. The first thing to do is start an interactive interpreter and import the modules you need:

$ python3.8
>>> import cx_Oracle

(Note that unless you're using Solaris' pre-packaged version, you must point LD_LIBRARY_PATH to where the module can locate the Instant Client libraries, specifically libclntsh.so)

$ python3.8
>>> import psycopg2
>>> from psycopg2.extras import execute_batch

We need a connection to the database, which in Oracle terms is a DSN, or Data Source Name. This is made up of the username, password, host (either hostname or IP address), port and database instance name. Put together, the host, port and database instance are what Oracle calls the "service name".

The common example you will see in Oracle documentation (and on stackoverflow!) is

scott/tiger@orcl:1521/orcl

I'm a bit over seeing that, so I've created a different username and DBname:

DEMOUSER/DemoDbUser1@dbhost:1521/demodb

Since Oracle defaults to using port 1521 on the host, you only need to specify the port number if your database is listening on a different port.
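
That rule is simple enough to capture in a few lines. This is only a sketch: easy_connect is a hypothetical helper of my own naming, not something cx_Oracle provides, and it does nothing more than string formatting.

```python
DEFAULT_ORACLE_PORT = 1521


def easy_connect(host, service, port=DEFAULT_ORACLE_PORT):
    """Build a host[:port]/service string, leaving out the default port."""
    if port == DEFAULT_ORACLE_PORT:
        return "{host}/{service}".format(host=host, service=service)
    return "{host}:{port}/{service}".format(
        host=host, port=port, service=service)


print(easy_connect("dbhost", "demodb"))
print(easy_connect("dbhost", "demodb", port=1522))
```

Spelling the port out even when it is the default, as the examples here do, is harmless and arguably clearer.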

An Oracle connection is therefore

user = "DEMOUSER"
passwd = "DemoDbUser1"
servicename = "dbhost:1521/demodb"

try:
    connection = cx_Oracle.connect(user, passwd, servicename)
except cx_Oracle.DatabaseError as dbe:
    print("""Unable to obtain a connection to {servicename}""".format(servicename=servicename))
    raise

With PostgreSQL we can also specify whether we want SSL enabled.

import sys

try:
    connection = psycopg2.connect(dbname=dbname,
                                  user=dbuser,
                                  password=dbpassword,
                                  host=dbhost,
                                  port=dbport,
                                  sslmode=dbsslmode)
except psycopg2.OperationalError as dboe:
    # Report the failure on stderr, then re-raise for the caller to handle
    print("Unable to obtain a connection to the database.\n"
          "Please check your database connection details.\n"
          "Notify dbadmin@{dbhost}".format(dbhost=dbhost),
          file=sys.stderr, flush=True)
    raise

Once we have the connection, we need a cursor so we can execute statements:

cursor = connection.cursor()

Now that we have our connection and a cursor to use, what can we do with it? SQL baby!

We need a handy dataset to muck around with, so I've got just the thing courtesy of my previous post about determining your electorate. My JSON files have a fairly basic structure reflecting my needs for that project, but that is still sufficient for our purposes here.

>>> import json
>>> actf = json.load(open("find-my-electorate/json/ACT.json", "r"))
>>> actf.keys()
dict_keys(['Brindabella', 'Ginninderra', 'Kurrajong', 'Murrumbidgee', 'Yerrabi'])
>>> actf["Yerrabi"].keys()
dict_keys(['jurisdiction', 'locality', 'blocks', 'coords'])

The 'jurisdiction' key is the state or territory name (ACT, Australian Capital Territory in this case), the 'locality' is the electorate name, 'blocks' is a list of the Australian Bureau of Statistics Mesh Blocks which the electorate contains, and 'coords' is a list of the latitude, longitude pairs which are the boundaries of those blocks.

Now we need a schema to operate within. This is reasonably simple - if you've done any work with SQL databases. To start with, we need two SEQUENCEs, which are database objects from which any database user may generate a unique integer. This is the easiest and cheapest way that I know of to generate values for primary key columns. (For a good explanation on them, see the Oracle 19c CREATE SEQUENCE page). After that, we'll create two tables: ELECTORATES and GEOPOINTS. (We're going to ignore the Mesh Block data, it's not useful for this example).

-- This is the Oracle DB form
DROP TABLE GEOPOINTS CASCADE CONSTRAINTS;
DROP TABLE ELECTORATES CASCADE CONSTRAINTS;

DROP SEQUENCE ELECTORATE_SEQ;
DROP SEQUENCE LATLONG_SEQ;

CREATE SEQUENCE ELECTORATE_SEQ INCREMENT BY 1 MINVALUE 1 NOMAXVALUE;
CREATE SEQUENCE LATLONG_SEQ INCREMENT BY 1 MINVALUE 1 NOMAXVALUE;

CREATE TABLE ELECTORATES (
    ELECTORATE_PK   NUMBER DEFAULT DEMOUSER.ELECTORATE_SEQ.NEXTVAL NOT NULL PRIMARY KEY,
    ELECTORATE      VARCHAR2(64) NOT NULL,
    STATENAME       VARCHAR2(3) NOT NULL -- Using the abbreviated form
);

CREATE TABLE GEOPOINTS (
    LATLONG_PK      NUMBER DEFAULT DEMOUSER.LATLONG_SEQ.NEXTVAL NOT NULL PRIMARY KEY,
    LATITUDE        NUMBER NOT NULL,
    LONGITUDE       NUMBER NOT NULL,
    ELECTORATE_FK   NUMBER REFERENCES ELECTORATES(ELECTORATE_PK)
);

-- Now for the PostgreSQL form
DROP TABLE IF EXISTS GEOPOINTS CASCADE;
DROP TABLE IF EXISTS ELECTORATES CASCADE;

DROP SEQUENCE IF EXISTS ELECTORATE_SEQ CASCADE;
DROP SEQUENCE IF EXISTS LATLONG_SEQ CASCADE;

CREATE SEQUENCE ELECTORATE_SEQ INCREMENT BY 1 MINVALUE 1 NO MAXVALUE;
CREATE SEQUENCE LATLONG_SEQ INCREMENT BY 1 MINVALUE 1 NO MAXVALUE;

CREATE TABLE ELECTORATES (
    ELECTORATE_PK   INTEGER DEFAULT NEXTVAL('ELECTORATE_SEQ') NOT NULL PRIMARY KEY,
    ELECTORATE      VARCHAR(64) NOT NULL,
    STATENAME       VARCHAR(3) NOT NULL -- Using the abbreviated form
);

CREATE TABLE GEOPOINTS (
    LATLONG_PK      INTEGER DEFAULT NEXTVAL('LATLONG_SEQ') NOT NULL PRIMARY KEY,
    LATITUDE        NUMERIC NOT NULL,
    LONGITUDE       NUMERIC NOT NULL,
    ELECTORATE_FK   INTEGER REFERENCES ELECTORATES(ELECTORATE_PK) NOT NULL
);

We need to execute these statements one by one, using cursor.execute(). Come back once you've done that and we've got our schema set up.
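
If typing each statement by hand doesn't appeal, the idea can be sketched as a small loop. To keep the example runnable anywhere it uses the standard library's sqlite3 module and a cut-down schema without the sequences, both of which are assumptions for illustration only. run_statements is a hypothetical helper, and its split on ";" is deliberately naive: it would break on a semicolon inside a string literal.

```python
import sqlite3

# Cut-down, SQLite-flavoured version of the schema (no sequences).
SCHEMA = """
CREATE TABLE ELECTORATES (
    ELECTORATE_PK   INTEGER PRIMARY KEY,
    ELECTORATE      VARCHAR(64) NOT NULL,
    STATENAME       VARCHAR(3) NOT NULL
);
CREATE TABLE GEOPOINTS (
    LATLONG_PK      INTEGER PRIMARY KEY,
    LATITUDE        NUMERIC NOT NULL,
    LONGITUDE       NUMERIC NOT NULL,
    ELECTORATE_FK   INTEGER NOT NULL REFERENCES ELECTORATES(ELECTORATE_PK)
);
"""


def run_statements(cursor, script):
    """Execute each ';'-terminated statement separately; return how many ran."""
    statements = [s.strip() for s in script.split(";") if s.strip()]
    for statement in statements:
        # The terminating semicolon has been stripped off by the split.
        cursor.execute(statement)
    return len(statements)


connection = sqlite3.connect(":memory:")
cursor = connection.cursor()
executed = run_statements(cursor, SCHEMA)
connection.commit()

cursor.execute("SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name")
tables = [row[0] for row in cursor.fetchall()]
print(executed, tables)
```

The same loop should carry over to the cx_Oracle and psycopg2 cursors shown above, given the matching schema text for each database.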

[I see that a short time has passed - welcome back]

Now we need to populate our database. For both connection types, we'll make use of prepared statements, which allow us to insert, update, delete or select many rows at a time. For the Oracle connection we'll use the executemany() function, but for PostgreSQL's psycopg2 bindings we'll use execute_batch() instead. (See the psycopg2 website note about it).

For the Oracle examples, we'll use a bind variable so that when we INSERT into the ELECTORATES table we get the ELECTORATE_PK to use in the subsequent INSERT to the GEOPOINTS table. That saves us a query to get that information. The psycopg2 module does not have this support, unfortunately. Since we have many records to process, I've created some small functions to keep things DRY:

# Oracle version
>>> epk = cursor.var(int)
>>> electStmt = """INSERT INTO ELECTORATES (ELECTORATE, STATENAME)
...                VALUES (:electorate, :statename)
...                RETURNING ELECTORATE_PK INTO :epk"""
>>> geoStmt = """INSERT INTO GEOPOINTS(LATITUDE, LONGITUDE, ELECTORATE_FK)
...              VALUES (:latitude, :longitude, :fk)"""

>>> def addstate(statename):
...     for electorate in statename:
...         edict = {"electorate": statename[electorate]["locality"],
...                  "statename": statename[electorate]["jurisdiction"],
...                  "epk": epk}
...         cursor.execute(electStmt, edict)
...         points = list()
...         for latlong in statename[electorate]["coords"]:
...             points.append({"latitude": float(latlong[1]),
...                            "longitude": float(latlong[0]),
...                            "fk": int(epk.getvalue(0)[0])})
...         cursor.executemany(geoStmt, points)
...         connection.commit()

>>> allpoints = dict()
>>> states = ["act", "nsw", "nt", "qld", "sa", "tas", "wa", "vic", "federal"]

>>> for st in states:
...     allpoints[st] = json.load(open("json/{stu}.json".format(stu=st.upper()), "r"))
...     addstate(allpoints[st])

For the PostgreSQL version, we need to subtly change the INSERT statements:

>>> electStmt = """INSERT INTO ELECTORATES (ELECTORATE, STATENAME)
...                VALUES (%(electorate)s, %(statename)s)
...                RETURNING ELECTORATE_PK"""
>>> geoStmt = """INSERT INTO GEOPOINTS(LATITUDE, LONGITUDE, ELECTORATE_FK)
...                VALUES (%(latitude)s, %(longitude)s, %(fk)s)"""

Another thing we need to change is the first execute, because RETURNING ... INTO a bind variable is an Oracle extension; with psycopg2 we fetch the returned key from the cursor instead. We're also going to change from executemany() to execute_batch():

>>> def addstate(statename):
...     for electorate in statename:
...         edict = {"electorate": statename[electorate]["locality"],
...                  "statename": statename[electorate]["jurisdiction"]}
...         cursor.execute(electStmt, edict)
...         epk = cursor.fetchone()[0]
...         points = list()
...         for latlong in statename[electorate]["coords"]:
...             points.append({"latitude": float(latlong[1]),
...                            "longitude": float(latlong[0]), "fk": epk})
...         execute_batch(cursor, geoStmt, points)
...         connection.commit()

The rest of the data loading is the same as with Oracle. Since I'm curious about efficiencies, I did a rough benchmark of the data load with executemany() as well, and it was around half the speed of using execute_batch(). YMMV, of course, so always test your code with your data and as close to real-world conditions as possible.
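
The underlying pattern - insert the parent row, pick up its generated key, then batch-insert the children - can be tried out without either database server. The sketch below is an illustration under stated assumptions: it uses sqlite3 from the standard library, relies on cursor.lastrowid (an optional DB-API extension) in place of RETURNING, and the add_electorate helper and the two sample coordinate pairs are hypothetical.

```python
import sqlite3

connection = sqlite3.connect(":memory:")
cursor = connection.cursor()
cursor.execute("""CREATE TABLE electorates (
                      electorate_pk INTEGER PRIMARY KEY,
                      electorate TEXT NOT NULL,
                      statename TEXT NOT NULL)""")
cursor.execute("""CREATE TABLE geopoints (
                      latitude REAL NOT NULL,
                      longitude REAL NOT NULL,
                      electorate_fk INTEGER NOT NULL)""")


def add_electorate(cursor, name, state, coords):
    """Insert one electorate, then all of its points in a single batch."""
    cursor.execute(
        "INSERT INTO electorates (electorate, statename) "
        "VALUES (:electorate, :statename)",
        {"electorate": name, "statename": state})
    epk = cursor.lastrowid  # key generated for the row we just inserted
    # coords holds (longitude, latitude) pairs, as in the JSON files above.
    points = [{"latitude": float(lat), "longitude": float(lon), "fk": epk}
              for lon, lat in coords]
    cursor.executemany(
        "INSERT INTO geopoints (latitude, longitude, electorate_fk) "
        "VALUES (:latitude, :longitude, :fk)",
        points)
    return epk


# Made-up coordinates, purely to exercise the function.
epk = add_electorate(cursor, "Yerrabi", "ACT",
                     [(149.1, -35.2), (149.2, -35.1)])
connection.commit()
```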

Now that we have the data loaded we can do some investigations. I'm going to show the PostgreSQL output for this and call out differences with Oracle where necessary.

While some parts of our state and territory borders follow rivers and mountain ranges, quite a lot of them follow specific latitudes and longitudes. The border between Queensland and New South Wales, for example, is mostly along the 29th parallel. Between the Northern Territory and South Australia it's the 26th parallel, and that between South Australia and Western Australia is along meridian 129 East.

If you go to a mapping service and plug in 29S,141E (the line between Queensland and New South Wales):

[Image: map of the area around 29S, 141E]

you'll see that the exact point is not where the boundary is actually drawn. That means we need to use some fuzziness in our matching.

>>> cursor.execute("""SELECT STATENAME, ELECTORATE FROM ELECTORATES E WHERE ELECTORATE_PK IN
...                   (SELECT DISTINCT ELECTORATE_FK FROM GEOPOINTS WHERE LATITUDE BETWEEN -29.001 AND -28.995)
...                   ORDER BY STATENAME, ELECTORATE""")
>>> results = cursor.fetchall()
>>> for s, e in results:
...     print("{s:6} {e}".format(s=s, e=e))
...
NSW    Ballina
NSW    Barwon
NSW    Clarence
NSW    Lismore
NSW    New England
NSW    Northern Tablelands
NSW    Page
NSW    Parkes
QLD    Maranoa
QLD    Southern Downs
QLD    Warrego
SA     Giles
SA     Grey
SA     Stuart
SA     Stuart
WA     Durack
WA     Geraldton
WA     Kalgoorlie
WA     Moore
WA     North West Central

Hmm. Not only did I not want SA or WA electorates returned, the database has also given me the federal electorates as well. Let's update our electorates table to reflect that jurisdictional issue:

>>> cursor.execute("""ALTER TABLE ELECTORATES ADD FEDERAL BOOLEAN""")
>>> connection.commit()

Popping into a psql session for a moment, let's see what we have:

demodb=> \d electorates
                                    Table "public.electorates"
    Column     |         Type          | Collation | Nullable |               Default
---------------+-----------------------+-----------+----------+-------------------------------------
 electorate_pk | integer               |           | not null | nextval('electorate_seq'::regclass)
 electorate    | character varying(64) |           | not null |
 statename     | character varying(3)  |           | not null |
 federal       | boolean               |           |          |
Indexes:
    "electorates_pkey" PRIMARY KEY, btree (electorate_pk)
Referenced by:
    TABLE "geopoints" CONSTRAINT "geopoints_electorate_fk_fkey" FOREIGN KEY (electorate_fk) REFERENCES electorates(electorate_pk)

Now let's populate that column. Unfortunately, though, some state electorates have the same name as federal electorates - and some electorate names exist in more than one state, too! (I'm looking at you, Bass!). Tempting as it is to zorch our db and start from scratch, I'm going to take advantage of this information:

  1. We added the federal electorate list after all the states,

  2. the federal list was constructed starting with the ACT, and therefore

  3. Federal electorates will have a higher primary key value than all the states and territories.

With that in mind here's the first federal electorate entry:

demodb=> SELECT ELECTORATE_PK, ELECTORATE, STATENAME FROM ELECTORATES WHERE STATENAME = 'ACT' ORDER BY ELECTORATE_PK, STATENAME;
 electorate_pk |  electorate  | statename
---------------+--------------+-----------
             1 | Brindabella  | ACT
             2 | Ginninderra  | ACT
             3 | Kurrajong    | ACT
             4 | Murrumbidgee | ACT
             5 | Yerrabi      | ACT
           416 | Bean         | ACT
           417 | Canberra     | ACT
           418 | Fenner       | ACT
(8 rows)

Let's check how many electorates have a primary key at or above Bean's (416). First we need the total count and the highest key:

>>> cursor.execute("""SELECT COUNT(ELECTORATE_PK), MAX(ELECTORATE_PK) FROM ELECTORATES""")
>>> cursor.fetchall()
[(566, 566)]

And a quick check to see that we do in fact have 151 electorates with that condition:

>>> 566 - 415
151

We do. Onwards.

>>> cursor.execute("""UPDATE ELECTORATES SET FEDERAL = TRUE WHERE ELECTORATE_PK > 415""")
>>> connection.commit()
>>> cursor.execute("""SELECT COUNT(*) FROM ELECTORATES WHERE FEDERAL IS TRUE""")
>>> cursor.fetchall()
[(151,)]

Likewise, we'll set the others to federal=False:

>>> cursor.execute("""UPDATE ELECTORATES SET FEDERAL = FALSE WHERE ELECTORATE_PK < 416""")
>>> connection.commit()
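
Rather than running a separate SELECT COUNT(*) after each UPDATE, the cursor's rowcount attribute (part of the DB-API) reports how many rows the last statement touched. A minimal sketch, assuming SQLite via the standard library and a hypothetical four-row table where keys 3 and above are the federal entries:

```python
import sqlite3

connection = sqlite3.connect(":memory:")
cursor = connection.cursor()
cursor.execute("""CREATE TABLE electorates (
                      electorate_pk INTEGER PRIMARY KEY,
                      electorate TEXT NOT NULL,
                      federal BOOLEAN)""")
cursor.executemany(
    "INSERT INTO electorates (electorate) VALUES (?)",
    [("Brindabella",), ("Yerrabi",), ("Bean",), ("Canberra",)])

first_federal_pk = 3  # hypothetical threshold for this tiny table

cursor.execute("UPDATE electorates SET federal = 1 WHERE electorate_pk >= ?",
               (first_federal_pk,))
federal_rows = cursor.rowcount  # rows changed by the UPDATE just executed

cursor.execute("UPDATE electorates SET federal = 0 WHERE electorate_pk < ?",
               (first_federal_pk,))
state_rows = cursor.rowcount
connection.commit()

print(federal_rows, state_rows)
```

Comparing rowcount against the number you expected before calling commit() gives you a cheap chance to roll back if the arithmetic was off.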

Back to our queries. I want to see both sorts of electorates, but grouped by whether they are federal or state electorates:

>>> cursor.execute("""SELECT E.STATENAME, E.ELECTORATE, E.FEDERAL FROM ELECTORATES E
...                   WHERE E.ELECTORATE_PK IN
...                       (SELECT DISTINCT ELECTORATE_FK FROM GEOPOINTS WHERE LATITUDE BETWEEN -29.001 AND -28.995)
...                   GROUP BY E.STATENAME, E.ELECTORATE, E.FEDERAL
...                   ORDER BY STATENAME, ELECTORATE""")
>>> fedstate = cursor.fetchall()
>>> fedstate
[('NSW', 'Ballina', False), ('NSW', 'Barwon', False), ('NSW', 'Clarence', False), ('NSW', 'Lismore', False), ('NSW', 'New England', True), ('NSW', 'Northern Tablelands', False), ('NSW', 'Page', True), ('NSW', 'Parkes', True), ('QLD', 'Maranoa', True), ('QLD', 'Southern Downs', False), ('QLD', 'Warrego', False), ('SA', 'Giles', False), ('SA', 'Grey', True), ('SA', 'Stuart', False), ('WA', 'Durack', True), ('WA', 'Geraldton', False), ('WA', 'Kalgoorlie', False), ('WA', 'Moore', False), ('WA', 'North West Central', False)]

>>> def tfy(input):
...     if input:
...         return "yes"
...     else:
...         return ""

>>> for res in fedstate:
...     fmtstr = """{statename:6} {electorate:30} {federal}"""
...     print(fmtstr.format(statename=res[0], electorate=res[1], federal=tfy(res[2])))
...
NSW    Ballina
NSW    Barwon
NSW    Clarence
NSW    Lismore
NSW    New England                    yes
NSW    Northern Tablelands
NSW    Page                           yes
NSW    Parkes                         yes
QLD    Maranoa                        yes
QLD    Southern Downs
QLD    Warrego
SA     Giles
SA     Grey                           yes
SA     Stuart
WA     Durack                         yes
WA     Geraldton
WA     Kalgoorlie
WA     Moore
WA     North West Central

I could have added a WHERE E.FEDERAL = TRUE to the query, too.

Finally, let's see what state electorates in WA and SA are on the border:

>>> cursor.execute("""SELECT STATENAME, ELECTORATE FROM ELECTORATES E
...                   WHERE ELECTORATE_PK IN
...                       (SELECT DISTINCT ELECTORATE_FK FROM GEOPOINTS WHERE LATITUDE BETWEEN -60.00 AND -25.995
...                        AND LONGITUDE BETWEEN 128.995 AND 129.1)
...                   AND FEDERAL = FALSE ORDER BY STATENAME, ELECTORATE""")
>>> results = cursor.fetchall()
>>> for _res in results:
...     print("""{statename:6} {electorate}""".format(statename=_res[0], electorate=_res[1]))
...
NT     Namatjira
SA     Giles
WA     Kalgoorlie
WA     North West Central

Why do we have that electorate from the Northern Territory? It's because the coordinates are a bit fuzzy and we had to use a range (the BETWEEN) in our query.
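
The ranges in these queries were typed in as literals, but the fuzziness can also be expressed as a tolerance around the line of interest and passed in as query parameters. Here is a minimal sketch of that idea; it assumes SQLite via the standard library, the window helper is hypothetical, and the three sample points are made up.

```python
import sqlite3


def window(value, tolerance):
    """Return the (low, high) bounds of a symmetric range around value."""
    return (value - tolerance, value + tolerance)


connection = sqlite3.connect(":memory:")
cursor = connection.cursor()
cursor.execute("CREATE TABLE geopoints (latitude REAL, longitude REAL)")
cursor.executemany(
    "INSERT INTO geopoints VALUES (?, ?)",
    [(-28.999, 141.0), (-29.5, 141.0), (-28.9975, 150.2)])

# Anything within 0.005 of a degree of the 29th parallel counts as a match.
low, high = window(-29.0, 0.005)
cursor.execute(
    "SELECT latitude, longitude FROM geopoints "
    "WHERE latitude BETWEEN ? AND ? ORDER BY longitude",
    (low, high))
matches = cursor.fetchall()
print(matches)
```

A tighter tolerance would reduce stray matches such as the Northern Territory one above, at the risk of missing boundary points that were drawn slightly off the nominal line.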

Finally

I apologise, because while this was supposed to be a brief introduction I did get side-tracked with data investigation. I suppose that's an occupational hazard since I'm a Data Engineer working for a company which provides GIS-related services. Anyway, in terms of depth, this was definitely only scratching the surface of what is possible with Python and databases. I encourage you to go and read the Python Database API as well as the SQL reference manual(s) for your chosen database and its binding documentation.




init(Apache Spark)

In a previous post I wrote about how I've started down the Data Science path, kicking off with an exploration of sentiment analysis for political tweets. This is a topic which I will come back to in the future, not least because the nltk corpus I've made use of for au-pol-sentiment is based on British political tweets. While Australia and Britain share a common political heritage, I'm not completely confident that our political discourse is quite covered by that corpus.

In the meantime, another aspect of Data Science in practice is the use of an ecosystem called Apache Spark. Leaving aside my 20+ years of muscle memory spelling it as SPARC, this is a Machine Learning engine, described on the homepage as a unified analytics engine for large-scale data processing.

My experience is that when I want or need to learn a new toolkit or utility, the best way to do so is by trying to directly solve a specific problem with it. One problem (ok, not really a problem, more a set of questions) I have is that with all of the data I've gathered since 2013 from my solar inverter I'm dependent on pvoutput.org for finding per-year and per-month averages, maxima and minima. So with a data science-focused job interview (with Oracle Labs) approaching, I decided to get stuck in and get started with Apache Spark.

The first issue I faced was implementing the ETL pipeline. I have two types of files to load - the first contains the output from solarmonj, the second has the output from my solar inverter script.

Here's the first schema form:

Field name        Units
----------------  -------------------
Timestamp         seconds-since-epoch
Temperature       float (degrees C)
energyNow         float (Watts)
energyToday       float (Watt-hours)
powerGenerated    float (Hertz)
voltageDC         float (Volts)
current           float (Amps)
energyTotal       float (Watt-hours)
voltageAC         float (Volts)


The second form is from jfy-monitor, and has this schema:

Field name        Units
----------------  ---------------------------------------
Timestamp         ISO8601-like ("yyyy-MM-dd'T'HH:mm:ss")
Temperature       float (degrees C)
PowerGenerated    float (Watts)
VoltageDC         float (Volts)
Current           float (Amps)
EnergyGenerated   float (Watt-hours)
VoltageAC         float (Volts)

There are two other salient pieces of information about these files. The first is that the energyTotal and EnergyGenerated fields are running totals of the amount of energy generated on that particular day. The second is that in the first version of the schema, energyTotal needs to be multiplied by 1000 to put it on the same scale as EnergyGenerated in the second.
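
Those two facts have a handy consequence: because the field is a running total, a day's generation is simply the largest reading seen that day, once old-format values have been scaled. A minimal sketch of that arithmetic, with hypothetical helper names and made-up readings:

```python
def normalise_energy(value, is_old):
    """Scale a running-total reading from the first schema by 1000."""
    return value * 1000.0 if is_old else value


def daily_total(readings, is_old):
    """A day's total is the largest value of that day's running total."""
    return max(normalise_energy(reading, is_old) for reading in readings)


# Made-up readings for one day, in the old and the new file formats.
old_day = [0.5, 7.25, 16.5]
new_day = [500.0, 7250.0, 16500.0]

print(daily_total(old_day, True), daily_total(new_day, False))
```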

With that knowledge ready, let's dive into the code.

The first step is to start up a Spark session:

from pyspark.sql.functions import date_format
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Basic Spark session configuration
sc = SparkContext("local", "PV Inverter Analysis")
spark = SparkSession(sc)

# We don't need most of this output
log4j = sc._jvm.org.apache.log4j
log4j.LogManager.getRootLogger().setLevel(log4j.Level.ERROR)

I observed while prototyping this in the pyspark REPL environment that if I didn't turn the logging output right down, then I'd see squillions of INFO messages.

The second step is to generate a list of files. As you might have guessed, I've got a year/month/day hierarchy - but the older files have a csv extension. To get those files (and since I want to be able to process any given year or year+month combination), I need to use some globbing:

import glob
import os

def generateFiles(topdir, year, month):
    """Construct per-year dicts of lists of files"""
    allfiles = {}
    kkey = ""
    patterns = []
    months = []
    # Since some of our data dirs have months as bare numbers and
    # others have a prepended 0, let's match them correctly.
    if month:
        if month < 10:
            months = [month, "0" + str(month)]
        else:
            months = [month]
    if year and month:
        patterns = ["{year}/{monthp}/**".format(year=year, monthp=monthp)
                    for monthp in months]
        kkey = year
    elif year:
        patterns = ["{year}/*/**".format(year=year)]
        kkey = year

    if patterns:
        globs = []
        for pat in patterns:
            globs.extend(glob.glob(os.path.join(topdir, pat)))
        allfiles[kkey] = globs
    else:
        for yy in allYears:
            allfiles[yy] = glob.glob(os.path.join(topdir,
                                                  "{yy}/*/*".format(yy=yy)))
    return allfiles
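
The month-matching step on its own is easy to try out in isolation. This is a stripped-down sketch of just that piece, using a hypothetical month_patterns helper rather than the function above:

```python
def month_patterns(year, month):
    """Return glob patterns covering both '1' and '01' style month dirs."""
    candidates = ["{0}".format(month)]
    if month < 10:
        # Single-digit months may also appear zero-padded on disk.
        candidates.append("{0:02d}".format(month))
    return ["{0}/{1}/**".format(year, name) for name in candidates]


print(month_patterns(2018, 1))
print(month_patterns(2018, 11))
```

One thing worth knowing: unless glob.glob() is called with recursive=True, a "**" in the pattern behaves like a plain "*", which is all that's needed for a year/month/day layout.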

To load in each file, I turned to the tried-and-true Python standard module csv. Rather than having a v1 and a v2 processing function, I model DRY and use an input argument to determine which set of elements to match:

import csv
from datetime import datetime

def importCSV(fname, isOld):
    output = []
    if isOld:
        multiplier = 1000.0
    else:
        multiplier = 1.0

    # Read all the rows up front so that the file is closed promptly
    with open(fname, newline="") as csvfile:
        rows = list(csv.reader(csvfile))
    for row in rows:
        try:
            if isOld:
                (tstamp, temp, _enow, _etoday, powergen, vdc,
                 current, energen, vac) = row
            else:
                (tstamp, temp, powergen, vdc, current, energen, vac) = row
        except ValueError as _ve:
            # print("failed at {row} of {fname}".format(row=row, fname=fname))
            continue

        if "e" in temp:
            # invalid line, skip it
            continue

        if isOld:
            isostamp = datetime.fromtimestamp(int(tstamp))
        else:
            isostamp = datetime.fromisoformat(tstamp)

        output.append({
            "timestamp": isostamp,
            "Temperature": float(temp),
            "PowerGenerated": float(powergen),
            "VoltageDC": float(vdc),
            "Current": float(current),
            "EnergyGenerated": float(energen) * multiplier,
            "VoltageAC": float(vac)})
    return output
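
The two timestamp styles are the main thing separating the file formats, so here is that conversion on its own. It's a sketch with a hypothetical parse_stamp helper; note that datetime.fromtimestamp() interprets the epoch value in the local timezone, and datetime.fromisoformat() needs Python 3.7 or later.

```python
from datetime import datetime


def parse_stamp(tstamp, is_old):
    """Convert either timestamp style into a datetime object."""
    if is_old:
        # Old files: seconds since the epoch, read as local time
        return datetime.fromtimestamp(int(tstamp))
    # New files: ISO8601-like, e.g. 2018-01-31T12:34:56
    return datetime.fromisoformat(tstamp)


print(parse_stamp("2018-01-31T12:34:56", False))
print(parse_stamp("1517402096", True))
```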

Now we get to the Apache Spark part. Having got a list of anonymous dicts for each year, I can turn them into a Resilient Distributed Dataset (RDD) and thence a DataFrame. I chose the DataFrame model rather than a Row because that matches up nicely with my existing data format. For other applications (such as when I extend my Twitter Sentiment Analysis project with the streaming API) I'll use the Row datatype instead.

def now():
    """ Returns an ISO8601-formatted (without microseconds) timestamp"""
    return datetime.now().strftime("%Y-%m-%dT%H:%M:%S")

allFiles = generateFiles("data", qyear, qmonth)

print(now(), "Importing data files")

for k in allFiles:
    rddyear = []
    for fn in allFiles[k]:
        if fn.endswith(".csv"):
            rddyear.extend(importCSV(fn, True))
        else:
            rddyear.extend(importCSV(fn, False))
    rdds[k] = rddyear

print(now(), "All data files imported")

for year in allYears:
    rdd = sc.parallelize(rdds[year])
    allFrames[year] = rdd.toDF()
    newFrame = "new" + str(year)
    # Extend the schema for our convenience
    allFrames[newFrame] = allFrames[year].withColumn(
        "DateOnly", date_format('timestamp', "yyyyMMdd")
    ).withColumn("TimeOnly", date_format('timestamp', "HHmmss"))
    allFrames[newFrame].createOrReplaceTempView("view{year}".format(
        year=year))

print(now(), "Data transformed from RDDs into DataFrames")

The reason I chose to extend the frames with two extra columns is that when I search for the record dates (minimum and maximum), I want to have a quick SELECT which I can aggregate on.
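
For a feel of what ends up in those two columns, here is the equivalent in plain Python. This is only an illustration: it uses strftime() rather than Spark, and the date_and_time_only helper is hypothetical. Spark's "yyyyMMdd" and "HHmmss" patterns correspond to "%Y%m%d" and "%H%M%S".

```python
from datetime import datetime


def date_and_time_only(stamp):
    """Return the (DateOnly, TimeOnly) strings for one timestamp."""
    return stamp.strftime("%Y%m%d"), stamp.strftime("%H%M%S")


date_only, time_only = date_and_time_only(datetime(2018, 1, 31, 9, 5, 7))
print(date_only, time_only)
```

Because DateOnly is a fixed-width string, every day in a month shares its first six characters, which is what makes a simple prefix match on year and month possible.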

ymdquery = "SELECT DISTINCT DateOnly from {view} WHERE DateOnly "
ymdquery += "LIKE '{yyyymm}%' ORDER BY DateOnly ASC"

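for year in allYears:
    # Look up the view and the extended frame registered for this year above
    view = "view{year}".format(year=year)
    frame = allFrames["new" + str(year)]
    for mon in allMonths:
        # Zero-pad the month so that it matches the yyyyMMdd DateOnly format
        yyyymm = "{year}{mon:02d}".format(year=year, mon=mon)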

        _dates = spark.sql(ymdquery.format(
            view=view, yyyymm=yyyymm)).collect()
        days = [n.asDict()["DateOnly"] for n in _dates]

        _monthMax = frame.filter(
            frame.DateOnly.isin(days)).agg(
                {"EnergyGenerated": "max"}).collect()[0]
        monthMax = _monthMax.asDict()["max(EnergyGenerated)"]

I keep track of each day's maximum, and update my minval and minDay as required. All this information is then stored in a per-month dict, and then in a per-year dict.
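
The bookkeeping for the record dates doesn't need Spark at all once the per-day maxima are known. Here is a minimal sketch of that step; month_records is a hypothetical helper and the three daily values are made up, so don't read anything into the numbers.

```python
def month_records(daily_max):
    """Summarise a dict mapping 'yyyyMMdd' to that day's maximum reading."""
    max_day = max(daily_max, key=daily_max.get)
    min_day = min(daily_max, key=daily_max.get)
    total = sum(daily_max.values())
    return {
        "max": (max_day, daily_max[max_day]),
        "min": (min_day, daily_max[min_day]),
        "total": total,
        "average": total / len(daily_max),
    }


# Made-up values for three days.
records = month_records({"20180101": 100.0,
                         "20180102": 50.0,
                         "20180103": 150.0})
print(records)
```

One of these per month, collected into a dict per year, gives the nested structure described above.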

The last stage is to print out the record dates, monthly and yearly totals, averages and other values.

Running this utility on my 4-core Ubuntu system at home, I get what I believe are ok timings for whole-year investigations, and reasonable timings if I check a specific month.

When I run the utility for January 2018, the output looks like this:

(v-3.7-linux) flerken:solar-spark $ SPARK_LOCAL_IP=0.0.0.0 time -f "%E"  spark-submit --executor-memory 2G --driver-memory 2G solar-spark.py  -y 2018 -m 1
19/10/15 12:23:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark default log4j profile: org/apache/spark/log4j-defaults.properties

[Most INFO-level output elided]

19/10/15 12:23:58 INFO Utils: Successfully started service 'SparkUI' on port 4040.
19/10/15 12:23:58 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://0.0.0.0:4040

2019-23-15T12:10:59 Importing data files
2019-23-15T12:10:59 All data files imported
/space/jmcp/web/v-3.7-linux/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/session.py:366: UserWarning: Using RDD of dict to inferSchema is deprecated. Use pyspark.sql.Row instead
2019-24-15T12:10:01 Data transformed from RDDs into DataFrames
2019-24-15T12:10:01 Analysing 2018
2019-24-15T12:10:01          January
2019-24-15T12:10:15 All data analysed
2019-24-15T12:10:15 2018 total generation: 436130.00 KW/h
2019-24-15T12:10:15         January total:               436130.00 KW/h
2019-24-15T12:10:15         Record dates for January:    Max on 20180131 (16780.00 KW/h), Min on 20180102 (10560.00 KW/h)
2019-24-15T12:10:15         Average daily generation  14068.71 KW/h
2019-24-15T12:10:15 ----------------

0:18.76

While that processing is going on, you can see a dashboard with useful information about the application at http://localhost:4040:

[Screenshots: Executors, Jobs, Details of a query]

You can find the code for this project in my GitHub repo solar-spark.