- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我想使用 spark.read.csv
解析一系列 .csv
文件,但我想在文件中包含每一行的行号。
我知道除非明确告知,否则 Spark 通常不会对 DataFrame 进行排序,而且我不想编写自己的 .csv
文件解析器,因为这比 Spark 的解析器慢得多自己的实现。如何以分布式安全方式添加此行号?
来自阅读 zipWithIndex ,它似乎很有用,但不幸的是,它似乎要求分区顺序稳定
最佳答案
假设我们有以下设置,用于在磁盘上创建一个包含我们控制的内容的 .csv
文件:
from pyspark.sql import types as T, functions as F, SparkSession
import os
import tempfile
spark = SparkSession.builder.getOrCreate()
# Synthesize DataFrames
schema = T.StructType([
T.StructField("col_1", T.StringType(), False),
T.StructField("col_2", T.IntegerType(), False),
T.StructField("col_3", T.StringType(), False),
T.StructField("col_4", T.IntegerType(), False),
])
data = [
{"col_1": "key_1", "col_2": 1, "col_3": "CREATE", "col_4": 0},
{"col_1": "key_2", "col_2": 2, "col_3": "CREATE", "col_4": 0},
{"col_1": "key_3", "col_2": 3, "col_3": "CREATE", "col_4": 0},
]
final_data = []
# Scale up a bit
for _ in range(3):
final_data += data
def refresh():
df = spark.createDataFrame(final_data, schema)
with tempfile.TemporaryDirectory() as tmpdirname:
pathname = os.path.join(
tmpdirname + "output.csv"
)
df.write.format("csv").option(
"header",
True
).save(pathname)
return spark.read.option(
"header",
True
).csv(pathname)
在此设置中,我们可以重复创建 .csv
文件并将它们保存到磁盘,然后像第一次解析它们时那样检索它们。
我们解析这些文件的策略将归结为以下几点:
zipWithIndex
创建一个 row_number 标识符这看起来像下面这样:
def create_index(
parsed_df,
row_number_column_name="index",
file_name_column_name="_file_name",
block_start_column_name="_block_start",
row_id_column_name="_row_id",
):
unindexed_df = parsed_df.selectExpr(
*parsed_df.columns,
f"input_file_name() AS {file_name_column_name}",
f"input_file_block_start() AS {block_start_column_name}",
f"monotonically_increasing_id() AS {row_id_column_name}"
).orderBy(
file_name_column_name,
block_start_column_name,
row_id_column_name
)
# Unfortunately, we have to unwrap the results of zipWithIndex, so there's some aliasing required
input_cols = unindexed_df.columns
zipped = unindexed_df.rdd.zipWithIndex().toDF()
aliased_columns = []
for input_col in input_cols:
aliased_columns += [zipped["_1." + input_col].alias(input_col)]
# Alias the original columns, remove the ones we built internally
return zipped.select(
*aliased_columns,
zipped["_2"].alias(row_number_column_name)
).drop(
file_name_column_name,
block_start_column_name,
row_id_column_name
)
example_df = refresh()
example_df = create_index(example_df)
example_df.show()
"""
+-----+-----+------+-----+-----+
|col_1|col_2| col_3|col_4|index|
+-----+-----+------+-----+-----+
|key_1| 1|CREATE| 0| 0|
|key_2| 2|CREATE| 0| 1|
|key_3| 3|CREATE| 0| 2|
|key_1| 1|CREATE| 0| 3|
|key_2| 2|CREATE| 0| 4|
|key_3| 3|CREATE| 0| 5|
|key_1| 1|CREATE| 0| 6|
|key_2| 2|CREATE| 0| 7|
|key_3| 3|CREATE| 0| 8|
+-----+-----+------+-----+-----+
"""
关于pyspark - 如何添加一列指示磁盘上文件的行号?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69996831/
我正在 csv 上使用 hadoop 来分析一些数据。我使用sql/mysql(不确定)来分析数据,现在陷入了僵局。 我花了好几个小时在谷歌上搜索,却没有找到任何相关的东西。我需要一个查询,在该查询中
我正在为 Bootstrap 网格布局的“简单”任务而苦苦挣扎。我希望在大视口(viewport)上有 4 列,然后在中型设备上有 2 列,最后在较小的设备上只有 1 列。 当我测试我的代码片段时,似
对于这个令人困惑的标题,我深表歉意,我想不出这个问题的正确措辞。相反,我只会给你背景信息和目标: 这是在一个表中,一个人可能有也可能没有多行数据,这些行可能包含相同的 activity_id 值,也可
具有 3 列的数据库表 - A int , B int , C int 我的问题是: 如何使用 Sequelize 结果找到 A > B + C const countTasks = await Ta
我在通过以下功能编写此查询时遇到问题: 首先按第 2 列 DESC 排序,然后从“不同的第 1 列”中选择 只有 Column1 是 DISTINCT 此查询没有帮助,因为它首先从第 1 列中进行选择
使用 Bootstrap 非常有趣和有帮助,目前我在创建以下需求时遇到问题。 “使用 bootstrap 在桌面上有 4 列,在平板电脑上有 2 列,在移动设备上有 1 列”谁能告诉我正确的结构 最佳
我是 R 新手,正在问一个非常基本的问题。当然,我在尝试从所提供的示例中获取指导的同时做了功课here和 here ,但无法在我的案例中实现这个想法,即可能是由于我的问题中的比较维度更大。 我的实
通常我会使用 R 并执行 merge.by,但这个文件似乎太大了,部门中的任何一台计算机都无法处理它! (任何从事遗传学工作的人的附加信息)本质上,插补似乎删除了 snp ID 的 rs 数字,我只剩
我有一个 df , delta1 delta2 0 -1 2 0 -1 0 0 0 我想知道如何分配 delt
您好,我想知道是否可以执行以下操作。显然,我已经尝试在 phpMyAdmin 中运行它,但出现错误。也许还有另一种方式来编写此查询。 SELECT * FROM eat_eat_restaurants
我有 2 个列表(标题和数据值)。我想要将数据值列 1 匹配并替换为头文件列 1,以获得与 dataValue 列 1 和标题值列 2 匹配的值 头文件 TotalLoad,M0001001 Hois
我有两个不同长度的文件,file2 是一个很大的引用文件,我从中提取文件 1 的数据。 我有一行 awk,我通常会对其进行调整以在我的文件中进行查找和替换,但它总是在同一列中进行查找和替换。 所以对于
假设我有两个表,如下所示。 create table contract( c_ID number(1) primary key, c_name varchar2(50) not
我有一个带有 varchar 列的 H2 表,其检查约束定义如下: CONSTRAINT my_constraint CHECK (varchar_field <> '') 以下插入语句失败,但当我删
这是最少量的代码,可以清楚地说明我的问题: One Two Three 前 2 个 div 应该是 2 个左列。第三个应该占据页面的其余部分。最后,我将添加选项来隐藏和
在 Azure 中的 Log Analytics 中,我为 VM Heartbeat 选择一个预定义查询,我在编辑器中运行查询正常,但当我去创建警报时,我不断收到警报“查询未返回 TimeGenera
在 Azure 中的 Log Analytics 中,我为 VM Heartbeat 选择一个预定义查询,我在编辑器中运行查询正常,但当我去创建警报时,我不断收到警报“查询未返回 TimeGenera
今天我开始使用 JexcelApi 并遇到了这个:当您尝试从特定位置获取元素时,不是像您通常期望的那样使用sheet.getCell(row,col),而是使用sheet.getCell(col,ro
我有一个包含 28 列的数据库。第一列是代码,第二列是名称,其余是值。 public void displayData() { con.Open(); MySqlDataAdapter
我很沮丧:每当我缩小这个网页时,一切都变得一团糟。我如何将网页居中,以便我可以缩小并且元素不会被错误定位。 (它应该是 2 列,但所有内容都合并为 1)我试过 但由于某种原因,这不起作用。 www.o
我是一名优秀的程序员,十分优秀!