Code can be found at the end of this post.
A colleague recently introduced me to Luigi, an open-source project that Spotify released in 2012. I was trying out different pipeline management
frameworks like Kedro, MLflow, PipelineX, and a few other popular ones, and none had what I needed. For many reasons, Luigi was the only one I’ve
found that provides extreme flexibility while still enforcing a fairly strict design pattern that is maintainable, reusable, and intuitive.
Luigi’s simple premise is:
If a task produces an output (returns non-None from Task.output()) and that output exists, we do not run the task;
if the output does not exist, we run the task to generate the output.
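To make the premise concrete, here is a minimal plain-Python sketch of that rule. It does not use Luigi itself: needs_run is a hypothetical helper, and a file path stands in for whatever target Task.output() would return.

```python
import os
import tempfile


def needs_run(output_path):
    """Return True when the task should run, i.e. its output is missing.

    output_path is None for a task that declares no output; in this sketch
    such a task is never considered complete, so it always runs.
    """
    if output_path is None:
        return True
    return not os.path.exists(output_path)


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "a1.txt")
    first = needs_run(target)       # output missing, so the task runs
    with open(target, "w") as fh:   # stand-in for the task writing its output
        fh.write("done")
    second = needs_run(target)      # output exists, so the task is skipped

print(first, second)
```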
Early on I found myself wanting to be able to force a task to run every time, or only sometimes, with that decision being a user-defined action
rather than something determined solely by the default behavior of the luigi.Task class.
A quick example: we have this structure of 5 tasks, where:
- Task C depends on Task A2, which depends on Task A1
- Task C depends on Task B2, which depends on Task B1
[Task A1]   [Task B1]
    |           |
    |           |
[Task A2]   [Task B2]
     \         /
      \       /
       \     /       # Wrapper task just means that it produces no output
  [Wrapper Task C]   # meaning Task C always runs
With the generic Luigi task, if I run Task C, it will first run A1 and B1, then A2 and B2, and finally C.
If we run Task C a second time, the outputs of all the upstream tasks already exist, so none of them run again; only the wrapper Task C runs.
So suppose we run the pipeline once, and then a second time.
Start:                      1st Run:                    2nd Run:
[Task A1]   [Task B1]       [Task A1]*  [Task B1]*      [Task A1]   [Task B1]
    |           |               |           |               |           |
    |           |               |           |               |           |
[Task A2]   [Task B2]       [Task A2]*  [Task B2]*      [Task A2]@  [Task B2]@
     \         /                 \         /                 \         /
      \       /                   \       /                   \       /
       \     /                     \     /                     \     /
  [Wrapper Task C]            [Wrapper Task C]*           [Wrapper Task C]*
* denotes task ran successfully
@ denotes output for task already exists so task not run in the 2nd run
So now if we want to run Wrapper Task C again, Luigi will first need Task A2 and Task B2, but since the outputs for these two tasks already exist, Luigi
does not run them and uses the existing outputs. Luigi won’t even check whether the outputs for Task A1 or Task B1 exist in this scenario, as it stops
traversing down the tree the moment it has everything it needs to run Task C.
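The traversal described above can be modelled in a few lines of plain Python. This is a toy stand-in, not Luigi’s scheduler: ToyTask, build, and make_pipeline are hypothetical names, tasks run depth-first rather than in Luigi’s actual order, and an in-memory flag replaces real output targets.

```python
class ToyTask:
    """Plain-Python stand-in for a Luigi task (not Luigi's API)."""

    def __init__(self, name, requires=(), has_output=True):
        self.name = name
        self.requires = list(requires)
        self.has_output = has_output
        self.output_exists = False

    def complete(self):
        # A task without an output is never complete, so it always runs.
        return self.has_output and self.output_exists


def build(task, ran, checked):
    """Run `task`, first running whatever incomplete dependencies it needs."""
    checked.append(task.name)
    if task.complete():
        return  # stop here: tasks further upstream are not even inspected
    for dep in task.requires:
        build(dep, ran, checked)
    task.output_exists = task.has_output  # "running" produces the output
    ran.append(task.name)


def make_pipeline():
    a1, b1 = ToyTask("A1"), ToyTask("B1")
    a2, b2 = ToyTask("A2", [a1]), ToyTask("B2", [b1])
    return ToyTask("C", [a2, b2], has_output=False)  # wrapper: no output


c = make_pipeline()

first_ran, first_checked = [], []
build(c, first_ran, first_checked)

second_ran, second_checked = [], []
build(c, second_ran, second_checked)

print(first_ran)       # ['A1', 'A2', 'B1', 'B2', 'C']
print(second_ran)      # ['C']
print(second_checked)  # ['C', 'A2', 'B2']
```

On the second pass A1 and B1 never appear in second_checked, which mirrors the point that the tree is not walked past a task whose output already exists.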
Often we want to rerun a task, and re-generate the output even if the output exists. A few reasons would be:
- The data has been updated
- The processing code has been changed
- Output targets have been corrupted by other processes (shouldn’t happen if we treat outputs as immutable)
There are probably many other reasons you may find yourself needing the ability to force a rerun.
One method of doing this is to have your task override the complete() function. If complete() returns False, then
that task will be run even if the output exists. If we don’t need manual control over forcing a run, and there is a specific case
in which we want the task to rerun and regenerate its output, then we can perform this check programmatically in our override of complete().
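As a rough illustration of that idea, the sketch below overrides complete() so that a task counts as incomplete whenever its source file is newer than its output. To stay runnable without Luigi installed, ToyTask is a hypothetical stand-in that mimics only the default "complete when the output exists" rule; the class names, file names, and the modification-time check are assumptions made for the example.

```python
import os
import tempfile


class ToyTask:
    """Stand-in for luigi.Task: complete when the output file exists."""

    def __init__(self, output_path):
        self.output_path = output_path

    def complete(self):
        return os.path.exists(self.output_path)


class RefreshOnNewerSource(ToyTask):
    """Hypothetical task that reruns when its source is newer than its output."""

    def __init__(self, source_path, output_path):
        super().__init__(output_path)
        self.source_path = source_path

    def complete(self):
        if not super().complete():
            return False  # no output yet: run as usual
        # Output exists, but treat it as stale if the source changed later.
        source_time = os.path.getmtime(self.source_path)
        output_time = os.path.getmtime(self.output_path)
        return source_time <= output_time


with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "source.csv")
    out = os.path.join(tmp, "output.csv")
    for path in (src, out):
        with open(path, "w") as fh:
            fh.write("x")

    os.utime(src, (1000, 1000))   # source older than output
    os.utime(out, (2000, 2000))
    task = RefreshOnNewerSource(src, out)
    fresh = task.complete()       # output is up to date, task is skipped

    os.utime(src, (3000, 3000))   # source updated after the output was written
    stale = task.complete()       # now reported incomplete, task reruns

print(fresh, stale)
```

With a real luigi.Task the override would have the same shape: return False from complete() whenever the rerun condition holds, and otherwise defer to the default check.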
Though this approach made sense at first, a few problems quickly emerged:
- The task is no longer isolated, since an external check modifies its behavior. This could easily lead to a mess.
- It has to be implemented individually for each task that needs to be able to rerun.
- What if an upstream dependency needs to be updated?
  - For example, in the earlier diagrams, what if Task A1 has an external data source that has been updated? Downstream tasks will have a difficult time knowing whether upstream tasks need to be re-run, and though it can be implemented, to me it seems like a pretty messy and potentially problematic solution.
The better option, or at least the solution that made the most sense to me, was to control forced re-runs manually through Luigi’s built-in configuration files.
I created an extension of luigi.Task called ForcibleTask, which accepts 3 parameters:
force: bool (default false)
- When true, forces the task to run.
force_upstream: bool (default false)
- When true, forces the task and all upstream ForcibleTasks to run.
lock: bool (default false)
- When true, locks the task from being forced by a downstream task.
The goal here was that if none of these parameters are defined, the task behaves as a regular Luigi task, but
if they are defined, they provide the ability to manually force run behavior.
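One way to read those three parameters as a decision rule is sketched below. This is a simplified plain-Python model of the descriptions above, not the actual ForcibleTask implementation; ForceFlags, must_rerun, and the forced_by_downstream argument are hypothetical names introduced for the illustration.

```python
from dataclasses import dataclass


@dataclass
class ForceFlags:
    """The three per-task settings, all defaulting to False."""
    force: bool = False
    force_upstream: bool = False
    lock: bool = False


def must_rerun(flags, output_exists, forced_by_downstream=False):
    """Decide whether a single task should run.

    forced_by_downstream is True when some downstream task has
    force_upstream set (an assumption about how the request is passed on).
    """
    if not output_exists:
        return True   # regular behaviour: a missing output means run
    if flags.force or flags.force_upstream:
        return True   # explicitly forced on this task
    # A downstream force_upstream request is honoured unless the task is locked.
    return forced_by_downstream and not flags.lock


default_case = must_rerun(ForceFlags(), output_exists=True)
forced_case = must_rerun(ForceFlags(force=True), output_exists=True)
pushed_case = must_rerun(ForceFlags(), True, forced_by_downstream=True)
locked_case = must_rerun(ForceFlags(lock=True), True, forced_by_downstream=True)

print(default_case, forced_case, pushed_case, locked_case)
```

With every flag left at its default the function reduces to "run only when the output is missing", which is the regular behaviour the parameters are meant to preserve.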
I can use the Luigi configuration file, which is built into Luigi. The config system is nice because
we don’t need to define these parameters when we run a task; instead we can define them in a config file, which is just an .ini
file. Below are a few small examples of how that can look:
ForcibleTask behaviors
[TaskA1]*   [TaskB1]*
    |           |
    |           |
[TaskA2]*   [TaskB2]*
     \         /
      \       /
       \     /       # Wrapper task just means that it produces no output
  [Wrapper Task C]   # meaning Task C always runs
* denotes task ran successfully
Going back to our original sample, suppose TaskA1, TaskA2, TaskB1, and TaskB2 all extend this new ForcibleTask class.
Example 1
### example.cfg file ###
[TaskA1]
; we don't need to define any of the 3 parameters
; we don't even need to define this config section since it is empty
; in this example A1, B1, and B2 will behave as a normal luigi task
[TaskA2]
force = True
[TaskB1]
[TaskB2]
In this example we’ve already run our pipeline, meaning the outputs from A1, A2, B1, and B2 all exist, and
Luigi’s default behavior would be to not run them if we run the pipeline again.
Since these tasks now extend ForcibleTask, running our pipeline again with the above config behaves differently.
TaskA2 will be forced to run and regenerate its outputs.
TaskA1, TaskB1, and TaskB2 will not run because they were not configured to be forced and their outputs already exist. If their outputs didn’t exist,
they would still run as they normally would.
Example 2
### example.cfg file ###
[TaskA2]
force_upstream = True
Using this config we not only force TaskA2 to run, but we also force all upstream tasks to run. In this scenario both
TaskA1 and TaskA2 will be re-run.
Example 3
### example.cfg file ###
[TaskA2]
force_upstream = True
[TaskA1]
lock = True
Sometimes we have tasks that do important things and we want to be really careful about not accidentally forcing them to re-run.
An example from my work is tasks that create/modify/remove database content, where I want to be really careful about managing when they are run.
Unlike the previous example, in this case only TaskA2 will run; TaskA1 is locked and will not be forced by force_upstream.
Example 4
### example.cfg file ###
[TaskA1]
force = True
What happens in this scenario?
Well, in this scenario, since TaskA2 is not forced, it is not run, nor does it even check whether TaskA1 is complete. Because
of this, TaskA1 won’t be run, nor will any of the other A or B tasks.
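The four examples can be replayed with a small simulation. The sketch below is a model built on stated assumptions rather than the real ForcibleTask: it parses each config with Python’s configparser, assumes every output already exists, treats the wrapper (called C here) as always running, and assumes a locked task also stops force_upstream from travelling further up the tree. DEPENDS_ON, flag, and simulate are hypothetical names.

```python
import configparser

# Dependency graph from the diagrams; "C" is the wrapper task.
DEPENDS_ON = {
    "C": ["TaskA2", "TaskB2"],
    "TaskA2": ["TaskA1"],
    "TaskB2": ["TaskB1"],
    "TaskA1": [],
    "TaskB1": [],
}


def simulate(cfg_text, root="C"):
    """Return the tasks that run, assuming every output already exists."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)

    def flag(task, name):
        # Missing sections or options fall back to False, the default.
        return parser.getboolean(task, name, fallback=False)

    ran = []

    def visit(task, forced_from_below):
        inherited = forced_from_below and not flag(task, "lock")
        forced = flag(task, "force") or flag(task, "force_upstream") or inherited
        if not forced and task != root:
            return  # treated as complete: upstream tasks are never inspected
        push_up = flag(task, "force_upstream") or inherited
        for dep in DEPENDS_ON[task]:
            visit(dep, push_up)
        ran.append(task)

    visit(root, False)
    return ran


example_1 = simulate("[TaskA1]\n[TaskA2]\nforce = True\n[TaskB1]\n[TaskB2]\n")
example_2 = simulate("[TaskA2]\nforce_upstream = True\n")
example_3 = simulate("[TaskA2]\nforce_upstream = True\n[TaskA1]\nlock = True\n")
example_4 = simulate("[TaskA1]\nforce = True\n")

print(example_1)  # ['TaskA2', 'C']
print(example_2)  # ['TaskA1', 'TaskA2', 'C']
print(example_3)  # ['TaskA2', 'C']
print(example_4)  # ['C']
```

Example 4 is the one worth pausing on: the force flag on TaskA1 is never consulted, because the walk stops at TaskA2 before it gets that far.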