• InfinitePy Newsletter 🇧🇷
  • Posts
  • Principais transformações e ações disponíveis no Apache Spark DataFrame: Uma Visão Geral com Exemplos Práticos

Principais transformações e ações disponíveis no Apache Spark DataFrame: Uma Visão Geral com Exemplos Práticos

Exemplos Esclarecedores de Transformações e Ações em DataFrames PySpark.

🕒 Tempo estimado de leitura: 11 minutos

Bem-vindo de volta à terceira parte da nossa série sobre PySpark. Se você perdeu nossos artigos anteriores, pode se atualizar clicando aqui e aqui.

Agora, vamos mergulhar no mundo PySpark com alguns exemplos práticos que também estão disponíveis no Google Colab aqui 👨‍🔬.

Apache Spark Ecosystem

O ecossistema do Apache Spark é uma poderosa plataforma de processamento de dados em grande escala que permite o processamento rápido e distribuído de grandes volumes de dados. Ele integra-se perfeitamente com diversos componentes e ferramentas como o Hadoop, Hive, HBase e Cassandra, tornando-o altamente versátil.

O Spark Core é o motor principal que gerencia a execução de jobs distribuídos, enquanto bibliotecas adicionais como Spark SQL, Spark Streaming, MLlib e GraphX oferecem suporte para consulta de dados estruturados, processamento de streams de dados em tempo real, machine learning e processamento de grafos, respectivamente.

Apache Spark Ecosystem.

Abstrações fundamentais no ecossistema Apache Spark

RDD (Resilient Distributed Dataset)

É a estrutura de dados central no Apache Spark e representa uma coleção imutável e distribuída de objetos. Um RDD é dividido em partições, que podem ser processadas paralelamente em um cluster de computadores.

Os RDDs oferecem tolerância a falhas, o que significa que podem ser reconstruídos automaticamente em caso de falhas nos nós do cluster. Os RDDs fornecem uma interface de programação mais baixa em relação a DataFrames e Datasets, sendo mais adequados para desenvolvedores que precisam de controle granular sobre o processamento de dados.

  • Uma vez criado, RDD são imutáveis.

  • Pode-se persistir ou fazer cache de RDDs na memória ou no disco.

  • RDDs Spark são tolerantes a falhas. Se um determinado nó ou tarefa falhar, o RDD pode ser construído automaticamente em nós restantes e a tarefa será finalizada.

Operações sobre RDDs podem ser transformações (como map, filter, groupByKey) ou ações (como collect, count, saveAsTextFile).

  • Transformações são operações que retornam novos RDDs e são executadas de maneira "lazy", ou seja, só são aplicadas quando uma ação é chamada.

  • Ações são operações que retornam valores ou salvam dados para armazenamento externo.

Avaliação tardia (lazy evaluation)

Avaliação tardia no Spark significa que a execução não será iniciada até que uma ação seja acionada. No Spark, a avaliação tardia surge quando ocorrem as transformações do Spark.

Avaliação tardia (lazy evaluation)

DataFrame

Um DataFrame é uma abstração de dados organizada em colunas nomeadas. É semelhante a uma tabela em um banco de dados relacional ou a um DataFrame em pandas, R ou Python. Os DataFrames no Spark são distribuídos e oferecem suporte a operações de processamento de dados em larga escala.

Os DataFrames são projetados para serem mais eficientes em termos de desempenho do que os RDDs, pois aproveitam o Catalyst Optimizer do Spark, que otimiza as consultas e realiza otimizações físicas, como projeções de coluna e filtragem pushdown.

Os DataFrames também oferecem uma API mais rica em comparação com os RDDs, tornando mais fácil e intuitivo para os desenvolvedores trabalharem com eles.

Datasets

Os Datasets são uma abstração mais recente introduzida no Spark 1.6. Eles são semelhantes aos DataFrames em termos de representação tabular de dados, mas fornecem uma tipagem estática e suporte à linguagem de programação Scala e Java. Isso significa que os Datasets têm os benefícios da inferência de tipos em tempo de compilação e verificação de erros de tipo.

Os Datasets combinam a eficiência dos DataFrames com a orientação a objetos dos RDDs. No entanto, a funcionalidade do Dataset está disponível principalmente para Scala e Java, enquanto Python tem suporte limitado a partir do Spark 3.0.

