
Welcome!
Welcome to Graphcat, the lightweight, flexible Python library for managing computational graphs.
Suppose you have a workflow composed of tasks, and the tasks need to be completed in a specific order, and you never want to execute a task unless it’s really necessary: keeping track of which tasks need to be executed and when can become extremely complex as your workflow grows, branches, and merges:
Figure: Sample image processing workflow. Boxes are tasks to be performed; edges represent data flow / dependencies between tasks.
Graphcat is a tool that allows you to explicitly capture a workflow in a computational graph, managing the details of executing each task in the proper order and at the proper time, no matter the state of the graph or the complexity of the workflow. Graphcat doesn’t care what kind of data your graph manages, doesn’t dictate how you name the entities in the graph, provides advanced functionality like streaming and expression-based tasks, and is easy to learn, with features including the following:
- Tasks defined using standard Python functions or callables.
- No limitation on data structures / task outputs.
- Name tasks using any naming scheme you like.
- Support for advanced workflows including fan-in, fan-out, and loops.
- Built-in support for tasks based on Python expressions, with automatic dependency tracking.
Documentation

Installation
Graphcat
To install the latest stable version of Graphcat and its dependencies, use pip:
$ pip install graphcat
… once it completes, you’ll be able to use all of Graphcat’s core features.
Visualization
If you want to visualize Graphcat network diagrams like the ones seen elsewhere in this documentation, you’ll need to install Graphviz, which can’t be installed via pip. If you use Conda (which we strongly recommend), you can install it as follows:
$ conda install graphviz
Once you have Graphviz, you can install Graphcat with the necessary dependencies:
$ pip install graphcat[vis]
Documentation
We assume that you’ll normally access this documentation online, but if you want a local copy on your own computer, do the following:
First, you’ll need the pandoc universal document converter, which can’t be installed with pip. If you use Conda (again, strongly recommended), you can install it with the following:
$ conda install pandoc
Once you have pandoc, install Graphcat along with all of the dependencies needed to build the docs:
$ pip install graphcat[doc]
Next, do the following to download a tarball containing all of the Graphcat source code, including the documentation, to the current directory:
$ pip download graphcat --no-binary=:all: --no-deps
Now, you can extract the tarball contents and build the documentation (adjust the following for the version you downloaded):
$ tar xzvf graphcat-1.0.4.tar.gz
$ cd graphcat-1.0.4/docs
$ make html

Tutorial
Imagine that you have a workflow made up of three tasks “A”, “B”, and “C”, and the tasks must always be performed in the right order, because task “C” depends on the output of task “A”, and also depends on the output of task “B”. Further, imagine that the individual tasks are time-consuming, so that you don’t want to execute a task unless it’s really necessary: if something has changed that only affects task “C”, and tasks “A” and “B” have already been completed, then you should only need to redo task “C”. Over time, keeping track of which tasks need to be executed can become extremely complex as your workflow grows, branches, and merges.
Graphcat is a tool that allows you to explicitly capture a workflow in a computational graph, managing the details of executing each task in the proper order and at the proper time, no matter the state of the tasks or the complexity of the workflow. Graphcat doesn’t care what kind of data your graph manages, doesn’t dictate how you name the entities in the graph, provides advanced functionality like expression-based tasks, and is easy to learn.
Intrigued? Let’s look at some code!
The Basics
First, we import graphcat, which includes all of the functionality for managing computational graphs. If you’re using Graphcat in your scripts, this will likely be all you need. For this tutorial we also import graphcat.notebook, so we can see the state of our graphs as we work.
[1]:
import graphcat
import graphcat.notebook
Next, let’s reproduce the example workflow from above, starting with an (initially empty) computational graph:
[2]:
graph = graphcat.StaticGraph()
Next, we will add tasks to the graph, identified using unique string names:
[3]:
graph.add_task("A")
graph.add_task("B")
graph.add_task("C")
Note that a task name can be any hashable object, not just a string - we used strings in this case because they map well to our particular problem.
Now, we can define the links that determine which tasks depend on previous tasks:
[4]:
graph.add_links(source="A", targets="C")
graph.add_links(source="B", targets="C")
There are two ways to think about links. One way is to picture data “flowing” through the links from the source tasks to the target tasks, which is why we sometimes call the sources “upstream” and the targets “downstream”. Alternatively, you can say that the target of a link “depends on” the source: any time the source changes, the target needs to change, along with all of its targets, and so on. Both viewpoints are completely valid, and you will find that both are useful, depending on the context.
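The “depends on” viewpoint means that a change to one task must reach everything downstream of it. Here is a toy illustration of finding that set with a breadth-first walk over the links. The links dict and the downstream() helper are hypothetical and are not part of Graphcat:

```python
from collections import deque

# Hypothetical adjacency map for the example graph: source task -> target tasks.
links = {"A": ["C"], "B": ["C"], "C": []}

def downstream(links, start):
    """Return every task that depends, directly or indirectly, on start."""
    seen = set()
    queue = deque(links.get(start, []))
    while queue:
        task = queue.popleft()
        if task not in seen:
            seen.add(task)
            # Follow this task's own targets, and so on.
            queue.extend(links.get(task, []))
    return seen

print(sorted(downstream(links, "A")))  # ['C']
```

A change to “A” reaches “C” and leaves “B” alone. The seen set also keeps the walk from looping forever if the links ever contain a cycle.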
Finally, because a picture is worth \(1\times10^3\) words, let’s see what the graph looks like so far:
[5]:
graphcat.notebook.display(graph)
Notice that each task is drawn as a box, labelled with the task name, and the links are drawn as arrows that point from sources to targets, i.e. the arrows point in the direction of data flow.
Of course, all we’ve done so far is define how our tasks relate to one another - we haven’t actually executed any of them. Before we do so, let’s introduce some logging so we can see what Graphcat is doing under the hood. We’ll import the standard Python logging module and configure it to log informational messages. Then, we create a special graphcat.Logger object that will watch the computational graph and log events as they happen:
[6]:
import logging
logging.basicConfig(level=logging.INFO)
logger = graphcat.Logger(graph)
By default, newly-created tasks are considered unfinished, because they haven’t been executed yet. Let’s finish task “A” by updating it:
[7]:
graph.update("A")
graphcat.notebook.display(graph)
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task A executing. Inputs: {}
INFO:graphcat.common:Task A finished. Output: None
The call to update executes the unfinished task, which we see in the second line of logging. Once the task has been executed, the third line in the log shows that its state is now finished (ignore the “Inputs: …” and “Output: …” text in the log; we will explain their meaning shortly). Note that in our visualization, task “A” is now rendered with a black background to show that the task is finished.
Continuing on, let’s update task “C” and see what happens:
[8]:
graph.update("C")
graphcat.notebook.display(graph)
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task B updating.
INFO:graphcat.common:Task B executing. Inputs: {}
INFO:graphcat.common:Task B finished. Output: None
INFO:graphcat.common:Task C updating.
INFO:graphcat.common:Task C executing. Inputs: {None: None, None: None}
INFO:graphcat.common:Task C finished. Output: None
Looking closely at the log, we see that task “C” is executed, but only after task “B”. Task “A” isn’t executed, because it was already finished before update was called. This conforms to our original goals for our workflow: tasks “A” and “B” must be completed before task “C”, and we never re-execute tasks that are already finished.
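The ordering rule described above can be modeled in a few lines: bring every upstream task up to date first, and execute a task only if it is unfinished. This toy sketch is not Graphcat's implementation. The inputs_of and finished dicts and the update() function are hypothetical, and appending to executed stands in for calling a real task function:

```python
# Toy model of in-order, on-demand execution; not Graphcat's implementation.
inputs_of = {"A": [], "B": [], "C": ["A", "B"]}  # hypothetical: task -> upstream tasks
finished = {"A": True, "B": False, "C": False}   # "A" was already updated
executed = []                                    # order in which tasks actually run

def update(name):
    # Bring every upstream task up to date first, in link order.
    for source in inputs_of[name]:
        update(source)
    # Execute this task only if it isn't already finished.
    if not finished[name]:
        executed.append(name)  # stand-in for calling the task function
        finished[name] = True

update("C")
print(executed)  # ['B', 'C']
```

Calling update("C") a second time does nothing, because every task is already finished.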
To reinforce this point, let’s look at what happens if a task becomes unfinished again. Imagine that some outside change has made the results of task “A” obsolete. We can notify Graphcat that this has happened using mark_unfinished:
[9]:
graph.mark_unfinished("A")
graphcat.notebook.display(graph)
Notice that both “A” and “C” have become unfinished: because “A” is unfinished and “C” depends on “A”, “C” becomes unfinished too. “B” is unaffected because it doesn’t depend on “A”. Let’s update “C” again:
[10]:
graph.update("C")
graphcat.notebook.display(graph)
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task A executing. Inputs: {}
INFO:graphcat.common:Task A finished. Output: None
INFO:graphcat.common:Task B updating.
INFO:graphcat.common:Task C updating.
INFO:graphcat.common:Task C executing. Inputs: {None: None, None: None}
INFO:graphcat.common:Task C finished. Output: None
This time “C” is executed, but only after “A”. As expected, “B” isn’t executed because it was already finished.
Hopefully, we’ve convinced you that Graphcat always knows which tasks to execute, and in what order. This is true no matter how complex your computational graph becomes. In the next section, we will explore how to configure the graph to perform real work.
Task Functions
In the previous section, we learned how to represent our workflow using tasks and links, but the tasks themselves didn’t actually do anything when executed. To rectify this, we will assign task functions that define what a task does when executed. A task function is simply a Python function (technically: a Python callable) that is called when a task is executed, returning a value that is stored as the output for the task. When downstream tasks are executed, their task functions have access to the outputs from their upstream dependencies. Thus, upstream task function outputs become downstream task function inputs.
Let’s turn our current example into a simple calculator. Tasks “A” and “B” will have task functions that return numbers, and task “C” will return the sum of its inputs. First, we define the task functions for each task:
[11]:
def task_a(graph, name, inputs):
    return 2

def task_b(graph, name, inputs):
    return 3

def add(graph, name, inputs):
    return sum([value() for value in inputs.values()])
Note that every task function must accept three keyword arguments: graph, name, and inputs. The graph argument is the graph that this task is a part of; name is the name of the task being executed, and is useful for logging or changing the function’s behavior based on the task’s identity; inputs is an object that behaves like a Python dict and contains the outputs from upstream tasks.
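Since a task function only needs to be a callable, an instance of a class that defines __call__ with the same graph, name, and inputs parameters should work as well. The following is a minimal sketch under that assumption. The Product class is hypothetical, it retrieves each input by calling it (the same value() convention add() uses), and a plain dict of zero-argument callables stands in for Graphcat's inputs object:

```python
class Product:
    """Hypothetical callable task: multiplies all upstream values by a factor."""

    def __init__(self, factor):
        self.factor = factor

    def __call__(self, graph, name, inputs):
        result = self.factor
        # Same convention as add(): each input value is obtained by calling it.
        for value in inputs.values():
            result *= value()
        return result

task = Product(10)
# Called directly here, with a dict of callables standing in for real inputs.
print(task(graph=None, name="C", inputs={"x": lambda: 2, "y": lambda: 3}))  # 60
```

A class like this can carry configuration (here, factor) without needing a separate function for each setting.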
Don’t worry too much about how add() is implemented; we’ll discuss that in detail in a bit. Let’s assign our task functions to each task in the graph:
[12]:
graph.set_task("A", task_a)
graph.set_task("B", task_b)
graph.set_task("C", add)
graphcat.notebook.display(graph)
Notice that changing the task functions with set_task also marks the tasks as unfinished. This is an example of how Graphcat always ensures that changes to the graph will propagate to its results. Let’s update the graph and see what happens:
[13]:
graph.update("C")
graphcat.notebook.display(graph)
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task A executing. Inputs: {}
INFO:graphcat.common:Task A finished. Output: 2
INFO:graphcat.common:Task B updating.
INFO:graphcat.common:Task B executing. Inputs: {}
INFO:graphcat.common:Task B finished. Output: 3
INFO:graphcat.common:Task C updating.
INFO:graphcat.common:Task C executing. Inputs: {None: 2, None: 3}
INFO:graphcat.common:Task C finished. Output: 5
Now, the full meaning of the log messages should be clearer - tasks “A” and “B” have no inputs when they execute, returning the values 2 and 3 respectively as their outputs. Those outputs become the inputs to “C” when it executes, where they are summed, so that the output of “C” is 5, as expected.
Of course, you normally want to retrieve the outputs from your graph so you can do something with them. So far, all we’ve seen are log messages. To retrieve the most recent output for a task, use output instead of update:
[14]:
print("Result:", graph.output("C"))
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task B updating.
INFO:graphcat.common:Task C updating.
Result: 5
Note that output implicitly calls update for you, so you can simply use it whenever you need to execute your graph and retrieve an output.
Now that our graph is performing a real (albeit trivial) task, let’s look at some ways to simplify setting it up:
First, it is extremely common for a graph to have “parameter” tasks that simply return a value, as tasks “A” and “B” do in our example. Having to create a separate function for every parameter would be perverse. Fortunately, Graphcat provides a helper function, graphcat.constant, that you can use instead:
[15]:
graph.set_task("A", graphcat.constant(4))
graph.set_task("B", graphcat.constant(5))
print("Result:", graph.output("C"))
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task A executing. Inputs: {}
INFO:graphcat.common:Task A finished. Output: 4
INFO:graphcat.common:Task B updating.
INFO:graphcat.common:Task B executing. Inputs: {}
INFO:graphcat.common:Task B finished. Output: 5
INFO:graphcat.common:Task C updating.
INFO:graphcat.common:Task C executing. Inputs: {None: 4, None: 5}
INFO:graphcat.common:Task C finished. Output: 9
Result: 9
graphcat.constant is a factory for task functions that always return a value you provide, eliminating the need to create dedicated task functions of your own for parameters. Use graphcat.constant with set_task any time you need to change the parameters in your workflow, whether due to user input, changes in the environment, network traffic, or any other externality that affects your workflow outputs.
Next, you may wonder why we had to call both add_task and set_task just to create a working task. In fact, we didn’t - either method can create a task and assign its function in a single step:
[16]:
graph.set_task("D", graphcat.constant(6))
The difference between add_task and set_task is that the former will fail if a task with the given name already exists, while the latter will quietly overwrite it.
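The fail-versus-overwrite distinction is easy to model with a dict. This sketch is hypothetical and is not Graphcat's class. In particular, the text doesn't say which exception add_task raises, so ValueError here is our own choice:

```python
class ToyGraph:
    """Hypothetical stand-in contrasting add_task and set_task; not Graphcat's class."""

    def __init__(self):
        self.tasks = {}

    def add_task(self, name, fn=None):
        # Refuse to replace a task that already exists (exception type is assumed).
        if name in self.tasks:
            raise ValueError(f"Task {name!r} already exists.")
        self.tasks[name] = fn

    def set_task(self, name, fn):
        # Create the task, or quietly overwrite an existing one.
        self.tasks[name] = fn

g = ToyGraph()
g.set_task("D", lambda: 6)      # creates "D"
g.set_task("D", lambda: 7)      # quietly overwrites "D"
try:
    g.add_task("D", lambda: 8)  # fails, because "D" already exists
except ValueError as e:
    print(e)                    # Task 'D' already exists.
```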
Let’s connect our newly created task “D” to “C”, and see that it integrates nicely with the rest of the computation:
[17]:
graph.set_links(source="D", targets="C")
print("Result:", graph.output("C"))
graphcat.notebook.display(graph)
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task B updating.
INFO:graphcat.common:Task D updating.
INFO:graphcat.common:Task D executing. Inputs: {}
INFO:graphcat.common:Task D finished. Output: 6
INFO:graphcat.common:Task C updating.
INFO:graphcat.common:Task C executing. Inputs: {None: 4, None: 5, None: 6}
INFO:graphcat.common:Task C finished. Output: 15
Result: 15
Named Inputs
By now, you should have questions about the way inputs are passed to task functions. From the log message in the preceding example, {None: 4, None: 5, None: 6}, it’s obvious that the results from “A”, “B”, and “D” are passed to “C” using something that looks like a dict. But what’s with the key None, and why does it appear multiple times (something that can’t happen with an actual dict)?
What’s happening is that when you create a link between a source and a target, you also - implicitly or explicitly - specify a named input on the target. When the target task function is executed, the named inputs become the keys used to access the corresponding values. This makes it possible for task functions with multiple inputs to tell those inputs apart. If you don’t specify a named input when you create a link, the name defaults to None.
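One way to see why the same key can appear repeatedly is to treat a target's inputs as a list of (input name, value) pairs rather than a dict. The link records and the gather_inputs() helper in this sketch are hypothetical and do not reflect Graphcat's internal representation:

```python
# Hypothetical link records: (source, target, input name); the name defaults to None.
def link(source, target, input_name=None):
    return (source, target, input_name)

links = [link("A", "C"), link("B", "C"), link("D", "C")]
outputs = {"A": 4, "B": 5, "D": 6}  # most recent output of each upstream task

def gather_inputs(links, outputs, target):
    # A list of (input name, value) pairs, so the same name may appear repeatedly.
    return [(name, outputs[src]) for src, tgt, name in links if tgt == target]

pairs = gather_inputs(links, outputs, "C")
print(pairs)        # [(None, 4), (None, 5), (None, 6)]
print(dict(pairs))  # {None: 6}  (a real dict keeps only the last None entry)
```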
Let’s look back at the implementation of the add() function:
def add(graph, name, inputs):
    return sum([value() for value in inputs.values()])
Here, the function doesn’t need to know the names of its inputs, since all it does is add them together. That is why it uses the values() method of the inputs object: like a normal Python dict, values() provides access to just the values, ignoring the input names. Note, though, that unlike a Python dict, the objects returned by values() aren’t the values themselves. They are callables that have to be executed to return the values, which is why the code uses sum([value() ...]) rather than sum([value ...]).
Let’s modify our current example to access inputs by name. Instead of adding values, we’ll create a new task function that generates a familiar greeting:
[18]:
def greeting(graph, name, inputs):
    return f"{inputs.getone('greeting')}, {inputs.getone('subject')}!"
Note that the greeting() task function uses two inputs named "greeting" and "subject". Each call to inputs.getone(<name>) will return the value of the named input. If there isn’t an input with the given name, or there’s more than one, the call will fail.
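The two access styles described so far can be modeled together: values() hands back callables, and getone() returns a single value but fails on zero or several matches. ToyInputs is a hypothetical sketch, not Graphcat's actual inputs class. The text says only that getone "will fail", so raising KeyError is an assumption:

```python
class ToyInputs:
    """Hypothetical multi-valued inputs object; not Graphcat's actual class."""

    def __init__(self, pairs):
        # pairs: (input name, zero-argument callable that produces the value)
        self._pairs = list(pairs)

    def values(self):
        # Like dict.values(), except each item must be called to get its value.
        return [fn for _, fn in self._pairs]

    def getone(self, name):
        matches = [fn for key, fn in self._pairs if key == name]
        # Fail unless exactly one input has the requested name (KeyError is assumed).
        if len(matches) != 1:
            raise KeyError(f"expected one input named {name!r}, found {len(matches)}")
        return matches[0]()

inputs = ToyInputs([(None, lambda: 6), ("greeting", lambda: "Hello"), ("subject", lambda: "World")])
print(f"{inputs.getone('greeting')}, {inputs.getone('subject')}!")  # Hello, World!
```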
Now we can set up the parameter and greeting task functions for our existing graph:
[19]:
graph.set_task("A", graphcat.constant("Hello"))
graph.set_task("B", graphcat.constant("World"))
graph.set_task("C", greeting)
And we’ll replace our existing links with links that connect to the named inputs required by the greeting() function (note that set_links replaces all of the outgoing links for a given source, whereas add_links adds new links):
[20]:
graph.set_links(source="A", targets=("C", "greeting"))
graph.set_links(source="B", targets=("C", "subject"))
… instead of passing just a task name as the target for set_links, we pass a (task name, input name) tuple. Like task names, input names don’t have to be strings; they can be any hashable object. Let’s see the result:
[21]:
print("Result:", graph.output("C"))
graphcat.notebook.display(graph)
INFO:graphcat.common:Task D updating.
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task A executing. Inputs: {}
INFO:graphcat.common:Task A finished. Output: Hello
INFO:graphcat.common:Task B updating.
INFO:graphcat.common:Task B executing. Inputs: {}
INFO:graphcat.common:Task B finished. Output: World
INFO:graphcat.common:Task C updating.
INFO:graphcat.common:Task C executing. Inputs: {None: 6, greeting: Hello, subject: World}
INFO:graphcat.common:Task C finished. Output: Hello, World!
Result: Hello, World!
Note that the notebook diagram links are labelled when they’re connected to inputs with names other than None.
Now, the input dict for “C” printed to the log should make more sense: it contains all of the named inputs and corresponding upstream outputs for the task. Note that task “D” is still connected to input None, but it’s ignored by the greeting() implementation.
It should also be clear now why a name can appear more than once in a task’s inputs: you can connect multiple tasks to a single input, one task to multiple inputs, or any combination of the two.
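The link-editing rules behind these combinations can be sketched as a mapping from each source to its outgoing (target, input name) pairs. In this sketch, add_links appends and set_links replaces everything leaving that source. These functions are hypothetical stand-ins. Unlike Graphcat's methods, which also accept a bare task name as the target, they always take a list of pairs:

```python
from collections import defaultdict

# Hypothetical storage: source task -> list of (target task, input name) pairs.
outgoing = defaultdict(list)

def add_links(source, targets):
    # Append new links, keeping any existing ones.
    outgoing[source].extend(targets)

def set_links(source, targets):
    # Replace every outgoing link from this source.
    outgoing[source] = list(targets)

add_links("A", [("C", None)])
set_links("A", [("C", "greeting")])              # the old ("C", None) link is gone
add_links("D", [("C", None), ("C", "subject")])  # one task feeding two inputs
print(outgoing["A"], outgoing["D"])  # [('C', 'greeting')] [('C', None), ('C', 'subject')]
```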
By examining the input object, a task function can implement any desired behavior, from very strict (failing unless the input contains a specific set of names, numbers, and types of values) to very permissive (adjusting functionality based on names, numbers, and types of values in the input dict), or anywhere in-between.
Errors
What happens when things go wrong and your task function fails? Let’s find out, using a special Graphcat helper function for generating task functions that throw exceptions:
[22]:
graph.set_task("D", graphcat.raise_exception(RuntimeError("Whoops!")))
(In case you’re wondering, we use this for testing and debugging.)
[23]:
try:
    print("Result:", graph.output("C"))
except Exception as e:
    print(f"Exception: {e!r}")
graphcat.notebook.display(graph)
INFO:graphcat.common:Task D updating.
INFO:graphcat.common:Task D executing. Inputs: {}
ERROR:graphcat.common:Task D failed. Exception: Whoops!
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task B updating.
INFO:graphcat.common:Task C updating.
Exception: RuntimeError('Whoops!')
As always, Graphcat ensures that task states are consistent. When a task function fails (“D” in this case), execution stops, the task and its dependents are marked as being in the “error” state, and the update or output method that initiated the update re-raises the exception. This will keep happening as long as the error condition persists:
[24]:
try:
    print("Result:", graph.output("C"))
except Exception as e:
    print(f"Exception: {e!r}")
graphcat.notebook.display(graph)
INFO:graphcat.common:Task D updating.
INFO:graphcat.common:Task D executing. Inputs: {}
ERROR:graphcat.common:Task D failed. Exception: Whoops!
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task B updating.
INFO:graphcat.common:Task C updating.
Exception: RuntimeError('Whoops!')
Once the error is cleared up, things return to normal:
[25]:
graph.set_task("D", graphcat.constant(42))
print("Result:", graph.output("C"))
graphcat.notebook.display(graph)
INFO:graphcat.common:Task D updating.
INFO:graphcat.common:Task D executing. Inputs: {}
INFO:graphcat.common:Task D finished. Output: 42
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task B updating.
INFO:graphcat.common:Task C updating.
INFO:graphcat.common:Task C executing. Inputs: {None: 42, greeting: Hello, subject: World}
INFO:graphcat.common:Task C finished. Output: Hello, World!
Result: Hello, World!
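The error behavior described in this section can be approximated with a recursive update that marks a task as failed and re-raises. The exception then surfaces at every downstream caller and finally at whoever started the update. This toy is a simplification and not Graphcat's implementation. For example, it stops at the first failing input, whereas the log above shows “A” and “B” still being checked after “D” fails. The inputs_of, state, and functions dicts are hypothetical:

```python
# Toy model of error propagation; a simplification, not Graphcat's implementation.
inputs_of = {"A": [], "B": [], "D": [], "C": ["D", "A", "B"]}  # task -> upstream tasks
state = {"A": "finished", "B": "finished", "D": "unfinished", "C": "unfinished"}

def whoops():
    raise RuntimeError("Whoops!")

functions = {"A": lambda: "Hello", "B": lambda: "World", "D": whoops, "C": lambda: "Hello, World!"}

def update(name):
    try:
        for source in inputs_of[name]:
            update(source)
        if state[name] != "finished":
            functions[name]()
            state[name] = "finished"
    except Exception:
        # Mark this task as failed, then re-raise so every downstream caller,
        # and finally the code that started the update, sees the same exception.
        state[name] = "error"
        raise

try:
    update("C")
except RuntimeError as e:
    print(f"Exception: {e!r}")  # Exception: RuntimeError('Whoops!')
```

After the failure, both “D” and “C” are in the error state. Replacing functions["D"] with a working function and calling update("C") again returns every task to the finished state.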
User Guide
The User Guide includes detailed individual subjects covering how to use Graphcat effectively.
Avoiding Updates
Consider the following trivial graph:
[1]:
import logging
logging.basicConfig(level=logging.DEBUG)
import graphcat.notebook
graph = graphcat.StaticGraph()
logger = graphcat.Logger(graph)
graph.add_task("A", graphcat.constant(3.14))
graph.add_task("B", graphcat.passthrough())
graph.add_links("A", "B")
graphcat.notebook.display(graph)
The graphcat.passthrough task function simply copies one of its inputs to its output. When we request the output from “B”, both tasks are executed and the value from “A” is returned:
[2]:
print("Output:", graph.output("B"))
graphcat.notebook.display(graph)
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task A executing. Inputs: {}
INFO:graphcat.common:Task A finished. Output: 3.14
INFO:graphcat.common:Task B updating.
INFO:graphcat.common:Task B executing. Inputs: {None: 3.14}
INFO:graphcat.common:Task B finished. Output: 3.14
Output: 3.14
If we replace the task “A” function, both tasks become unfinished:
[3]:
graph.set_task("A", graphcat.constant(42))
graphcat.notebook.display(graph)
And retrieving the task “B” output executes both tasks again:
[4]:
print("Output:", graph.output("B"))
graphcat.notebook.display(graph)
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task A executing. Inputs: {}
INFO:graphcat.common:Task A finished. Output: 42
INFO:graphcat.common:Task B updating.
INFO:graphcat.common:Task B executing. Inputs: {None: 42}
INFO:graphcat.common:Task B finished. Output: 42
Output: 42
All this is as expected. But what if we were to re-assign the same task function returning 42 to “A”? What if task “B” takes a long time to execute? What if there are many expensive tasks downstream from “B”? You may be concerned (rightly so) that this would lead to lots of unnecessary computation, if downstream tasks have to be re-executed even when the upstream value hasn’t actually changed.
Not to worry, Graphcat has your back! Let’s try re-assigning the same task function to see what happens:
[5]:
graph.set_task("A", graphcat.constant(42))
graphcat.notebook.display(graph)
Notice that even though we assigned the new task function to “A”, both tasks are still marked as finished.
[6]:
print("Output:", graph.output("B"))
graphcat.notebook.display(graph)
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task B updating.
Output: 42
Neither task is executed, and the returned value is the same. What’s going on here? How does Graphcat know?
In a nutshell, when setting the function for a task, Graphcat only marks the task as unfinished if the new task function compares unequal to the old function. The constant function returns instances of a callable class, Constant, which is what the graph executes. By default, two distinct instances of a Python class compare unequal, even when they’re the same type and contain the same data. Constant, however, overloads the equality operator __eq__ so that two instances compare equal if their return values compare equal. Thus, multiple calls to set_task() using constant won’t trigger execution if their values are the same.
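The mechanism just described can be reproduced with a small class that compares by value, plus a set_task stand-in that only invalidates on inequality. ToyConstant and set_task here are hypothetical sketches modeled on the description of Constant, not Graphcat's code:

```python
class ToyConstant:
    """Hypothetical value-comparing constant task function, modeled on Constant."""

    def __init__(self, value):
        self.value = value

    def __call__(self, graph, name, inputs):
        return self.value

    def __eq__(self, other):
        # Equal when the other object is also a ToyConstant with an equal value.
        if not isinstance(other, ToyConstant):
            return NotImplemented
        return self.value == other.value

tasks, unfinished = {}, set()

def set_task(name, fn):
    # Hypothetical: only invalidate the task if the new function compares unequal.
    if name not in tasks or tasks[name] != fn:
        unfinished.add(name)
    tasks[name] = fn

set_task("A", ToyConstant(42))
unfinished.clear()              # pretend the graph has been updated
set_task("A", ToyConstant(42))  # equal value: nothing becomes unfinished
print(unfinished)               # set()
set_task("A", ToyConstant(3.14))
print(unfinished)               # {'A'}
```

Defining __eq__ without __hash__ makes instances unhashable in Python. That is harmless as long as task functions are never used as dict keys or set members, which this sketch assumes.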
This behavior applies to all of the builtin task functions provided with Graphcat, and you can apply the same technique to your own task functions to suppress unnecessary updates. For example, you might create your own version of Constant that only considers two floating point values unequal if they differ by more than some threshold. Then, you could feed continuous incoming sensor data to the graph, with the graph only executing when the data changes enough to trigger it, using your own definition of what constitutes a significant change.
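Here is one possible shape for that threshold idea: a constant-like task function whose __eq__ treats values within a tolerance as equal. ThresholdConstant, its default threshold of 0.1, and the sample readings are all made up for illustration. The loop keeps the previous instance until a reading differs enough, mimicking a graph that keeps the old function when the new one compares equal:

```python
import math

class ThresholdConstant:
    """Hypothetical constant task whose value only "changes" beyond a tolerance."""

    def __init__(self, value, threshold=0.1):
        self.value = value
        self.threshold = threshold

    def __call__(self, graph, name, inputs):
        return self.value

    def __eq__(self, other):
        if not isinstance(other, ThresholdConstant):
            return NotImplemented
        # Readings within the threshold of each other count as the same value.
        return math.isclose(self.value, other.value, rel_tol=0.0, abs_tol=self.threshold)

readings = [20.00, 20.03, 20.07, 20.35]  # made-up sensor samples
previous = ThresholdConstant(readings[0])
for reading in readings[1:]:
    current = ThresholdConstant(reading)
    if current != previous:
        print("significant change:", reading)  # prints once, for 20.35
        previous = current
```

Tolerance-based equality isn't transitive. Comparing against the last accepted value, as above, lets slow drift accumulate until it crosses the threshold. Comparing against the latest reading could let a steadily drifting signal never trigger an update.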
Cycles
Take a close look at the following static computational graph:
[1]:
import logging
logging.basicConfig(level=logging.DEBUG)
import graphcat.notebook
graph = graphcat.StaticGraph()
logger = graphcat.Logger(graph)
graph.add_task("A")
graph.add_task("B")
graph.add_links("A", "B")
graph.add_links("B", "A")
Do you see a problem? Let’s look at the diagram:
[2]:
graphcat.notebook.display(graph)
Notice the cycle in the graph: “B” depends on “A”, but “A” depends on “B”. This is a problem, because each task needs the other’s output before it can execute. This raises the question: what should happen when we execute either task, when it’s impossible to define which one should be executed first? You could imagine the computation getting stuck in an infinite loop, but as always this is Graphcat, and Graphcat always has your back. Let’s see what happens when we update one of the tasks (keep your hand close to the power switch):
[3]:
graph.update("B")
graphcat.notebook.display(graph)
INFO:graphcat.common:Task B cycle detected.
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task A executing. Inputs: {None: None}
INFO:graphcat.common:Task A finished. Output: None
INFO:graphcat.common:Task B updating.
INFO:graphcat.common:Task B executing. Inputs: {None: None}
INFO:graphcat.common:Task B finished. Output: None
Well, that wasn’t very exciting. Notice the first log message, where Graphcat has detected a cycle originating from task “B” (the task we executed). Graphcat “breaks” the cycle, so that the network executes in the same order that it would have if there were no edge from “B” to “A”, and both tasks end up finished.
Let’s see what happens when we reset one of the tasks:
[4]:
graph.mark_unfinished("A")
graphcat.notebook.display(graph)
Hmmm … as you might expect, Graphcat marked both tasks as unfinished, since they both depend on each other. Let’s try updating the other task:
[5]:
graph.update("A")
graphcat.notebook.display(graph)
INFO:graphcat.common:Task A cycle detected.
INFO:graphcat.common:Task B updating.
INFO:graphcat.common:Task B executing. Inputs: {None: None}
INFO:graphcat.common:Task B finished. Output: None
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task A executing. Inputs: {None: None}
INFO:graphcat.common:Task A finished. Output: None
This time the cycle is detected, but originating from task “A” (since that’s the task we executed). Execution proceeds in the same order that it would have if there wasn’t an edge from “A” to “B”.
What happens when our graph is more complex than just one cycle? Let’s add another task and see:
[6]:
graph.mark_unfinished("A")
graph.add_task("C")
graph.add_links("B", "C")
graphcat.notebook.display(graph)
Now, when we execute “C”:
[7]:
graph.update("C")
graphcat.notebook.display(graph)
INFO:graphcat.common:Task B cycle detected.
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task A executing. Inputs: {None: None}
INFO:graphcat.common:Task A finished. Output: None
INFO:graphcat.common:Task B updating.
INFO:graphcat.common:Task B executing. Inputs: {None: None}
INFO:graphcat.common:Task B finished. Output: None
INFO:graphcat.common:Task C updating.
INFO:graphcat.common:Task C executing. Inputs: {None: None}
INFO:graphcat.common:Task C finished. Output: None
… the cycle is detected as originating at “B”, because it’s the first task on the cycle that’s upstream from “C”. From there, the graph executes as if there’s no edge from “B” to “A”.
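The cycle-breaking behavior in these three examples is consistent with a depth-first update that skips any link leading back to a task already on the current path. The following toy sketch works under that assumption and is not Graphcat's actual algorithm. The update_order() function and the inputs_of dict (each task mapped to its upstream tasks) are hypothetical:

```python
def update_order(inputs_of, start):
    """Return (execution order, tasks where a cycle was detected) for start."""
    order = []      # tasks in execution order
    cycles = []     # tasks at which a cycle was detected
    done = set()    # tasks already executed
    active = set()  # tasks on the current path, i.e. currently being updated

    def visit(name):
        if name in done:
            return
        if name in active:
            # Reaching a task already on the path means a cycle; skip this
            # link, as if the edge back to the task didn't exist.
            cycles.append(name)
            return
        active.add(name)
        for source in inputs_of.get(name, []):
            visit(source)
        active.remove(name)
        done.add(name)
        order.append(name)

    visit(start)
    return order, cycles

inputs_of = {"A": ["B"], "B": ["A"], "C": ["B"]}  # task -> upstream tasks
print(update_order(inputs_of, "B"))  # (['A', 'B'], ['B'])
print(update_order(inputs_of, "A"))  # (['B', 'A'], ['A'])
print(update_order(inputs_of, "C"))  # (['A', 'B', 'C'], ['B'])
```

For each of the three starting tasks, the order and cycle origin match the static-graph logs above.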
So far, we’ve only been looking at static graphs. What about dynamic graphs? Let’s recreate the same example using a dynamic graph:
[8]:
graph = graphcat.DynamicGraph()
logger = graphcat.Logger(graph)
graph.add_task("A", graphcat.consume)
graph.add_task("B", graphcat.consume)
graph.add_task("C", graphcat.consume)
graph.add_links("A", "B")
graph.add_links("B", "A")
graph.add_links("B", "C")
graphcat.notebook.display(graph)
Note that we’ve assigned graphcat.consume as our task functions because this is a dynamic graph - without it, none of the tasks would use any of their inputs, and we wouldn’t actually cause a cycle.
Now, let’s update “C” again and see what happens:
[9]:
graph.update("C")
graphcat.notebook.display(graph)
INFO:graphcat.common:Task C updating.
INFO:graphcat.common:Task C executing. Inputs: {None}
INFO:graphcat.common:Task B updating.
INFO:graphcat.common:Task B executing. Inputs: {None}
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task A executing. Inputs: {None}
INFO:graphcat.common:Task B cycle detected.
INFO:graphcat.common:Task A finished. Output: None
INFO:graphcat.common:Task B finished. Output: None
INFO:graphcat.common:Task C finished. Output: None
Although the order of the logged messages is different because this is a dynamic graph, the cycle originating at “B” is detected, and the graph is executed as if the link from “B” to “A” doesn’t exist, just like the static version.
Final Thoughts
As you can see, Graphcat will never misbehave or hang due to a cycle in your graphs, and we’ve put in real effort to ensure that its behavior when cycles are encountered is consistent and well defined. This is to ensure that you can work confidently with Graphcat, never having to worry that an experiment or a misconfiguration will cause an infinite loop or a crash. Having said that, there is little reason to ever intentionally put cycles in a graph, because cycles are almost always the product of an ill-posed problem. When you have tasks with mutual dependencies, it typically means that you need to further decompose your problem into smaller sub-tasks, gradually teasing them apart until the interdependencies disappear.
If you want to enforce a “no cycles” policy, you can tap into the same signal mechanism used for logging to raise an exception or implement other behavior when a cycle occurs. For example:
[10]:
def no_cycles_allowed(graph, name):
    raise RuntimeError(f"Illegal cycle detected originating with task {name}!")

graph.on_cycle.connect(no_cycles_allowed)
graph.mark_unfinished("A")
try:
    graph.update("C")
except Exception as e:
    print(f"Unhandled exception: {e}")
INFO:graphcat.common:Task C updating.
INFO:graphcat.common:Task C executing. Inputs: {None}
INFO:graphcat.common:Task B updating.
INFO:graphcat.common:Task B executing. Inputs: {None}
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task A executing. Inputs: {None}
ERROR:graphcat.common:Task A failed. Exception: Illegal cycle detected originating with task B!
ERROR:graphcat.common:Task B failed. Exception: Illegal cycle detected originating with task B!
ERROR:graphcat.common:Task C failed. Exception: Illegal cycle detected originating with task B!
Unhandled exception: Illegal cycle detected originating with task B!
Dynamic Graphs
This section assumes you’ve completed the Tutorial and have a good working knowledge of Graphcat.
Graphcat always ensures that tasks are executed when they’re needed and in the correct order, but there can be more than one way to define “when a task is needed”, and those definitions have different tradeoffs in time and space. Graphcat allows you to select among those tradeoffs by providing two main types of graph: static and dynamic. To illustrate the difference between the two, we will set up a simple static graph:
[1]:
import graphcat.notebook
graph = graphcat.StaticGraph()
graph.add_task("A", graphcat.constant("A"))
graph.add_task("B", graphcat.constant("B"))
graph.add_task("C", graphcat.passthrough(input=0))
graph.add_links("A", ("C", 0))
graph.add_links("B", ("C", 1))
graphcat.notebook.display(graph)
Note that there are two tasks “A” and “B” that produce the strings "A" and "B" respectively, connected to inputs 0 and 1 of task “C”. In turn, task “C” uses the graphcat.common.passthrough() function to return whatever value it receives from input 0:
[2]:
print("Output:", graph.output("C"))
graphcat.notebook.display(graph)
Output: A
… as expected, task “C” returns "A", and all three tasks are finished (solid boxes), because tasks “A” and “B” are inputs to task “C”. This is what we mean when we say that the graph is static: it performs a static analysis of the graph topology to determine which tasks are inputs to “C”, and then executes them first.
However, a static analysis can be suboptimal: in this case, even though task “B” is an input to “C”, it isn’t really needed, since “C” is only using the input named 0. What if task “B” were time consuming? All of that effort is wasted if the output from “B” is never actually used by “C”.
Fortunately, we can use dynamic analysis to determine which tasks need to be executed during updates based on whether or not they actually get used. To do so, we simply use an instance of DynamicGraph instead of StaticGraph:
[3]:
graph = graphcat.DynamicGraph()
graph.add_task("A", graphcat.constant("A"))
graph.add_task("B", graphcat.constant("B"))
graph.add_task("C", graphcat.passthrough(input=0))
graph.add_links("A", ("C", 0))
graph.add_links("B", ("C", 1))
graphcat.notebook.display(graph)
Note
The arrow heads in the diagram are open instead of solid, which indicates that this is a DynamicGraph.
Now, when we get the output from “C”, we should still get the same answer:
[4]:
print("Output:", graph.output("C"))
graphcat.notebook.display(graph)
Output: A
… which we do, but task “B” hasn’t been executed, because it isn’t needed in this case. If we alter task “C” so that it uses its other input, we should see that task “B” gets executed:
[5]:
graph.set_task("C", graphcat.passthrough(input=1))
print("Output:", graph.output("C"))
graphcat.notebook.display(graph)
Output: B
Voila! We see that task “B” is executed, because its output is now used by task “C”.
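The difference between the two strategies can be modeled in a few lines of plain Python. This is a toy, not Graphcat's implementation.

- The hypothetical `static_output()` evaluates every linked input before running a task.
- The hypothetical `dynamic_output()` hands the task a callable that evaluates an upstream task only when the task actually reads that input.

```python
# Toy model of the two update strategies; not Graphcat's implementation.
executed = []

def constant(value):
    return lambda read: value

def passthrough(port):
    # `read` is a callable that fetches the value connected to an input port.
    return lambda read: read(port)

tasks = {"A": constant("A"), "B": constant("B"), "C": passthrough(0)}
links = {"C": {0: "A", 1: "B"}}  # links[target][port] = upstream task

def static_output(name):
    # Static: evaluate every upstream task first, then run this one.
    values = {port: static_output(src) for port, src in links.get(name, {}).items()}
    executed.append(name)
    return tasks[name](values.__getitem__)

def dynamic_output(name):
    # Dynamic: run this task first and evaluate an upstream task only when it is read.
    sources = links.get(name, {})
    executed.append(name)
    return tasks[name](lambda port: dynamic_output(sources[port]))

print(static_output("C"), executed)   # A ['A', 'B', 'C']
executed.clear()
print(dynamic_output("C"), executed)  # A ['C', 'A']
```

In the dynamic version “C” starts executing before its input does, and “B” is never touched.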
Given that dynamic graphs are potentially more efficient by eliminating unneeded computation, why have static graphs at all? Because dynamic graphs use more resources to run, and may exceed the Python interpreter’s stack limit for large, complex graphs. As of this writing, Python has a default recursion limit of 1000, which means that updating a dynamic graph can fail as the number of tasks upstream from the task being updated approaches a thousand. In those cases you can increase the recursion limit using sys.setrecursionlimit(), or switch back to static graphs, which may waste some computation, but will never run out of stack space.
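The stack-depth tradeoff can also be seen without Graphcat. The sketch below uses a hypothetical chain of 3,000 tasks stored in a plain dict.

- `recursive_update()` pulls each input depth-first, using one Python stack frame per upstream task.
- `ordered_update()` first computes an execution order with an explicit stack, loosely analogous to the up-front analysis a static graph performs.

Each task's value is just 1 plus the values of its inputs. Both helpers are illustrations of the tradeoff, not Graphcat's internals.

```python
import sys

def make_chain(n):
    # Hypothetical chain: task i depends on task i - 1; task 0 has no inputs.
    return {i: [i - 1] if i > 0 else [] for i in range(n)}

def recursive_update(deps, name):
    # On-demand, depth-first evaluation: one Python frame per upstream task.
    total = 1
    for upstream in deps[name]:
        total += recursive_update(deps, upstream)
    return total

def ordered_update(deps, name):
    # Find a dependency-first order with an explicit stack, then evaluate in a loop.
    order, visited, stack = [], set(), [(name, False)]
    while stack:
        node, expanded = stack.pop()
        if expanded:
            order.append(node)  # every upstream task of node is already in order
        elif node not in visited:
            visited.add(node)
            stack.append((node, True))
            stack.extend((upstream, False) for upstream in deps[node])
    results = {}
    for node in order:
        results[node] = 1 + sum(results[upstream] for upstream in deps[node])
    return results[name]

deps = make_chain(3000)
print(ordered_update(deps, 2999))  # 3000, without deep recursion

try:
    recursive_update(deps, 2999)
except RecursionError:
    print("RecursionError at limit", sys.getrecursionlimit())

old_limit = sys.getrecursionlimit()
sys.setrecursionlimit(5000)  # raise the limit temporarily...
try:
    print(recursive_update(deps, 2999))  # 3000
finally:
    sys.setrecursionlimit(old_limit)  # ...and restore the previous value
```

Raising the limit only moves the ceiling. Very deep recursion can still exhaust the underlying C stack on some interpreters, which is why restoring the previous value afterwards is a sensible habit.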
Expression Tasks¶
As your computational graphs grow in complexity, you may find that creating individual task functions for simple operations becomes onerous. These operations are typically “computational duct tape” … one-offs and special cases like type-conversion, arithmetic, and string formatting that don’t lend themselves to dedicated task functions.
To address this, Graphcat includes special support for expression tasks, which are tasks that execute a Python expression. Expression tasks are also incredibly useful if you’re using Graphcat in a GUI application and your end-users aren’t software developers: using expression tasks, they can supplement the domain-specific functionality you provide with their own custom tasks.
Let’s start with a simple example:
[1]:
import logging
logging.basicConfig(level=logging.DEBUG)
import graphcat.notebook
graph = graphcat.StaticGraph()
logger = graphcat.Logger(graph)
graph.set_expression("expr", "3 + 4")
graphcat.notebook.display(graph)
Here, we’ve created a single task that will return the value of the expression 3 + 4:
[2]:
print("Output:", graph.output("expr"))
graphcat.notebook.display(graph)
INFO:graphcat.common:Task expr updating.
INFO:graphcat.common:Task expr executing. Inputs: {}
INFO:graphcat.common:Task expr finished. Output: 7
Output: 7
If we change the expression, the task becomes unfinished until the next time it’s executed, just as you’d normally expect:
[3]:
graph.set_expression("expr", "'Hello' + ' ' + 'World!'")
graphcat.notebook.display(graph)
[4]:
print("Output:", graph.output("expr"))
INFO:graphcat.common:Task expr updating.
INFO:graphcat.common:Task expr executing. Inputs: {}
INFO:graphcat.common:Task expr finished. Output: Hello World!
Output: Hello World!
Of course, an expression task wouldn’t be of much use if it didn’t have inputs … let’s create an expression task that converts its input to an integer:
[5]:
import math
graph.set_expression("expr", "int(inputs.getone(None))")
graph.set_task("A", graphcat.constant(math.pi))
graph.set_links("A", "expr")
print("Output:", graph.output("expr"))
graphcat.notebook.display(graph)
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task A executing. Inputs: {}
INFO:graphcat.common:Task A finished. Output: 3.141592653589793
INFO:graphcat.common:Task expr updating.
INFO:graphcat.common:Task expr executing. Inputs: {None: 3.141592653589793}
INFO:graphcat.common:Task expr finished. Output: 3
Output: 3
Note that the expression has access to the same inputs argument as any task - in fact, it has access to the same arguments as every task function, including graph, name, inputs, and - for streaming graphs - extent. Here’s an expression that converts its input into a nicely formatted string, using both name and inputs:
[6]:
graph.set_expression("expr", "f'Task {name!r} input: {inputs.getone(None):.4f}'")
print("Output:", graph.output("expr"))
graphcat.notebook.display(graph)
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task expr updating.
INFO:graphcat.common:Task expr executing. Inputs: {None: 3.141592653589793}
INFO:graphcat.common:Task expr finished. Output: Task 'expr' input: 3.1416
Output: Task 'expr' input: 3.1416
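One way to picture how an expression can see those arguments is to compile it once and then evaluate it with graph, name, and inputs bound as names. The sketch below is an assumption about the general approach, not Graphcat’s actual expression machinery. make_expression_task() and the FakeInputs stand-in (which implements only getone()) are hypothetical.

```python
def make_expression_task(expression):
    # Compile once; evaluate on every execution with the task arguments as names.
    code = compile(expression, "<expression>", "eval")

    def task(graph, name, inputs):
        return eval(code, {"graph": graph, "name": name, "inputs": inputs})

    return task

class FakeInputs:
    # Hypothetical stand-in for a task's inputs, supporting only getone().
    def __init__(self, values):
        self._values = values

    def getone(self, name):
        return self._values[name]

inputs = FakeInputs({None: 3.141592653589793})
print(make_expression_task("int(inputs.getone(None))")(None, "expr", inputs))  # 3
print(make_expression_task("f'Task {name!r} input: {inputs.getone(None):.4f}'")(None, "expr", inputs))
# Task 'expr' input: 3.1416
```

Because eval() runs arbitrary code, an approach like this is only appropriate for expressions you trust.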
Here’s an expression in a dynamic graph that returns one of its inputs - similar to the passthrough function - but the choice of which input is itself an input:
[7]:
graph = graphcat.DynamicGraph()
logger = graphcat.Logger(graph)
graph.set_task("A", graphcat.constant("A"))
graph.set_task("B", graphcat.constant("B"))
graph.set_task("choice", graphcat.constant(0))
graph.set_expression("expr", "inputs.getone(inputs.getone('choice'))")
graph.add_links("A", ("expr", 0))
graph.add_links("B", ("expr", 1))
graph.add_links("choice", ("expr", "choice"))
print("Output:", graph.output("expr"))
graphcat.notebook.display(graph)
INFO:graphcat.common:Task expr updating.
INFO:graphcat.common:Task expr executing. Inputs: {0, 1, 'choice'}
INFO:graphcat.common:Task choice updating.
INFO:graphcat.common:Task choice executing. Inputs: {}
INFO:graphcat.common:Task choice finished. Output: 0
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task A executing. Inputs: {}
INFO:graphcat.common:Task A finished. Output: A
INFO:graphcat.common:Task expr finished. Output: A
Output: A
… and if we change the value of the “choice” task:
[8]:
graph.set_task("choice", graphcat.constant(1))
print("Output:", graph.output("expr"))
graphcat.notebook.display(graph)
INFO:graphcat.common:Task expr updating.
INFO:graphcat.common:Task expr executing. Inputs: {0, 1, 'choice'}
INFO:graphcat.common:Task choice updating.
INFO:graphcat.common:Task choice executing. Inputs: {}
INFO:graphcat.common:Task choice finished. Output: 1
INFO:graphcat.common:Task B updating.
INFO:graphcat.common:Task B executing. Inputs: {}
INFO:graphcat.common:Task B finished. Output: B
INFO:graphcat.common:Task expr finished. Output: B
Output: B
As we mentioned earlier, an expression task has access to the same arguments as any task, including the graph object itself. What do you think will happen when we execute the expression in the following graph?
[9]:
graph.clear_tasks()
graph.set_task("A", graphcat.constant(2))
graph.set_task("B", graphcat.constant(3))
graph.set_expression("expr", 'inputs.getone(None) * graph.output("B")')
graph.set_links("A", "expr")
graphcat.notebook.display(graph)
There isn’t a link (dependency) between “B” and the expression task, but the expression is using the graph object to directly access the output from “B”. If we execute the expression, we will get the expected result:
[10]:
print("Output:", graph.output("expr"))
INFO:graphcat.common:Task expr updating.
INFO:graphcat.common:Task expr executing. Inputs: {None}
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task A executing. Inputs: {}
INFO:graphcat.common:Task A finished. Output: 2
INFO:graphcat.common:Task B updating.
INFO:graphcat.common:Task B executing. Inputs: {}
INFO:graphcat.common:Task B finished. Output: 3
INFO:graphcat.common:Task expr finished. Output: 6
Output: 6
Two times three does equal six. However, you should be concerned that this introduces an implicit dependency between the upstream task “B” and the expression task: the point of a computational graph is that dependencies are explicitly captured by the links in the graph. Without those links, a change to an upstream task won’t trigger changes to downstream tasks that depend on it. Or will it? Let’s change “B” and see what happens:
[11]:
graph.set_task("B", graphcat.constant(5))
print("Output:", graph.output("expr"))
INFO:graphcat.common:Task expr updating.
INFO:graphcat.common:Task expr executing. Inputs: {None, <Input.IMPLICIT: 1>}
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task B updating.
INFO:graphcat.common:Task B executing. Inputs: {}
INFO:graphcat.common:Task B finished. Output: 5
INFO:graphcat.common:Task expr finished. Output: 10
Output: 10
Interesting - we got the correct result. Let’s take a look at the graph diagram to see what happened:
[12]:
graphcat.notebook.display(graph)
The expression task has added an edge to capture the implicit dependency introduced by the code in the expression! This edge ensures that changes to the upstream task affect the expression task.
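A plausible way to discover implicit dependencies like this one is to hand the expression a graph object that records every output() call. The sketch below illustrates that idea with hypothetical RecordingGraph and FakeInputs classes. It is not necessarily how Graphcat implements the feature, which this section doesn’t describe.

```python
class RecordingGraph:
    # Hypothetical graph proxy that remembers which task outputs were read.
    def __init__(self, outputs):
        self._outputs = outputs
        self.accessed = set()

    def output(self, name):
        self.accessed.add(name)
        return self._outputs[name]

class FakeInputs:
    # Hypothetical stand-in for a task's inputs, supporting only getone().
    def __init__(self, values):
        self._values = values

    def getone(self, name):
        return self._values[name]

expression = 'inputs.getone(None) * graph.output("B")'
recorder = RecordingGraph({"B": 3})
result = eval(expression, {"graph": recorder, "name": "expr", "inputs": FakeInputs({None: 2})})
print(result, recorder.accessed)  # 6 {'B'}

# Each recorded name could then become an implicit link from that task to "expr".
implicit_links = [(source, "expr") for source in sorted(recorder.accessed)]
print(implicit_links)  # [('B', 'expr')]
```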
Image Processing¶
Let’s say that you’re creating a hot new photo editing application whose job is to soften images, giving them a traditional “portrait” feel. A typical digital workflow for achieving this is to take an existing image, blur it, then blend the original with the blurred version to produce the softened output.
For our app, users will be able to specify the image to soften, and modify a set of parameters that control how much blurring will occur and how much of the blurred version is blended with the original. They’ll also have a “scale” parameter that controls the size of the output image. We want the app to be as responsive as possible, so it should only perform computations if they’re needed.
This is a perfect use-case for Graphcat! We’ll use it to create a set of tasks that represent our workflow and its parameters, and modify those parameters based on user input. Graphcat will make sure that every task is executed when - and only when - needed. The output from the final task in our network will be the softened image.
Note
For a more substantial image processing workflow based on Graphcat, see Imagecat (https://imagecat.readthedocs.io).
To get started, let’s import graphcat and create an empty computational graph:
[1]:
import graphcat
graph = graphcat.StaticGraph()
The first step in our workflow will be to load an image from disk. We’re going to use Pillow to do the heavy lifting, so you’ll need to install it with
$ pip install pillow
if you don’t already have it. With that out of the way, the first thing we need is a parameter with the filename of the image to be loaded. In Graphcat, everything that affects your computation - including parameters - should be represented as a task:
[2]:
graph.set_task("filename", graphcat.constant("astronaut.jpg"))
… note that graphcat.constant() is used to create a task function that returns the given value. Next, we need to create the task that will actually load the image:
[3]:
import PIL.Image
def load(graph, name, inputs):
    path = inputs.get("path")
    return PIL.Image.open(path)
graph.set_task("load", load)
The load function expects an input named path which will supply the filename to be loaded, and returns a Pillow image as output. Our “filename” task produces a filename, so we connect it to the “load” task’s path input:
[4]:
graph.set_links("filename", ("load", "path"))
Finally, let’s stop and take stock of what we’ve done so far, with a diagram of the current computational graph:
[5]:
import graphcat.notebook
graphcat.notebook.display(graph)
For our next step, we’ll resize the incoming image:
[6]:
def resize(graph, name, inputs):
    image = inputs.get("image")
    scale = inputs.get("scale")
    return image.resize((int(image.width * scale), int(image.height * scale)))
graph.set_task("resize", resize)
Notice that the resize function expects an image for the image input, plus a scale factor for the scale input, and produces a modified image as output. The pattern of “parameter” tasks that feed into a larger task performing data manipulation is a common one. In fact, it’s so common that Graphcat provides a special helper - graphcat.static.StaticGraph.set_parameter() - to simplify the process. Let’s use it to set up the scale:
[7]:
graph.set_parameter(target="resize", input="scale", source="scale_parameter", value=0.2)
And of course, we need to connect the load function to resize:
[8]:
graph.set_links("load", ("resize", "image"))
graphcat.notebook.display(graph)
Before going any further, let’s execute the current graph to see what the loaded image looks like:
[9]:
graph.output("resize")
[9]:

When we use graphcat.static.StaticGraph.output() to retrieve the output of a task, it implicitly executes any unfinished tasks that might have an impact on the result. If we look at the graph diagram again, we see that all of the tasks have been executed (are rendered with a black background):
[10]:
graphcat.notebook.display(graph)
Creating the blurred version of the input image works much like the resize operation - the blur task function takes an image and a blur radius as inputs, and produces a modified image as output:
[11]:
import PIL.ImageFilter
def blur(graph, name, inputs):
    image = inputs.get("image")
    radius = inputs.get("radius")
    return image.filter(PIL.ImageFilter.GaussianBlur(radius))
graph.set_task("blur", blur)
graph.set_parameter("blur", "radius", "radius_parameter", 5)
graph.set_links("resize", ("blur", "image"))
graphcat.notebook.display(graph)
Notice that the tasks we just added have white backgrounds in the diagram, indicating that they haven’t been executed yet.
Now, we’re ready to combine the blurred and unblurred versions of the image. Notably, our “blend” task will take three inputs: one for each version of the image, plus one for the “alpha” parameter that will control how much each image contributes to the final result:
[12]:
import PIL.ImageChops
def blend(graph, name, inputs):
    image1 = inputs.get("image1")
    image2 = inputs.get("image2")
    alpha = inputs.get("alpha")
    return PIL.ImageChops.blend(image1, image2, alpha)
graph.set_task("blend", blend)
graph.set_parameter("blend", "alpha", "alpha_parameter", 0.65)
graph.add_links("resize", ("blend", "image1"))
graph.set_links("blur", ("blend", "image2"))
graphcat.notebook.display(graph)
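Pillow documents this blend as out = image1 * (1.0 - alpha) + image2 * alpha. With alpha set to 0.65, each output pixel is therefore 35% original and 65% blurred. The sketch below applies that per-pixel arithmetic to a few made-up grayscale values, using plain lists rather than Pillow images. blend_pixels() is a hypothetical helper.

```python
def blend_pixels(pixels1, pixels2, alpha):
    # Same formula as Pillow's blend, applied to plain lists of pixel values.
    return [p1 * (1.0 - alpha) + p2 * alpha for p1, p2 in zip(pixels1, pixels2)]

original = [200, 120, 40]  # hypothetical pixel values from the resized image
blurred = [100, 120, 80]   # hypothetical values at the same positions after blurring
softened = blend_pixels(original, blurred, 0.65)
print([round(p) for p in softened])  # [135, 120, 66]
```

Raising alpha toward 1 weights the result toward the blurred image; lowering it toward 0 weights it toward the original.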
That’s it! Now we’re ready to execute the graph and see the softened result:
[13]:
graph.output("blend")
[13]:

Don’t ask how, but I can confirm that the image now looks like it was taken in a department store, circa 1975.
Of course, executing the graph once doesn’t really demonstrate Graphcat’s true abilities. The real benefit of a computational graph only becomes clear when its parameters are changing, with the graph only executing the tasks that need to be recomputed.
To demonstrate this, we will use Jupyter notebook widgets - https://ipywidgets.readthedocs.io - to provide a simple, interactive user interface. In particular, we’ll use interactive sliders to drive the “scale”, “radius”, and “alpha” parameters in the computational graph. We won’t discuss how the widgets work in any detail, focusing instead on just the places where they are integrated with Graphcat. To begin, we will need to define some callback functions that will be called when the value of a widget changes:
[14]:
def set_graph_value(name):
    def implementation(change):
        graph.set_task(name, graphcat.constant(change["new"]))
    return implementation
… set_graph_value() returns a callback, and when a widget calls that callback, it assigns an updated graphcat.constant() function, holding the widget’s new value, to the named parameter task.
Next, we’ll create the widgets, and connect them to their tasks:
[15]:
import ipywidgets as widgets
scale_widget = widgets.FloatSlider(description="scale:", min=0.01, max=1, value=0.2, step=0.01, continuous_update=False)
scale_widget.observe(set_graph_value("scale_parameter"), names="value")
radius_widget = widgets.FloatSlider(description="radius:", min=0, max=10, value=5, step=1, continuous_update=False)
radius_widget.observe(set_graph_value("radius_parameter"), names="value")
alpha_widget = widgets.FloatSlider(description="alpha:", min=0, max=1, value=0.65, step=0.01, continuous_update=False)
alpha_widget.observe(set_graph_value("alpha_parameter"), names="value")
We’ll also need an output widget where our results will be displayed:
[16]:
output_widget = widgets.Output()
output_widget.layout.height="1000px"
So that we can see exactly which tasks are executed when a slider is moved, we’ll create our own custom logging function and connect it to the graph:
[17]:
def log_execution(graph, name, inputs):
    with output_widget:
        print(f"Executing {name}")
graph.on_execute.connect(log_execution);
This function will be called every time a task is executed.
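The connect() calls used here follow the familiar observer pattern: a signal keeps a list of receivers and calls each of them, with the same arguments, whenever it is emitted. The class below is a minimal, hypothetical illustration of that pattern. It is not the signal type Graphcat uses for on_execute or on_changed.

```python
class Signal:
    # Minimal observer: receivers are called in the order they were connected.
    def __init__(self):
        self._receivers = []

    def connect(self, receiver):
        self._receivers.append(receiver)
        return receiver

    def send(self, *args, **kwargs):
        return [receiver(*args, **kwargs) for receiver in self._receivers]

on_execute = Signal()
messages = []
on_execute.connect(lambda graph, name, inputs: messages.append(f"Executing {name}"))
on_execute.send(None, "blur", {})
print(messages)  # ['Executing blur']
```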
We also need a function that will be called whenever the graph changes. This function will be responsible for clearing the previous output, displaying an up-to-date graph diagram, and displaying the new graph output:
[18]:
import IPython.display
def graph_changed(graph):
    with output_widget:
        IPython.display.clear_output(wait=True)
        graphcat.notebook.display(graph)
        IPython.display.display(graph.output("blend"))
graph.on_changed.connect(graph_changed);
Note that the “on_changed” event is emitted by the graph whenever tasks or connections are modified - in our case, every time we move a slider and change the corresponding parameter task. Finally, we’re ready to display our live user interface.
Note
If you’re reading this page online as part of the Graphcat documentation, the interface won’t be visible. To see it in operation, you need to run this notebook for yourself; if you don’t already have the full Graphcat sources from GitHub, you can download the individual notebook from https://github.com/shead-custom-design/graphcat/blob/main/docs/image-processing-case-study.ipynb
After executing the following, try dragging the sliders and watch the results change. Take note of the following:
- The graph diagram shows which tasks have been affected by parameter changes (white backgrounds).
- Below the graph diagram, our custom log output shows exactly which tasks are executed to produce the updated image.
- Drag each slider, and notice how the diagram and log outputs change:
  - Only the tasks that are affected by the slider are executed.
[19]:
IPython.display.display(scale_widget)
IPython.display.display(radius_widget)
IPython.display.display(alpha_widget)
IPython.display.display(output_widget)
graph_changed(graph)
Performance Monitoring¶
As your computational graphs become more complex, it can be helpful to see which tasks are being executed, and how long they take. To demonstrate how to do this, let’s start with a simple graph that simulates some long-running tasks:
[1]:
import graphcat
graph = graphcat.StaticGraph()
graph.set_task("A", graphcat.delay(2))
graph.set_task("B", graphcat.delay(1))
graph.set_task("C", graphcat.delay(0.1))
graph.set_task("D", graphcat.null)
graph.add_links("A", "B")
graph.add_links("B", ["C", "D"])
Note that we’ve used graphcat.delay - which simply sleeps for the given time in seconds - as the task function for tasks “A”, “B”, and “C”. Now, before we update the graph, we’ll create an instance of graphcat.PerformanceMonitor, which will handle gathering information about each task as it executes:
[2]:
monitor = graphcat.PerformanceMonitor(graph)
Now, we’re ready to execute the graph and visualize the results:
[3]:
graph.update("C")
[4]:
import graphcat.diagram
import graphcat.notebook
agraph = graphcat.diagram.draw(graph, rankdir="TB")
agraph = graphcat.diagram.performance(agraph, monitor)
graphcat.notebook.display(agraph)
Note that graphcat.diagram.performance() operates by supplementing the data in the original diagram. See Visualizing Graphs for more details.
As you can see, when we merge the performance data into our graph diagram, every task that executes displays the time it took, with a color coded dot highlighting which tasks were the slowest.
You can also access the performance data manually, as a mapping from tasks to execution times:
[5]:
monitor.tasks
[5]:
{'A': [2.0007550716400146],
'B': [1.0006828308105469],
'C': [0.10363316535949707]}
Note that the execution times are stored in a list, so you can track multiple times, not just the most recent. For example, if we force task “C” to execute again:
[6]:
graph.mark_unfinished("C")
graph.update("C")
monitor.tasks
[6]:
{'A': [2.0007550716400146],
'B': [1.0006828308105469],
'C': [0.10363316535949707, 0.10011696815490723]}
Finally, the monitor can be reset to begin recording times from scratch:
[7]:
monitor.reset()
graph.mark_unfinished("C")
graph.update("C")
monitor.tasks
[7]:
{'C': [0.1026008129119873]}
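The bookkeeping behind these numbers can be sketched with time.perf_counter() and a dictionary of lists. TimingRecorder below is a hypothetical stand-in that mimics the tasks mapping and reset() shown above. It doesn’t hook into a graph and is not graphcat.PerformanceMonitor’s implementation.

```python
import collections
import time

class TimingRecorder:
    def __init__(self):
        # Task name -> list of execution times in seconds, oldest first.
        self.tasks = collections.defaultdict(list)

    def run(self, name, function, *args):
        # Time one execution and append the elapsed time, even if the function raises.
        start = time.perf_counter()
        try:
            return function(*args)
        finally:
            self.tasks[name].append(time.perf_counter() - start)

    def reset(self):
        self.tasks.clear()

timer = TimingRecorder()
timer.run("C", time.sleep, 0.1)
timer.run("C", time.sleep, 0.1)
print(dict(timer.tasks))  # two entries of roughly 0.1 seconds for "C"
timer.reset()
print(dict(timer.tasks))  # {}
```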
Streaming Graphs¶
This section assumes you’ve completed the Tutorial and have a good working knowledge of Graphcat.
As you’ve seen so far, computational graphs capture the dependencies in a process so that individual tasks can be executed only when they’re needed. Static graphs identify those tasks based solely on the topology of the graph. Dynamic graphs improve on this by detecting which task outputs are actually used, so that tasks with unused outputs can be skipped. The section on avoiding unnecessary updates describes how Graphcat takes this a step further by ensuring that irrelevant upstream changes don’t trigger downstream execution.
In this section, we introduce streaming graphs, which make it possible to retrieve a subset of a task’s total output. This can be useful in several ways: first, you might want to improve interactivity by reducing the work performed when there are minor upstream changes. An example of this might be a paint program where only the region under a brush stroke is updated, instead of an entire image. Another reason to use streaming is when you have a problem so large that it can’t be loaded into memory all at once: you can split the problem into smaller pieces and work on them one at a time, only accumulating the final result. Finally, you can use streaming to implement parallelism, using multiple threads or processes to each work on one piece of a problem.
But before we can get to all that, let’s begin with a simple streaming graph that computes the squares of values in a Numpy array:
[1]:
import logging
logging.basicConfig(level=logging.DEBUG)
import graphcat.notebook
import numpy
def squared(graph, name, inputs, extent):
    return numpy.power(inputs.getone(None, extent), 2)
graph = graphcat.StreamingGraph()
logger = graphcat.Logger(graph)
graph.set_task("A", graphcat.array(numpy.arange(8)))
graph.set_task("B", squared)
graph.set_links("A", "B")
graphcat.notebook.display(graph)
Note
Since this is a streaming graph, the arrows are drawn with partial arrow heads to suggest that the tasks can retrieve partial data.
Note that we used graphcat.array to provide the source data instead of graphcat.constant. Although the latter can certainly be used to provide a Numpy array as an output, the former provides additional functionality when used with streaming graphs.
Our squared task function simply uses numpy to compute the square of its inputs.
Let’s confirm that the default behavior of the graph is what we would normally expect:
[2]:
graph.output("B")
INFO:graphcat.common:Task B updating.
INFO:graphcat.common:Task B executing. Inputs: {None} Extent: None
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task A executing. Inputs: {} Extent: None
INFO:graphcat.common:Task A finished. Output: [0 1 2 3 4 5 6 7]
INFO:graphcat.common:Task B finished. Output: [ 0 1 4 9 16 25 36 49]
[2]:
array([ 0, 1, 4, 9, 16, 25, 36, 49])
That looks pretty familiar - when requested, task “A” returns the array we used to initialize it, and task “B” computes the square of each value from “A”. We can see some new output in the logs, but otherwise everything is working the way it would with static and dynamic graphs.
What if the array was much larger, or task “B” was something truly time consuming, so that we wanted to perform an operation on just a subset? This is what streaming graphs were meant for. To do so, they allow us to specify “extents”, which define the subset of a task’s output that we want:
[3]:
graph.output("B", extent=graphcat.ArrayExtent[0:4])
INFO:graphcat.common:Task B updating.
INFO:graphcat.common:Task B executing. Inputs: {None} Extent: slice(0, 4, None)
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task A executing. Inputs: {} Extent: slice(0, 4, None)
INFO:graphcat.common:Task A finished. Output: [0 1 2 3]
INFO:graphcat.common:Task B finished. Output: [0 1 4 9]
[3]:
array([0, 1, 4, 9])
There’s a lot to unpack here. First, we specified an extent when we called output. An extent (which can only be used in streaming graphs) is any Python object that can be used by a task to specify a subset of its output. In this case, we used graphcat.ArrayExtent, which can be thought of as a factory for creating extents that request a subset of an array using standard Numpy indexing. Here, we are requesting the first four values from the array (ArrayExtent[0:4]).
Next, the extent is passed to the task function, which in this case is our squared function. Looking back at the function source:
def squared(graph, name, inputs, extent):
    return numpy.power(inputs.getone(None, extent), 2)
… we see that the function takes a mandatory fourth argument - extent. This is an important point when working with streaming graphs: the task functions must be written to accept the “extent” argument along with the standard “graph”, “name”, and “inputs” arguments, and do something with the extent information.
In the case of the squared function, the extent is simply passed along in the call to inputs.getone(None, extent) … in other words, squared is counting on the upstream task providing its input to know what to do with the extents object.
This is why we used graphcat.array to implement task “A”: it knows how to use the extent returned by graphcat.ArrayExtent, using it to return a subset of the stored array.
That subset is returned by task “A”, and task “B” computes the squares as requested.
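The division of labor between the two tasks can be mimicked in plain Python. In this sketch (not Graphcat code), the hypothetical array_task() plays the role of graphcat.array and squared_task() plays the role of squared. The downstream task never interprets the extent itself; it just forwards it upstream. The hypothetical Extent factory mirrors what the logs above show, where ArrayExtent[0:4] arrives as slice(0, 4, None).

```python
class ExtentFactory:
    # Indexing this object simply returns the index: Extent[0:4] -> slice(0, 4, None).
    def __getitem__(self, key):
        return key

Extent = ExtentFactory()

def array_task(data):
    # Upstream task: None means "everything", otherwise apply the extent as an index.
    def task(extent=None):
        return data if extent is None else data[extent]
    return task

def squared_task(upstream):
    # Downstream task: forwards the extent and squares whatever subset comes back.
    def task(extent=None):
        return [value * value for value in upstream(extent)]
    return task

source = array_task(list(range(8)))
squares = squared_task(source)
print(Extent[0:4])           # slice(0, 4, None)
print(squares(Extent[0:4]))  # [0, 1, 4, 9]
print(squares())             # [0, 1, 4, 9, 16, 25, 36, 49]
```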
Note that, as with static and dynamic graphs, streaming graphs don’t execute tasks unless they have to. For example, if we ask for the same data again, using the same extent:
[4]:
graph.output("A", extent=graphcat.ArrayExtent[0:4])
INFO:graphcat.common:Task A updating.
[4]:
array([0, 1, 2, 3])
… we get the same result, but the tasks haven’t been executed again. If we use a different extent:
[5]:
graph.output("A", extent=None)
INFO:graphcat.common:Task A updating.
INFO:graphcat.common:Task A executing. Inputs: {} Extent: None
INFO:graphcat.common:Task A finished. Output: [0 1 2 3 4 5 6 7]
[5]:
array([0, 1, 2, 3, 4, 5, 6, 7])
… then task “A” is executed. Note from the above that when you use None as an extent, it means “return everything”. None is the default extent everywhere in the API that extents are used, so by default, streaming graph methods behave exactly like their static and dynamic graph counterparts.
Note that ArrayExtent is just one example of an extent. You can use any Python object as an extent, so long as your task functions know how to make use of it. You could use a string containing a filepath or an SQL query as an extent, or a tuple containing a range of timestamps.
Furthermore, your task functions are free to modify or replace extents - for example, a task function that performs convolutional filtering on arrays would need to grow an incoming extent to avoid edge effects.
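To make that last point concrete, here is a sketch of a hypothetical three-point moving-average task working on plain Python lists. Each output sample needs its neighbors, so the task grows the requested slice by the filter radius before asking upstream, then computes only the requested samples. The result matches the corresponding part of a full, unstreamed computation. The sketch assumes slice extents with no step and a known total length, and it is not Graphcat code.

```python
data = list(range(10))  # hypothetical upstream data

def source(extent):
    # Upstream task: None means "everything", otherwise the extent is a slice.
    return data if extent is None else data[extent]

def moving_average(extent=None, radius=1):
    n = len(data)  # assumption: the task knows the total length of the stream
    start, stop, _ = (slice(None) if extent is None else extent).indices(n)
    # Grow the request so samples near the edges of the extent still see their neighbors.
    grown = slice(max(start - radius, 0), min(stop + radius, n))
    values = source(grown)
    result = []
    for i in range(start, stop):
        lo, hi = max(i - radius, 0), min(i + radius + 1, n)
        window = values[lo - grown.start:hi - grown.start]
        result.append(sum(window) / len(window))
    return result

print(moving_average(slice(3, 6)))  # [3.0, 4.0, 5.0]
print(moving_average()[3:6])        # [3.0, 4.0, 5.0]
```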
Visualizing Graphs¶
As you’ve probably noticed, visualizing computational graphs as diagrams plays a big role in this documentation, and these diagrams are extremely useful to understand and debug your own graphs. This section will demonstrate how you can customize the standard visualization for your own work.
First, let’s create a sample graph, and see the default visualization style:
[1]:
import contextlib
import graphcat
def sample_graph(graph):
    graph.add_task("A")
    graph.add_task("B")
    graph.add_task("C")
    graph.add_task("D")
    graph.add_task("E")
    graph.add_task("F")
    graph.add_task("G", graphcat.raise_exception(RuntimeError()))
    graph.set_links("A", "B")
    graph.set_links("B", "D")
    graph.set_links("C", "D")
    graph.set_links("D", ["E", "G"])
    graph.set_links("E", "F")
    return graph
graph = sample_graph(graphcat.StaticGraph())
The easiest way to visualize our new graph is to use Graphcat’s builtin functionality to display it in a notebook:
[2]:
import graphcat.notebook
graphcat.notebook.display(graph)
The diagram represents each task as a box labelled with the task name. Arrows point in the direction of data flow, from upstream tasks to the downstream tasks that depend on them. The tasks are drawn as outlines because they haven’t been executed yet:
[3]:
graph.update("E")
with contextlib.suppress(RuntimeError):
    graph.update("G")
graphcat.notebook.display(graph)
As you can see, the tasks that executed successfully are drawn as solid boxes, to suggest that they’re finished, while failed tasks (tasks that raise an exception) are highlighted in red.
Keep in mind that we’ve been looking at a StaticGraph. Let’s see what the same topology looks like when we use a DynamicGraph:
[4]:
graph = sample_graph(graphcat.DynamicGraph())
graph.update("E")
with contextlib.suppress(RuntimeError):
    graph.update("G")
graphcat.notebook.display(graph)
The new diagram is different in two ways: first, because this is a dynamic graph and task “E” doesn’t use any of its inputs (it’s using the default graphcat.null task function), none of the upstream tasks have been executed. This is expected behavior for a dynamic graph; see Dynamic Graphs for details.
Second, the arrows are drawn as outlines instead of solid. This is meant to suggest that the relationships are conditional and dynamic rather than static: an upstream task is only executed if the downstream task actually uses its output.
Let’s look at the equivalent StreamingGraph:
[5]:
graph = sample_graph(graphcat.StreamingGraph())
graph.update("E", extent=None)
with contextlib.suppress(RuntimeError):
    graph.update("G", extent=None)
graphcat.notebook.display(graph)
You can see that the streaming graph arrows are drawn as outlined half arrows, to suggest that the streaming graph is dynamic and that downstream tasks can request partial results from upstream tasks.
Now that we’ve seen the default behavior for diagrams, let’s look at the ways in which we can customize them.
First, depending on the size of your graph, the lengths of its task names, and the width of your screen, you may wish to display the graph with the data flowing top-to-bottom instead of the default left-to-right:
[6]:
graphcat.notebook.display(graph, rankdir="TB")
Depending on the reading direction of your language, you might also prefer right-to-left flow:
[7]:
graphcat.notebook.display(graph, rankdir="RL")
You could also render the flow bottom-to-top, if you’ve been working on your Tagbanwa:
[8]:
graphcat.notebook.display(graph, rankdir="BT")
Next, let’s start to modify the appearance of the graph, not just the layout. To do so, we’ll need to import a new module, graphcat.diagram, which is where all of Graphcat’s drawing actually takes place:
[9]:
import graphcat.diagram
agraph = graphcat.diagram.draw(graph)
The call to graphcat.diagram.draw() returns an AGraph, which is provided by the PyGraphviz library. Once you have an AGraph created by Graphcat, you can use its API to make modifications. For example, you might want to change the appearance of a key task:
[10]:
agraph.get_node("C").attr.update(color="royalblue", fillcolor="royalblue", fontcolor="white", shape="circle")
To render the modified graph, pass it to the usual display function:
[11]:
graphcat.notebook.display(agraph)
You could also highlight an important relationship:
[12]:
agraph.get_edge("A", "B").attr.update(color="seagreen", penwidth="2", arrowhead="normal")
graphcat.notebook.display(agraph)
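Or you could add a supplemental label to a task (this example also switches to top-to-bottom flow by setting the graph’s rankdir attribute directly):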
[13]:
agraph.get_node("A").attr.update(xlabel="192.168.0.1")
agraph.graph_attr["rankdir"] = "TB"
graphcat.notebook.display(agraph)
Finally, you might want to render your graph as a bitmap image instead of an SVG, which can be done directly using the pygraphviz.AGraph API:
[14]:
import IPython.display
IPython.display.display(IPython.display.Image(data=agraph.draw(prog="dot", format="png")))

Or, you might want to write the image directly to disk:
[15]:
agraph.draw(path="test.svg", prog="dot", format="svg")

Development
Getting Started
If you haven’t already, you’ll want to get familiar with the Graphcat repository at http://github.com/shead-custom-design/graphcat … there, you’ll find the Graphcat source code, issue tracker, discussions, and wiki.
You’ll need to install Graphviz and pandoc, neither of which can be installed via pip. If you use Conda (which we strongly recommend), you can install them as follows:
$ conda install graphviz pandoc
Next, you’ll need to install all of the extra dependencies needed for Graphcat development:
$ pip install graphcat[all]
Then, you’ll be ready to obtain Graphcat’s source code and install it using “editable mode”. Editable mode is a feature provided by pip that links the Graphcat source code into the install directory instead of copying it … that way you can edit the source code in your git sandbox, and you don’t have to keep re-installing it to test your changes:
$ git clone https://github.com/shead-custom-design/graphcat.git
$ cd graphcat
$ pip install --editable .
Versioning
Graphcat version numbers follow the Semantic Versioning standard.
Coding Style
The Graphcat source code follows the PEP-8 Style Guide for Python Code.
Running Regression Tests
To run the Graphcat test suite, simply run regression.py from the top-level source directory:
$ cd graphcat
$ python regression.py
The tests will run, providing feedback on successes / failures.
Test Coverage
When you run the test suite with regression.py, it also automatically generates code coverage statistics. To see the coverage results, open graphcat/.cover/index.html in a web browser.
Building the Documentation
To build the documentation, run:
$ cd graphcat/docs
$ make html
Once the documentation is built, you can view it by opening graphcat/docs/_build/html/index.html in a web browser.

API Reference
Contents:
graphcat module
Functionality for managing and executing computational graphs.
graphcat.common module
Helper classes and functions that can be used with more than one graph type.
class graphcat.common.Array(value)
Bases: object
Task function callable that returns a caller-supplied array.
Parameters: value (numpy.ndarray-convertible object, required) – The array to use as the output for this callable.
See also
class graphcat.common.ArrayExtent
Bases: object
Helper for creating Array-compatible streaming extents. To generate extents, use any numpy-compatible indexing notation:
extent = ArrayExtent[0:4096]
extent = ArrayExtent[::2]
extent = ArrayExtent[:, 0]
...
These extents are compatible with Array.
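Subscripting a class, as in ArrayExtent[0:4096], works because Python lets a metaclass define __getitem__. The following is a hypothetical sketch of that idea (ArrayExtentSketch and _ExtentMeta are invented names, and Graphcat’s actual implementation may differ): the subscript simply returns the index expression, which a streaming task could later apply to its data.

```python
class _ExtentMeta(type):
    """Metaclass whose __getitem__ returns the index expression unchanged."""

    def __getitem__(cls, key):
        return key


class ArrayExtentSketch(metaclass=_ExtentMeta):
    """Hypothetical stand-in for graphcat.common.ArrayExtent."""


print(ArrayExtentSketch[0:4096])  # slice(0, 4096, None)
print(ArrayExtentSketch[:, 0])    # (slice(None, None, None), 0)

# A task could apply an extent to its data with ordinary indexing.
data = list(range(10))
print(data[ArrayExtentSketch[::2]])  # [0, 2, 4, 6, 8]
```

The [:, 0] form produces a tuple, which NumPy arrays accept as a multidimensional index but plain Python lists do not.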
class graphcat.common.Constant(value)
Bases: object
Task function callable that returns a caller-supplied value.
Parameters: value (any Python object, required) – The value to use as the output for this callable.
See also
class graphcat.common.Delay(seconds)
Bases: object
Task function callable that sleeps for a fixed time.
This is mainly useful for testing and debugging.
Parameters: seconds (number, required) – The number of seconds to sleep when executed.
See also
exception graphcat.common.DeprecationWarning
Bases: Warning
Warning category for deprecated code.
class graphcat.common.Input
Bases: enum.Enum
Enumerates special graphcat.graph.Graph named inputs.
IMPLICIT = 1
Named input for links that are generated automatically for use as implicit dependencies, not data sources.
class
graphcat.common.
Logger
(graph, log_exceptions=True, log_inputs=True, log_outputs=True, log_extents=True, log=<Logger graphcat.common (WARNING)>)[source]¶ Bases:
object
Log graph events.
Create a
Logger
object to see the behavior of the graph during updates, using the Pythonlogging
module:logger = graphcat.Logger(graph)
This is useful for debugging and pedagogy. The logger will generate output for five types of event:
- cycle - a cycle is detected during updating.
- executed - a task is executed.
- failed - a task raises an exception during execution.
- finished - a task executes successfully.
- updated - a task is updated.
Update events happen regardless of the state of a task. Execute events only happen if the task isn’t already finished. Failed and finished events only happen if a task is executed.
Parameters: graph (graphcat.graph.Graph, required) – The graph whose events will be logged.
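The event rules above can be summarized with a small, hypothetical model of updating a single task. This is plain Python rather than Graphcat’s internals: update_task and the event strings are invented for illustration, cycle detection is omitted, and the State enum copies the members and values documented for graphcat.common.TaskState.

```python
import enum


class State(enum.Enum):
    # Same members and values as the documented graphcat.common.TaskState.
    UNFINISHED = 1
    FAILED = 2
    FINISHED = 3


def update_task(task, events):
    """Update one task, appending the events a logger would report."""
    events.append("updated")          # updated: emitted on every update
    if task["state"] == State.FINISHED:
        return                        # already finished: nothing to execute
    events.append("executed")         # executed: only for tasks not yet finished
    try:
        task["output"] = task["fn"]()
    except Exception:
        task["state"] = State.FAILED
        events.append("failed")       # failed: the task raised an exception
    else:
        task["state"] = State.FINISHED
        events.append("finished")     # finished: the task succeeded


ok = {"fn": lambda: 42, "state": State.UNFINISHED}
events = []
update_task(ok, events)
update_task(ok, events)               # second update finds the task finished
print(events)  # ['updated', 'executed', 'finished', 'updated']

bad = {"fn": lambda: 1 / 0, "state": State.UNFINISHED}
events = []
update_task(bad, events)
print(events, bad["state"])  # ['updated', 'executed', 'failed'] State.FAILED
```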
class graphcat.common.Passthrough(input)
Bases: object
Task function callable that always returns an upstream input.
Parameters: input (hashable object, required) – Name of the input to be returned when this task is executed. Note that there must be exactly one connection to the named input, or an exception will be raised.
See also
class graphcat.common.PerformanceMonitor(graph)
Bases: object
Tracks the performance of graph tasks as they’re executed.
Parameters: graph (graphcat.graph.Graph, required) – Graph whose performance will be monitored.
class graphcat.common.RaiseException(exception)
Bases: object
Task function callable that raises an exception when executed.
This is mainly useful for testing and debugging.
Parameters: exception (Exception, required) – The exception to be raised when the task is executed.
See also
class graphcat.common.TaskState
Bases: enum.Enum
Enumerates graphcat.graph.Graph task states. Every task within a graphcat.graph.Graph will always be in one of the following states.
See also
FAILED = 2
The task or one of its dependencies failed during the last update.
FINISHED = 3
The task executed successfully during the last update.
UNFINISHED = 1
The task is out-of-date and will be executed during the next update.
class
graphcat.common.
UpdatedTasks
(graph)[source]¶ Bases:
object
Maintains a list of graph tasks that have been updated.
Parameters: graph ( graphcat.graph.Graph
, required) – Graph to watch for task updates.
graphcat.common.array(value)
Factory for task functions that return array values when executed.
Note
This callable is designed to be compatible with ArrayExtent extents when used in a graphcat.streaming.StreamingGraph.
Parameters: value (numpy.ndarray-convertible value, required) – The array to return when the task is executed.
Returns: fn – Task function that will always return value when executed.
Return type: Array
graphcat.common.automatic_dependencies(fn)
Function decorator that automatically tracks dependencies.
Use this to decorate task functions that need dependency tracking, such as evaluate().
See also
graphcat.graph.Graph.set_expression() - Convenience method that configures a task to evaluate expressions and automatically track dependencies.
graphcat.common.builtins(graph, name, inputs, extent=None)
Returns standard builtin symbols for expression tasks.
See also
graphcat.common.constant(value)
Factory for task functions that return constant values when executed.
This is useful when creating a task that will act as a parameter for a downstream task:
graph.add_task("theta", constant(math.pi))
To change the parameter later, use constant() again, with Graph.set_task_fn() to specify a new function:
graph.set_task_fn("theta", constant(math.pi / 2))
Parameters: value (any value, required) – The value to return when the task is executed.
Returns: fn – Task function that will always return value when executed.
Return type: Constant
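The factory pattern used by constant() can be sketched as a callable class plus a function that builds it. This is a hypothetical illustration, not Graphcat’s source: ConstantSketch, constant_sketch, and the tasks dict standing in for a graph are all invented, and the call signature fn(graph, name, inputs, extent=None) is assumed from the signatures documented for null() and consume().

```python
import math


class ConstantSketch:
    """Callable that ignores its arguments and returns a fixed value."""

    def __init__(self, value):
        self._value = value

    def __call__(self, graph, name, inputs, extent=None):
        return self._value


def constant_sketch(value):
    """Factory mirroring the documented constant(value) -> Constant pattern."""
    return ConstantSketch(value)


# Hypothetical stand-in for a graph: task name -> task function.
tasks = {"theta": constant_sketch(math.pi)}
print(tasks["theta"](None, "theta", {}))  # 3.141592653589793

# Changing the parameter means installing a new constant function.
tasks["theta"] = constant_sketch(math.pi / 2)
print(tasks["theta"](None, "theta", {}))  # 1.5707963267948966
```

Using a class rather than a closure gives the returned task function a named type, which matches the documented return type of constant().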
graphcat.common.consume(graph, name, inputs, extent=None)
Task function that retrieves all its inputs, but otherwise does nothing.
This is mainly useful for debugging dynamic graphs, since the default null() task function won’t execute upstream nodes.
graphcat.common.delay(seconds)
Factory for task functions that sleep for a fixed time.
This is mainly useful for testing and debugging.
Parameters: seconds (number, required) – Number of seconds to sleep when executed.
Returns: fn – Task function that will always sleep for seconds when executed.
Return type: function
graphcat.common.evaluate(code, symbols=None)
Factory for task functions that evaluate Python expressions.
If your expressions can access the output from other tasks in the graph, you will want to use this function with the automatic_dependencies() decorator, or use graphcat.graph.Graph.set_expression(), which sets up dependency tracking for you.
See also
graphcat.graph.Graph.set_expression() - Convenience method that configures a task to evaluate expressions and automatically track dependencies.
Parameters:
- code (string, required) – Python code to be executed when the task is executed.
- symbols (callable, optional) – Function that returns a Python dict containing symbols that will be available to the expression when it’s executed. If None (the default), the builtins() function will be used, which gives the expression access to the same graph, name, inputs, and extent objects as a normal task function.
Returns: fn – Task function that will execute Python code when the task is executed.
Return type: function
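At its core, a function like evaluate() returns a task function that evaluates code against a dictionary of symbols. The hypothetical sketch below (evaluate_sketch and builtins_sketch are invented names) uses Python’s built-in eval(), so it only handles expressions, not statements, and it performs no automatic dependency tracking; Graphcat’s implementation may differ on both counts.

```python
def builtins_sketch(graph, name, inputs, extent=None):
    """Expose the usual task-function arguments as expression symbols."""
    return {"graph": graph, "name": name, "inputs": inputs, "extent": extent}


def evaluate_sketch(code, symbols=None):
    """Return a task function that evaluates the expression `code` when executed."""
    if symbols is None:
        symbols = builtins_sketch

    def task_fn(graph, name, inputs, extent=None):
        # The dict returned by symbols() becomes the expression's global namespace;
        # eval() adds __builtins__ automatically, so len() and friends still work.
        return eval(code, symbols(graph, name, inputs, extent))

    return task_fn


length = evaluate_sketch("len(name) * 2")
print(length(None, "theta", {}))  # 10

add_one = evaluate_sketch("inputs['x'] + 1")
print(add_one(None, "y", {"x": 41}))  # 42
```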
graphcat.common.null(graph, name, inputs, extent=None)
Task function that does nothing.
This is the default if you don’t specify a function for Graph.add_task() or Graph.set_task_fn(), and is useful in debugging and pedagogy.
graphcat.common.passthrough(input=None)
Factory for task functions that pass through incoming data.
Callers can use this function to temporarily bypass tasks in the graph.
Parameters: input (hashable object, required) – The named input that will pass through to the task output.
Returns: fn – Task function that will pass the input value named input to its output.
Return type: function
graphcat.common.raise_exception(exception)
Factory for task functions that raise an exception when executed.
This is mainly useful for testing and debugging.
Parameters: exception (BaseException derivative, required) – The exception to raise when the task is executed.
Returns: fn – Task function that will always raise exception when executed.
Return type: function
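Factories like raise_exception() and delay() return a function that captures its argument. Below is a minimal hypothetical sketch of the raise_exception() pattern (raise_exception_sketch is an invented name, and the task-function signature is assumed from null()), using contextlib.suppress() the same way the diagram tutorial does to let a script continue past an expected failure:

```python
import contextlib


def raise_exception_sketch(exception):
    """Return a task function that raises `exception` every time it's executed."""
    def task_fn(graph, name, inputs, extent=None):
        raise exception
    return task_fn


failing = raise_exception_sketch(RuntimeError("intentional failure"))

# As in the diagram tutorial, suppress the expected error so the script continues.
with contextlib.suppress(RuntimeError):
    failing(None, "G", {})
print("still running")
```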
graphcat.diagram module
Functionality for drawing diagrams of computational graphs.
graphcat.diagram.draw(graph, hide=None, rankdir='LR')
Create a diagram of a computational graph.
This is extremely useful for understanding and debugging computational graphs. The structure and current state of the graph are converted to a PyGraphviz graph. By default, each task is rendered as a box with the task label. Arrows are drawn between tasks, pointing from upstream producers of data to downstream consumers. Arrows are labelled to show named inputs, if any. The color of each box shows its state: white for unfinished tasks, red for failed tasks, and black for finished tasks.
Callers can customize the appearance of the graph by modifying the result before rendering it to an image or Jupyter notebook.
Parameters:
- graph (graphcat.graph.Graph derivative or pygraphviz.AGraph, required) – The graph to be visualized.
- hide (Python callable, optional) – Python callable that can be used to hide tasks in the displayed figure. If None (the default), all tasks will be displayed. Ignored if graph is an instance of pygraphviz.AGraph.
- rankdir (str, optional) – Graphviz rankdir attribute that determines the direction of data flow within the diagram. Default: "LR", which is left-to-right flow. Ignored if graph is an instance of pygraphviz.AGraph.
Returns: diagram – Diagrammatic representation of graph. Callers can modify diagram as needed before using its layout and drawing methods to produce a final image.
Return type: pygraphviz.AGraph
See also
graphcat.diagram.leaves(graph, node)
Filter function that hides all leaf nodes when displaying a graph using draw().
graphcat.diagram.none(graph, node)
Do-nothing filter function used to display an entire graph using draw().
graphcat.diagram.performance(agraph, monitor)
Add performance monitor information to a graph diagram.
Parameters:
- agraph (pygraphviz.AGraph, required) – Diagram originally created using draw().
- monitor (graphcat.common.PerformanceMonitor, required) – Performance monitor object containing performance results to be added to agraph.
Returns: diagram – Input diagram supplemented with performance results from monitor.
Return type: pygraphviz.AGraph
graphcat.dynamic module
Implements computational graphs using dynamic dependency analysis.
class graphcat.dynamic.DynamicGraph
Bases: graphcat.graph.Graph
Manages a dynamic computational graph.
The graph is a collection of named tasks, connected by links that define dependencies between tasks. Updating a task implicitly updates all of its transitive dependencies. When an unfinished task is updated, it executes a user-supplied function and stores the function return value as the task output. Outputs of upstream tasks are automatically passed as inputs to downstream tasks.
add_links(source, targets)
Add links between source and targets.
Note that calling add_links() multiple times will create multiple, parallel links between tasks.
See also set_links(), which()
Parameters:
- source (hashable object, required) – Name of the task that will act as a data source.
- targets (tuple, or list of tuples, required) – Each (task, input) tuple specifies the target of a link.
Raises: ValueError – If source or target doesn’t exist.
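The difference between add_links() and set_links() is easiest to see on a flat list of (source, target, input) tuples, the same shape that links() returns. The LinkTable class below is a hypothetical sketch, not Graphcat’s code, and it skips the ValueError checks for missing tasks:

```python
class LinkTable:
    """Hypothetical store of (source, target, input) link tuples."""

    def __init__(self):
        self.links = []

    def add_links(self, source, targets):
        # Accept a single (task, input) tuple or a list of them.
        if isinstance(targets, tuple):
            targets = [targets]
        for target, input_name in targets:
            self.links.append((source, target, input_name))

    def set_links(self, source, targets):
        # Drop every existing link from source, then add the new ones.
        self.links = [link for link in self.links if link[0] != source]
        self.add_links(source, targets)


table = LinkTable()
table.add_links("A", ("B", "x"))
table.add_links("A", ("B", "x"))    # a second, parallel link
print(table.links)  # [('A', 'B', 'x'), ('A', 'B', 'x')]
table.set_links("A", [("C", "y")])  # replaces all links from "A"
print(table.links)  # [('A', 'C', 'y')]
```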
add_task(name, fn=None)
Add a task to the graph.
This function will raise an exception if the task already exists.
See also set_task() - idempotent alternative to add_task().
Parameters:
- name (hashable object, required) – Unique name that will identify the task.
- fn (callable, optional) – The fn object will be called whenever the task is executed. It must take two keyword arguments as parameters, name and inputs. name will contain the unique task name. inputs will be a dict mapping named inputs to a sequence of outputs returned from upstream tasks. If None (the default), graphcat.common.null() will be used.
Raises: ValueError – If name already exists.
clear_links(source, target)
Remove links from the graph.
This method will remove all links from source to target.
Parameters:
- source (hashable object, required) – Source task name.
- target (hashable object, required) – Target task name.
Raises: ValueError – If source or target doesn’t exist.
clear_tasks(names=None)
Remove tasks from the graph, along with all related links.
Note that downstream tasks will become unfinished.
Parameters: names (None, hashable object, or list|set of hashable objects, optional) – Names identifying the tasks to remove. If None (the default), all tasks are removed, emptying the graph.
links(names=None)
Return every link originating with the given names.
Parameters: names (None, hashable object, or list|set of hashable objects, optional) – Names identifying the source tasks from which to retrieve links.
Returns: links – (source, target, input) tuple for every link in the graph.
Return type: list
mark_unfinished(names=None)
Set the unfinished state for tasks and all downstream dependents.
Normally, the unfinished state is set automatically when changes are made to the graph. This method is provided for callers who need to set the unfinished state in response to some outside event that the graph isn’t aware of; this should only happen in extremely rare situations.
Parameters: names (None, hashable object, or list|set of hashable objects, optional) – Task names to be marked as unfinished. If None (the default), the entire graph is marked unfinished.
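Marking a task unfinished must also reach every downstream dependent. Assuming links are available as the (source, target, input) tuples returned by links(), a breadth-first traversal is one way to compute the affected set. downstream_closure is a hypothetical helper, not part of Graphcat:

```python
from collections import deque


def downstream_closure(links, names):
    """Return `names` plus every task reachable downstream through the links."""
    targets_of = {}
    for source, target, _input in links:
        targets_of.setdefault(source, set()).add(target)

    result = set(names)
    queue = deque(names)
    while queue:
        task = queue.popleft()
        for target in targets_of.get(task, ()):
            if target not in result:  # visiting each task once also guards against cycles
                result.add(target)
                queue.append(target)
    return result


links = [("A", "B", None), ("B", "C", None), ("D", "C", None)]
print(sorted(downstream_closure(links, ["A"])))  # ['A', 'B', 'C']
print(sorted(downstream_closure(links, ["D"])))  # ['C', 'D']
```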
on_changed
Signal emitted whenever a part of the graph becomes unfinished.
Functions invoked by this signal must have the signature fn(graph), where graph is this object.
Returns: signal
Return type: blinker.base.Signal
on_cycle
Signal emitted if a cycle is detected during updating.
Functions invoked by this signal must have the signature fn(graph, name), where graph is this object.
Returns: signal
Return type: blinker.base.Signal
on_execute
Signal emitted before a task is executed.
Functions invoked by this signal must have the signature fn(graph, name, input), where graph is this object, name is the name of the task to be executed, and input is a dict containing the task inputs.
Returns: signal
Return type: blinker.base.Signal
on_failed
Signal emitted when a task fails during execution.
Functions invoked by this signal must have the signature fn(graph, name, exception), where graph is this object, name is the name of the task that failed, and exception is the exception raised by the task.
Returns: signal
Return type: blinker.base.Signal
on_finished
Signal emitted when a task executes successfully.
Functions invoked by this signal must have the signature fn(graph, name, output), where graph is this object, name is the name of the task that executed successfully, and output is the return value from the task function.
Returns: signal
Return type: blinker.base.Signal
on_task_renamed
Signal emitted when a task is renamed.
Functions invoked by this signal must have the signature fn(graph, oldname, newname), where graph is this object, oldname is the original name of the task, and newname is its current name.
Returns: signal
Return type: blinker.base.Signal
on_update
Signal emitted when a task is updated.
Functions invoked by this signal must have the signature fn(graph, name), where graph is this object and name is the name of the task to be updated.
Returns: signal
Return type: blinker.base.Signal
output(name)
Retrieve the output from a task.
This implicitly updates the graph, so the returned value is guaranteed to be up-to-date.
Parameters: name (hashable object, required) – Unique task name.
Returns: output – The value returned when the task function was last executed, or None.
Return type: any object
Raises:
- ValueError – If name doesn’t exist.
- Exception – Any exception raised by a task function will be re-raised by output().
rename_task(oldname, newname)
Change an existing task’s name.
This modifies an existing task’s name and updates any related links as necessary. In addition, the task and any downstream dependents will become unfinished.
Parameters:
- oldname (hashable object, required) – Existing original task name.
- newname (hashable object, required) – Unique new task name.
Raises: ValueError – If the task with oldname doesn’t exist, or a task with newname already exists.
set_expression(name, expression, symbols=None)
Create a task that evaluates a Python expression, returning its value.
The task will automatically track implicit dependencies that arise from executing the expression.
Parameters:
- name (hashable object, required) – Unique name for the new expression task.
- expression (string, required) – Python expression that will be evaluated whenever the task is executed.
- symbols (callable, optional) – Function that returns a Python dict containing symbols that will be available to the expression when it’s executed. If None (the default), the graphcat.common.builtins() function will be used, which gives the expression access to graph, name, inputs, and extent objects that match the arguments to a normal task function.
set_links(source, targets)
Set links between source and targets.
Note
This function overrides all links from source.
Parameters:
- source (hashable object, required) – Name of the task that will act as a data source.
- targets (tuple, or list of tuples, required) – Each (task, input) tuple specifies the target of a link.
Raises: ValueError – If source or target doesn’t exist.
set_parameter(target, input, source, value)
Create and link a ‘parameter’ task in one step.
Because they’re so ubiquitous, this method simplifies the creation of “parameter” tasks - tasks that return a value for use as a parameter in some other task. It consolidates creating the parameter task and linking it with an existing computational task into one step.
Parameters:
- target (hashable object, required) – Name of the task that will use the parameter.
- input (hashable object, required) – Named input that will receive the parameter.
- source (hashable object, required) – Name of the task that will store the parameter.
- value (any Python object, required) – Parameter value.
Raises: ValueError – If target doesn’t exist.
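Going by the description above, set_parameter() bundles two steps: install a task that returns the value, then link it to the target’s named input. The sketch below shows those steps on a hypothetical toy graph (TinyGraph is invented, and whether Graphcat implements set_parameter() exactly this way is an assumption). The input argument is named to mirror the documented signature, even though it shadows the built-in input().

```python
class TinyGraph:
    """Hypothetical minimal graph: task functions plus (source, target, input) links."""

    def __init__(self):
        self.tasks = {}
        self.links = []

    def set_task(self, name, fn):
        self.tasks[name] = fn

    def set_links(self, source, targets):
        self.links = [link for link in self.links if link[0] != source]
        self.links.extend((source, target, name) for target, name in targets)

    def set_parameter(self, target, input, source, value):
        if target not in self.tasks:
            raise ValueError(f"Unknown task: {target!r}")
        # Step 1: a task that always returns the parameter value.
        self.set_task(source, lambda graph, name, inputs, extent=None: value)
        # Step 2: connect it to the named input of the target task.
        self.set_links(source, [(target, input)])


g = TinyGraph()
g.set_task("render", lambda graph, name, inputs, extent=None: None)
g.set_parameter("render", "width", "render/width", 640)
print(g.links)  # [('render/width', 'render', 'width')]
print(g.tasks["render/width"](g, "render/width", {}))  # 640
```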
set_task(name, fn)
Add a task to the graph if it doesn’t exist, and set its task function.
Note that this will mark downstream tasks as unfinished.
Parameters:
- name (hashable object, required) – Unique name that will identify the task.
- fn (callable, required) – The fn object will be called whenever the task is executed. It must take two keyword arguments as parameters, name and inputs. name will contain the unique task name. inputs will be a dict mapping named inputs to sequences of outputs returned from upstream tasks.
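The contrast between add_task() and set_task() comes down to how an existing name is handled. Here is a hypothetical sketch (TaskTable is invented; it stores None where Graphcat would use graphcat.common.null(), and it does not mark downstream tasks unfinished):

```python
class TaskTable:
    """Hypothetical sketch contrasting add_task() and set_task()."""

    def __init__(self):
        self.fns = {}

    def add_task(self, name, fn=None):
        # Not idempotent: refuses to overwrite an existing task.
        if name in self.fns:
            raise ValueError(f"Task {name!r} already exists.")
        self.fns[name] = fn  # Graphcat would substitute null() for None

    def set_task(self, name, fn):
        # Idempotent: creates the task if needed, otherwise replaces its function.
        self.fns[name] = fn


table = TaskTable()
table.add_task("A")
table.set_task("A", len)  # replaces the function of the existing task
table.set_task("B", len)  # creates a new task
try:
    table.add_task("A")
except ValueError as error:
    print(error)          # Task 'A' already exists.
```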
state(name)
Return the current state of a task.
Parameters: name (hashable object, required) – Unique name that identifies the task.
Returns: state – Enumeration describing the current task state.
Return type: graphcat.common.TaskState
Raises: ValueError – If name doesn’t exist.
tasks()
Return the name of every task in the graph.
Returns: tasks – Names for every task in the graph.
Return type: set
update(name)
Update a task and all of its transitive dependencies.
Parameters: name (hashable object, required) – Name identifying the task to be updated.
Raises:
- ValueError – If the task with name doesn’t exist.
- Exception – Any exception raised by a task function will be re-raised by update().
class graphcat.dynamic.NamedInputs(graph, name)
Bases: object
Access named inputs for a graph task.
Parameters:
- graph (DynamicGraph, required) – Graph containing a task.
- name (hashable object, required) – Existing task unique name.
get(name, default=None)
Return a single input value.
Use this method to return a value when you expect to have either zero or one input that matches name.
Parameters:
- name (hashable object, required) – Name of the input value to return.
- default (any Python value, optional) – If an input matching name doesn’t exist, this value will be returned instead. Defaults to None.
Returns: value – The value of input name, or default.
Return type: any Python value
Raises: KeyError – If more than one input matches name.
getall(name)
Return multiple input values.
Use this method to return every input value that matches name.
Parameters: name (hashable object, required) – Name of the input value to return.
Returns: values – Values from every input that matches name. Returns an empty list if there are none.
Return type: list of Python values
getone(name)
Return a single input value.
Use this method to return a value when you expect to have exactly one input that matches name.
Parameters: name (hashable object, required) – Name of the input value to return.
Returns: value – The value of input name.
Return type: any Python value
Raises: KeyError – If zero or more than one input matches name.
items()
Return names and values for every input attached to this task.
Note
For each (name, value) pair returned by this method, the value is a callable that returns the actual value from the upstream task.
Returns: values – The name and value of every input attached to this task.
Return type: sequence of (hashable object, callable) tuples
keys()
Return names for every input attached to this task.
Returns: names – The name of every input attached to this task. Note that the same name may appear more than once in the sequence.
Return type: sequence of hashable objects
values()
Return values for every input attached to this task.
Note
Each value returned by this method is a callable that returns the actual value from the upstream task.
Returns: values – The value of every input attached to this task, in the same order as keys().
Return type: sequence of callables
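The get(), getone(), and getall() rules can be modelled with a list of (name, callable) pairs in which names may repeat. NamedInputsSketch below is hypothetical, not Graphcat’s class; it counts matching inputs before calling any of them, so that in a dynamic graph an error would be raised without executing upstream tasks unnecessarily.

```python
class NamedInputsSketch:
    """Hypothetical model of NamedInputs: (name, callable) pairs; names may repeat."""

    def __init__(self, pairs):
        self._pairs = list(pairs)

    def _matching(self, name):
        # Collect the callables without calling them (no upstream work yet).
        return [fn for key, fn in self._pairs if key == name]

    def keys(self):
        return [key for key, _ in self._pairs]

    def values(self):
        return [fn for _, fn in self._pairs]

    def items(self):
        return list(self._pairs)

    def getall(self, name):
        return [fn() for fn in self._matching(name)]

    def get(self, name, default=None):
        matches = self._matching(name)
        if len(matches) > 1:
            raise KeyError(f"More than one input named {name!r}")
        return matches[0]() if matches else default

    def getone(self, name):
        matches = self._matching(name)
        if len(matches) != 1:
            raise KeyError(f"Expected one input named {name!r}, found {len(matches)}")
        return matches[0]()


inputs = NamedInputsSketch([("lhs", lambda: 2), ("rhs", lambda: 3), ("rhs", lambda: 4)])
print(inputs.getone("lhs"))      # 2
print(inputs.getall("rhs"))      # [3, 4]
print(inputs.get("scale", 1.0))  # 1.0
print(inputs.keys())             # ['lhs', 'rhs', 'rhs']
```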
graphcat.graph module
Abstract interfaces for computational graphs.
class graphcat.graph.Graph
Bases: abc.ABC
Abstract base class for computational graphs.
The graph is a collection of named tasks, connected by links that define dependencies between tasks. Updating a task implicitly updates all of its transitive dependencies. When an unfinished task is updated, it executes a user-supplied function and stores the function return value as the task output. Outputs of upstream tasks are automatically passed as inputs to downstream tasks.
add_links(source, targets)
Add links between source and targets.
Note that calling add_links() multiple times will create multiple, parallel links between tasks.
See also set_links(), which()
Parameters:
- source (hashable object, required) – Name of the task that will act as a data source.
- targets (tuple, or list of tuples, required) – Each (task, input) tuple specifies the target of a link.
Raises: ValueError – If source or target doesn’t exist.
add_task(name, fn=None)
Add a task to the graph.
This function will raise an exception if the task already exists.
See also set_task() - idempotent alternative to add_task().
Parameters:
- name (hashable object, required) – Unique name that will identify the task.
- fn (callable, optional) – The fn object will be called whenever the task is executed. It must take two keyword arguments as parameters, name and inputs. name will contain the unique task name. inputs will be a dict mapping named inputs to a sequence of outputs returned from upstream tasks. If None (the default), graphcat.common.null() will be used.
Raises: ValueError – If name already exists.
clear_links(source, target)
Remove links from the graph.
This method will remove all links from source to target.
Parameters:
- source (hashable object, required) – Source task name.
- target (hashable object, required) – Target task name.
Raises: ValueError – If source or target doesn’t exist.
clear_tasks(names=None)
Remove tasks from the graph, along with all related links.
Note that downstream tasks will become unfinished.
Parameters: names (None, hashable object, or list|set of hashable objects, optional) – Names identifying the tasks to remove. If None (the default), all tasks are removed, emptying the graph.
links(names=None)
Return every link originating with the given names.
Parameters: names (None, hashable object, or list|set of hashable objects, optional) – Names identifying the source tasks from which to retrieve links.
Returns: links – (source, target, input) tuple for every link in the graph.
Return type: list
mark_unfinished(names=None)
Set the unfinished state for tasks and all downstream dependents.
Normally, the unfinished state is set automatically when changes are made to the graph. This method is provided for callers who need to set the unfinished state in response to some outside event that the graph isn’t aware of; this should only happen in extremely rare situations.
Parameters: names (None, hashable object, or list|set of hashable objects, optional) – Task names to be marked as unfinished. If None (the default), the entire graph is marked unfinished.
on_changed
Signal emitted whenever a part of the graph becomes unfinished.
Functions invoked by this signal must have the signature fn(graph), where graph is this object.
Returns: signal
Return type: blinker.base.Signal
on_cycle
Signal emitted if a cycle is detected during updating.
Functions invoked by this signal must have the signature fn(graph, name), where graph is this object.
Returns: signal
Return type: blinker.base.Signal
on_execute
Signal emitted before a task is executed.
Functions invoked by this signal must have the signature fn(graph, name, input), where graph is this object, name is the name of the task to be executed, and input is a dict containing the task inputs.
Returns: signal
Return type: blinker.base.Signal
on_failed
Signal emitted when a task fails during execution.
Functions invoked by this signal must have the signature fn(graph, name, exception), where graph is this object, name is the name of the task that failed, and exception is the exception raised by the task.
Returns: signal
Return type: blinker.base.Signal
on_finished
Signal emitted when a task executes successfully.
Functions invoked by this signal must have the signature fn(graph, name, output), where graph is this object, name is the name of the task that executed successfully, and output is the return value from the task function.
Returns: signal
Return type: blinker.base.Signal
on_task_renamed
Signal emitted when a task is renamed.
Functions invoked by this signal must have the signature fn(graph, oldname, newname), where graph is this object, oldname is the original name of the task, and newname is its current name.
Returns: signal
Return type: blinker.base.Signal
on_update
Signal emitted when a task is updated.
Functions invoked by this signal must have the signature fn(graph, name), where graph is this object and name is the name of the task to be updated.
Returns: signal
Return type: blinker.base.Signal
output(name)
Retrieve the output from a task.
This implicitly updates the graph, so the returned value is guaranteed to be up-to-date.
Parameters: name (hashable object, required) – Unique task name.
Returns: output – The value returned when the task function was last executed, or None.
Return type: any object
Raises:
- ValueError – If name doesn’t exist.
- Exception – Any exception raised by a task function will be re-raised by output().
rename_task(oldname, newname)
Change an existing task’s name.
This modifies an existing task’s name and updates any related links as necessary. In addition, the task and any downstream dependents will become unfinished.
Parameters:
- oldname (hashable object, required) – Existing original task name.
- newname (hashable object, required) – Unique new task name.
Raises: ValueError – If the task with oldname doesn’t exist, or a task with newname already exists.
-
set_expression(name, expression, symbols=None)
Create a task that evaluates a Python expression, returning its value.
The task will automatically track implicit dependencies that arise from executing the expression.
Parameters: - name (hashable object, required) – Unique name for the new expression task.
- expression (string, required) – Python expression that will be evaluated whenever the task is executed.
- symbols (callable, optional) – Function that returns a Python dict containing symbols that will be available to the expression when it’s executed. If None (the default), the graphcat.common.builtins() function will be used, which gives the expression access to graph, name, inputs, and extent objects that match the arguments to a normal task function.
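The symbols callable returns a fresh dict of names every time the expression runs. The sketch below illustrates that design with compile() and eval() from the standard library. make_expression_task() and default_symbols() are hypothetical names. The sketch assumes the symbols callable receives (graph, name, inputs, extent), and it does not attempt Graphcat's automatic dependency tracking. Like any eval()-based design, it should only evaluate expressions from trusted sources.

```python
def default_symbols(graph, name, inputs, extent=None):
    """Hypothetical stand-in for graphcat.common.builtins(): expose the task-function arguments."""
    return {"graph": graph, "name": name, "inputs": inputs, "extent": extent}


def make_expression_task(expression, symbols=None):
    """Return a task function that evaluates `expression` each time it runs (sketch)."""
    code = compile(expression, "<expression>", "eval")  # Parse once, evaluate many times.
    symbols = symbols or default_symbols

    def task(graph, name, inputs, extent=None):
        # Build a fresh namespace on every execution so the expression sees current values.
        namespace = dict(symbols(graph, name, inputs, extent))
        return eval(code, namespace)

    return task


double = make_expression_task("inputs['x'] * 2")
print(double(graph=None, name="double", inputs={"x": 21}))  # 42
```

Here a plain dict stands in for the task inputs. Passing a custom symbols callable replaces the default names entirely, so an expression can be limited to a small, domain-specific vocabulary.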
-
set_links(source, targets)
Set links between source and targets.
Note
This function overrides all links from source.
Parameters: - source (hashable object, required) – Name of the task that will act as a data source.
- targets (tuple, or list of tuples, required) – Each (task, input) tuple specifies the target of a link.
Raises: ValueError – If source or any target doesn’t exist.
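set_links() replaces every link from source, while add_links() appends to them and can create parallel links. The contrast is easiest to see on a plain list of (source, target, input) tuples. The two functions below are hypothetical list-based models of that documented behavior. They omit the existence checks that raise ValueError.

```python
def _as_target_list(targets):
    # A single (task, input) tuple is accepted as shorthand for a one-item list.
    return [targets] if isinstance(targets, tuple) else list(targets)


def add_links(links, source, targets):
    """Append one link per target; repeated calls create parallel links (sketch)."""
    for target, input_name in _as_target_list(targets):
        links.append((source, target, input_name))


def set_links(links, source, targets):
    """Drop every existing link from `source`, then add the new ones (sketch)."""
    links[:] = [link for link in links if link[0] != source]
    add_links(links, source, targets)


links = []
add_links(links, "a", ("b", "x"))
add_links(links, "a", ("b", "x"))  # Parallel link: two identical entries now exist.
set_links(links, "a", [("c", "y"), ("d", "z")])
print(links)  # [('a', 'c', 'y'), ('a', 'd', 'z')]
```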
-
set_parameter(target, input, source, value)
Create and link a ‘parameter’ task in one step.
Because they’re so ubiquitous, this method simplifies the creation of “parameter” tasks (tasks that return a value for use as a parameter in some other task). It consolidates creating the parameter task and linking it with an existing computational task into one step.
Parameters: - target (hashable object, required) – Name of the task that will use the parameter.
- input (hashable object, required) – Named input that will receive the parameter.
- source (hashable object, required) – Name of the task that will store the parameter.
- value (any Python object, required) – Parameter value.
Raises: ValueError – If target doesn’t exist.
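The two steps that set_parameter() consolidates can be sketched on a dict-and-list graph model. This sketch assumes the method amounts to setting a task that returns a constant, then overriding the links from source with a single link to (target, input), as set_links() would. constant() and set_parameter() below are hypothetical local helpers, not Graphcat code.

```python
def constant(value):
    """Return a task function that always produces `value` (hypothetical helper)."""
    def task(graph, name, inputs):
        return value
    return task


def set_parameter(tasks, links, target, input_name, source, value):
    """Sketch: create or replace a parameter task and make it the only link out of `source`."""
    if target not in tasks:
        raise ValueError(f"Task {target!r} doesn't exist.")
    tasks[source] = constant(value)                           # Step 1: the parameter task.
    links[:] = [link for link in links if link[0] != source]  # Step 2: override links from source...
    links.append((source, target, input_name))                # ...with one link to the target input.


tasks = {"blur": None}
links = []
set_parameter(tasks, links, "blur", "radius", "blur/radius", 5)
set_parameter(tasks, links, "blur", "radius", "blur/radius", 7)  # Calling again just changes the value.
print(tasks["blur/radius"](None, "blur/radius", {}), links)  # 7 [('blur/radius', 'blur', 'radius')]
```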
-
set_task(name, fn)
Add a task to the graph if it doesn’t exist, and set its task function.
Note that this will mark downstream tasks as unfinished.
Parameters: - name (hashable object, required) – Unique name that will identify the task.
- fn (callable, required) – The fn object will be called whenever the task is executed. It must accept the keyword arguments graph, name, and inputs: graph is the graph containing the task, name contains the unique task name, and inputs provides access to the outputs returned from upstream tasks.
-
state(name)
Return the current state of a task.
Parameters: name (hashable object, required) – Unique name that identifies the task.
Returns: state – Enumeration describing the current task state.
Return type: graphcat.common.TaskState
Raises: ValueError – If name doesn’t exist.
-
tasks()
Return the name of every task in the graph.
Returns: tasks – Names for every task in the graph.
Return type: set
-
update(name)
Update a task and all of its transitive dependencies.
Parameters: name (hashable object, required) – Name identifying the task to be updated.
Raises: - ValueError – If the task with name doesn’t exist.
- Exception – Any exception raised by a task function will be re-raised by update().
-
graphcat.notebook module
Integration with Jupyter notebooks, https://jupyter.org
-
graphcat.notebook.display(graph, hide=None, rankdir='LR')
Display a computational graph inline in a Jupyter notebook.
This is extremely useful for understanding and debugging graphs. The structure and current state of the graph are displayed as an inline SVG graphic. See graphcat.diagram.draw() for details.
Parameters: - graph (graphcat.Graph or pygraphviz.AGraph, required) – The graph to be visualized.
- hide (Python callable, optional) – Python callable that can be used to hide tasks in the displayed figure. If None (the default), all tasks will be displayed. Ignored if graph is a pygraphviz.AGraph.
- rankdir (str, optional) – Graphviz rankdir attribute that determines the direction of data flow within the diagram. Default: "LR", which is left-to-right flow. Ignored if graph is a pygraphviz.AGraph.
graphcat.optional module
Helpers for implementing optional functionality.
-
graphcat.optional.module(name)
Quietly load a module by name, ignoring errors.
Note that dotted names such as pkg.mod will return the top-level package, just like the import statement.
Parameters: name (str) – Name of the module to be loaded.
Returns: module
Return type: loaded module if successful, or None.
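The standard library makes the "quiet import" pattern easy to reproduce. It also explains the note about dotted names: __import__("pkg.mod"), like the statement import pkg.mod, hands back the top-level package. The function below is a hypothetical sketch that mirrors the documented behavior, not Graphcat's source. Catching Exception rather than only ImportError is a deliberate choice in this sketch: it also hides errors raised while a module initializes.

```python
def module(name):
    """Quietly import `name`, returning the top-level package, or None on any error (sketch)."""
    try:
        # Like `import pkg.mod`, __import__("pkg.mod") returns the top-level package `pkg`.
        return __import__(name)
    except Exception:
        return None


json_module = module("json")
os_package = module("os.path")
missing = module("surely_not_an_installed_module")
print(json_module.__name__, os_package.__name__, missing)  # json os None
```

A typical use is to call it once at import time, for example pygraphviz = module("pygraphviz"), and check for None only when the optional feature is actually requested.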
graphcat.require module
Functionality for testing preconditions and assertions.
-
graphcat.require.loaded_module(modules)
Function decorator that tests whether module(s) have already been loaded.
Parameters: modules (str or sequence of str, required) – Names of the modules that must already be loaded for the wrapped function to execute.
Raises: RuntimeError – If any module in modules isn’t already loaded.
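A decorator with the documented contract can be written with functools.wraps and sys.modules. The check runs when the wrapped function is called, because the modules "must already be loaded for the wrapped function to execute". The implementation below is a hypothetical sketch, and the decorated functions are illustrative only.

```python
import functools
import json
import sys


def loaded_module(modules):
    """Decorator factory: refuse to run the wrapped function unless `modules` are imported (sketch)."""
    names = [modules] if isinstance(modules, str) else list(modules)

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            # Checked at call time, so a module imported after decoration still satisfies the requirement.
            missing = [name for name in names if name not in sys.modules]
            if missing:
                raise RuntimeError(f"{fn.__name__}() requires these modules to be loaded first: {missing}")
            return fn(*args, **kwargs)
        return wrapper
    return decorator


@loaded_module("json")
def dump(value):
    return json.dumps(value)


@loaded_module(["sys", "surely_not_loaded_module"])
def never_runs():
    return "unreachable"


print(dump([1, 2]))  # [1, 2]
```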
graphcat.static module
Implements computational graphs using static dependency analysis.
-
class graphcat.static.NamedInputs(graph, name)
Bases: object
Access named inputs for a graph task.
Parameters: - graph (StaticGraph, required) – Graph containing a task.
- name (hashable object, required) – Existing task unique name.
-
get(name, default=None)
Return a single input value.
Use this method to return a value when you expect to have either zero or one input that matches name.
Parameters: - name (hashable object, required) – Name of the input value to return.
- default (any Python value, optional) – If an input matching name doesn’t exist, this value will be returned instead. Defaults to None.
Returns: value – The value of input name, or default.
Return type: any Python value
Raises: KeyError – If more than one input matches name.
-
getall(name)
Return multiple input values.
Use this method to return every input value that matches name.
Parameters: name (hashable object, required) – Name of the input value to return.
Returns: values – Values from every input that matches name. Returns an empty list if there are none.
Return type: list of Python values
-
getone(name)
Return a single input value.
Use this method to return a value when you expect to have exactly one input that matches name.
Parameters: name (hashable object, required) – Name of the input value to return.
Returns: value – The value of input name.
Return type: any Python value
Raises: KeyError – If more or fewer than one input matches name.
-
items()
Return names and values for every input attached to this task.
Note
For each (name, value) pair returned by this method, the value is a callable that returns the actual value from the upstream task.
Returns: values – The name and value of every input attached to this task.
Return type: sequence of (hashable object, callable) tuples
-
keys()
Return names for every input attached to this task.
Returns: names – The name of every input attached to this task. Note that the same name may appear more than once in the sequence.
Return type: sequence of hashable objects
-
values()
Return values for every input attached to this task.
Note
Each value returned by this method is a callable that returns the actual value from the upstream task.
Returns: values – The value of every input attached to this task, in the same order as keys().
Return type: sequence of callables
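The rules for get(), getall(), and getone() differ only in how many matching inputs they tolerate. The notes above also say that values are callables that fetch the upstream value. Both points fit in a toy class. SketchNamedInputs is a hypothetical model of that access pattern, not Graphcat's NamedInputs. It resolves a callable only after the count check passes, so no upstream work is wasted on a request that is about to fail.

```python
class SketchNamedInputs:
    """Toy model of the NamedInputs access pattern (hypothetical, not Graphcat's class).

    Inputs are stored as (name, callable) pairs; each callable returns the upstream
    value, so nothing is computed until a value is actually requested.
    """

    def __init__(self, pairs):
        self._pairs = list(pairs)

    def keys(self):
        return [name for name, _ in self._pairs]

    def values(self):
        return [value for _, value in self._pairs]

    def items(self):
        return list(self._pairs)

    def _matching(self, name):
        return [value for key, value in self._pairs if key == name]

    def getall(self, name):
        return [value() for value in self._matching(name)]

    def get(self, name, default=None):
        matches = self._matching(name)
        if len(matches) > 1:
            raise KeyError(f"More than one input named {name!r}.")
        return matches[0]() if matches else default

    def getone(self, name):
        matches = self._matching(name)
        if len(matches) != 1:
            raise KeyError(f"Expected exactly one input named {name!r}, found {len(matches)}.")
        return matches[0]()


inputs = SketchNamedInputs([("image", lambda: "pixels"), ("layer", lambda: 1), ("layer", lambda: 2)])
print(inputs.getone("image"), inputs.get("radius", 3), inputs.getall("layer"))  # pixels 3 [1, 2]
```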
-
class graphcat.static.StaticGraph
Bases: graphcat.graph.Graph
Manages a static computational graph.
The graph is a collection of named tasks, connected by links that define dependencies between tasks. Updating a task implicitly updates all of its transitive dependencies. When an unfinished task is updated, it executes a user-supplied function and stores the function return value as the task output. Outputs of upstream tasks are automatically passed as inputs to downstream tasks.
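The update rules described in this paragraph can be modeled in a few dozen lines: cached outputs, an unfinished state that propagates downstream, and upstream tasks updated first. The following is a hypothetical, simplified sketch, not Graphcat's implementation. TinyStaticGraph, add_link(), and the executions log are invented for illustration. Task functions receive only a dict of inputs, whereas real Graphcat task functions receive more arguments. There is no cycle detection.

```python
class TinyStaticGraph:
    """Hypothetical, minimal model of a static computational graph (not Graphcat's implementation)."""

    def __init__(self):
        self._functions = {}   # task name -> task function
        self._upstream = {}    # task name -> list of (input name, source task) pairs
        self._outputs = {}     # task name -> cached output
        self._finished = set()
        self.executions = []   # Log of executed tasks, to show which work was skipped.

    def set_task(self, name, fn):
        self._functions[name] = fn
        self._upstream.setdefault(name, [])
        self.mark_unfinished(name)

    def add_link(self, source, target, input_name):
        if source not in self._functions or target not in self._functions:
            raise ValueError("Both source and target must exist.")
        self._upstream[target].append((input_name, source))
        self.mark_unfinished(target)

    def mark_unfinished(self, name):
        # Invalidate `name` and everything downstream of it.
        pending, seen = [name], set()
        while pending:
            task = pending.pop()
            if task in seen:
                continue
            seen.add(task)
            self._finished.discard(task)
            pending.extend(t for t, pairs in self._upstream.items() if any(s == task for _, s in pairs))

    def update(self, name):
        if name not in self._functions:
            raise ValueError(f"Task {name!r} doesn't exist.")
        if name in self._finished:
            return  # Already up to date: skip execution entirely.
        inputs = {}
        for input_name, source in self._upstream[name]:
            self.update(source)  # Upstream tasks first; exceptions propagate to the caller.
            inputs.setdefault(input_name, []).append(self._outputs[source])
        self._outputs[name] = self._functions[name](inputs)
        self.executions.append(name)
        self._finished.add(name)

    def output(self, name):
        self.update(name)
        return self._outputs[name]


graph = TinyStaticGraph()
graph.set_task("a", lambda inputs: 2)
graph.set_task("b", lambda inputs: 3)
graph.set_task("sum", lambda inputs: sum(inputs["x"]))
graph.add_link("a", "sum", "x")
graph.add_link("b", "sum", "x")
print(graph.output("sum"), graph.executions)  # 5 ['a', 'b', 'sum']
graph.set_task("a", lambda inputs: 10)        # Invalidates "a" and "sum", but not "b".
print(graph.output("sum"), graph.executions)  # 13 ['a', 'b', 'sum', 'a', 'sum']
```

The second call re-executes only "a" and "sum". "b" stayed finished, so its cached output was reused.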
-
add_links(source, targets)
Add links between source and targets.
Note that calling add_links() multiple times will create multiple, parallel links between tasks.
See also: set_links(), which()
Parameters: - source (hashable object, required) – Name of the task that will act as a data source.
- targets (tuple, or list of tuples, required) – Each (task, input) tuple specifies the target of a link.
Raises: ValueError – If source or any target doesn’t exist.
-
add_task(name, fn=None)
Add a task to the graph.
This function will raise an exception if the task already exists.
See also: set_task(), an idempotent alternative to add_task().
Parameters: - name (hashable object, required) – Unique name that will identify the task.
- fn (callable, optional) – The fn object will be called whenever the task is executed. It must accept the keyword arguments graph, name, and inputs: graph is the graph containing the task, name contains the unique task name, and inputs provides access to the outputs returned from upstream tasks. If None (the default), graphcat.common.null() will be used.
Raises: ValueError – If name already exists.
-
clear_links(source, target)
Remove links from the graph.
This method will remove all links from source to target.
Parameters: - source (hashable object, required) – Source task name.
- target (hashable object, required) – Target task name.
Raises: ValueError – If source or target doesn’t exist.
-
clear_tasks(names=None)
Remove tasks from the graph, along with all related links.
Note that downstream tasks will become unfinished.
Parameters: names (None, hashable object, or list|set of hashable objects, optional) – Names identifying the tasks to remove. If None (the default), all tasks are removed, emptying the graph.
-
links(names=None)
Return every link originating with the given names.
Parameters: names (None, hashable object, or list|set of hashable objects, optional) – Names identifying the source tasks from which to retrieve links.
Returns: links – One (source, target, input) tuple for every matching link.
Return type: list
-
mark_unfinished(names=None)
Set the unfinished state for tasks and all downstream dependents.
Normally, the unfinished state is set automatically when changes are made to the graph. This method is provided for callers who need to set the unfinished state in response to some outside event that the graph isn’t aware of; this should only happen in extremely rare situations.
Parameters: names (None, hashable object, or list|set of hashable objects, optional) – Task names to be marked as unfinished. If None (the default), the entire graph is marked unfinished.
-
on_changed
Signal emitted whenever a part of the graph becomes unfinished.
Functions invoked by this signal must have the signature fn(graph), where graph is this object.
Returns: signal
Return type: blinker.base.Signal
-
on_cycle
Signal emitted if a cycle is detected during updating.
Functions invoked by this signal must have the signature fn(graph, name), where graph is this object.
Returns: signal
Return type: blinker.base.Signal
-
on_execute
Signal emitted before a task is executed.
Functions invoked by this signal must have the signature fn(graph, name, input), where graph is this object, name is the name of the task to be executed, and input is a dict containing the task inputs.
Returns: signal
Return type: blinker.base.Signal
-
on_failed
Signal emitted when a task fails during execution.
Functions invoked by this signal must have the signature fn(graph, name, exception), where graph is this object, name is the name of the task that failed, and exception is the exception raised by the task.
Returns: signal
Return type: blinker.base.Signal
-
on_finished
Signal emitted when a task executes successfully.
Functions invoked by this signal must have the signature fn(graph, name, output), where graph is this object, name is the name of the task that executed successfully, and output is the return value from the task function.
Returns: signal
Return type: blinker.base.Signal
-
on_task_renamed
Signal emitted when a task is renamed.
Functions invoked by this signal must have the signature fn(graph, oldname, newname), where graph is this object, oldname is the original name of the task, and newname is its current name.
Returns: signal
Return type: blinker.base.Signal
-
on_update
Signal emitted when a task is updated.
Functions invoked by this signal must have the signature fn(graph, name), where graph is this object and name is the name of the task to be updated.
Returns: signal
Return type: blinker.base.Signal
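Each signal above is documented as a blinker Signal, so a receiver would normally be attached with connect(), for example graph.on_finished.connect(fn). The stand-in below uses only the standard library and models blinker's calling convention: send(sender, **kwargs) calls each receiver as receiver(sender, **kwargs). That convention is why a receiver's parameter names matter, assuming Graphcat passes name and output as keyword arguments. SketchSignal is hypothetical. Also note that blinker's connect() keeps only a weak reference to receivers by default (weak=True), so keep your own reference to any receiver you connect.

```python
class SketchSignal:
    """Minimal stand-in for a blinker-style signal: connect receivers, then send to all of them."""

    def __init__(self):
        self._receivers = []

    def connect(self, receiver):
        self._receivers.append(receiver)
        return receiver  # Returning the receiver lets connect() double as a decorator.

    def send(self, sender, **kwargs):
        # Each receiver is called as receiver(sender, **kwargs), e.g. fn(graph, name=..., output=...).
        return [(receiver, receiver(sender, **kwargs)) for receiver in self._receivers]


on_finished = SketchSignal()
log = []


@on_finished.connect
def record(graph, name, output):
    log.append((name, output))


# A graph implementation would emit this after a task function returns successfully.
on_finished.send("graph-placeholder", name="sum", output=5)
print(log)  # [('sum', 5)]
```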
-
output(name)
Retrieve the output from a task.
This implicitly updates the graph, so the returned value is guaranteed to be up-to-date.
Parameters: name (hashable object, required) – Unique task name.
Returns: output – The value returned when the task function was last executed, or None.
Return type: any object
Raises: - ValueError – If name doesn’t exist.
- Exception – Any exception raised by a task function will be re-raised by output().
-
rename_task(oldname, newname)
Change an existing task’s name.
This modifies an existing task’s name and updates any related links as necessary. In addition, the task and any downstream dependents will become unfinished.
Parameters: - oldname (hashable object, required) – Existing original task name.
- newname (hashable object, required) – Unique new task name.
Raises: ValueError – If the task with oldname doesn’t exist, or a task with newname already exists.
-
set_expression(name, expression, symbols=None)
Create a task that evaluates a Python expression, returning its value.
The task will automatically track implicit dependencies that arise from executing the expression.
Parameters: - name (hashable object, required) – Unique name for the new expression task.
- expression (string, required) – Python expression that will be evaluated whenever the task is executed.
- symbols (callable, optional) – Function that returns a Python dict containing symbols that will be available to the expression when it’s executed. If None (the default), the graphcat.common.builtins() function will be used, which gives the expression access to graph, name, inputs, and extent objects that match the arguments to a normal task function.
-
set_links(source, targets)
Set links between source and targets.
Note
This function overrides all links from source.
Parameters: - source (hashable object, required) – Name of the task that will act as a data source.
- targets (tuple, or list of tuples, required) – Each (task, input) tuple specifies the target of a link.
Raises: ValueError – If source or any target doesn’t exist.
-
set_parameter(target, input, source, value)
Create and link a ‘parameter’ task in one step.
Because they’re so ubiquitous, this method simplifies the creation of “parameter” tasks (tasks that return a value for use as a parameter in some other task). It consolidates creating the parameter task and linking it with an existing computational task into one step.
Parameters: - target (hashable object, required) – Name of the task that will use the parameter.
- input (hashable object, required) – Named input that will receive the parameter.
- source (hashable object, required) – Name of the task that will store the parameter.
- value (any Python object, required) – Parameter value.
Raises: ValueError – If target doesn’t exist.
-
set_task(name, fn)
Add a task to the graph if it doesn’t exist, and set its task function.
Note that this will mark downstream tasks as unfinished.
Parameters: - name (hashable object, required) – Unique name that will identify the task.
- fn (callable, required) – The fn object will be called whenever the task is executed. It must accept the keyword arguments graph, name, and inputs: graph is the graph containing the task, name contains the unique task name, and inputs provides access to the outputs returned from upstream tasks.
-
state(name)
Return the current state of a task.
Parameters: name (hashable object, required) – Unique name that identifies the task.
Returns: state – Enumeration describing the current task state.
Return type: graphcat.common.TaskState
Raises: ValueError – If name doesn’t exist.
-
tasks()
Return the name of every task in the graph.
Returns: tasks – Names for every task in the graph.
Return type: set
-
update(name)
Update a task and all of its transitive dependencies.
Parameters: name (hashable object, required) – Name identifying the task to be updated.
Raises: - ValueError – If the task with name doesn’t exist.
- Exception – Any exception raised by a task function will be re-raised by update().
-
graphcat.streaming module
Implements computational graphs using dynamic dependency analysis and streaming.
-
class graphcat.streaming.NamedInputs(graph, name)
Bases: object
Access named inputs for a graph task.
Parameters: - graph (StreamingGraph, required) – Graph containing a task.
- name (hashable object, required) – Existing task unique name.
-
get(name, extent=None, default=None)
Return a single input value.
Use this method to return a value when you expect to have either zero or one input that matches name.
Parameters: - name (hashable object, required) – Name of the input value to return.
- extent (hashable object, optional) – Domain object specifying the subset of the input’s value to return.
- default (any Python value, optional) – If an input matching name doesn’t exist, this value will be returned instead. Defaults to None.
Returns: value – The value of input name, or default.
Return type: any Python value
Raises: KeyError – If more than one input matches name.
-
getall(name, extent=None)
Return multiple input values.
Use this method to return every input value that matches name.
Parameters: - name (hashable object, required) – Name of the input value to return.
- extent (hashable object, optional) – Domain object specifying the subset of each input’s value to return.
Returns: values – Values from every input that matches name. Returns an empty list if there are none.
Return type: list of Python values
-
getone(name, extent=None)
Return a single input value.
Use this method to return a value when you expect to have exactly one input that matches name.
Parameters: - name (hashable object, required) – Name of the input value to return.
- extent (hashable object, optional) – Domain object specifying the subset of the input’s value to return.
Returns: value – The value of input name.
Return type: any Python value
Raises: KeyError – If more or fewer than one input matches name.
-
items()
Return names and values for every input attached to this task.
Note
For each (name, value) pair returned by this method, the value is a callable that returns the actual value from the upstream task.
Returns: values – The name and value of every input attached to this task.
Return type: sequence of (hashable object, callable) tuples
-
keys()
Return names for every input attached to this task.
Returns: names – The name of every input attached to this task. Note that the same name may appear more than once in the sequence.
Return type: sequence of hashable objects
-
values()
Return values for every input attached to this task.
Note
Each value returned by this method is a callable that returns the actual value from the upstream task.
Returns: values – The value of every input attached to this task, in the same order as keys().
Return type: sequence of callables
-
class graphcat.streaming.StreamingGraph
Bases: graphcat.graph.Graph
Manages a dynamic streaming computational graph.
The graph is a collection of named tasks, connected by links that define dependencies between tasks. Updating a task implicitly updates all of its transitive dependencies. When an unfinished task is updated, it executes a user-supplied function and stores the function return value as the task output. Outputs of upstream tasks are automatically passed as inputs to downstream tasks.
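In a streaming graph, a task pulls upstream data only when it asks for an input, and it can forward an extent so that only the requested subset is computed. The hypothetical sketch below models that pull-based flow. TinyStreamingGraph, TinyInputs, add_link(), and the (start, stop) tuple extents are all invented for illustration, and task functions receive only (inputs, extent). This sketch does no caching, and it says nothing about whether Graphcat's StreamingGraph caches. Because dependencies are resolved while a task runs, an upstream task that is never requested is never executed.

```python
class TinyStreamingGraph:
    """Hypothetical model of pull-based streaming: tasks request upstream data on demand."""

    def __init__(self):
        self._functions = {}
        self._upstream = {}   # task name -> list of (input name, source task) pairs
        self.executions = []  # (task, extent) pairs, in the order tasks were executed

    def set_task(self, name, fn):
        self._functions[name] = fn
        self._upstream.setdefault(name, [])

    def add_link(self, source, target, input_name):
        self._upstream[target].append((input_name, source))

    def output(self, name, extent=None):
        # No caching in this sketch: every request re-executes the task for the requested extent.
        self.executions.append((name, extent))
        return self._functions[name](TinyInputs(self, name), extent)


class TinyInputs:
    def __init__(self, graph, name):
        self._graph, self._name = graph, name

    def getone(self, input_name, extent=None):
        sources = [s for n, s in self._graph._upstream[self._name] if n == input_name]
        if len(sources) != 1:
            raise KeyError(input_name)
        # The upstream task runs only now, and only for the extent being asked for.
        return self._graph.output(sources[0], extent)


def samples(inputs, extent):
    data = list(range(10))
    if extent is None:
        return data
    start, stop = extent  # Extents here are hashable (start, stop) tuples.
    return data[start:stop]


def doubled(inputs, extent):
    return [2 * value for value in inputs.getone("values", extent)]


graph = TinyStreamingGraph()
graph.set_task("samples", samples)
graph.set_task("doubled", doubled)
graph.add_link("samples", "doubled", "values")
print(graph.output("doubled", (2, 5)))  # [4, 6, 8]
print(graph.executions)                 # [('doubled', (2, 5)), ('samples', (2, 5))]
```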
-
add_links(source, targets)
Add links between source and targets.
Note that calling add_links() multiple times will create multiple, parallel links between tasks.
See also: set_links(), which()
Parameters: - source (hashable object, required) – Name of the task that will act as a data source.
- targets (tuple, or list of tuples, required) – Each (task, input) tuple specifies the target of a link.
Raises: ValueError – If source or any target doesn’t exist.
-
add_task(name, fn=None)
Add a task to the graph.
This function will raise an exception if the task already exists.
See also: set_task(), an idempotent alternative to add_task().
Parameters: - name (hashable object, required) – Unique name that will identify the task.
- fn (callable, optional) – The fn object will be called whenever the task is executed. It must accept the keyword arguments graph, name, and inputs: graph is the graph containing the task, name contains the unique task name, and inputs provides access to the outputs returned from upstream tasks. If None (the default), graphcat.common.null() will be used.
Raises: ValueError – If name already exists.
-
clear_links(source, target)
Remove links from the graph.
This method will remove all links from source to target.
Parameters: - source (hashable object, required) – Source task name.
- target (hashable object, required) – Target task name.
Raises: ValueError – If source or target doesn’t exist.
-
clear_tasks(names=None)
Remove tasks from the graph, along with all related links.
Note that downstream tasks will become unfinished.
Parameters: names (None, hashable object, or list|set of hashable objects, optional) – Names identifying the tasks to remove. If None (the default), all tasks are removed, emptying the graph.
-
links(names=None)
Return every link originating with the given names.
Parameters: names (None, hashable object, or list|set of hashable objects, optional) – Names identifying the source tasks from which to retrieve links.
Returns: links – One (source, target, input) tuple for every matching link.
Return type: list
-
mark_unfinished(names=None)
Set the unfinished state for tasks and all downstream dependents.
Normally, the unfinished state is set automatically when changes are made to the graph. This method is provided for callers who need to set the unfinished state in response to some outside event that the graph isn’t aware of; this should only happen in extremely rare situations.
Parameters: names (None, hashable object, or list|set of hashable objects, optional) – Task names to be marked as unfinished. If None (the default), the entire graph is marked unfinished.
-
on_changed
Signal emitted whenever a part of the graph becomes unfinished.
Functions invoked by this signal must have the signature fn(graph), where graph is this object.
Returns: signal
Return type: blinker.base.Signal
-
on_cycle
Signal emitted if a cycle is detected during updating.
Functions invoked by this signal must have the signature fn(graph, name), where graph is this object.
Returns: signal
Return type: blinker.base.Signal
-
on_execute
Signal emitted before a task is executed.
Functions invoked by this signal must have the signature fn(graph, name, input), where graph is this object, name is the name of the task to be executed, and input is a dict containing the task inputs.
Returns: signal
Return type: blinker.base.Signal
-
on_failed
Signal emitted when a task fails during execution.
Functions invoked by this signal must have the signature fn(graph, name, exception), where graph is this object, name is the name of the task that failed, and exception is the exception raised by the task.
Returns: signal
Return type: blinker.base.Signal
-
on_finished
Signal emitted when a task executes successfully.
Functions invoked by this signal must have the signature fn(graph, name, output), where graph is this object, name is the name of the task that executed successfully, and output is the return value from the task function.
Returns: signal
Return type: blinker.base.Signal
-
on_task_renamed
Signal emitted when a task is renamed.
Functions invoked by this signal must have the signature fn(graph, oldname, newname), where graph is this object, oldname is the original name of the task, and newname is its current name.
Returns: signal
Return type: blinker.base.Signal
-
on_update
Signal emitted when a task is updated.
Functions invoked by this signal must have the signature fn(graph, name), where graph is this object and name is the name of the task to be updated.
Returns: signal
Return type: blinker.base.Signal
-
output(name, extent=None)
Retrieve the output from a task.
This implicitly updates the graph, so the returned value is guaranteed to be up-to-date.
Parameters: - name (hashable object, required) – Unique task name.
- extent (hashable object, optional) – Domain object specifying the subset of the task’s output to return.
Returns: output – The value returned when the task function was last executed, or None.
Return type: any object
Raises: - ValueError – If name doesn’t exist.
- Exception – Any exception raised by a task function will be re-raised by output().
-
rename_task(oldname, newname)
Change an existing task’s name.
This modifies an existing task’s name and updates any related links as necessary. In addition, the task and any downstream dependents will become unfinished.
Parameters: - oldname (hashable object, required) – Existing original task name.
- newname (hashable object, required) – Unique new task name.
Raises: ValueError – If the task with oldname doesn’t exist, or a task with newname already exists.
-
set_expression(name, expression, symbols=None)
Create a task that evaluates a Python expression, returning its value.
The task will automatically track implicit dependencies that arise from executing the expression.
Parameters: - name (hashable object, required) – Unique name for the new expression task.
- expression (string, required) – Python expression that will be evaluated whenever the task is executed.
- symbols (callable, optional) – Function that returns a Python dict containing symbols that will be available to the expression when it’s executed. If None (the default), the graphcat.common.builtins() function will be used, which gives the expression access to graph, name, inputs, and extent objects that match the arguments to a normal task function.
-
set_links(source, targets)
Set links between source and targets.
Note
This function overrides all links from source.
Parameters: - source (hashable object, required) – Name of the task that will act as a data source.
- targets (tuple, or list of tuples, required) – Each (task, input) tuple specifies the target of a link.
Raises: ValueError – If source or any target doesn’t exist.
-
set_parameter(target, input, source, value)
Create and link a ‘parameter’ task in one step.
Because they’re so ubiquitous, this method simplifies the creation of “parameter” tasks (tasks that return a value for use as a parameter in some other task). It consolidates creating the parameter task and linking it with an existing computational task into one step.
Parameters: - target (hashable object, required) – Name of the task that will use the parameter.
- input (hashable object, required) – Named input that will receive the parameter.
- source (hashable object, required) – Name of the task that will store the parameter.
- value (any Python object, required) – Parameter value.
Raises: ValueError – If target doesn’t exist.
-
set_task(name, fn)
Add a task to the graph if it doesn’t exist, and set its task function.
Note that this will mark downstream tasks as unfinished.
Parameters: - name (hashable object, required) – Unique name that will identify the task.
- fn (callable, required) – The fn object will be called whenever the task is executed. It must accept the keyword arguments graph, name, and inputs: graph is the graph containing the task, name contains the unique task name, and inputs provides access to the outputs returned from upstream tasks.
-
state(name)
Return the current state of a task.
Parameters: name (hashable object, required) – Unique name that identifies the task.
Returns: state – Enumeration describing the current task state.
Return type: graphcat.common.TaskState
Raises: ValueError – If name doesn’t exist.
-
tasks()
Return the name of every task in the graph.
Returns: tasks – Names for every task in the graph.
Return type: set
-
update(name, extent=None)
Update a task and all of its transitive dependencies.
Parameters: - name (hashable object, required) – Name identifying the task to be updated.
- extent (hashable object, optional) – Domain object specifying the subset of the task’s output to compute.
Raises: - ValueError – If the task with name doesn’t exist.
- Exception – Any exception raised by a task function will be re-raised by update().
-

Compatibility
A quick disclaimer on backwards-compatibility for Graphcat users:
Graphcat follows the Semantic Versioning standard, which gives each part of a version number a specific meaning. Graphcat version numbers follow <major>.<minor>.<patch> numbering. Releases with different major version numbers are API-incompatible. Minor version numbers signify new features that are backwards-compatible with the current major version. Patch numbers indicate bug fixes and documentation changes that are backwards-compatible with the current major and minor version.
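These rules can be read as a simple compatibility check: code written against one release should work with any later release that shares its major version number. The sketch below is an illustrative helper, not part of Graphcat. It handles only plain <major>.<minor>.<patch> strings, ignores pre-release tags, and ignores Semantic Versioning's special treatment of 0.x releases, where anything may change.

```python
def parse_version(text):
    """Split a '<major>.<minor>.<patch>' string into a tuple of ints."""
    major, minor, patch = (int(part) for part in text.split("."))
    return major, minor, patch


def is_compatible(required, installed):
    """True if `installed` should support code written against `required`, per the rules above."""
    req, inst = parse_version(required), parse_version(installed)
    # Same major version (API-compatible) and not older than the release the code targets.
    return inst[0] == req[0] and inst >= req


print(is_compatible("1.0.3", "1.0.5"))  # True: patch release with bug fixes only
print(is_compatible("1.0.5", "1.2.0"))  # True: newer minor release with backwards-compatible features
print(is_compatible("1.0.5", "2.0.0"))  # False: different major version
```

In practice the same policy is usually expressed as a pip requirement such as graphcat>=1.0.5,<2.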

Release Notes
Graphcat 1.0.5 - November 20, 2022
- Clean up and organize documentation.
- Add numpy and pygraphviz as optional dependencies.
Graphcat 1.0.4 - November 18, 2022
- Reduced the amount of boilerplate for optional functionality.
- Switched to pyproject.toml for packaging.
- Switched to flit for building.
- Minimum Python version is 3.8, due to upstream changes.
- Began testing with Python 3.11.
- Added optional dependencies for documentation and testing.
Graphcat 1.0.3 - October 21, 2021
- Added a diagram filter for hiding “parameter” nodes.
- Added Python 3.10 to the CI build.
- Updated the way we collect code coverage data.
- Switched from Zulip to GitHub Discussions for support.
Graphcat 1.0.2 - October 13, 2021
- Switched from Travis-CI to GitHub Actions for regression tests.
- Organized and streamlined the documentation.
Graphcat 1.0.1 - March 1, 2021
- Many documentation updates.
- Improve diagram edge label layout.
Graphcat 1.0.0 - February 2, 2021
- First stable release of the Graphcat API!
Graphcat 0.13.0 - January 16, 2021
- Fix a bug marking failed tasks in static graphs.
- Suppress unnecessary updates using graphcat.passthrough(), graphcat.delay(), and graphcat.raise_exception().
- Improve consistency throughout the regression test suite.
- Make it easier to display customized graph diagrams.
- Static graphs emit the on_cycle signal when a cycle is detected.
- Expose standard task function arguments in expressions, but give domain developers the ability to override or remove them.
- Fixed expression tasks that sometimes created redundant implicit dependencies.
- Deprecate graphcat.execute() in favor of graphcat.evaluate().
Graphcat 0.12.0 - December 19, 2020
- Expose the rankdir attribute when drawing graph diagrams.
- Alter graph diagram appearance based on graph type.
- Added a “User Guide” section to the documentation.
- Added graphcat.streaming.StreamingGraph.
- Greatly reduced code duplication among graph types.
- Calls to set_task() only mark the task unfinished if the new callable compares unequal to the old.
- Add support for visualizing performance data in graph diagrams.
Graphcat 0.11.0 - December 13, 2020
- Cycles are detected during dynamic graph updates.
- A new signal notifies callers when cycles occur.
- Static and dynamic graphs behave consistently when tasks fail.
- Moved graph drawing into a separate module, so callers can customize graph diagrams.
- Added graphcat.common.consume task function, for debugging dynamic graphs.
- Made pygraphviz an optional dependency, instead of required.
- Missing optional dependencies cause runtime failures, instead of failures at import.
Graphcat 0.10.0 - December 3, 2020
- Introduced graphcat.DynamicGraph, which executes a computational graph with dynamic dependency checking.
- Introduced NamedInputs helpers to provide a cleaner / more consistent API for accessing task inputs.
Graphcat 0.9.0 - November 30, 2020¶
- Deprecated graphcat.Graph, and added graphcat.StaticGraph instead.
Graphcat 0.8.0 - November 23, 2020¶
- Added graphcat.PerformanceMonitor for evaluating task performance.
- graphcat.notebook.display() can optionally hide nodes that meet some criteria.
- Corrected typos in setup.py and release-notes.rst.
Graphcat 0.7.0 - November 10, 2020¶
- Breaking change: pass the graph as a parameter for task functions.
- Deprecated the graphcat.AutomaticDependencies class in favor of the graphcat.automatic_dependencies function decorator.
Graphcat 0.6.0 - November 8, 2020¶
- Update dependencies every time an expression task executes.
- Handle automatic dependency tracking for tasks that are renamed.
- Deprecated graphcat.Graph.move_task() in favor of graphcat.Graph.rename_task().
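The automatic dependency tracking for expression tasks (0.6.0) can be pictured with the sketch below. It is an assumption-laden illustration, not Graphcat's code. The evaluate_expression helper and the out() lookup function are hypothetical. Each evaluation records which upstream outputs the expression reads. Re-running it on every execution is what keeps the dependencies current when an expression changes.

```python
def evaluate_expression(expression, outputs):
    """Hypothetical sketch: evaluate `expression` and record the task outputs it reads.

    `outputs` maps task names to their already-computed outputs.
    """
    dependencies = []

    def out(name):
        # Record each upstream task the expression reads, then return its output.
        if name not in dependencies:
            dependencies.append(name)
        return outputs[name]

    # Only out() is visible to the expression; builtins are removed.
    value = eval(expression, {"__builtins__": {}}, {"out": out})
    return value, dependencies


print(evaluate_expression("out('A') * out('B')", {"A": 4, "B": 5}))  # (20, ['A', 'B'])
print(evaluate_expression("out('A') + 1", {"A": 4, "B": 5}))         # (5, ['A'])
```

Emptying __builtins__ narrows what an expression can reach, but it is not a security sandbox for untrusted input.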
Graphcat 0.5.0 - November 2, 2020¶
- Make graphcat.notebook.display() output diagrams more compact.
- Add API to test whether the graph contains a task with a given name.
- Deprecated graphcat.ExpressionTask in favor of graphcat.Graph.set_expression().
- Clarify the graphcat.Graph.clear_links() documentation.
- Added graphcat.passthrough() for temporarily disabling tasks.
Graphcat 0.4.0 - October 15, 2020¶
- Added graphcat.Graph.clear_links().
- graphcat.Graph.output() and graphcat.Graph.update() re-raise exceptions thrown by task functions.
- Allow parallel links between tasks.
- Deprecated graphcat.VariableTask.
- Added graphcat.Graph.set_parameter().
Graphcat 0.3.0 - October 11, 2020¶
- Emit a signal when the graph is changed.
- Added an image processing use-case to the documentation.
- Refactor the API and deprecate add_relationship(), relabel_task(), remove_relationship(), remove_task(), set_input(), and set_task_fn().
Graphcat 0.2.0 - October 7, 2020¶
- Fixed missing dependencies.
- Minor documentation tweaks.
Graphcat 0.1.0 - October 6, 2020¶
- Initial release.

Support¶
The Graphcat documentation:
Visit our GitHub repository for access to source code, test results, issue tracker, and the wiki:
Our coverage statistics are updated automatically when modifications are committed:
For Graphcat questions, comments, or suggestions, get in touch with the team at:
Otherwise, you can contact Tim directly:
- Timothy M. Shead - tim@shead-custom-design