Pyspark.sql.DataFrameWriter.saveAsTable()
Najprej si bomo ogledali, kako s funkcijo write.saveAsTable() zapisati obstoječi PySpark DataFrame v tabelo. Za pisanje DataFrame v tabelo potrebuje ime tabele in druge neobvezne parametre, kot so načini, partionBy itd. Shranjena je kot parketar.
Sintaksa:
dataframe_obj.write.saveAsTable(path/Table_name,mode,partitionBy,…)
- Table_name je ime tabele, ki je ustvarjena iz dataframe_obj.
- Podatke tabele lahko dodamo/prepišemo s parametrom mode.
- PartitionBy vzame en/več stolpcev za ustvarjanje particij na podlagi vrednosti v teh podanih stolpcih.
Primer 1:
Ustvarite PySpark DataFrame s 5 vrsticami in 4 stolpci. Zapišite ta podatkovni okvir v tabelo z imenom »Agri_Table1«.
uvozi pyspark
iz pyspark.sql uvozi SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Namig za Linux' ).getOrCreate()
# podatki o kmetijstvu s 5 vrsticami in 5 stolpci
agri =[{ 'Soil_Type' : 'Črna' , 'Irigation_availability' : 'ne' , 'Acres' : 2500 , 'Stanje_tal' : 'suho' ,
'Država' : 'ZDA' },
{ 'Soil_Type' : 'Črna' , 'Irigation_availability' : 'Da' , 'Acres' : 3500 , 'Stanje_tal' : 'mokra' ,
'Država' : 'Indija' },
{ 'Soil_Type' : 'Rdeča' , 'Irigation_availability' : 'Da' , 'Acres' : 210 , 'Stanje_tal' : 'suho' ,
'Država' : 'UK' },
{ 'Soil_Type' : 'Drugo' , 'Irigation_availability' : 'ne' , 'Acres' : 1000 , 'Stanje_tal' : 'mokra' ,
'Država' : 'ZDA' },
{ 'Soil_Type' : 'pesek' , 'Irigation_availability' : 'ne' , 'Acres' : 500 , 'Stanje_tal' : 'suho' ,
'Država' : 'Indija' }]
# ustvarite podatkovni okvir iz zgornjih podatkov
agri_df = linuxhint_spark_app.createDataFrame(agri)
agri_df.show()
# Zapišite zgornji DataFrame v tabelo.
agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Tabela1' )
Izhod:
Vidimo lahko, da je ena datoteka parketa ustvarjena s prejšnjimi podatki PySpark.
Primer 2:
Upoštevajte prejšnji DataFrame in zapišite »Agri_Table2« v tabelo tako, da razdelite zapise na podlagi vrednosti v stolpcu »Država«.
# Zapišite zgornji DataFrame v tabelo s parametrom partitionByagri_df.write.saveAsTable( 'Agri_Tabela2' ,partitionBy=[ 'Država' ])
Izhod:
V stolpcu »Država« so tri edinstvene vrednosti – »Indija«, »UK« in »ZDA«. Tako so ustvarjene tri particije. Vsaka pregrada drži parketne datoteke.
Pyspark.sql.DataFrameReader.table()
Naložimo tabelo v PySpark DataFrame s funkcijo spark.read.table(). Potrebuje samo en parameter, ki je ime poti/tabele. Tabelo neposredno naloži v PySpark DataFrame in vse funkcije SQL, ki se uporabljajo za PySpark DataFrame, je mogoče uporabiti tudi na tem naloženem DataFrame.
Sintaksa:
spark_app.read.table(pot/'ime_tabele')V tem scenariju uporabljamo prejšnjo tabelo, ki je bila ustvarjena iz PySpark DataFrame. Prepričajte se, da morate v svojem okolju implementirati prejšnje odrezke kode scenarija.
primer:
Naložite tabelo »Agri_Table1« v DataFrame z imenom »loaded_data«.
naloženi_podatki = linuxhint_spark_app.read.table( 'Agri_Table1' )loaded_data.show()
Izhod:
Vidimo lahko, da je tabela naložena v PySpark DataFrame.
Izvajanje poizvedb SQL
Zdaj izvedemo nekaj poizvedb SQL na naloženem DataFrameu s funkcijo spark.sql().
# Uporabite ukaz SELECT za prikaz vseh stolpcev iz zgornje tabele.linuxhint_spark_app.sql( 'IZBERI * iz Agri_Table1' ).show()
# KJE Klavzula
linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Soil_status='Dry' ' ).show()
linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Acres > 2000 ' ).show()
Izhod:
- Prva poizvedba prikaže vse stolpce in zapise iz DataFrame.
- Druga poizvedba prikazuje zapise na podlagi stolpca »Soil_status«. Obstajajo samo trije zapisi z elementom 'Dry'.
- Zadnja poizvedba vrne dva zapisa z »Acres«, ki sta večja od 2000.
Pyspark.sql.DataFrameWriter.insertInto()
S funkcijo insertInto() lahko dodamo DataFrame v obstoječo tabelo. To funkcijo lahko uporabimo skupaj z selectExpr(), da definiramo imena stolpcev in jo nato vstavimo v tabelo. Ta funkcija kot parameter vzame tudi TableName.
Sintaksa:
DataFrame_obj.write.insertInto('Ime_tabele')V tem scenariju uporabljamo prejšnjo tabelo, ki je bila ustvarjena iz PySpark DataFrame. Prepričajte se, da morate v svojem okolju implementirati prejšnje odrezke kode scenarija.
primer:
Ustvarite nov DataFrame z dvema zapisoma in ju vstavite v tabelo »Agri_Table1«.
uvozi pysparkiz pyspark.sql uvozi SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Namig za Linux' ).getOrCreate()
# podatki o kmetovanju z 2 vrsticama
agri =[{ 'Soil_Type' : 'pesek' , 'Irigation_availability' : 'ne' , 'Acres' : 2500 , 'Stanje_tal' : 'suho' ,
'Država' : 'ZDA' },
{ 'Soil_Type' : 'pesek' , 'Irigation_availability' : 'ne' , 'Acres' : 1200 , 'Stanje_tal' : 'mokra' ,
'Država' : 'Japonska' }]
# ustvarite podatkovni okvir iz zgornjih podatkov
agri_df2 = linuxhint_spark_app.createDataFrame(agri)
agri_df2.show()
# write.insertInto()
agri_df2.selectExpr( 'Akri' , 'Država' , 'Razpoložljivost_namakanja' , 'vrsta_tal' ,
'Stanje_tal' ).write.insertInto( 'Agri_Tabela1' )
# Prikaži končno Agri_Table1
linuxhint_spark_app.sql( 'IZBERI * iz Agri_Table1' ).show()
Izhod:
Zdaj je skupno število vrstic, ki so prisotne v DataFrame, 7.
Zaključek
Zdaj razumete, kako zapisati PySpark DataFrame v tabelo s funkcijo write.saveAsTable(). Vzame ime tabele in druge neobvezne parametre. Nato smo to tabelo naložili v PySpark DataFrame s funkcijo spark.read.table(). Potrebuje samo en parameter, ki je ime poti/tabele. Če želite dodati nov DataFrame v obstoječo tabelo, uporabite funkcijo insertInto().