Resumo

Apache RDD (Resilient Distributed Datasets) era uma abstração fundamental no Apache Spark para processamento distribuído de dados. Os RDDs forneceram uma estrutura de processamento de dados paralelo e tolerante a falhas que permitiu aos usuários realizar cálculos distribuídos em grandes conjuntos de dados.

No entanto, houveram avanços no Spark, e abstrações mais recentes, como DataFrames e Datasets, ganharam destaque. DataFrames e Datasets fornecem uma API de nível superior e otimizações sobre RDDs, tornando-os mais fáceis de usar e eficientes para muitos casos de uso comuns.

Os RDDs ainda são usados em determinados cenários onde é necessário um controle refinado sobre o processamento de dados distribuídos. No entanto, para a maioria dos usuários, especialmente aqueles que lidam com dados estruturados, DataFrames e Datasets foram recomendados devido à sua facilidade de uso e benefícios de desempenho.

Para quem está aprendendo Apache Spark ou computação distribuída, é benéfico entender o contexto histórico dos RDDs, mas deve-se concentrar nas abstrações mais recentes, como DataFrames e Datasets, pois são mais amplamente usados em aplicativos Spark modernos.

Principais Transformações

select, seleciona um conjunto específico de colunas.

df.select("coluna1", "coluna2").show()

filter, filtra as linhas com base em uma condição.

df.filter(df["coluna"] > 10).show()

groupBy, agrupa o DataFrame por uma ou mais colunas.

df.groupBy("coluna1").agg({"coluna2": "sum"}).show()

withColumn, adiciona ou substitui uma coluna.

df.withColumn("nova_coluna", df["coluna"] * 2).show()

join, realiza uma junção entre dois DataFrames.

df1.join(df2, df1["coluna1"] == df2["coluna2"], "inner").show()

orderBy, ordena o DataFrame com base em uma ou mais colunas.

df.orderBy("coluna1", ascending=False).show()

drop, remove uma coluna do DataFrame.

df.drop("coluna").show()

distinct, retorna as linhas distintas do DataFrame.

df.distinct().show()

Principais Ações

  • show, exibe as primeiras linhas do DataFrame.

  • count, retorna o número de linhas no DataFrame.

  • collect, retorna todas as linhas do DataFrame como uma lista no programa driver.

  • take, retorna as primeiras n linhas do DataFrame.

  • describe, calcula estatísticas descritivas para colunas numéricas.

  • printSchema, exibe o esquema do DataFrame.

  • write, escreve o DataFrame em fontes externas.

Projeto em Ação: Da Teoria à Prática

Para reforçar seu conhecimento, vamos aplicar o que aprendemos em um pequeno projeto prático inspirado nos artigos Analisando dados de vendas do Excel com Python Pandas e Seaborn. Se você perdeu nossos artigos anteriores, pode se atualizar clicando aqui, aqui e aqui.

  1. Crie uma sessão Spark.

  2. Leia dois arquivos CSV em DataFrames Spark.

  3. Realize uma junção entre os dois DataFrames.

  4. Adicione uma nova coluna com operações matemáticas.

  5. Agrupar os dados por uma coluna e, em seguida, aplica uma agregação.

  6. Ordenação e exibição dos resultados finais.

  7. Salve os DataFrames resultante em um arquivo Parquet.

Abaixo está um exemplo de código comentado completo para orientá-lo:

Crie uma sessão Spark

from pyspark.sql import SparkSession

# 1. Cria a sessão Spark
# SparkSession é a entrada principal para interagir com o Spark.
# Aqui, estamos criando uma instância de SparkSession, que será usada para criar DataFrames.

# "builder" cria uma instância de Builder que permite configurar a sessão Spark.
# "appName" define o nome do aplicativo Spark para que possamos identificá-lo na interface do usuário do Spark.
# "getOrCreate" cria uma nova SparkSession se não houver nenhuma existente ou retorna a existente.
spark = SparkSession.builder.appName("Projeto Prático").getOrCreate()

# A partir deste ponto, podemos usar 'spark' para carregar dados, transformá-los e realizar várias operações analíticas.

Leia dois arquivos CSV em DataFrames Spark

