# Copyright 2020 Timothy M. Shead
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper classes and functions that can be used with more than one graph type.
"""
import collections
import enum
import functools
import logging
import time
import warnings
import networkx
import graphcat.optional
import graphcat.require
numpy = graphcat.optional.module("numpy")
log = logging.getLogger(__name__)
[docs]class Array(object):
"""Task function callable that returns a caller-supplied array.
Parameters
----------
value: :class:`numpy.ndarray` convertable object, required
The array to use as the output for this callable.
See Also
--------
:func:`array` - factory function for :class:`Array` instances.
"""
def __init__(self, value):
self._value = value
def __call__(self, graph, name, inputs, extent=None):
return self._value[extent] if extent is not None else self._value
def __eq__(self, other):
return type(self) is type(other) and numpy.all(self._value == other._value)
[docs]class ArrayExtent(object):
"""Helper for creating :class:`Array` compatible streaming extents.
To generate extents, use any numpy-compatible
`indexing notation <https://numpy.org/doc/stable/reference/arrays.indexing.html>`_::
extent = ArrayExtent[0:4096]
extent = ArrayExtent[::2]
extent = ArrayExtent[:, 0]
...
These extents are compatible with :class:`Array`.
"""
def __class_getitem__(cls, key):
return key
[docs]class Constant(object):
"""Task function callable that returns a caller-supplied value.
Parameters
----------
value: any Python object, required
The value to use as the output for this callable.
See Also
--------
:func:`constant` - factory function for :class:`Constant` instances.
"""
def __init__(self, value):
self._value = value
def __call__(self, graph, name, inputs, extent=None):
return self._value
def __eq__(self, other):
return type(self) is type(other) and self._value == other._value
[docs]class Delay(object):
"""Task function callable that sleeps for a fixed time.
This is mainly useful for testing and debugging.
Parameters
----------
seconds: number, required
The number of seconds to sleep when executed.
See Also
--------
:func:`delay` - factory function for :class:`Delay` instances.
"""
def __init__(self, seconds):
self._seconds = seconds
def __call__(self, graph, name, inputs, extent=None):
time.sleep(self._seconds)
def __eq__(self, other):
return type(self) is type(other) and self._seconds == other._seconds
[docs]class DeprecationWarning(Warning):
"""Warning category for deprecated code."""
pass
[docs]class Logger(object):
"""Log graph events.
Create a :class:`Logger` object to see the behavior of
the graph during updates, using the Python :mod:`logging`
module::
logger = graphcat.Logger(graph)
This is useful for debugging and pedagogy. The logger will
generate output for five types of event:
* cycle - a cycle is detected during updating.
* executed - a task is executed.
* failed - a task raises an exception during execution.
* finished - a task executes successfully.
* updated - a task is updated.
Update events happen regardless of the state of a task. Execute events
only happen if the task isn't already finished. Failed and finished events
only happen if a task is executed.
Parameters
----------
graph: class:`graphcat.graph.Graph`, required
The graph whose events will be logged.
"""
def __init__(self, graph, log_exceptions=True, log_inputs=True, log_outputs=True, log_extents=True, log=log):
self._log_exceptions = log_exceptions
self._log_inputs = log_inputs
self._log_outputs = log_outputs
self._log_extents = log_extents
self._log = log
graph.on_cycle.connect(self.on_cycle)
graph.on_execute.connect(self.on_execute)
graph.on_failed.connect(self.on_failed)
graph.on_finished.connect(self.on_finished)
graph.on_update.connect(self.on_update)
[docs] def on_cycle(self, graph, name):
"""Called when a cycle is detected."""
self._log.info(f"Task {name} cycle detected.")
[docs] def on_execute(self, graph, name, inputs, extent=None):
"""Called when a task is executed."""
message = f"Task {name} executing."
if self._log_inputs:
message += f" Inputs: {inputs}"
if self._log_extents and graph.is_streaming:
message += f" Extent: {extent}"
self._log.info(message)
[docs] def on_failed(self, graph, name, exception):
"""Called when a task raises an exception during execution."""
if self._log_exceptions:
self._log.error(f"Task {name} failed. Exception: {exception}")
else:
self._log.error(f"Task {name} failed.")
[docs] def on_finished(self, graph, name, output):
"""Called when a task has executed sucessfully."""
if self._log_outputs:
self._log.info(f"Task {name} finished. Output: {output}")
else:
self._log.info(f"Task {name} finished.")
[docs] def on_update(self, graph, name):
"""Called when a task is updated."""
message = f"Task {name} updating."
self._log.info(message)
[docs]class Passthrough(object):
"""Task function callable that always returns an upstream input.
Parameters
----------
input: hashable object, required
Name of the input to be returned when this task is executed. Note that
there must be exactly one connection to the named input, or an
exception will be raised.
See Also
--------
:func:`passthrough` - factory function for :class:`Passthrough` instances.
"""
def __init__(self, input):
self._input = input
def __call__(self, graph, name, inputs, extent=None):
return inputs.getone(self._input)
def __eq__(self, other):
return type(self) is type(other) and self._input == other._input
[docs]class RaiseException(object):
"""Task function callable that raises an exception when executed.
This is mainly useful for testing and debugging.
Parameters
----------
exception: :class:`Exception`, required
The exception to be raised when the task is executed.
See Also
--------
:func:`raise_exception` - factory function for :class:`RaiseException` instances.
"""
def __init__(self, exception):
self._exception = exception
def __call__(self, graph, name, inputs, extent=None):
raise self._exception
def __eq__(self, other):
return type(self) is type(other) and self._exception == other._exception
[docs]class TaskState(enum.Enum):
"""Enumerates :class:`graphcat.graph.Graph` task states.
Every task within a :class:`graphcat.graph.Graph` will always be in one of
the following states.
See Also
--------
:meth:`graphcat.graph.Graph.state` - returns the state of a task.
"""
UNFINISHED = 1
"""The task is out-of-date and will be executed during the next update."""
FAILED = 2
"""The task or one of it's dependencies failed during the last update."""
FINISHED = 3
"""The task executed successfully during the last update."""
[docs]class UpdatedTasks(object):
"""Maintains a list of graph tasks that have been updated.
Parameters
----------
graph: :class:`graphcat.graph.Graph`, required
Graph to watch for task updates.
"""
def __init__(self, graph):
self._tasks = set()
graph.on_update.connect(self._on_update)
def _on_update(self, graph, name):
self._tasks.add(name)
@property
def tasks(self):
"""Graph tasks that have received updates since this object was created.
Returns
-------
tasks: :class:`set`
Python :class:`set` containing the name of every task that has been
updated.
"""
return self._tasks
graphcat.require.loaded_module("numpy")
[docs]def array(value):
"""Factory for task functions that return array values when executed.
Note
----
This callable is designed to be compatible with :class:`ArrayExtent` extents
when used in a :class:`graphcat.streaming.StreamingGraph`.
Parameters
----------
value: :class:`numpy.ndarray`-convertable value, required
The array to return when the task is executed.
Returns
-------
fn: :class:`Array`
Task function that will always return `value` when executed.
"""
return Array(value)
[docs]def automatic_dependencies(fn):
"""Function decorator that automatically tracks dependencies.
Use this to decorate task functions that need dependency tracking, such as
:func:`evaluate`.
See Also
--------
:meth:`graphcat.graph.Graph.set_expression`
Convenience method that configures a task to evaluate expressions and
automatically track dependencies.
"""
@functools.wraps(fn)
def implementation(graph, name, inputs, extent=None):
# Remove old implicit dependencies.
edges = list(graph._graph.out_edges(name, data="input", keys=True))
for target, source, key, input in edges:
if input == Input.IMPLICIT:
graph._graph.remove_edge(target, source, key)
# Keep track of all dependencies while the task executes.
updated = UpdatedTasks(graph)
result = fn(graph, name, inputs, extent)
# Filter out dependencies that are already explicitly captured.
dependencies = updated.tasks
dependencies = dependencies.difference(networkx.descendants(graph._graph, name))
dependencies = dependencies.difference([name])
# Create new implicit dependencies with what remains.
for source in dependencies:
graph._graph.add_edge(name, source, input=Input.IMPLICIT)
return result
return implementation
[docs]def builtins(graph, name, inputs, extent=None):
"""Returns standard builtin symbols for expression tasks.
See Also
--------
:func:`evaluate` - uses :func:`builtins` as the default for the `symbols` argument.
"""
return {
"graph": graph,
"name": name,
"inputs": inputs,
"extent": extent,
}
[docs]def constant(value):
"""Factory for task functions that return constant values when executed.
This is useful when creating a task that will act as a parameter
for a downstream task::
graph.add_task("theta", constant(math.pi))
To change the parameter later, use :func:`constant` again, with
:meth:`Graph.set_task_fn` to specify a new function::
graph.set_task_fn("theta", constant(math.pi / 2))
Parameters
----------
value: any value, required
The value to return when the task is executed.
Returns
-------
fn: :class:`Constant`
Task function that will always return `value` when executed.
"""
return Constant(value)
[docs]def consume(graph, name, inputs, extent=None):
"""Task function that retrieves all its inputs, but otherwise does nothing.
This is mainly useful for debugging :class:`dynamic graphs<graphcat.dynamic.DynamicGraph>`,
since the default :func:`null` task function won't execute upstream nodes.
"""
values = [value() for value in inputs.values()]
[docs]def delay(seconds):
"""Factory for task functions that sleep for a fixed time.
This is mainly useful for testing and debugging.
Parameters
----------
seconds: number, required
Number of seconds to sleep when executed.
Returns
-------
fn: function
Task function that will always sleep for `seconds` when executed.
"""
return Delay(seconds)
[docs]def evaluate(code, symbols=None):
"""Factory for task functions that evaluate Python expressions.
If your expressions can access the output from other tasks in the graph,
you will want to use this function with the :func:`automatic_dependencies`
decorator, or use :meth:`graphcat.graph.Graph.set_expression` which sets up dependency
tracking for you.
See Also
--------
:meth:`graphcat.graph.Graph.set_expression`
Convenience method that configures a task to evaluate expressions and
automatically track dependencies.
Parameters
----------
code: string, required
Python code to be executed when the task is executed.
symbols: callable, optional
Function that returns a Python dict containing symbols that will be
available to the expression when it's executed. If :any:`None` (the
default), the :func:`builtins` function will be used, which gives the
expression access to the same `graph`, `name`, `inputs`, and `extent`
objects as a normal task function.
Returns
-------
fn: function
Task function that will execute Python code when the
task is executed.
"""
if symbols is None:
symbols = builtins
def implementation(graph, name, inputs, extent=None):
try:
return eval(code, {}, symbols(graph, name, inputs, extent))
except Exception as e: # pragma: no cover
raise RuntimeError(f"Uncaught exception executing expression {code!r}: {e}")
return implementation
[docs]def null(graph, name, inputs, extent=None):
"""Task function that does nothing.
This is the default if you don't specify a function for
:meth:`Graph.add_task` or :meth:`Graph.set_task_fn`, and is useful in
debugging and pedagogy.
"""
pass
[docs]def passthrough(input=None):
"""Factory for task functions that pass-through incoming data.
Callers can use this function to temporarily bypass tasks in the graph.
Parameters
----------
input: hashable object, required
The named input that will pass-through to the task output.
Returns
-------
fn: function
Task function that will pass the input value named `input` to its output.
"""
return Passthrough(input)
[docs]def raise_exception(exception):
"""Factory for task functions that raise an exception when executed.
This is mainly useful for testing and debugging.
Parameters
----------
exception: :class:`BaseException` derivative, required
The exception to raise when the task is executed.
Returns
-------
fn: function
Task function that will always raise `exception` when executed.
"""
return RaiseException(exception)