பைத்தானில் நிகழ்நேர தரவு ஸ்ட்ரீமிங்கை எவ்வாறு செயல்படுத்துவது

Paittanil Nikalnera Taravu Striminkai Evvaru Ceyalpatuttuvatu



பைத்தானில் நிகழ்நேர தரவு ஸ்ட்ரீமிங்கைச் செயல்படுத்துவதில் தேர்ச்சி பெறுவது இன்றைய தரவு சம்பந்தப்பட்ட உலகில் இன்றியமையாத திறமையாக செயல்படுகிறது. பைத்தானில் நம்பகத்தன்மையுடன் நிகழ்நேர தரவு ஸ்ட்ரீமிங்கைப் பயன்படுத்துவதற்கான முக்கிய படிகள் மற்றும் அத்தியாவசிய கருவிகளை இந்த வழிகாட்டி ஆராய்கிறது. Apache Kafka அல்லது Apache Pulsar போன்ற பொருத்தமான கட்டமைப்பைத் தேர்ந்தெடுப்பதில் இருந்து, சிரமமில்லாத தரவு நுகர்வு, செயலாக்கம் மற்றும் பயனுள்ள காட்சிப்படுத்தலுக்கான பைதான் குறியீட்டை எழுதுவது வரை, சுறுசுறுப்பான மற்றும் திறமையான நிகழ்நேர தரவு சேனல்களை உருவாக்க தேவையான திறன்களைப் பெறுவோம்.

எடுத்துக்காட்டு 1: பைத்தானில் நிகழ்நேர தரவு ஸ்ட்ரீமிங்கை செயல்படுத்துதல்

இன்றைய தரவு உந்துதல் யுகத்திலும் உலகிலும் பைத்தானில் நிகழ்நேர தரவு ஸ்ட்ரீமிங்கைச் செயல்படுத்துவது மிகவும் முக்கியமானது. இந்த விரிவான எடுத்துக்காட்டில், Google Colab இல் Apache Kafka மற்றும் Python ஐப் பயன்படுத்தி நிகழ்நேர தரவு ஸ்ட்ரீமிங் அமைப்பை உருவாக்கும் செயல்முறையை நாங்கள் மேற்கொள்வோம்.







குறியீட்டு முறையைத் தொடங்குவதற்கு முன் உதாரணத்தைத் தொடங்க, Google Colab இல் ஒரு குறிப்பிட்ட சூழலை உருவாக்குவது அவசியம். முதலில் நாம் செய்ய வேண்டியது தேவையான நூலகங்களை நிறுவுவதுதான். காஃப்கா ஒருங்கிணைப்புக்கு 'kafka-python' நூலகத்தைப் பயன்படுத்துகிறோம்.



! பிப் நிறுவு காஃப்கா-பைத்தான்


இந்த கட்டளையானது 'kafka-python' நூலகத்தை நிறுவுகிறது, இது Apache Kafka க்கான பைதான் செயல்பாடுகள் மற்றும் பிணைப்புகளை வழங்குகிறது. அடுத்து, எங்கள் திட்டத்திற்கு தேவையான நூலகங்களை இறக்குமதி செய்கிறோம். 'KafkaProducer' மற்றும் 'KafkaConsumer' உள்ளிட்ட தேவையான நூலகங்களை இறக்குமதி செய்வது 'kafka-python' நூலகத்தின் வகுப்புகளாகும், இது காஃப்கா தரகர்களுடன் தொடர்பு கொள்ள அனுமதிக்கிறது. JSON என்பது JSON தரவுகளுடன் பணிபுரியும் பைதான் நூலகமாகும், இது செய்திகளை வரிசைப்படுத்தவும் சீரழிக்கவும் பயன்படுகிறது.



காஃப்கா இறக்குமதி காஃப்கா தயாரிப்பாளர், காஃப்கா நுகர்வோர்
json இறக்குமதி


காஃப்கா தயாரிப்பாளரின் உருவாக்கம்





