PolarSPARC

Introduction to Apache Parquet


Bhaskar S 10/07/2022


Overview

Data collected over a period of time (in any domain and referred to as a data set) is often used to perform data analysis for gaining insights and for future predictions. One can store the data set in CSV files for future consumption.

Data analysis typically focuses on a limited set of columns from the rows of a data set. This implies reading all the rows from the CSV file before pruning unwanted columns from the data set. What if the data set contains millions (or billions) of rows ???
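To make that cost concrete, the following is a minimal sketch that uses only the Python standard library. The inline sample rows and the helper name project_columns are illustrative assumptions and are not part of the code used later in this article. Even though only two columns are wanted, every field of every row has to be parsed first:

```python
import csv
import io

# Tiny inline sample in the shape of the Palmer Penguins data (illustrative subset)
SAMPLE_CSV = """species,island,bill_length_mm,body_mass_g
Adelie,Torgersen,39.1,3750
Adelie,Torgersen,39.5,3800
Chinstrap,Dream,50.2,3775
"""


def project_columns(csv_text, wanted):
    """Return (projected_rows, fields_parsed) for the wanted column names."""
    reader = csv.reader(io.StringIO(csv_text))
    header = next(reader)
    positions = [header.index(name) for name in wanted]
    fields_parsed = 0
    projected = []
    for row in reader:
        # The whole row is parsed even though only some fields are kept
        fields_parsed += len(row)
        projected.append([row[i] for i in positions])
    return projected, fields_parsed


rows, parsed = project_columns(SAMPLE_CSV, ['species', 'body_mass_g'])
print(rows)
print('fields parsed:', parsed, 'fields kept:', sum(len(r) for r in rows))
```

In this sample, half of the parsed fields are thrown away, and the wasted work grows with the number of rows and unwanted columns.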

Is there a better, more efficient way to store large data sets for future data processing ???

Please welcome the honorable storage format - the Apache Parquet !!!

Apache Parquet is an open-source data storage format that uses a hybrid of row oriented and column oriented storage for efficient storage and retrieval of data, and works with any choice of data processing framework, data model, or programming language.

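Given a data set (of rows and columns), there are two common data processing patterns: Online Transaction Processing (OLTP), which works on one row at a time (inserts, updates, deletes), and Online Analytical Processing (OLAP), which performs selections on a few columns across many rows.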

To get a better understanding, let us consider the following sample data set of rows (R1, R2, R3) and columns (C1, C2, C3):


Rows and Columns
Figure-1

Since OLTP processing works on a row at a time (inserts, updates, deletes), it is more efficient to store the data set in a row oriented format as shown in the illustration below:


Row Oriented
Figure-2

On the other hand, OLAP processing works by performing selections (queries with criteria) on a few columns from the rows and hence, it is more efficient to store the data set in a column oriented format as shown in the illustration below:


Column Oriented
Figure-3

The Parquet format is a hybrid of the row oriented and the column oriented storage, where the columns for a chunk of rows (with metadata) are stored together as shown in the illustration below:


Parquet Format
Figure-4

The hybrid storage format enables not only efficient data storage but also efficient data retrieval using selection criteria.
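The three layouts can be sketched in a few lines of plain Python. This is only a conceptual illustration: the cell labels (R1C1 and so on), the function names, and the group size of 2 rows are assumptions made for this example, and a real Parquet file additionally encodes, optionally compresses, and annotates each chunk with metadata:

```python
# Sample data set of rows R1..R3 and columns C1..C3 (cell labels are illustrative)
rows = [
    ['R1C1', 'R1C2', 'R1C3'],
    ['R2C1', 'R2C2', 'R2C3'],
    ['R3C1', 'R3C2', 'R3C3'],
]


def row_oriented(rows):
    """Store all the values of a row together, one row after another."""
    return [value for row in rows for value in row]


def column_oriented(rows):
    """Store all the values of a column together, one column after another."""
    return [value for column in zip(*rows) for value in column]


def hybrid(rows, rows_per_group):
    """Split the rows into groups, then store each group column by column."""
    groups = []
    for start in range(0, len(rows), rows_per_group):
        groups.append(column_oriented(rows[start:start + rows_per_group]))
    return groups


print(row_oriented(rows))
print(column_oriented(rows))
print(hybrid(rows, rows_per_group=2))
```

With a group size of 2, the first group holds the C1, C2, and C3 values of R1 and R2 column by column, and the second group holds the values of R3.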

Parquet Format Internals

In order to better understand the Parquet file format, we will use the storage layout as shown in the illustration below:


Parquet Format Layout
Figure-5

A Parquet file starts with a file header, followed by one or more Row Groups, and ends with a file footer (referred to as the FileMetaData). A Row Group in turn consists of one data chunk (referred to as the Column Chunk) for every column of the data set. A Column Chunk in turn is made up of one or more data Pages, which contain the actual column data along with some metadata.

The Parquet file format described above is visually shown in the illustration below:


Parquet Format Visual
Figure-6
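In the Parquet format, the file header is the 4-byte magic number PAR1, and the file ends with the serialized FileMetaData, followed by a 4-byte little-endian length of that metadata and the same magic number. The following minimal sketch locates the footer using only the standard library. The function name footer_length is an assumption for this example, and the byte string it is tried on is a hand-made stand-in that only mimics the framing (it is not a valid Parquet file):

```python
import io
import struct

MAGIC = b'PAR1'


def footer_length(stream):
    """Return the size in bytes of the serialized FileMetaData of a Parquet stream."""
    stream.seek(0)
    head = stream.read(4)
    # The last 8 bytes are: 4-byte little-endian metadata length + magic number
    stream.seek(-8, io.SEEK_END)
    tail = stream.read(8)
    if head != MAGIC or tail[4:] != MAGIC:
        raise ValueError('missing PAR1 magic number, not a Parquet file')
    (length,) = struct.unpack('<I', tail[:4])
    return length


# Stand-in bytes: header magic, 20 bytes of "row groups", 10 bytes of "metadata"
fake = MAGIC + b'x' * 20 + b'm' * 10 + struct.pack('<I', 10) + MAGIC
print(footer_length(io.BytesIO(fake)))
```

For a real file, the same function can be called on a file object opened in binary mode, for example open(path, 'rb'). A reader that knows the footer length can jump straight to the FileMetaData without scanning the Row Groups.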

The following are some of the core parts of the file storage format:

The metadata in the Parquet file allows for fast traversal (with minimal I/O) to the desired columns (using the FileMetaData) and then, within those columns, for skipping values that do not match the criteria (using the Page statistics).
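The idea of skipping data using statistics can be sketched as follows. The three pages, their min and max values, and the function name scan_greater_than are all hypothetical and exist only for this illustration; an actual reader works on encoded pages and applies additional rules:

```python
# Hypothetical pages of a body_mass_g column, each with its min/max statistics
pages = [
    {'min': 3250.0, 'max': 3800.0, 'values': [3750.0, 3800.0, 3250.0, 3450.0]},
    {'min': 3300.0, 'max': 4775.0, 'values': [3300.0, 4675.0, 4650.0, 4775.0]},
    {'min': 3400.0, 'max': 4100.0, 'values': [4000.0, 3400.0, 3775.0, 4100.0]},
]


def scan_greater_than(pages, threshold):
    """Return (matching_values, pages_read) for the criteria value > threshold."""
    matches = []
    pages_read = 0
    for page in pages:
        if page['max'] <= threshold:
            # No value in this page can match, so it is skipped without being read
            continue
        pages_read += 1
        matches.extend(value for value in page['values'] if value > threshold)
    return matches, pages_read


matches, pages_read = scan_greater_than(pages, 4500)
print(matches, 'pages read:', pages_read, 'of', len(pages))
```

Only the second page is read for a threshold of 4500, because the maximum of the other two pages already rules them out.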

Installation and Setup

The installation is on an Ubuntu 22.04 LTS based Linux desktop.

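We need to install the following Python packages - pandas, pyarrow, and parquet-cli. They are published on PyPI and hence are installed using pip, which in turn comes from the Ubuntu repository as the package python3-pip.

To install the mentioned packages, execute the following commands:

$ sudo apt update

$ sudo apt install python3-pip -y

$ pip3 install pandas pyarrow parquet-cli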

The package parquet-cli provides a basic command-line tool called parq for exploring parquet files.
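A quick way to confirm that the packages are visible to the Python interpreter is to query their installed versions. The following is a small sketch using the standard library module importlib.metadata; the helper name installed_version is an assumption for this example:

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version of a package, or None if it is not installed."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


for name in ('pandas', 'pyarrow', 'parquet-cli'):
    print(name, '->', installed_version(name) or 'NOT INSTALLED')
```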

Hands-on with Parquet

For the demonstration, we will create a parquet file from the Palmer Penguins data set using pandas. The following is the Python code to create three types of file formats - csv, uncompressed parquet, and compressed parquet files:


sample-1.py
#
# @Author: Bhaskar S
# @Blog:   https://www.polarsparc.com
# @Date:   07 Oct 2022
#

import logging
import pandas as pd

logging.basicConfig(format='%(levelname)s %(asctime)s - %(message)s', level=logging.INFO)


# Load the Palmer Penguins data set, clean missing values and return a dataframe to the data set
def get_palmer_penguins():
    logging.info('Ready to load the Palmer Penguins dataset...')
    url = 'https://vincentarelbundock.github.io/Rdatasets/csv/palmerpenguins/penguins.csv'
    df = pd.read_csv(url)
    logging.info('Ready to cleanse the Palmer Penguins dataset...')
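    # Drop the first column, which only holds the row numbers from the source csv
    df = df.drop(df.columns[0], axis=1)
    # Drop the two rows (labels 3 and 271) that have missing measurements
    df = df.drop([3, 271], axis=0)
    # Assign a value to the 'sex' column for the rows where it is missing
    df.loc[[8, 10, 11], 'sex'] = 'female'
    df.at[9, 'sex'] = 'male'
    df.at[47, 'sex'] = 'female'
    df.loc[[178, 218, 256, 268], 'sex'] = 'female'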
    logging.info('Completed cleansing the Palmer Penguins dataset...')
    return df


# Create an uncompressed csv file
def write_uncompressed_csv(path, df):
    logging.info('Ready to write the Palmer Penguins dataset as csv...')
    df.to_csv(path, compression=None, index=False)
    logging.info('Completed writing the Palmer Penguins dataset as csv...')


# Create an uncompressed parquet file
def write_uncompressed_parquet(path, df):
    logging.info('Ready to write the Palmer Penguins dataset as Uncompressed Parquet...')
    df.to_parquet(path, compression=None, index=False)
    logging.info('Completed writing the Palmer Penguins dataset as Uncompressed Parquet...')


# Create a compressed parquet file
def write_compressed_parquet(path, df):
    logging.info('Ready to write the Palmer Penguins dataset as Compressed Parquet...')
    df.to_parquet(path, compression='snappy', index=False)
    logging.info('Completed writing the Palmer Penguins dataset as Compressed Parquet...')


def main():
    penguins_df = get_palmer_penguins()
    write_uncompressed_csv('./data/unc_pp.csv', penguins_df)
    write_uncompressed_parquet('./data/unc_pp.parquet', penguins_df)
    write_compressed_parquet('./data/com_pp.parquet', penguins_df)


if __name__ == '__main__':
    main()

Executing the above Python program sample-1.py will generate the three files in the subdirectory data (the subdirectory must exist before the program is run). The following output shows the listing of the three files:

Output.1

-rw-rw-r-- 1 polarsparc polarsparc  8761 Oct  7 19:37 com_pp.parquet
-rw-rw-r-- 1 polarsparc polarsparc 16736 Oct  7 19:37 unc_pp.csv
-rw-rw-r-- 1 polarsparc polarsparc 10234 Oct  7 19:37 unc_pp.parquet

Notice the file sizes of the three files. The compressed parquet file is the most compact, at roughly half the size of the csv file.
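The difference is easier to judge as a percentage of the csv file size. The following sketch works on the byte counts copied from Output.1; the helper name relative_size is an assumption for this example:

```python
# File sizes in bytes, copied from Output.1
sizes = {
    'unc_pp.csv': 16736,
    'unc_pp.parquet': 10234,
    'com_pp.parquet': 8761,
}


def relative_size(sizes, name, baseline='unc_pp.csv'):
    """Size of a file as a percentage of the baseline file, rounded to 1 decimal."""
    return round(100.0 * sizes[name] / sizes[baseline], 1)


for name in ('unc_pp.parquet', 'com_pp.parquet'):
    print(name, '->', relative_size(sizes, name), '% of the csv size')
```

On disk, the same numbers can be obtained with os.path.getsize. Keep in mind that this data set is tiny, so the fixed cost of the Parquet metadata likely accounts for a noticeable share of both parquet files.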

We will now examine the compressed parquet file using the command-line tool parq.

To display the metadata information, execute the following command:

$ parq ./data/com_pp.parquet

The following would be the typical output:

Output.2

# Metadata 
<pyarrow._parquet.FileMetaData object at 0x7f5fdc6de660>
  created_by: parquet-cpp-arrow version 9.0.0
  num_columns: 8
  num_rows: 342
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 4302

To display the schema information, execute the following command:

$ parq ./data/com_pp.parquet --schema

The following would be the typical output:

Output.3

# Schema 
<pyarrow._parquet.ParquetSchema object at 0x7f9748cc2cc0>
required group field_id=-1 schema {
  optional binary field_id=-1 species (String);
  optional binary field_id=-1 island (String);
  optional double field_id=-1 bill_length_mm;
  optional double field_id=-1 bill_depth_mm;
  optional double field_id=-1 flipper_length_mm;
  optional double field_id=-1 body_mass_g;
  optional binary field_id=-1 sex (String);
  optional int64 field_id=-1 year;
}

To display the first 5 rows from the data set, execute the following command:

$ parq ./data/com_pp.parquet --head 5

The following would be the typical output:

Output.4

  species     island  bill_length_mm  bill_depth_mm  flipper_length_mm  \
0  Adelie  Torgersen            39.1           18.7              181.0   
1  Adelie  Torgersen            39.5           17.4              186.0   
2  Adelie  Torgersen            40.3           18.0              195.0   
3  Adelie  Torgersen            36.7           19.3              193.0   
4  Adelie  Torgersen            39.3           20.6              190.0   

    body_mass_g     sex  year  
0       3750.0    male  2007  
1       3800.0  female  2007  
2       3250.0  female  2007  
3       3450.0  female  2007  
4       3650.0    male  2007

To display the last 5 rows from the data set, execute the following command:

$ parq ./data/com_pp.parquet --tail 5

The following would be the typical output:

Output.5

       species island  bill_length_mm  bill_depth_mm  flipper_length_mm  \
337  Chinstrap  Dream            55.8           19.8              207.0   
338  Chinstrap  Dream            43.5           18.1              202.0   
339  Chinstrap  Dream            49.6           18.2              193.0   
340  Chinstrap  Dream            50.8           19.0              210.0   
341  Chinstrap  Dream            50.2           18.7              198.0   

      body_mass_g     sex  year  
337       4000.0    male  2009  
338       3400.0  female  2009  
339       3775.0    male  2009  
340       4100.0    male  2009  
341       3775.0  female  2009

The following is the Python code to programmatically display information (using pyarrow) about the two parquet files (uncompressed and compressed):


sample-2.py
#
# @Author: Bhaskar S
# @Blog:   https://www.polarsparc.com
# @Date:   07 Oct 2022
#

import logging
import pyarrow.parquet as pq

logging.basicConfig(format='%(levelname)s %(asctime)s - %(message)s', level=logging.INFO)


def read_uncompressed_parquet(path):
    logging.info('Ready to read the Palmer Penguins dataset from Uncompressed Parquet...\n')
    pq_unc = pq.ParquetFile(path)
    logging.info('Schema -> %s', pq_unc.schema)
    logging.info('No. of Row Groups -> %d\n', pq_unc.num_row_groups)
    logging.info('Metadata -> %s\n', pq_unc.metadata)
    logging.info('First Row Group (Metadata) -> %s\n', pq_unc.metadata.row_group(0))
    logging.info('First Row Group (Content) -> %s\n', pq_unc.read_row_group(0))
    logging.info('First Row Group, Second Column Chunk (Metadata) -> %s', pq_unc.metadata.row_group(0).column(1))
    logging.info('Completed reading the Palmer Penguins dataset from Uncompressed Parquet...')


def read_compressed_parquet(path):
    logging.info('Ready to read the Palmer Penguins dataset from Compressed Parquet...\n')
    pq_com = pq.ParquetFile(path)
    logging.info('Schema -> %s', pq_com.schema)
    logging.info('No. of Row Groups -> %d\n', pq_com.num_row_groups)
    logging.info('Metadata -> %s\n', pq_com.metadata)
    logging.info('First Row Group (Metadata) -> %s\n', pq_com.metadata.row_group(0))
    logging.info('First Row Group (Content) -> %s\n', pq_com.read_row_group(0))
    logging.info('First Row Group, Second Column Chunk (Metadata) -> %s', pq_com.metadata.row_group(0).column(1))
    logging.info('Completed reading the Palmer Penguins dataset from Compressed Parquet...')


def main():
    read_uncompressed_parquet('./data/unc_pp.parquet')
    logging.info('\n')
    logging.info('--------------------------------------------------\n')
    read_compressed_parquet('./data/com_pp.parquet')


if __name__ == '__main__':
    main()

Executing the above Python program sample-2.py will produce the following output:

Output.6

INFO 2022-10-07 19:39:23,799 - Ready to read the Palmer Penguins dataset from Uncompressed Parquet...

INFO 2022-10-07 19:39:23,800 - Schema -> <pyarrow._parquet.ParquetSchema object at 0x7f2d64991e80>
required group field_id=-1 schema {
  optional binary field_id=-1 species (String);
  optional binary field_id=-1 island (String);
  optional double field_id=-1 bill_length_mm;
  optional double field_id=-1 bill_depth_mm;
  optional double field_id=-1 flipper_length_mm;
  optional double field_id=-1 body_mass_g;
  optional binary field_id=-1 sex (String);
  optional int64 field_id=-1 year;
}

INFO 2022-10-07 19:39:23,800 - No. of Row Groups -> 1

INFO 2022-10-07 19:39:23,800 - Metadata -> <pyarrow._parquet.FileMetaData object at 0x7f2d60181210>
  created_by: parquet-cpp-arrow version 9.0.0
  num_columns: 8
  num_rows: 342
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 4302

INFO 2022-10-07 19:39:23,800 - First Row Group (Metadata) -> <pyarrow._parquet.RowGroupMetaData object at 0x7f2d5bf46bb0>
  num_columns: 8
  num_rows: 342
  total_byte_size: 5174

INFO 2022-10-07 19:39:23,824 - First Row Group (Content) -> pyarrow.Table
species: string
island: string
bill_length_mm: double
bill_depth_mm: double
flipper_length_mm: double
body_mass_g: double
sex: string
year: int64
----
species: [["Adelie","Adelie","Adelie","Adelie","Adelie",...,"Chinstrap","Chinstrap","Chinstrap","Chinstrap","Chinstrap"]]
island: [["Torgersen","Torgersen","Torgersen","Torgersen","Torgersen",...,"Dream","Dream","Dream","Dream","Dream"]]
bill_length_mm: [[39.1,39.5,40.3,36.7,39.3,...,55.8,43.5,49.6,50.8,50.2]]
bill_depth_mm: [[18.7,17.4,18,19.3,20.6,...,19.8,18.1,18.2,19,18.7]]
flipper_length_mm: [[181,186,195,193,190,...,207,202,193,210,198]]
body_mass_g: [[3750,3800,3250,3450,3650,...,4000,3400,3775,4100,3775]]
sex: [["male","female","female","female","male",...,"male","female","male","male","female"]]
year: [[2007,2007,2007,2007,2007,...,2009,2009,2009,2009,2009]]

INFO 2022-10-07 19:39:23,824 - First Row Group, Second Column Chunk (Metadata) -> <pyarrow._parquet.ColumnChunkMetaData object at 0x7f2d601d72e0>
  file_offset: 299
  file_path: 
  physical_type: BYTE_ARRAY
  num_values: 342
  path_in_schema: island
  is_stats_set: True
  statistics:
    <pyarrow._parquet.Statistics object at 0x7f2d5bf6a750>
      has_min_max: True
      min: Biscoe
      max: Torgersen
      null_count: 0
      distinct_count: 0
      num_values: 342
      physical_type: BYTE_ARRAY
      logical_type: String
      converted_type (legacy): UTF8
  compression: UNCOMPRESSED
  encodings: ('RLE_DICTIONARY', 'PLAIN', 'RLE')
  has_dictionary_page: True
  dictionary_page_offset: 180
  data_page_offset: 226
  total_compressed_size: 119
  total_uncompressed_size: 119
INFO 2022-10-07 19:39:23,824 - Completed reading the Palmer Penguins dataset from Uncompressed Parquet...
INFO 2022-10-07 19:39:23,824 - 

INFO 2022-10-07 19:39:23,824 - --------------------------------------------------

INFO 2022-10-07 19:39:23,825 - Ready to read the Palmer Penguins dataset from Compressed Parquet...

INFO 2022-10-07 19:39:23,825 - Schema -> <pyarrow._parquet.ParquetSchema object at 0x7f2d64e9c0c0>
required group field_id=-1 schema {
  optional binary field_id=-1 species (String);
  optional binary field_id=-1 island (String);
  optional double field_id=-1 bill_length_mm;
  optional double field_id=-1 bill_depth_mm;
  optional double field_id=-1 flipper_length_mm;
  optional double field_id=-1 body_mass_g;
  optional binary field_id=-1 sex (String);
  optional int64 field_id=-1 year;
}

INFO 2022-10-07 19:39:23,825 - No. of Row Groups -> 1

INFO 2022-10-07 19:39:23,825 - Metadata -> <pyarrow._parquet.FileMetaData object at 0x7f2d601d74c0>
  created_by: parquet-cpp-arrow version 9.0.0
  num_columns: 8
  num_rows: 342
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 4302

INFO 2022-10-07 19:39:23,825 - First Row Group (Metadata) -> <pyarrow._parquet.RowGroupMetaData object at 0x7f2d5bf46bb0>
  num_columns: 8
  num_rows: 342
  total_byte_size: 5174

INFO 2022-10-07 19:39:23,825 - First Row Group (Content) -> pyarrow.Table
species: string
island: string
bill_length_mm: double
bill_depth_mm: double
flipper_length_mm: double
body_mass_g: double
sex: string
year: int64
----
species: [["Adelie","Adelie","Adelie","Adelie","Adelie",...,"Chinstrap","Chinstrap","Chinstrap","Chinstrap","Chinstrap"]]
island: [["Torgersen","Torgersen","Torgersen","Torgersen","Torgersen",...,"Dream","Dream","Dream","Dream","Dream"]]
bill_length_mm: [[39.1,39.5,40.3,36.7,39.3,...,55.8,43.5,49.6,50.8,50.2]]
bill_depth_mm: [[18.7,17.4,18,19.3,20.6,...,19.8,18.1,18.2,19,18.7]]
flipper_length_mm: [[181,186,195,193,190,...,207,202,193,210,198]]
body_mass_g: [[3750,3800,3250,3450,3650,...,4000,3400,3775,4100,3775]]
sex: [["male","female","female","female","male",...,"male","female","male","male","female"]]
year: [[2007,2007,2007,2007,2007,...,2009,2009,2009,2009,2009]]

INFO 2022-10-07 19:39:23,826 - First Row Group, Second Column Chunk (Metadata) -> <pyarrow._parquet.ColumnChunkMetaData object at 0x7f2d5bf6b240>
  file_offset: 306
  file_path: 
  physical_type: BYTE_ARRAY
  num_values: 342
  path_in_schema: island
  is_stats_set: True
  statistics:
    <pyarrow._parquet.Statistics object at 0x7f2d5bf6b290>
      has_min_max: True
      min: Biscoe
      max: Torgersen
      null_count: 0
      distinct_count: 0
      num_values: 342
      physical_type: BYTE_ARRAY
      logical_type: String
      converted_type (legacy): UTF8
  compression: SNAPPY
  encodings: ('RLE_DICTIONARY', 'PLAIN', 'RLE')
  has_dictionary_page: True
  dictionary_page_offset: 183
  data_page_offset: 231
  total_compressed_size: 123
  total_uncompressed_size: 119
INFO 2022-10-07 19:39:23,826 - Completed reading the Palmer Penguins dataset from Compressed Parquet...
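Notice the encodings line in the Column Chunk metadata, which lists RLE_DICTIONARY. The island column has only a handful of distinct values repeated over 342 rows, which is why the chunk needs only 119 bytes even without compression, and probably why Snappy has nothing left to gain on it (123 bytes versus 119 bytes). The following is a deliberately simplified sketch of the idea in plain Python. It is not the actual Parquet encoding (which uses a bit-packed and run-length hybrid), and the function names and the run lengths in the sample column are made up for illustration:

```python
from itertools import groupby


def dictionary_encode(values):
    """Return (dictionary, indices): distinct values in first-seen order, one index per value."""
    dictionary = []
    positions = {}
    indices = []
    for value in values:
        if value not in positions:
            positions[value] = len(dictionary)
            dictionary.append(value)
        indices.append(positions[value])
    return dictionary, indices


def run_length_encode(indices):
    """Collapse consecutive repeats into (index, run_length) pairs."""
    return [(index, len(list(run))) for index, run in groupby(indices)]


# Illustrative column: island names in long runs (the run lengths are made up)
island = ['Torgersen'] * 5 + ['Biscoe'] * 4 + ['Dream'] * 3
dictionary, indices = dictionary_encode(island)
runs = run_length_encode(indices)
print(dictionary)
print(runs)
```

Twelve strings shrink to a dictionary of three strings plus three small pairs, and the saving grows with the length of the runs.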

The following is the Python code to programmatically (using pyarrow) select specific columns (species and body_mass_g) from the parquet file and filter rows using a criterion (body_mass_g > 4500):


sample-3.py
#
# @Author: Bhaskar S
# @Blog:   https://www.polarsparc.com
# @Date:   07 Oct 2022
#

import logging
import pyarrow.parquet as pq

logging.basicConfig(format='%(levelname)s %(asctime)s - %(message)s', level=logging.INFO)


def select_parquet_columns(file):
    return pq.read_table(file, columns=['species', 'body_mass_g'])


def filter_parquet_columns(file):
    return pq.read_table(file, columns=['species', 'body_mass_g'], filters=[('body_mass_g', '>', 4500)])


def main():
    logging.info('Reading columns species and body_mass_g from the Palmer Penguins dataset from Parquet...')
    df = select_parquet_columns('./data/com_pp.parquet').to_pandas()
    logging.info('Displaying top 5 rows from the data set...\n')
    logging.info(df.head(5))
    logging.info('--------------------------------------------------\n')
    logging.info('Filtering column body_mass_g > 4500 from the Palmer Penguins dataset from Parquet...')
    df = filter_parquet_columns('./data/com_pp.parquet').to_pandas()
    logging.info('Displaying top 5 rows from the filtered data set...\n')
    logging.info(df.head(5))


if __name__ == '__main__':
    main()

Executing the above Python program sample-3.py will produce the following output:

Output.7

INFO 2022-10-07 19:41:28,135 - Reading columns species and body_mass_g from the Palmer Penguins dataset from Parquet...
INFO 2022-10-07 19:41:28,344 - Displaying top 5 rows from the data set...

INFO 2022-10-07 19:41:28,345 -   species  body_mass_g
0  Adelie       3750.0
1  Adelie       3800.0
2  Adelie       3250.0
3  Adelie       3450.0
4  Adelie       3650.0
INFO 2022-10-07 19:41:28,348 - --------------------------------------------------

INFO 2022-10-07 19:41:28,348 - Filtering column body_mass_g > 4500 from the Palmer Penguins dataset from Parquet...
INFO 2022-10-07 19:41:28,351 - Displaying top 5 rows from the filtered data set...

INFO 2022-10-07 19:41:28,351 -   species  body_mass_g
0  Adelie       4675.0
1  Adelie       4650.0
2  Adelie       4600.0
3  Adelie       4700.0
4  Adelie       4725.0
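The filters argument used in sample-3.py is a list of (column, operator, value) tuples that are combined with AND. pyarrow also accepts a list of such lists, in which case the inner lists are combined with OR. The following plain Python sketch mimics only those semantics on a few rows in memory. It is not how pyarrow implements filtering, the names matches and read_rows are assumptions for this example, and the sample rows are a handful of (species, body_mass_g) pairs taken from the outputs above:

```python
import operator

OPS = {
    '=': operator.eq, '==': operator.eq, '!=': operator.ne,
    '<': operator.lt, '<=': operator.le,
    '>': operator.gt, '>=': operator.ge,
}


def matches(row, filters):
    """Evaluate the filters against one row (a dict of column name -> value)."""
    # A flat list of tuples is a single AND group; wrap it so both forms look alike
    groups = filters if filters and isinstance(filters[0], list) else [filters]
    return any(
        all(OPS[op](row[column], value) for column, op, value in group)
        for group in groups
    )


def read_rows(rows, columns, filters):
    """Keep the rows that match the filters and only the requested columns."""
    return [{name: row[name] for name in columns} for row in rows if matches(row, filters)]


rows = [
    {'species': 'Adelie', 'body_mass_g': 3750.0},
    {'species': 'Adelie', 'body_mass_g': 4675.0},
    {'species': 'Chinstrap', 'body_mass_g': 4000.0},
    {'species': 'Chinstrap', 'body_mass_g': 3400.0},
]

heavy = read_rows(rows, ['species', 'body_mass_g'], [('body_mass_g', '>', 4500)])
heavy_or_chinstrap = read_rows(
    rows, ['body_mass_g'],
    [[('body_mass_g', '>', 4500)], [('species', '==', 'Chinstrap')]],
)
print(heavy)
print(heavy_or_chinstrap)
```

The first call keeps only the Adelie row of 4675.0, while the second call keeps three rows because a row qualifies when either group matches.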

The following is the link to the Github Repo that provides the data files (csv, and parquet) as well as the Python code used in this article:





© PolarSPARC