PySpark Pandas_Udf()

Pyspark Pandas Udf



PySpark DataFrame ஐ மாற்றுவது pandas_udf() செயல்பாட்டைப் பயன்படுத்தி சாத்தியமாகும். இது ஒரு பயனர் வரையறுக்கப்பட்ட செயல்பாடாகும், இது அம்புக்குறியுடன் PySpark DataFrame இல் பயன்படுத்தப்படுகிறது. pandas_udf() ஐப் பயன்படுத்தி வெக்டரைஸ் செய்யப்பட்ட செயல்பாடுகளைச் செய்யலாம். ஒரு அலங்கரிப்பாளராக இந்த செயல்பாட்டை நிறைவேற்றுவதன் மூலம் அதை செயல்படுத்தலாம். தொடரியல், அளவுருக்கள் மற்றும் வெவ்வேறு எடுத்துக்காட்டுகளை அறிய இந்த வழிகாட்டியில் மூழ்குவோம்.

உள்ளடக்கத்தின் தலைப்பு:

நீங்கள் PySpark DataFrame மற்றும் தொகுதி நிறுவல் பற்றி தெரிந்து கொள்ள விரும்பினால், இதைப் பார்க்கவும் கட்டுரை .







Pyspark.sql.functions.pandas_udf()

pandas_udf () ஆனது PySpark இல் உள்ள sql.functions தொகுதியில் கிடைக்கிறது, அதை 'from' முக்கிய சொல்லைப் பயன்படுத்தி இறக்குமதி செய்யலாம். இது எங்கள் PySpark DataFrame இல் வெக்டரைஸ் செய்யப்பட்ட செயல்பாடுகளைச் செய்யப் பயன்படுகிறது. இந்த செயல்பாடு மூன்று அளவுருக்களை கடந்து ஒரு அலங்கரிப்பாளர் போல் செயல்படுத்தப்படுகிறது. அதன் பிறகு, அம்புக்குறியைப் பயன்படுத்தி வெக்டார் வடிவமைப்பில் (இதற்குத் தொடர்/NumPy ஐப் பயன்படுத்துவது போல) தரவை வழங்கும் பயனர் வரையறுக்கப்பட்ட செயல்பாட்டை உருவாக்கலாம். இந்தச் செயல்பாட்டிற்குள், முடிவைத் திரும்பப் பெற முடியும்.



அமைப்பு & தொடரியல்:



முதலில், இந்த செயல்பாட்டின் அமைப்பு மற்றும் தொடரியல் பற்றி பார்ப்போம்:

@pandas_udf(datatype)
செயல்_பெயர் (செயல்பாடு) -> convert_format:
திரும்ப அறிக்கை

இங்கே, function_name என்பது நமது வரையறுக்கப்பட்ட செயல்பாட்டின் பெயர். தரவு வகையானது இந்தச் செயல்பாட்டின் மூலம் வழங்கப்படும் தரவு வகையைக் குறிப்பிடுகிறது. 'திரும்ப' முக்கிய சொல்லைப் பயன்படுத்தி முடிவைத் திரும்பப் பெறலாம். அனைத்து செயல்பாடுகளும் அம்புக்குறி மூலம் செயல்பாட்டிற்குள் செய்யப்படுகின்றன.





Pandas_udf (செயல்பாடு மற்றும் திரும்பும் வகை)

  1. முதல் அளவுரு அதற்கு அனுப்பப்படும் பயனர் வரையறுக்கப்பட்ட செயல்பாடு ஆகும்.
  2. செயல்பாட்டிலிருந்து திரும்பும் தரவு வகையைக் குறிப்பிட இரண்டாவது அளவுரு பயன்படுத்தப்படுகிறது.

தகவல்கள்:

