Tema vsebine:
- PySpark DataFrame v CSV s pretvorbo v Pandas DataFrame
- PySpark Pandas DataFrame v CSV z uporabo metode To_Csv().
- PySpark Pandas DataFrame v CSV s pretvorbo v NumPy Array
- PySpark DataFrame v CSV z uporabo metode Write.Csv().
Če želite izvedeti več o PySpark DataFrame in namestitvi modula, si oglejte to Članek .
PySpark DataFrame v CSV s pretvorbo v Pandas DataFrame
To_csv() je metoda, ki je na voljo v modulu Pandas, ki pretvori Pandas DataFrame v CSV. Najprej moramo naš PySpark DataFrame pretvoriti v Pandas DataFrame. Za to se uporablja metoda toPandas(). Oglejmo si sintakso funkcije to_csv() skupaj z njenimi parametri.
Sintaksa:
pandas_dataframe_obj.to_csv(pot/ 'ime_datoteke.csv' , glava ,indeks,stolpci,način ...)
- Določiti moramo ime datoteke CSV. Če želite preneseni CSV shraniti na določeno mesto v računalniku, lahko poleg imena datoteke določite tudi pot.
- Stolpci so vključeni, če je glava nastavljena na »True«. Če stolpcev ne potrebujete, nastavite glavo na »False«.
- Indeksi so navedeni, če je indeks nastavljen na »True«. Če ne potrebujete indeksov, nastavite indeks na »False«.
- Parameter Columns vsebuje seznam imen stolpcev, v katerih lahko določimo, kateri stolpci so ekstrahirani v datoteko CSV.
- Zapise lahko dodajamo v CSV s parametrom mode. Pripni – za to se uporablja »a«.
1. primer: s parametri glave in indeksa
Ustvarite »skills_df« PySpark DataFrame s 3 vrsticami in 4 stolpci. Pretvorite ta DataFrame v CSV tako, da ga najprej pretvorite v Pandas DataFrame.
uvozi pyspark
iz pyspark.sql uvozi SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Namig za Linux' ).getOrCreate()
# podatki o spretnostih s 3 vrsticami in 4 stolpci
spretnosti =[{ 'id' : 123 , 'oseba' : 'Draga' , 'spretnost' : 'slika' , 'nagrada' : 25000 },
{ 'id' : 112 , 'oseba' : 'Mouni' , 'spretnost' : 'ples' , 'nagrada' : 2000 },
{ 'id' : 153 , 'oseba' : 'Tulasi' , 'spretnost' : 'branje' , 'nagrada' : 1200 }
]
# ustvarite podatkovni okvir spretnosti iz zgornjih podatkov
skills_df = linuxhint_spark_app.createDataFrame(skills)
spretnosti_df.show()
# Pretvori skills_df v pandas DataFrame
pandas_skills_df= skills_df.toPandas()
natisni (pandas_skills_df)
# Pretvorite ta DataFrame v csv z glavo in indeksom
pandas_skills_df.to_csv( 'pandas_skills1.csv' , glava =True, index=True)
Izhod:
Vidimo lahko, da je PySpark DataFrame pretvorjen v Pandas DataFrame. Poglejmo, ali je pretvorjen v CSV z imeni stolpcev in indeksi:
Primer 2: pripni podatke v CSV
Ustvarite še en PySpark DataFrame z 1 zapisom in ga pripnite v CSV, ki je ustvarjen kot del našega prvega primera. Prepričajte se, da moramo nastaviti glavo na »False« skupaj s parametrom načina. V nasprotnem primeru so imena stolpcev dodana tudi kot vrstica.
uvozi pysparkiz pyspark.sql uvozi SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Namig za Linux' ).getOrCreate()
spretnosti =[{ 'id' : 90 , 'oseba' : 'Bhargav' , 'spretnost' : 'branje' , 'nagrada' : 12000 }
]
# ustvarite podatkovni okvir spretnosti iz zgornjih podatkov
skills_df = linuxhint_spark_app.createDataFrame(skills)
# Pretvori skills_df v pandas DataFrame
pandas_skills_df= skills_df.toPandas()
# Dodajte ta DataFrame v datoteko pandas_skills1.csv
pandas_skills_df.to_csv( 'pandas_skills1.csv' , način= 'a' , glava =False)
Izhod CSV:
Vidimo lahko, da je v datoteko CSV dodana nova vrstica.
3. primer: s parametrom Columns
Imejmo isti DataFrame in ga pretvorimo v CSV z dvema stolpcema: »oseba« in »nagrada«.
uvozi pysparkiz pyspark.sql uvozi SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Namig za Linux' ).getOrCreate()
# podatki o spretnostih s 3 vrsticami in 4 stolpci
spretnosti =[{ 'id' : 123 , 'oseba' : 'Draga' , 'spretnost' : 'slika' , 'nagrada' : 25000 },
{ 'id' : 112 , 'oseba' : 'Mouni' , 'spretnost' : 'ples' , 'nagrada' : 2000 },
{ 'id' : 153 , 'oseba' : 'Tulasi' , 'spretnost' : 'branje' , 'nagrada' : 1200 }
]
# ustvarite podatkovni okvir spretnosti iz zgornjih podatkov
skills_df = linuxhint_spark_app.createDataFrame(skills)
# Pretvori skills_df v pandas DataFrame
pandas_skills_df= skills_df.toPandas()
# Pretvorite ta DataFrame v csv z določenimi stolpci
pandas_skills_df.to_csv( 'pandas_skills2.csv' , stolpci=[ 'oseba' , 'nagrada' ])
Izhod CSV:
Vidimo lahko, da v datoteki CSV obstajata samo stolpca »oseba« in »nagrada«.
PySpark Pandas DataFrame v CSV z uporabo metode To_Csv().
To_csv() je metoda, ki je na voljo v modulu Pandas, ki pretvori Pandas DataFrame v CSV. Najprej moramo naš PySpark DataFrame pretvoriti v Pandas DataFrame. Za to se uporablja metoda toPandas(). Oglejmo si sintakso funkcije to_csv() skupaj z njenimi parametri:
Sintaksa:
pyspark_pandas_dataframe_obj.to_csv(pot/ 'ime_datoteke.csv' , glava ,indeks,stolpci,...)- Določiti moramo ime datoteke CSV. Če želite preneseni CSV shraniti na določeno mesto v računalniku, lahko poleg imena datoteke določite tudi pot.
- Stolpci so vključeni, če je glava nastavljena na »True«. Če stolpcev ne potrebujete, nastavite glavo na »False«.
- Indeksi so navedeni, če je indeks nastavljen na »True«. Če ne potrebujete indeksov, nastavite indeks na »False«.
- Parameter stolpcev vsebuje seznam imen stolpcev, v katerih lahko določimo, kateri stolpci so izvlečeni v datoteko CSV.
1. primer: s parametrom Columns
Ustvarite PySpark Pandas DataFrame s 3 stolpci in ga pretvorite v CSV z uporabo to_csv() s stolpcema »person« in »prize«.
iz pyspark uvoz pandpyspark_pandas_dataframe=pandas.DataFrame({ 'id' :[ 90 , 78 , 90 , 57 ], 'oseba' :[ 'Draga' , 'Mouni' , 'sam' , 'radha' ], 'nagrada' :[ 1 , 2 , 3 , 4 ]})
natisni (pyspark_pandas_dataframe)
# Pretvorite ta DataFrame v csv z določenimi stolpci
pyspark_pandas_dataframe.to_csv( 'pyspark_pandas1' , stolpci=[ 'oseba' , 'nagrada' ])
Izhod:
Vidimo lahko, da je PySpark Pandas DataFrame pretvorjen v CSV z dvema particijama. Vsaka particija vsebuje 2 zapisa. Poleg tega sta stolpca v CSV samo »oseba« in »nagrada«.
Particijska datoteka 1:
Particijska datoteka 2:
2. primer: s parametrom glave
Uporabite prejšnji DataFrame in določite parameter glave tako, da ga nastavite na »True«.
iz pyspark uvoz pandpyspark_pandas_dataframe=pandas.DataFrame({ 'id' :[ 90 , 78 , 90 , 57 ], 'oseba' :[ 'Draga' , 'Mouni' , 'sam' , 'radha' ], 'nagrada' :[ 1 , 2 , 3 , 4 ]})
# Pretvorite ta DataFrame v csv z glavo.
pyspark_pandas_dataframe.to_csv( 'pyspark_pandas2' , glava = res)
Izhod CSV:
Vidimo lahko, da je PySpark Pandas DataFrame pretvorjen v CSV z dvema particijama. Vsaka particija vsebuje 2 zapisa z imeni stolpcev.
Particijska datoteka 1:
Particijska datoteka 2:
PySpark Pandas DataFrame v CSV s pretvorbo v NumPy Array
Imamo možnost pretvoriti PySpark Pandas DataFrame v CSV s pretvorbo v polje Numpy. To_numpy() je metoda, ki je na voljo v modulu PySpark Pandas, ki pretvori PySpark Pandas DataFrame v matriko NumPy.
Sintaksa:
pyspark_pandas_dataframe_obj.to_numpy()Ne bo sprejel nobenih parametrov.
Uporaba metode Tofile().
Po pretvorbi v matriko NumPy lahko uporabimo metodo tofile() za pretvorbo NumPy v CSV. Tu shrani vsak zapis v novo celico v stolpcu v datoteki CSV.
Sintaksa:
array_obj.to_numpy(ime datoteke/pot,sep=’’)Vzame ime datoteke ali pot CSV in ločilo.
primer:
Ustvarite PySpark Pandas DataFrame s 3 stolpci in 4 zapisi in ga pretvorite v CSV tako, da ga najprej pretvorite v matriko NumPy.
iz pyspark uvoz pandpyspark_pandas_dataframe=pandas.DataFrame({ 'id' :[ 90 , 78 , 90 , 57 ], 'oseba' :[ 'Draga' , 'Mouni' , 'sam' , 'radha' ], 'nagrada' :[ 1 , 2 , 3 , 4 ]})
# Pretvorite zgornji DataFrame v matriko numpy
pretvorjeno = pyspark_pandas_dataframe.to_numpy()
natisni (pretvorjeno)
# Uporaba tofile()
pretvorjeno.tofile( 'converted1.csv' , sep = ',' )
Izhod:
[[ 90 'Draga' 1 ][ 78 'Mouni' 2 ]
[ 90 'sam' 3 ]
[ 57 'radha' 4 ]]
Vidimo lahko, da je PySpark Pandas DataFrame pretvorjen v matriko NumPy (12 vrednosti). Če vidite podatke CSV, shrani vsako vrednost celice v nov stolpec.
PySpark DataFrame v CSV z uporabo metode Write.Csv().
Metoda write.csv() kot parameter vzame ime/pot datoteke, kamor moramo shraniti datoteko CSV.
Sintaksa:
dataframe_object.coalesce( 1 ).write.csv( 'Ime datoteke' )Pravzaprav je CSV shranjen kot particija (več kot ena). Da bi se tega znebili, združimo vse particionirane datoteke CSV v eno. V tem scenariju uporabljamo funkcijo coalesce(). Zdaj lahko vidimo samo eno datoteko CSV z vsemi vrsticami iz PySpark DataFrame.
primer:
Razmislite o PySpark DataFrame s 4 zapisi s 4 stolpci. Zapišite ta DataFrame v CSV z datoteko z imenom »market_details«.
uvozi pysparkiz pyspark.sql uvozi SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Namig za Linux' ).getOrCreate()
# tržni podatki s 4 vrsticami in 4 stolpci
trg =[{ 'm_id' : 'mz-001' , 'm_name' : 'ABC' , 'm_city' : 'delhi' , 'm_state' : 'delhi' },
{ 'm_id' : 'mz-002' , 'm_name' : 'XYZ' , 'm_city' : 'patna' , 'm_state' : 'lucknow' },
{ 'm_id' : 'mz-003' , 'm_name' : 'PQR' , 'm_city' : 'florida' , 'm_state' : 'ena' },
{ 'm_id' : 'mz-004' , 'm_name' : 'ABC' , 'm_city' : 'delhi' , 'm_state' : 'lucknow' }
]
# ustvarite tržni podatkovni okvir iz zgornjih podatkov
market_df = linuxhint_spark_app.createDataFrame(market)
# Dejanski tržni podatki
market_df.show()
# write.csv()
market_df.coalesce( 1 ).write.csv( 'tržni_podrobnosti' )
Izhod:
Preverimo datoteko:
Za ogled zapisov odprite zadnjo datoteko.
Zaključek
Naučili smo se štirih različnih scenarijev, ki pretvorijo PySpark DataFrame v CSV s primeri ob upoštevanju različnih parametrov. Ko delate s PySpark DataFrame, imate dve možnosti za pretvorbo tega DataFrame v CSV: en način je uporaba metode write(), drugi pa uporaba metode to_csv() s pretvorbo v Pandas DataFrame. Če delate s PySpark Pandas DataFrame, lahko uporabite tudi to_csv() in tofile() s pretvorbo v matriko NumPy.