o esquema (estrutura) do DataFrame.o esquema (estrutura) do DataFrame.o esquema (estrutura) do DataFrame.
#  Lê um arquivo CSV e carrega os dados em um DataFrame.
# 'read.csv' é um método fornecido pela SparkSession para ler arquivos CSV.
# "customers.csv" é o caminho para o arquivo CSV que queremos ler.
# header=True indica que o arquivo CSV tem uma linha de cabeçalho com os nomes das colunas.
# inferSchema=True permite ao Spark inferir automaticamente os tipos de dados das colunas com base na análise dos dados.
customers_df = spark.read.csv("customers.csv", header=True, inferSchema=True)

# Exibe o esquema (estrutura) do DataFrame.
# 'printSchema' mostra as colunas e seus respectivos tipos de dados inferidos.
customers_df.printSchema()

# Exibe as primeiras 5 linhas do DataFrame.
# 'show' exibe os dados na saída padrão.
# O parâmetro 5 indica quantas linhas devem ser exibidas.
# 'truncate=False' garante que o conteúdo das células não seja truncado e seja exibido por completo.
customers_df.show(5, truncate=False)

Executar o código acima produzirá a seguinte saída que exibe o esquema (estrutura) do DataFrame e as 5 primeiras linhas do DataFrame.

Esquema (estrutura) do DataFrame e as 5 primeiras linhas do DataFrame.

Realize uma junção entre os dois DataFrames

# Importa a biblioteca de funções do módulo pyspark.sql, renomeando-a como "F"
# Isso facilita a chamada de funções SQL fornecidas pelo PySpark.
from pyspark.sql import functions as F

# Realiza um join (junção) entre dois DataFrames: customers_df e orders_df.
# O join é feito com base na coluna "CustomerID" que ambos os DataFrames possuem.
customers_orders_df = customers_df.join(orders_df, customers_df["CustomerID"] == orders_df["CustomerID"])

# Mostra os primeiros 5 registros do DataFrame resultante após o join.
customers_orders_df.show(5, truncate=False)

Executar o código acima produzirá a seguinte saída.

Resultado da junção entre os dois DataFrames.

Adicione uma nova coluna com operações matemáticas

# A função withColumn aceita dois argumentos: o nome da nova coluna e a expressão de cálculo/resultante.
# Neste caso, estamos criando uma nova coluna chamada "Total".
# A expressão resultante é a multiplicação (utilizando o operador *) do valor da coluna 'Quantity' pelo valor da coluna 'Price' para cada linha do DataFrame customers_orders_df.
customers_orders_df = customers_orders_df.withColumn("Total", F.col('Quantity') * F.col("Price"))

# O método show exibe as primeiras n linhas do DataFrame, onde n é especificado pelo usuário.
customers_orders_df.show(5, truncate=False)

Executar o código acima produzirá a seguinte saída. Neste caso, estamos criando uma nova coluna chamada "Total".

Resultado após a nova coluna chamada "Total".

Agrupar os dados por uma coluna e, em seguida, aplica uma agregação

# Agrupa os dados pelo nome do produto e calcula a soma do total das vendas para cada produto
receita_por_produto_df = customers_orders_df.groupBy("Product").agg(F.sum("Total").alias("Receita Total"))

# Exibe as primeiras 5 linhas do DataFrame resultante sem truncar as colunas (mostrar todo o conteúdo das colunas)
receita_por_produto_df.show(5, truncate=False)

Executar o código acima produzirá a seguinte saída com a receita total por produto.

Receita total por produto.

Ordenação e exibição dos resultados finais

# Ordena 'receita_por_produto_df' de forma decrescente pela coluna "Receita Total"
# F.desc("Receita Total") especifica que a ordenação deve ser feita em ordem decrescente em relação à receita total
# Em SQL, seria equivalente a: ORDER BY "Receita Total" DESC
top_10_produtos_df = receita_por_produto_df.orderBy(F.desc("Receita Total")).limit(10)

# Aplica um limite de 10 linhas após a ordenação
# Ou seja, seleciona apenas os primeiros 10 produtos com as maiores receitas totais
# Em SQL, seria equivalente a: LIMIT 10

# Exibe o DataFrame resultante ('top_10_produtos_df') no console
top_10_produtos_df.show(truncate=False)