இந்த முழு வழிகாட்டியிலும், ஒரே ஒரு PySpark DataFrame ஐ மட்டுமே விளக்கமாகப் பயன்படுத்துகிறோம். நாம் வரையறுக்கும் அனைத்து பயனர் வரையறுக்கப்பட்ட செயல்பாடுகளும் இந்த PySpark DataFrame இல் பயன்படுத்தப்படும். PySpark ஐ நிறுவிய பின் முதலில் உங்கள் சூழலில் இந்த DataFrame ஐ உருவாக்குவதை உறுதி செய்து கொள்ளவும்.



பைஸ்பார்க் இறக்குமதி

pyspark.sql இலிருந்து SparkSession இறக்குமதி

linuxhint_spark_app = SparkSession.builder.appName( 'லினக்ஸ் குறிப்பு' ).getOrCreate()

pyspark.sql.functions இலிருந்து pandas_udf ஐ இறக்குமதி செய்கிறது

pyspark.sql.types இலிருந்து இறக்குமதி *

பாண்டாக்களை பாண்டாவாக இறக்குமதி செய்

# காய்கறி விவரங்கள்

காய்கறி =[{ 'வகை' : 'காய்கறி' , 'பெயர்' : 'தக்காளி' , 'locate_country' : 'அமெரிக்கா' , 'அளவு' : 800 },

{ 'வகை' : 'பழம்' , 'பெயர்' : 'வாழை' , 'locate_country' : 'சீனா' , 'அளவு' : இருபது },

{ 'வகை' : 'காய்கறி' , 'பெயர்' : 'தக்காளி' , 'locate_country' : 'அமெரிக்கா' , 'அளவு' : 800 },

{ 'வகை' : 'காய்கறி' , 'பெயர்' : 'மாங்கனி' , 'locate_country' : 'ஜப்பான்' , 'அளவு' : 0 },

{ 'வகை' : 'பழம்' , 'பெயர்' : 'எலுமிச்சை' , 'locate_country' : 'இந்தியா' , 'அளவு' : 1700 },

{ 'வகை' : 'காய்கறி' , 'பெயர்' : 'தக்காளி' , 'locate_country' : 'அமெரிக்கா' , 'அளவு' : 1200 },

{ 'வகை' : 'காய்கறி' , 'பெயர்' : 'மாங்கனி' , 'locate_country' : 'ஜப்பான்' , 'அளவு' : 0 },

{ 'வகை' : 'பழம்' , 'பெயர்' : 'எலுமிச்சை' , 'locate_country' : 'இந்தியா' , 'அளவு' : 0 }

]

# மேலே உள்ள தரவுகளிலிருந்து சந்தை தரவு சட்டத்தை உருவாக்கவும்

market_df = linuxhint_spark_app.createDataFrame(காய்கறி)

market_df.show()

வெளியீடு:

இங்கே, இந்த DataFrame ஐ 4 நெடுவரிசைகள் மற்றும் 8 வரிசைகளுடன் உருவாக்குகிறோம். இப்போது, ​​பயனர் வரையறுக்கப்பட்ட செயல்பாடுகளை உருவாக்க மற்றும் அவற்றை இந்த நெடுவரிசைகளில் பயன்படுத்த pandas_udf() ஐப் பயன்படுத்துகிறோம்.

வெவ்வேறு தரவு வகைகளுடன் Pandas_udf().

இந்தச் சூழ்நிலையில், pandas_udf() மூலம் சில பயனர் வரையறுக்கப்பட்ட செயல்பாடுகளை உருவாக்கி அவற்றை நெடுவரிசைகளில் பயன்படுத்துகிறோம் மற்றும் தேர்வு() முறையைப் பயன்படுத்தி முடிவுகளைக் காண்பிக்கிறோம். ஒவ்வொரு சந்தர்ப்பத்திலும், வெக்டரைஸ் செய்யப்பட்ட செயல்பாடுகளைச் செய்யும்போது, ​​பாண்டாக்களைப் பயன்படுத்துகிறோம். இது நெடுவரிசை மதிப்புகளை ஒரு பரிமாண வரிசையாகக் கருதுகிறது மற்றும் செயல்பாடு நெடுவரிசையில் பயன்படுத்தப்படுகிறது. அலங்காரத்தில், செயல்பாடு திரும்பும் வகையை நாங்கள் குறிப்பிடுகிறோம்.

