Some Pythonic Kafka stuff

I've been actively interviewing over the last few months, and was recently talking with a cloud streaming provider about a Staff Engineer SRE role specializing in Apache Kafka. I've been doing a lot of work in that space for $employer and quite like it as a data streaming solution.

The interviews went well, and they sent me a do-at-home challenge with a one week timeout.

The gist of it was I had to create a Flask app which allowed the user to enter a URL to monitor for status on a given frequency, use Python to write a Kafka Producer to publish this data to a topic, and write a Kafka Consumer to read from the topic and insert into a PostgreSQL database.

Fun!

I did a brief investigation into using threads within a Flask app for the monitoring code, but quickly decided that a better architecture would be to do the monitoring via a separate daemon. Separation of concerns to allow for easier maintenance. Suddenly I'm all about ongoing maintenance rather then how quickly I can deliver a new feature... Hmmm.

The next step was to sketch out the table schema I wanted in the database:



CREATE SEQUENCE public.urltargets_seq
            INCREMENT BY 1
            MINVALUE 1
            MAXVALUE 9223372036854775807
            START 1
            CACHE 1
            NO CYCLE;

    CREATE SEQUENCE public.monitor_results_seq
            INCREMENT BY 1
            MINVALUE 1
            MAXVALUE 9223372036854775807
            START 1
            CACHE 1
            NO CYCLE;

    CREATE TABLE IF NOT EXISTS urltargets (
            urltargets_pk           int4 NOT NULL DEFAULT nextval('urltargets_seq'::regclass),
            urltarget                       varchar(1024) NOT NULL,
            monitor_frequency       int NOT NULL CHECK (monitor_frequency in (1, 2, 5, 10, 20, 30)),
            CONSTRAINT urltargets_pkey PRIMARY KEY (urltargets_pk)
    );

    CREATE TABLE IF NOT EXISTS monitor_results (
            monitor_results_pk      int4 NOT NULL DEFAULT nextval('monitor_results_seq'::regclass),
            http_status                     int NOT NULL,
            start_time                      timestamp with time zone NOT NULL,
            duration            int4 NOT NULL,
            urltarget_fk            int4 NOT NULL,
            CONSTRAINT monitor_results_fk_fkey FOREIGN KEY (urltarget_fk) REFERENCES urltargets(urltargets_pk)
    );


Having decided that I would offer monitoring frequencies of 1, 2, 5, 10, 20 and 30 minutes, I created views for the Flask app to use as well, rather than direct queries. They all look like this, with other values substituted in as you would expect.

CREATE OR REPLACE VIEW view_1m as
            SELECT mr.monitor_results_pk
                    ,  ut.urltarget
                    ,  mr.http_status
                    ,  mr.start_time
                    ,  mr.duration
            FROM monitor_results mr
            JOIN urltargets ut on ut.urltargets_pk = mr.urltarget_fk
            WHERE ut.monitor_frequency = 1
    ;



Since I really like seeing schemata visualised, I created a nice(ish) ERD as well:

/images/2021/DB-ERD.png

Well that was straight forward, how about the application and daemon?


I split out the setup functionality into a separate file importable by the Flask app, the monitor daemon and the consumer. This contained the database connection, Kafka Producer and Kafka Consumer code. There's an interesting little niggle in the Kafka Producer setup which is not immediately obvious and required a bit of digging in StackOverflow as well as enabling debug output with librdkafka:

  def _get_kafka_configuration():
      """
      Common function to retrieve the Kafka configuration.
      """
      global appconfig

      configuration = {
          "bootstrap.servers": appconfig["kafka"]["broker"],
          "group.id": "website-monitor",
          "ssl.key.location": appconfig["kafka"]["keyfile"],
          "ssl.certificate.location": appconfig["kafka"]["certfile"],
          "ssl.ca.location": appconfig["kafka"]["cafile"],
          "security.protocol": "SSL",
          # "debug": "eos, broker, admin",  # re-enable if needed
          'transaction.timeout.ms': 60000,
          'enable.idempotence': True
      }
      return configuration


  def setup_kafka_producer(view):
      """
      Creates the connection to our Kafka brokers so we can publish
      messages to the topics we want. We take the {view} argument so
      that we can avoid blatting multiple producers together and getting
      errors from the broker about zombies and fencing. See
      https://stackoverflow.com/questions/50335227/how-to-pick-a-kafka-transaction-id
      for more details

      Return: a Producer
      """

      configuration = _get_kafka_configuration()
      configuration["transactional.id"] = "website-monitor" + str(view)
      kafkaProducer = Producer(configuration)
      try:
          kafkaProducer.init_transactions()
      except KafkaError as ke:
          # If we can't do this, then we have to quit
          print(f"""Producer failed to init_transactions(), throwing {ke}""")
          raise

      return kafkaProducer


When I was working with the daemon, my first iteration tried opening the DB connection and Producer in each thread's (one for each frequency) __init__() function, and .... that didn't work.

The DB connection is not picklable, so does _not_ survive the call to os.fork(). Once I had rewritten the setup and run methods to get the DB connection, that part was groovy.

The Kafka Producer still required a bit of work. After reading through stackoverflow and the upstream for librdkafka, I saw that I needed to similarly delay initialising the producer until the thread's run() method was called. I also observed that each Producer should also initialise the transaction feature, but leave the begin... end of the transaction to when it was called to publish a message.

I still had a problem, though - some transactions would get through, but then the Producer would be fenced. This was the niggle, and where the StackOverflow comments helped me out:

Finally, in distributed environments, applications will crash or —worse!— temporarily lose connectivity to the rest of the system. Typically, new instances are automatically started to replace the ones which were deemed lost. Through this process, we may have multiple instances processing the same input topics and writing to the same output topics, causing duplicate outputs and violating the exactly once processing semantics.

We call this the problem of “zombie instances.” [emphasis added]


I realised that I was giving the same transactional id to each of the six producer instances (setting the 'transactional.id' in the configuration dict generated by _get_kafka_configuration(), so I needed to uniqify them somehow. I decided to pass the monitoring frequency of the thread to the setup function, and ... booyah, I had messages being published.

That was a really nice feeling.


There is one other aspect of the monitoring daemon that I need to mention. Since each thread reads its list of URLs to monitor each time it wakes, I wanted to parallelize this effort. Monitoring each of the URLs in series could easily take too long from a sleep(...) point of view, and I really did not want to just fork a whole heap of processes and thread either - avoiding the potential for a fork-bomb.

To work around this I used the Python standard library concurrent.futures with a ThreadPoolExecutor for each target URL. Adding attributes to the future object enabled me to use an add_done_callback so that when the future crystallized it would then publish the message.


  def run(self):
      """
      Run's the monitor, updates account-keeping and kicks off
      notifications if required. Then back to sleep.
      """
      self._producer = setup_kafka_producer(self._view)
      self._conn = setup_db()
      self._cursor = self._conn.cursor()
      while True:
          alltargets = self._get_targets()
          if alltargets:
              # We use a 'with' statement to ensure threads in the pool
              # are cleaned up promptly
              with cf.ThreadPoolExecutor(max_workers=50) as executor:
                  for tgt in alltargets:
                      future = executor.submit(check_url,
                                               tgt[0], tgt[1])
                      future.args = (tgt[0], tgt[1])
                      future.producer = self._producer
                      future.add_done_callback(construct_and_publish)
                      self._futures.append(future)
                  for future in cf.as_completed(self._futures):
                      if future.done():
                          self._futures.remove(future)

          sleep(self._sleeptime)

The check and publish methods are outside of the thread definition:


  def construct_and_publish(input):
      """
      Callback function for the concurrent.future that each thread
      makes use of to query a website. Turns the future's attributes
      into a message for 'url-to-monitor' topic, then publishes that
      message to the topic.
      """
      if input.cancelled() or input.exception():
          errmsg = """Monitor attempt for {args} failed"""
          print(errmsg.format(args=input.args,
                              file=stderr, flush=True))
      else:
          message = json.dumps(dict(zip(msgFields, input.result())))
          input.producer.begin_transaction()
          publish_message(producer=input.producer,
                          topic="url-monitor-results",
                          message=message.encode('utf-8'))
          input.producer.commit_transaction()


  def check_url(url, fk):
      """
      Performs an 'HTTP GET' of the supplied url and returns a tuple containing
      (fk, start_time, duration, http_status).
      The start_time is expressed in milliseconds since the UNIX Epoch.
      """
      start_time = datetime.timestamp(datetime.now())
      result = requests.get(url)
      duration = datetime.timestamp(datetime.now()) - start_time
      return (fk, start_time, duration, result.status_code)

With the monitoring daemon written, I now needed the other end of the pipe - writing the Kafka Consumer to read from the topic and insert into the database. This was straightforward: we're polling for messages on both configured topics, when we read one we write it to the appropriate DB table using a prepared statement, commit and then do it all again with a while loop.


  urlToMonitorStmt = "INSERT INTO urltargets (urltarget, monitor_frequency "
  urlToMonitorStmt += "VALUES (%(urltarget)s, %(monitor_frequency)s)"

  urlMonitorResultsStmt = "INSERT INTO monitor_results (http_status, "
  urlMonitorResultsStmt += "urltarget_fk, start_time, duration) "
  urlMonitorResultsStmt += "VALUES (%(http_status)s, %(targetId)s, "
  urlMonitorResultsStmt += "to_timestamp(%(start_time)s), %(duration)s)"

  lookups = {
      "url-to-monitor": urlToMonitorStmt,
      "url-monitor-results": urlMonitorResultsStmt
  }

  if __name__ == "__main__":

      consumer = setup_kafka_consumer()

      connection = setup_db()

      while True:
          with connection.cursor() as curs:
              msglist = consumer.consume(200)
              for msg in msglist:
                  if not msg:
                      continue
                  elif msg.error():
                      print("Received error during poll: {error}".format(
                          error=msg.error()))
                  else:
                      stmt = lookups[msg.topic()]
                      values = json.loads(msg.value().decode('utf-8'))
                      curs.execute(stmt, values)
              curs.execute("COMMIT")

      connection.close()

Of course there should be error handling for the execute(). There should also be packaging and tests and templates for the report. Do not @ me, etc etc.

The reason why all these pieces are missing is because the day before I was due to hand this assignment in to my interviewer, I received a very, very nice offer from another company that I'd also been interviewing with - and I accepted it.




An unexpected live coding challenge

A few weeks ago I was in a technical interview, and was asked to do a live coding challenge. I was surprised, because this is the sort of thing that I expect a recruiter and hiring manager to mention ahead of time. Preparation is very important, and while I know that there are many people for whom live coding is a thrill, there are plenty of other people for whom it can be a terrifying experience.

I'm glad to say that I'm not terrified by it, but it's definitely not an ideal environment for me.

So after a few minutes of me describing what I've done in my career (it seemed pretty clear that the interviewer hadn't actually read my resume), and a few technical questions, we got into the challenge.

