- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我有一个应用程序,用户可以在其中投票。
我希望我的应用程序能够扩展,所以我决定使用 Cloud Dataflow 聚合存储在 Firestore 中的计数器。
我设置了一个类型为streaming 的 Dataflow 作业,这样我就可以在用户投票时监听 pubsub 主题。
有时我每天有数千个用户输入,有时我有几百个...当一段时间没有收到 pubsub 消息时,是否有任何解决方案可以“暂停”作业?
目前,我的数据流作业一直在运行,恐怕这会花费我很多钱。
如果有人可以帮助我理解流媒体工作的计费,我将不胜感激
这是我的 Python 管道:
def run(argv=None):
# Config
parser = argparse.ArgumentParser()
# Output PubSub Topic
parser.add_argument(
'--output_topic', required=True)
# Input PubSub Topic
parser.add_argument(
'--input_topic', required=True)
known_args, pipeline_args = parser.parse_known_args(argv)
# Pipeline options
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
pipeline_options.view_as(StandardOptions).streaming = True
# Pipeline process
with beam.Pipeline(options=pipeline_options) as p:
# Counting votes
def count_votes(contestant_votes):
(contestant, votes) = contestant_votes
return (contestant, sum(votes))
# Format data to a fake object (used to be parsed by the CF)
def format_result(contestant_votes):
(contestant, votes) = contestant_votes
return '{ "contestant": %s, "votes": %d }' % (contestant, votes)
transformed = (p
| 'Receive PubSub' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
.with_output_types(bytes)
| 'Decode' >> beam.Map(lambda x: x.decode('utf-8'))
| 'Pair with one' >> beam.Map(lambda x: (x, 1))
| 'Apply window of time' >> beam.WindowInto(window.FixedWindows(30, 0))
| 'Group by contestant' >> beam.GroupByKey()
| 'Count votes' >> beam.Map(count_votes)
| 'Format to fake object string' >> beam.Map(format_result)
| 'Transform to PubSub base64 string' >> beam.Map(lambda x: x.encode('utf-8'))
.with_output_types(bytes))
# Trigger a the output PubSub topic with the message payload
transformed | beam.io.WriteToPubSub(known_args.output_topic)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
最佳答案
回答您的成本问题:对于您当前使用的工作器,您将花费大约 250 美元(取决于您当月的 PD 使用情况)。
目前没有等待强制数据流“空闲”或扩展到 0 个工作器。您可以拥有的最小值是 1。
话虽这么说,但您可以采取一些途径来尽量降低成本。
如果您的工作器负载不大,并且您想要最简单的选项,则可以使用功能较弱的工作器(n1-standard-1 [~USD $77.06] 或 n1-standard-2 [~USD $137.17]) . https://cloud.google.com/products/calculator/#id=3bbedf2f-8bfb-41db-9923-d3a5ef0c0250 (如果你看到我添加了所有 3 个变体,使用我在你的照片中看到的 430GB PD)。
如果您需要计算能力,您可以切换到使用基于 cron 的数据流作业,如下所述:https://cloud.google.com/blog/products/gcp/scheduling-dataflow-pipelines-using-app-engine-cron-service-or-cloud-functions .有了这个,您可能应该从订阅而不是主题中阅读,这样您就可以保留消息直到您开始工作。
关于python - Cloud Dataflow 流式传输,空闲时停止以省钱?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55667401/
我的问题是,在幕后,对于元素级 Beam DoFn (ParDo),Cloud Dataflow 的并行工作负载如何?例如,在我的 ParDO 中,我向外部服务器发送一个 http 请求以获取一个元素
就 Google Cloud 上 Dataflow 的 HA 而言,最佳架构是什么?我的工作负载在两个区域运行。数据流从一个多区域存储桶中读取并将结果写出到另一个多区域存储桶中。 为了实现高可用性(以
如图 here数据流管道由固定的 DAG 表示。我想知道是否有可能实现一个管道,在该管道中处理继续进行,直到基于到目前为止计算的数据满足动态评估的条件。 这是一些伪代码来说明我想要实现的内容:
在旧的定价页面上,他们提到 Cloud Dataflow 工作人员使用的所有 Google Compute 实例都是根据持续使用价格规则计费的,但新的定价页面不再提及。 我假设由于它在内部使用相同的
批处理 Dataflow 作业处理完所有数据后是否可以执行操作?具体来说,我想将管道刚刚处理的文本文件移动到不同的 GCS 存储桶。我不确定将它放在我的管道中的哪个位置以确保它在数据处理完成后执行一次
我希望能够通过自定义键使用分组,但这是我目前的尝试, 我们为 KV 对象的键使用自定义类,因为我们希望 GroupBy 具有更复杂的条件,而不是使用 String 等进行简单的键匹配。 ```
当尝试在 Dataflow 服务上运行管道时,我在命令行上指定了暂存和临时存储桶(在 GCS 中)。当程序执行时,我在管道运行之前收到一个 RuntimeException,根本原因是我在路径中遗漏了
我试图找到一种优雅地结束我的工作的方法,以免丢失任何数据,从 PubSub 流式传输并写入 BigQuery。 我可以设想的一种可能方法是让作业停止提取新数据,然后运行直到它处理完所有内容,但我不知道
问题: 使用 Cloud Dataflow 时,我们会看到 2 个指标(请参阅 this page): 系统延迟 数据新鲜度 这些在 Stackdriver 中也可用以下名称(摘自 here): sy
我一直在阅读 Dataflow SDK 文档,试图找出当数据到达流作业中的水印时会发生什么。 这一页: https://cloud.google.com/dataflow/model/windowin
有没有办法(或任何类型的黑客)从压缩文件中读取输入数据? 我的输入包含数百个文件,这些文件是用 gzip 压缩生成的,解压缩它们有些乏味。 最佳答案 Dataflow 现在支持从压缩文本源中读取(从
我正在尝试在 Dataflow 中执行联合操作。是否有用于在 Dataflow 中合并两个 PCollections 的示例代码? 最佳答案 一个简单的方法是像这样将 Flatten() 与 Remo
在我的管道上运行“更新”后,我注意到有新创建的永久磁盘在 10 多分钟后未附加到任何实例。 最佳答案 这是 Dataflow 服务的一个持续已知问题,会在管道更新过程中导致孤立磁盘。可以安全地删除这些
是否可以为 Dataflow 工作人员提供自定义包? 我想从计算内部输出到 Debian 打包的二进制文件。 编辑:需要明确的是,包配置非常复杂,仅将文件捆绑在 --filesToStage 中是不可
我想使用 Google Cloud Dataflow 创建 session 窗口,如 dataflow model paper 中所述。 .我想将我的未绑定(bind)数据发送到 Pub/Sub,然后
我正在尝试运行从 pubsub 主题读取并写入 bigquery 的管道。时间戳是从主题消息中解析出来的。但是,我收到了一条关于允许的时间戳偏差的错误,并引用了下面复制的文档。 getAllowedT
我有一个大型数据文件 (1 TB) 的数据要导入 BigQuery。每行包含一个键。在导入数据并创建我的 PCollection 以导出到 BigQuery 时,我想确保我不会基于此键值导入重复记录。
我正在通过 Python API 在 Dataflow 上使用 Apache Beam 从 Bigquery 读取数据,对其进行处理,然后将其转储到 Datastore 接收器中。 不幸的是,作业经常
我一直在研究使用 spring-cloud-dataflow 中的 spring-cloud-task 构建的项目。查看示例项目和文档后,似乎表明任务是通过仪表板或 shell 手动启动的。 spri
我有以下场景: 管道 A 在 BigQuery 中查找表 A,进行一些计算并返回列名列表。 此列名称列表用作管道 B 输出的 BigQuery 架构。 您能否让我知道实现这一目标的最佳选择是什么? 管
我是一名优秀的程序员,十分优秀!