A lot of data science and machine learning practice assumes a static dataset, maybe with some MLOps tooling for rerunning a model pipeline with the freshest version of the dataset.
Working with streaming data is an entirely different ball game, and it wasn’t clear to me what tools a data scientist might reach for when dealing with streaming data1.
I recently came across a pretty straightforward and robust solution: WebSockets and Tornado. Tornado is a Python web framework with strong support for asynchronous networking. WebSockets are a way for two processes (or apps) to communicate with each other (similar to HTTP requests with REST endpoints). Of course, Tornado has pretty good support for WebSockets as well.
In this blog post I’ll give a minimal example of using Tornado and WebSockets
to handle streaming data. The toy example I have is one app (
writing samples of a Bernoulli to a WebSocket, and another app (
listening to the WebSocket and keeping track of the posterior distribution for
a Beta-Binomial conjugate model.
After walking through the code, I’ll discuss these tools, and why they’re good
choices for working with streaming data.
For another tutorial on this same topic, you can check out
WebSocketServeris registered to a REST endpoint (in
main), it keeps track of any processes who are listening to that endpoint, and pushes messages to them when
- Note that
clientsis a class variable, so
send_messageis a class method.
- This class could be extended to also listen to the endpoint, instead of just blindly pushing messages out - after all, WebSockets allow for bidirectional data flow.
- Note that
PeriodicCallbackmake a pretty crude example, but you could write a class that transmits data in real-time to suit your use case. For example, you could watch a file for any modifications using
watchdog, and dump the changes into the WebSocket.
tornado.Applicationconfigure periodic pings of WebSocket connections, keeping connections alive and allowing dropped connections to be detected and closed.
- It’s also worth noting that there’s a
tornado.websocket.WebSocketHandler.websocket_max_message_sizeattribute. While this is set to a generous 10 MiB, it’s important that the WebSocket messages don’t exceed this limit!
WebSocketClientis a class that:
- Can be
stopped to connect/disconnect to the WebSocket and start/stop listening to it in a separate thread
- Can process every message (
on_message) it hears from the WebSocket: in this case it simply maintains a count of the number of trials and successes, but this processing could theoretically be anything. For example, you could do some further processing of the message and then dump that into a separate WebSocket for other apps (or even users!) to subscribe to.
- Can be
- To connect to the WebSocket, we need to use a WebSocket library: thankfully
Tornado has a built-in WebSocket functionality (
tornado.websocket), but we’re also free to use other libraries such as the creatively named
- Note that we run
on_messageon the same thread as we run
connect_and_read. This isn’t a problem so long as
on_messageis fast enough, but a potentially wiser choice would be to offload
connect_and_readto a separate thread by instantiating a
tornado.ioloop.IOLoop.run_in_executor, so as not to block the thread where the
main(as well as in
server.py) is important: it’s how Tornado schedules tasks (a.k.a. callbacks) for delayed (a.k.a. asynchronous) execution. To add a callback, we simply call
websocket_connectconfigure periodic pings of the WebSocket connection, keeping connections alive and allowing dropped connections to be detected and closed.
callback=self.maybe_retry_connectionis run on a future
websocket_connectdoesn’t actually establish the connection directly, but rather returns a future. Hence, we try to get the
future.result()itself (i.e. the WebSocket client connection) — I don’t actually do anything with the
self.connection, but you could if you wanted. In the event of an exception while doing that, we assume there’s a problem with the WebSocket connection and retry
connect_and_readafter 3 seconds. This all has the effect of recovering gracefully if the WebSocket is dropped or
server.pyexperiences a brief outage for whatever reason (both of which are probably inevitable for long-running apps using WebSockets).
Tornado is a Python web framework, but unlike the more popular Python web
frameworks like Flask or
Django, it has strong support for
asynchronous networking and non-blocking
essentially, Tornado apps have one (single-threaded) event loop
tornado.ioloop.IOLoop), which handles all requests asynchronously,
dispatching incoming requests to the relevant non-blocking function as the
request comes in. As far as I know, Tornado is the only Python web framework
that does this.
As an aside, Tornado seems to be more popular in finance, where streaming real-time data (e.g. market data) is very common.
A sharper question might be, why WebSockets over HTTP requests to a REST endpoint? After all, both theoretically allow a client to stream data in real-time from a server.
[A] WebSocket is more suitable for cases where a push-based and real-time communication defines the requirement more appropriately. Additionally, WebSocket works well for scenarios where a message needs to be pushed to multiple clients simultaneously. These are the cases where client and server communication over RESTful services will find it difficult if not prohibitive.
Tangentially, there’s one alternative that seems to be better than WebSockets from a protocol standpoint, but unfortunately doesn’t seem to have support from many Python web frameworks, and that is Server-Sent Events (a.k.a. SSE): it seems to be a cleaner protocol for unidirectional data flow, which is really all that we need.
Additionally, Armin Ronacher has a much starker view of WebSockets, seeing no value in using WebSockets over TCP/IP sockets for this application:
Websockets make you sad. […] Websockets are complex, way more complex than I anticipated. I can understand that they work that way but I definitely don’t see a value in using websockets instead of regular TCP connections if all you want is to exchange data between different endpoints and neither is a browser.
My thought after reading these criticisms is that perhaps WebSockets aren’t the ideal technology for handling streaming data (from a maintainability or architectural point of view), but that doesn’t mean that they aren’t good scalable technologies when they do work.
There is technically a difference between “real-time” and “streaming”: “real-time” refers to data that comes in as it is created, whereas “streaming” refers to a system that processes data continuously. You stream your TV show from Netflix, but since the show was created long before you watched it, you aren’t viewing it in real-time. ↩