gpt4 book ai didi

python - Cloud Dataflow 流式传输,空闲时停止以省钱?

转载 作者:太空宇宙 更新时间:2023-11-04 00:01:11 25 4
gpt4 key购买 nike

我有一个应用程序,用户可以在其中投票。

我希望我的应用程序能够扩展,所以我决定使用 Cloud Dataflow 聚合存储在 Firestore 中的计数器。

我设置了一个类型为streaming 的 Dataflow 作业,这样我就可以在用户投票时监听 pubsub 主题。

有时我每天有数千个用户输入,有时我有几百个...当一段时间没有收到 pubsub 消息时,是否有任何解决方案可以“暂停”作业?

目前,我的数据流作业一直在运行,恐怕这会花费我很多钱。

如果有人可以帮助我理解流媒体工作的计费,我将不胜感激

这是我的 Python 管道:

def run(argv=None):
# Config
parser = argparse.ArgumentParser()
# Output PubSub Topic
parser.add_argument(
'--output_topic', required=True)
# Input PubSub Topic
parser.add_argument(
'--input_topic', required=True)

known_args, pipeline_args = parser.parse_known_args(argv)

# Pipeline options
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
pipeline_options.view_as(StandardOptions).streaming = True

# Pipeline process
with beam.Pipeline(options=pipeline_options) as p:

# Counting votes
def count_votes(contestant_votes):
(contestant, votes) = contestant_votes
return (contestant, sum(votes))

# Format data to a fake object (used to be parsed by the CF)
def format_result(contestant_votes):
(contestant, votes) = contestant_votes
return '{ "contestant": %s, "votes": %d }' % (contestant, votes)

transformed = (p
| 'Receive PubSub' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
.with_output_types(bytes)
| 'Decode' >> beam.Map(lambda x: x.decode('utf-8'))
| 'Pair with one' >> beam.Map(lambda x: (x, 1))
| 'Apply window of time' >> beam.WindowInto(window.FixedWindows(30, 0))
| 'Group by contestant' >> beam.GroupByKey()
| 'Count votes' >> beam.Map(count_votes)
| 'Format to fake object string' >> beam.Map(format_result)
| 'Transform to PubSub base64 string' >> beam.Map(lambda x: x.encode('utf-8'))
.with_output_types(bytes))

# Trigger a the output PubSub topic with the message payload
transformed | beam.io.WriteToPubSub(known_args.output_topic)

result = p.run()
result.wait_until_finish()


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()

Job config

最佳答案

回答您的成本问题:对于您当前使用的工作器,您将花费大约 250 美元(取决于您当月的 PD 使用情况)。

目前没有等待强制数据流“空闲”或扩展到 0 个工作器。您可以拥有的最小值是 1。

话虽这么说,但您可以采取一些途径来尽量降低成本。

如果您的工作器负载不大,并且您想要最简单的选项,则可以使用功能较弱的工作器(n1-standard-1 [~USD $77.06] 或 n1-standard-2 [~USD $137.17]) . https://cloud.google.com/products/calculator/#id=3bbedf2f-8bfb-41db-9923-d3a5ef0c0250 (如果你看到我添加了所有 3 个变体,使用我在你的照片中看到的 430GB PD)。

如果您需要计算能力,您可以切换到使用基于 cron 的数据流作业,如下所述:https://cloud.google.com/blog/products/gcp/scheduling-dataflow-pipelines-using-app-engine-cron-service-or-cloud-functions .有了这个,您可能应该从订阅而不是主题中阅读,这样您就可以保留消息直到您开始工作。

关于python - Cloud Dataflow 流式传输,空闲时停止以省钱?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55667401/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com