Otimizando o desempenho no PySpark com com arquivos Parquet - Parte II

Desbloqueando a eficiência: Transformando de CSV para Parquet para uma redução de 84% no tamanho dos dados e um aumento de 98,38% no desempenho.

🕒 Tempo estimado de leitura: 12 minutos

Após algumas semanas trabalhando neste texto, minha ideia inicial de escrever algo mais simples evoluiu significativamente, remetendo-me aos dias em que escrevia minha dissertação de Mestrado. Embora o rigor de um texto acadêmico não tenha sido empregado aqui, procurei incorporar o máximo de detalhes e evidências para sustentar os números e conclusões apresentadas. Espero que vocês gostem do conteúdo e que ele seja útil para vocês.

Apache Spark é um poderoso sistema de computação distribuída conhecido por processar grandes conjuntos de dados rapidamente. No entanto, a eficiência do processamento de dados no Spark não é determinada apenas pelo mecanismo Spark em si, mas também pela escolha dos formatos de dados usados ​​para armazenamento e troca de dados.

Dois formatos de dados amplamente usados ​​são CSV (Comma-Separated Values) e Apache Parquet. Cada um tem suas próprias vantagens e desvantagens, particularmente em termos de desempenho, e entendê-los pode impactar significativamente a eficiência das suas aplicações no Spark.

No artigo Otimizando o desempenho no PySpark com com arquivos Parquet - Parte I tivemos uma visão geral de formatos de armazenamento baseado em linhas como os arquivos CSV e armazenamento baseado em colunas como o formato Parquet que vale a pena uma visita, especialmente para compreender as técnicas aplicadas no formato Parquet.

A seguir apresentamos um breve resumo dos dois formatos:

CSV (Comma-Separated Values)

Os arquivos CSV, ou Comma-Separated Values, são amplamente usados para armazenar dados devido à sua simplicidade e compatibilidade com muitos aplicativos. Eles apresentam um formato de texto simples, que é facilmente legível por humanos, permitindo que os dados sejam acessados e manipulados em várias plataformas sem a necessidade de ferramentas especiais.

Contudo, uma característica dos arquivos CSV é a ausência de metadados inerentes, o que significa que eles não armazenam informações sobre os tipos de dados contidos no arquivo. Isso pode exigir uma análise adicional para interpretar corretamente os dados. Além disso, os arquivos CSV adotam um armazenamento baseado em linha, o que implica que cada linha do arquivo representa um registro ou uma entrada distinta. Essa estrutura facilita a leitura sequencial dos dados, mas pode não ser a mais eficiente para operações que requerem o acesso aleatório ou buscas complexas dentro do dataset.

  • Vantagens:

    • Simplicidade e universalidade.

    • Fácil de criar e editar manualmente com editores de texto básicos.

  • Desvantagens:

    • Normalmente maior em tamanho.

    • Sem imposição de esquema.

    • Menos eficiente para processamento de dados em larga escala.

Apache Parquet

O formato de armazenamento em coluna do Parquet é otimizado para operações analíticas, armazenando dados coluna por coluna, o que resulta em uma eficiente organização dos dados para análises. Além disso, os arquivos Parquet mantêm metadados avançados, incluindo informações sobre tipos de dados, o que melhora a aplicação do esquema e facilita o manejo dos dados. Outro benefício significativo do Parquet é o suporte a esquemas de compressão altamente eficientes, o que reduz significativamente o tamanho dos dados armazenados, tornando o armazenamento de grandes volumes de dados mais econômico e ágil.

  • Vantagens:

    • Compressão e codificação superiores.

    • Leitura eficiente de colunas específicas.

    • Aplicação de esquema integrada.

  • Desvantagens:

    • Complexidade na criação manual de arquivos.

    • Requer bibliotecas específicas para acesso.

    • Considerações de desempenho no Apache Spark.

Tamanho dos dados

