gpt4 book ai didi

pyspark - 如何在 pyspark 数据帧上使用 forEachPartition?

转载 作者:行者123 更新时间:2023-12-02 02:39:48 26 4
gpt4 key购买 nike

我尝试在具有 8 个分区的 RDD 上使用 pyspark 来使用 forEachPartition() 方法。我的自定义函数尝试为给定的字符串输入生成字符串输出。这是代码

from google.cloud import language
from google.cloud.language import enums
from google.cloud.language import types
import pandas as pd
import datetime

def compute_sentiment_score(text):
client = language.LanguageServiceClient()
document = types.Document(content=text,type=enums.Document.Type.PLAIN_TEXT, language='en')
sentiment = client.analyze_sentiment(document=document).document_sentiment
return str(sentiment.score)

def compute_sentiment_magnitude(text):
client = language.LanguageServiceClient()
document = types.Document(content=text,type=enums.Document.Type.PLAIN_TEXT, language='en')
sentiment = client.analyze_sentiment(document=document).document_sentiment
return str(sentiment.magnitude)

import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="/path-to-file.json"

imdb_reviews = pd.read_csv('imdb_reviews.csv', header=None, names=['input1', 'input2'], encoding= "ISO-8859-1")

imdb_reviews.head()

input1 input2
0 first think another Disney movie, might good, ... 1
1 Put aside Dr. House repeat missed, Desperate H... 0
2 big fan Stephen King's work, film made even gr... 1
3 watched horrid thing TV. Needless say one movi... 0
4 truly enjoyed film. acting terrific plot. Jeff... 1


spark_imdb_reviews = spark.createDataFrame(imdb_reviews) # create spark dataframe


spark_imdb_reviews.printSchema()
root
|-- input1: string (nullable = true)
|-- input2: long (nullable = true)

这是我的自定义函数 -

def compute_sentiment_score(text):
client = language.LanguageServiceClient()
document = types.Document(content=text,type=enums.Document.Type.PLAIN_TEXT, language='en')
sentiment = client.analyze_sentiment(document=document).document_sentiment
return str(sentiment.score)

def compute_sentiment_magnitude(text):
client = language.LanguageServiceClient()
document = types.Document(content=text,type=enums.Document.Type.PLAIN_TEXT, language='en')
sentiment = client.analyze_sentiment(document=document).document_sentiment
return str(sentiment.magnitude)

这是我尝试使用 forEachPartition() 方法的方法 -

create_rdd = spark_imdb_reviews.select("input1").rdd # create RDD
print(create_rdd.getNumPartitions()) # print the partitions
print(create_rdd.take(1)) # display data
new_rdd = create_rdd.foreachPartition(compute_sentiment_score) # compute score

这给出了这个输出和一个错误 -

8
[Row(input1="first think another Disney movie, might good, it's kids movie. watch it, can't help enjoy it. ages love movie. first saw movie 10 8 years later still love it! Danny Glover superb could play part better. Christopher Lloyd hilarious perfect part. Tony Danza believable Mel Clark. can't help, enjoy movie! give 10/10!")]

File "<ipython-input-106-e3fd65ce75cc>", line 3, in compute_sentiment_score
TypeError: <itertools.chain object at 0x11ab7f198> has type itertools.chain, but expected one of: bytes, unicode

最佳答案

有两个类似的函数:

这两个函数都需要另一个函数作为参数(此处为compute_sentiment_score)。该函数获取以迭代器形式传递的分区的内容。问题中的 text 参数实际上是一个迭代器,可以在 compute_sentiment_score 内部使用。

foreachPartitionmapPartition 之间的区别在于,foreachPartition 是 Spark 操作,而 mapPartition 是转换。这意味着 foreachPartition 调用的代码会立即执行,并且 RDD 保持不变,而 mapPartition 可用于创建新的 RDD。为了存储计算出的情绪分数,应使用 mapPartitions

def compute_sentiment_score(itr_text):
#setup the things that are expensive and should be prepared only once per partition
client = language.LanguageServiceClient()

#run the loop for each row of the partition
for text in itr_text:
document = types.Document(content=text.value,type=enums.Document.Type.PLAIN_TEXT, language='en')
sentiment = client.analyze_sentiment(document=document).document_sentiment
yield (text.value, sentiment.score)

df_with_score = df.rdd.mapPartitions(compute_sentiment_score)
df_with_score.foreach(print)

在此示例中,每个分区都会调用 client = language.LanguageServiceClient() 一次。可能必须减少分区数量,例如 coalesce .

关于pyspark - 如何在 pyspark 数据帧上使用 forEachPartition?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63806467/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com