Pretvarjanje PySpark DataFrame v CSV

Pretvarjanje Pyspark Dataframe V Csv



Oglejmo si štiri različne scenarije pretvorbe PySpark DataFrame v CSV. Neposredno uporabljamo metodo write.csv() za pretvorbo PySpark DataFrame v CSV. S funkcijo to_csv() pretvorimo PySpark Pandas DataFrame v CSV. To je mogoče tudi s pretvorbo v matriko NumPy.

Tema vsebine:

Če želite izvedeti več o PySpark DataFrame in namestitvi modula, si oglejte to Članek .







PySpark DataFrame v CSV s pretvorbo v Pandas DataFrame

To_csv() je metoda, ki je na voljo v modulu Pandas, ki pretvori Pandas DataFrame v CSV. Najprej moramo naš PySpark DataFrame pretvoriti v Pandas DataFrame. Za to se uporablja metoda toPandas(). Oglejmo si sintakso funkcije to_csv() skupaj z njenimi parametri.



Sintaksa:



pandas_dataframe_obj.to_csv(pot/ 'ime_datoteke.csv' , glava ,indeks,stolpci,način ...)
  1. Določiti moramo ime datoteke CSV. Če želite preneseni CSV shraniti na določeno mesto v računalniku, lahko poleg imena datoteke določite tudi pot.
  2. Stolpci so vključeni, če je glava nastavljena na »True«. Če stolpcev ne potrebujete, nastavite glavo na »False«.
  3. Indeksi so navedeni, če je indeks nastavljen na »True«. Če ne potrebujete indeksov, nastavite indeks na »False«.
  4. Parameter Columns vsebuje seznam imen stolpcev, v katerih lahko določimo, kateri stolpci so ekstrahirani v datoteko CSV.
  5. Zapise lahko dodajamo v CSV s parametrom mode. Pripni – za to se uporablja »a«.

1. primer: s parametri glave in indeksa

Ustvarite »skills_df« PySpark DataFrame s 3 vrsticami in 4 stolpci. Pretvorite ta DataFrame v CSV tako, da ga najprej pretvorite v Pandas DataFrame.





uvozi pyspark

iz pyspark.sql uvozi SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Namig za Linux' ).getOrCreate()

# podatki o spretnostih s 3 vrsticami in 4 stolpci

spretnosti =[{ 'id' : 123 , 'oseba' : 'Draga' , 'spretnost' : 'slika' , 'nagrada' : 25000 },

{ 'id' : 112 , 'oseba' : 'Mouni' , 'spretnost' : 'ples' , 'nagrada' : 2000 },

{ 'id' : 153 , 'oseba' : 'Tulasi' , 'spretnost' : 'branje' , 'nagrada' : 1200 }

]

# ustvarite podatkovni okvir spretnosti iz zgornjih podatkov

skills_df = linuxhint_spark_app.createDataFrame(skills)

spretnosti_df.show()

# Pretvori skills_df v pandas DataFrame

pandas_skills_df= skills_df.toPandas()

natisni (pandas_skills_df)

# Pretvorite ta DataFrame v csv z glavo in indeksom

pandas_skills_df.to_csv( 'pandas_skills1.csv' , glava =True, index=True)

Izhod:



Vidimo lahko, da je PySpark DataFrame pretvorjen v Pandas DataFrame. Poglejmo, ali je pretvorjen v CSV z imeni stolpcev in indeksi:

Primer 2: pripni podatke v CSV

Ustvarite še en PySpark DataFrame z 1 zapisom in ga pripnite v CSV, ki je ustvarjen kot del našega prvega primera. Prepričajte se, da moramo nastaviti glavo na »False« skupaj s parametrom načina. V nasprotnem primeru so imena stolpcev dodana tudi kot vrstica.

uvozi pyspark

iz pyspark.sql uvozi SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Namig za Linux' ).getOrCreate()

spretnosti =[{ 'id' : 90 , 'oseba' : 'Bhargav' , 'spretnost' : 'branje' , 'nagrada' : 12000 }

]

# ustvarite podatkovni okvir spretnosti iz zgornjih podatkov

skills_df = linuxhint_spark_app.createDataFrame(skills)

# Pretvori skills_df v pandas DataFrame

pandas_skills_df= skills_df.toPandas()

# Dodajte ta DataFrame v datoteko pandas_skills1.csv

pandas_skills_df.to_csv( 'pandas_skills1.csv' , način= 'a' , glava =False)

Izhod CSV:

Vidimo lahko, da je v datoteko CSV dodana nova vrstica.