காஃப்கா தயாரிப்பாளர் ஒரு காஃப்கா தலைப்புக்கு தரவை அனுப்புவதால் இது முக்கியமானது. எங்கள் எடுத்துக்காட்டில், 'நிகழ்நேர-தலைப்பு' எனப்படும் தலைப்புக்கு உருவகப்படுத்தப்பட்ட நிகழ்நேர தரவை அனுப்ப தயாரிப்பாளரை உருவாக்குகிறோம்.

காஃப்கா புரோக்கரின் முகவரியை 'localhost:9092' எனக் குறிப்பிடும் 'KafkaProducer' நிகழ்வை உருவாக்குகிறோம். பின்னர், 'value_serializer' ஐப் பயன்படுத்துகிறோம், இது காஃப்காவிற்கு அனுப்பும் முன் தரவை வரிசைப்படுத்தும் செயல்பாடாகும். எங்கள் விஷயத்தில், ஒரு lambda செயல்பாடு UTF-8-குறியீடு செய்யப்பட்ட JSON என தரவை குறியாக்குகிறது. இப்போது, ​​சில நிகழ் நேரத் தரவை உருவகப்படுத்தி காஃப்கா தலைப்புக்கு அனுப்புவோம்.



தயாரிப்பாளர் = காஃப்கா தயாரிப்பாளர் ( bootstrap_servers = 'லோக்கல் ஹோஸ்ட்:9092' ,
மதிப்பு_சீரியலைசர் =lambda v: json.dumps ( உள்ளே ) .குறியீடு ( 'utf-8' ) )
# உருவகப்படுத்தப்பட்ட நிகழ்நேர தரவு
தரவு = { 'சென்சார்_ஐடி' : 1 , 'வெப்ப நிலை' : 25.5 , 'ஈரப்பதம்' : 60.2 }
# தலைப்புக்கு தரவை அனுப்புகிறது
தயாரிப்பாளர்.அனுப்பு ( 'நிகழ் நேர தலைப்பு' , தகவல்கள் )


இந்த வரிகளில், உருவகப்படுத்தப்பட்ட சென்சார் தரவைக் குறிக்கும் 'தரவு' அகராதியை நாங்கள் வரையறுக்கிறோம். இந்தத் தரவை 'நிகழ்நேர-தலைப்பில்' வெளியிட 'அனுப்பு' முறையைப் பயன்படுத்துகிறோம்.

பின்னர், நாங்கள் ஒரு காஃப்கா நுகர்வோரை உருவாக்க விரும்புகிறோம், மேலும் காஃப்கா நுகர்வோர் ஒரு காஃப்கா தலைப்பிலிருந்து தரவைப் படிக்கிறார். 'நிகழ்நேர-தலைப்பில்' செய்திகளை நுகர்வதற்கும் செயலாக்குவதற்கும் ஒரு நுகர்வோரை உருவாக்குகிறோம். 'காஃப்கா நுகர்வோர்' நிகழ்வை உருவாக்குகிறோம், நாங்கள் பயன்படுத்த விரும்பும் தலைப்பைக் குறிப்பிடுகிறோம், எ.கா., (நிகழ்நேர-தலைப்பு) மற்றும் காஃப்கா தரகரின் முகவரி. பின்னர், 'value_deserializer' என்பது காஃப்காவிலிருந்து பெறப்பட்ட தரவை சீரழிக்கும் ஒரு செயல்பாடாகும். எங்கள் விஷயத்தில், ஒரு lambda செயல்பாடு UTF-8-குறியீடு செய்யப்பட்ட JSON என தரவை டிகோட் செய்கிறது.

நுகர்வோர் = காஃப்கா நுகர்வோர் ( 'நிகழ் நேர தலைப்பு' ,
bootstrap_servers = 'லோக்கல் ஹோஸ்ட்:9092' ,
மதிப்பு_டீரியலைசர் =lambda x: json.loads ( x.குறியீடு ( 'utf-8' ) ) )


தலைப்பிலிருந்து வரும் செய்திகளைத் தொடர்ந்து நுகர்வதற்கும் செயலாக்குவதற்கும் மீண்டும் ஒரு சுழற்சியைப் பயன்படுத்துகிறோம்.

