PySpark Pandas_Udf()

Pyspark Pandas Udf



Preoblikovanje PySpark DataFrame je možno s funkcijo pandas_udf(). To je uporabniško definirana funkcija, ki se uporablja na PySpark DataFrame s puščico. Vektorizirane operacije lahko izvajamo z uporabo pandas_udf(). Izvede se lahko s posredovanjem te funkcije kot dekoraterja. Poglobimo se v ta vodnik, da spoznamo sintakso, parametre in različne primere.

Tema vsebine:

Če želite izvedeti več o PySpark DataFrame in namestitvi modula, preglejte to Članek .







Pyspark.sql.functions.pandas_udf()

Pandas_udf () je na voljo v modulu sql.functions v PySparku, ki ga je mogoče uvoziti s ključno besedo »from«. Uporablja se za izvajanje vektoriziranih operacij na našem PySpark DataFrame. Ta funkcija je implementirana kot dekorater s posredovanjem treh parametrov. Po tem lahko ustvarimo uporabniško definirano funkcijo, ki vrne podatke v vektorskem formatu (kot za to uporabljamo series/NumPy) z uporabo puščice. Znotraj te funkcije lahko vrnemo rezultat.



Struktura in sintaksa:



Najprej si poglejmo strukturo in sintakso te funkcije:

@pandas_udf(vrsta podatkov)
def ime_funkcije(operacija) -> format_pretvorbe:
izjava o vrnitvi

Tu je ime_funkcije ime naše definirane funkcije. Podatkovni tip določa podatkovni tip, ki ga vrne ta funkcija. Rezultat lahko vrnemo s ključno besedo 'return'. Vse operacije se izvajajo znotraj funkcije z dodelitvijo puščice.





Pandas_udf (funkcija in povratna vrsta)

  1. Prvi parameter je uporabniško definirana funkcija, ki mu je posredovana.
  2. Drugi parameter se uporablja za podajanje vrnjenega tipa podatkov iz funkcije.

podatki:

V tem celotnem vodniku za predstavitev uporabljamo samo en PySpark DataFrame. Vse uporabniško definirane funkcije, ki jih definiramo, so uporabljene na tem PySpark DataFrame. Prepričajte se, da ste ta DataFrame najprej ustvarili v svojem okolju po namestitvi PySpark.



uvozi pyspark

iz pyspark.sql uvozi SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Namig za Linux' ).getOrCreate()

iz pyspark.sql.functions uvoz pandas_udf

iz uvoza pyspark.sql.types *

uvozite pande kot pando

# podrobnosti o zelenjavi

zelenjava =[{ 'tip' : 'zelenjava' , 'ime' : 'paradižnik' , 'locate_country' : 'ZDA' , 'količina' : 800 },

{ 'tip' : 'sadje' , 'ime' : 'banana' , 'locate_country' : 'KITAJSKA' , 'količina' : dvajset },

{ 'tip' : 'zelenjava' , 'ime' : 'paradižnik' , 'locate_country' : 'ZDA' , 'količina' : 800 },

{ 'tip' : 'zelenjava' , 'ime' : 'Mango' , 'locate_country' : 'JAPONSKA' , 'količina' : 0 },

{ 'tip' : 'sadje' , 'ime' : 'limona' , 'locate_country' : 'INDIJA' , 'količina' : 1700 },

{ 'tip' : 'zelenjava' , 'ime' : 'paradižnik' , 'locate_country' : 'ZDA' , 'količina' : 1200 },

{ 'tip' : 'zelenjava' , 'ime' : 'Mango' , 'locate_country' : 'JAPONSKA' , 'količina' : 0 },

{ 'tip' : 'sadje' , 'ime' : 'limona' , 'locate_country' : 'INDIJA' , 'količina' : 0 }

]

# ustvarite tržni podatkovni okvir iz zgornjih podatkov

market_df = linuxhint_spark_app.createDataFrame(zelenjava)

market_df.show()

Izhod:

Tukaj ustvarimo ta DataFrame s 4 stolpci in 8 vrsticami. Zdaj uporabljamo pandas_udf() za ustvarjanje uporabniško definiranih funkcij in njihovo uporabo v teh stolpcih.

Pandas_udf() z različnimi vrstami podatkov

V tem scenariju ustvarimo nekaj uporabniško definiranih funkcij s pandas_udf() in jih uporabimo v stolpcih ter prikažemo rezultate z uporabo metode select(). V vsakem primeru uporabljamo pandas.Series, ko izvajamo vektorizirane operacije. To upošteva vrednosti stolpca kot enodimenzionalno matriko in operacija se uporabi za stolpec. V samem dekoratorju določimo vrsto vrnitve funkcije.

Primer 1: Pandas_udf() z vrsto niza

Tukaj ustvarimo dve uporabniško definirani funkciji s tipom vrnitve niza za pretvorbo vrednosti stolpca tipa niza v velike in male črke. Končno te funkcije uporabimo v stolpcih »type« in »locate_country«.

# Pretvorite stolpec vrste v velike črke s pandas_udf

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

vrni i.str.upper()

# Pretvorite stolpec locate_country v male črke s pandas_udf

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series:

vrni i.str.lower()

# Prikažite stolpce z uporabo select()

market_df.select( 'tip' ,tip_velike_crke( 'tip' ), 'lociraj državo' ,
država_male_crke( 'lociraj državo' )).pokaži()

Izhod:

Pojasnilo:

