Kako brati in pisati podatke tabele v PySpark

Kako Brati In Pisati Podatke Tabele V Pyspark



Obdelava podatkov v PySpark je hitrejša, če so podatki naloženi v obliki tabele. S tem, z uporabo izrazov SQL, bo obdelava hitra. Torej je pretvorba PySpark DataFrame/RDD v tabelo, preden jo pošljete v obdelavo, boljši pristop. Danes si bomo ogledali, kako prebrati podatke tabele v PySpark DataFrame, zapisati PySpark DataFrame v tabelo in vstaviti nov DataFrame v obstoječo tabelo z uporabo vgrajenih funkcij. Pojdimo!

Pyspark.sql.DataFrameWriter.saveAsTable()

Najprej si bomo ogledali, kako s funkcijo write.saveAsTable() zapisati obstoječi PySpark DataFrame v tabelo. Za pisanje DataFrame v tabelo potrebuje ime tabele in druge neobvezne parametre, kot so načini, partionBy itd. Shranjena je kot parketar.

Sintaksa:







dataframe_obj.write.saveAsTable(path/Table_name,mode,partitionBy,…)
  1. Table_name je ime tabele, ki je ustvarjena iz dataframe_obj.
  2. Podatke tabele lahko dodamo/prepišemo s parametrom mode.
  3. PartitionBy vzame en/več stolpcev za ustvarjanje particij na podlagi vrednosti v teh podanih stolpcih.

Primer 1:

Ustvarite PySpark DataFrame s 5 vrsticami in 4 stolpci. Zapišite ta podatkovni okvir v tabelo z imenom »Agri_Table1«.



uvozi pyspark

iz pyspark.sql uvozi SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Namig za Linux' ).getOrCreate()

# podatki o kmetijstvu s 5 vrsticami in 5 stolpci

agri =[{ 'Soil_Type' : 'Črna' , 'Irigation_availability' : 'ne' , 'Acres' : 2500 , 'Stanje_tal' : 'suho' ,
'Država' : 'ZDA' },

{ 'Soil_Type' : 'Črna' , 'Irigation_availability' : 'Da' , 'Acres' : 3500 , 'Stanje_tal' : 'mokra' ,
'Država' : 'Indija' },

{ 'Soil_Type' : 'Rdeča' , 'Irigation_availability' : 'Da' , 'Acres' : 210 , 'Stanje_tal' : 'suho' ,
'Država' : 'UK' },

{ 'Soil_Type' : 'Drugo' , 'Irigation_availability' : 'ne' , 'Acres' : 1000 , 'Stanje_tal' : 'mokra' ,
'Država' : 'ZDA' },

{ 'Soil_Type' : 'pesek' , 'Irigation_availability' : 'ne' , 'Acres' : 500 , 'Stanje_tal' : 'suho' ,
'Država' : 'Indija' }]



# ustvarite podatkovni okvir iz zgornjih podatkov

agri_df = linuxhint_spark_app.createDataFrame(agri)

agri_df.show()

# Zapišite zgornji DataFrame v tabelo.

agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Tabela1' )

Izhod:







Vidimo lahko, da je ena datoteka parketa ustvarjena s prejšnjimi podatki PySpark.



Primer 2:

Upoštevajte prejšnji DataFrame in zapišite »Agri_Table2« v tabelo tako, da razdelite zapise na podlagi vrednosti v stolpcu »Država«.

# Zapišite zgornji DataFrame v tabelo s parametrom partitionBy

agri_df.write.saveAsTable( 'Agri_Tabela2' ,partitionBy=[ 'Država' ])

Izhod:

V stolpcu »Država« so tri edinstvene vrednosti – »Indija«, »UK« in »ZDA«. Tako so ustvarjene tri particije. Vsaka pregrada drži parketne datoteke.

Pyspark.sql.DataFrameReader.table()