Executar o código acima produzirá a seguinte saída. Aqui é selecionado apenas os primeiros 10 produtos com as maiores receitas totais.

Primeiros 10 produtos com as maiores receitas totais.

Selecionando os Top 10 Produtos com mais vendas de unidades

# Agrupamos os dados pelo campo "Product" para combinar todas as linhas que possuem o mesmo produto.
# Em seguida, somamos a quantidade de unidades vendidas de cada produto.
# A função groupBy é utilizada para agrupar os dados e a função sum para somar os valores do campo "Quantity".

top_10_produtos_por_quantidade_df = (
    customers_orders_df
    .groupBy("Product")      # Agrupamento dos dados pelo campo "Product"
    .sum("Quantity")         # Somatório das quantidades (units) para cada produto
    .orderBy(F.desc("sum(Quantity)"))  # Ordenação em ordem decrescente pela soma das quantidades
    .limit(10)               # Limitação do resultado aos top 10 produtos com mais vendas
)

# Exibe os resultados do DataFrame
top_10_produtos_por_quantidade_df.show(truncate=False)

Executar o código acima produzirá a seguinte saída, com os top 10 produtos com mais vendas de unidades.

Top 10 produtos com mais vendas de unidades.

Salve os DataFrames resultante em um arquivo Parquet

# O código a seguir salva dois DataFrames no formato Parquet, utilizando o Apache Spark.
# Cada bloco de código salva um DataFrame diferente e usa compactação "snappy" para reduzir o tamanho do arquivo.

# Salvando o DataFrame "top_10_produtos_df" no formato Parquet
# Inicia a operação de escrita do DataFrame usando o método write
top_10_produtos_df.write \
    .format("parquet") \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .save("top_10_produtos.parquet")

# Salvando o DataFrame "top_10_produtos_por_quantidade_df" no formato Parquet
# Inicia a operação de escrita do DataFrame usando o método write
top_10_produtos_por_quantidade_df.write \
    .format("parquet") \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .save("top_10_produtos_por_quantidade.parquet")

# Explicação dos métodos utilizados
# .format("parquet")  Define o formato do arquivo de saída como "parquet"
# .mode("overwrite")  Define o modo de escrita como "overwrite". Isso significa que, se o arquivo já existir, ele será sobrescrito.
# .option("compression", "snappy")  Define a opção de compressão do arquivo como "snappy" para reduzir o espaço de armazenamento
# .save("top_10_produtos.parquet")  Especifica o caminho onde o arquivo parquet será salvo

Conclusão

Neste artigo abordamos as principais transformações e ações disponíveis no DataFrame do Apache Spark, fornecendo exemplos práticos para manipulação e processamento eficiente de grandes volumes de dados. Inicialmente, apresenta uma visão geral do ecossistema do Apache Spark e suas abstrações fundamentais, como RDD, DataFrames e Datasets, destacando suas características e casos de uso.

As transformações discutidas incluem operações como select, filter, groupBy, withColumn, join e orderBy. Entre as ações destacam-se show, printSchema, e write. Um exemplo prático é apresentado, mostrando a criação de uma sessão Spark, a manipulação de DataFrames a partir de arquivos CSV, e a realização de operações complexas, culminando com a persistência dos resultados em arquivos Parquet.

Para iniciantes e profissionais intermediários em Apache Spark, a familiarização com os DataFrames deve ser uma prioridade, dada sua capacidade de otimização e a simplicidade de sua API. Mesmo que os RDDs ainda encontrem aplicação em cenários específicos onde granularidade e controle são necessários, DataFrames e Datasets são as abstrações recomendadas devido à sua poderosa combinação de facilidade de uso e aprimoramento de desempenho.

Esperamos que este artigo tenha proporcionado uma compreensão sólida sobre a manipulação de DataFrames no Spark e que os exemplos fornecidos sirvam como um ponto de partida para suas próprias análises de dados em larga escala. Convidamos você a continuar experimentando no Google Colab e a explorar as diversas capacidades dessa ferramenta robusta para aprimorar seu conhecimento e habilidades em Apache Spark.

Para aprofundar o conhecimento e ver mais exemplos práticos, confira nossos artigos anteriores mencionados no começo desta página, além do manual oficial do Apache Spark.