எடுத்துக்காட்டு 1: சரம் வகையுடன் Pandas_udf().

இங்கே, சரம் வகை நெடுவரிசை மதிப்புகளை பெரிய எழுத்து மற்றும் சிறிய எழுத்துக்கு மாற்ற, சரம் திரும்பும் வகையுடன் இரண்டு பயனர் வரையறுக்கப்பட்ட செயல்பாடுகளை உருவாக்குகிறோம். இறுதியாக, இந்த செயல்பாடுகளை 'வகை' மற்றும் 'locate_country' நெடுவரிசைகளில் பயன்படுத்துகிறோம்.

# pandas_udf உடன் வகை நெடுவரிசையை பெரிய எழுத்தாக மாற்றவும்

@pandas_udf(ஸ்ட்ரிங் டைப்())

def type_upper_case(i: panda.Series) -> panda.Series:

திரும்ப i.str.upper()

# லோகேட்_கன்ட்ரி நெடுவரிசையை பாண்டாஸ்_யுடிஎஃப் உடன் சிற்றெழுத்துக்கு மாற்றவும்

@pandas_udf(ஸ்ட்ரிங் டைப்())

def country_lower_case(i: panda.Series) -> panda.Series:

திரும்ப i.str.lower()

# தேர்ந்தெடுத்த()ஐப் பயன்படுத்தி நெடுவரிசைகளைக் காண்பி

market_df.select( 'வகை' ,type_upper_case( 'வகை' ), 'locate_country' ,
நாடு_சிறு எழுத்து( 'locate_country' )).show()

வெளியீடு:

விளக்கம்:

StringType() செயல்பாடு pyspark.sql.types தொகுதியில் கிடைக்கிறது. PySpark DataFrame ஐ உருவாக்கும் போது இந்த தொகுதியை ஏற்கனவே இறக்குமதி செய்துள்ளோம்.

  1. முதலில், UDF (பயனர் வரையறுக்கப்பட்ட செயல்பாடு) str.upper() செயல்பாட்டைப் பயன்படுத்தி சரங்களை பெரிய எழுத்தில் வழங்குகிறது. str.upper() ஆனது தொடர் தரவு கட்டமைப்பில் கிடைக்கிறது (செயல்பாட்டின் உள்ளே ஒரு அம்புக்குறியுடன் தொடராக மாற்றுகிறோம்) இது கொடுக்கப்பட்ட சரத்தை பெரிய எழுத்தாக மாற்றுகிறது. இறுதியாக, இந்த செயல்பாடு தேர்ந்தெடுக்கப்பட்ட () முறையின் உள்ளே குறிப்பிடப்பட்ட 'வகை' நெடுவரிசையில் பயன்படுத்தப்படுகிறது. முன்னர், வகை நெடுவரிசையில் உள்ள அனைத்து சரங்களும் சிறிய எழுத்துக்களில் உள்ளன. இப்போது, ​​அவை பெரிய எழுத்துக்கு மாற்றப்பட்டுள்ளன.
  2. இரண்டாவதாக, str.lower()function ஐப் பயன்படுத்தி UDF சரங்களை பெரிய எழுத்தில் வழங்குகிறது. str.lower() ஆனது தொடர் தரவு கட்டமைப்பில் கிடைக்கிறது, இது கொடுக்கப்பட்ட சரத்தை சிறிய எழுத்தாக மாற்றுகிறது. இறுதியாக, இந்த செயல்பாடு தேர்ந்தெடுக்கப்பட்ட () முறையின் உள்ளே குறிப்பிடப்பட்ட 'வகை' நெடுவரிசையில் பயன்படுத்தப்படுகிறது. முன்னதாக, வகை நெடுவரிசையில் உள்ள அனைத்து சரங்களும் பெரிய எழுத்தில் இருக்கும். இப்போது, ​​அவை சிறிய எழுத்துக்களுக்கு மாற்றப்பட்டுள்ளன.

