This package:
The pipeline package's aim is to make it easy for users to adopt a functional approach to their analysis. Pipelines are defined using a reduced subset of R and, when run, use an intelligent approach to work planning and object caching. It was motivated by a desire to reduce the boilerplate I was writing with my R scripts and associated Makefiles. Whilst the goal is to be "good enough" for many pieces of analysis, by encouraging a functional approach, it remains easy to move to more advanced alternatives such as targets.
Goals:
Out of scope:
The easiest way to get started with the package is to look at some of the included examples.
pipeline_skeleton() can be used to set up a pipeline based on the mtcars data set, across a single scenario or multiple scenarios.
We start with the single scenario example.
library(pipeline)
dir <- pipeline_skeleton(dir = tempfile(), single = TRUE)
We now have the following in dir:
#> /tmp/RtmpJVgDOF/fileaef62914bf17
#> ├── R
#> │   ├── load_dat.R
#> │   ├── plot_dat.R
#> │   └── wrangle_dat.R
#> ├── config.R
#> ├── data
#> │   └── mtcars.csv
#> └── pipeline.R
The pipeline we are going to run is described in pipeline.R:
out <- pipeline_run({
raw <- load_dat(CONFIG$in_csv)
clean <- wrangle_dat(raw, CONFIG$rows)
plot <- plot_dat(clean, CONFIG$out_plot)
})
pipeline_run() takes an expression where each assignment it contains represents a stage of the analysis.
There are a few things to note about the expression syntax:
- Functions called in the expression must be defined in the directory R, contained in the base R namespace or, if present in another package, be explicitly namespaced (i.e. package::fun).
- Configuration values, defined in config.R, are accessed by the special CONFIG value (a list which can be subset using the $ symbol).

Configuration files are simply R scripts that are evaluated in a dedicated environment (with some allowances for multiple scenarios that we discuss at the end).
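As a rough sketch of the stage discovery (our own illustration in base R; the package's actual parsing may differ), the top-level assignments in a quoted pipeline expression can be mapped to stage names like this:

```r
# Sketch: extract stage names from a quoted pipeline expression.
# Each top-level `<-` defines one stage; its left-hand side is the name.
expr <- quote({
  raw <- load_dat(CONFIG$in_csv)
  clean <- wrangle_dat(raw, CONFIG$rows)
  plot <- plot_dat(clean, CONFIG$out_plot)
})

stages <- vapply(
  as.list(expr)[-1],                # drop the leading `{`
  function(e) as.character(e[[2]]), # left-hand side of each `<-`
  character(1)
)
stages
#> [1] "raw"   "clean" "plot"
```

Nothing is evaluated here; quote() keeps the expression as data, which is what lets the pipeline plan work before running anything.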
Within this environment we enable two special functions INFILE and OUTFILE:
- INFILE() marks a file as an input. This ensures that pipeline_run() knows to keep a hash of the file and to monitor it for future changes when the pipeline is rerun.
- OUTFILE() marks a file name as an output. This ensures that outputs from different scenarios (see later) do not overwrite one another (output files get saved in a scenario-dependent directory). It also ensures that output files are recreated if they are deleted.
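As a mental model (an assumption on our part, not the package's actual implementation), these markers can be pictured as functions that tag a path with an attribute the pipeline later inspects:

```r
# Hypothetical sketch of the marker functions: tag a path with an
# attribute so the pipeline can recognise inputs and outputs.
INFILE <- function(path) structure(path, INFILE = TRUE)
OUTFILE <- function(path) structure(path, OUTFILE = TRUE)

p <- OUTFILE("plot.png")
attr(p, "OUTFILE")
#> [1] TRUE
```

This matches the `attr(,"OUTFILE")` visible when output paths are printed later, though the real implementation may carry more metadata.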
In our example we have the following configuration file:
in_csv <- INFILE("data/mtcars.csv")
out_plot <- OUTFILE("plot.png")
rows <- 10
Here in_csv points to our input data and out_plot to a png output file.
The R directory is where users define functions that represent stages of their analysis (e.g. cleaning data, modelling, plotting, ...). First we have R/load_dat.R:
load_dat <- function(file) {
# Note: If function not in the base/stats namespace or function directory
# we must explicitly namespace the call using `::`.
utils::read.csv(file)
}
This file contains a single function which takes the filename of the raw data and reads it into a data frame. Note the comment about namespacing functions; anything not in the base/stats namespace or the function directory needs to be explicitly namespaced.
Next we have R/wrangle_dat.R:
wrangle_dat <- function(dat, rows) {
stopifnot(is.data.frame(dat), nrow(dat) >= rows, rows >= 0)
head(dat, rows)
}
This function curtails the number of rows of data. Here we also make use of assertions, checking that dat is a data frame and that rows is between 0 and the number of rows in the data frame (inclusive). Assertions like these can be thought of as unit tests for your analysis. Whilst here they sit at the start of the function to check inputs, they can be used anywhere in your functions to ensure things are running as expected. If an assertion fails it will stop the pipeline running, bringing your attention to the issue at hand.
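For example, calling the function with more rows than the data contains trips the assertion immediately:

```r
wrangle_dat <- function(dat, rows) {
  stopifnot(is.data.frame(dat), nrow(dat) >= rows, rows >= 0)
  head(dat, rows)
}

# mtcars has 32 rows, so asking for 100 fails fast with a clear message:
err <- tryCatch(wrangle_dat(mtcars, 100), error = conditionMessage)
err
#> [1] "nrow(dat) >= rows is not true"
```

In a pipeline run this error propagates up and halts execution, so a bad input cannot silently flow into later stages.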
Finally there is R/plot_dat.R:
plot_dat <- function(dat, outfile) {
grDevices::png(outfile)
plot(factor(dat$cyl), dat$mpg, xlab = "cyl", ylab = "mpg")
grDevices::dev.off()
# return name of where image is saved
outfile
}
This function takes the manipulated data as input and creates a plot which is saved as a png. When saving output to disk, we recommend that the function returns the location where the file is saved for later use.
Running the pipeline, we see the following:
original_directory <- setwd(dir)
out <- pipeline_run({
raw <- load_dat(CONFIG$in_csv)
clean <- wrangle_dat(raw, CONFIG$rows)
plot <- plot_dat(clean, CONFIG$out_plot)
})
Each target output is saved within a single list that we have assigned to out, keyed by scenario. The scenario name is configurable but, by default, is "default".
str(out$default$raw)
#> 'data.frame': 32 obs. of 12 variables:
#> $ car : chr "Mazda RX4" "Mazda RX4 Wag" "Datsun 710" "Hornet 4 Drive" ...
#> $ mpg : num 21 21 22.8 21.4 18.7 18.1 14.3 24.4 22.8 19.2 ...
#> $ cyl : int 6 6 4 6 8 6 8 4 4 6 ...
#> $ disp: num 160 160 108 258 360 ...
#> $ hp : int 110 110 93 110 175 105 245 62 95 123 ...
#> $ drat: num 3.9 3.9 3.85 3.08 3.15 2.76 3.21 3.69 3.92 3.92 ...
#> $ wt : num 2.62 2.88 2.32 3.21 3.44 ...
#> $ qsec: num 16.5 17 18.6 19.4 17 ...
#> $ vs : int 0 0 1 1 0 1 0 1 1 1 ...
#> $ am : int 1 1 1 0 0 0 0 0 0 0 ...
#> $ gear: int 4 4 4 3 3 3 3 4 4 4 ...
#> $ carb: int 4 4 1 1 2 1 4 2 2 4 ...
str(out$default$clean)
#> 'data.frame': 10 obs. of 12 variables:
#> $ car : chr "Mazda RX4" "Mazda RX4 Wag" "Datsun 710" "Hornet 4 Drive" ...
#> $ mpg : num 21 21 22.8 21.4 18.7 18.1 14.3 24.4 22.8 19.2
#> $ cyl : int 6 6 4 6 8 6 8 4 4 6
#> $ disp: num 160 160 108 258 360 ...
#> $ hp : int 110 110 93 110 175 105 245 62 95 123
#> $ drat: num 3.9 3.9 3.85 3.08 3.15 2.76 3.21 3.69 3.92 3.92
#> $ wt : num 2.62 2.88 2.32 3.21 3.44 ...
#> $ qsec: num 16.5 17 18.6 19.4 17 ...
#> $ vs : int 0 0 1 1 0 1 0 1 1 1
#> $ am : int 1 1 1 0 0 0 0 0 0 0
#> $ gear: int 4 4 4 3 3 3 3 4 4 4
#> $ carb: int 4 4 1 1 2 1 4 2 2 4
out$default$plot
#> [1] "/tmp/RtmpJVgDOF/fileaef62914bf17/output/default/plot.png"
#> attr(,"OUTFILE")
#> [1] TRUE

Figure 1: The resulting plot in the "output" directory.
The outputs, along with some additional information that we can use for cache invalidation checks, are also saved to disk in the folder .pipeline_cache.
We do not expect users to interact directly with this folder and its contents should not be altered.
#> /tmp/RtmpJVgDOF/fileaef62914bf17/.pipeline_cache
#> ├── 1a4133e91419099378eca6f6d3cc875e.rds
#> ├── 258e6d4bd71d98704c0e3f3d335c29bc.rds
#> ├── 40251d9f11786ec280b6c5c056687e5d.rds
#> └── default_cache_.rds
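The .rds files hint at the mechanism: each target is serialised under a content-derived name and reloaded rather than recomputed. A rough sketch (our illustration with a made-up key, not the package's actual scheme):

```r
# Hypothetical sketch of hash-keyed caching with saveRDS/readRDS.
cache_dir <- tempfile("cache")
dir.create(cache_dir)

key <- "0123456789abcdef"                 # made-up cache key
path <- file.path(cache_dir, paste0(key, ".rds"))

saveRDS(head(mtcars, 10), path)           # store a computed target
cached <- readRDS(path)                   # later: reload instead of recompute
nrow(cached)
#> [1] 10
```

Because the key is derived from the inputs, any change to them produces a different key, and the stale file is simply never looked up again.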
Let's assume that after cleaning our data we want to sample rows rather than take them as is.
Let's add a new function to our R directory:
sample_clean <- function(clean) {
clean[sample(nrow(clean), replace = TRUE), ]
}
cat(
"sample_clean <- function(clean) clean[sample(nrow(clean), replace = TRUE), ]",
file = file.path("R", "sample_clean.R")
)
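Since sample_clean() draws rows with replacement (a bootstrap sample), its output is stochastic; set a seed beforehand if you need reproducible runs:

```r
sample_clean <- function(clean) {
  clean[sample(nrow(clean), replace = TRUE), ]
}

set.seed(42)                    # make the resampling reproducible
s <- sample_clean(head(mtcars, 10))
nrow(s)                         # same number of rows, possibly with repeats
#> [1] 10
```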
Updating our pipeline accordingly, we have
out <- pipeline_run({
raw <- load_dat(CONFIG$in_csv)
clean <- wrangle_dat(raw, CONFIG$rows)
samples <- sample_clean(clean)
plot <- plot_dat(samples, CONFIG$out_plot)
})
Note that where possible, rather than start from scratch, the cached results were loaded from disk. This caching takes account of changes in functions within the function directory but does not attempt to handle changes in the underlying R version or packages you use (in essence, it assumes environment stability is managed elsewhere).
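One way to picture the function-aware invalidation (an illustrative sketch; the package's real mechanism may differ) is hashing each stage function's deparsed definition and recomputing whenever the hash changes:

```r
# Sketch: hash a function's deparsed source to detect edits between runs.
hash_fun <- function(f) {
  tmp <- tempfile()
  writeLines(deparse(f), tmp)
  unname(tools::md5sum(tmp))
}

f_old <- function(x) head(x, 10)
f_new <- function(x) head(x, 20)    # edited stage function

hash_fun(f_old) == hash_fun(f_new)  # FALSE: the cache entry would be invalidated
```

Note this deliberately says nothing about R versions or installed packages, mirroring the package's assumption that environment stability is managed elsewhere (e.g. with renv or containers).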
The package assists in running multiple scenarios in two ways. Firstly, users can specify different configurations to run in the call, i.e.
pipeline_run(
...,
scenario = c("default", "production"),
scenario_default = "default"
)
This in turn tells the function to parse the configuration file, looking for an entry called "default".
This is used as the base scenario.
Other values (in this case "production") in the configuration file are then layered on top of this (internally using modifyList()) to generate additional scenarios to run.
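The layering can be reproduced with modifyList() directly; an overriding scenario replaces only the entries it names:

```r
default <- list(
  in_csv = "data/mtcars.csv",
  out_plot = "plot.png",
  rows = 10
)
production <- list(rows = 32)       # overrides a single entry

cfg <- utils::modifyList(default, production)
cfg$rows
#> [1] 32
cfg$in_csv                          # inherited from the default scenario
#> [1] "data/mtcars.csv"
```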
Secondly, we want to avoid unnecessary computations so, where possible, we try to share cached objects across scenarios. To make things more concrete, consider our second demo:
dir2 <- pipeline_skeleton(dir = tempfile(), single = FALSE)
setwd(dir2)
This time the configuration file defines two scenarios, default, which subsets the data to 10 rows and production, which returns 32 rows. Note that when working across different scenarios, the entries in the configuration file must be appropriately named lists.
default <- list(
in_csv = INFILE("data/mtcars.csv"),
out_plot = OUTFILE("plot.png"),
rows = 10
)
production <- list(
rows = 32
)
To run both scenarios:
out <- pipeline_run(
{
raw <- load_dat(CONFIG$in_csv)
clean <- wrangle_dat(raw, CONFIG$rows)
plot <- plot_dat(clean, CONFIG$out_plot)
},
scenario = c("default", "production"),
scenario_default = "default" # included here but note that this is
# also the default value for the argument.
)
We see above that the default scenario ran from scratch, whereas the production scenario was able to skip the first computation, loading the cached result from disk.

Figure 2: The resulting plots in the "output" directory.