Arquivos Parquet, por design, são mais compactos devido às suas capacidades de codificação e compressão, o que é crucial ao lidar com conjuntos de dados massivos, pois reduz os custos de armazenamento. Por exemplo, um arquivo CSV de 10 GB pode ser reduzido a apenas 2 GB em formato Parquet. Essa redução no tamanho dos arquivos impacta diretamente também nas operações de leitura e escrita de dados, resultando em tempos de leitura e gravação mais rápidos, e na largura de banda da rede, uma vez que arquivos menores requerem menos dados para serem transferidos.

Como forma de evidenciar estas características, criamos no Google Colab aqui 👨‍🔬 um exemplo que compara dois arquivos (customers.csv e orders.csv) no formato original em CSV com o seu equivalente no formato Parquet sem compressão e utilizando os seguintes algoritmos de compressão: snappy, gzip, lz4 e zstd.

A seguir apresentamos a estrutura (ou schema) dos dados contidos nos arquivos customers.csv e orders.csv lidos por meio do PySpark.

Estrutura do arquivo customers.csv 

df_customers_csv.printSchema()
root
 |-- CustomerID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Phone: string (nullable = true)

Estrutura do arquivo orders.csv 

df_orders_csv.printSchema()
root
 |-- OrderID: integer (nullable = true)
 |-- OrderDate: date (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: double (nullable = true)

Comparação dos algoritmos de compressão

Comparação dos algoritmos de compressão para o arquivo customers.csv

Comparação dos algoritmos de compressão para o arquivo orders.csv

Dados

Formato

customers (MB)

orders (MB)

CSV

1057,22

3411,84

Parquet-uncompressed

1105,21

544,68

Parquet-snappy

690,47

542,50

Parquet-gzip

434,44

512,44

Parquet-lz4

710,86

542,92

Parquet-zstd

428,77

510,38

Aqui fica evidente as capacidades do Parquet, mesmo sem uso dos algoritmos de compressão, na capacidade de redução do volume de dados armazenados. No exemplo, ao reduzir de CSV para Parquet-uncompressed (Parquet não comprimido) o arquivo orders.csv reduziu de 3411,84 MB para 544,68 MB, uma expressiva redução de 84%.

No arquivo orders.csv por exemplo temos a coluna Product que contém de forma textual o nome dos produtos adquiridos e o nome destes produtos se repetem milhões de vezes ao longo do arquivo e, aqui, a técnica Codificação de dicionário pode ter sido aplicada. O mesmo para a coluna CustomerID que se repete inúmeras vezes onde tanto a técnica Codificação de dicionário quanto Run-Length Encoding pode ter sido aplicada. Adicionalmente temos neste arquivo a coluna OrderDate que por se tratar de uma data, a técnica de Codificação Delta pode estar empregada e, com a aplicação destas técnicas foi possível alcançar a redução de 84% no tamanho do arquivo.

Caso queira compreender um pouco mais sobre as técnicas aqui mencionadas recomendamos a leitura do artigo Otimizando o desempenho no PySpark com com arquivos Parquet - Parte I .

partitionBy

Em projetos de Big Data os dados são frequentemente divididos em várias partições e essas partições permitem que os frameworks de processamento como o Spark por exemplo, processe dados em paralelo em diferentes nós do cluster. A escolha de como os dados são particionados pode impactar significativamente o desempenho das suas aplicações e o partitionBy é um método do Spark usado para especificar como os dados devem ser divididos (ou particionados) em seu cluster.

Por que usar partitionBy?

  • Desempenho: O particionamento adequado pode reduzir o embaralhamento de dados — uma operação cara — no cluster. Se as tarefas precisarem acessar dados localizados no mesmo nó em vez de embaralhá-los pela rede, elas podem ser concluídas mais rapidamente.

  • Tratamento de distorção: se os dados não forem distribuídos uniformemente, algumas partições podem ser muito maiores do que outras, levando a gargalos de desempenho. O partitionBy pode ajudar a distribuir os dados de forma mais uniforme entre as partições.

  • Colocation: às vezes, você pode querer processar ou unir dados onde linhas com a mesma chave precisam estar na mesma partição. partitionBy garante que todos os registros com a mesma chave estejam localizados juntos.

Quando você grava dados em um sistema de arquivos suportado pelo Spark (como HDFS, S3, etc.), o partitionBy permite que você especifique uma ou mais colunas pelas quais os dados devem ser particionados. Isso é particularmente útil para criar estruturas de diretório que otimizam o salto de dados (data skipping).

Usar partitionBy de forma eficiente é crucial para grandes conjuntos de dados e pode fazer uma diferença significativa na velocidade e eficiência dos seus jobs Spark. No entanto, o particionamento excessivo pode levar a arquivos pequenos, o que também pode degradar o desempenho, então é essencial testar e encontrar o equilíbrio certo para seu caso de uso específico.

No nosso exemplo optamos por aplicar a técnica de particionamento dos dados via partitionBy no dataset orders. Para isso criamos duas novas colunas, uma para o ano (year) e mês (month) a partir da data do pedido e particionamos os dados por meio dessas duas novas colunas.

Snippet do código utilizado para o particionamento

# Importa todas as funções do módulo pyspark.sql.functions para serem usadas no código
from pyspark.sql.functions import *

# Adiciona uma nova coluna chamada 'year' ao DataFrame 'df_orders_csv'
# A coluna 'year' é derivada da coluna 'OrderDate', extraindo somente o ano
df_orders_csv = df_orders_csv.withColumn('year', year(df_orders_csv.OrderDate))

# Adiciona outra nova coluna chamada 'month' ao DataFrame 'df_orders_csv'
# A coluna 'month' é derivada da coluna 'OrderDate', extraindo somente o mês
df_orders_csv = df_orders_csv.withColumn('month', month(df_orders_csv.OrderDate))

# Inicia o processo de escrita dos dados do DataFrame 'df_orders_csv'
df_orders_csv.write \
    # Define que os dados devem ser particionados pelas colunas 'year' e 'month'
    .partitionBy("year", "month")\
    # Define que, se já existirem dados no local de destino, eles devem ser sobrescritos
    .mode("overwrite") \
    # Define que o formato de saída será 'parquet', um formato de armazenamento colunar eficiente
    .format("parquet") \
    # Define a opção de compressão para o arquivo Parquet, neste caso, usando o algoritmo 'zstd' que é geralmente eficiente na compressão
    .option("compression", "zstd") \
    # Salva os dados no caminho especificado, onde as suas subpastas serão criadas com base nas partições por ano e mês
    .save(f"{folder_path}/orders/partitionBy")

Diretórios criados após o particionamento

A imagem abaixo ilustra a estrutura de diretórios criada após o particionamento executado pelo comando acima, que utiliza partitionBy para dividir os dados por ano (year) e mês (month). Assim, todas as vendas realizadas em janeiro de 2020 estarão no diretório year=2020/month=1. Dessa forma, ao buscar dados desse período específico (janeiro de 2020), o Spark saberá exatamente onde encontrar as informações, evitando procurar em locais desnecessários.

Diretórios criados após o particionamento.

Salto de dados (data skipping)

O salto de dados é uma técnica de otimização de desempenho usada em sistemas de processamento de dados. O objetivo principal do salto de dados é reduzir a quantidade de dados que precisam ser lidos e processados, acelerando assim os tempos de execução das consultas sobre os dados.

Como o salto de dados normalmente funciona:

  • Indexação de metadados: durante o processo de gravação de dados, metadados como estatísticas (por exemplo, valores mínimo e máximo) são coletados e armazenados junto com os dados. Eles podem ser estruturas indexadas ou arquivos de resumo que descrevem a distribuição de dados em blocos (por exemplo, arquivos ou partições).

  • Filtragem com base em consultas: quando uma consulta é executada, o planejador de consultas pode usar os metadados indexados para determinar quais blocos de dados podem ser ignorados. Por exemplo, se uma consulta solicitar linhas em que um valor de coluna esteja entre certos limites, blocos com valores mínimo e máximo fora desse intervalo podem ser ignorados completamente.

  • Operações de leitura e escrita reduzidas: ao pular blocos de dados que não atendem aos critérios de consulta, o salto de dados reduz a quantidade de dados lidos do armazenamento, bem como a quantidade de dados transferidos pela rede. Isso leva a resultados de consulta mais rápidos, pois menos dados passam por todo o pipeline de processamento de dados.

  • Casos de uso comuns: o salto de dados é particularmente útil em cenários com grandes conjuntos de dados e consultas seletivas, como dados de séries temporais ou aplicativos de data warehousing. É mais eficaz quando os dados são bem particionados e classificados, pois isso aumenta a capacidade dos metadados de representar intervalos de dados com precisão.

  • Formatos de arquivo e sistemas suportados: o salto de dados geralmente é suportado por formatos de armazenamento em colunas como Parquet e ORC, e estruturas de processamento de dados como Apache Spark fornecem integrações com esses formatos para aproveitar suas capacidades de indexação.

No Spark, usar um formato de arquivo como Parquet com pushdown de predicado pode aproveitar automaticamente o salto de dados, pois o Parquet armazena metadados de coluna, permitindo que o Spark pule grupos de linhas inteiras se os metadados indicarem que não existem dados relevantes.

Desempenho de consulta

Com armazenamento em colunas, os arquivos Parquet permitem que o Spark leia apenas as colunas necessárias para uma consulta. Essa leitura seletiva reduz a quantidade de dados processados ​​e melhora a velocidade. Além disso, o formato Parquet em conjunto com o particionamento dos dados permite que o Spark filtre dados no nível de armazenamento, reduzindo a carga no executor (Predicate Pushdown). Em contraste, os arquivos CSV exigem a leitura de linhas inteiras, potencialmente incluindo dados desnecessários.

Agora que temos os dados no formato Parquet utilizando um algoritmo de compressão e com o dataset particionado por meio as colunas de ano (year) e mês (month), vamos avaliar o ganho de performance que isso trouxe para as nossas análises.

Os testes foram executados com o código que busca selecionar os Top 10 Produtos com mais vendas de unidades em 2024.

Comando escrito em PySpark

df.filter(F.col('year')==2024).groupBy('Product').sum('Total').orderBy('sum(Total)', ascending = False).show()

Resultados

Para cada formato de arquivo (CSV) ou (Parquet-ZSTD Particionado) utilizado foram realizadas 10 execuções em cada. Os resultados são apresentados nos gráficos abaixo e também na tabela com os dados obtidos.

Tempo das execuções (em segundos)

Média dos tempos de execução (em segundos)

Dados

#

CSV

Parquet-ZSTD Particionado

1

211,24

7,19

2

207,91

4,73

3

221,35

4,49

4

215,33

4,51

5

224,40

3,89

6

225,15

2,05

7

207,30

1,88

8

219,15

2,05

9

221,71

1,82

10

207,75

2,44

Média

216,13

3,50

Desvio padrão

7,13

1,77

Ao analisar a média dos tempos de execução, observamos que, usando dados no formato CSV, o tempo foi de 216,13 segundos, enquanto, com dados no formato Parquet-ZSTD particionado, foi reduzido para 3,50 segundos.

Isso representa uma redução significativa de 98,38% no tempo de execução para o mesmo código, alterando apenas o formato do arquivo e utilizando a técnica de particionamento.

Conclusão

Escolher o formato de arquivo certo é crucial para a otimização do desempenho no Apache Spark. Embora a simplicidade e a universalidade do CSV sejam atraentes para tarefas simples e menores, o Apache Parquet brilha com seus benefícios de desempenho em cargas de trabalho analíticas maiores e complexas.

Aproveitar os pontos fortes de cada um com base nas necessidades específicas de suas tarefas de processamento de dados levará a trabalhos Spark mais eficientes e econômicos. Seja por meio de tempo de processamento reduzido, melhor utilização de recursos ou consistência de dados aprimorada, selecionar o formato apropriado pode fazer uma diferença notável em seus esforços de engenharia de dados.