Building the Periscope Data Cache with Amazon Redshift

Analysis queries should be fast. Very fast. That’s why we built the Periscope Data cache. Queries running on our cache run up to 150 times faster than on our customers’ databases. A query that used to take over an hour can run in seconds on the cache.

The Periscope Data cache automatically ETLs customer databases into our Redshift clusters, giving them the power of a high performance data warehouse without any of the maintenance overhead.

Our cache clusters are constantly rebalanced and optimized for multi-tenant access, making them much faster than the clusters customers would buy for themselves. In addition to super-fast queries, the cache enables:

  1. Cross Database Joins: Querying multiple databases at once is easy when they’re all in the cache
  2. Materialized Views: Factor out complex aggregates from queries and reuse them later. The cache keeps the materialized views up to date
  3. Syntax Translation: Keep using your database’s syntax. The cache translates common SQL syntax of other databases to the Redshift equivalent
  4. CSV Uploads: Upload CSVs of data into the cache and query them like any other table

In this post we’ll discuss how we built the cache and why it’s so helpful to many of our customers. We call it a cache because it’s not the source of truth; our customers’ databases are. The cache is a highly optimized copy of their data.

Let’s get started!

It Starts With Redshift

Instead of writing our own database, we started on the shoulders of giants. The cache is composed of many Redshift clusters. The clusters vary by size, but most are 1-2 dozen dense compute nodes.

We’ve previously covered why Redshift is an excellent choice for a data warehouse, and we’ve been very happy building our cache on top of it.

For our workloads of thousands of fast queries, larger clusters of small nodes outperform smaller clusters of large nodes for the same price. Clusters are frequently resized to account for changing data volumes and query patterns, and we run our clusters at 50% utilization so there’s room for burst performance on large queries.

Multi-Tenant Clusters

Every cluster contains the data of multiple customers. No single customer needs the resources of an entire cluster all the time, so a multi-tenant architecture works very well. Every customer benefits from the scale of being on a cluster that’s much larger and faster than they could individually afford.

As customers grow, our cache clusters can get crowded. When this happens, we rebalance customers between clusters, or split large clusters into two medium clusters.

Rewriting Queries

The cache stores a copy of all our customers’ data across all of their databases. This enables cross-database joins because we represent different databases as schemas in Redshift.

select *
from db1.schema1.table1
  join db2.schema2.table2

We rename customer databases and schemas in our cache so that if a customer has multiple databases with similar schemas, they can all be in the same Redshift database without name collisions.

Schema Translation

Since the names of databases and schemas in the cache differ from customers’, one step in our query pipeline rewrites queries to reference the correct schemas on the cache. Here’s that example cross database join from above getting rewritten:

select *
from db_12_schema_34.table1
  join db_56_schema_78.table2
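
As a rough illustration of this rewrite step, here is a minimal Python sketch. The mapping table, the regular expression, and the function name are assumptions made for this example, not our production code. A regex also ignores string literals and comments, so a real rewriter would need a proper SQL parser.

```python
import re

# Hypothetical mapping from (origin database, origin schema) to the
# renamed schema in the cache; the ids mirror the example above.
CACHE_SCHEMAS = {
    ("db1", "schema1"): "db_12_schema_34",
    ("db2", "schema2"): "db_56_schema_78",
}

# Matches three-part references such as db1.schema1.table1.
QUALIFIED_NAME = re.compile(r"\b(\w+)\.(\w+)\.(\w+)\b")


def rewrite_schemas(sql: str) -> str:
    """Replace db.schema.table with cache_schema.table where a mapping exists."""

    def replace(match: re.Match) -> str:
        database, schema, table = match.groups()
        cache_schema = CACHE_SCHEMAS.get((database, schema))
        if cache_schema is None:
            return match.group(0)  # unknown reference: leave it untouched
        return f"{cache_schema}.{table}"

    return QUALIFIED_NAME.sub(replace, sql)


query = "select * from db1.schema1.table1 join db2.schema2.table2"
print(rewrite_schemas(query))
```

References that have no entry in the mapping pass through unchanged, which lets a later step decide what to do with them.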

Other SQL to Redshift Translation

This same rewrite step also translates some common functions and patterns that aren’t supported on Redshift to Redshift syntax. For example, we translate many MySQL functions to their Redshift equivalents, so that customers using MySQL databases can use the cache without learning a new dialect of SQL.

-- MySQL query
select sum(if(id < 10, 5, -15))

-- translated for Redshift
select sum(case when id < 10 then 5 else -15 end)
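
To give a feel for how a translation like this one could work, here is a small Python sketch that rewrites MySQL-style if(cond, a, b) calls as case expressions. The function names are hypothetical, and the sketch assumes there are no parentheses or commas inside string literals; it is not the translator we run in production.

```python
import re


def split_top_level(args: str) -> list[str]:
    """Split on commas that are not nested inside parentheses."""
    parts, depth, current = [], 0, []
    for ch in args:
        if ch == "(":
            depth += 1
        elif ch == ")":
            depth -= 1
        if ch == "," and depth == 0:
            parts.append("".join(current).strip())
            current = []
        else:
            current.append(ch)
    parts.append("".join(current).strip())
    return parts


def translate_if(sql: str) -> str:
    """Rewrite MySQL if(cond, a, b) calls as case expressions."""
    match = re.search(r"\bif\s*\(", sql, flags=re.IGNORECASE)
    if match is None:
        return sql
    # Walk forward to the parenthesis that closes this if( call.
    depth, end = 0, None
    for i in range(match.end() - 1, len(sql)):
        if sql[i] == "(":
            depth += 1
        elif sql[i] == ")":
            depth -= 1
            if depth == 0:
                end = i
                break
    if end is None:
        return sql  # unbalanced parentheses: leave the query alone
    args = split_top_level(sql[match.end():end])
    if len(args) != 3:
        return sql  # not the three-argument form: leave the query alone
    # Arguments may themselves contain if() calls, so translate them too.
    cond, then, otherwise = (translate_if(arg) for arg in args)
    case = f"case when {cond} then {then} else {otherwise} end"
    return sql[:match.start()] + case + translate_if(sql[end + 1:])


print(translate_if("select sum(if(id < 10, 5, -15))"))
```

Nested calls such as if(a, if(b, 1, 2), 3) are handled by the recursive call on each argument.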

Periscope Data Syntax Translation

Periscope Data customers frequently use our custom syntax for joining tables, converting time zones, and manipulating dates. This syntax is a lot more convenient than the SQL equivalents, and we translate it to the dialect of the target database.

One of the benefits of using Periscope Data syntax for date and time zone conversions is that these expressions are guaranteed to also run on the cache. This query to get the current week in Pacific time runs on SQL Server and Redshift without any involvement from the user:

-- Periscope Data query
select [getdate():pst:week]

-- SQL Server translation
select convert(date, dateadd(dd, 1 -
  datepart(dw, dateadd(mi, -420, getdate())),
  dateadd(mi, -420, getdate())))

-- Redshift translation
select date_trunc('week', ((getdate() +
  interval '-7 hour'))::timestamp)::date

Cache Consistency

Getting customer data into the cache is relatively easy. The hard part is keeping the cache in sync with the origin database as customers change their data. Many customers don’t have the option to install software on their database server, so tailing replication logs usually isn’t an option.

Instead we populate and maintain the cache with SQL queries.

Fetching Data

We commonly use three different cache population strategies to keep our version of the customers’ data in sync with their databases. Many tables use multiple update strategies in parallel on different schedules to maximize freshness and minimize differences.

Full Updates

This is the most basic and heavyweight option. We download a complete copy of the table on a schedule, and replace the version in the cache. Every table starts with this strategy because at first the cache is empty, and this is the most efficient way of transferring all of the rows.

The Full Updates strategy has high latency, but it is necessary for tables whose rows change throughout, and for warehouses that get completely rebuilt as part of nightly processes.

Incremental Updates

For append-only tables, this is the obvious choice. As long as the table has a column with a monotonically increasing value (e.g. a numeric id or created_at timestamp), it’s easy to grab the newest rows every few minutes.

The Incremental Updates strategy is frequently paired with Full Updates for tables that are growing and changing at the same time. On a typical users table, this combination of update strategies gets new users into the cache very quickly, and occasionally updates all of the historical users in case their rows changed.

We grab the newest rows by comparing the maximum value of the ingest column between the cached and origin tables:

-- max value in the cache is 123456
select * from users where id > 123456
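
The comparison above can be sketched end to end in Python. In this illustration two in-memory SQLite databases stand in for the origin database and the cache, and the function name, table, and sample rows are all hypothetical. The real pipeline loads into Redshift rather than inserting row by row.

```python
import sqlite3


def incremental_update(origin, cache, table, column):
    """Copy rows from origin whose `column` exceeds the cache's current max.

    `table` and `column` are trusted identifiers in this sketch; they are
    interpolated into the SQL directly.
    """
    (cached_max,) = cache.execute(
        f"select max({column}) from {table}"
    ).fetchone()
    if cached_max is None:
        # Empty cache table: there is no bound yet, so fetch everything.
        rows = origin.execute(f"select * from {table}").fetchall()
    else:
        rows = origin.execute(
            f"select * from {table} where {column} > ?", (cached_max,)
        ).fetchall()
    if rows:
        placeholders = ", ".join("?" * len(rows[0]))
        cache.executemany(
            f"insert into {table} values ({placeholders})", rows
        )
        cache.commit()
    return len(rows)


# Two in-memory SQLite databases stand in for the origin and the cache.
origin = sqlite3.connect(":memory:")
cache = sqlite3.connect(":memory:")
for db in (origin, cache):
    db.execute("create table users (id integer, name text)")
origin.executemany(
    "insert into users values (?, ?)",
    [(1, "ada"), (2, "grace"), (3, "edsger")],
)
cache.execute("insert into users values (1, 'ada')")

print(incremental_update(origin, cache, "users", "id"))  # number of rows copied
```

Running the function a second time copies nothing, because the cached maximum has caught up with the origin.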

Bounded Updates

The third common strategy is for large tables that only update recent rows. This is especially useful when the table is too large to regularly do a Full Update, and is not truly append-only.

We’ve seen this most commonly with tables that hold objects soon to be changed by a background process, like email and transaction logs tables. These rows get marked as successful or failed a few minutes after they are created.

Every time it’s scheduled, the Bounded Updates strategy grabs rows created in the last N hours and replaces those rows in the cache. For example, one configuration fetches and replaces the last 24 hours of rows every hour.
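
Here is a minimal Python sketch of that replace-the-window step, again using in-memory SQLite databases as stand-ins for the origin and the cache. The function name, the emails table, and the sample timestamps are made up for illustration, and timestamps are stored as text in one consistent format so that string comparison matches time order.

```python
import sqlite3
from datetime import datetime, timedelta


def bounded_update(origin, cache, table, column, now, hours):
    """Replace cached rows created in the last `hours` hours with fresh copies."""
    cutoff = (now - timedelta(hours=hours)).isoformat(sep=" ")
    fresh = origin.execute(
        f"select * from {table} where {column} >= ?", (cutoff,)
    ).fetchall()
    with cache:  # the delete and the insert commit together, or not at all
        cache.execute(f"delete from {table} where {column} >= ?", (cutoff,))
        if fresh:
            placeholders = ", ".join("?" * len(fresh[0]))
            cache.executemany(
                f"insert into {table} values ({placeholders})", fresh
            )
    return len(fresh)


origin = sqlite3.connect(":memory:")
cache = sqlite3.connect(":memory:")
for db in (origin, cache):
    db.execute("create table emails (id integer, status text, created_at text)")
origin.executemany("insert into emails values (?, ?, ?)", [
    (1, "sent", "2016-01-01 09:00:00"),
    (2, "sent", "2016-01-02 11:30:00"),     # was pending when first cached
    (3, "pending", "2016-01-02 11:55:00"),  # not in the cache yet
])
cache.executemany("insert into emails values (?, ?, ?)", [
    (1, "sent", "2016-01-01 09:00:00"),
    (2, "pending", "2016-01-02 11:30:00"),
])

now = datetime(2016, 1, 2, 12, 0, 0)
bounded_update(origin, cache, "emails", "created_at", now, hours=24)
print(cache.execute("select id, status from emails order by id").fetchall())
```

Rows older than the window are never touched, which is what keeps this strategy cheap on very large tables.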

Loading Data

The first time we download a table from a customer, we set dist and sort keys to optimize query performance, and make the column types compatible. Then we can upload that data into Redshift:

COPY my_schema.my_table
  FROM 's3://bucket_name/path/to/my_manifest'
  WITH CREDENTIALS
    'aws_access_key_id=<my_access_key>;
    aws_secret_access_key=<my_secret_key>'
  REGION 'us-east-1'
  MANIFEST
  GZIP
  ACCEPTANYDATE
  TRUNCATECOLUMNS
  ACCEPTINVCHARS

Key Selection

Currently, we choose dist and sort keys through a simple rules-based process. In an upcoming version of our caching infrastructure, we’ll analyze the many thousands of queries each customer runs to automatically pick optimal sort and dist keys. This will be especially powerful as query patterns change over time.

Column Types

Column data types are not consistent between databases. One tricky part of loading data into Redshift is setting the right column types, based on the origin column type. Redshift supports a subset of the myriad types available across Postgres, MySQL, SQL Server, etc.

Before data is loaded into the cache, we translate column types from their origin types into something Redshift can use. For example, we store Postgres’ json data type as a varchar in Redshift:

Origin Database   Origin Type       Redshift Type
Postgres          json              varchar
MySQL             unsigned bigint   numeric(20, 0)
SQL Server        smalldatetime     timestamp
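
One simple way to express such a mapping is a lookup table keyed on the origin database and type. The Python sketch below contains only the three rows shown above; the function name and the choice to raise an error for unmapped types are assumptions for this example.

```python
# (origin database, origin type) -> Redshift type, from the table above.
TYPE_MAP = {
    ("postgres", "json"): "varchar",
    ("mysql", "unsigned bigint"): "numeric(20, 0)",
    ("sql server", "smalldatetime"): "timestamp",
}


def redshift_type(origin_db: str, origin_type: str) -> str:
    """Look up the Redshift column type for an origin column type."""
    key = (origin_db.lower(), origin_type.lower())
    try:
        return TYPE_MAP[key]
    except KeyError:
        # Assumption for this sketch: fail loudly on types with no mapping.
        raise ValueError(f"no Redshift type mapped for {key}") from None


print(redshift_type("MySQL", "unsigned bigint"))
```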

Maintenance

We let Redshift choose the compression settings the first time the table is loaded, and we reuse those settings for each subsequent load to improve performance. A background task rechecks each table periodically to see if the data skew has changed and new settings would be more efficient.

-- Get current compression settings
select *
from pg_table_def
where tablename = 'events'

-- Compare to what Redshift suggests
analyze compression events

The cache is constantly updated to stay in sync with the origin database. These updates generate a lot of dead rows that can only be recovered by a vacuum, so a second background task vacuums tables during off-peak hours.

Uploading CSVs

A natural feature that comes from caching customer data is enabling customers to upload CSVs. Since we already have a place to store and query a copy of their data, it’s relatively easy to store uploaded CSVs in the same place. Here’s a query joining to a CSV uploaded as “marketing_spend”.

-- Periscope Data query
select
  marketing_spend.campaign,
  count(1) / max(marketing_spend.spend)
from leads
  join [marketing_spend]
    using (campaign_id)
group by 1

-- translated for Redshift
select
  marketing_spend.campaign,
  count(1) / max(marketing_spend.spend)
from leads
  join periscope_views.marketing_spend
    using (campaign_id)
group by 1
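
The bracket translation in this example can be sketched with a single regular expression in Python. The pattern and function name are assumptions for illustration. The pattern only matches a bare identifier in brackets, so bracket expressions that contain colons or parentheses are left for other translation steps, and a real implementation would also have to avoid bracket-quoted identifiers in dialects such as SQL Server.

```python
import re

VIEW_SCHEMA = "periscope_views"  # schema name taken from the example above

# A bare identifier in square brackets, e.g. [marketing_spend].
VIEW_REFERENCE = re.compile(r"\[(\w+)\]")


def expand_view_references(sql: str) -> str:
    """Rewrite [name] as periscope_views.name."""
    return VIEW_REFERENCE.sub(rf"{VIEW_SCHEMA}.\1", sql)


print(expand_view_references(
    "from leads join [marketing_spend] using (campaign_id)"
))
```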

And since the CSVs are stored as tables, these CSVs can be joined to other tables in the same database. Whether it’s loading new data not already in the database, or uploading analysis done outside of Periscope Data, CSVs are a popular companion to cached tables.

Materializing Views

One of the most powerful optimizations in any warehouse is pre-aggregation. Periscope Data allows users to define views, which we materialize in the cache. Redshift doesn’t support materialized views right now, so we build and maintain tables and surface them as views.

Materializing views shares a lot of infrastructure with ingesting customer tables. They too need to have sort and dist keys defined, column types detected, and compression settings applied.

Creating Views

The simplest possible view materialization creates a table from a select statement:

create table hourly_users as (
  select
    date_trunc('hour', created_at) hr,
    count(distinct user_id) ct
  from  activity
  group by 1
)

Unfortunately, this approach doesn’t let us set the sort or dist keys or column compression encodings. So instead we materialize views by inferring the schema and then loading the data.

View Schema

First, we make the view’s table via a select statement similar to what’s above. Instead of selecting the whole view into a table, we select the first few thousand rows.

create table periscope_views.hourly_users__periscope_temp__ as (
  select
    date_trunc('hour', created_at) hr,
    count(distinct user_id) ct
  from  activity
  group by 1
  limit 10000
)

This ensures the planner executes the query to get the correct types, while not wasting time materializing the whole table. Using a limit 0 on the above query will be faster and work most of the time, but the planner makes mistakes when complex functions are involved in column output.

Unloading Data

Once we have the schema from the first step, we can execute the complete view in an unload command. This will dump the view’s data to S3 so that we can load it with Redshift’s copy command.

UNLOAD ('select
           date_trunc(''hour'', created_at) hr,
           count(distinct user_id) ct
         from  activity
         group by 1')
TO 's3://bucket_name/path/to/my_filename_prefix'
WITH CREDENTIALS
  'aws_access_key_id=<my_access_key>;
   aws_secret_access_key=<my_secret_key>'
MANIFEST
GZIP
ALLOWOVERWRITE
ESCAPE
NULL AS '\\N'

The schema lets us define the table to receive the load from S3, and Redshift will take care of setting compression settings on the way in.

create table periscope_views.hourly_users__periscope_temp__ (
  hr timestamp encode delta,
  ct int encode mostly16
)

Setting Keys

For these materialized views, sort and dist keys are inferred based on the query. An order by in the outermost query of a view is a strong indication that the ordered columns should be used as the sort keys.

select
  date_trunc('hour', created_at) hr,
  count(distinct user_id) ct
from  activity
group by 1
order by 1 -- hr will become the sort key
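
A rough Python sketch of this inference is below. It finds the outermost order by (ignoring any inside parentheses, such as window functions or subqueries) and resolves ordinals like 1 to the matching select-list alias. All function names are hypothetical, and the sketch assumes the order by is the last clause and that string literals contain no SQL keywords or parentheses.

```python
import re


def _top_level_match(sql, pattern):
    """First match of `pattern` that is not nested inside parentheses."""
    for match in re.finditer(pattern, sql, flags=re.IGNORECASE):
        prefix = sql[:match.start()]
        if prefix.count("(") == prefix.count(")"):
            return match
    return None


def _split_top_level(text):
    """Split on commas that are not nested inside parentheses."""
    parts, depth, current = [], 0, []
    for ch in text:
        if ch == "(":
            depth += 1
        elif ch == ")":
            depth -= 1
        if ch == "," and depth == 0:
            parts.append("".join(current).strip())
            current = []
        else:
            current.append(ch)
    parts.append("".join(current).strip())
    return parts


def infer_sort_keys(sql):
    """Return the outermost order by columns, resolving ordinals to aliases."""
    order = _top_level_match(sql, r"\border\s+by\b")
    if order is None:
        return []
    select = _top_level_match(sql, r"\bselect\b")
    from_ = _top_level_match(sql, r"\bfrom\b")
    aliases = []
    if select and from_:
        items = _split_top_level(sql[select.end():from_.start()])
        # "expr alias" -> "alias"; a bare column name is its own alias.
        aliases = [item.split()[-1] for item in items if item]
    keys = []
    for term in _split_top_level(sql[order.end():]):
        term = re.sub(r"\s+(asc|desc)$", "", term, flags=re.IGNORECASE)
        if term.isdigit() and 0 < int(term) <= len(aliases):
            term = aliases[int(term) - 1]
        keys.append(term)
    return keys


view_sql = """
select
  date_trunc('hour', created_at) hr,
  count(distinct user_id) ct
from  activity
group by 1
order by 1
"""
print(infer_sort_keys(view_sql))
```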

Refreshing Views

Once a view has been created the first time, it’s much easier and faster to refresh. The steps to infer the schema can be skipped. Instead, we copy the schema and encodings from the existing view’s table to a new table.

create table periscope_views.hourly_users__periscope_temp__
  like (periscope_views.hourly_users)

Then we select the updated data into the new table, drop the existing view, and rename the new one as the original.

insert into periscope_views.hourly_users__periscope_temp__ (
  select
    date_trunc('hour', created_at) hr,
    count(distinct user_id) ct
  from  activity
  group by 1
  order by 1
);

-- drop and swap
drop table periscope_views.hourly_users;
alter table periscope_views.hourly_users__periscope_temp__
  rename to hourly_users;

Refresh Scheduling

Many of our customers are prolific view creators. Not only are the performance benefits huge, views also serve as a great place to define and control common logic. These views define official versions of data that’s been scrubbed, or store the results of large, complex joins and rollups for easy access later.

Here’s a view we’ll call channel_activity that depends on another view, lead_events, and the table channels:

select
  date(lead_events.created_at),
  channels.name,
  count(1)
from [lead_events]
  join channels
    using (channel_id)
group by 1, 2

With so many views, it’s not possible to update them all at the same time, and keeping the update latency low is critical. To solve that, we analyze the SQL of the views and build a dependency tree.

From here we see channel_activity depends on the channels table and lead_events view, and that lead_events depends on the tables web_pings and user_accounts.

We know which tables and views each view depends on, and when each was last updated in the cache. With this information, it’s easy to sort views into dependency order so that updates propagate quickly through the tree.
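
Sorting views into dependency order is a topological sort. As an illustration, the Python sketch below uses the standard library’s graphlib on the example dependencies above; the dictionary layout and function name are assumptions for this example, and it leaves out the last-updated timestamps a real scheduler would also consult.

```python
from graphlib import TopologicalSorter

# Each view maps to the tables and views it depends on (example above).
DEPENDENCIES = {
    "channel_activity": {"channels", "lead_events"},
    "lead_events": {"web_pings", "user_accounts"},
}


def refresh_order(dependencies):
    """Return views ordered so each one comes after the views it depends on."""
    order = TopologicalSorter(dependencies).static_order()
    # Plain tables appear in the order too; only views need materializing.
    return [name for name in order if name in dependencies]


print(refresh_order(DEPENDENCIES))
```

TopologicalSorter raises a CycleError if two views depend on each other, which is a useful check before scheduling any refresh.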

Querying the Cache

Once the data is in the cache, it can be queried from Periscope Data. Since we’ve renamed the schemas from their original names, we must also be sure to use the renamed schemas when setting search paths.

Falling Back

Queries in Periscope Data first try to query the cache, and if that fails for any reason, the queries fall back to the origin database.

Queries fall back when customers use exotic syntax not supported on Redshift, or are querying tables explicitly withheld from the cache.

This automatic fallback gives users the best of high-performance analysis on the cache, without sacrificing syntax flexibility or freshness for specific use cases.
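
In code, the fallback amounts to a try/except around the cache query. The Python sketch below uses hypothetical stand-in functions in place of real database clients, and it passes the same SQL to both for brevity, whereas in practice the cache receives the translated query and the origin receives the original.

```python
def run_with_fallback(sql, run_on_cache, run_on_origin):
    """Try the cache first; on any failure, run the query on the origin."""
    try:
        return "cache", run_on_cache(sql)
    except Exception:
        # For example: unsupported syntax, or a table withheld from the cache.
        return "origin", run_on_origin(sql)


# Hypothetical stand-ins for real database clients.
def fake_cache(sql):
    if "withheld_table" in sql:
        raise LookupError("table is not in the cache")
    return [("from cache",)]


def fake_origin(sql):
    return [("from origin",)]


print(run_with_fallback("select * from users", fake_cache, fake_origin))
print(run_with_fallback("select * from withheld_table", fake_cache, fake_origin))
```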

Queuing

As each cache cluster is multi-tenant, there is a risk that one customer will issue thousands of queries and inadvertently starve out another customer.

To handle these scenarios, we built a prioritized round-robin queuing system for queries. Each cluster can handle a small number of concurrent queries. And if more than one customer is issuing a lot of queries at the same time, query slots get distributed fairly as they free up.

At the same time, queries have different priorities. Background queries are lower priority than users refreshing charts on dashboards, and those refreshes have a lower priority than interactive analysis taking place in the Periscope Data editor.

The queue manager takes all these factors into account when deciding which query to run next on the cluster.
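
To make the idea concrete, here is a minimal Python sketch of a prioritized round-robin queue. The class, the priority names, and the strict ordering between priority levels are assumptions for this illustration; the real queue manager weighs more factors than this.

```python
from collections import OrderedDict, deque

# Highest priority first, following the ordering described above.
PRIORITIES = ("interactive", "dashboard", "background")


class QueryQueue:
    def __init__(self):
        # For each priority: customer -> that customer's pending queries.
        # The OrderedDict order doubles as the round-robin rotation.
        self._levels = {priority: OrderedDict() for priority in PRIORITIES}

    def submit(self, customer, priority, sql):
        self._levels[priority].setdefault(customer, deque()).append(sql)

    def next_query(self):
        """Pop the next (customer, sql) to run, or None if nothing is queued."""
        for priority in PRIORITIES:
            customers = self._levels[priority]
            if not customers:
                continue
            customer, pending = next(iter(customers.items()))
            sql = pending.popleft()
            del customers[customer]
            if pending:
                # Move this customer to the back of the rotation.
                customers[customer] = pending
            return customer, sql
        return None


queue = QueryQueue()
for sql in ("q1", "q2", "q3"):
    queue.submit("customer_a", "background", sql)
queue.submit("customer_b", "background", "q4")
queue.submit("customer_b", "interactive", "q5")

while (item := queue.next_query()) is not None:
    print(item)
```

Even though customer_a queued three background queries first, customer_b’s single background query runs after only one of them, and the interactive query runs before any background work.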

Background Refreshes

All charts in Periscope Data are refreshed in the background. This way customers see fresh data whenever they log in to Periscope Data. Since there are many charts and freshness is very important, background updates are always running on the cache clusters.

Here’s one cluster under normal load, background refreshes interspersed with view materialization and interactive queries:

These background refreshes have the fortuitous side effect of keeping the RAM-cache on the cluster warm for each customer. Since background updates follow the same round-robin queuing as app queries, it’s not possible for one customer on a cluster to evict another customer’s data completely from RAM.

This naturally reserves RAM for the data that’s most frequently accessed. The most popular columns of each customer’s data stay hot in RAM. In effect, this stabilizes the performance of the cluster for each customer and optimizes for the average query time across all customers.

Improving the Cache

The cache described above is only the beginning. There’s lots more to do to make queries faster and SQL analysis more powerful. If you’re excited by Periscope Data’s cache, sign up or join our team!

