Kako implementirati pretakanje podatkov v realnem času v Pythonu

Kako Implementirati Pretakanje Podatkov V Realnem Casu V Pythonu



Obvladovanje izvajanja pretakanja podatkov v realnem času v Pythonu je bistvena veščina v današnjem svetu, ki vključuje podatke. Ta priročnik raziskuje temeljne korake in bistvena orodja za uporabo pretakanja podatkov v realnem času z avtentičnostjo v Pythonu. Od izbire primernega ogrodja, kot sta Apache Kafka ali Apache Pulsar, do pisanja kode Python za preprosto porabo podatkov, obdelavo in učinkovito vizualizacijo, pridobili bomo potrebne veščine za izdelavo agilnih in učinkovitih podatkovnih kanalov v realnem času.

Primer 1: Implementacija pretakanja podatkov v realnem času v Pythonu

Implementacija pretakanja podatkov v realnem času v Pythonu je ključnega pomena v današnji dobi in svetu, ki temeljijo na podatkih. V tem podrobnem primeru se bomo sprehodili skozi postopek izdelave sistema za pretakanje podatkov v realnem času z uporabo Apache Kafka in Python v storitvi Google Colab.







Za inicializacijo primera, preden začnemo kodirati, je bistvenega pomena zgraditi specifično okolje v storitvi Google Colab. Prva stvar, ki jo moramo narediti, je namestitev potrebnih knjižnic. Za integracijo Kafke uporabljamo knjižnico »kafka-python«.



! pip namestite kafka-python


Ta ukaz namesti knjižnico »kafka-python«, ki nudi funkcije Python in vezave za Apache Kafka. Nato uvozimo zahtevane knjižnice za naš projekt. Uvoz zahtevanih knjižnic, vključno z »KafkaProducer« in »KafkaConsumer«, sta razreda iz knjižnice »kafka-python«, ki nam omogočata interakcijo s posredniki Kafka. JSON je knjižnica Python za delo s podatki JSON, ki jih uporabljamo za serializacijo in deserializacijo sporočil.



iz kafka import KafkaProducer, KafkaConsumer
uvozi json


Ustvarjanje Kafkinega producenta





To je pomembno, ker producent Kafke pošlje podatke temi Kafka. V našem primeru ustvarimo proizvajalca za pošiljanje simuliranih podatkov v realnem času v temo, imenovano »tema v realnem času«.

Ustvarimo primerek »KafkaProducer«, ki podaja naslov posrednika Kafka kot »localhost:9092«. Nato uporabimo »value_serializer«, funkcijo, ki serializira podatke, preden jih pošlje Kafki. V našem primeru funkcija lambda kodira podatke kot JSON, kodiran z UTF-8. Zdaj pa simulirajmo nekaj podatkov v realnem času in jih pošljimo temi Kafka.



producent = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
serializator_vrednosti =lambda v: json.dumps ( v ) .kodirati ( 'utf-8' ) )
# Simulirani podatki v realnem času
podatki = { 'sensor_id' : 1 , 'temperatura' : 25.5 , 'vlažnost' : 60.2 }
# Pošiljanje podatkov v temo
producent.pošlji ( 'tema v realnem času' , podatki )


V teh vrsticah definiramo »podatkovni« slovar, ki predstavlja simulirane podatke senzorja. Nato uporabimo metodo »pošlji« za objavo teh podatkov v »temi v realnem času«.

Nato želimo ustvariti porabnika Kafka, porabnik Kafka pa prebere podatke iz teme Kafka. Ustvarimo potrošnika, ki porabi in obdeluje sporočila v »temi v realnem času«. Ustvarimo primerek »KafkaConsumer«, pri čemer določimo temo, ki jo želimo porabiti, npr. (tema v realnem času) in naslov posrednika Kafka. Nato je »value_deserializer« funkcija, ki deserializira podatke, prejete od Kafke. V našem primeru funkcija lambda dekodira podatke kot JSON, kodiran z UTF-8.

potrošnik = KafkaPotrošnik ( 'tema v realnem času' ,
bootstrap_servers = 'localhost:9092' ,
deserializator_vrednosti =lambda x: json.loads ( x.decode ( 'utf-8' ) ) )


Uporabljamo iterativno zanko za neprekinjeno porabo in obdelavo sporočil iz teme.

# Branje in obdelava podatkov v realnem času
za sporočilo v potrošnik:
podatki = sporočilo.vrednost
tiskanje ( f 'Prejeti podatki: {podatki}' )


V zanki pridobimo vrednost vsakega sporočila in simulirane podatke senzorja ter jih natisnemo na konzolo. Zagon proizvajalca in potrošnika Kafka vključuje izvajanje te kode v storitvi Google Colab in izvajanje posameznih celic kode. Proizvajalec pošlje simulirane podatke v temo Kafka, potrošnik pa prejete podatke prebere in natisne.


Analiza izhoda med izvajanjem kode

Opazovali bomo podatke, ki se proizvajajo in porabljajo v realnem času. Format podatkov se lahko razlikuje glede na našo simulacijo ali dejanski vir podatkov. V tem podrobnem primeru pokrivamo celoten postopek nastavitve sistema za pretakanje podatkov v realnem času z uporabo Apache Kafka in Python v storitvi Google Colab. Razložili bomo vsako vrstico kode in njen pomen pri izgradnji tega sistema. Pretakanje podatkov v realnem času je zmogljiva zmogljivost in ta primer služi kot osnova za bolj zapletene aplikacije v realnem svetu.

