Some Pythonic Kafka stuff

I've been actively interviewing over the last few months, and was recently talking with a cloud streaming provider about a Staff Engineer SRE role specializing in Apache Kafka. I've been doing a lot of work in that space for $employer and quite like it as a data streaming solution.

The interviews went well, and they sent me a do-at-home challenge with a one week timeout.

The gist of it was that I had to create a Flask app which allowed the user to enter a URL to monitor for status at a given frequency, write a Kafka Producer in Python to publish that monitoring data to a topic, and write a Kafka Consumer to read from the topic and insert the results into a PostgreSQL database.

Fun!

I did a brief investigation into using threads within a Flask app for the monitoring code, but quickly decided that a better architecture would be to do the monitoring via a separate daemon. Separation of concerns, to allow for easier maintenance. Suddenly I'm all about ongoing maintenance rather than how quickly I can deliver a new feature... Hmmm.

The next step was to sketch out the table schema I wanted in the database:



    CREATE SEQUENCE public.urltargets_seq
        INCREMENT BY 1
        MINVALUE 1
        MAXVALUE 9223372036854775807
        START 1
        CACHE 1
        NO CYCLE;

    CREATE SEQUENCE public.monitor_results_seq
        INCREMENT BY 1
        MINVALUE 1
        MAXVALUE 9223372036854775807
        START 1
        CACHE 1
        NO CYCLE;

    CREATE TABLE IF NOT EXISTS urltargets (
        urltargets_pk      int4 NOT NULL DEFAULT nextval('urltargets_seq'::regclass),
        urltarget          varchar(1024) NOT NULL,
        monitor_frequency  int NOT NULL CHECK (monitor_frequency IN (1, 2, 5, 10, 20, 30)),
        CONSTRAINT urltargets_pkey PRIMARY KEY (urltargets_pk)
    );

    CREATE TABLE IF NOT EXISTS monitor_results (
        monitor_results_pk  int4 NOT NULL DEFAULT nextval('monitor_results_seq'::regclass),
        http_status         int NOT NULL,
        start_time          timestamp with time zone NOT NULL,
        duration            int4 NOT NULL,
        urltarget_fk        int4 NOT NULL,
        CONSTRAINT monitor_results_fk_fkey FOREIGN KEY (urltarget_fk) REFERENCES urltargets(urltargets_pk)
    );


Having decided that I would offer monitoring frequencies of 1, 2, 5, 10, 20 and 30 minutes, I created views for the Flask app to use as well, rather than direct queries. They all look like this, with other values substituted in as you would expect.

    CREATE OR REPLACE VIEW view_1m AS
        SELECT mr.monitor_results_pk
             , ut.urltarget
             , mr.http_status
             , mr.start_time
             , mr.duration
        FROM monitor_results mr
        JOIN urltargets ut ON ut.urltargets_pk = mr.urltarget_fk
        WHERE ut.monitor_frequency = 1
    ;
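
For a sense of how the Flask app consumes these views, here's a minimal sketch of a read-only endpoint querying one of them. This is illustrative rather than the code I submitted: the route, the connection string and the psycopg2 usage are all assumptions.

  # Sketch of a Flask endpoint reading from one of the per-frequency views.
  # The route and connection details are illustrative only.
  from flask import Flask, abort, jsonify
  import psycopg2

  app = Flask(__name__)

  VALID_FREQUENCIES = (1, 2, 5, 10, 20, 30)

  @app.route("/results/<int:frequency>")
  def results_for_frequency(frequency):
      if frequency not in VALID_FREQUENCIES:
          abort(404)
      conn = psycopg2.connect("dbname=monitoring")  # stand-in DSN
      try:
          with conn.cursor() as curs:
              # frequency is whitelisted above, so formatting the view
              # name into the query is safe
              curs.execute(
                  "SELECT urltarget, http_status, start_time, duration "
                  "FROM view_{f}m ORDER BY start_time DESC".format(f=frequency))
              rows = curs.fetchall()
      finally:
          conn.close()
      return jsonify(rows)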



Since I really like seeing schemata visualised, I created a nice(ish) ERD as well:

/images/2021/DB-ERD.png

Well, that was straightforward. How about the application and the daemon?


I split the setup functionality out into a separate file importable by the Flask app, the monitor daemon and the consumer. This contained the database connection, Kafka Producer and Kafka Consumer code. There's an interesting little niggle in the Kafka Producer setup which is not immediately obvious, and which required a bit of digging on StackOverflow as well as enabling debug output in librdkafka:

  # Producer and KafkaError come from the confluent-kafka package, which
  # wraps librdkafka; appconfig is the parsed application configuration,
  # loaded elsewhere in this file.
  from confluent_kafka import Producer, KafkaError

  def _get_kafka_configuration():
      """
      Common function to retrieve the Kafka configuration.
      """
      global appconfig
      configuration = {
          "bootstrap.servers": appconfig["kafka"]["broker"],
          "group.id": "website-monitor",
          "ssl.key.location": appconfig["kafka"]["keyfile"],
          "ssl.certificate.location": appconfig["kafka"]["certfile"],
          "ssl.ca.location": appconfig["kafka"]["cafile"],
          "security.protocol": "SSL",
          # "debug": "eos, broker, admin",  # re-enable if needed
          'transaction.timeout.ms': 60000,
          'enable.idempotence': True
      }
      return configuration


  def setup_kafka_producer(view):
      """
      Creates the connection to our Kafka brokers so we can publish
      messages to the topics we want. We take the {view} argument so
      that we can avoid blatting multiple producers together and getting
      errors from the broker about zombies and fencing. See
      https://stackoverflow.com/questions/50335227/how-to-pick-a-kafka-transaction-id
      for more details
      Return: a Producer
      """
      configuration = _get_kafka_configuration()
      configuration["transactional.id"] = "website-monitor" + str(view)
      kafkaProducer = Producer(configuration)
      try:
          kafkaProducer.init_transactions()
      except KafkaError as ke:
          # If we can't do this, then we have to quit
          print(f"""Producer failed to init_transactions(), throwing {ke}""")
          raise
      return kafkaProducer
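
The same file also provides setup_db() and setup_kafka_consumer(), which turn up further down. I haven't reproduced my exact versions here; this is a minimal sketch, assuming psycopg2 on the database side, and the appconfig key names are illustrative:

  # Sketches of the other two setup functions in this file. The appconfig
  # keys shown here are assumptions; adjust to taste.
  import psycopg2
  from confluent_kafka import Consumer

  def setup_db():
      """
      Opens and returns a connection to the PostgreSQL database
      described in appconfig.
      """
      global appconfig
      return psycopg2.connect(
          host=appconfig["database"]["host"],
          dbname=appconfig["database"]["dbname"],
          user=appconfig["database"]["user"],
          password=appconfig["database"]["password"])


  def setup_kafka_consumer():
      """
      Creates a Consumer subscribed to both topics, reusing the common
      SSL configuration from _get_kafka_configuration().
      """
      configuration = _get_kafka_configuration()
      # These two are producer-only properties, so drop them here
      configuration.pop("transaction.timeout.ms", None)
      configuration.pop("enable.idempotence", None)
      configuration["auto.offset.reset"] = "earliest"
      kafkaConsumer = Consumer(configuration)
      kafkaConsumer.subscribe(["url-to-monitor", "url-monitor-results"])
      return kafkaConsumer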


When I was working with the daemon, my first iteration tried opening the DB connection and the Producer in each thread's __init__() function (one thread per frequency), and... that didn't work.

The DB connection is not picklable, so it does _not_ survive the call to os.fork(). Once I had rewritten the setup and run() methods so that the DB connection is created inside run(), that part was groovy.

The Kafka Producer still required a bit of work. After reading through StackOverflow and the upstream librdkafka sources, I saw that I needed to similarly delay initialising the Producer until the thread's run() method was called. Each Producer should also initialise the transaction feature there, but leave the begin/commit of each transaction to the point where it actually publishes a message.
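
The pattern that ended up working looks roughly like the sketch below: __init__() only keeps plain data, and everything that can't survive the fork gets created inside run(). The class name and the multiprocessing.Process base are illustrative assumptions on my part (the pickling and os.fork() behaviour is what points at processes); the real run() method appears a little further down.

  import multiprocessing

  class UrlMonitor(multiprocessing.Process):
      """
      One monitor per frequency. Note what is *not* created in
      __init__(): the DB connection and the Kafka Producer.
      """

      def __init__(self, view):
          super().__init__(daemon=True)
          self._view = view              # monitoring frequency, in minutes
          self._sleeptime = view * 60    # seconds between wake-ups
          self._futures = []

      def run(self):
          # Connections are created here, on our side of the fork --
          # see the full version of run() below.
          ...

  if __name__ == "__main__":
      monitors = [UrlMonitor(view) for view in (1, 2, 5, 10, 20, 30)]
      for monitor in monitors:
          monitor.start()
      for monitor in monitors:
          monitor.join()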

I still had a problem, though: some transactions would get through, but then the Producer would be fenced. This was the niggle, and where the StackOverflow comments helped me out:

Finally, in distributed environments, applications will crash or —worse!— temporarily lose connectivity to the rest of the system. Typically, new instances are automatically started to replace the ones which were deemed lost. Through this process, we may have multiple instances processing the same input topics and writing to the same output topics, causing duplicate outputs and violating the exactly once processing semantics.

We call this the problem of “zombie instances.” [emphasis added]


I realised that I was giving the same transactional id to each of the six producer instances (setting the 'transactional.id' in the configuration dict generated by _get_kafka_configuration()), so I needed to uniqify them somehow. I decided to pass the monitoring frequency of the thread to the setup function, and... booyah, I had messages being published.

That was a really nice feeling.


There is one other aspect of the monitoring daemon that I need to mention. Since each thread reads its list of URLs to monitor each time it wakes, I wanted to parallelize the checks. Monitoring each of the URLs in series could easily take too long relative to the sleep interval, and I really did not want to just fork a whole heap of processes and threads either, to avoid the potential for a fork-bomb.

To work around this I used the Python standard library's concurrent.futures, submitting a future to a ThreadPoolExecutor for each target URL. Adding attributes to each future object enabled me to use an add_done_callback, so that when the future crystallized it would publish the message.


  def run(self):
      """
      Runs the monitor, updates account-keeping and kicks off
      notifications if required. Then back to sleep.
      """
      self._producer = setup_kafka_producer(self._view)
      self._conn = setup_db()
      self._cursor = self._conn.cursor()
      while True:
          alltargets = self._get_targets()
          if alltargets:
              # We use a 'with' statement to ensure threads in the pool
              # are cleaned up promptly
              with cf.ThreadPoolExecutor(max_workers=50) as executor:
                  for tgt in alltargets:
                      future = executor.submit(check_url,
                                               tgt[0], tgt[1])
                      future.args = (tgt[0], tgt[1])
                      future.producer = self._producer
                      future.add_done_callback(construct_and_publish)
                      self._futures.append(future)
                  for future in cf.as_completed(self._futures):
                      if future.done():
                          self._futures.remove(future)
          sleep(self._sleeptime)
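
The _get_targets() method isn't shown above; it's just a query against the urltargets table for this worker's frequency. The column order matters, because run() passes tgt[0] as the url and tgt[1] as the foreign key. A minimal sketch:

  def _get_targets(self):
      """
      Returns the (urltarget, urltargets_pk) pairs that this worker's
      frequency is responsible for.
      """
      self._cursor.execute(
          "SELECT urltarget, urltargets_pk FROM urltargets "
          "WHERE monitor_frequency = %s", (self._view,))
      return self._cursor.fetchall()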

The check and publish methods are outside of the thread definition:


  # Imports used by the helpers below
  import json
  from datetime import datetime
  from sys import stderr

  import requests


  def construct_and_publish(input):
      """
      Callback function for the concurrent.future that each thread
      makes use of to query a website. Turns the future's attributes
      into a message for the 'url-monitor-results' topic, then publishes that
      message to the topic.
      """
      if input.cancelled() or input.exception():
          errmsg = """Monitor attempt for {args} failed"""
          print(errmsg.format(args=input.args),
                file=stderr, flush=True)
      else:
          message = json.dumps(dict(zip(msgFields, input.result())))
          input.producer.begin_transaction()
          publish_message(producer=input.producer,
                          topic="url-monitor-results",
                          message=message.encode('utf-8'))
          input.producer.commit_transaction()


  def check_url(url, fk):
      """
      Performs an 'HTTP GET' of the supplied url and returns a tuple containing
      (fk, start_time, duration, http_status).
      The start_time is expressed in seconds since the UNIX Epoch (which is
      what to_timestamp() expects), and the duration in whole milliseconds.
      """
      start_time = datetime.timestamp(datetime.now())
      result = requests.get(url)
      # duration lands in an int4 column, so store whole milliseconds
      duration = int((datetime.timestamp(datetime.now()) - start_time) * 1000)
      return (fk, start_time, duration, result.status_code)
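
Two names referenced above but not shown are msgFields and publish_message(). msgFields maps the tuple returned by check_url() onto the parameter names the consumer's prepared statements expect, and publish_message() is a thin wrapper around the Producer. Both are sketched here rather than copied from my submission:

  # Field names line up with check_url()'s return tuple and with the
  # %(...)s placeholders in the consumer's INSERT statements.
  msgFields = ("targetId", "start_time", "duration", "http_status")


  def publish_message(producer, topic, message):
      """
      Thin wrapper around Producer.produce(). A delivery-report callback
      would be the obvious next addition.
      """
      producer.produce(topic, value=message)
      # Serve the delivery report queue so any errors surface promptly
      producer.poll(0)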

With the monitoring daemon written, I now needed the other end of the pipe: the Kafka Consumer that reads from the topics and inserts into the database. This was straightforward: we poll for messages on both configured topics; when we read one, we write it to the appropriate DB table using a prepared statement, commit, and then do it all again in a while loop.


  urlToMonitorStmt = "INSERT INTO urltargets (urltarget, monitor_frequency "
  urlToMonitorStmt += "VALUES (%(urltarget)s, %(monitor_frequency)s)"
  urlMonitorResultsStmt = "INSERT INTO monitor_results (http_status, "
  urlMonitorResultsStmt += "urltarget_fk, start_time, duration) "
  urlMonitorResultsStmt += "VALUES (%(http_status)s, %(targetId)s, "
  urlMonitorResultsStmt += "to_timestamp(%(start_time)s), %(duration)s)"
  lookups = {
      "url-to-monitor": urlToMonitorStmt,
      "url-monitor-results": urlMonitorResultsStmt
  }
  if __name__ == "__main__":
      consumer = setup_kafka_consumer()
      connection = setup_db()
      while True:
          with connection.cursor() as curs:
              msglist = consumer.consume(200)
              for msg in msglist:
                  if not msg:
                      continue
                  elif msg.error():
                      print("Received error during poll: {error}".format(
                          error=msg.error()))
                  else:
                      stmt = lookups[msg.topic()]
                      values = json.loads(msg.value().decode('utf-8'))
                      curs.execute(stmt, values)
              curs.execute("COMMIT")
      connection.close()

Of course there should be error handling for the execute(). There should also be packaging and tests and templates for the report. Do not @ me, etc etc.

The reason all these pieces are missing is that the day before I was due to hand this assignment in to my interviewer, I received a very, very nice offer from another company that I'd also been interviewing with, and I accepted it.