PySpark Read.Parquet()

Pyspark Read Parquet



V PySparku funkcija write.parquet() zapiše DataFrame v datoteko parketa, read.parquet() pa prebere datoteko parketa v PySpark DataFrame ali kateri koli drug DataSource. Za hitro in učinkovito obdelavo stolpcev v Apache Spark moramo stisniti podatke. Stiskanje podatkov prihrani naš spomin in vsi stolpci se pretvorijo v ravno raven. To pomeni, da obstaja shramba na ravni stolpca. Datoteka, ki jih shranjuje, je znana kot datoteka PARQUET.

V tem priročniku se bomo osredotočili predvsem na branje/nalaganje datoteke parketa v PySpark DataFrame/SQL z uporabo funkcije read.parquet(), ki je na voljo v razredu pyspark.sql.DataFrameReader.

Tema vsebine:







Pridobite pilo za parket



Preberite datoteko parketa v PySpark DataFrame



Preberite datoteko parketa v SQL PySpark





Pyspark.sql.DataFrameReader.parquet()

Ta funkcija se uporablja za branje datoteke parketa in njeno nalaganje v PySpark DataFrame. Vzame pot/ime datoteke za parket. Lahko preprosto uporabimo funkcijo read.parquet(), saj je to generična funkcija.

Sintaksa:



Oglejmo si sintakso read.parquet():

spark_app.read.parquet(ime_datoteke.parquet/pot)

Najprej namestite modul PySpark z ukazom pip:

pip namestite pyspark

Pridobite pilo za parket

Za branje parketarne datoteke potrebujete podatke, v katerih je parketarna datoteka generirana iz teh podatkov. V tem delu bomo videli, kako iz PySpark DataFrame ustvariti datoteko parketa.

Ustvarimo PySpark DataFrame s 5 zapisi in to zapišimo v datoteko parketa 'industry_parquet'.

uvozi pyspark

iz pyspark.sql uvozi SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'Namig za Linux' ).getOrCreate()

# ustvarite podatkovni okvir, ki shranjuje podrobnosti industrije

industrial_df = linuxhint_spark_app.createDataFrame([Row(Type= 'Kmetijstvo' ,Območje= 'ZDA' ,
Ocena= 'Vroče' ,Skupaj_zaposlenih= 100 ),

Vrstica (Vrsta= 'Kmetijstvo' ,Območje= 'Indija' ,Ocena= 'Vroče' ,Skupaj_zaposlenih= 200 ),

Vrstica (Vrsta= 'Razvoj' ,Območje= 'ZDA' ,Ocena= 'toplo' ,Skupaj_zaposlenih= 100 ),

Vrstica (Vrsta= 'Izobraževanje' ,Območje= 'ZDA' ,Ocena= 'Kul' ,Skupaj_zaposlenih= 400 ),

Vrstica (Vrsta= 'Izobraževanje' ,Območje= 'ZDA' ,Ocena= 'toplo' ,Skupaj_zaposlenih= dvajset )

])

# Dejanski DataFrame

industrija_df.show()

# Industrijo_df zapišite v datoteko parketa

industrija_df.coalesce( 1 ).piši.parket( 'industrija_parket' )

Izhod:

To je DataFrame, ki vsebuje 5 zapisov.

Za prejšnji DataFrame se ustvari datoteka parketa. Tu je ime naše datoteke s pripono »part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet«. To datoteko uporabljamo v celotni vadnici.

Preberite datoteko parketa v PySpark DataFrame

Imamo pilo za parket. Preberimo to datoteko s funkcijo read.parquet() in jo naložimo v PySpark DataFrame.

uvozi pyspark

iz pyspark.sql uvozi SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'Namig za Linux' ).getOrCreate()

# Preberi datoteko parketa v objekt dataframe_from_parquet.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# Prikaz dataframe_from_parquet-DataFrame

dataframe_from_parquet.show()

Izhod:

DataFrame prikažemo z metodo show(), ki je bila ustvarjena iz datoteke parket.

SQL poizvedbe s parketno datoteko