"""
For a given string composed of parenthesis ("(", "{", "["), check if the string is valid parenthesis.
1. "()" -- valid
2. "({})" -- valid
3. "(}{)" -- invalid
4. "{()}[{}]" -- valid
5. "({(}))" -- invalid
"""

I noted immediately that this is an issue which requires the processing function to track state, because you not only need to determine open and closed pairings, but also what type it is.

It took a minute to sketch out the classifications that I needed, talking through my decision process all the while:

OPENS = ["(", "{", "["]
CLOSES = [")", "}", "]"]

braces = [ "{", "}"]
parens = [ "(", ")"]
brackets = [ "[", "]"]

classes = { "braces": braces,
            "parens": parens,
            "brackets": brackets
}

I was able to stub out a check function pretty quickly, but got stuck when I went from the stub to implementation, because I realised that I needed to keep track of what the previous element in the string was.

Oh no! How do I do that? (A stack, btw)

Mental blank :(

I needed time to jog my memory, so I asked the interviewer to tell me about himself, what he does on the team and a few other questions.

This, I think, was a very good decision - with the focus of the interview not on me, I could step back and think about what basic data types in Python I could use to implement a stack.

The data type I needed is indeed pretty basic: a list().

A Python list() lets you push (the append() operation) and pop so with the addition of another data structure

counts = { "braces": 0,
           "parens": 0,
           "brackets": 0
}

and a short function to return the class of the element

def __classof(c):
    """ returns whether 'c' is in brackets, braces or parens """
    if c in braces:
        return "braces"
    elif c in brackets:
        return "brackets"
    else:
        return "parens"

we're now in a much better position for the algorithm.

By this time I had also calmed myself down, because everything came together pretty easily for me.

With the above code blocks already noted, here is the body of the function:

def check_valid(input):
    """ For the given examples at top, determine validity.
        Assumption: the input is _only_ braces, parens and brackets
    """

    # start
    c = input[0]

    stack = list()
    stack.append(c)

    counts[__classof(c)] += 1

    for c in input[1:]:
        if c in OPENS:
            ## increment count & add to stack
            stack.append(c)
            counts[__classof(c)] += 1
        else:
            ## closing checks
            if __classof(c) !=  __classof(stack[-1]):
                return "invalid"
            else:
                # decrement count_ (__classof(c))
                counts[__classof(c)] -= 1
                stack.pop()

    return "valid"

We're playing fast and loose here with input validity checking - there's no "is this an empty string?" and we're not handling a single-character string, let alone validating that our input only contains braces, parens and brackets.

With this main() function, though, we're doing pretty well:

## main
strings = ["""()""",
           """({})""",
           """(}{)""",
           """{()}[{}]""",
           """({(}))""",
           """](){}"""
           ]

for element in strings:
    print("""{element:20}""".format(element=element),
          check_valid(element))

which gives us the following output:

()                   valid
({})                 valid
(}{)                 invalid
{()}[{}]             valid
({(}))               invalid
](){}                valid

Using the criteria specified, the final case is invalid, given that it starts with a terminating rather than initiating/opening element - there's nothing to balance the element with. However at that point my time was up and I didn't worry about it.

My interviewer then asked whether I had considered using recursion to solve the problem.

I hadn't considered recursion because I generally don't have to for the use-cases I need to write - and in this particular problem space it didn't seem to me to be an effective use of resources.

Consider the longest case, {()}[{}]. If you're recursing on the check function, then you'll wind up calling the function four times, so that's four new stack frames to be created and destroyed. That doesn't strike me as particularly efficient in time or space. Iterating over the input, however, avoids all of the setup + teardown overhead.

Anyway, it was a relatively fun exercise, and I'm glad I did it. I was able to keep a cool head and buy myself enough time to jog my memory and finish the problem, and it worked the first time (I know, that _never_ happens!).

For future encounters like this, I think it's handy to remember these points:

  1. Breathe

  2. Talk through what you are doing, and why

  3. If you hit a problem, see point 1, and then say that you're stuck and need to think through a particular part of the issue.

  4. If you need to stop talking so you can think, say that that's what you need to do.

It is my impression that if your interviewer is a decent person, they will help you work through your point of stuckness so that you can complete as much as possible of the task.




Why do I see "Duplicate main class"?

I've recently started work on improving my skills and knowledge in the Java ecosystem, and while working on a previous post I burned several hours trying to work out why I was seeing this error:

[ERROR] ..../bearertokenCLI.java:[42,1] duplicate class.... bearer_token_cli.bearerTokenCLI

I didn't find the answers at StackOverflow to be very useful, because they invariably said something along the lines of "clean your project and let the IDE re-index things, it'll be fine".

Which is not a solution - it's like "curing" a memory leak by rebooting the host. I like to know the why of a problem.

I eventually re-re-read the message from the Maven compiler plugin and noticed that it was trying to compile 2 source files. For an exploratory project which only had one source file, this was unexpected:

[INFO] --- maven-compiler-plugin:3.8.1:compile (default-compile) @ bearer_token_cli ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 2 source files to /home/jmcp/IdeaProjects/bearer-token-cli/target/classes

Why did I now have two files? The answer lies in a bit of laziness on my part. The previous post had hard-coded credentials and URLs, but I really wanted to start using a getopt()-like library called picocli, and rather than git commit ... I just copied my first version of the source to bearerTokenCLI-hc-v1.java and kept on editing.

Apart from the relevant information (2 source files) being in an [INFO] block rather than in the [ERROR] block, why on earth couldn't it have printed the names of the files it was compiling along the way?

If you come across this error, a quick

$ find src/main/java -name \*.java |xargs grep -i "public static void main"

should help you find where that erroneous main class is hiding.

Here's where I get a bit ranty. One of the patterns that we invented and applied to the Solaris OS/Net (ON) source tree during the development of #ProjectLullaby was that for every subdirectory which contained source files, the Makefile specified each file individually.

There are solid reasons for this, starting with the need to ensure that when you build part or all of the tree, we do not miss dependencies. Over the years there were enough instances of "developer changes code, adds new file or renames/splits old file, FAILS TO CHECK IN NEW FILES, breaks build after integration" that we forced specificity. You want to add, remove or delete files that the build depended on? It's ON YOU to make sure we're tracking them. A broken build meant either a followup changeset (with "(add missing file)" etc appended to the comment), or getting backed out.