எடுத்துக்காட்டு 2: முழு எண் வகையுடன் Pandas_udf().

PySpark DataFrame முழு எண் நெடுவரிசையை Pandas தொடராக மாற்றி, ஒவ்வொரு மதிப்பிலும் 100ஐச் சேர்க்கும் UDFஐ உருவாக்குவோம். இந்தச் செயல்பாட்டிற்கு 'அளவு' நெடுவரிசையை தேர்வு() முறையில் அனுப்பவும்.

# 100ஐச் சேர்க்கவும்

@pandas_udf(IntegerType())

def add_100(i: panda.Series) -> panda.Series:

திரும்ப i+ 100

# மேலே உள்ள செயல்பாடு மற்றும் காட்சிக்கு அளவு நெடுவரிசையை அனுப்பவும்.

market_df.select( 'அளவு' ,சேர்_100( 'அளவு' )).show()

வெளியீடு:

விளக்கம்:

UDF இன் உள்ளே, நாங்கள் எல்லா மதிப்புகளையும் மீண்டும் செய்து, அவற்றை தொடராக மாற்றுகிறோம். அதன் பிறகு, தொடரில் உள்ள ஒவ்வொரு மதிப்புக்கும் 100ஐச் சேர்க்கிறோம். இறுதியாக, இந்த செயல்பாட்டிற்கு “அளவு” நெடுவரிசையை அனுப்புகிறோம், மேலும் அனைத்து மதிப்புகளிலும் 100 சேர்க்கப்படுவதைக் காணலாம்.

Pandas_udf() வெவ்வேறு தரவு வகைகளைப் பயன்படுத்தி Groupby() & Agg()

ஒருங்கிணைந்த நெடுவரிசைகளுக்கு UDF ஐ அனுப்புவதற்கான எடுத்துக்காட்டுகளைப் பார்ப்போம். இங்கே, நெடுவரிசை மதிப்புகள் முதலில் groupby() செயல்பாட்டைப் பயன்படுத்தி தொகுக்கப்படுகின்றன மற்றும் agg() செயல்பாட்டைப் பயன்படுத்தி திரட்டுதல் செய்யப்படுகிறது. இந்த மொத்த செயல்பாட்டிற்குள் நாங்கள் எங்கள் UDF ஐ கடந்து செல்கிறோம்.

தொடரியல்:

pyspark_dataframe_object.groupby( 'grouping_column' ).ag(UDF
(pyspark_dataframe_object[ 'நெடுவரிசை' ]))

இங்கே, தொகுத்தல் நெடுவரிசையில் உள்ள மதிப்புகள் முதலில் தொகுக்கப்படுகின்றன. பின்னர், எங்கள் UDF ஐப் பொறுத்து ஒவ்வொரு குழு தரவுகளிலும் திரட்டுதல் செய்யப்படுகிறது.

எடுத்துக்காட்டு 1: மொத்த சராசரி() உடன் Pandas_udf()

இங்கே, ரிட்டர்ன் டைப் ஃப்ளோட் மூலம் பயனர் வரையறுக்கப்பட்ட செயல்பாட்டை உருவாக்குகிறோம். செயல்பாட்டின் உள்ளே, சராசரி () செயல்பாட்டைப் பயன்படுத்தி சராசரியைக் கணக்கிடுகிறோம். ஒவ்வொரு வகைக்கும் சராசரி அளவைப் பெற இந்த UDF 'அளவு' நெடுவரிசைக்கு அனுப்பப்படுகிறது.

# சராசரி/சராசரியை திரும்பவும்

@pandas_udf( 'மிதவை' )

