Streaming Graphs

This section assumes you’ve completed the Tutorial and have a good working knowledge of Graphcat.

As you’ve seen so far, computational graphs capture the dependencies in a process so that individual tasks can be executed only when they’re needed. Static graphs identify those tasks based solely on the topology of the graph. Dynamic graphs improve on this by detecting which task outputs are actually used, so that tasks with unused outputs can be skipped. The section on avoiding unnecessary updates describes how Graphcat takes this a step further by ensuring that irrelevant upstream changes don’t trigger downstream execution.

In this section, we introduce streaming graphs, which make it possible to retrieve a subset of a task’s total output. This can be useful in several ways: first, you might want to improve interactivity by reducing the work performed when there are minor upstream changes. An example of this might be a paint program where only the region under a brush stroke is updated, instead of an entire image. Another reason to use streaming is when you have a problem so large that it can’t be loaded into memory all at once: you can split the problem into smaller pieces and work on them one at a time, only accumulating the final result. Finally, you can use streaming to implement parallelism, using multiple threads or processes to each work on one piece of a problem.

But before we can get to all that, let’s begin with a simple streaming graph that computes the squares of values in a Numpy array:

[1]:
import logging
logging.basicConfig(level=logging.DEBUG)

import graphcat.notebook
import numpy

def squared(graph, name, inputs, extent):
    return numpy.power(inputs.getone(None, extent), 2)

graph = graphcat.StreamingGraph()
logger = graphcat.Logger(graph)
graph.set_task("A", graphcat.array(numpy.arange(8)))
graph.set_task("B", squared)
graph.set_links("A", "B")

graphcat.notebook.display(graph)
../_images/user-guide_streaming-graphs_1_0.svg

Note

Since this is a streaming graph, the arrows are drawn with partial arrow heads to suggest that the tasks can retrieve partial data.

Note that we used graphcat.array to provide the source data instead of graphcat.constant. Although the latter can certainly be used to provide a Numpy array as an output, the former provides additional functionality when used with streaming graphs.

Our squared task function simply uses numpy to compute the square of its inputs.

Let’s confirm that the default behavior of the graph is what we would normally expect:

[2]:
graph.output("B")
INFO:graphcat.common:Task B updating.
INFO:graphcat.common:Task B executing. Inputs: {None} Extent: None
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task A executing. Inputs: {} Extent: None
INFO:graphcat.common:Task A finished. Output: [0 1 2 3 4 5 6 7]
INFO:graphcat.common:Task B finished. Output: [ 0  1  4  9 16 25 36 49]
[2]:
array([ 0,  1,  4,  9, 16, 25, 36, 49])

That looks pretty familiar - when requested, task “A” returns the array we used to initialize it, and task “B” computes the square of each value from “A”. We can see some new output in the logs, but otherwise everything is working the way it would with static and dynamic graphs.

What if the array was much larger, or task “B” was something truly time consuming, so that we wanted to perform an operation on just a subset? This is what streaming graphs were meant for. To do so, they allow us to specify “extents”, which define the subset of a task’s output that we want:

[3]:
graph.output("B", extent=graphcat.ArrayExtent[0:4])
INFO:graphcat.common:Task B updating.
INFO:graphcat.common:Task B executing. Inputs: {None} Extent: slice(0, 4, None)
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task A executing. Inputs: {} Extent: slice(0, 4, None)
INFO:graphcat.common:Task A finished. Output: [0 1 2 3]
INFO:graphcat.common:Task B finished. Output: [0 1 4 9]
[3]:
array([0, 1, 4, 9])

There’s a lot to unpack here. First, we specified an extent when we called output. An extent (which can only be used in streaming graphs) is any Python object that can be used by a task to specify a subset of its output. In this case, we used graphcat.ArrayExtent, which can be thought of as a factory for creating extents that request a subset of an array using standard Numpy indexing. Here, we are requesting the first four values from the array (ArrayExtent[0:4]).

Next, the extent is passed to the task function, which in this case is our squared function. Looking back at the function source:

def squared(graph, name, inputs, extent):
    return numpy.power(inputs.getone(None, extent), 2)

… we see that the function takes a mandatory fourth argument - extent. This is an important point when working with streaming graphs: the task functions must be written to accept the “extent” argument along with the standard “graph”, “name”, and “inputs” arguments, and do something with the extent information.

In the case of the squared function, the extent is simply passed-along in the call to inputs.getone(None, extent) … in other words, squared is counting on the upstream task providing its input to know what to do with the extents object.

This is why we used graphcat.array to implement task “A”, because it knows how to use the extent returned by graphcat.ArrayExtent, using it to return a subset of the stored array.

That subset is returned by task “A”, and task “B” computes the squares as requested.

Note that, as with static and dynamic graphs, streaming graphs don’t execute tasks unless they have to. For example, if we ask for the same data again, using the same extent:

[4]:
graph.output("A", extent=graphcat.ArrayExtent[0:4])
INFO:graphcat.common:Task A updating.
[4]:
array([0, 1, 2, 3])

… we get the same result, but the tasks haven’t been executed again. If we use a different extent:

[5]:
graph.output("A", extent=None)
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task A executing. Inputs: {} Extent: None
INFO:graphcat.common:Task A finished. Output: [0 1 2 3 4 5 6 7]
[5]:
array([0, 1, 2, 3, 4, 5, 6, 7])

… then the tasks are executed. Note from the above that when you use None as an extent, it means “return everything”. None is the default extent everywhere in the API that extents are used, so the default behavior of streaming graph methods is to behave exactly like static and dynamic graphs.

Note that ArrayExtent is just one example of an extent. You can use any Python object as an extent, so long as your task functions know how to make use of it. You could use a string containing an filepath or an SQL query as an extent, or a tuple containing a range of timestamps.

Furthermore, your task functions are free to modify or replace extents - for example, a task function that performs convolutional filtering on arrays would need to grow an incoming extent to avoid edge effects.