User driven Data Pipelines

Emils Solmanis
RVU Labs
Published in
5 min readMay 27, 2021

--

A significant proportion of teams at RVU deal in structured data, but that data isn’t produced by the organisation. We ingest it from various feed providers, most of which have differing formats, and different levels of cleanup and stitching required.

This post discusses our User driven Data Pipeline that shifts the responsibility of managing the pipeline from Engineering to Product, reducing our Lead Time to Product (how long it takes to spin up a new Product) from multiple weeks to days.

All the work involved in pulling in Data powering a Product was managed by engineering. A large amount of time and consequently effort was spent in maintaining complex business logic involved in joining up various pieces of data coming in from third party feeds. This also meant that every time a new business vertical was launched, engineers had to look at some new feed, familiarise themselves with the domain, implement the parsing, and then maintain yet another feed implementation.

This ultimately led to a lot of frustration:

  • engineers felt their skillset was being poorly utilised, maintaining field mappings and joining files is tedious and not unique or novel problem solving
  • business stakeholders felt some semantically simple changes were too expensive, and rightly so, the expectation should be that each new added product should be easier
  • business operations teams were being under utilised on the product aspect of their work by only being responsible for manual products and some curation

Using a DSL to change the focus of responsibility

In an effort to flip the responsibility back to product to reclaim engineering time, the team explored various options. A vendor analysis was carried out to understand if this was a capability we could buy. In our assessment, various CMSs and PaaSs were useful but did not solve the problem without some custom integrations meaning this would still be an engineering problem.

We eventually landed on building our own DSL in combination with using some PaaS tooling in FiveTran.

Our preferred solution was Apache Beam running on Google DataFlow. This solves all our other problems, such as

  • it supports both batch and streaming
  • it has native support for all the other tech we want to or already use: ElasticSearch, BigQuery, Postgres
  • it is very high level, things like joins are already there in the standard library, and basic functions like “renaming” can be simply implemented in Kotlin
  • it supports a graph-like data flow, you can join multiple data streams into one, or fork a single stream out into multiple different ones (e.g., based on a field value)
  • it has a fully managed runner (Dataflow), that handles all the infra side and various things like retrying, logs, error management (via GCP’s mechanisms), UI, and all the other supporting bits

To refocus the responsibility as a business problem we had to ensure that the barrier to entry was lower than writing Java (Apache Beam). We could not reasonably expect our business operations to adopt that.

The intended audience for this are people who are all at least reasonably “technical”, they work with JSON regularly, are all advanced spreadsheet users, but none have formal software engineering training, “Power users” is a term that might apply. We chose to do our DSL in JSON because it’s simple, popular, and most importantly structured, i.e., if someone omits a comma or does a typo, we can easily validate that and show them exactly where the mistake is. Fault tolerance in syntax was an important aspect and this is partly why we didn’t write our own grammar in ANTLR. Because Beam is just a graph structure of functions all we need is:

  • a way to represent said graph in JSON
  • a library of functions callable via JSON
  • ways to define inputs and outputs of the graph

Internally the system operates on loosely typed hash-maps, all sources produce these, all functions apply on them.

Inputs

For inputs, we just have some thin classes that wrap various Beam IO connectors, e.g., a file input would be:

{
"name": "my-content",
"type": "file",
"file": "my-files/feed-data/*.json"
}

which just instantiates a Beam TextIO.Read and passes along the argument. You get the added benefit of it supporting Google Cloud Storage natively, so if someone wants to start reading files from GCS, they just change their file attribute to “gs://my-bucket/my-files/*.json”.

We have various connectors exposed in this way, some more business-specific than others. Another added benefit is that, because the system operates on maps of data, you can swap sources (e.g., for testing), as long as you maintain the same data contracts, this allows you to switch between local files and a remote database without changing anything else about the defined pipeline.

Outputs

We have a single output operation that takes a string destination, and dispatches to an underlying Beam connector. Examples would be

{
"type": "output",
"to": "out.json"
}

or

{
"type": "output",
"to": "pubsub://projects/some-rvu-project/topics/my-stream-output"
}

Functions

The meat and potatoes, these are also just JSON structures that translate directly into a Beam ParDo of a DoFn implementation.

Examples would be

{
"type": "rename",
"renames": {
"old1": "new1",
"old2": "new2"
}
}

or

{
"type": "join",
"thisKey": "id",
"other": "other_stream",
"otherKey": "other_id"
}

the latter of which is a join referencing another stream, which I’ll cover next.

Streams

We wanted to represent a Beam graph in JSON. We’ve opted for a “data stream” concept, where a stream is basically just a list of transformations applied to data.

A final workflow might look something like

{
"sources": [{
"name": "my-source",
"type": "bigquery",
"table": "a-project:my-dataset.my-table"
}],
"pipeline": [{
"stream": "my-ingest",
"source": "my-source",
"ops": [{
"type": "output",
"to": "dump/my-data/out.json"
}]
}]
}

All streams and sources are named, and referred to with that name. The join operation above would refer to another stream name.

All streams must have a source, but that source can also be another stream. In addition to composition, this enables mechanics such as forking, where you would apply all the common functions to data in one stream, and then define 2 others with the former as a source — one to output the intermediate state, and another to do further processing.

The road so far

It’s been about a year since we’ve started implementing this. Since then, we have carried out training, we are still actively supporting the Ops teams, but ultimately this has achieved the goal that we set out to achieve — we’ve moved engineers out of the way of product ingestion, and enabled the people actually understanding the business domain to handle it. Our setup time for products has gone from multiple weeks to days and most importantly is now in the hands of product teams and is subject to their prioritisation.

We’ve had to build some supporting bits as well, including job scheduling, and surfacing errors to end users via common channels like Slack, but it’s important to recognise that those are all things we would have to do regardless, even if we made engineers take responsibility for the domain. There’s plenty more to do in this space, if these kind of problems interest you please have a look at our careers page.

--

--