While I'm enjoying some aspects of developing in Java and I do like leaving a lot of heavy lifting to a framework or a toolset, the heavy reliance in the Java world on an IDE to do thinking for you leaves me cold.




Queensland's 2011 floods

It's now ten years since we experienced the Queensland floods of December 2010-January 2011 . I took quite a few photos around the Centenary Suburbs and put some of them into a twitter thread last week. I've put those and many more together into an album for the record.

For our part, we got off lightly. The waters came to within about 1km of our home, and while Energex shut down the West Darra substation at 1pm on the day the waters rose on our part of the river, power was back on again 24 hours later. J was pregnant with #Child2 so the lack of air movement during an incredibly hot and humid night was very draining. But that was it for us. Many people were a lot more affected; the Mud Army helped with cleanup and it was heartbreaking to see just how many homes were damaged.




Access token retrieval in Python and Java

At $work I'm part of our API team, enabling access to the rather large datasets that we have acquired (and generated) over the years. We make heavy use of Postman for testing, but every now and again I find that I want to do something on the commandline.

All of our APIs require authorization, for which we use OAuth, and specifically the Bearer Token.

Rather than having to fire up Postman to extract a Bearer Token, I decided to write a Python script to do it for me, so I could then set an environment variable and pass that to curl as a command-line argument. I was a little lazy and hard-coded my clientid and secret - I'm not going to be requesting anybody else's token!

#!/usr/bin/python3
import requests

url="""https://$AUTHSERVER/access/oauth/token?grant_type=client_credentials&client_id={client_id}&client_secret={client_secret}"""
client_id = "nope"
client_secret = "still_nope"

resp = requests.get(url.format(client_id=client_id,
                               client_secret=client_secret))
print("export BEARER=\"Authorization: Bearer " +
      resp.json()["access_token"] + "\"")

I'm taking advantage of the fact that I know through many years of use that the requests package does a lot of heavy lifting for me, particularly the JSON decoding.

Since $work has a shutdown between Christmas and New Year, I figured that I would spent some time implmenting this in Java. Not because I have a need to, but because I need to get more Java under my belt since $work is a Java shop and cross-pollination / polyglotting is useful.

The first step was to determine how to send an HTTP GET for a specific URI. A few searches later and I'd arrived at

import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.net.URI;

and since it seems cleaner to import the Exceptions that these throw, I added

import java.io.IOException;
import java.net.URISyntaxException;

