6 Cloud
Bigger Data, Easier Workflows
In the previous chapters, we’ve discussed working with data which is stored on a local filesystem, but Arrow can also work with data stored on a remote machine.
If you want to read a single file directly into memory, you can pass a URL into a file-reading function, and the file will be downloaded to a temporary directory and then loaded into your R session.
read_parquet(
  "https://github.com/apache/arrow/raw/main/r/inst/v0.7.1.parquet"
)
If you want to work with multi-file datasets, however, the HTTP protocol isn’t compatible with Arrow’s ability to scan files and read metadata before the data is accessed, which is what lets Arrow optimize what is eventually pulled into memory when a query is collected. Working with multi-file datasets is possible, though, when the remote data is kept in cloud storage services such as Amazon Simple Storage Service (S3) and Google Cloud Storage (GCS).
There are different reasons that you might be working with cloud data, for example:
- datasets which are too large to be stored on a local machine
- datasets being accessed as part of a process rather than interactively, e.g. data for a Shiny app deployed online
- datasets which belong to someone else and which you have been granted access to, or open data hosted on cloud filesystems
In these circumstances, storing data in the cloud can offer multiple benefits:
- infrastructure can be scaled easily as the data grows
- using a managed environment can increase reliability and uptime, and lower the need for maintenance
- access can be provided to people in different locations easily
However, there are some challenges which come with this, in terms of data storage and retrieval costs, as well as the potential for slow transfer times, which become increasingly likely with larger workloads.
If the data is static (i.e. not being updated) and of a reasonable size to store on disk, then a relatively simple workflow would be to download the entire dataset and run calculations on it locally. This isn’t always feasible if the dataset is too large or if the data transfer time would negatively impact performance, and so an alternative is needed.
Fortunately, Arrow can help. Since storing data in Parquet format uses much less space than the equivalent CSV file, using Arrow can reduce both data storage and transfer costs. On top of that, transfer costs can also be further reduced by taking advantage of Arrow’s use of partitioning, only transferring the minimum data required from cloud storage to complete the query.
In this chapter, we’ll take a look at how to work with data which is hosted on cloud platforms, outline some platform-specific considerations, and show you how to work most efficiently with cloud data.
While we focus on Parquet datasets, the techniques shown here can also be used with CSV datasets. CSVs work fine, but they’re slower and more expensive. While you can work with compressed CSVs, this solves part of the problem, but not all of it.1
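For example, a CSV-based dataset in cloud storage can still be opened with open_dataset(); the sketch below is hypothetical, with a made-up bucket and path, and simply shows the extra format argument you would need.
# Hypothetical example: the bucket and path are placeholders, not a real dataset.
# A CSV dataset in cloud storage works, but misses out on the metadata-based
# optimizations that Parquet provides.
library(arrow)

csv_person_data <- open_dataset(
  "s3://my-example-bucket/person-csv/",
  format = "csv"
)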
Many examples will look at working with data hosted on Amazon S3, but the same principles can also be used with data in GCS. There are some subtle differences between S3 and GCS, which we’ll highlight as they come up. In the future, the Apache Arrow project plans to add functionality to work with additional cloud storage services like Azure Blob Storage; that implementation, and any future ones for other cloud storage services, will follow the same model.2
In cloud storage terminology, S3 and GCS refer to the place where the data is stored as a bucket. Other systems may use alternative terms, like “blob”, but we will use “bucket” here as a generic term.
6.1 Working with data in cloud storage
Working with cloud storage services is similar in many ways to working with data stored in local filesystems, and you can use the same file and dataset opening functions for both tasks. To open a file or dataset saved in cloud storage, instead of passing in a path to a local file, you pass in the cloud storage path as a URI.
read_parquet(
  paste0(
    "s3://scaling-arrow-pums/person/year=2005/",
    "location=ak/part-0.parquet"
  )
)

open_dataset("s3://scaling-arrow-pums/person/")
Note that when working with data stored on GCS, even when working with a publicly accessible bucket, you’ll need to provide a login name of “anonymous”. The equivalent of the above commands for GCS—these won’t run here as these buckets haven’t been set up—would be:
read_parquet(
  paste0(
    "gs://anonymous@scaling-arrow-pums/person/year=2005/",
    "location=ak/part-0.parquet"
  )
)

open_dataset("gs://anonymous@scaling-arrow-pums/person/")
Now let’s take a closer look at running queries in the cloud. If we create a new dataset connecting to the S3 bucket and take a look at the object, we’ll see it looks the same as a local dataset.
person_data <- open_dataset("s3://scaling-arrow-pums/person/")
person_data
FileSystemDataset with 884 Parquet files
311 columns
SPORDER: int32
RT: dictionary<values=string, indices=int32>
SERIALNO: string
PUMA: string
ST: string
ADJUST: int32
PWGTP: int32
AGEP: int32
CIT: dictionary<values=string, indices=int32>
COW: dictionary<values=string, indices=int32>
DDRS: bool
DEYE: bool
DOUT: bool
DPHY: bool
DREM: bool
DWRK: bool
ENG: dictionary<values=string, indices=int32>
FER: bool
GCL: bool
GCM: dictionary<values=string, indices=int32>
...
291 more columns
Use `schema()` to see entire schema
The key difference here is that we know that the data is stored on the cloud. One of the advantages of working with arrow when dealing with cloud datasets is that we can take advantage of both partitioning and lazy evaluation—we can construct the query that we’re going to run on our dataset without pulling anything into memory or transferring the data from cloud storage to our local machine.
Let’s write a query which will calculate the highest age recorded in the dataset for the state of California in 2022.
max_age_ca_2022 <- person_data |>
  filter(year == 2022, location == "ca") |>
  summarize(max_age = max(AGEP))

max_age_ca_2022
FileSystemDataset (query)
max_age: int32
See $.data for the source Arrow object
Again, it looks just like the same query would when set up to run on a local copy of the data. Now when we call collect() to pull the data into our R session, we will only download a subset of the data necessary to run our query.
collect(max_age_ca_2022)
An important question to ask here is how long it took to run the query. We compared running the same query from above on the same machine—a Posit Cloud instance with 1GB of RAM—with a local copy of the data compared to the cloud version of the same data. The results are shown in Table 6.1.
Location | Time (s) |
---|---|
Local | 0.2 |
Cloud | 24.1 |
There’s a huge difference between these results: it was 120 times faster to work with a local copy of the data! This was due to the need for data transfer; in the local query, Arrow could just scan the data and perform the necessary calculations, whereas in the cloud query, we needed to download the data first before we could return it to our R session.
A reasonable question to ask here might be why it took 24 seconds to run a query which only had 1 row of data in the results. The answer is that we actually downloaded more than 1 row of data—in fact, we downloaded all of the data for California in 2022—with the final aggregation being performed locally. Let’s take a look at the reasons for this, and see what we can do to minimize data transfer in our queries.
6.2 Working efficiently with cloud data
Pulling data from cloud storage can be slow—the main bottleneck is transferring data over the internet—and it takes longer than querying data locally. Given that increased data transfer results in increased costs and slower retrieval of results, it’s important to understand how to minimize the amount of data that needs to be downloaded.
6.2.1 Minimizing data transfer
In this section, we’ll look at how we can run queries on the cloud datasets but only download a relevant subset of the data, and discuss different strategies for minimizing data transfer when working with data in cloud storage.
Tools for measuring data transfer vary between different operating systems; in the code examples below, we’ll show the output from a Linux tool called nethogs. If you want to test out data transfer yourself, see Section A.3.1 for more information about the commands we ran to measure bandwidth.
6.2.1.1 Partitioning
We introduced strategies for efficient partitioning when working with datasets in Chapter 5, but this becomes even more important when working with data in cloud storage.
The full copy of the PUMS person dataset is just under 8GB of Parquet files. Let’s say we want to collect a subset of the PUMS person dataset, filtering to include only data from respondents in California in 2022. Let’s take a look at our local copy to see how many rows of data this is.
open_dataset("./data/person") |>
filter(year == 2022, location == "ca") |>
nrow()
[1] 391171
The resulting dataset contains just under 400,000 rows of data, which takes up just under 60MB on disk.
The crucial question we want to ask next is: how much data is transferred to our local machine when we run the same query on the dataset stored in S3 and then retrieve the results? Let’s run the code to get the data:
ca_2022 <- open_dataset("s3://scaling-arrow-pums/person") |>
  filter(year == 2022, location == "ca") |>
  collect()
Running that query downloaded 61.8MB of data, closely matching the amount of space that the Parquet files in the dataset take up on disk. It’s slightly higher by a couple of megabytes, but this is due to other transfer overhead, such as connecting to the S3 bucket itself and reading the file headers.
Now, what if we want to filter to only return data for individuals who are the maximum age we found earlier—94? Let’s count the rows of data.
open_dataset("s3://scaling-arrow-pums/person") |>
filter(year == 2022, location == "ca", AGEP == 94) |>
nrow()
[1] 3111
This is a much smaller subset—around 3,000 rows of data compared to 400,000. So how much data is transferred when we run this on S3?
ca_2022_94 <- open_dataset("s3://scaling-arrow-pums/person") |>
  filter(year == 2022, location == "ca", AGEP == 94) |>
  collect()
The amount of data transferred was 61.8MB, exactly the same as last time—so what’s going on here?
In both examples, Arrow is able to use the partition variables year and location to work out which files contain the data needed. In the second example, arrow needs access to the values in the files to be able to filter by AGEP, and so all values in the files have to be transferred first. In this dataset, there is one file per unique combination of year and location, and so we know the data we need must be in a single file. If the data is split across multiple files, arrow can make use of the Parquet file metadata—more on that later—to work out whether each file needs downloading.
This shows the need for careful thought when deciding how to partition your data that you’ll be keeping in cloud storage—you can reduce transfer costs significantly by partitioning data on columns which are more commonly used in filters. This must, however, be balanced with not creating too many partitions, otherwise transfers may be slowed down significantly by the need to access large numbers of individual files.
As with the examples we discussed in Chapter 5, when deciding how to partition your data, experimentation can help in working out how to strike the right balance.
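As a rough sketch of what that experimentation might look like, the hypothetical example below rewrites a local copy of the dataset with a coarser partitioning scheme (by year only) before it gets uploaded to cloud storage; the output path and choice of partition column are illustrative, not a recommendation.
# A sketch only: the output path and partition column are illustrative.
# Partitioning by year alone produces fewer, larger files per partition.
library(arrow)

open_dataset("./data/person") |>
  write_dataset(
    "./data/person_repartitioned",
    format = "parquet",
    partitioning = "year"
  )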
While partitioning can help reduce the total amount of data transferred when working with any arrow-compatible formats in cloud storage, working specifically with Parquet files brings some additional advantages, which we’ll take a look at in the next section.
6.2.1.2 Parquet statistics
Another way in which arrow can limit the amount of data transferred over the network is by taking advantage of statistics stored in Parquet metadata.
Let’s say we wanted to take the entire dataset and retrieve a subset which only includes people aged 97 or older, across all years and locations. We can run the following query.
open_dataset("s3://scaling-arrow-pums/person/") |>
filter(AGEP >= 97) |>
write_dataset("./data/transient_data/olds")
The resulting file was 228KB in total, with 110MB of data transferred, even though the query itself is not limited to our specific partitioning columns, year or location.
So how is it possible that we only downloaded a subset of the full dataset despite filtering on a non-partitioning column? And why did it require 110MB of data to be transferred?
Parquet metadata contains information about minimum and maximum values in each of the columns in each file. This means that arrow can inspect this metadata and only return data from files which might contain relevant values, filtering this data further locally once it’s been downloaded from cloud storage.
If we take a look at our local copy of the data, we can apply the same filter, extract the names of the files from which the filtered rows appear in, and then look at their total size.
open_dataset("data/person/") |>
filter(AGEP >= 97) |>
transmute(filename = add_filename()) |>
distinct() |>
pull(as_vector = TRUE) |>
map_int(fs::file_size) |>
sum() |>
::as_fs_bytes() fs
4.88M
This is still way less than the 110MB of data transferred, so how do we account for the additional 105MB?
The problem here is that we haven’t accounted for the data transferred when arrow reads the file headers so it can use the statistics to work out whether the file contains relevant data to filter further locally.
To find out how much data is transferred just to inspect the headers, we can take a baseline measurement by running a query that results in 0 rows of data being saved to disk: filtering the dataset to only include respondents aged 1097 or older.
open_dataset("s3://scaling-arrow-pums/person/") |>
filter(AGEP >= 1097) |>
write_dataset("./data/transient_data/ancients")
Nothing was written to disk as the resulting dataset contained 0 rows, but 105MB of data was transferred. Those 105MB of data are our Parquet file headers; when we add that to the total sizes of the files containing relevant data, 5MB, we get the total amount of data transferred: 110MB.
The same principle can also be applied to missing values. If your data has a lot of missing values, Parquet statistics contain metadata about how many values in each column are missing, so arrow can skip transferring files when there is no data present in a column.
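For illustration, a query that keeps only non-missing values of a column could benefit from this. The sketch below makes no claim about how much data would actually be transferred, since that depends on how missing values are distributed across files; GCM is just one of the dataset's columns with missing values.
# Illustrative only: if some files contained no non-missing GCM values at all,
# the null counts in the Parquet metadata would let arrow skip those files
# entirely rather than downloading them.
library(arrow)
library(dplyr)

open_dataset("s3://scaling-arrow-pums/person/") |>
  filter(!is.na(GCM)) |>
  nrow()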
6.2.2 Network location and transfer speed
Another consideration when working with cloud data like this is the relative geographic locations of where the data is stored, and the location of the computer which is accessing the data.
6.2.2.1 Selecting a bucket region
If you’re setting up a new cloud storage bucket, you’ll see faster performance when querying data if you choose a region which is geographically close to the machines from which individual users or apps will be accessing the data, and even faster performance when the access happens from within the same network.
To demonstrate this, we took a look at the speed of running a query which returned the data for California in 2022.
tf <- tempfile()
dir.create(tf)

open_dataset("s3://scaling-arrow-pums/person") |>
  filter(year == 2022, location == "ca") |>
  write_dataset(tf)
The resulting file was a 60MB Parquet file.
We ran this query on Nic’s laptop connecting to the following buckets:
- the original S3 bucket, located in the us-east-1 region
- an identical bucket located in the eu-west-2 region, in London
We then queried the original S3 bucket again, but from a Posit Cloud instance deployed on Amazon EC2 in the us-east-1 region.
The average times across 3 runs are shown in Table 6.2.
Bucket Location | Access Location | Time (seconds) |
---|---|---|
Virginia, US (us-east-1) | Manchester, UK | 56 |
London, UK (eu-west-2) | Manchester, UK | 50 |
Virginia, US (us-east-1) | Virginia, US (us-east-1) | 24 |
Using a bucket in the same geographical region resulted in a slight decrease in the time taken to run the query and collect the data for local use. However, geographic location wasn’t the sole factor determining transfer times.
The time to complete the query was significantly shorter on Posit Cloud deployed on EC2 than when transferring to a machine outside of the AWS network. The speed up is because of the interconnection between AWS data centers as well as the fact that the data is now being transferred within AWS’s internal network with optimized infrastructure, rather than over the internet.
It’s also worth keeping in mind the impact on cost—not just in terms of speed, but money too. At time of writing, it was free to transfer data from an S3 bucket to another AWS service like EC2 within the same AWS region, but there were charges associated with transferring data between regions, or out to the internet, which was the most expensive of all.
6.3 Working directly with a bucket object
The previous examples in this chapter all involved working with datasets by passing in a URI. This is the simplest path to working with data in cloud storage, though you might need a finer degree of control to go beyond the default configuration. In such cases, you can work directly with a bucket object.
You can create an object representing the connection to the bucket itself, which can then be manipulated further, allowing the possibility of passing in additional parameters, such as those relating to authentication.
<- s3_bucket("scaling-arrow-pums") bucket
Now that we’re connected to the bucket, let’s take a look around. We can use the ls()
method to list all the directories inside the bucket.
bucket$ls()
[1] "household" "person" "raw_csvs" "readme.html"
If we want to look further into an individual directory, we can pass in the name of the directory to ls()
to take a look inside. Let’s check out the contents of the person
directory.
$ls("person") bucket
[1] "person/year=2005" "person/year=2006" "person/year=2007" "person/year=2008" "person/year=2009" "person/year=2010" "person/year=2011"
[8] "person/year=2012" "person/year=2013" "person/year=2014" "person/year=2015" "person/year=2016" "person/year=2017" "person/year=2018"
[15] "person/year=2019" "person/year=2021" "person/year=2022"
If we want to work just with the data in this directory, we can use the path()
method to create a new object that points just to this directory, e.g.
<- bucket$path("person") person_bucket
And what if we want to list all of the files inside our bucket? We can pass the argument recursive = TRUE
to the ls()
method. Let’s take a look at the first 10 elements of the contents of the 2022 directory.
person_2022_data <- person_bucket$path("year=2022")
head(person_2022_data$ls(recursive = TRUE), n = 10)
[1] "location=ak/part-0.parquet" "location=ak" "location=al/part-0.parquet" "location=al"
[5] "location=ar/part-0.parquet" "location=ar" "location=az/part-0.parquet" "location=az"
[9] "location=ca/part-0.parquet" "location=ca"
Just as if we wanted to list all of the files in the local copy by calling fs::dir_ls("./data/person/year=2022")
, we can see that the call to the ls()
method above lists both the directories and files stored inside of them. In S3, this is the default, but if working with GCS, you must pass in the argument recursive = TRUE
to get all of the files and directories.
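As a hypothetical illustration (remember from earlier that the GCS bucket used in this chapter's examples hasn't actually been set up), the equivalent GCS listing would look something like this:
# Hypothetical: assumes a GCS bucket with the same layout existed.
# On GCS, recursive = TRUE is needed to list files as well as directories.
library(arrow)

gcs_bucket <- gs_bucket("scaling-arrow-pums", anonymous = TRUE)
gcs_person_2022 <- gcs_bucket$path("person/year=2022")
head(gcs_person_2022$ls(recursive = TRUE), n = 10)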
Now we’ve connected to the bucket, how do we actually work with the data? As mentioned earlier in this chapter, the simplest way, if you have a single file which you want to read entirely into memory, is to use the same read_*
functions you’d use to work with a local file, passing in the path to the file or dataset on cloud storage.
If you’ve created a bucket object, this can also be passed into read_parquet()
and other file-reading functions or open_dataset()
.
person_data <- open_dataset(person_bucket)
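Similarly, a single file can be read through the bucket object by pointing path() at it; as a sketch, the file below is the same Alaska file we read by URI earlier.
# Read one file via the bucket object rather than a full URI.
library(arrow)

ak_2005 <- read_parquet(
  person_bucket$path("year=2005/location=ak/part-0.parquet")
)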
The examples we’ve looked at so far have all been on a bucket that hasn’t required us to provide any login details, but what about if we want to connect to a bucket which requires us to provide credentials? We’ll take a look at that in the next section.
6.4 Authentication
There are multiple options for how to provide credentials when connecting to S3 or GCS, and these methods of authentication vary between providers.
In this section, we’re going to talk about:
- anonymous login
- passing credentials in manually
- using a credentials file
- using environment variables
Different methods of authentication are more suitable for different circumstances. In short:
- anonymous login is fine for accessing publicly-accessible buckets but won’t work for private buckets where you need to supply credentials
- the simplest method is to pass in your credentials manually as parameters, but it is also the least secure
- passing in credentials via an environment variable is great for when you are using a script and don’t want the details hard-coded where other people can see them
- using a credentials file removes the need to manually pass in credentials once it’s been configured
If you have already been working with cloud storage services via another program or the command line, you might already have one of these options configured. It’s important to only use one method, to prevent confusion if the values are in conflict.
There are other possible methods, which you can find more information about in the AWS docs or the GCS docs. We’ve included some examples of the most common methods below. At the time of writing these are how the methods work, but this might change. As always, look to the relevant docs for the most up-to-date methods and best practices.
Generally, we recommend using a credentials file when working locally, but environment variables when working with applications deployed online.
In the next section, we’ll walk through the different options.
6.4.1 Anonymous
If you’re connecting to a publicly accessible bucket, you can log in anonymously, but how you do this differs between S3 and GCS.
6.4.1.1 S3
If you’re connecting to a public S3 bucket, you don’t need to pass in any credentials.
<- s3_bucket("scaling-arrow-pums") bucket
However, if you already have AWS credentials configured via another method, such as a credentials file, you should pass in the anonymous = TRUE
argument to prevent those credentials being automatically detected and used, otherwise access may fail.
<- s3_bucket("scaling-arrow-pums", anonymous = TRUE) bucket
6.4.1.2 GCS
In GCS, different host names are used depending on whether the user is logged in or not. This means that if you want to connect to a GCS instance without providing authentication credentials, you must manually set anonymous
to TRUE
to make sure that the correct host name is used.
bucket <- gs_bucket(
  "scaling-arrow-pums/person/",
  anonymous = TRUE
)
6.4.2 Manually pass in credentials
The simplest way to connect to a private bucket is to pass in credentials manually. This is fine for working with code interactively, but putting secrets in your code means they end up in your command history, and risk being checked into version control and exposed to others. This isn’t arrow-specific advice, but general best practice.
6.4.2.1 S3
In AWS S3, this is done by passing your login (access_key) and password (secret_key) into s3_bucket() when creating a new connection.
secret_data <- s3_bucket(
  "secret_bucket_name",
  access_key = "nic",
  secret_key = "12345"
)
Similarly, you can pass all of these details in as a single URI string.
<- s3_bucket("s3://nic:12345@secret_bucket_name") secret_data
6.4.2.2 GCS
In GCS, you’ll need to get an access token and its expiration date, which you can then pass into the call to gs_bucket()
.
secret_data <- gs_bucket(
  "secret_bucket_name",
  access_token = "ab12.ABCS6ZRVmB7fkLtd1XTmq6mo0s-6Uw7p8vtgSwg",
  expiration = as.POSIXct("2024-08-09 12:00:00")
)
6.4.3 A credentials file
Credentials files can be a convenient way of configuring your authentication and other configuration without having to manually set multiple environment variables.
6.4.3.1 S3
AWS credentials files are typically stored at ~/.aws/credentials for Linux and macOS users, or at C:\Users\<username>\.aws\credentials for Windows users, though you can store them in another location and set the AWS_SHARED_CREDENTIALS_FILE environment variable to point to their location to ensure they are automatically detected.
When the AWS SDK is initialized, it will look for this credentials file automatically, so you don’t need to make any changes in your code in order to be able to use them.
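If you do keep the file somewhere non-standard, a minimal sketch of pointing the SDK at it from R might look like the following; the path and bucket name are made up, and the variable should be set before any S3 functions are used so it is picked up when the SDK initializes.
# Hypothetical path: point the AWS SDK at a credentials file in a
# non-default location. Set this before calling any S3-related functions.
Sys.setenv(AWS_SHARED_CREDENTIALS_FILE = "/home/nic/secrets/aws_credentials")

library(arrow)
bucket <- s3_bucket("my-private-bucket")  # placeholder bucket name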
6.4.3.2 GCS
If you need to provide credentials, and have the Google Cloud CLI installed, you can set up a local credentials file by setting up Application Default Credential (ADC) by running the following code from the command line:
gcloud auth application-default login
6.4.4 Environment variables
A more secure method of authentication when working with cloud storage in applications deployed on continuous integration (CI) systems like GitHub Actions is the use of environment variables. Using this method means that the credentials don’t appear anywhere in your code or console logs, and so it’s a useful way of ensuring your credentials remain secure if you want to share your code with others. By setting the environment variables outside of your script, you can share your code without sharing your credentials.
6.4.4.1 S3
You can set the environment variables AWS_ACCESS_KEY_ID
and AWS_SECRET_ACCESS_KEY
to your access key and secret key, and the AWS SDK will automatically check if these variables have been set, and if they are, use the values in them to authenticate.
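For illustration only, setting them from R would look like the sketch below, with placeholder values; in practice you’d set them outside your script (for example in CI secrets or a local .Renviron file that isn’t checked in) so they never appear in your code.
# Placeholder values for illustration only: never hard-code real credentials.
# In real use, set these outside the script so they don't appear in your code.
Sys.setenv(
  AWS_ACCESS_KEY_ID = "my-access-key-id",
  AWS_SECRET_ACCESS_KEY = "my-secret-access-key"
)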
6.4.4.2 GCS
It’s a little different with GCS: if deploying an application to a CI/CD system such as GitHub Actions, the GOOGLE_APPLICATION_CREDENTIALS
environment variable should be pointed to the location of the JSON credentials file. As it wouldn’t be secure to store this file in your repository, you’ll need to take an alternative approach such as encoding your service account key and storing this value as another environment variable, decoding it within the CI, and then setting the GOOGLE_APPLICATION_CREDENTIALS
variable to this location. The details of how to do this are beyond the scope of this book, but check out the Google Cloud documentation for more information.
6.5 Configuring bucket region
If you don’t specify the region that the data is stored in, then Arrow will work it out based on your configuration and a few different heuristics.
Providing this manually will speed up the initial bucket connection, though won’t have an effect on subsequent analyses.
<- s3_bucket("scaling-arrow-pums", region = "us-east-1")
bucket <- gs_bucket("scaling-arrow-pums", region = "US-EAST1") bucket
6.6 Enabling Logging in S3
When working specifically with data stored in AWS S3, arrow provides an interface to the official libraries supplied by AWS, which are capable of detailed logging. By default, the AWS logging level is set to FATAL
, meaning only critical errors that cause the code to fail will be shown.
However, if things aren’t working as expected, you may want to select a different logging level to get a better idea of exactly what’s going on. You can do this by setting the ARROW_S3_LOG_LEVEL
environment variable.
Sys.setenv("ARROW_S3_LOG_LEVEL" = "DEBUG")
To manually set the logging level, you need to do this before you use any S3-related functions. If you need to change it later, you’ll need to restart your R session first. This environment variable is read the first time during your R session that you use a function in arrow which uses the AWS SDK, initializing the SDK with settings which persist for the whole session.
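To make the ordering concrete, a minimal sketch of a fresh session might look like this, using the same public bucket as before:
# Set the logging level before arrow touches S3 for the first time in the session.
Sys.setenv("ARROW_S3_LOG_LEVEL" = "DEBUG")

library(arrow)
bucket <- s3_bucket("scaling-arrow-pums")  # the SDK initializes here, with DEBUG logging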
The possible log levels, from least verbose to most verbose are: "OFF"
, "FATAL"
(the default), "ERROR"
, "WARN"
, "INFO"
, "DEBUG"
, and "TRACE"
.
While the default logging level is usually sufficient, if you encounter issues like a slow connection or credentials not working, increasing the logging level can help you diagnose the problem.
6.7 Summary
In this chapter, we looked at working with data in cloud storage using arrow, including reading files from S3 and GCS, and strategies for working efficiently with cloud data, such as partitioning data effectively so that arrow can entirely skip scanning files which aren’t relevant to the current query. We also saw how working with Parquet files enables arrow to use metadata in file headers to decide whether to download an individual file when executing a query. Additionally, we highlighted the importance of considering where the data is being accessed from, and of configuring regions to optimize performance and reduce data transfer times. Finally, we covered working with bucket objects, and authentication.
Generally, if you’re regularly analyzing data stored on S3 with arrow and looking to minimize data transfer costs, it’s worth experimenting with dataset configuration to find the most efficient setup for your particular analysis needs.
This chapter provided an overview of the key practical steps and considerations for integrating cloud storage into your data workflows using Arrow. For more advanced functionality and detailed options, refer to the documentation for the FileSystem
classes.
The lack of metadata with this format reduces the number of optimizations that Arrow can take advantage of. You can read more about working with compressed CSVs in Section 4.2.3.↩︎
At the time of writing, the Arrow C++ library has introduced support for Azure Blob Storage. Users of PyArrow can query datasets on Azure from Python, and once bindings are added to the arrow R package, it will be available from R.↩︎