Introduction to pipeline

This package:

Background

The pipeline package’s aim is to make it easy for users to adopt a functional approach to their analysis. Pipelines are defined using a reduced subset of R and, when run, use an intelligent approach to work planning and object caching. It was motivated by a desire to reduce the boilerplate I was writing with my R scripts and associated Makefiles. Whilst the goal is to be “good enough” for many pieces of analysis, by encouraging a functional approach, it remains easy to move to more advanced alternatives such as targets.

Scope

Goals:

Out of scope:

A motivating example

The easiest way to get started with the package is to look at some of the included examples. pipeline_skeleton() can be used to set up a pipeline based on the mtcars data set and across single, or multiple, scenarios. We start with the single scenario example.

library(pipeline)
dir <- pipeline_skeleton(dir = tempfile(), single = TRUE)
#> ✔ Template pipeline created in '/tmp/RtmpJVgDOF/fileaef62914bf17'.

We now have the following in dir:

#> /tmp/RtmpJVgDOF/fileaef62914bf17
#> ├── R
#> │   ├── load_dat.R
#> │   ├── plot_dat.R
#> │   └── wrangle_dat.R
#> ├── config.R
#> ├── data
#> │   └── mtcars.csv
#> └── pipeline.R

The pipeline we are going to run is described in pipeline.R:

out <- pipeline_run({
    raw   <- load_dat(CONFIG$in_csv)
    clean <- wrangle_dat(raw, CONFIG$rows)
    plot  <- plot_dat(clean, CONFIG$out_plot)
})

pipeline_run() takes an expression where each assignment it contains represents a stage of the analysis. There are a few things to note about the expression syntax:

The configuration file

Configuration files are simply R scripts that are evaluated in a dedicated environment (with some allowances for multiple scenarios that we discuss at the end).

Within this environment we enable two special functions INFILE and OUTFILE:

In our example we have the following configuration file:

in_csv   <- INFILE("data/mtcars.csv")
out_plot <- OUTFILE("plot.png")
rows     <- 10

Here in_csv points to our input data and out_plot to a png output file.
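To make the idea of a dedicated configuration environment more concrete, here is a minimal base R sketch of how such a script could be evaluated. It is an illustration only, not the package's implementation: the INFILE() and OUTFILE() definitions below are hypothetical stand-ins that simply tag the path with an attribute, and the temporary file stands in for config.R.

```r
# Hypothetical stand-ins; the package's real INFILE()/OUTFILE() may do more
# (e.g. resolve paths). Here they only tag the value with an attribute.
INFILE  <- function(path) structure(path, INFILE = TRUE)
OUTFILE <- function(path) structure(path, OUTFILE = TRUE)

# Write a small configuration script to a temporary file.
config_file <- tempfile(fileext = ".R")
writeLines(
    c(
        'in_csv   <- INFILE("data/mtcars.csv")',
        'out_plot <- OUTFILE("plot.png")',
        'rows     <- 10'
    ),
    config_file
)

# Evaluate the script in its own environment so that nothing leaks into
# (or is picked up from) the user's workspace.
config_env <- new.env(parent = baseenv())
config_env$INFILE  <- INFILE
config_env$OUTFILE <- OUTFILE
sys.source(config_file, envir = config_env)

# Collect the configuration values into a list, analogous to CONFIG.
CONFIG <- mget(c("in_csv", "out_plot", "rows"), envir = config_env)
str(CONFIG)
```

Evaluating in a separate environment keeps the configuration values together and makes the two helper functions available only where they are needed.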

The R directory

This is the directory where users define functions that represent stages of their analysis (e.g. cleaning data, modelling, plotting, …). Here we have R/load_dat.R:

load_dat <- function(file) {
    # Note: If a function is not in the base/stats namespace or the function
    #       directory we must explicitly namespace the call using `::`.
    utils::read.csv(file)
}

This file contains a single function that takes the filename of the raw data as input, reads it with utils::read.csv() and returns the resulting data frame. Note the comment about namespacing: any function not in the base/stats namespace or the function directory must be explicitly namespaced.

Next we have R/wrangle_dat.R:

wrangle_dat <- function(dat, rows) {
    stopifnot(is.data.frame(dat), nrow(dat) >= rows, rows >= 0)
    head(dat, rows)
}

This function curtails the number of rows of data. Here we also make use of assertions in the function, checking that dat is a data frame and that rows is between 0 and the number of rows in the data frame (inclusive). Assertions like these can be thought of as unit tests for your analysis. Whilst here we have them at the start of the function to check inputs, they can be used anywhere in your function to ensure things are running as expected. If an assertion fails, it stops the pipeline, bringing your attention to the issue at hand.
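As a small illustration of how such an assertion behaves, the sketch below calls wrangle_dat() directly, outside any pipeline, using the built-in mtcars data frame. The tryCatch() wrapper is only there so the example can show the failure without halting the session.

```r
wrangle_dat <- function(dat, rows) {
    stopifnot(is.data.frame(dat), nrow(dat) >= rows, rows >= 0)
    head(dat, rows)
}

# A valid request: mtcars has 32 rows, so asking for 10 passes every check.
ok <- wrangle_dat(mtcars, 10)
nrow(ok)

# An invalid request: asking for more rows than exist trips the second
# assertion and raises an error, which we capture as a message here.
failure <- tryCatch(
    wrangle_dat(mtcars, 100),
    error = function(e) conditionMessage(e)
)
failure
```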

Finally there is R/plot_dat.R:

plot_dat <- function(dat, outfile) {
    grDevices::png(outfile)
    plot(factor(dat$cyl), dat$mpg, xlab = "cyl", ylab = "mpg")
    grDevices::dev.off()
    # return name of where image is saved
    outfile
}

This function takes the manipulated data as input and creates a plot which is saved as a png. When saving output to disk, we recommend that the function returns the location where the file is saved for later use.

Putting it together

Running the pipeline, we see the following:

original_directory <- setwd(dir)
out <- pipeline_run({
    raw   <- load_dat(CONFIG$in_csv)
    clean <- wrangle_dat(raw, CONFIG$rows)
    plot  <- plot_dat(clean, CONFIG$out_plot)
})
#> 
#> ── Starting pipeline with `default` configuration ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
#> ℹ Running function to generate object `raw`.
#> ℹ Saving `raw` to disk.
#> ℹ Running function to generate object `clean`.
#> ℹ Saving `clean` to disk.
#> ℹ Running function to generate object `plot`.
#> ℹ Saving `plot` to disk.

Each target’s output is stored in a single list, returned as an element of out. The name of this list is configurable but, by default, is ‘default’.

str(out$default$raw)
#> 'data.frame':	32 obs. of  12 variables:
#>  $ car : chr  "Mazda RX4" "Mazda RX4 Wag" "Datsun 710" "Hornet 4 Drive" ...
#>  $ mpg : num  21 21 22.8 21.4 18.7 18.1 14.3 24.4 22.8 19.2 ...
#>  $ cyl : int  6 6 4 6 8 6 8 4 4 6 ...
#>  $ disp: num  160 160 108 258 360 ...
#>  $ hp  : int  110 110 93 110 175 105 245 62 95 123 ...
#>  $ drat: num  3.9 3.9 3.85 3.08 3.15 2.76 3.21 3.69 3.92 3.92 ...
#>  $ wt  : num  2.62 2.88 2.32 3.21 3.44 ...
#>  $ qsec: num  16.5 17 18.6 19.4 17 ...
#>  $ vs  : int  0 0 1 1 0 1 0 1 1 1 ...
#>  $ am  : int  1 1 1 0 0 0 0 0 0 0 ...
#>  $ gear: int  4 4 4 3 3 3 3 4 4 4 ...
#>  $ carb: int  4 4 1 1 2 1 4 2 2 4 ...
str(out$default$clean)
#> 'data.frame':	10 obs. of  12 variables:
#>  $ car : chr  "Mazda RX4" "Mazda RX4 Wag" "Datsun 710" "Hornet 4 Drive" ...
#>  $ mpg : num  21 21 22.8 21.4 18.7 18.1 14.3 24.4 22.8 19.2
#>  $ cyl : int  6 6 4 6 8 6 8 4 4 6
#>  $ disp: num  160 160 108 258 360 ...
#>  $ hp  : int  110 110 93 110 175 105 245 62 95 123
#>  $ drat: num  3.9 3.9 3.85 3.08 3.15 2.76 3.21 3.69 3.92 3.92
#>  $ wt  : num  2.62 2.88 2.32 3.21 3.44 ...
#>  $ qsec: num  16.5 17 18.6 19.4 17 ...
#>  $ vs  : int  0 0 1 1 0 1 0 1 1 1
#>  $ am  : int  1 1 1 0 0 0 0 0 0 0
#>  $ gear: int  4 4 4 3 3 3 3 4 4 4
#>  $ carb: int  4 4 1 1 2 1 4 2 2 4
out$default$plot
#> [1] "/tmp/RtmpJVgDOF/fileaef62914bf17/output/default/plot.png"
#> attr(,"OUTFILE")
#> [1] TRUE

Box plot of cyl (x-axis) versus mpg (y-axis). There is a downward trend in mpg as cyl increases.

Figure 1: The resulting plot in the ‘output’ directory.

The outputs, along with some additional information used for cache invalidation checks, are also saved to disk in the folder ./.pipeline_cache. We do not expect users to interact directly with this folder and its contents should not be altered.

#> /tmp/RtmpJVgDOF/fileaef62914bf17/.pipeline_cache
#> ├── 1a4133e91419099378eca6f6d3cc875e.rds
#> ├── 258e6d4bd71d98704c0e3f3d335c29bc.rds
#> ├── 40251d9f11786ec280b6c5c056687e5d.rds
#> └── default_cache_.rds

Changing things

Let’s assume that, after cleaning our data, we want to sample rows (with replacement) rather than take them as is. Let’s add a new function to our R directory:

sample_clean <- function(clean) {
    clean[sample(nrow(clean), replace = TRUE), ]
}
cat(
    "sample_clean <- function(clean) clean[sample(nrow(clean), replace = TRUE), ]",
    file = file.path("R", "sample_clean.R")
)

Updating our pipeline accordingly, we have

out <- pipeline_run({
    raw     <- load_dat(CONFIG$in_csv)
    clean   <- wrangle_dat(raw, CONFIG$rows)
    samples <- sample_clean(clean)
    plot    <- plot_dat(samples, CONFIG$out_plot)
})
#> 
#> ── Starting pipeline with `default` configuration ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
#> ℹ Loading object `raw` from disk cache.
#> ℹ Loading object `clean` from disk cache.
#> ℹ Running function to generate object `samples`.
#> ℹ Saving `samples` to disk.
#> ℹ Running function to generate object `plot`.
#> ℹ Saving `plot` to disk.

Note that where possible, rather than start from scratch, the cached results were loaded from disk. This caching takes account of changes in functions within the function directory but does not attempt to handle changes in the underlying R version or packages you use (in essence, it assumes environment stability is managed elsewhere).
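To give a feel for how this kind of cache invalidation can work, here is a rough base R sketch. It is not the package's actual mechanism: hash_object(), run_cached() and the in-memory cache environment are hypothetical names invented for this illustration, and the key is simply an MD5 hash of the function's source together with its inputs.

```r
# Hash any R object by serialising it to a temporary file (base R only).
hash_object <- function(x) {
    tmp <- tempfile()
    on.exit(unlink(tmp))
    saveRDS(x, tmp, compress = FALSE)
    unname(tools::md5sum(tmp))
}

# An in-memory stand-in for a disk cache.
cache <- new.env()

# Run `fn(...)` only if this combination of code and inputs is new.
run_cached <- function(name, fn, ...) {
    key <- hash_object(list(code = deparse(fn), args = list(...)))
    if (exists(key, envir = cache, inherits = FALSE)) {
        message("Loading object `", name, "` from cache.")
        return(get(key, envir = cache))
    }
    message("Running function to generate object `", name, "`.")
    value <- fn(...)
    assign(key, value, envir = cache)
    value
}

calls <- 0  # counts how often the stage function really runs
take_rows <- function(dat, rows) {
    calls <<- calls + 1
    head(dat, rows)
}

a <- run_cached("clean", take_rows, mtcars, 10)  # first run: computed
b <- run_cached("clean", take_rows, mtcars, 10)  # same code and inputs: cached

# Changing the function changes the key, so the stage is recomputed.
take_rows <- function(dat, rows) {
    calls <<- calls + 1
    tail(dat, rows)
}
d <- run_cached("clean", take_rows, mtcars, 10)
```

Because the key depends only on the function's code and its arguments, a change to the R version or to an installed package would go unnoticed by a scheme like this, which mirrors the limitation described above.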

Multiple scenarios

The package assists in running multiple scenarios in two ways. Firstly, users can specify different configurations to run in the call, i.e.

pipeline_run(
    ...,
    scenario = c("default", "production"),
    scenario_default = "default"
)

This in turn tells the function to parse the configuration file, looking for an entry called ‘default’. This is used as the base scenario. Other values (in this case ‘production’) in the configuration file are then layered on top of this (internally using modifyList()) to generate additional scenarios to run.

Secondly, we want to avoid unnecessary computations so, where possible, we try to share cached objects across scenarios. To make things more concrete, consider our second demo:

dir2 <- pipeline_skeleton(dir = tempfile(), single = FALSE)
setwd(dir2)
#> ✔ Template pipeline created in '/tmp/RtmpJVgDOF/fileaef673c26a71'.

This time the configuration file defines two scenarios: default, which subsets the data to 10 rows, and production, which returns 32 rows. Note that when working across different scenarios, the entries in the configuration file must be appropriately named lists.

default <- list(
    in_csv   = INFILE("data/mtcars.csv"),
    out_plot = OUTFILE("plot.png"),
    rows     = 10
)

production <- list(
    rows = 32
)
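As a rough illustration of the layering described earlier, the sketch below applies utils::modifyList() to two lists shaped like the ones above. Plain strings are used in place of INFILE() and OUTFILE() so that the example runs without the package; the name production_config is chosen here purely for illustration.

```r
# Base scenario (plain strings stand in for INFILE()/OUTFILE()).
default <- list(
    in_csv   = "data/mtcars.csv",
    out_plot = "plot.png",
    rows     = 10
)

# The production scenario only states what differs from the base.
production <- list(
    rows = 32
)

# Layer production on top of default: matching names are overwritten,
# everything else is inherited unchanged.
production_config <- modifyList(default, production)
str(production_config)
```

Only rows changes between the two scenarios, so any stage that depends solely on in_csv sees identical inputs in both.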

To run both scenarios:

out <- pipeline_run(
    {
        raw <- load_dat(CONFIG$in_csv)
        clean <- wrangle_dat(raw, CONFIG$rows)
        plot <- plot_dat(clean, CONFIG$out_plot)
    },
    scenario = c("default", "production"),
    scenario_default = "default" # included here but note that this is
                                 # also the default value for the argument.
)
#> 
#> ── Starting pipeline with `default` configuration ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
#> ℹ Running function to generate object `raw`.
#> ℹ Saving `raw` to disk.
#> ℹ Running function to generate object `clean`.
#> ℹ Saving `clean` to disk.
#> ℹ Running function to generate object `plot`.
#> ℹ Saving `plot` to disk.
#> 
#> ── Starting pipeline with `production` configuration ─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
#> ℹ Loading object `raw` from disk cache.
#> ℹ Running function to generate object `clean`.
#> ℹ Saving `clean` to disk.
#> ℹ Running function to generate object `plot`.
#> ℹ Saving `plot` to disk.

We see above that the default scenario ran from scratch, whereas the production scenario was able to skip the first computation, loading the cached result from disk.

Two side-by-side box plots of cyl (x-axis) versus mpg (y-axis). The left-hand one represents the ‘default’ configuration, the right-hand one ‘production’. There is a downward trend in mpg as cyl increases.

Figure 2: The resulting plots in the ‘output’ directory.