3. primer: s parametrom Columns

Imejmo isti DataFrame in ga pretvorimo v CSV z dvema stolpcema: »oseba« in »nagrada«.

uvozi pyspark

iz pyspark.sql uvozi SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Namig za Linux' ).getOrCreate()

# podatki o spretnostih s 3 vrsticami in 4 stolpci

spretnosti =[{ 'id' : 123 , 'oseba' : 'Draga' , 'spretnost' : 'slika' , 'nagrada' : 25000 },

{ 'id' : 112 , 'oseba' : 'Mouni' , 'spretnost' : 'ples' , 'nagrada' : 2000 },

{ 'id' : 153 , 'oseba' : 'Tulasi' , 'spretnost' : 'branje' , 'nagrada' : 1200 }

]

# ustvarite podatkovni okvir spretnosti iz zgornjih podatkov

skills_df = linuxhint_spark_app.createDataFrame(skills)

# Pretvori skills_df v pandas DataFrame

pandas_skills_df= skills_df.toPandas()

# Pretvorite ta DataFrame v csv z določenimi stolpci

pandas_skills_df.to_csv( 'pandas_skills2.csv' , stolpci=[ 'oseba' , 'nagrada' ])

Izhod CSV:

Vidimo lahko, da v datoteki CSV obstajata samo stolpca »oseba« in »nagrada«.

PySpark Pandas DataFrame v CSV z uporabo metode To_Csv().

To_csv() je metoda, ki je na voljo v modulu Pandas, ki pretvori Pandas DataFrame v CSV. Najprej moramo naš PySpark DataFrame pretvoriti v Pandas DataFrame. Za to se uporablja metoda toPandas(). Oglejmo si sintakso funkcije to_csv() skupaj z njenimi parametri:

Sintaksa:

pyspark_pandas_dataframe_obj.to_csv(pot/ 'ime_datoteke.csv' , glava ,indeks,stolpci,...)
  1. Določiti moramo ime datoteke CSV. Če želite preneseni CSV shraniti na določeno mesto v računalniku, lahko poleg imena datoteke določite tudi pot.
  2. Stolpci so vključeni, če je glava nastavljena na »True«. Če stolpcev ne potrebujete, nastavite glavo na »False«.
  3. Indeksi so navedeni, če je indeks nastavljen na »True«. Če ne potrebujete indeksov, nastavite indeks na »False«.
  4. Parameter stolpcev vsebuje seznam imen stolpcev, v katerih lahko določimo, kateri stolpci so izvlečeni v datoteko CSV.

1. primer: s parametrom Columns

Ustvarite PySpark Pandas DataFrame s 3 stolpci in ga pretvorite v CSV z uporabo to_csv() s stolpcema »person« in »prize«.

iz pyspark uvoz pand

pyspark_pandas_dataframe=pandas.DataFrame({ 'id' :[ 90 , 78 , 90 , 57 ], 'oseba' :[ 'Draga' , 'Mouni' , 'sam' , 'radha' ], 'nagrada' :[ 1 , 2 , 3 , 4 ]})

natisni (pyspark_pandas_dataframe)

# Pretvorite ta DataFrame v csv z določenimi stolpci

pyspark_pandas_dataframe.to_csv( 'pyspark_pandas1' , stolpci=[ 'oseba' , 'nagrada' ])

Izhod:

Vidimo lahko, da je PySpark Pandas DataFrame pretvorjen v CSV z dvema particijama. Vsaka particija vsebuje 2 zapisa. Poleg tega sta stolpca v CSV samo »oseba« in »nagrada«.

Particijska datoteka 1:

Particijska datoteka 2:

2. primer: s parametrom glave

Uporabite prejšnji DataFrame in določite parameter glave tako, da ga nastavite na »True«.

iz pyspark uvoz pand

pyspark_pandas_dataframe=pandas.DataFrame({ 'id' :[ 90 , 78 , 90 , 57 ], 'oseba' :[ 'Draga' , 'Mouni' , 'sam' , 'radha' ], 'nagrada' :[ 1 , 2 , 3 , 4 ]})

# Pretvorite ta DataFrame v csv z glavo.

pyspark_pandas_dataframe.to_csv( 'pyspark_pandas2' , glava = res)

Izhod CSV:

Vidimo lahko, da je PySpark Pandas DataFrame pretvorjen v CSV z dvema particijama. Vsaka particija vsebuje 2 zapisa z imeni stolpcev.

Particijska datoteka 1:

Particijska datoteka 2:

PySpark Pandas DataFrame v CSV s pretvorbo v NumPy Array

Imamo možnost pretvoriti PySpark Pandas DataFrame v CSV s pretvorbo v polje Numpy. To_numpy() je metoda, ki je na voljo v modulu PySpark Pandas, ki pretvori PySpark Pandas DataFrame v matriko NumPy.

Sintaksa:

pyspark_pandas_dataframe_obj.to_numpy()

Ne bo sprejel nobenih parametrov.

Uporaba metode Tofile().

Po pretvorbi v matriko NumPy lahko uporabimo metodo tofile() za pretvorbo NumPy v CSV. Tu shrani vsak zapis v novo celico v stolpcu v datoteki CSV.

Sintaksa:

array_obj.to_numpy(ime datoteke/pot,sep=’’)

Vzame ime datoteke ali pot CSV in ločilo.

primer:

Ustvarite PySpark Pandas DataFrame s 3 stolpci in 4 zapisi in ga pretvorite v CSV tako, da ga najprej pretvorite v matriko NumPy.

iz pyspark uvoz pand

pyspark_pandas_dataframe=pandas.DataFrame({ 'id' :[ 90 , 78 , 90 , 57 ], 'oseba' :[ 'Draga' , 'Mouni' , 'sam' , 'radha' ], 'nagrada' :[ 1 , 2 , 3 , 4 ]})

# Pretvorite zgornji DataFrame v matriko numpy

pretvorjeno = pyspark_pandas_dataframe.to_numpy()

natisni (pretvorjeno)

# Uporaba tofile()

pretvorjeno.tofile( 'converted1.csv' , sep = ',' )

Izhod:

[[ 90 'Draga' 1 ]

[ 78 'Mouni' 2 ]

[ 90 'sam' 3 ]

[ 57 'radha' 4 ]]

Vidimo lahko, da je PySpark Pandas DataFrame pretvorjen v matriko NumPy (12 vrednosti). Če vidite podatke CSV, shrani vsako vrednost celice v nov stolpec.

PySpark DataFrame v CSV z uporabo metode Write.Csv().

Metoda write.csv() kot parameter vzame ime/pot datoteke, kamor moramo shraniti datoteko CSV.

Sintaksa:

dataframe_object.coalesce( 1 ).write.csv( 'Ime datoteke' )

Pravzaprav je CSV shranjen kot particija (več kot ena). Da bi se tega znebili, združimo vse particionirane datoteke CSV v eno. V tem scenariju uporabljamo funkcijo coalesce(). Zdaj lahko vidimo samo eno datoteko CSV z vsemi vrsticami iz PySpark DataFrame.

primer:

Razmislite o PySpark DataFrame s 4 zapisi s 4 stolpci. Zapišite ta DataFrame v CSV z datoteko z imenom »market_details«.

uvozi pyspark

iz pyspark.sql uvozi SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Namig za Linux' ).getOrCreate()

# tržni podatki s 4 vrsticami in 4 stolpci

trg =[{ 'm_id' : 'mz-001' , 'm_name' : 'ABC' , 'm_city' : 'delhi' , 'm_state' : 'delhi' },

{ 'm_id' : 'mz-002' , 'm_name' : 'XYZ' , 'm_city' : 'patna' , 'm_state' : 'lucknow' },

{ 'm_id' : 'mz-003' , 'm_name' : 'PQR' , 'm_city' : 'florida' , 'm_state' : 'ena' },

{ 'm_id' : 'mz-004' , 'm_name' : 'ABC' , 'm_city' : 'delhi' , 'm_state' : 'lucknow' }

]



# ustvarite tržni podatkovni okvir iz zgornjih podatkov

market_df = linuxhint_spark_app.createDataFrame(market)

# Dejanski tržni podatki

market_df.show()

# write.csv()

market_df.coalesce( 1 ).write.csv( 'tržni_podrobnosti' )

Izhod:

Preverimo datoteko:

Za ogled zapisov odprite zadnjo datoteko.

Zaključek

Naučili smo se štirih različnih scenarijev, ki pretvorijo PySpark DataFrame v CSV s primeri ob upoštevanju različnih parametrov. Ko delate s PySpark DataFrame, imate dve možnosti za pretvorbo tega DataFrame v CSV: en način je uporaba metode write(), drugi pa uporaba metode to_csv() s pretvorbo v Pandas DataFrame. Če delate s PySpark Pandas DataFrame, lahko uporabite tudi to_csv() in tofile() s pretvorbo v matriko NumPy.