Po nalaganju v DataFrame je mogoče ustvariti tabele SQL in prikazati podatke, ki so prisotni v DataFrame. Ustvariti moramo ZAČASNI POGLED in uporabiti ukaze SQL za vrnitev zapisov iz DataFrame, ki je ustvarjen iz datoteke parketa.

Primer 1:

Ustvarite začasni pogled z imenom »Sektorji« in uporabite ukaz SELECT za prikaz zapisov v DataFrame. Lahko se sklicujete na to vadnica ki pojasnjuje, kako ustvariti POGLED v Spark – SQL.

uvozi pyspark

iz pyspark.sql uvozi SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'Namig za Linux' ).getOrCreate()

# Preberi datoteko parketa v objekt dataframe_from_parquet.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# Ustvari pogled iz zgornje datoteke parketa z imenom - 'Sektorji'

dataframe_from_parquet.createOrReplaceTempView( 'Sektorji' )

# Poizvedba za prikaz vseh zapisov iz sektorjev

linuxhint_spark_app.sql( 'izberi * iz sektorjev' ).show()

Izhod:

Primer 2:

S prejšnjim POGLEDOM napišite poizvedbo SQL:

  1. Za prikaz vseh zapisov iz sektorjev, ki pripadajo 'Indiji'.
  2. Za prikaz vseh zapisov iz sektorjev z zaposlenimi, ki so večji od 100.
# Poizvedba za prikaz vseh zapisov iz sektorjev, ki pripadajo 'Indiji'.

linuxhint_spark_app.sql( 'izberite * med sektorji, kjer je Area='India'' ).show()

# Poizvedba za prikaz vseh zapisov iz sektorjev z več kot 100 zaposlenimi

linuxhint_spark_app.sql( 'izberite * iz sektorjev, kjer je Skupno_zaposlenih>100' ).show()

Izhod:

Obstaja samo en zapis z območjem, ki je »Indija«, in dva zapisa z zaposlenimi, ki presegajo 100.

Preberite datoteko parketa v SQL PySpark

Najprej moramo ustvariti POGLED z ukazom CREATE. Z uporabo ključne besede »path« v poizvedbi SQL lahko datoteko parketa preberemo v Spark SQL. Za potjo moramo navesti ime datoteke/lokacijo datoteke.

Sintaksa:

spark_app.sql( 'USTVARITE ZAČASNI POGLED ime_pogleda Z UPORABO MOŽNOSTI parketa (pot ' ime_datoteke.parket ')' )

Primer 1:

Ustvarite začasni pogled z imenom “Sector2” in vanj preberite datoteko parketa. S funkcijo sql() napišite poizvedbo za izbiro, da prikažete vse zapise, ki so prisotni v pogledu.

uvozi pyspark

iz pyspark.sql uvozi SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'Namig za Linux' ).getOrCreate()

# Preberite datoteko parketa v Spark-SQL

linuxhint_spark_app.sql( 'USTVARI ZAČASNI POGLED Sektor2 Z UPORABO MOŽNOSTI parketa (pot ' del-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parquet ')' )

# Poizvedba za prikaz vseh zapisov iz sektorja2

linuxhint_spark_app.sql( 'izberi * iz sektorja 2' ).show()

Izhod:

Primer 2:

Uporabite prejšnji POGLED in napišite poizvedbo za prikaz vseh zapisov z oceno »Vroče« ali »Kul«.

# Poizvedba za prikaz vseh zapisov iz sektorja 2 z oceno - Hot ali Cool.

linuxhint_spark_app.sql( 'izberite * iz Sektorja2, kjer je Rating='Vroče' ALI Rating='Cool'' ).show()

Izhod:

Obstajajo trije zapisi z oceno 'Vroče' ali 'Kul'.

Zaključek

V PySparku funkcija write.parquet() zapiše DataFrame v datoteko parketa. Funkcija read.parquet() prebere datoteko parketa v PySpark DataFrame ali kateri koli drug DataSource. Naučili smo se brati datoteko parketa v PySpark DataFrame in v tabelo PySpark. Kot del te vadnice smo razpravljali tudi o tem, kako ustvariti tabele iz PySpark DataFrame in filtrirati podatke s klavzulo WHERE.