For a new project I’ve been thinking about an idea/abstraction/pattern that is really not new (few things are anyway)
but I thought might be worthwhile writing about before running off and executing on it: A
Pipeline in Elixir.
The current goal we want to achieve with the project is to tap into multiple buckets of data, transform it if needed, apply some validations and then lay the data to rest in a common format. That is the very high level description from about one week on said project. What I do know, is that we will tap into multiple sources, that reach of some different channels, such as
- a Web API with JSON-over-HTTP
- a zip file getting FTP’ed onto a folder
- or our system doing a query against a third-party system
The transformations we need to do are likely temporary, as the other systems converge on the format we like the most. Hence, we might need to add more transformations or remove other throughout the lifetime.
The validations on the other hand will depend on the data we pass through. They probably check the shape of the data, not necessarily the contents. For example, we will need to check that something is a non-negative currency in either U$D, EUR, GBP, or BRL, but not whether the value is greater than something stored in a DB somewhere.
We are thinking of two places where we want to put data to rest, with different retention characteristics.
The code I had in mind would roughly look something like this:
defmodule Partner1Job do def execute do pipeline = Pipeline.new(name: "Partner1 worker") |> Pipeline.from(Partner1Source, ["this-cool-table", starting_at: 213, page_size: 50]) |> Pipeline.through(Partner1Transformations) |> Pipeline.validate(GroupOfValidations) |> Pipeline.write_into(FileCabinet) Scheduler.run!(pipeline) end end
In the example above, we create a new
Pipeline with some parameters like a name (useful for logging later on?) and then configure
where the data comes from, what transformations to apply on the way, what validations to perform and finally where to write the data to.
We have pipelines already
True, but they are only a syntacitc language construct.
Elevating the idea of the pipeline allows us to have conversation about
Sinks of data.
What needs to be common among all partners? What will require a special-casing?
It also means we have a common place to attach behaviour that should apply to all pipelines.
For example, we can add logging to every single stage and use the
name from the pipeline in the logs.
We could also add a common place to write any kind of error to and thus standardize a common log format.
Just writing this out as a simple
|> means we’d have to be mindful of this in all places.
Change what needs changing
I imagine that most
Sinks will be the same across most pipelines.
The two things I foresee having some variations, are entry points (Web, FTP, timed trigger) and different transformations.
Let’s look at what a running a HTTP request from Phoenix through the pipeline could look like:
defmodule DataApiController do def init(...) do pipe = Pipeline.new(name: "Data-Ingestion-API") |> Pipeline.validate(GroupOfValidations) |> Pipeline.write_into(FileCabinet) [pipeline: pipe] end def some_action(conn, params, pipeline) do result = pipeline |> Pipeline.from(WebAPISource, params: params) |> Pipeline.through(PlugEnhancer, plug: plug) |> Scheduler.run!() json result end end
Here I’d maybe split the configuration of the pipeline into sections that are the same across all requests and sections that
require data from the request itself.
I’d still have the notion of a
source, just that this one comes from the
WebAPISource and contains the rules on how to convert
params into whatever flows through the pipeline.
through(PlugEnhancer, plug: plug) I’d enhance the pipeline with any details from the
plug itself, such as values of headers or hostnames or timestamps.
Scheduler would then take the pipe and run it. This is also a good expansion point to provide different ways of running the pipe.
Maybe we don’t care about the result and can do something like
Scheduler.run_later(pipeline) or maybe
Scheduler.run(pipeline, at: :fullhour).
Scheduler.run(pipeline, every: 15, :minutes) if we want to launch a background task for a periodic job.
If you know of a library that does this already in Elixir or any other language, please send me a Tweet, so I can use it as a source of inspiration. Watch this space!