A bit more hard-coding laziness for the clientid and secret and I had the beginnings of a solution. (Note that I do not claim that any of this is using Best Practices, it's just a building block).

class bearerTokenCLI {

    private static URI authServer;
    private static HttpResponse<String> response;

    public static void main(String... args) {

        try {
            authServer = new URI("https", null,
                    "$AUTHSERVER", 443,
                    "/access/oauth/token",
                    "grant_type=client_credentials" +
                            "&client_id=nope" +
                            "&client_secret=still_node", null);
        } catch (URISyntaxException exc) {
            System.out.println("Received URISyntaxException");
            System.out.println(exc.getReason());
            System.out.println(exc.getMessage());
            System.out.println(exc.getInput());
        }

        System.out.println("Requesting " + authServer.toString());

Ok so far - we've created a new URI object, caught the specific exception that it could throw, and (because I'm learning) printing the stringified version of the URI to stdout.

Now we need an HttpRequest to send via an HttpClient:

HttpRequest request = HttpRequest.newBuilder(authServer).build();
HttpClient client = HttpClient.newHttpClient();

try {
    response = client.send(request, BodyHandlers.ofString());
} catch (java.io.IOException | java.lang.InterruptedException jiie) {
    /*
     * Note that this catch() uses Java7++ syntax for handling
     * multiple exceptions in the same block
     */
    System.out.println("Received java.io.IOException");
    System.out.println(jiie.getCause());
    System.out.println(jiie.getMessage());
    System.out.println(jiie.getStackTrace());
}

Assuming we didn't get an exception, we need to check that the HTTP Status Code of the response is OK, or 200:

if (response.statusCode() != 200) {
    /*
     * Something went wrong so print the url we requested, status code,
     * an error message and the response body as text.
     */
    System.out.println("Request was unsuccessful. " +
            "Received status code " + response.statusCode());
    System.out.println("URL requested was\n" + authServer.toString());
    System.out.println("Response body text:\n" + response.body());
    System.exit(1);
}

If it isn't ok, we bail out, and otherwise we check for the Content-Type header being set to 'application/json'. Why that specific value? If you refer to the RFCs for OAuth (RFC6749 and RFC6750) specifically section 5 of the latter, you see that

The parameters are included in the entity-body of the HTTP response using the "application/json" media type as defined by [RFC4627]. The parameters are serialized into a JavaScript Object Notation (JSON) structure by adding each parameter at the highest structure level. Parameter names and string values are included as JSON strings. Numerical values are included as JSON numbers. The order of parameters does not matter and can vary.

Let's check for that type then.

/*
 * Check that we've got 'application/json' as the Content-Type.
 * Per https://tools.ietf.org/html/rfc7231#section-3.1 we know that
 * Content-Type is a semicolon-delimited string of
 *     type/subtype;charset;...
 * More importantly, we know from https://tools.ietf.org/html/rfc6749#section-5.1
 * that the response type MUST be JSON.
 */
List<String> contentTypeHeader = response.headers().map().get("Content-Type");
if (contentTypeHeader.isEmpty()) {
    System.out.println("ERROR: Content-Type header is empty!");
    System.out.println(response.headers());
    System.exit(1);
}

Since contentTypeHeader is a List<T> we can either iterate over it, or, since we know that it can only occur once in an HTTP response we can grab element 0 directly. Here's iteration (and yes, I know we should be confirming that we've actually got 'application/json', do not @ me, etc etc):

for (String el: contentTypeHeader) {
    String contentType = el.split(";")[0];
    System.out.println("Actual Content-Type bit:   " + contentType);
}

On the other hand, making use of our knowledge that there's only one Content-Type header in the response, and per RFC7231 we know the format of the header, we can take a shortcut and grab the value directly:

String contentType = contentTypeHeader.get(0).split(";")[0];
if (!contentType.equalsIgnoreCase("application/json")) {
    /* Not JSON! */
    System.out.println("Content-Type is " + contentType +
            " not application/json. Exiting");
    System.exit(1);
}

So far, so good. It took me several hours to get to this point because I not only had to refresh my memory of those RFCs, but also realising that a short-circuit was possible.

Now we can move onto the response body text. By way of printing out response.getClass() I know that the response is an instance of class jdk.internal.net.http.HttpResponseImpl, and visual inspection of it shows that it's JSON. But how do I turn that into an array that I can pull the access_token information from?

At first I tried using Google's GSON but I just couldn't get my head around it. I need to find and understand more code examples. Until I do that, however, I turned to Jackson JR, which I found a lot more straightforward.

We need another import, this time

import com.fasterxml.jackson.jr.ob.JSON;

And then we construct a Map<String, Object> from the response body:

try {
    Map<String, Object> containerJSON = JSON.std.mapFrom(response.body());
    String accessToken = containerJSON.get("access_token").toString();
    System.out.println("export BEARER=\"BEARER " + accessToken + "\"\n");
} catch (IOException exc) {
    System.out.println("Caught exception " + exc.toString());
    System.out.println("Message:\n" + exc.getMessage());
}

You'll observe that I'm again being a bit lazy here by wrapping this block in the one try {...} catch (..) {..} block. Whyso? Because by this point we should be certain that we've actually got an access_token element in the response, and if we don't then there's something going wrong upstream.

Finally, how do we build this thing? As much as I'd like to just run javac over the source and create a solitary jar, I've found that including external dependencies is made immensely easier by using a build system like Maven, Ant or Gradle in the Java ecosystem. For C, of course, there's no place like make(1s) (ok, or GNU Make).

I started with using a Maven *archetype*, added this dependency to pom.xml:

<dependencies>
    <dependency>
        <groupId>com.fasterxml.jackson.jr</groupId>
        <artifactId>jackson-jr-objects</artifactId>
        <version>2.12.0</version>
    </dependency>
</dependencies>

and added the Maven Assembly plugin to the <build> lifecycle. Then building was a matter of

$ mvn clean install package assembly:single

and then I could run the package with

$ java -jar target/bearer_token_cli-1.0-SNAPSHOT-jar-with-dependencies.jar

All up, I estimate that researching and writing this in Java took me about 12 hours. Most of which was ecosystem research and exploration. There was only one syntax issue which tripped me up - I needed an Array and for about 10 minutes was searching through Javadocs for an appropriate class before I remembered I could use String[] arrName. Sigh.

Learning the ecosystem is the thing I'm finding most difficult with Java - it's huge and there are so many different classes to solve overlapping problems. I haven't even begun to work with @Annotations or dependency injection for my own code yet. Truth be told, after a decade+ of working in Solaris, the idea that anything could be injected into the code I've written puts a chill down my spine. I'm sure I'll get past it one day.




I know, I'll use a regex!

This past week, a colleague asked me for help with a shell script that he had come across while investigating how we run one of our data ingestion pipelines. The shell script was designed to clean input CSV files if they had lines which didn't match a specific pattern.

Now to start with, the script was run over a directory and used a very gnarly bit of shell globbing to generate a list of files in a subdirectory. That list was then iterated over to check for a .csv extension.

[Please save your eye-rolls and "but couldn't they..." for later].

Once that list of files had been weeded to only contain CSVs, each of those files was catted and read line by line to see if the line matched a desired pattern - using shell regular expression parsing. If the line did not match the pattern, it was deleted. The matching lines were then written to a new file.

[Again, please save your eye-rolls and "but couldn't they..." for later].

The klaxons went off for my colleague when he saw the regex:

NEW=${f%.csv}_clean.csv;
  {
  buffer=""
  read
  while IFS="" read -r line && [ -n "$line" ]
  do
        buffer="${buffer}${line}"
        if [[ "$buffer" =~ ^\"[0-9]{4}-([0][0-9]|1[0-2])-([0-2][0-9]|3[01])\",\"[^\"]*\",\"[^\"]*\",\"[^\"]*\",\"[^\"]*\",\"[^\"]*\",\"[^\"]*\",\"[^\"]*\",\"[^\"]*\",\"[^\"]*\",\"[^\"]*\",\"[^\"]*\",\"[^\"]*\",[^,]*,\"[^\"]*\",\"[^\"]*\",\"[^\"]*\",\"[^\"]*\",\"[^\"]*\",\"[^\"]*\",\"[^\"]*\",\"[^\"]*\",\"[^\"]*\",\"[^\"]*\",\"[^\"]*\",\"[^\"]*\",.*$ ]];
        then
              echo "$buffer"
              buffer=""
        else
              buffer="${buffer} "
        fi
  done
  } < "${f}" > "${NEW}"

My eyes got whiplash. To make it easier to understand, let's put each element of the pattern on a single line:

 ^\"[0-9]{4}-([0][0-9]|1[0-2])-([0-2][0-9]|3[01])\",
 \"[^\"]*\",
 \"[^\"]*\",
 \"[^\"]*\",
 \"[^\"]*\",
 \"[^\"]*\",
 \"[^\"]*\",
 \"[^\"]*\",
 \"[^\"]*\",
 \"[^\"]*\",
 \"[^\"]*\",
 \"[^\"]*\",
 \"[^\"]*\",
 [^,]*,
 \"[^\"]*\",
 \"[^\"]*\",
 \"[^\"]*\",
 \"[^\"]*\",
 \"[^\"]*\",
 \"[^\"]*\",
 \"[^\"]*\",
 \"[^\"]*\",
 \"[^\"]*\",
 \"[^\"]*\",
 \"[^\"]*\",
 \"[^\"]*\",
 .*$

Which is really something. The first field matches a date format - "yyyy-mm-dd" (which is ok), then we have 12 fields where we care that they are enclosed in double quotes, one field that we want to not be quoted, another 12 fields which are quoted again, and any other fields we don't care about.

Wow.

I told my colleague that this wasn't a good way of doing things (he agreed).

There are better ways to achieve this, so let's walk through them.

Firstly, the shell globbing. There's a Unix command to generate a list of filesystem entries which match particular criteria. It's called find. If we want a list of files which have a 'csv' extension we do this:

$ find DIR -type f -name \*.csv

You can use '.' or '*' or any way of representing a DIRectory in the filesystem.

Now since we want this in a list to iterate over, let's put it in a variable:

$ CSVfiles=$( find DIR -type f -name \*.csv -o -name \*.CSV )

(You can redirect stderr to /dev/null, with 2>/dev/null inside the parens if you'd like).

Now that we've got our list, we can move to the second phase - removing lines which do not match our pattern. Let's try this first with awk. Awk has the concept of a Field Separator, and since CSV files are Comma-Separated-Value files, let's make use of that feature. We also know that we are only really interested in two fields - the first (yyyy-mm-dd) and the fourteenth.

$ awk -F',' '$1 ~ /"[0-9]{4}-([0][0-9]|1[0-2])-([0-2][0-9]|3[01])"/ &&
    $14 !~ /".*"/ {print}' < $old > $new

That's still rather ugly but considerably easier to read. For the record, the bare ~ is awk's equals operator, and !~ is not-equals.

We could also do this with grep, but at the cost of using more of that horrible regex.

In my opinion a better method is to cons up a Python script for this validation purpose, and we don't need to use the CSV module.

from collections import UserString
from datetime import datetime

infile = open("/path/to/file.csv", "rw")
input = infile.readlines()

linecount = len(input)

for line in input:

    fields = line.split(",")
    togo = False

    try:
        datetime.strptime(fields[0], "%Y-%m-%d")
    except ValueError as _ve:
        togo = True

    if '"' in fields[14] or not UserString(fields[14]).isnumeric():
        togo = True
    if togo:
        del line

if len(input) != linecount:
    # We've modified the input, so have to write out a new version, but
    # let's overwrite our input file rather than creating a new instance.
    infile.seek(0)
    infile.write("\n".join(input))

infile.close()

This script is pretty close to how I would write it in C (could you tell?).

We first open the file (for reading and writing) and read in every line, which yields us a list. While it's not the most memory-efficient way of approaching this problem, it does make processing more efficient because it's one read(), rather than one-read-per-line. We store the number of lines that we've read in for comparison at the end of our loop, and then start the processing.

Since this is a CSV file we know we can split() on the comma, and having done so, we check that we can parse the first field. We're not assigning to a variable with datetime.strptime() because we only care that we can rather than what the object's value is. The second check is to see that we cannot find the double apostrophe in the element, and that the content of the field is in fact numeric. If neither of these checks succeed, we know to delete the line from our input.

Finally, if we have in fact had to delete any lines, we rewind our file (I was going to write pointer, but it's a File object. Told you it was close to C!) to the start, and write out each line of input with a newline character before closing the file.

Whenever I think about regexes, especially the ones I've written in C over the years, I think about this quote which Jeffrey Friedl wrote about a long time ago:

Some people, when confronted with a problem, think “I know, I'll use regular expressions.” Now they have two problems.

It was true when I first heard it some time during my first year of uni, and still true today.




Common Table Expressions and an ORM

I solved a gnarly performance problem last week and I'd like to share with you what I learnt while doing it.

We received an alert from one of our automated monitoring systems that a particular query being run via an API was taking too long. And by "too long" I mean minutes. Since database engines are optimised to return data to you in milliseconds, this seemed wildly wrong.

The first step when you are checking a query for performance is to use the query analyzer. This is a very powerful tool in any database, and helps you see just what the engine is going to do for any particular bit of SQL. The term you're looking for in the documentation is EXPLAIN.

We thought the query was fairly simple. It followed this general form:

SELECT columns, COUNT(SELECT specific column WHERE conditions),
FROM database
WHERE conditions
ORDER BY ordering criteria;

and gave us not just results with specific conditions, but the count of rows which met those conditions too.

You will notice that I did not use first set of conditions and second set of conditions. This is because the selection criteria were in fact the same. That was the first clue.

A second clue was that of these selection conditions was that we had a range check - is the column's value between A and B? [We actually had two, but having just one was sufficient to show the problem]. This was expressed as

columnA >= smallervalue AND
columnA <= largervalue

So you'll see that for each row we had two comparisons to do. I tried making that condition singular, by using just >= smallervalue (I also tried <= largervalue> - no difference in timing) and while that did make the query faster, it did not reflect what we need to be able to do, so that was out.

Back to the query planner. It wasn't happy. It turns out that the most efficient plan it could come up with was using a Bitmap Index Scan. Twice. For the same query - but throwing the results of the first query away before running the whole thing again.

I knew that there had to be a better way (tm) - and there was. Here's the first thing I learned: the SQL standard has a thing called a Common Table Expression or CTE. This is a way of creating a temporary table that is used for just that query.

With this knowledge, I could now re-write the query in a considerably more efficient fashion. (It took me several days to figure this out - I'm no SQL guru!)

WITH better_query AS
    (SELECT columnname
    FROM tablename
    WHERE conditions)
SELECT columns, count(better_query.columnname)
FROM tablename
GROUP BY grouping criteria
ORDER BY ordering criteria;

Excellent! ... except that that was giving me all the rows in tablename which matched from better_query rather than the specific rows which met conditions. To solve this niggle I needed a join.

WITH better_query AS
    (SELECT columnname
    FROM tablename
    WHERE conditions)
SELECT columns, count(better_query.columnname)
FROM tablename TN
JOIN better_query BQ on BQ.columnname = TN.columnname
GROUP BY grouping criteria
ORDER BY ordering criteria;

The query planner was a lot happier. Happier to the tune of a 20x improvement.

There might have been a cheer or two.

So how do we put this into our application codebase?

While a major reason $employer hired me was my Python skills, $employer isn't a Python shop by any stretch of the imagination. We're a Java shop, and I'm cross-pollinating :-).

Part of that process has been learning how we abstract database specificities out of the code - we use an Object-Relational Mapping (aka ORM) to cope with using more than one vendor's database. There are a few options for Java ORMs and the one that we use is jOOQ. This is the first time I've used an ORM (of course I've read about them, but years ago) so this too was something new that I've learned. Heading to the documentation I read a lot, getting my head around the concepts and syntax.

It took me several days of research and hacking (stackoverflow, github, come on down!) to work out the correct syntax.

Here's the gist of what we needed:

public static QueryClass {

    private final String withClauseName = new String("betterQuery");
    private final String withClauseField = new String("fieldName");

    ....

    CommonTableExpression<Record1<Integer>> withClause = DSL.name(withClauseName)
        .fields(withClauseField)
        .as(select(columnName)
            .from(tableName)
            .where(conditions)
        );

    Field<Integer> results = DSL.field(context.select(
        count(DSL.field(columnName)))
        .from(withClause)
        .limit(1)
    ).as("results");

    /*
     * append results field to array of column names in the select, then ...
     */

    List<Typename> resultSet = DSLcontext.with(withClause)
        .select(columns)
        .from(tableName)
        .join(withClause)
            .on(withClause.field(withClauseField, typename.class)
                .eq(tablename.columnName))
        .groupBy(columnName)
        .orderBy(columnName)
    ....
};

No, it's not simple - but using jOOQ does mean that we can keep our code pretty close to how we would write bare SQL (in this case that would still be rather gnarly), and that is definitely a win.

Acknowledgement

Since this is another SQL-based post, I'd like to acknowledge the massive knowledge and confidence boost I got from reading Julia Evans (aka b0rk)'s SQL zine Become a SELECT star. She has a beautiful style and breaks complex technical concepts down into easily understandable chunks with great illustrations. Reading that zine advanced my database confidence immensely.




A brief introduction to the Python Database API

As popular as NoSQL databases currently are, there is still an immense amount of data in the world for which the relational database is still the most appropriate way to store, access and manipulate it. For those of us who like to use Python rather than JDBC, we are fortunate to have bindings for the major databases which implement the Python Database API, aka PEP249.

This standard is important because it allows you to write your application in a way that promotes cross-platform (cross-database engine) portability. While you do still need to be aware of differences between the databases (for example, Oracle's VARCHAR2 vs PostgreSQL's VARCHAR), the essential tasks that you need to accomplish when making use of a database are abstracted from you.

In this brief introduction I will show you how to install two popular Python database binding packages, connect to a database and run queries.

Installing the bits you need

I assume that you've got Python installed on your system already, and that the database you want to connect to is set up and running. What you need now are the bindings for that particular database. For the Oracle database, this is a package known as cx_Oracle. For PostgreSQL, I suggest psycopg2 though there are other bindings available.

You will definitely need to install the pre-packaged database client libraries on your host (or in your container) as well. For Oracle look for your platform in the Oracle Instant Client download pages, for PostgreSQL on linux you will need libpq-dev as supplied by your package manager. On MacOS, please find the appropriate option from the PostgreSQL MacOSX download page.

After you've installed these pieces, you can then pip install --user cx_Oracle or pip install --user psycopg2. My personal preference is to install bindings in a venv. For Solaris 11 you can get the cx_Oracle bits as well as the Oracle Instant Client by uttering pkg install cx_Oracle. [On a personal note, I made that possible - see the History tab on the Solaris Userland cx_Oracle github].

Getting a connection to the database

Now that you have the correct packages installed, you can start investigating. The first thing to do is start an interactive interpreter and import the modules you need:

$ python3.8
>> import cx_Oracle

(Note that unless you're using Solaris' pre-packaged version, you must point LD_LIBRARY_PATH to where the module can locate the Instant Client libraries, specifically libclntsh.so)

$ python 3.8
>>> import psycopg2
>>> from psycopg2.extras import execute_batch

We need a connection to the database, which in Oracle terms is a DSN, or Data Source Name. This is made up of the username, password, host (either address or IP), port and database instance name. Put together, the host, port and database instance are what Oracle calls the "service name".

The common example you will see in Oracle documentation (and on stackoverflow!) is

scott/tiger@orcl:1521/orcl

I'm a bit over seeing that, so I've created a different username and DBname:

DEMOUSER/DemoDbUser1@dbhost:1521/demodb

Since Oracle defaults to using port 1521 on the host, you only need to specify the port number if your database is listening on a different port.

An Oracle connection is therefore

user = "DEMOUSER"
passwd = "DemoDbUser1"
servicename = "dbhost:1521/demodb"

try:
    connection = cx_Oracle.connect(user, passwd, servicename)
except cx_Oracle.DatabaseError as dbe:
    print("""Unable to obtain a connection to {servicename}""".format(servicename=servicename))
    raise

With PostgreSQL we can also specify whether we want SSL enabled.

try:
    connection = psycopg2.connect(dbname=dbname,
                                  user=dbuser,
                                  password=dbpassword,
                                  host=dbhost,
                                  port=dbport,
                                  sslmode=dbsslmode)
except psycopg2.OperationalError as dboe:
    print("""Unable to obtain a connection to the database.\n
          Please check your database connection details\n
          Notify dbadmin@{dbhost}""".format(dbhost=dbhost),
          file=sys.stderr, flush=True)
    raise

Once we have the connection, we need a cursor so we can execute statements:

cursor = connection.cursor()

Now that we have our connection and a cursor to use, what can we do with it? SQL baby!

We need a handy dataset to muck around with, so I've got just the thing courtesy of my previous post about determining your electorate. My JSON files have a fairly basic structure reflecting my needs for that project, but that is still sufficient our purposes here.

>>> import json
>>> actf = json.load(open("find-my-electorate/json/ACT.json", "r"))
>>> actf.keys()
dict_keys(['Brindabella', 'Ginninderra', 'Kurrajong', 'Murrumbidgee', 'Yerrabi'])
>>> actf["Yerrabi"].keys()
dict_keys(['jurisdiction', 'locality', 'blocks', 'coords'])

The 'jurisdiction' key is the state or territory name (ACT, Australian Capital Territory in this case), the 'locality' is the electorate name, 'blocks' is a list of the Australian Bureau of Statistics Mesh Block which the electorate contains, and 'coords' is a list of the latitude, longitude pairs which are the boundaries of those blocks.

Now we need a schema to operate within. This is reasonably simple - if you've done any work with SQL databases. To start with, we need two SEQUENCE s, which are database objects from which any database user may generate a unique integer. This is the easiest and cheapest way that I know of to generate values for primary key columns. (For a good explanation on them, see the Oracle 19c CREATE SEQUENCE page). After that, we'll create two tables: electorate and geopoints. (We're going to ignore the Mesh Block data, it's not useful for this example).

-- This is the Oracle DB form
DROP TABLE GEOPOINTS CASCADE CONSTRAINTS;
DROP TABLE ELECTORATES CASCADE CONSTRAINTS;

DROP SEQUENCE ELECTORATE_SEQ CASCADE;
DROP SEQUENCE LATLONG_SEQ CASCADE;

CREATE SEQUENCE ELECTORATE_SEQ INCREMENT BY 1 MINVALUE 1 NOMAXVALUE;
CREATE SEQUENCE LATLONG_SEQ INCREMENT BY 1 MINVALUE 1 NOMAXVALUE;

CREATE TABLE ELECTORATES (
    ELECTORATE_PK   NUMBER DEFAULT DEMOUSER.ELECTORATE_SEQ.NEXTVAL NOT NULL PRIMARY KEY,
    ELECTORATE      VARCHAR2(64) NOT NULL,
    STATENAME       VARCHAR2(3) NOT NULL -- Using the abbreviated form
);

CREATE TABLE GEOPOINTS (
    LATLONG_PK      NUMBER DEFAULT DEMOUSER.LATLONG_SEQ.NEXTVAL NOT NULL PRIMARY KEY,
    LATITUDE        NUMBER NOT NULL,
    LONGITUDE       NUMBER NOT NULL,
    ELECTORATE_FK   NUMBER REFERENCES ELECTORATES(ELECTORATE_PK)
);

-- Now for the PostgreSQL form
DROP TABLE IF EXISTS GEOPOINTS CASCADE;
DROP TABLE IF EXISTS ELECTORATES CASCADE;

DROP SEQUENCE IF EXISTS ELECTORATE_SEQ CASCADE;
DROP SEQUENCE IF EXISTS LATLONG_SEQ CASCADE;

CREATE SEQUENCE ELECTORATE_SEQ INCREMENT BY 1 MINVALUE 1 NO MAXVALUE;
CREATE SEQUENCE LATLONG_SEQ INCREMENT BY 1 MINVALUE 1 NO MAXVALUE;

CREATE TABLE ELECTORATES (
    ELECTORATE_PK   INTEGER DEFAULT NEXTVAL('ELECTORATE_SEQ') NOT NULL PRIMARY KEY,
    ELECTORATE      VARCHAR(64) NOT NULL,
    STATENAME       VARCHAR(3) NOT NULL -- Using the abbreviated form
);

CREATE TABLE GEOPOINTS (
    LATLONG_PK      INTEGER DEFAULT NEXTVAL('LATLONG_SEQ') NOT NULL PRIMARY KEY,
    LATITUDE        NUMERIC NOT NULL,
    LONGITUDE       NUMERIC NOT NULL,
    ELECTORATE_FK   INTEGER REFERENCES ELECTORATES(ELECTORATE_PK) NOT NULL
);

We need to execute these statements one by one, using cursor.execute(). Come back once you've done that and we've got our schema set up.

[I see that a short time has passed - welcome back]

Now we need to populate our database. For both connection types, we'll make use of prepared statements, which allow us to insert, update, delete or select many rows at a time. For the Oracle connection we'll use the executemany() function, but for PostgreSQL's psycopg2 bindings we'll use execute_batch() instead. (See the psycopg2 website note about it).

For the Oracle examples, we'll use a bind variable so that when we INSERT into the ELECTORATES table we get the ELECTORATE_PK to use in the subsequent INSERT to the GEOPOINTS table. That saves us a query to get that information. The psycopg2 module does not have this support, unfortunately. Since we have many records to process, I've created some small functions to enable DRY

# Oracle version
>>> epk = cursor.var(int)
>>> electStmt = """INSERT INTO ELECTORATES (ELECTORATE, STATENAME)
...                VALUES (:electorate, :statename)
...                RETURNING ELECTORATE_PK INTO :epk"""
>>> geoStmt = """INSERT INTO GEOPOINTS(LATITUDE, LONGITUDE, ELECTORATE_FK)
...              VALUES (:lat, :long, :fk)"""

>>> def addstate(statename):
...     for electorate in statename:
...         edict = {"electorate": statename[electorate]["locality"],
...                  "statename": statename[electorate]["jurisdiction"],
...                  "epk": epk}
...         cursor.execute(electStmt, edict)
...         points = list()
...         for latlong in statename[electorate]["coords"]:
...             points.append({"latitude": float(latlong[1]),
...                            "longitude": float(latlong[0]),
...                            "fk": int(epk.getvalue(0)[0])})
...         cursor.executemany(geoStmt, points)
...         connection.commit()

>>> allpoints = dict()
>>> states = ["act", "nsw", "nt", "qld", "sa", "tas", "wa", "vic", "federal"]

>>> for st in states:
...     allpoints[st] = json.load(open("json/{stu}.json".format(stu=st.upper()), "r"))
...     addstate(allpoints[st])

For the PostgreSQL version, we need to subtly change the INSERT statements:

>>> electStmt = """INSERT INTO ELECTORATES (ELECTORATE, STATENAME)
...                VALUES (%(electorate)s, %(statename)s)
...                RETURNING ELECTORATE_PK"""
>>> geoStmt = """INSERT INTO GEOPOINTS(LATITUDE, LONGITUDE, ELECTORATE_FK)
...                VALUES (%(latitude)s, %(longitude)s, %(fk)s)"""

Another thing we need to change is the first execute, because bind variables are an Oracle extension, and we're also going to change from executemany() to execute_batch():

>>> def addstate(statename):
...     for electorate in statename:
...         edict = {"electorate": statename[electorate]["locality"],
...                  "statename": statename[electorate]["jurisdiction"]}
...         cursor.execute(electStmt, edict)
...         epk = cursor.fetchone()[0]
...         points = list()
...         for latlong in statename[electorate]["coords"]:
...             points.append({"latitude": float(latlong[1]),
...                            "longitude": float(latlong[0]), "fk": epk})
...         execute_batch(cursor, geoStmt, points)
...         connection.commit()

The rest of the data loading is the same as with Oracle. Since I'm curious about efficiencies, I did a rough benchmark of the data load with executemany() as well, and it was around half the speed of using execute_batch(). YMMV, of course, so always test your code with your data and as close to real-world conditions as possible.

Now that we have the data loaded we can do some investigations. I'm going to show the PostgreSQL output for this and call out differences with Oracle where necessary.

While some parts of our state and territory borders follow rivers and mountain ranges, quite a lot of them follow specific latitudes and longitudes. The border between Queensland and New South Wales, for example is mostly along the 29th parallel. Between the Northern Territory and South Australia it's the 26th parallel, and that between South Australia and Western Australia is along meridian 129 East.

If you go to a mapping service and plug in 29S,141E (the line between Queensland and New South Wales):

/images/2020/python-db-api/29s141e-400x400.png

you'll see that the exact point is not where the boundary is actually drawn. That means we need to use some fuzziness in our matching.

>>> cursor.execute("""SELECT STATENAME, ELECTORATE FROM ELECTORATES E WHERE ELECTORATE_PK IN
...                   (SELECT DISTINCT ELECTORATE_FK FROM GEOPOINTS WHERE LATITUDE BETWEEN -29.001 AND -28.995)
...                   ORDER BY STATENAME, ELECTORATE""")
>>> results = cursor.fetchall()
>>> for s, e in results:
...     print("{s:6} {e}".format(s=s, e=e))
...
NSW    Ballina
NSW    Barwon
NSW    Clarence
NSW    Lismore
NSW    New England
NSW    Northern Tablelands
NSW    Page
NSW    Parkes
QLD    Maranoa
QLD    Southern Downs
QLD    Warrego
SA     Giles
SA     Grey
SA     Stuart
SA     Stuart
WA     Durack
WA     Geraldton
WA     Kalgoorlie
WA     Moore
WA     North West Central

Hmm. Not only did I not want SA or WA electorates returned, the database has also given me the federal electorates as well. Let's update our electorates table to reflect that jurisdictional issue:

>>> cursor.execute("""ALTER TABLE ELECTORATES ADD FEDERAL BOOLEAN""")
>>> connection.commit()

Popping into a psql session for a moment, let's see what we have:

demodb=> \d electorates
                                    Table "public.electorates"
    Column     |         Type          | Collation | Nullable |               Default
---------------+-----------------------+-----------+----------+-------------------------------------
 electorate_pk | integer               |           | not null | nextval('electorate_seq'::regclass)
 electorate    | character varying(64) |           | not null |
 statename     | character varying(3)  |           | not null |
 federal       | boolean               |           |          |
Indexes:
    "electorates_pkey" PRIMARY KEY, btree (electorate_pk)
Referenced by:
    TABLE "geopoints" CONSTRAINT "geopoints_electorate_fk_fkey" FOREIGN KEY (electorate_fk) REFERENCES electorates(electorate_pk)

Now let's populate that column. Unfortunately, though, some state electorates have the same name as federal electorates - and some electorate names exist in more than one state, too! (I'm looking at you, Bass!). Tempting as it is to zorch our db and start from scratch, I'm going to take advantage of this information:

  1. We added the federal electorate list after all the states,

  2. the federal list was constructed starting with the ACT, and therefore

  3. Federal electorates will have a higher primary key value than all the states and territories.

With that in mind here's the first federal electorate entry:

demodb=> SELECT ELECTORATE_PK, ELECTORATE, STATENAME FROM ELECTORATES WHERE STATENAME = 'ACT' ORDER BY ELECTORATE_PK, STATENAME;
 electorate_pk |  electorate  | statename
---------------+--------------+-----------
             1 | Brindabella  | ACT
             2 | Ginninderra  | ACT
             3 | Kurrajong    | ACT
             4 | Murrumbidgee | ACT
             5 | Yerrabi      | ACT
           416 | Bean         | ACT
           417 | Canberra     | ACT
           418 | Fenner       | ACT
(8 rows)

Let's check how many electorates have a primary key higher than Bean:

>>> cursor.execute("""SELECT COUNT(ELECTORATE_PK), MAX(ELECTORATE_PK) FROM ELECTORATES""")
>>> cursor.fetchall()
[(566, 566)]

And a quick check to see that we do in fact have 151 electorates with that condition:

>>> 566 - 415
151

We do. Onwards.

>>> cursor.execute("""UPDATE ELECTORATES SET FEDERAL = TRUE WHERE ELECTORATE_PK > 415""")
>>> connection.commit()
>>> cursor.execute("""SELECT COUNT(*) FROM ELECTORATES WHERE FEDERAL IS TRUE""")
>>> cursor.fetchall()
[(151,)]

Likewise, we'll set the others to federal=False:

>>> cursor.execute("""UPDATE ELECTORATES SET FEDERAL = FALSE WHERE ELECTORATE_PK < 416""")
>>> connection.commit()

Back to our queries. I want to see both sorts of electorates, but grouped by whether they are federal or state electorates:

>>> cursor.execute("""SELECT E.STATENAME, E.ELECTORATE, E.FEDERAL FROM ELECTORATES E
...                   WHERE E.ELECTORATE_PK IN
...                       (SELECT DISTINCT ELECTORATE_FK FROM GEOPOINTS WHERE LATITUDE BETWEEN -29.001 AND -28.995)
...                   GROUP BY E.STATENAME, E.ELECTORATE, E.FEDERAL
...                   ORDER BY STATENAME, ELECTORATE""")
>>> fedstate = cursor.fetchall()
>>> fedstate
[('NSW', 'Ballina', False), ('NSW', 'Barwon', False), ('NSW', 'Clarence', False), ('NSW', 'Lismore', False), ('NSW', 'New England', True), ('NSW', 'Northern Tablelands', False), ('NSW', 'Page', True), ('NSW', 'Parkes', True), ('QLD', 'Maranoa', True), ('QLD', 'Southern Downs', False), ('QLD', 'Warrego', False), ('SA', 'Giles', False), ('SA', 'Grey', True), ('SA', 'Stuart', False), ('WA', 'Durack', True), ('WA', 'Geraldton', False), ('WA', 'Kalgoorlie', False), ('WA', 'Moore', False), ('WA', 'North West Central', False)]

>>> def tfy(input):
...     if input:
...         return "yes"
...     else:
...         return ""

>>> for res in fedstate:
...     fmtstr = """{statename:6} {electorate:30} {federal}"""
...     print(fmtstr.format(statename=res[0], electorate=res[1], federal=tfy(res[2])))
...
NSW    Ballina
NSW    Barwon
NSW    Clarence
NSW    Lismore
NSW    New England                    yes
NSW    Northern Tablelands
NSW    Page                           yes
NSW    Parkes                         yes
QLD    Maranoa                        yes
QLD    Southern Downs
QLD    Warrego
SA     Giles
SA     Grey                           yes
SA     Stuart
WA     Durack                         yes
WA     Geraldton
WA     Kalgoorlie
WA     Moore
WA     North West Central

I could have added a WHERE E.FEDERAL = TRUE to the query, too.

Finally, let's see what state electorates in WA and SA are on the border:

>>> cursor.execute("""SELECT STATENAME, ELECTORATE FROM ELECTORATES E
...                   WHERE ELECTORATE_PK IN
...                       (SELECT DISTINCT ELECTORATE_FK FROM GEOPOINTS WHERE LATITUDE BETWEEN -60.00 AND -25.995
                           AND LONGITUDE BETWEEN 128.995 AND 129.1)
...                   AND FEDERAL = FALSE ORDER BY STATENAME, ELECTORATE""")
>>> results = cursor.fetchall()
>>> for _res in results:
...     print("""{statename:6} {electorate}""".format(statename=_res[0], electorate=_res[1]))
...
NT     Namatjira
SA     Giles
WA     Kalgoorlie
WA     North West Central

Why do we have that electorate from the Northern Territory? It's because the coordinates are a bit fuzzy and we had to use a range (the BETWEEN) in our query.

Finally

I apologise, because while this was supposed to be a brief introduction I did get side-tracked with data investigation. I suppose that's an occupational hazard since I'm a Data Engineer working for a company which provides GIS-related services. Anyway, in terms of depth, this was definitely only scratching the surface of what is possible with Python and databases. I encourage you to go and read the Python Database API as well as the SQL reference manual(s) for your chosen database and its binding documentation.




init(Apache Spark)

In a previous post I wrote about how I've started down the Data Science path, kicking off with an exploration of sentiment analysis for political tweets. This is a topic which I will come back to in the future, not least because the nltk corpus I've made use of for au-pol-sentiment is based on British political tweets. While Australia and Britain share a common political heritage, I'm not completely confident that our political discourse is quite covered by that corpus.

In the meantime, another aspect of Data Science in practice is the use of an ecosystem called Apache Spark. Leaving aside my 20+ years of muscle memory spelling it as SPARC, this is a Machine Learning engine, described on the homepage as a unified analytics engine for large-scale data processing.

My experience is that when I want or need to learn a new toolkit or utility, the best way to do so is by trying to directly solve a specific problem with it. One problem (ok, not really a problem, more a set of questions) I have is that with all of the data I've gathered since 2013 from my solar inverter I'm dependent on pvoutput.org for finding per-year and per-month averages, maxima and minima. So with a data science-focused job interview (with Oracle Labs) approaching, I decided to get stuck in and get started with Apache Spark.

The first issue I faced was implementing the ETL pipeline. I have two types of files to load - the first contains the output from solarmonj, the second has the output from my solar inverter script.

Here's the first schema form:

Field name

Units

Timestamp

seconds-since-epoch

Temperature

float (degrees C)

energyNow

float (Watts)

energyToday

float (Watt-hours)

powerGenerated

float (Hertz)

voltageDC

float (Volts)

current

float (Amps)

energyTotal

float (Watt-hours)

voltageAC

float (Volts)


The second schema is from jfy-monitor, and has this schema:

Field name

Units

Timestamp

ISO8601-like ("yyyy-MM-dd'T'HH:mm:ss")

Temperature

float (degrees C)

PowerGenerated

float (Watts)

VoltageDC

float (Volts)

Current

float (Amps)

EnergyGenerated

float (Watts-Hours)

VoltageAC

float (Volts)

There are two other salient pieces of information about these files. The first is that the energyTotal and EnergyGenerated fields are running totals of the amount of energy generated on that particular day. The second is that in the first version of the schema, energyTotal needs to be multiplied by 1000 to get the actual KW/h value.

With that knowledge ready, let's dive into the code.

The first step is to start up a Spark session:

from pyspark.sql.functions import date_format
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Basic Spark session configuration
sc = SparkContext("local", "PV Inverter Analysis")
spark = SparkSession(sc)

# We don't need most of this output
log4j = sc._jvm.org.apache.log4j
log4j.LogManager.getRootLogger().setLevel(log4j.Level.ERROR)

I observed while prototyping this in the pyspark REPL environment that if I didn't turn the logging output right down, then I'd see squillions of INFO messages.

The second step is to generate a list of files. As you might have guessed, I've got a year/month/day hierarchy - but the older files have a csv extension. To get those files (and since I want to be able to process any given year or year+month combination), I need to use some globbing:

import glob

def generateFiles(topdir, year, month):
    """Construct per-year dicts of lists of files"""
    allfiles = {}
    kkey = ""
    patterns = []
    months = []
    # Since some of our data dirs have months as bare numbers and
    # others have a prepended 0, let's match them correctly.
    if month:
        if month < 10:
            months = [month, "0" + str(month)]
        else:
            months = [month]
    if year and month:
        patterns = ["{year}/{monthp}/**".format(year=year, monthp=monthp)
                    for monthp in months]
        kkey = year
    elif year:
        patterns = ["{year}/*/**".format(year=year)]
        kkey = year

    if patterns:
        globs = []
        for pat in patterns:
            globs.extend(glob.glob(os.path.join(topdir, pat)))
        allfiles[kkey] = globs
    else:
        for yy in allYears:
            allfiles[yy] = glob.glob(os.path.join(topdir,
                                                  "{yy}/*/*".format(yy=yy)))
    return allfiles

To load in each file, I turned to the tried-and-true Python standard module csv, and - rather than having a v1 and v2 processing function, I model DRY and use an input argument to determine which set of elements to match:

import csv
from datetime import datetime

def importCSV(fname, isOld):
    output = []
    if isOld:
        multiplier = 1000.0
    else:
        multiplier = 1.0

    csvreader = csv.reader(open(fname).readlines())
    for row in csvreader:
        try:
            if isOld:
                (tstamp, temp, _enow, _etoday, powergen, vdc,
                 current, energen, vac) = row
            else:
                (tstamp, temp, powergen, vdc, current, energen, vac) = row
        except ValueError as _ve:
            # print("failed at {row} of {fname}".format(row=row, fname=fname))
            continue

        if "e" in temp:
            # invalid line, skip it
            continue

        if isOld:
            isostamp = datetime.fromtimestamp(int(tstamp))
        else:
            isostamp = datetime.fromisoformat(tstamp)

        output.append({
            "timestamp": isostamp,
            "Temperature": float(temp),
            "PowerGenerated": float(powergen),
            "VoltageDC": float(vdc),
            "Current": float(current),
            "EnergyGenerated": float(energen) * multiplier,
            "VoltageAC": float(vac)})
    return output

Now we get to the Apache Spark part. Having got a dictionary of anonymous dicts I can turn them into an Resilient Distributed Dataset (RDD) and thence a DataFrame. I chose the DataFrame model rather than a Row because that matches up nicely with my existing data format. For other applications (such as when I extend my Twitter Sentiment Analysis project with the streaming API) I'll use the Row datatype instead.

def now():
    """ Returns an ISO8601-formatted (without microseconds) timestamp"""
    return datetime.now().strftime("%Y-%M-%dT%H:%m:%S")

allFiles = generateFiles("data", qyear, qmonth)

print(now(), "Importing data files")

for k in allFiles:
    rddyear = []
    for fn in allFiles[k]:
        if fn.endswith(".csv"):
            rddyear.extend(importCSV(fn, True))
        else:
            rddyear.extend(importCSV(fn, False))
    rdds[k] = rddyear

print(now(), "All data files imported")

for year in allYears:
    rdd = sc.parallelize(rdds[year])
    allFrames[year] = rdd.toDF()
    newFrame = "new" + str(year)
    # Extend the schema for our convenience
    allFrames[newFrame] = allFrames[year].withColumn(
        "DateOnly", date_format('timestamp', "yyyyMMdd")
    ).withColumn("TimeOnly", date_format('timestamp', "HHmmss"))
    allFrames[newFrame].createOrReplaceTempView("view{year}".format(
        year=year))

print(now(), "Data transformed from RDDs into DataFrames")

The reason I chose to extend the frames with two extra columns is because when I search for the record dates (minimum and maximum), I want to have a quick SELECT which I can aggregate on.

ymdquery = "SELECT DISTINCT DateOnly from {view} WHERE DateOnly "
ymdquery += "LIKE '{yyyymm}%' ORDER BY DateOnly ASC"

for year in allYears:
    for mon in allMonths:
        if mon < 10:
            yyyymm = str(year) + "0" + str(mon)
        else:
            yyyymm = str(year) + str(mon)

        _dates = spark.sql(ymdquery.format(
            view=view, yyyymm=yyyymm)).collect()
        days = [n.asDict()["DateOnly"] for n in _dates]

        _monthMax = frame.filter(
            frame.DateOnly.isin(days)).agg(
                {"EnergyGenerated": "max"}).collect()[0]
        monthMax = _monthMax.asDict()["max(EnergyGenerated)"]

I keep track of each day's maximum, and update my minval and minDay as required. All this information is then stored in a per-month dict, and then in a per-year dict.

The last stage is to print out the record dates, monthly and yearly totals, averages and other values.

Running this utility on my 4-core Ubuntu system at home, I get what I believe are ok timings for whole-year investigations, and reasonable timings if I check a specific month.

When I run the utility for January 2018, the output looks like this:

(v-3.7-linux) flerken:solar-spark $ SPARK_LOCAL_IP=0.0.0.0 time -f "%E"  spark-submit --executor-memory 2G --driver-memory 2G solar-spark.py  -y 2018 -m 1
19/10/15 12:23:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark default log4j profile: org/apache/spark/log4j-defaults.properties

[Most INFO-level output elided]

19/10/15 12:23:58 INFO Utils: Successfully started service 'SparkUI' on port 4040.
19/10/15 12:23:58 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://0.0.0.0:4040

2019-23-15T12:10:59 Importing data files
2019-23-15T12:10:59 All data files imported
/space/jmcp/web/v-3.7-linux/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/session.py:366: UserWarning: Using RDD of dict to inferSchema is deprecated. Use pyspark.sql.Row instead
2019-24-15T12:10:01 Data transformed from RDDs into DataFrames
2019-24-15T12:10:01 Analysing 2018
2019-24-15T12:10:01          January
2019-24-15T12:10:15 All data analysed
2019-24-15T12:10:15 2018 total generation: 436130.00 KW/h
2019-24-15T12:10:15         January total:               436130.00 KW/h
2019-24-15T12:10:15         Record dates for January:    Max on 20180131 (16780.00 KW/h), Min on 20180102 (10560.00 KW/h)
2019-24-15T12:10:15         Average daily generation  14068.71 KW/h
2019-24-15T12:10:15 ----------------

0:18.76

While that processing is going on, you can see a dashboard with useful information about the application at http://localhost:4040:

ExecutorsJobsDetails of a query

You can find the code for this project in my GitHub repo solar-spark.




Microservices (part 2)

One principle that I work on is that I should always extend the fix (learnt via the Kepner-Tregoe Analytical Troubleshooting training many years ago). Following my investigation of how to provide a more accessible method of determining your electorate, I came back to the political polling ideas and got to thinking about how we can track the temperature of a conversation in, for example #auspol.

The term for this is sentiment analysis and while the major cloud providers have their own implementations of this (Microsoft Azure Text Analytics, AWS Comprehend, Google Cloud Natural Language API) you can also use Python's nltk in the comfort of your own venv. It's cheaper, too!

A bit of searching lead me to @Chapagain's post which was very useful and got me started - thankyou

I decided that I really want to do something more real-time, and while I could have done more scraping with Beautiful Soup, a quick look at the html that's returned with you run

import requests

url = "https://twitter.com/search?q=%23auspol&src=typed_query&f=live"
result = requests.get(url)
print(result.text)

is eye-wateringly complex. (Go on, try it!) I just couldn't be bothered with that so I signed up for a Twitter developer account and started looking at the APIs available for searching. These are easily used with the Twython library:

from twython import Twython

twitter = Tython(consumer_key, consumer_secret,
                 access_token, access_token_secret)
hashtag = "#auspol"
results = twitter.search(q=hashtag, result_type="recent")
for tweet in results["statuses"]:
    sentiment = classifier_func(tweet["text"])
    print(sentiment.prob("pos"))

I hit the rate limit a few times until I realiased that there was a while true going on inside Twython when using the cursor method. In my print-to-shell proof of concept, I got past that by using the search function inside a while loop with a 30second sleep call. I knew that that wasn't good enough for a web app, and would actually be a road block for doing a properly updated graph-focused page.

For that I would need a charting library, and some JavaScript. I started out using Chart.js, but quickly realised that it didn't have any sort of flow, so then I retooled to use C3js instead.

The initial render of the template provides the first set of data, which is a JavaScript array ([]), and checks for a saved hashtag and the id of the most recently found tweet using Window.sessionStorage(). Then we set up a function to get new data when called:

function getNewData() {
    var xhr = new XMLHttpRequest();
    xhr.open("GET", "/sentiment?hashtag="+hashtag+"&lastid="+lastid, true);
    xhr.onload = function (e) {
        if ((xhr.readyState === 4) && (xhr.status === 200)) {
            parsed = JSON.parse(xhr.responseText);
            sessionStorage.setItem("lastid", parsed["lastid"]);
            lastid = parsed["lastid"];
            labels = parsed["labels"];
            // Did we get new data points?
            if (parsed["chartdata"].length > 1) {
                // yes
                newDataCol = [ parsed["chartdata"] ];
                curidx += parsed["chartdata"].length - 1;
            } else {
                newDataCol = [];
            }
        }
    };
    xhr.onerror = function (e) {
        console.error(xhr.statusText);
    };
    xhr.send(null);
};

Finally, we need to tell the window to call our updateChart() function every 30 seconds, and define that function:

function updateChart() {
    getNewData();
    if (newDataCol !== []) {
        chart.flow({
            columns: prevDataCol,
            done: function () {
               chart.flow({
                   columns: newDataCol,
                   line: { connectNull: true }
               })
            }
        });
        prevDataCol = newDataCol;
    }
};

/* Update the chart every 30 seconds */
window.setInterval(updateChart, 30000);

So that you can change the hashtag to watch, I added a small <form> element which POSTs the new hashtag to the /sentiment method on submit and then re-renders the template.


/images/2019/sentiment1.png




What I'm particularly happy with is that the JavaScript took me only a few hours last Saturday morning and was pretty straightforward to write.

You can find the code for this project in my GitHub repo au-pol-sentiment.