
Putting Snowflake’s Automatic Concurrency Scaling to the Test

At Periscope Data, we know that performance is of critical importance to our customers. You shouldn't need to wait for queries to load when you’re making decisions about the direction of your company.

Performance isn't just about how fast queries run; it's also about how many queries can run at the same time. Customers that start with a modest deployment of Periscope Data in a single department often grow to 100x their initial user count as more users and departments come onto the platform, and that growth brings a similar increase in query volume.

Guaranteeing query performance as customers make data available to more users is hard. That's why we chose to include Snowflake in our recent launch of Data Engine: Snowflake's automatic concurrency scaling makes increasing the compute in front of data warehouses easy. This ensures our customers can scale query concurrency with their user count (and query volume) dynamically throughout the day.

We measured the throughput of typical analytics queries on a single warehouse, with and without auto-scaling, to see just how well Snowflake's query concurrency scaling works.


Our tests showed Snowflake’s automatic concurrency scaling improved overall concurrent query performance by up to 84%. To put this into context, this means that a query that once ran for over 3 minutes can now complete in about 33 seconds.

What is automatic concurrency scaling?

A Snowflake multi-cluster warehouse consists of one or more clusters of servers that execute queries. For a given warehouse, a Snowflake customer can set both the minimum and maximum number of compute clusters to allocate to that warehouse.

In automatic scaling mode, Snowflake will start and stop these clusters based on the volume of queries being issued to the warehouse, increasing your computing power when you need it, and decreasing it when you don’t.
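
To make that concrete, auto-scale mode is just a warehouse whose minimum and maximum cluster counts differ. Here's a minimal sketch of the configuration, using a connector session like the one we create in the test section below; the warehouse name MY_WH and the one-to-ten range are placeholders, not our production settings:

# Sketch: setting min_cluster_count != max_cluster_count puts the
# warehouse in auto-scale mode. MY_WH is a placeholder name.
connection.cursor().execute("""
  alter warehouse MY_WH set
    min_cluster_count = 1
    max_cluster_count = 10
""")

Snowflake also exposes a SCALING_POLICY parameter (STANDARD or ECONOMY) that tunes how eagerly new clusters spin up versus letting queries queue.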

What’s the benefit?

In traditional data warehouses, clusters serve as both the compute resources and the data storage. Because your data already lives in the compute infrastructure, there’s no need for data transfer. Therefore, individual queries will typically execute more quickly than if the data were stored separately.

A limitation of traditional warehouses is that those resources are fixed: the same resources are used whether you're running one query or 100.

Unlike traditional warehouses, Snowflake was built from the ground up for the cloud, which enables compute and storage to work independently. With Snowflake, you can instantly add and resize warehouses manually or automatically. Gone are the days of scheduling ETL jobs at night to avoid contention with BI workloads during the day. Now you can separate these workloads and run them in parallel using multiple compute clusters (virtual warehouses).
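
As a purely hypothetical illustration, the separation is as simple as pointing each workload at its own warehouse; the warehouse names below are made up:

# Hypothetical sketch: separate compute for ETL and BI over the same data.
# ETL_WH and BI_WH are made-up warehouse names.
cur = connection.cursor()
cur.execute("create warehouse if not exists ETL_WH warehouse_size = 'LARGE'")
cur.execute("create warehouse if not exists BI_WH warehouse_size = 'MEDIUM'")
cur.execute('use warehouse BI_WH')  # this session's queries now run on BI_WH

Because both warehouses read the same underlying storage, the nightly ETL job and the daytime dashboards never compete for compute.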

Using the automatic scaling mode makes this even easier. In this mode, Snowflake automatically adds and removes compute clusters based on the query workload. Since this scaling happens instantly, both up and down, customers use resources only when they need them and stop paying for them when the query workload subsides.

We tested Snowflake to understand the benefits of their scaling model.

The Test Drive

We wanted to see how Snowflake handled an increasing volume of queries issued simultaneously. We used the Snowflake Connector for Python to automate this benchmark.

import os
import threading
import time

import pandas
import snowflake.connector

# Read the connection parameters from environment variables so that
# no credentials are hard-coded in the benchmark script.
connection = snowflake.connector.connect(
  user=os.environ['SNOWFLAKE_USER'],
  password=os.environ['SNOWFLAKE_PW'],
  account=os.environ['SNOWFLAKE_ACC'],
  region=os.environ['SNOWFLAKE_REG'],
  database=os.environ['SNOWFLAKE_DB'],
  warehouse=os.environ['SNOWFLAKE_WH'],
  schema=os.environ['SNOWFLAKE_SCH']
)

In our test, we used Snowflake's sample database, SNOWFLAKE_SAMPLE_DATA. We used the 10TB sample set (TPCDS_SF10TCL) and chose a query that took around 3 seconds to run on the cluster, mimicking the runtimes of common analytics queries.

select count(1)
from web_returns
where wr_order_number % 3 = 0
and year(current_timestamp) > 0

We included current_timestamp in our query to avoid returning a cached result. Since we were testing performance, a cached result would have been cheating.
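
An alternative that leaves the query text untouched is to disable Snowflake's result cache at the session level; a one-line sketch:

# Alternative to the current_timestamp trick: turn off the result cache
# for this session so every run does real work.
connection.cursor().execute('alter session set use_cached_result = false')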

Then we built the test. We queried the warehouse with 1, 2, 4, 8, 16, 32, 64 and 128 concurrent queries. Since Snowflake allowed us to scale between one and ten clusters, we ran the gamut from one to ten. Here's the code:

# set the maximum number of clusters available to the warehouse
def set_max_clusters(i):
  connection.cursor().execute(
      'alter warehouse "' + os.environ['SNOWFLAKE_WH'] +
      '" set min_cluster_count = 1 max_cluster_count = ' + str(i)
  )

for num_clusters in range(1, 11):
  set_max_clusters(num_clusters)
  for concurrent_queries in [2**x for x in range(0, 8)]:  # 1, 2, 4, ..., 128
      for query_num in range(concurrent_queries):
          connection.cursor().execute(
              'select count(1) from web_returns ' +
              'where wr_order_number % 3 = 0 ' +
              'and year(current_timestamp) > 0'
          )

This worked except for one small issue. The queries were issued in sequence rather than concurrently! We used threads to issue these queries asynchronously and saved the results:

results = []

# Each thread opens its own cursor, runs the benchmark query, and records
# how long it took. (list.append is atomic in CPython, so the threads can
# share the results list without a lock.)
class ConcurrentQuery(threading.Thread):
  def __init__(self, total, counter, clusters):
      threading.Thread.__init__(self)
      self.counter = counter
      self.total = total
      self.clusters = clusters

  def run(self):
      cur = connection.cursor()
      start_ts = time.time()
      cur.execute(
          'select count(1) ' +
          'from web_returns ' +
          'where wr_order_number % 3 = 0 ' +
          'and year(current_timestamp) > 0'
      )
      end_ts = time.time()
      results.append(
          {
              'clusters': self.clusters,
              'total': self.total,
              'counter': self.counter,
              'start_time': time.strftime('%Y-%m-%dT%H:%M:%S', time.localtime(start_ts)),
              'execution_time': end_ts - start_ts,
              'query_id': cur.sfqid  # Snowflake's query id, for joining to query logs
          }
      )

Once we were set up to run multiple threads to issue queries, we proceeded with the test.

for num_clusters in range(1, 11):
  set_max_clusters(num_clusters)
  time.sleep(60)  # it takes a bit of time for a cluster to spin up
  for concurrent_queries in [2**x for x in range(0, 8)]:
      threads = []
      for query_num in range(0, concurrent_queries):
          thread = ConcurrentQuery(concurrent_queries, query_num, num_clusters)
          threads.append(thread)
      # launch the whole batch at (nearly) the same moment...
      for thread in threads:
          thread.start()
      # ...then wait for every query in the batch to finish
      for thread in threads:
          thread.join()

df = pandas.DataFrame(results)
df.to_csv('results/concurrency_test.csv', index=False)

To analyze our results, we used Periscope Data’s handy CSV importer to load both our output file (concurrency_test.csv) and an export of Snowflake’s query logs.
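
If you want to reproduce the join outside of Periscope, it's a few lines of pandas. This sketch assumes the query-log export carries QUERY_ID, EXECUTION_TIME, and QUEUED_OVERLOAD_TIME columns in milliseconds (the names Snowflake's QUERY_HISTORY view uses); the export file name is a placeholder:

# Sketch: join our client-side timings to Snowflake's query log export.
# 'query_history_export.csv' is a placeholder file name; the column
# names assume Snowflake's QUERY_HISTORY conventions (times in ms).
import pandas

runs = pandas.read_csv('results/concurrency_test.csv')
logs = pandas.read_csv('results/query_history_export.csv')

merged = runs.merge(logs, left_on='query_id', right_on='QUERY_ID')
merged['execution_s'] = merged['EXECUTION_TIME'] / 1000.0
merged['queued_s'] = merged['QUEUED_OVERLOAD_TIME'] / 1000.0

# average execution vs. queue time for each (clusters, batch size) cell
summary = merged.groupby(['clusters', 'total'])[['execution_s', 'queued_s']].mean()
print(summary)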

Results

We had data that ran in two dimensions: 1) number of available clusters and 2) number of concurrent queries. To visualize all of those results at once, we used Periscope’s one-click R integration to build a facet grid.

library(tidyverse)
library(ggplot2)
library(reshape2)
library(grid)
library(gtable)
df.m <- melt(df, id=c('clusters', 'queries'))
p <- df.m %>% ggplot() +
geom_bar(aes(x='Q', y=value, fill=variable), stat='identity') +
scale_fill_manual(values=c('#57b3e3','#b468d8'), name='') +
facet_grid(clusters~queries, switch='y') +
coord_flip() +
theme_light() +
scale_alpha(range = c(0.5, 1)) +
scale_y_continuous(limits = c(0, 210), breaks = c(0, 200)) +
theme(axis.title=element_blank(), axis.text=element_blank(), axis.ticks=element_blank()) +
theme(strip.text.y = element_text(angle = 180, size=10, color='black')) +
theme(strip.background =element_rect(fill="white"))+
theme(strip.text.x = element_text(size=10, color='black')) +
theme(legend.position="bottom")
z <- ggplotGrob(p)
# add label for left strip
z <- gtable_add_cols(z, unit(.5, 'null'), 0)
z <- gtable_add_grob(z,
                    list(rectGrob(gp = gpar(col = NA)),
                         textGrob('Available Clusters', rot = 90)),
                    14, 1, 20, 5, name = paste(runif(2)))
# add label for top strip
z <- gtable_add_rows(z, unit(1, 'null'), 4)
z <- gtable_add_grob(z,
                    list(rectGrob(gp = gpar(col = NA)),
                         textGrob('Concurrent Queries')),
                   6, 4, 3, 25, name = paste(runif(2)))
png(width = 1000, height = 500)
grid.draw(z)
periscope.image(z)

The facet grid allows us to subset our results and plot them together. The columns correspond to the number of concurrent queries issued, while the rows correspond to the number of available clusters. Traversing a single column of the grid reveals the changes in query performance as we made more clusters available. Traversing a row reveals changes in query performance as we add more queries to a fixed number of clusters. As evidenced by the grid, adding queries increased query time, while adding clusters decreased query time.

A user’s experience is the sum of two times: execution time (blue) and queued time (purple). Execution time is the time it takes to actually pull back the result of the query. Queued time is the time the query spends waiting to be executed. Queries end up in the queue if there are too many queries for the warehouse to handle at once. If a warehouse can only handle 10 queries at a time, an 11th query will sit in the queue until one of the first 10 queries finishes executing.

In this analysis, it’s important that we separate execution time from queued time. At the highest query volumes, queries spend a high percentage of their total run time in queue rather than executing. Our visualization highlights the distinction. On one cluster especially, we see a large amount of time spent in the queue. We can examine this further with a proportional bar graph:

At 128 queries on a single cluster, the average query spends a whopping 88% of its runtime in the queue rather than executing. This indicates that the primary reason for increased query times is a lack of compute resources, not slower execution. Jumping back to the first visualization, as we increased the number of available clusters, query times decreased primarily due to lower queue times and, to a significantly lesser extent, lower execution times. Let's examine the case of 128 queries more closely.

Execution time doesn't change much at all, but average queue time drops by about 95% from one cluster to ten, which works out to an 84% decrease in total query time.
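
That headline number falls straight out of the results data; here's a quick sketch of the arithmetic, reusing the summary frame from the pandas sketch above:

# Sketch: derive the improvement for the 128-query batches.
batch = summary.xs(128, level='total')  # rows indexed by cluster count
total_time = batch['execution_s'] + batch['queued_s']
improvement = 1 - total_time.loc[10] / total_time.loc[1]
print('decrease in total query time: {:.0%}'.format(improvement))  # ~84% in our results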

Conclusion

At high query volumes, Snowflake’s automatic concurrency scaling provides a significant performance boost. Even though a portion of that boost relates to lower execution times, the bulk stems from radically lower queue times.

Snowflake's automatic concurrency scaling makes it easy to scale our platform to keep up with increasing query concurrency, not just as customers grow but also as load changes throughout the day.

You can try Snowflake's automatic concurrency scaling yourself by signing up for a free trial of Snowflake. For more information about how we use Snowflake to power our customers' analytics pipelines, take a look at Data Engine by Periscope Data or sign up for a free trial of Periscope Data!


Want to discuss this article? Join the Periscope Data Community!

Robert Friedland
Robert is a sales engineer at Periscope Data, where he shares his vision for data-driven decisions in all facets of life. His notable accomplishments include a script that automatically adds rooms to calendar events and an internal website that displays the day's lunch menu (with a star rating!).