# நிகழ்நேர தரவைப் படித்தல் மற்றும் செயலாக்குதல்
க்கான செய்தி உள்ளே நுகர்வோர்:
தரவு = செய்தி.மதிப்பு
அச்சு ( f 'பெறப்பட்ட தரவு: {data}' )


ஒவ்வொரு செய்தியின் மதிப்பையும், லூப்பில் உள்ள எங்களின் உருவகப்படுத்தப்பட்ட சென்சார் தரவையும் மீட்டெடுத்து, அதை கன்சோலில் அச்சிடுவோம். காஃப்கா தயாரிப்பாளர் மற்றும் நுகர்வோரை இயக்குவது இந்த குறியீட்டை Google Colab இல் இயக்குவது மற்றும் குறியீடு செல்களை தனித்தனியாக இயக்குவது ஆகியவை அடங்கும். தயாரிப்பாளர் உருவகப்படுத்தப்பட்ட தரவை காஃப்கா தலைப்புக்கு அனுப்புகிறார், மேலும் நுகர்வோர் பெறப்பட்ட தரவைப் படித்து அச்சிடுகிறார்.


குறியீடு இயங்கும்போது வெளியீட்டின் பகுப்பாய்வு

உற்பத்தி செய்யப்பட்டு நுகரப்படும் நிகழ்நேரத் தரவை நாங்கள் கவனிப்போம். எங்கள் உருவகப்படுத்துதல் அல்லது உண்மையான தரவு மூலத்தைப் பொறுத்து தரவு வடிவம் மாறுபடலாம். இந்த விரிவான எடுத்துக்காட்டில், Google Colab இல் Apache Kafka மற்றும் Python ஐப் பயன்படுத்தி நிகழ்நேர தரவு ஸ்ட்ரீமிங் அமைப்பை அமைப்பதற்கான முழு செயல்முறையையும் நாங்கள் உள்ளடக்குகிறோம். குறியீட்டின் ஒவ்வொரு வரியையும் இந்த அமைப்பை உருவாக்குவதில் அதன் முக்கியத்துவத்தையும் விளக்குவோம். நிகழ்நேர தரவு ஸ்ட்ரீமிங் ஒரு சக்திவாய்ந்த திறன் ஆகும், மேலும் இந்த உதாரணம் மிகவும் சிக்கலான நிஜ-உலகப் பயன்பாடுகளுக்கான அடித்தளமாக செயல்படுகிறது.

எடுத்துக்காட்டு 2: பங்குச் சந்தைத் தரவைப் பயன்படுத்தி பைத்தானில் நிகழ்நேர தரவு ஸ்ட்ரீமிங்கைச் செயல்படுத்துதல்

ஒரு வித்தியாசமான சூழ்நிலையைப் பயன்படுத்தி பைத்தானில் நிகழ்நேர தரவு ஸ்ட்ரீமிங்கை செயல்படுத்துவதற்கான மற்றொரு தனித்துவமான உதாரணத்தைச் செய்வோம்; இந்த நேரத்தில், நாங்கள் பங்குச் சந்தை தரவுகளில் கவனம் செலுத்துவோம். Google Colab இல் Apache Kafka மற்றும் Python ஐப் பயன்படுத்தி பங்கு விலை மாற்றங்களைப் படம்பிடித்து அவற்றைச் செயலாக்கும் நிகழ்நேர டேட்டா ஸ்ட்ரீமிங் அமைப்பை நாங்கள் உருவாக்குகிறோம். முந்தைய எடுத்துக்காட்டில் காட்டப்பட்டுள்ளபடி, Google Colab இல் எங்கள் சூழலை உள்ளமைப்பதன் மூலம் தொடங்குகிறோம். முதலில், தேவையான நூலகங்களை நிறுவுகிறோம்:

! பிப் நிறுவு kafka-python yfinance


