Usage
Yenta can be used as a library or invoked from the command line.
Library
It is straightforward to use Yenta as a library.
from datetime import datetime

from yenta.tasks import task
from yenta.pipeline import Pipeline, TaskResult
# FileArtifact is used below but was not imported in the original snippet;
# the module path here is an assumption, so adjust it to your Yenta version.
from yenta.artifacts import FileArtifact

@task
def foo():
    return {'values': {'x': 1}}

@task
def bar():
    return {'values': {'y': 2}, 'artifacts': {}}

@task(depends_on=['foo', 'bar'])
def baz(u: 'foo__values__x', v: 'bar__values__y'):
    # write() expects a string, so convert the integer sum first.
    with open('baz_results', 'w') as f:
        f.write(str(u + v))
    return TaskResult(values={'sum': u + v},
                      artifacts={'sum_file': FileArtifact(location='baz_results',
                                                          date_created=str(datetime.now()))})

pipeline = Pipeline(foo, bar, baz)
result = pipeline.run()

print(result.values('baz', 'sum'))  # prints 3

with open('baz_results', 'r') as f:
    results = f.read()
print(results)  # prints 3
Task Signatures
Tasks are defined by decorating functions with the @task decorator. The decorator optionally accepts an
argument called depends_on, which lists the names of the tasks on which that task depends. Above, the
tasks foo and bar do not depend on anything, while the task baz depends on both of them.
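One way to picture what depends_on implies is as a dependency graph that has to be walked in topological order. The sketch below uses only the standard library's graphlib module (Python 3.9+). The dependencies mapping is a hypothetical stand-in for what the decorators declare, and nothing here is meant to describe how Yenta schedules tasks internally.

```python
from graphlib import TopologicalSorter

# Hypothetical stand-in for the depends_on declarations above:
# each key is a task name, each value is the set of tasks it depends on.
dependencies = {
    'foo': set(),
    'bar': set(),
    'baz': {'foo', 'bar'},
}

# static_order() yields every task only after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

The relative order of foo and bar is not fixed, since neither depends on the other; only the position of baz after both is guaranteed.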
The signature of a task function is a key feature of Yenta. Tasks can receive their arguments using three different
strategies: they can receive the results of executing their dependencies as a single state blob, they
can select slices of the state via annotations, or they can select slices of state via selector functions. In the
above example, the baz task takes as its arguments two values, u and v, which are annotated
as paths into the state. The format of the annotation is:
'<task_name>__<values|artifacts>__<value_name|artifact_name>'
Thus, in the above example, the argument u will take the value named x from the result obtained by
executing task foo, and likewise the v argument will take the value named y obtained by
executing task bar. Note that the names in the annotations refer to the output names in the
TaskResults returned by the dependencies; they do not have to match the names of the parameters of the task
being executed. Note further the use of double-underbars as a separator, which should be familiar to anyone
who has worked with Django. If we wanted to instead get the entire state slice corresponding to the results of the
two dependencies of baz, we could have written:
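# PipelineResult must be imported as well; it is assumed here to live alongside TaskResult.
from yenta.pipeline import PipelineResult

@task(depends_on=['foo', 'bar'])
def baz(past_results: PipelineResult):
    u = past_results.values('foo', 'x')
    v = past_results.values('bar', 'y')
    # write() expects a string, so convert the integer sum first.
    with open('baz_results', 'w') as f:
        f.write(str(u + v))
    return TaskResult(values={'sum': u + v},
                      artifacts={'sum_file': FileArtifact(location='baz_results',
                                                          date_created=str(datetime.now()))})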
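To make the annotation format concrete, here is a minimal sketch of how a path such as 'foo__values__x' could be split and looked up. The resolve_annotation helper and the nested-dictionary state are hypothetical illustrations, not part of Yenta's API, and Yenta's real state object is a PipelineResult rather than a plain dict.

```python
def resolve_annotation(path, state):
    """Look up '<task_name>__<values|artifacts>__<name>' in a nested dict."""
    # Split on the first two double underscores only, so the final
    # component may itself contain double underscores.
    task_name, kind, name = path.split('__', 2)
    if kind not in ('values', 'artifacts'):
        raise ValueError(f'unknown result kind: {kind!r}')
    return state[task_name][kind][name]

# Hypothetical state after foo and bar have run.
state = {
    'foo': {'values': {'x': 1}, 'artifacts': {}},
    'bar': {'values': {'y': 2}, 'artifacts': {}},
}

u = resolve_annotation('foo__values__x', state)
v = resolve_annotation('bar__values__y', state)
print(u + v)  # prints 3
```

A consequence of this style of separator is that a task name containing a double underscore would be ambiguous, which is worth keeping in mind when naming tasks.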
Currently the two parameter styles cannot be mixed, so you must choose at task definition time whether you want
some specific slice of the previous state or the whole thing. If we had a fourth task that depended on baz,
we could retrieve, say, the baz artifact like so:
@task(depends_on=['baz'])
def glorp(artifact: 'baz__artifacts__sum_file'):
    ...  # do whatever
or, equivalently:
@task(depends_on=['baz'])
def glorp(past_results):
    artifact = past_results.artifacts('baz', 'sum_file')
    ...  # do whatever
Warning
A task will only receive those slices of state which are indicated as part of its dependency chain. If you want state for a given task, your downstream task must have that other task as a dependency.
Finally, you can also use selector functions to select pieces of state and possibly do something with them before passing them as arguments to the downstream task. To see how this is accomplished, consider the following snippet:
@task
def foo() -> TaskResult:
    return TaskResult({'x': [1, 2, 3]})

@task
def bar():
    return TaskResult({'y': [4, 5, 6]})

def foo_x_selector(result: PipelineResult):
    return sum(result.values('foo', 'x'))

def bar_y_selector(result: PipelineResult):
    return sum(result.values('bar', 'y'))

@task(depends_on=['foo', 'bar'], selectors={'x': foo_x_selector, 'y': bar_y_selector})
def baz(x, y):
    sum_x_y = x + y
    return TaskResult({'sum': sum_x_y})

pipeline = Pipeline(foo, bar, baz)
result = pipeline.run()
print(result.values('baz', 'sum'))  # prints 21
The selectors argument to the task decorator above is a dictionary whose keys are parameter names on the
receiving task and whose values are functions which are to be called on the previous state. These functions receive
the previous state in the form of a PipelineResult object and can return any value
at all. As the above example demonstrates, selectors can optionally perform some operations on the slice of state
they extract.
Warning
Selectors must be pure functions, i.e. they must not modify the state. If supplied, selectors will take precedence over annotations. Although you can do meaningful work in the selector function, you mostly should not; the purpose of selectors is to reshape the state into the form expected by the downstream task, but of course you can always do that inside the task anyway.
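The difference between a pure and an impure selector can be shown without Yenta at all. In the sketch below, the state dictionary is a hypothetical stand-in for the previous state, and both selector functions are illustrative names only.

```python
import copy

# Hypothetical stand-in for the previous state; not a Yenta object.
state = {'foo': {'values': {'x': [3, 1, 2]}}}

def impure_selector(state):
    # list.sort() works in place, so the stored result of foo changes
    # as a side effect of selecting it.
    values = state['foo']['values']['x']
    values.sort()
    return values

def pure_selector(state):
    # sorted() builds a new list and leaves the stored result untouched.
    return sorted(state['foo']['values']['x'])

before = copy.deepcopy(state)
selected = pure_selector(state)
print(selected)         # prints [1, 2, 3]
print(state == before)  # prints True
```

Calling impure_selector instead would return the same list but leave the state reordered for every later consumer, which is the kind of hidden coupling the warning above is guarding against.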
Return Values
Tasks can return their results in two ways, both of which are shown above. The first way is as a simple dictionary
whose keys are the names of the returned values, and whose values are… the values. Each value must have its own
name in the result set. A second way is to return the task result directly via a
TaskResult object. In general, the second way is preferable since it is the most
explicit; however, under the hood, Yenta transforms the first format into
TaskResult anyway.
The results of a Yenta task come in two flavors: values and Artifacts. A value is any basic Python value that is
computed during the execution of the task and should be returned to the pipeline. Any Python object that can be pickled
can be a value. Artifacts represent any modifications to external stores that might be created by the task; an example
(currently the only example) of an Artifact is the FileArtifact, which represents an
external file generated during the task execution.
Warning
Values must be picklable by Python. The usual caveats about unpickling untrusted data apply. In the previous version of Yenta, you could only use JSON-serializable values, but that restriction has been lifted.
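If you are unsure whether a value can be returned from a task, a quick check with the standard library's pickle module can help. The is_picklable helper below is a hypothetical convenience function, not something Yenta provides.

```python
import pickle
from datetime import datetime

def is_picklable(value):
    """Return True if pickle.dumps accepts the value without raising."""
    try:
        pickle.dumps(value)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True

# Sets and datetimes are not JSON-serializable, but they pickle fine.
print(is_picklable({'when': datetime(2020, 1, 1), 'ids': {1, 2, 3}}))  # prints True
# Lambdas cannot be pickled, because pickle stores functions by name.
print(is_picklable(lambda x: x + 1))                                   # prints False
```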
Caching TaskResults and “Functional” Pipelines
The first time that Yenta runs, it will execute every task and, assuming task execution succeeds, serialize the
results to the directory indicated by YENTA_STORE_PATH. If you run the pipeline
a second time, the graphical output will show a yellow bar next to the task names, indicating that the previous
results of the run have been reused. This is a key feature of Yenta.
In a loose sense, every Yenta task can be thought of as a reducer from Redux: the job of a task is to take the state and some parameters and produce the next state. If the input to the task is identical to the input it received the last time it was run, then the task should produce the same output, and Yenta assumes that it does. Therefore, in such cases Yenta will simply pass the previous results to the next stage of the pipeline without invoking the task. This is, more or less, a flavor of referential transparency, with the caveat that “the state” of the pipeline includes any external artifacts that are generated by the tasks but which are not themselves “stored in” the pipeline cache.
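The idea of skipping a task whose inputs have not changed can be sketched as a small memoizing wrapper. Everything here is a simplified assumption for illustration: the in-memory _cache dictionary stands in for the on-disk store, run_cached is a hypothetical helper, and hashing the pickled inputs is just one possible way to detect "identical input", not necessarily the criterion Yenta uses.

```python
import hashlib
import pickle

_cache = {}  # hypothetical in-memory stand-in for the on-disk store
calls = []   # records which tasks were actually executed

def run_cached(name, func, inputs):
    """Run func(**inputs) unless the same inputs were seen before."""
    key = (name, hashlib.sha256(pickle.dumps(inputs)).hexdigest())
    if key not in _cache:
        calls.append(name)
        _cache[key] = func(**inputs)
    return _cache[key]

def add(u, v):
    return {'sum': u + v}

first = run_cached('baz', add, {'u': 1, 'v': 2})
second = run_cached('baz', add, {'u': 1, 'v': 2})  # reused; add() is not called
third = run_cached('baz', add, {'u': 1, 'v': 5})   # inputs changed, so it reruns
print(first, second, third, calls)
```

Only two of the three requests execute the function, which is the behavior the yellow bar in the graphical output signals.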
Obviously, some tasks will not fit this paradigm. One example is any task that relies on random numbers, unless care is taken to explicitly reuse the same seed each time the task is run. Another case that needs extra care is floating-point computation, which, depending on the software doing the math and its configuration, may not be rounded the same way on every run.
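For the random-number case, one common approach is to give the task its own seeded generator. This is a minimal sketch using only the standard library; sample_task and its seed parameter are hypothetical, and the function is shown undecorated so that it runs without Yenta installed.

```python
import random

def sample_task(seed=1234):
    # A local Random instance avoids depending on the global RNG state,
    # so repeated runs with the same seed produce the same draws.
    rng = random.Random(seed)
    return {'values': {'draws': [rng.randint(0, 99) for _ in range(5)]}}

print(sample_task() == sample_task())  # prints True
```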
Named Pipelines
As of version 0.3.0, Yenta supports the use of multiple pipelines in a single project, which can be distinguished by
their name parameter, specified at creation, e.g. pipeline = Pipeline(*tasks, name='my_pipeline').
If you use a single pipeline, by default it will have the name default, but you can use as many pipelines as you
like and they will all operate independently of each other. Task dependency between pipelines is not currently
supported.
Command Line Usage
Yenta may also be invoked via the command line script yenta, which takes a number of commands and options.
Usage: yenta [OPTIONS] COMMAND [ARGS]...

Options:
  --config-file PATH  The config file from which to read settings.
  --pipeline PATH     The file to which the pipeline will be cached.
  --entry-point PATH  The file containing the task definitions.
  --log-file PATH     The file to which the logs should be written.
  --help              Show this message and exit.

Commands:
  dump-task-graph  Dump the task graph to a file; requires Matplotlib.
  list-tasks       List all available tasks.
  rm               Remove a task from the pipeline cache.
  run              Run the pipeline.
  show-config      Show the current configuration.
  task-info        Show information about a specific task.
Most of these options are self-explanatory. The most important one is the --entry-point option, which tells
Yenta where to find your task definitions. Currently, all task definitions must reside in a single file.
Warning
Removing a task from the cache only removes its results; if the task generated any artifacts, they will not be removed.