Primer 2: Implementacija pretakanja podatkov v realnem času v Pythonu z uporabo podatkov o borzi

Naredimo še en edinstven primer izvajanja pretakanja podatkov v realnem času v Pythonu z uporabo drugačnega scenarija; tokrat se bomo posvetili borznim podatkom. Ustvarimo sistem za pretakanje podatkov v realnem času, ki zajema spremembe cen delnic in jih obdeluje z uporabo Apache Kafka in Python v Google Colab. Kot je prikazano v prejšnjem primeru, začnemo s konfiguracijo našega okolja v storitvi Google Colab. Najprej namestimo zahtevane knjižnice:

! pip namestite kafka-python yfinance


Tukaj dodamo knjižnico »yfinance«, ki nam omogoča pridobivanje podatkov o borzi v realnem času. Nato uvozimo potrebne knjižnice. Še naprej uporabljamo razreda »KafkaProducer« in »KafkaConsumer« iz knjižnice »kafka-python« za interakcijo Kafka. JSON uvozimo za delo s podatki JSON. Uporabljamo tudi »yfinance« za pridobivanje podatkov o borzi v realnem času. Uvozimo tudi »časovno« knjižnico, da dodamo časovni zamik za simulacijo posodobitev v realnem času.

iz kafka import KafkaProducer, KafkaConsumer
uvozi json
import yfinance kot yf
uvoz čas


Zdaj ustvarimo proizvajalca Kafka za podatke o zalogah. Naš proizvajalec Kafka dobi podatke o delnicah v realnem času in jih pošlje v temo Kafka z imenom »borzna cena«.

producent = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
serializator_vrednosti =lambda v: json.dumps ( v ) .kodirati ( 'utf-8' ) )

medtem Prav:
zaloga = yf.Ticker ( 'AAPL' ) # Primer: delnica Apple Inc
podatki o zalogi = zgodovina zalog ( obdobje = '1d' )
zadnja_cena = podatki o zalogi [ 'Zapri' ] .iloc [ - 1 ]
podatki = { 'simbol' : 'AAPL' , 'cena' : zadnja_cena }
producent.pošlji ( 'cena delnic' , podatki )
čas.spanje ( 10 ) # Simulirajte posodobitve v realnem času vsakih 10 sekund


Ustvarimo primerek »KafkaProducer« z naslovom posrednika Kafka v tej kodi. Znotraj zanke uporabljamo »yfinance«, da pridobimo najnovejšo ceno delnic družbe Apple Inc. (»AAPL«). Nato izvlečemo zadnjo zaključno ceno in jo pošljemo v temo »borzna cena«. Sčasoma uvedemo časovni zamik za simulacijo posodobitev v realnem času vsakih 10 sekund.

Ustvarimo Kafkinega potrošnika za branje in obdelavo podatkov o tečajih delnic iz teme »borzni tečaj«.

potrošnik = KafkaPotrošnik ( 'cena delnic' ,
bootstrap_servers = 'localhost:9092' ,
deserializator_vrednosti =lambda x: json.loads ( x.decode ( 'utf-8' ) ) )

za sporočilo v potrošnik:
podatki o zalogi = sporočilo.vrednost
tiskanje ( f »Prejeti podatki o zalogi: {stock_data['symbol']} – cena: {stock_data['price']}« )


Ta koda je podobna nastavitvi potrošnika v prejšnjem primeru. Nenehno bere in obdeluje sporočila iz teme »borzna cena« ter natisne borzni simbol in ceno na konzolo. Celice kode izvajamo zaporedno, npr. eno za drugo v Google Colabu, da zaženemo proizvajalca in potrošnika. Proizvajalec prejema in pošilja posodobitve tečajev delnic v realnem času, medtem ko potrošnik bere in prikazuje te podatke.

! pip namestite kafka-python yfinance
iz kafka import KafkaProducer, KafkaConsumer
uvozi json
import yfinance kot yf
uvoz čas
producent = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
serializator_vrednosti =lambda v: json.dumps ( v ) .kodirati ( 'utf-8' ) )

medtem Prav:
zaloga = yf.Ticker ( 'AAPL' ) # delnice Apple Inc
podatki o zalogi = zgodovina zalog ( obdobje = '1d' )
zadnja_cena = podatki o zalogi [ 'Zapri' ] .iloc [ - 1 ]

podatki = { 'simbol' : 'AAPL' , 'cena' : zadnja_cena }

producent.pošlji ( 'cena delnic' , podatki )

čas.spanje ( 10 ) # Simulirajte posodobitve v realnem času vsakih 10 sekund
potrošnik = KafkaPotrošnik ( 'cena delnic' ,
bootstrap_servers = 'localhost:9092' ,
deserializator_vrednosti =lambda x: json.loads ( x.decode ( 'utf-8' ) ) )

za sporočilo v potrošnik:
podatki o zalogi = sporočilo.vrednost
tiskanje ( f »Prejeti podatki o zalogi: {stock_data['symbol']} – cena: {stock_data['price']}« )


Pri analizi izhoda po zagonu kode bomo opazovali sprotne posodobitve cen delnic za Apple Inc., ki se proizvajajo in porabljajo.

Zaključek

V tem edinstvenem primeru smo prikazali implementacijo pretakanja podatkov v realnem času v Pythonu z uporabo Apache Kafka in knjižnice »yfinance« za zajemanje in obdelavo podatkov o borzi. Vsako vrstico kode smo natančno razložili. Pretakanje podatkov v realnem času je mogoče uporabiti na različnih področjih za izdelavo aplikacij v resničnem svetu na področju financ, interneta stvari itd.