Funkcija StringType() je na voljo v modulu pyspark.sql.types. Ta modul smo že uvozili med ustvarjanjem PySpark DataFrame.

  1. Prvič, UDF (uporabniško določena funkcija) vrne nize z velikimi črkami s funkcijo str.upper(). Funkcija str.upper() je na voljo v podatkovni strukturi serije (saj pretvarjamo v serijo s puščico znotraj funkcije), ki dani niz pretvori v velike črke. Končno se ta funkcija uporabi za stolpec »type«, ki je določen znotraj metode select(). Prej so bili vsi nizi v stolpcu vrste napisani z malimi črkami. Zdaj so spremenjene v velike črke.
  2. Drugič, UDF vrne nize z velikimi črkami s funkcijo str.lower(). Funkcija str.lower() je na voljo v podatkovni strukturi serije, ki dani niz pretvori v male črke. Končno se ta funkcija uporabi za stolpec »type«, ki je določen znotraj metode select(). Prej so bili vsi nizi v stolpcu vrste napisani z velikimi črkami. Zdaj so spremenjene v male črke.

Primer 2: Pandas_udf() s celoštevilskim tipom

Ustvarimo UDF, ki pretvori celoštevilski stolpec PySpark DataFrame v serijo Pandas in vsaki vrednosti dodamo 100. Tej funkciji znotraj metode select() posredujte stolpec »količina«.

# Dodajte 100

@pandas_udf(IntegerType())

def add_100(i: panda.Series) -> panda.Series:

vrni i+ 100

# Prenesite stolpec s količino v zgornjo funkcijo in prikaz.

market_df.select( 'količina' ,dodaj_100( 'količina' )).pokaži()

Izhod:

Pojasnilo:

Znotraj UDF ponovimo vse vrednosti in jih pretvorimo v serije. Nato vsaki vrednosti v nizu dodamo 100. Nazadnje tej funkciji posredujemo stolpec »količina« in vidimo, da je 100 dodanih vsem vrednostim.

Pandas_udf() z različnimi tipi podatkov z uporabo Groupby() & Agg()

Oglejmo si primere za posredovanje UDF v združene stolpce. Tukaj se vrednosti stolpcev najprej združijo v skupine s funkcijo groupby(), združevanje pa se izvede s funkcijo agg(). Naš UDF posredujemo znotraj te agregatne funkcije.

Sintaksa:

pyspark_dataframe_object.groupby( 'združevanje_stolpca' ).agg(UDF
(pyspark_dataframe_object[ 'stolpec' ]))

Tukaj se najprej združijo vrednosti v stolpcu za združevanje. Nato se izvede združevanje vseh združenih podatkov glede na naš UDF.

Primer 1: Pandas_udf() z agregatno srednjo vrednostjo()

Tukaj ustvarimo uporabniško definirano funkcijo s povratnim tipom float. Znotraj funkcije izračunamo povprečje s funkcijo mean(). Ta UDF se posreduje v stolpec »količina«, da se pridobi povprečna količina za vsako vrsto.

# vrni srednjo vrednost/povprečje

@pandas_udf( 'lebdeti' )

def povprečna_funkcija(i: panda.Series) -> float:

vrni i.mean()

# Prenesite stolpec količine v funkcijo tako, da združite stolpec vrste.

market_df.groupby( 'tip' ).agg(povprečna_funkcija(market_df[ 'količina' ])).pokaži()

Izhod:

Združujemo glede na elemente v stolpcu »type«. Oblikovani sta dve skupini – »sadje« in »zelenjava«. Za vsako skupino se izračuna in vrne povprečje.

Primer 2: Pandas_udf() z Aggregate Max() in Min()

Tukaj ustvarimo dve uporabniško definirani funkciji s povratnim tipom integer (int). Prvi UDF vrne najmanjšo vrednost, drugi UDF pa največjo vrednost.

# pandas_udf, ki vrne najmanjšo vrednost

@pandas_udf( 'int' )

def min_(i: panda.Series) -> int:

vrni i.min()

# pandas_udf, ki vrne največjo vrednost

@pandas_udf( 'int' )

def max_(i: panda.Series) -> int:

vrni i.max()

# Prenesite stolpec s količino v min_ pandas_udf z združevanjem locate_country.

market_df.groupby( 'lociraj državo' ).agg(min_(market_df[ 'količina' ])).pokaži()

# Prenesite stolpec s količino v max_ pandas_udf z združevanjem locate_country.

market_df.groupby( 'lociraj državo' ).agg(max_(market_df[ 'količina' ])).pokaži()

Izhod:

Za vrnitev najmanjših in največjih vrednosti uporabljamo funkciji min() in max() v vrnjenem tipu UDF-jev. Zdaj združujemo podatke v stolpcu »locate_country«. Oblikovane so štiri skupine (»KITAJSKA«, »INDIJA«, »JAPONSKA«, »ZDA«). Za vsako skupino vrnemo največjo količino. Podobno vrnemo minimalno količino.

Zaključek

V bistvu se pandas_udf () uporablja za izvajanje vektoriziranih operacij na našem PySpark DataFrame. Videli smo, kako ustvariti pandas_udf() in ga uporabiti v PySpark DataFrame. Za boljše razumevanje smo razpravljali o različnih primerih z upoštevanjem vseh podatkovnih tipov (niz, plavajoče in celo število). Prek funkcije agg() je mogoče uporabiti pandas_udf() z groupby().