⁂ George Ho

Streaming Data with Tornado and WebSockets

A lot of data science and machine learning practice assumes a static dataset, maybe with some MLOps tooling for rerunning a model pipeline with the freshest version of the dataset.

Working with streaming data is an entirely different ball game, and it wasn’t clear to me what tools a data scientist might reach for when dealing with streaming data¹.

I recently came across a straightforward and robust solution: WebSockets and Tornado. Tornado is a Python web framework with strong support for asynchronous networking. WebSockets are a protocol that lets two processes (or apps) communicate with each other. They fill a similar role to HTTP requests against REST endpoints, except that the connection stays open and either side can send a message at any time. Conveniently, Tornado has good support for WebSockets as well.
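WebSockets start out as an ordinary HTTP request: the client sends an `Upgrade: websocket` header along with a random `Sec-WebSocket-Key`, and the server shows that it speaks the protocol by replying with a derived `Sec-WebSocket-Accept` value. After that exchange, the same TCP connection stays open and carries WebSocket frames in both directions. Tornado performs this handshake for you; the sketch below only illustrates the derivation described in RFC 6455, and `websocket_accept` is a hypothetical helper, not part of Tornado.

```python
import base64
import hashlib

# Fixed GUID that RFC 6455 defines for the opening handshake.
WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"


def websocket_accept(sec_websocket_key: str) -> str:
    """Return the Sec-WebSocket-Accept value a server sends back.

    The server appends the GUID to the client's key, hashes the result
    with SHA-1, and base64-encodes the 20-byte digest.
    """
    digest = hashlib.sha1((sec_websocket_key + WEBSOCKET_GUID).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")


if __name__ == "__main__":
    # The example key used in RFC 6455, section 1.3.
    print(websocket_accept("dGhlIHNhbXBsZSBub25jZQ=="))  # s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```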

In this blog post I’ll give a minimal example of using Tornado and WebSockets to handle streaming data. My toy example has one app (server.py) writing samples from a Bernoulli distribution to a WebSocket, and another app (client.py) listening to the WebSocket and keeping track of the posterior distribution for a Beta-Binomial conjugate model. After walking through the code, I’ll discuss these tools and why they’re good choices for working with streaming data.
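For reference, the conjugate update behind the client is short enough to write out. With a Beta(α₀, β₀) prior on the success probability p and k successes in n Bernoulli trials, the posterior is again a Beta distribution, so each new sample x ∈ {0, 1} just increments one of the two parameters (the client code uses α₀ = β₀ = 2):

```latex
\begin{aligned}
p &\sim \mathrm{Beta}(\alpha_0, \beta_0), \qquad x_1, \dots, x_n \mid p \overset{\text{iid}}{\sim} \mathrm{Bernoulli}(p) \\
p \mid x_{1:n} &\sim \mathrm{Beta}(\alpha_0 + k,\ \beta_0 + n - k), \qquad k = \textstyle\sum_{i=1}^{n} x_i \\
\alpha &\leftarrow \alpha + x, \qquad \beta \leftarrow \beta + (1 - x) \qquad \text{(per new sample } x \in \{0, 1\}) \\
\mathbb{E}[p \mid x_{1:n}] &= \frac{\alpha_0 + k}{\alpha_0 + \beta_0 + n}
\end{aligned}
```

With α₀ = β₀ = 2, the posterior mean (2 + k)/(4 + n) is pulled slightly toward 1/2 relative to the sample proportion k/n; the two agree more and more closely as n grows.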

For another tutorial on this same topic, you can check out proft’s blog post.

Server

""" Every 100ms, sample from a Bernoulli and write the value to a WebSocket. """

import random
import tornado.ioloop
import tornado.web
import tornado.websocket


class WebSocketServer(tornado.websocket.WebSocketHandler):
    """Simple WebSocket handler to serve clients."""

    # Note that `clients` is a class variable and `send_message` is a
    # classmethod.
    clients = set()

    def open(self):
        WebSocketServer.clients.add(self)

    def on_close(self):
        WebSocketServer.clients.remove(self)

    @classmethod
    def send_message(cls, message: str):
        print(f"Sending message {message} to {len(cls.clients)} client(s).")
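        # Iterate over a copy, and skip clients whose connection has closed but
        # whose on_close has not run yet: write_message raises
        # WebSocketClosedError for those.
        for client in list(cls.clients):
            try:
                client.write_message(message)
            except tornado.websocket.WebSocketClosedError:
                pass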


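class RandomBernoulli:
    """Draws 0/1 samples; the true p is printed so the posterior can be checked."""

    def __init__(self, p: float = 0.72):
        self.p = p
        print(f"True p = {self.p}")

    def sample(self) -> int:
        # random.random() is uniform on [0, 1), so this is 1 with probability p.
        return int(random.random() < self.p)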


def main():
    # Create a web app whose only endpoint is a WebSocket, and start the web
    # app on port 8888.
    app = tornado.web.Application(
        [(r"/websocket/", WebSocketServer)],
        websocket_ping_interval=10,
        websocket_ping_timeout=30,
    )
    app.listen(8888)

    # Create an event loop (what Tornado calls an IOLoop).
    io_loop = tornado.ioloop.IOLoop.current()

    # Before starting the event loop, instantiate a RandomBernoulli and
    # register a periodic callback to write a sampled value to the WebSocket
    # every 100ms.
    random_bernoulli = RandomBernoulli()
    periodic_callback = tornado.ioloop.PeriodicCallback(
        lambda: WebSocketServer.send_message(str(random_bernoulli.sample())), 100
    )
    periodic_callback.start()

    # Start the event loop.
    io_loop.start()


if __name__ == "__main__":
    main()

Client

""" Stream data from the WebSocket and update the Beta posterior parameters online. """

import tornado.ioloop
import tornado.websocket


class WebSocketClient:
    def __init__(self, io_loop):
        self.connection = None
        self.io_loop = io_loop
        self.num_successes = 0
        self.num_trials = 0

    def start(self):
        self.connect_and_read()

    def stop(self):
        self.io_loop.stop()

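    def connect_and_read(self):
        print("Connecting...")
        # The connection future goes to maybe_retry_connection; every incoming
        # message (and None on disconnect) goes to on_message.
        tornado.websocket.websocket_connect(
            url="ws://localhost:8888/websocket/",
            callback=self.maybe_retry_connection,
            on_message_callback=self.on_message,
            ping_interval=10,
            ping_timeout=30,
        )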

    def maybe_retry_connection(self, future) -> None:
        try:
            self.connection = future.result()
        except Exception:
            # The server is not up (yet) or the handshake failed: try again later.
            print("Could not connect, retrying in 3 seconds...")
            self.io_loop.call_later(3, self.connect_and_read)

    def on_message(self, message):
        # Tornado calls on_message_callback with None once the connection closes.
        if message is None:
            print("Disconnected, reconnecting...")
            self.connect_and_read()
            return

        message = int(message)
        self.num_successes += message
        self.num_trials += 1

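        # Beta(2, 2) prior, so the posterior is Beta(2 + successes, 2 + failures).
        alpha = 2 + self.num_successes
        beta = 2 + self.num_trials - self.num_successes
        mean = alpha / (alpha + beta)  # posterior mean, not the sample proportion
        print(f"α = {alpha}; β = {beta}; posterior mean = {mean:.3f}")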


def main():
    # Create an event loop (what Tornado calls an IOLoop).
    io_loop = tornado.ioloop.IOLoop.current()

    # Before starting the event loop, instantiate a WebSocketClient and add a
    # callback to the event loop to start it. This way the first thing the
    # event loop does is to start the client.
    client = WebSocketClient(io_loop)
    io_loop.add_callback(client.start)

    # Start the event loop.
    io_loop.start()


if __name__ == "__main__":
    main()
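The client mixes networking with the Bayesian update. One option (a design suggestion, not part of the original code) is to pull the update into a small class with no Tornado dependency, so it can be unit-tested without a running server; `on_message` would then only need to call `update(int(message))`. The name `BetaPosterior` is hypothetical.

```python
from dataclasses import dataclass


@dataclass
class BetaPosterior:
    """Beta posterior over a Bernoulli success probability, updated online.

    Defaults to the same Beta(2, 2) prior the client uses.
    """

    alpha: float = 2.0
    beta: float = 2.0

    def update(self, x: int) -> None:
        # A success (x = 1) increments alpha; a failure (x = 0) increments beta.
        if x not in (0, 1):
            raise ValueError(f"expected 0 or 1, got {x!r}")
        self.alpha += x
        self.beta += 1 - x

    @property
    def mean(self) -> float:
        return self.alpha / (self.alpha + self.beta)

    @property
    def variance(self) -> float:
        total = self.alpha + self.beta
        return self.alpha * self.beta / (total**2 * (total + 1))


if __name__ == "__main__":
    posterior = BetaPosterior()
    for x in [1, 0, 1, 1]:
        posterior.update(x)
    print(posterior)  # BetaPosterior(alpha=5.0, beta=3.0)
    print(posterior.mean)  # 0.625
```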

Why Tornado?

Tornado is a Python web framework, but unlike more popular Python web frameworks such as Flask or Django, it was built from the start for asynchronous networking and non-blocking calls. Essentially, a Tornado app runs one single-threaded event loop (tornado.ioloop.IOLoop) that handles all requests asynchronously: as each request comes in, the loop dispatches it to the relevant non-blocking function, and while that function waits on I/O, the loop is free to serve other requests. Tornado is not the only Python web framework that works this way (asyncio-based frameworks such as aiohttp do too, and since Tornado 5 the IOLoop itself runs on top of asyncio), but it is a mature option with built-in WebSocket support.
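To make the single-threaded event loop concrete, here is a sketch in plain `asyncio`, which is what Tornado's IOLoop runs on in Tornado 5 and later. Two simulated requests each wait 100 ms without blocking; because each wait hands control back to the loop, both finish in roughly 100 ms total, on the same thread. `handle_request` is a hypothetical stand-in for a handler, not Tornado API.

```python
import asyncio
import threading
import time


async def handle_request(name: str, log: list) -> str:
    """Stand-in for a non-blocking request handler."""
    log.append((name, "start", threading.get_ident()))
    # A non-blocking wait (standing in for I/O): control returns to the event
    # loop, which can start handling the next request in the meantime.
    await asyncio.sleep(0.1)
    log.append((name, "end", threading.get_ident()))
    return name


async def main():
    log = []
    start = time.perf_counter()
    # Two "requests" arrive together; the loop interleaves them on one thread.
    results = await asyncio.gather(handle_request("a", log), handle_request("b", log))
    elapsed = time.perf_counter() - start
    return results, log, elapsed


if __name__ == "__main__":
    results, log, elapsed = asyncio.run(main())
    for name, stage, _ in log:
        print(name, stage)
    print(f"elapsed: {elapsed:.2f}s")  # roughly 0.1 s, not 0.2 s
```

Both requests start before either finishes, which is exactly the interleaving a blocking, one-request-at-a-time server cannot do on a single thread.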

As an aside, Tornado seems to be more popular in finance, where streaming real-time data (e.g. market data) is very common.

Why WebSockets?

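A sharper question might be: why WebSockets rather than HTTP requests to a REST endpoint? After all, both can deliver real-time data from a server to a client. The difference is who initiates each message: with REST, the client has to poll the endpoint repeatedly to find out whether anything new has arrived, whereas with a WebSocket the server pushes each message over a connection that stays open.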

A lot can be said when comparing WebSockets and RESTful services, but I think the main points are accurately summarized by Kumar Chandrakant on Baeldung:

[A] WebSocket is more suitable for cases where a push-based and real-time communication defines the requirement more appropriately. Additionally, WebSocket works well for scenarios where a message needs to be pushed to multiple clients simultaneously. These are the cases where client and server communication over RESTful services will find it difficult if not prohibitive.
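The push-versus-poll difference can be simulated without any networking. In this sketch (plain `asyncio`; a dict stands in for a REST endpoint that serves only the latest value, an `asyncio.Queue` stands in for a WebSocket connection, and all function names are hypothetical), a producer emits a value every 30 ms. The push listener receives every value almost immediately, while a client polling every 100 ms sees only whatever is latest at poll time, so it reacts later and skips the values in between unless the endpoint keeps a history.

```python
import asyncio
import time
from statistics import mean


async def produce(n, interval, queue, latest):
    """Emit n values, publishing each one in both styles."""
    for i in range(n):
        await asyncio.sleep(interval)
        stamp = time.perf_counter()
        latest["value"] = (i, stamp)  # REST style: stored until a client asks
        queue.put_nowait((i, stamp))  # push style: handed straight to the listener
    queue.put_nowait(None)  # sentinel: the stream is finished


async def push_listener(queue):
    """React to each value as soon as it arrives (like on_message)."""
    delays = []
    while (item := await queue.get()) is not None:
        _, stamp = item
        delays.append(time.perf_counter() - stamp)
    return delays


async def poller(latest, poll_every, done):
    """Fetch the latest value on a fixed schedule (like polling a GET endpoint)."""
    delays, seen = [], set()
    while not done.is_set():
        await asyncio.sleep(poll_every)
        if "value" in latest:
            i, stamp = latest["value"]
            if i not in seen:
                seen.add(i)
                delays.append(time.perf_counter() - stamp)
    return delays, seen


async def compare(n=20, interval=0.03, poll_every=0.1):
    queue, latest, done = asyncio.Queue(), {}, asyncio.Event()
    listener = asyncio.create_task(push_listener(queue))
    polling = asyncio.create_task(poller(latest, poll_every, done))
    await produce(n, interval, queue, latest)
    push_delays = await listener
    done.set()
    poll_delays, seen = await polling
    return push_delays, poll_delays, seen


if __name__ == "__main__":
    push_delays, poll_delays, seen = asyncio.run(compare())
    print(f"push: {len(push_delays)} values, mean delay {mean(push_delays) * 1000:.2f} ms")
    print(f"poll: {len(seen)} values, mean delay {mean(poll_delays) * 1000:.2f} ms")
```

Polling faster narrows the gap, but every extra poll is a full request/response round trip, which is the cost a persistent push connection avoids.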

Tangentially, one alternative seems better suited than WebSockets from a protocol standpoint: Server-Sent Events (a.k.a. SSE). SSE is a simpler protocol for unidirectional, server-to-client data flow, which is really all that we need here. Unfortunately, it doesn’t seem to have built-in support in many Python web frameworks.
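Part of SSE’s appeal is how little protocol there is: the server keeps an ordinary HTTP response open with `Content-Type: text/event-stream` and writes plain-text events to it. Here is a sketch of the event format; `format_sse` is a hypothetical helper that covers only the `id`, `event`, and `data` fields.

```python
from typing import Optional


def format_sse(data: str, event: Optional[str] = None, event_id: Optional[str] = None) -> str:
    """Serialize one Server-Sent Event in the text/event-stream format.

    Each field is a "name: value" line; multi-line data becomes several
    "data:" lines, and a blank line marks the end of the event.
    """
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    lines.extend(f"data: {line}" for line in data.split("\n"))
    return "\n".join(lines) + "\n\n"


if __name__ == "__main__":
    print(repr(format_sse("1")))  # 'data: 1\n\n'
    print(repr(format_sse("a\nb", event="sample", event_id="7")))
    # 'id: 7\nevent: sample\ndata: a\ndata: b\n\n'
```

In Tornado, a `RequestHandler` could in principle serve such a stream by calling `self.set_header("Content-Type", "text/event-stream")` and then repeatedly calling `self.write(format_sse(...))` followed by `await self.flush()`; treat that as an untested starting point rather than a drop-in replacement for the WebSocket server.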

Armin Ronacher takes an even starker view, seeing no value in WebSockets over plain TCP connections when neither endpoint is a browser, which is exactly the situation in this example:

Websockets make you sad. […] Websockets are complex, way more complex than I anticipated. I can understand that they work that way but I definitely don’t see a value in using websockets instead of regular TCP connections if all you want is to exchange data between different endpoints and neither is a browser.
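For comparison, here is roughly what Ronacher’s suggestion could look like for this toy problem: the same Bernoulli stream and Beta update over a plain TCP connection using `asyncio` streams, with one sample per newline-terminated line. This is a sketch under my own assumptions (newline framing, localhost, an OS-assigned port, hypothetical function names), and it leaves out the ping/timeout handling and reconnection logic that the Tornado versions include.

```python
import asyncio
import functools
import random


async def serve_bernoulli(reader, writer, p=0.72, n=50):
    """Send n Bernoulli(p) samples, one per line, then close the connection."""
    try:
        for _ in range(n):
            writer.write(f"{int(random.random() < p)}\n".encode())
            await writer.drain()  # respect TCP back-pressure
            await asyncio.sleep(0)  # a real server would wait here, e.g. 100 ms
    finally:
        writer.close()
        await writer.wait_closed()


async def consume(host, port, alpha=2, beta=2):
    """Read samples until the server hangs up, updating Beta(alpha, beta)."""
    reader, writer = await asyncio.open_connection(host, port)
    n = 0
    while line := await reader.readline():  # b"" means the server closed
        x = int(line.decode().strip())
        alpha, beta, n = alpha + x, beta + 1 - x, n + 1
    writer.close()
    await writer.wait_closed()
    return alpha, beta, n


async def demo(n=50):
    handler = functools.partial(serve_bernoulli, n=n)
    # Port 0 asks the OS for any free port.
    server = await asyncio.start_server(handler, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        return await consume("127.0.0.1", port)


if __name__ == "__main__":
    alpha, beta, n = asyncio.run(demo())
    print(f"{n} samples; α = {alpha}; β = {beta}; posterior mean = {alpha / (alpha + beta):.3f}")
```

The framing here is trivial because each message is a single character; anything richer (JSON, multi-line payloads) would need a length prefix or a delimiter scheme, which is part of what WebSockets give you for free.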

My takeaway from these criticisms is that WebSockets may not be the ideal technology for handling streaming data from a maintainability or architectural point of view, but that doesn’t stop them from being a good, scalable choice when they do fit the problem.


  1. There is technically a difference between “real-time” and “streaming”: “real-time” refers to data that comes in as it is created, whereas “streaming” refers to a system that processes data continuously. You stream your TV show from Netflix, but since the show was created long before you watched it, you aren’t viewing it in real-time. ↩︎

#Python #Streaming