இங்கே, 'yfinance' நூலகத்தைச் சேர்க்கிறோம், இது நிகழ்நேர பங்குச் சந்தைத் தரவைப் பெற அனுமதிக்கிறது. அடுத்து, தேவையான நூலகங்களை இறக்குமதி செய்கிறோம். 'காஃப்கா-பைதான்' நூலகத்திலிருந்து 'காஃப்கா தயாரிப்பாளர்' மற்றும் 'காஃப்கா நுகர்வோர்' வகுப்புகளைத் தொடர்ந்து காஃப்கா தொடர்புக்காகப் பயன்படுத்துகிறோம். JSON தரவுடன் வேலை செய்ய JSON ஐ இறக்குமதி செய்கிறோம். நிகழ்நேர பங்குச் சந்தைத் தரவைப் பெற “yfinance” ஐப் பயன்படுத்துகிறோம். நிகழ்நேர புதுப்பிப்புகளை உருவகப்படுத்த, நேர தாமதத்தைச் சேர்க்க 'நேரம்' நூலகத்தையும் இறக்குமதி செய்கிறோம்.

காஃப்கா இறக்குமதி காஃப்கா தயாரிப்பாளர், காஃப்கா நுகர்வோர்
json இறக்குமதி
yfinance இறக்குமதி என yf
இறக்குமதி நேரம்


இப்போது, ​​பங்குத் தரவுகளுக்காக காஃப்கா தயாரிப்பாளரை உருவாக்குகிறோம். எங்கள் காஃப்கா தயாரிப்பாளர் நிகழ்நேர பங்குத் தரவைப் பெற்று, 'பங்கு விலை' என்ற காஃப்கா தலைப்புக்கு அனுப்புகிறார்.

தயாரிப்பாளர் = காஃப்கா தயாரிப்பாளர் ( bootstrap_servers = 'லோக்கல் ஹோஸ்ட்:9092' ,
மதிப்பு_சீரியலைசர் =lambda v: json.dumps ( உள்ளே ) .குறியீடு ( 'utf-8' ) )

போது உண்மை:
பங்கு = yf.டிக்கர் ( 'ஏஏபிஎல்' ) # எடுத்துக்காட்டு: Apple Inc. பங்கு
பங்கு_தரவு = பங்கு.வரலாறு ( காலம் = '1டி' )
last_price = stock_data [ 'நெருக்கமான' ] .iloc [ - 1 ]
தரவு = { 'சின்னம்' : 'ஏஏபிஎல்' , 'விலை' : கடைசி_விலை }
தயாரிப்பாளர்.அனுப்பு ( 'பங்கு விலை' , தகவல்கள் )
நேரம்.தூக்கம் ( 10 ) # ஒவ்வொரு 10 வினாடிகளுக்கும் நிகழ்நேர புதுப்பிப்புகளை உருவகப்படுத்தவும்


இந்தக் குறியீட்டில் உள்ள காஃப்கா தரகரின் முகவரியுடன் “காஃப்கா தயாரிப்பாளர்” நிகழ்வை உருவாக்குகிறோம். லூப்பின் உள்ளே, Apple Inc. (“AAPL”)க்கான சமீபத்திய பங்கு விலையைப் பெற “yfinance” ஐப் பயன்படுத்துகிறோம். பின்னர், கடைசி இறுதி விலையை பிரித்தெடுத்து, 'பங்கு விலை' தலைப்புக்கு அனுப்புவோம். இறுதியில், ஒவ்வொரு 10 வினாடிகளுக்கும் நிகழ்நேர புதுப்பிப்புகளை உருவகப்படுத்த நேர தாமதத்தை அறிமுகப்படுத்துகிறோம்.

'பங்கு-விலை' தலைப்பில் இருந்து பங்கு விலை தரவைப் படித்து செயலாக்க காஃப்கா நுகர்வோரை உருவாக்குவோம்.

நுகர்வோர் = காஃப்கா நுகர்வோர் ( 'பங்கு விலை' ,
bootstrap_servers = 'லோக்கல் ஹோஸ்ட்:9092' ,
மதிப்பு_டீரியலைசர் =lambda x: json.loads ( x.குறியீடு ( 'utf-8' ) ) )

க்கான செய்தி உள்ளே நுகர்வோர்:
stock_data = செய்தி.மதிப்பு
அச்சு ( f 'பெறப்பட்ட பங்குத் தரவு: {stock_data['symbol']} - விலை: {stock_data['price']}' )