Naložimo tabelo v PySpark DataFrame s funkcijo spark.read.table(). Potrebuje samo en parameter, ki je ime poti/tabele. Tabelo neposredno naloži v PySpark DataFrame in vse funkcije SQL, ki se uporabljajo za PySpark DataFrame, je mogoče uporabiti tudi na tem naloženem DataFrame.

Sintaksa:

spark_app.read.table(pot/'ime_tabele')

V tem scenariju uporabljamo prejšnjo tabelo, ki je bila ustvarjena iz PySpark DataFrame. Prepričajte se, da morate v svojem okolju implementirati prejšnje odrezke kode scenarija.

primer:

Naložite tabelo »Agri_Table1« v DataFrame z imenom »loaded_data«.

naloženi_podatki = linuxhint_spark_app.read.table( 'Agri_Table1' )

loaded_data.show()

Izhod:

Vidimo lahko, da je tabela naložena v PySpark DataFrame.

Izvajanje poizvedb SQL

Zdaj izvedemo nekaj poizvedb SQL na naloženem DataFrameu s funkcijo spark.sql().

# Uporabite ukaz SELECT za prikaz vseh stolpcev iz zgornje tabele.

linuxhint_spark_app.sql( 'IZBERI * iz Agri_Table1' ).show()

# KJE Klavzula

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Soil_status='Dry' ' ).show()

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Acres > 2000 ' ).show()

Izhod:

  1. Prva poizvedba prikaže vse stolpce in zapise iz DataFrame.
  2. Druga poizvedba prikazuje zapise na podlagi stolpca »Soil_status«. Obstajajo samo trije zapisi z elementom 'Dry'.
  3. Zadnja poizvedba vrne dva zapisa z »Acres«, ki sta večja od 2000.

Pyspark.sql.DataFrameWriter.insertInto()

S funkcijo insertInto() lahko dodamo DataFrame v obstoječo tabelo. To funkcijo lahko uporabimo skupaj z selectExpr(), da definiramo imena stolpcev in jo nato vstavimo v tabelo. Ta funkcija kot parameter vzame tudi TableName.

Sintaksa:

DataFrame_obj.write.insertInto('Ime_tabele')

V tem scenariju uporabljamo prejšnjo tabelo, ki je bila ustvarjena iz PySpark DataFrame. Prepričajte se, da morate v svojem okolju implementirati prejšnje odrezke kode scenarija.

primer:

Ustvarite nov DataFrame z dvema zapisoma in ju vstavite v tabelo »Agri_Table1«.

uvozi pyspark

iz pyspark.sql uvozi SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Namig za Linux' ).getOrCreate()

# podatki o kmetovanju z 2 vrsticama

agri =[{ 'Soil_Type' : 'pesek' , 'Irigation_availability' : 'ne' , 'Acres' : 2500 , 'Stanje_tal' : 'suho' ,
'Država' : 'ZDA' },

{ 'Soil_Type' : 'pesek' , 'Irigation_availability' : 'ne' , 'Acres' : 1200 , 'Stanje_tal' : 'mokra' ,
'Država' : 'Japonska' }]

# ustvarite podatkovni okvir iz zgornjih podatkov

agri_df2 = linuxhint_spark_app.createDataFrame(agri)

agri_df2.show()

# write.insertInto()

agri_df2.selectExpr( 'Akri' , 'Država' , 'Razpoložljivost_namakanja' , 'vrsta_tal' ,
'Stanje_tal' ).write.insertInto( 'Agri_Tabela1' )

# Prikaži končno Agri_Table1

linuxhint_spark_app.sql( 'IZBERI * iz Agri_Table1' ).show()

Izhod:

Zdaj je skupno število vrstic, ki so prisotne v DataFrame, 7.

Zaključek

Zdaj razumete, kako zapisati PySpark DataFrame v tabelo s funkcijo write.saveAsTable(). Vzame ime tabele in druge neobvezne parametre. Nato smo to tabelo naložili v PySpark DataFrame s funkcijo spark.read.table(). Potrebuje samo en parameter, ki je ime poti/tabele. Če želite dodati nov DataFrame v obstoječo tabelo, uporabite funkcijo insertInto().