def average_function(i: panda.Series) -> float:

திரும்பு i.mean()

# வகை நெடுவரிசையைக் குழுவாக்குவதன் மூலம் செயல்பாட்டிற்கு அளவு நெடுவரிசையை அனுப்பவும்.

market_df.groupby( 'வகை' ).agg(average_function(market_df[ 'அளவு' ])).show()

வெளியீடு:

'வகை' நெடுவரிசையில் உள்ள உறுப்புகளின் அடிப்படையில் நாங்கள் குழுவாக்குகிறோம். இரண்டு குழுக்கள் உருவாகின்றன - 'பழம்' மற்றும் 'காய்கறி'. ஒவ்வொரு குழுவிற்கும், சராசரி கணக்கிடப்பட்டு திரும்பும்.

உதாரணம் 2: Pandas_udf() உடன் Max() மற்றும் Min()

இங்கே, முழு எண் (int) திரும்பும் வகையுடன் இரண்டு பயனர் வரையறுக்கப்பட்ட செயல்பாடுகளை உருவாக்குகிறோம். முதல் UDF குறைந்தபட்ச மதிப்பையும், இரண்டாவது UDF அதிகபட்ச மதிப்பையும் வழங்குகிறது.

குறைந்தபட்ச மதிப்பை வழங்கும் # pandas_udf

@pandas_udf( 'int' )

def min_(i: panda.Series) -> int:

திரும்ப i.min()

அதிகபட்ச மதிப்பை வழங்கும் # pandas_udf

@pandas_udf( 'int' )

def max_(i: panda.Series) -> int:

திரும்ப i.max()

# லோகேட்_கண்ட்ரியைக் குழுவாக்குவதன் மூலம் அளவு நெடுவரிசையை min_pandas_udf க்கு அனுப்பவும்.

market_df.groupby( 'locate_country' ).agg(min_(market_df[ 'அளவு' ])).show()

# லோகேட்_கண்ட்ரியைக் குழுவாக்குவதன் மூலம் அளவு நெடுவரிசையை அதிகபட்சம்_ பாண்டாஸ்_யுடிஎஃப் க்கு அனுப்பவும்.

market_df.groupby( 'locate_country' ).agg(max_(market_df[ 'அளவு' ])).show()

வெளியீடு:

குறைந்தபட்ச மற்றும் அதிகபட்ச மதிப்புகளை வழங்க, UDFகளின் திரும்பும் வகைகளில் min() மற்றும் max() செயல்பாடுகளைப் பயன்படுத்துகிறோம். இப்போது, ​​“locate_country” நெடுவரிசையில் தரவைக் குழுவாக்குகிறோம். நான்கு குழுக்கள் உருவாகின்றன ('சீனா', 'இந்தியா', 'ஜப்பான்', 'அமெரிக்கா'). ஒவ்வொரு குழுவிற்கும், அதிகபட்ச அளவை நாங்கள் திருப்பித் தருகிறோம். இதேபோல், குறைந்தபட்ச அளவை நாங்கள் திருப்பித் தருகிறோம்.

முடிவுரை

அடிப்படையில், pandas_udf () ஆனது எங்கள் PySpark DataFrame இல் வெக்டரைஸ் செய்யப்பட்ட செயல்பாடுகளைச் செய்யப் பயன்படுகிறது. pandas_udf() ஐ எவ்வாறு உருவாக்குவது மற்றும் அதை PySpark DataFrame இல் பயன்படுத்துவது எப்படி என்று பார்த்தோம். சிறந்த புரிதலுக்காக, அனைத்து தரவு வகைகளையும் (சரம், மிதவை மற்றும் முழு எண்) கருத்தில் கொண்டு வெவ்வேறு எடுத்துக்காட்டுகளைப் பற்றி விவாதித்தோம். agg() செயல்பாடு மூலம் groupby() உடன் pandas_udf() ஐப் பயன்படுத்த முடியும்.