V tem priročniku se bomo osredotočili predvsem na branje/nalaganje datoteke parketa v PySpark DataFrame/SQL z uporabo funkcije read.parquet(), ki je na voljo v razredu pyspark.sql.DataFrameReader.
Tema vsebine:
Preberite datoteko parketa v PySpark DataFrame
Preberite datoteko parketa v SQL PySpark
Pyspark.sql.DataFrameReader.parquet()
Ta funkcija se uporablja za branje datoteke parketa in njeno nalaganje v PySpark DataFrame. Vzame pot/ime datoteke za parket. Lahko preprosto uporabimo funkcijo read.parquet(), saj je to generična funkcija.
Sintaksa:
Oglejmo si sintakso read.parquet():
spark_app.read.parquet(ime_datoteke.parquet/pot)Najprej namestite modul PySpark z ukazom pip:
pip namestite pyspark
Pridobite pilo za parket
Za branje parketarne datoteke potrebujete podatke, v katerih je parketarna datoteka generirana iz teh podatkov. V tem delu bomo videli, kako iz PySpark DataFrame ustvariti datoteko parketa.
Ustvarimo PySpark DataFrame s 5 zapisi in to zapišimo v datoteko parketa 'industry_parquet'.
uvozi pysparkiz pyspark.sql uvozi SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'Namig za Linux' ).getOrCreate()
# ustvarite podatkovni okvir, ki shranjuje podrobnosti industrije
industrial_df = linuxhint_spark_app.createDataFrame([Row(Type= 'Kmetijstvo' ,Območje= 'ZDA' ,
Ocena= 'Vroče' ,Skupaj_zaposlenih= 100 ),
Vrstica (Vrsta= 'Kmetijstvo' ,Območje= 'Indija' ,Ocena= 'Vroče' ,Skupaj_zaposlenih= 200 ),
Vrstica (Vrsta= 'Razvoj' ,Območje= 'ZDA' ,Ocena= 'toplo' ,Skupaj_zaposlenih= 100 ),
Vrstica (Vrsta= 'Izobraževanje' ,Območje= 'ZDA' ,Ocena= 'Kul' ,Skupaj_zaposlenih= 400 ),
Vrstica (Vrsta= 'Izobraževanje' ,Območje= 'ZDA' ,Ocena= 'toplo' ,Skupaj_zaposlenih= dvajset )
])
# Dejanski DataFrame
industrija_df.show()
# Industrijo_df zapišite v datoteko parketa
industrija_df.coalesce( 1 ).piši.parket( 'industrija_parket' )
Izhod:
To je DataFrame, ki vsebuje 5 zapisov.
Za prejšnji DataFrame se ustvari datoteka parketa. Tu je ime naše datoteke s pripono »part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet«. To datoteko uporabljamo v celotni vadnici.
Preberite datoteko parketa v PySpark DataFrame
Imamo pilo za parket. Preberimo to datoteko s funkcijo read.parquet() in jo naložimo v PySpark DataFrame.
uvozi pysparkiz pyspark.sql uvozi SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'Namig za Linux' ).getOrCreate()
# Preberi datoteko parketa v objekt dataframe_from_parquet.
dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )
# Prikaz dataframe_from_parquet-DataFrame
dataframe_from_parquet.show()
Izhod:
DataFrame prikažemo z metodo show(), ki je bila ustvarjena iz datoteke parket.
SQL poizvedbe s parketno datoteko
Po nalaganju v DataFrame je mogoče ustvariti tabele SQL in prikazati podatke, ki so prisotni v DataFrame. Ustvariti moramo ZAČASNI POGLED in uporabiti ukaze SQL za vrnitev zapisov iz DataFrame, ki je ustvarjen iz datoteke parketa.
Primer 1:
Ustvarite začasni pogled z imenom »Sektorji« in uporabite ukaz SELECT za prikaz zapisov v DataFrame. Lahko se sklicujete na to vadnica ki pojasnjuje, kako ustvariti POGLED v Spark – SQL.
uvozi pysparkiz pyspark.sql uvozi SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'Namig za Linux' ).getOrCreate()
# Preberi datoteko parketa v objekt dataframe_from_parquet.
dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )
# Ustvari pogled iz zgornje datoteke parketa z imenom - 'Sektorji'
dataframe_from_parquet.createOrReplaceTempView( 'Sektorji' )
# Poizvedba za prikaz vseh zapisov iz sektorjev
linuxhint_spark_app.sql( 'izberi * iz sektorjev' ).show()
Izhod:
Primer 2:
S prejšnjim POGLEDOM napišite poizvedbo SQL:
- Za prikaz vseh zapisov iz sektorjev, ki pripadajo 'Indiji'.
- Za prikaz vseh zapisov iz sektorjev z zaposlenimi, ki so večji od 100.
linuxhint_spark_app.sql( 'izberite * med sektorji, kjer je Area='India'' ).show()
# Poizvedba za prikaz vseh zapisov iz sektorjev z več kot 100 zaposlenimi
linuxhint_spark_app.sql( 'izberite * iz sektorjev, kjer je Skupno_zaposlenih>100' ).show()
Izhod:
Obstaja samo en zapis z območjem, ki je »Indija«, in dva zapisa z zaposlenimi, ki presegajo 100.
Preberite datoteko parketa v SQL PySpark
Najprej moramo ustvariti POGLED z ukazom CREATE. Z uporabo ključne besede »path« v poizvedbi SQL lahko datoteko parketa preberemo v Spark SQL. Za potjo moramo navesti ime datoteke/lokacijo datoteke.
Sintaksa:
spark_app.sql( 'USTVARITE ZAČASNI POGLED ime_pogleda Z UPORABO MOŽNOSTI parketa (pot ' ime_datoteke.parket ')' )Primer 1:
Ustvarite začasni pogled z imenom “Sector2” in vanj preberite datoteko parketa. S funkcijo sql() napišite poizvedbo za izbiro, da prikažete vse zapise, ki so prisotni v pogledu.
uvozi pysparkiz pyspark.sql uvozi SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'Namig za Linux' ).getOrCreate()
# Preberite datoteko parketa v Spark-SQL
linuxhint_spark_app.sql( 'USTVARI ZAČASNI POGLED Sektor2 Z UPORABO MOŽNOSTI parketa (pot ' del-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parquet ')' )
# Poizvedba za prikaz vseh zapisov iz sektorja2
linuxhint_spark_app.sql( 'izberi * iz sektorja 2' ).show()
Izhod:
Primer 2:
Uporabite prejšnji POGLED in napišite poizvedbo za prikaz vseh zapisov z oceno »Vroče« ali »Kul«.
# Poizvedba za prikaz vseh zapisov iz sektorja 2 z oceno - Hot ali Cool.linuxhint_spark_app.sql( 'izberite * iz Sektorja2, kjer je Rating='Vroče' ALI Rating='Cool'' ).show()
Izhod:
Obstajajo trije zapisi z oceno 'Vroče' ali 'Kul'.
Zaključek
V PySparku funkcija write.parquet() zapiše DataFrame v datoteko parketa. Funkcija read.parquet() prebere datoteko parketa v PySpark DataFrame ali kateri koli drug DataSource. Naučili smo se brati datoteko parketa v PySpark DataFrame in v tabelo PySpark. Kot del te vadnice smo razpravljali tudi o tem, kako ustvariti tabele iz PySpark DataFrame in filtrirati podatke s klavzulo WHERE.