Warning

This package:

  • is a work-in-progress, proof-of-concept;
  • is unstable in both its API and output;
  • may never mature beyond the prototype stage.

Background

The pipeline package aims to make it easy for users to adopt a functional approach to their analysis workflows. Pipelines are defined using a reduced subset of R and, when run, use an intelligent approach to work planning and object caching. It was motivated by a desire to reduce the boilerplate I was writing with my R scripts and associated Makefiles. It can be thought of as a simpler, but less capable, alternative to targets. Whilst the goal is to be “good enough” for many pieces of analysis, its encouragement of a functional approach means that it remains easy to move to more advanced alternatives.

Scope

Goals:

  • 🗸 Intelligent caching to reduce unnecessary execution.
  • 🗸 Configuration files using a reduced subset of base R as their syntax.
  • 🗸 Simple and efficient approach to managing multiple scenarios.

Out of scope:

  • 🗴 HPC orchestration.
  • 🗴 Automatic parallelisation of the pipeline.

A motivating example

The easiest way to get started with the package is to look at some of the included examples. pipeline_skeleton() can be used to set up a pipeline based on the mtcars data set, for either a single scenario or multiple scenarios. We start with the single scenario example, which has been carefully constructed to illustrate the different functionality that pipeline has.

library(pipeline)
dir <- pipeline_skeleton(dir = tempfile(), single = TRUE)
#> ✔ Template pipeline created in '/tmp/Rtmp7aN1tO/file539d1d2fbb5a'.

We now have the following in dir:

#> /tmp/Rtmp7aN1tO/file539d1d2fbb5a
#> ├── R
#> │   ├── load_dat.R
#> │   ├── plot_dat.R
#> │   └── wrangle_dat.R
#> ├── config.R
#> ├── data
#> │   └── mtcars.csv
#> └── pipeline.R

The pipeline we are going to run is described in pipeline.R:

pipeline.R
out <- pipeline_run({
    raw   <- load_dat(CONFIG$in_csv)
    clean <- wrangle_dat(raw, CONFIG$rows)
    plot  <- plot_dat(clean, CONFIG$out_plot)
})

pipeline_run() takes an expression where each assignment it contains represents a stage of the analysis. There are a few things to note about the expression syntax:

  • The left-hand-side (target) of each assignment must be uniquely named.
  • Function calls on the right-hand-side (rhs) must be defined in the function directory R, contained in the base R namespace or, if present in another package, be explicitly namespaced (i.e. package::fun).
  • The target names cannot match the names of functions in base R, or those contained in the function directory.
  • The order of assignments is not important as the order of orchestration is determined by pipeline_run().
  • Values from the configuration file config.R are accessed by the special CONFIG value.
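To illustrate why the order of assignments need not matter, the following is a minimal sketch of how a run order could be derived from the assignments alone. It is not the package's internal code: it uses only base R, and the names stages, deps and run_order are assumptions introduced for this illustration.

```r
# Hypothetical sketch, not pipeline's implementation: recover a run order
# from a block whose assignments are deliberately written out of order.
block <- quote({
    plot  <- plot_dat(clean, CONFIG$out_plot)
    clean <- wrangle_dat(raw, CONFIG$rows)
    raw   <- load_dat(CONFIG$in_csv)
})

stages  <- as.list(block)[-1]  # drop the leading `{`
targets <- vapply(stages, function(s) as.character(s[[2]]), character(1))

# For each stage, find which other targets its right-hand side mentions.
# Note: all.vars() also returns names used after `$`, so this simple version
# assumes configuration entries are not named after targets.
deps <- lapply(stages, function(s) intersect(all.vars(s[[3]]), targets))
names(deps) <- targets

# Repeatedly schedule the stages whose dependencies are already scheduled.
run_order <- character(0)
while (length(run_order) < length(targets)) {
    is_ready <- vapply(deps, function(d) all(d %in% run_order), logical(1))
    ready <- setdiff(targets[is_ready], run_order)
    if (length(ready) == 0) stop("circular dependency between targets")
    run_order <- c(run_order, ready)
}
run_order
```

Here run_order places raw first, then clean, then plot, even though the block lists them in reverse.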

The configuration file

Configuration files are simply R scripts that are evaluated in a dedicated environment with some allowances for multiple scenarios that we discuss at the end.

Within this environment we enable two additional special functions INFILE and OUTFILE:

  • INFILE() marks a file as an input. This ensures that pipeline_run() knows to keep a hash of the object and to monitor it for future changes when the pipeline is rerun.

  • OUTFILE() marks a file name as an output. This ensures that outputs from different scenarios (see later) do not overwrite one another (output files get saved in a scenario-dependent directory). It also ensures that output files are recreated if they are deleted.
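The ideas above (a dedicated environment, marked inputs and a stored hash) can be sketched with base R alone. This is only an illustration under stated assumptions: the INFILE() and OUTFILE() defined here are stand-ins that tag a path with an attribute, and the use of sys.source() and tools::md5sum() is a choice made for this sketch, not a description of how pipeline works internally.

```r
# Stand-in markers (assumptions for illustration; the real INFILE() and
# OUTFILE() are provided by pipeline inside the configuration environment).
INFILE  <- function(path) structure(path, INFILE = TRUE)
OUTFILE <- function(path) structure(path, OUTFILE = TRUE)

# Create a small input file and a configuration script that refers to it.
input <- tempfile(fileext = ".csv")
writeLines(c("x,y", "1,2"), input)
config_file <- tempfile(fileext = ".R")
writeLines(
    c(
        sprintf("in_csv <- INFILE(%s)", deparse(input)),
        "out_plot <- OUTFILE(\"plot.png\")",
        "rows <- 10"
    ),
    config_file
)

# Evaluate the script in its own environment so its variables stay contained.
config_env <- new.env()
sys.source(config_file, envir = config_env)
CONFIG <- as.list(config_env)

# Record a hash for every entry that was marked as an input file.
is_input <- vapply(CONFIG, function(x) isTRUE(attr(x, "INFILE")), logical(1))
hashes <- vapply(
    CONFIG[is_input],
    function(p) unname(tools::md5sum(as.character(p))),
    character(1)
)

# Editing the input changes its hash, which signals that a rerun is needed.
writeLines(c("x,y", "1,3"), input)
new_hash <- unname(tools::md5sum(input))
input_changed <- !identical(new_hash, hashes[["in_csv"]])
input_changed
```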

In our example we have the following configuration file:

config.R
in_csv   <- INFILE("data/mtcars.csv")
out_plot <- OUTFILE("plot.png")
rows     <- 10

Here in_csv points to our input data and out_plot to a png output file.

The R directory

This is the directory where users define functions that represent stages of their analysis (e.g. cleaning data, modelling, plotting, …). Here we have:

R/load_dat.R
load_dat <- function(file) {
    # Note: If a function is not in the base namespace or function directory
    #       we must explicitly namespace the call using `::`.
    utils::read.csv(file)
}

This file contains a single function which takes the filename of the raw data as input, reads it with utils::read.csv() and returns the resulting data frame. Note the comment about namespacing functions; anything not in the base R namespace or the function directory needs to be explicitly namespaced.

Next we have R/wrangle_dat.R:

R/wrangle_dat.R
wrangle_dat <- function(dat, rows) {
    stopifnot(is.data.frame(dat), nrow(dat) >= rows, rows >= 0)
    head(dat, rows)
}

This function curtails the number of rows of data. Here we also make use of assertions in the function, checking that dat is a data frame and rows is between 0 and the number of rows in the data frame (inclusive). Assertions like these can be thought of as unit tests for your analysis. Whilst here we have them at the start of the function to check inputs, they can be used anywhere in your function to ensure things are running as expected. If an assertion fails it will stop the pipeline running, bringing your attention to any issue at hand.
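As a small illustration of a failing assertion, the snippet below calls a copy of wrangle_dat() outside any pipeline and asks for more rows than the data contains. The tryCatch() wrapper is only there so that the error message can be inspected; inside a pipeline the error would simply stop the run.

```r
# Copy of the function from R/wrangle_dat.R so the example is self-contained.
wrangle_dat <- function(dat, rows) {
    stopifnot(is.data.frame(dat), nrow(dat) >= rows, rows >= 0)
    head(dat, rows)
}

# A valid request returns the first `rows` rows.
ok <- wrangle_dat(datasets::mtcars, rows = 5)
nrow(ok)

# Asking for more rows than exist trips the second assertion; the message
# produced by stopifnot() names the condition that failed.
msg <- tryCatch(
    wrangle_dat(datasets::mtcars, rows = 100),
    error = function(e) conditionMessage(e)
)
msg
```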

Finally there is R/plot_dat.R:

R/plot_dat.R
plot_dat <- function(dat, outfile) {
    grDevices::png(outfile)
    plot(factor(dat$cyl), dat$mpg, xlab = "cyl", ylab = "mpg")
    grDevices::dev.off()
    # return name of where image is saved
    outfile
}

This function takes the manipulated data as input and creates a plot which is saved as a png. When saving output to disk, we recommend that the function returns the location where the file is saved for later use.

Putting it together

Running the pipeline, we see the following:

original_directory <- setwd(dir)
out <- pipeline_run({
    raw   <- load_dat(CONFIG$in_csv)
    clean <- wrangle_dat(raw, CONFIG$rows)
    plot  <- plot_dat(clean, CONFIG$out_plot)
})
#>
#> ── Starting pipeline with `default` configuration ──────────────────────────────
#> ℹ Running function to generate object `raw`.
#> ℹ Saving `raw` to disk.
#> ℹ Running function to generate object `clean`.
#> ℹ Saving `clean` to disk.
#> ℹ Running function to generate object `plot`.
#> ℹ Saving `plot` to disk.

The output for each target is stored in a single list, which we have assigned to out. The name of the list element holding the results is configurable but, by default, is ‘default’.

str(out$default$raw)
#> 'data.frame':    32 obs. of  12 variables:
#>  $ car : chr  "Mazda RX4" "Mazda RX4 Wag" "Datsun 710" "Hornet 4 Drive" ...
#>  $ mpg : num  21 21 22.8 21.4 18.7 18.1 14.3 24.4 22.8 19.2 ...
#>  $ cyl : int  6 6 4 6 8 6 8 4 4 6 ...
#>  $ disp: num  160 160 108 258 360 ...
#>  $ hp  : int  110 110 93 110 175 105 245 62 95 123 ...
#>  $ drat: num  3.9 3.9 3.85 3.08 3.15 2.76 3.21 3.69 3.92 3.92 ...
#>  $ wt  : num  2.62 2.88 2.32 3.21 3.44 ...
#>  $ qsec: num  16.5 17 18.6 19.4 17 ...
#>  $ vs  : int  0 0 1 1 0 1 0 1 1 1 ...
#>  $ am  : int  1 1 1 0 0 0 0 0 0 0 ...
#>  $ gear: int  4 4 4 3 3 3 3 4 4 4 ...
#>  $ carb: int  4 4 1 1 2 1 4 2 2 4 ...
str(out$default$clean)
#> 'data.frame':    10 obs. of  12 variables:
#>  $ car : chr  "Mazda RX4" "Mazda RX4 Wag" "Datsun 710" "Hornet 4 Drive" ...
#>  $ mpg : num  21 21 22.8 21.4 18.7 18.1 14.3 24.4 22.8 19.2
#>  $ cyl : int  6 6 4 6 8 6 8 4 4 6
#>  $ disp: num  160 160 108 258 360 ...
#>  $ hp  : int  110 110 93 110 175 105 245 62 95 123
#>  $ drat: num  3.9 3.9 3.85 3.08 3.15 2.76 3.21 3.69 3.92 3.92
#>  $ wt  : num  2.62 2.88 2.32 3.21 3.44 ...
#>  $ qsec: num  16.5 17 18.6 19.4 17 ...
#>  $ vs  : int  0 0 1 1 0 1 0 1 1 1
#>  $ am  : int  1 1 1 0 0 0 0 0 0 0
#>  $ gear: int  4 4 4 3 3 3 3 4 4 4
#>  $ carb: int  4 4 1 1 2 1 4 2 2 4
out$default$plot
#> [1] "/tmp/Rtmp7aN1tO/file539d1d2fbb5a/output/default/plot.png"
#> attr(,"OUTFILE")
#> [1] TRUE

[Figure: The resulting plot in the ‘output’ directory.]

The outputs, along with some additional information that we can use for cache invalidation checks, are also saved to disk in the folder ./.pipeline_cache. We do not expect users to interact directly with this folder and its contents should not be altered.

#> /tmp/Rtmp7aN1tO/file539d1d2fbb5a/.pipeline_cache
#> ├── 6d95a1c536b565f927cc7c5e9edfb17a.rds
#> ├── 94433eb0504ad33a41c79b71e3eb2920.rds
#> ├── bc84fd5137211c57b156a5dfbc4e1914.rds
#> └── default_cache_.rds

Changing things

Let’s assume that, after cleaning our data, we want to sample rows (with replacement) rather than take them as they are. Updating our pipeline accordingly, we have:

out <- pipeline_run({
    raw     <- load_dat(CONFIG$in_csv)
    clean   <- wrangle_dat(raw, CONFIG$rows)
    samples <- clean[sample(nrow(clean), replace = TRUE), ]
    plot    <- plot_dat(samples, CONFIG$out_plot)
})
#>
#> ── Starting pipeline with `default` configuration ──────────────────────────────
#> ℹ Loading object `raw` from disk cache.
#> ℹ Loading object `clean` from disk cache.
#> ℹ Running function to generate object `samples`.
#> ℹ Saving `samples` to disk.
#> ℹ Running function to generate object `plot`.
#> ℹ Saving `plot` to disk.

Note that where possible, rather than start from scratch, the cached results were loaded from disk. This caching takes account of changes in functions within the function directory but does not attempt to handle changes in the underlying R version or packages you use (in essence, it assumes environment stability is managed elsewhere).
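One way to picture cache invalidation when a function changes is to fingerprint the function's code and compare it with a stored value. The sketch below is an assumption-laden illustration, not the package's mechanism: fn_fingerprint() is a hypothetical helper, and hashing the deparsed function via tools::md5sum() is simply a convenient base R choice.

```r
# Hypothetical helper: fingerprint a function by hashing its deparsed code.
fn_fingerprint <- function(fn) {
    tmp <- tempfile()
    on.exit(unlink(tmp))
    writeLines(deparse(fn), tmp)
    unname(tools::md5sum(tmp))
}

# Two versions of a stage function: the second one has been edited.
wrangle_v1 <- function(dat, rows) head(dat, rows)
wrangle_v2 <- function(dat, rows) tail(dat, rows)

stored <- fn_fingerprint(wrangle_v1)

# An unchanged function keeps its fingerprint, so a cached result could be
# reused; an edited function gets a new one, so the stage should be rerun.
reuse_cache <- identical(fn_fingerprint(wrangle_v1), stored)
needs_rerun <- !identical(fn_fingerprint(wrangle_v2), stored)
c(reuse_cache = reuse_cache, needs_rerun = needs_rerun)
```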

Multiple scenarios

The package assists in running multiple scenarios in two ways. Firstly, users can specify different configurations to run in the call, i.e.

pipeline_run(
    ...,
    scenario = c("default", "production"),
    scenario_default = "default"
)

This in turn tells the function to parse the configuration file, looking for an entry called ‘default’. This is used as the base scenario. Other values (in this case ‘production’) in the configuration file are then layered on top of this (internally using modifyList()) to generate additional scenarios to run.

Secondly, we want to avoid unnecessary computations so, where possible, we try to share cached objects across scenarios. To make things more concrete, consider our second demo:

dir2 <- pipeline_skeleton(dir = tempfile(), single = FALSE)
setwd(dir2)
#> ✔ Template pipeline created in '/tmp/Rtmp7aN1tO/file539d581f0a49'.

This time the configuration file defines two scenarios: default, which subsets the data to 10 rows, and production, which returns all 32 rows. Note that when working across different scenarios, the entries in the configuration file must be appropriately named lists.

config.R
default <- list(
    in_csv   = INFILE("data/mtcars.csv"),
    out_plot = OUTFILE("plot.png"),
    rows     = 10
)

production <- list(
    rows = 32
)
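The layering of production on top of default can be reproduced directly with utils::modifyList(). This is a rough illustration only: plain strings are used in place of INFILE() and OUTFILE() so that the snippet runs outside the package, and the name resolved is introduced here for the example.

```r
# Plain strings stand in for INFILE()/OUTFILE() so this runs without pipeline.
default <- list(
    in_csv   = "data/mtcars.csv",
    out_plot = "plot.png",
    rows     = 10
)

production <- list(
    rows = 32
)

# Entries in `production` replace those in `default`; everything else is
# inherited from the base scenario.
resolved <- utils::modifyList(default, production)
str(resolved)
```

Only rows differs between the two scenarios, so any stage that depends solely on in_csv sees identical inputs in both.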

To run both scenarios:

out <- pipeline_run(
    {
        raw <- load_dat(CONFIG$in_csv)
        clean <- wrangle_dat(raw, CONFIG$rows)
        plot <- plot_dat(clean, CONFIG$out_plot)
    },
    scenario = c("default", "production"),
    scenario_default = "default" # included here but note that this is
                                 # also the default value for the argument.
)
#>
#> ── Starting pipeline with `default` configuration ──────────────────────────────
#> ℹ Running function to generate object `raw`.
#> ℹ Saving `raw` to disk.
#> ℹ Running function to generate object `clean`.
#> ℹ Saving `clean` to disk.
#> ℹ Running function to generate object `plot`.
#> ℹ Saving `plot` to disk.
#>
#> ── Starting pipeline with `production` configuration ───────────────────────────
#> ℹ Loading object `raw` from disk cache.
#> ℹ Running function to generate object `clean`.
#> ℹ Saving `clean` to disk.
#> ℹ Running function to generate object `plot`.
#> ℹ Saving `plot` to disk.

We see above that the default scenario ran from scratch, whereas the production scenario was able to skip the first computation, loading the cached result from disk.

[Figure: The resulting plots in the ‘output’ directory.]