இந்த குறியீடு முந்தைய உதாரணத்தின் நுகர்வோர் அமைப்பைப் போன்றது. இது 'பங்கு-விலை' தலைப்பில் இருந்து செய்திகளைத் தொடர்ந்து படித்து செயலாக்குகிறது மற்றும் பங்குச் சின்னத்தையும் விலையையும் கன்சோலில் அச்சிடுகிறது. குறியீட்டு கலங்களை வரிசையாக இயக்குகிறோம், எ.கா., தயாரிப்பாளர் மற்றும் நுகர்வோரை இயக்க Google Colabல் ஒவ்வொன்றாக. நுகர்வோர் இந்தத் தரவைப் படித்து காண்பிக்கும் போது உற்பத்தியாளர் நிகழ்நேர பங்கு விலை புதுப்பிப்புகளைப் பெற்று அனுப்புகிறார்.

! பிப் நிறுவு kafka-python yfinance
காஃப்கா இறக்குமதி காஃப்கா தயாரிப்பாளர், காஃப்கா நுகர்வோர்
json இறக்குமதி
yfinance இறக்குமதி என yf
இறக்குமதி நேரம்
தயாரிப்பாளர் = காஃப்கா தயாரிப்பாளர் ( bootstrap_servers = 'லோக்கல் ஹோஸ்ட்:9092' ,
மதிப்பு_சீரியலைசர் =lambda v: json.dumps ( உள்ளே ) .குறியீடு ( 'utf-8' ) )

போது உண்மை:
பங்கு = yf.டிக்கர் ( 'ஏஏபிஎல்' ) # Apple Inc. பங்கு
பங்கு_தரவு = பங்கு.வரலாறு ( காலம் = '1டி' )
last_price = stock_data [ 'நெருக்கமான' ] .iloc [ - 1 ]

தரவு = { 'சின்னம்' : 'ஏஏபிஎல்' , 'விலை' : கடைசி_விலை }

தயாரிப்பாளர்.அனுப்பு ( 'பங்கு விலை' , தகவல்கள் )

நேரம்.தூக்கம் ( 10 ) # ஒவ்வொரு 10 வினாடிகளுக்கும் நிகழ்நேர புதுப்பிப்புகளை உருவகப்படுத்தவும்
நுகர்வோர் = காஃப்கா நுகர்வோர் ( 'பங்கு விலை' ,
bootstrap_servers = 'லோக்கல் ஹோஸ்ட்:9092' ,
மதிப்பு_டீரியலைசர் =lambda x: json.loads ( x.குறியீடு ( 'utf-8' ) ) )

க்கான செய்தி உள்ளே நுகர்வோர்:
stock_data = செய்தி.மதிப்பு
அச்சு ( f 'பெறப்பட்ட பங்குத் தரவு: {stock_data['symbol']} - விலை: {stock_data['price']}' )


குறியீடு இயங்கிய பிறகு வெளியீட்டின் பகுப்பாய்வில், Apple Inc. இன் நிகழ்நேர பங்கு விலை புதுப்பிப்புகள் தயாரிக்கப்பட்டு நுகரப்படுவதை நாங்கள் கவனிப்போம்.

முடிவுரை

இந்த தனித்துவமான எடுத்துக்காட்டில், Apache Kafka மற்றும் 'yfinance' நூலகத்தைப் பயன்படுத்தி பைத்தானில் நிகழ்நேர டேட்டா ஸ்ட்ரீமிங்கைச் செயல்படுத்தி, பங்குச் சந்தைத் தரவைப் பிடிக்கவும் செயலாக்கவும் செய்து காட்டினோம். குறியீட்டின் ஒவ்வொரு வரியையும் முழுமையாக விளக்கினோம். நிதி, IoT மற்றும் பலவற்றில் நிஜ-உலகப் பயன்பாடுகளை உருவாக்க பல்வேறு துறைகளில் நிகழ்நேர தரவு ஸ்ட்ரீமிங்கைப் பயன்படுத்தலாம்.