
java - How to add a JSON object to a Dataset/DataFrame in Apache Spark

Reposted. Author: 行者123. Updated: 2023-12-02 11:26:36

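I would like to know whether it is possible to create custom JSON using the Spark Dataset API or any other feature that Apache Spark provides. I know I can join two datasets with the join() method, but I want to create custom JSON in which dataset 2 (alerts, in my case) is added as a JSON object to dataset 1 (inventory) under the key "ALERT".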

Dataset<Row> inventory = spark.read().option("multiLine", true).option("mode", "PERMISSIVE")
.json("C:\\Users\\phyadavi\\LearningAndDevelopment\\\\CDXJSONMergeJob\\data1\\inventory.json");
Dataset<Row> alerts = spark.read().option("multiLine", true).option("mode", "PERMISSIVE")
.json("C:\\Users\\phyadavi\\LearningAndDevelopment\\\\CDXJSONMergeJob\\data1\\alert.json");

Dataset<Row> inventoryAlerts = inventory.join(alerts);
inventoryAlerts.printSchema();

The schemas of inventory and alerts are as follows.

root
|-- Equipment: struct (nullable = true)
| |-- items: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- collectedPid: string (nullable = true)
| | | |-- collectedSerialNum: string (nullable = true)
| | | |-- containingHwId: string (nullable = true)
| | | |-- equipmentType: string (nullable = true)
| | | |-- hwId: string (nullable = true)
| | | |-- items: array (nullable = true)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- tagName: string (nullable = true)
| | | | | |-- tagValue: string (nullable = true)
| | | |-- pceMultiPid: string (nullable = true)
| | | |-- pcePhyiscalType: string (nullable = true)
| | | |-- pcePid: string (nullable = true)
| | | |-- pceProductDescription: string (nullable = true)
| | | |-- pceProductFamily: string (nullable = true)
| | | |-- pceProductType: string (nullable = true)
| | | |-- pceRuleId: string (nullable = true)
| | | |-- productDescription: string (nullable = true)
| | | |-- productFamily: string (nullable = true)
| | | |-- productId: string (nullable = true)
| | | |-- productType: string (nullable = true)
| | | |-- serialNumber: string (nullable = true)
| | | |-- snasItemType: string (nullable = true)
| | | |-- snasProductFamily: string (nullable = true)
| | | |-- snasSerialNumber: string (nullable = true)
| | | |-- snasValidationCode: string (nullable = true)
| | | |-- snasValidationSource: string (nullable = true)
|-- LicenseActivated: struct (nullable = true)
| |-- items: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- count: long (nullable = true)
| | | |-- type: string (nullable = true)
|-- NetworkElement: struct (nullable = true)
| |-- items: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- hostname: string (nullable = true)
| | | |-- ipAddress: string (nullable = true)
| | | |-- isManagedNe: boolean (nullable = true)
| | | |-- items: array (nullable = true)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- tagName: string (nullable = true)
| | | | | |-- tagValue: string (nullable = true)
| | | |-- lastUpdateDate: long (nullable = true)
| | | |-- managedNeId: string (nullable = true)
| | | |-- managementAddress: string (nullable = true)
| | | |-- neId: string (nullable = true)
| | | |-- neName: string (nullable = true)
| | | |-- neRegistrationStatus: string (nullable = true)
| | | |-- productFamily: string (nullable = true)
| | | |-- productId: string (nullable = true)
| | | |-- productType: string (nullable = true)
| | | |-- serialNumber: string (nullable = true)
| | | |-- smartLicenseProductInstanceIdentifier: string (nullable = true)
| | | |-- smartLicenseVirtualAccountName: string (nullable = true)
| | | |-- softwareType: string (nullable = true)
| | | |-- softwareVersion: string (nullable = true)
| | | |-- systemUptime: long (nullable = true)
| | | |-- udiProductIdentifier: string (nullable = true)
|-- Versions: struct (nullable = true)
| |-- items: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- lastUpdated: long (nullable = true)
| | | |-- type: string (nullable = true)
| | | |-- version: string (nullable = true)
|-- collectorId: string (nullable = true)
|-- generatedAt: long (nullable = true)
|-- managedNeId: string (nullable = true)
|-- partyId: string (nullable = true)
|-- recordType: string (nullable = true)
|-- sourceNeId: string (nullable = true)
|-- sourcePartyId: string (nullable = true)
|-- sourceSubPartyId: string (nullable = true)
|-- wfid: string (nullable = true)

#####################################
root
|-- collectorId: string (nullable = true)
|-- generatedAt: long (nullable = true)
|-- managedNeId: string (nullable = true)
|-- neAlert: struct (nullable = true)
| |-- advisory: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- equipmentType: string (nullable = true)
| | | |-- headlineName: string (nullable = true)
| | | |-- hwId: string (nullable = true)
| | | |-- neId: string (nullable = true)
| | | |-- productFamily: string (nullable = true)
| | | |-- productId: string (nullable = true)
| | | |-- psirtId: long (nullable = true)
| | | |-- publicReleaseInd: string (nullable = true)
| | | |-- softwareType: string (nullable = true)
| | | |-- softwareVersion: string (nullable = true)
| | | |-- vulnerabilityReason: string (nullable = true)
| | | |-- vulnerabilityStatus: string (nullable = true)
| |-- fieldNotice: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- caveat: string (nullable = true)
| | | |-- distributionCode: string (nullable = true)
| | | |-- equipmentType: string (nullable = true)
| | | |-- fieldNoticeId: long (nullable = true)
| | | |-- fieldNoticeName: string (nullable = true)
| | | |-- hwId: string (nullable = true)
| | | |-- neId: string (nullable = true)
| | | |-- productFamily: string (nullable = true)
| | | |-- productId: string (nullable = true)
| | | |-- serialNumber: string (nullable = true)
| | | |-- softwareType: string (nullable = true)
| | | |-- vulnerabilityReason: string (nullable = true)
| | | |-- vulnerabilityStatus: string (nullable = true)
| |-- hwEoX: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- bulletinName: string (nullable = true)
| | | |-- equipmentType: string (nullable = true)
| | | |-- hardwareEoXId: long (nullable = true)
| | | |-- hwId: string (nullable = true)
| | | |-- neId: string (nullable = true)
| | | |-- productId: string (nullable = true)
| |-- swEoX: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- bulletinHeadline: string (nullable = true)
| | | |-- equipmentType: string (nullable = true)
| | | |-- neId: string (nullable = true)
| | | |-- productId: string (nullable = true)
| | | |-- softwareEoXId: long (nullable = true)
| | | |-- softwareType: string (nullable = true)
| | | |-- softwareVersion: string (nullable = true)
|-- partyId: string (nullable = true)
|-- recordType: string (nullable = true)
|-- sourceNeId: string (nullable = true)
|-- sourcePartyId: string (nullable = true)
|-- sourceSubPartyId: string (nullable = true)
|-- wfid: string (nullable = true)

Best answer

If you want to join and keep the fields from one dataset nested, you can use struct to build a StructType column and then join, as follows:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.struct;

Dataset<Row> inventory = spark.read().option("multiLine", true).option("mode", "PERMISSIVE")
.json("path to json inventory");
Dataset<Row> alerts = spark.read().option("multiLine", true).option("mode", "PERMISSIVE")
.json("path to alerts json")
.select(col("partyId"), struct("columns").as("ALERTS"));
// "columns" is a placeholder: pass the comma-separated names of all the columns you want as nested fields

// Joining on partyId is an assumption: it is the only top-level column kept next to ALERTS.
// Without a join column, join(alerts) would be a cross join.
Dataset<Row> inventoryAlerts = inventory.join(alerts, "partyId");
inventoryAlerts.printSchema();

After the join, this should give you the desired schema.
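The struct call above needs the names of the columns to nest. As a minimal sketch of one way to derive them, the helper below takes the top-level column names of the alerts dataset (for example from alerts.columns()) and drops the join key. The class name NestedColumns, the method nestedColumns, and the choice of partyId as the join key are illustrative assumptions and not part of the original answer. The helper is plain Java so it can be run without Spark.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class, named here only for illustration.
class NestedColumns {

    // Returns every column name except the join key, preserving the original order.
    // The result is the list of names one would nest under the ALERTS struct.
    public static String[] nestedColumns(String[] allColumns, String joinKey) {
        List<String> nested = new ArrayList<>();
        for (String name : allColumns) {
            if (!name.equals(joinKey)) {
                nested.add(name);
            }
        }
        return nested.toArray(new String[0]);
    }

    public static void main(String[] args) {
        // Top-level columns of the alerts schema shown in the question.
        String[] alertColumns = {
            "collectorId", "generatedAt", "managedNeId", "neAlert", "partyId",
            "recordType", "sourceNeId", "sourcePartyId", "sourceSubPartyId", "wfid"
        };
        // Assumption: partyId is the join key and therefore stays at the top level.
        String[] nested = nestedColumns(alertColumns, "partyId");
        System.out.println(String.join(",", nested));
    }
}
```

With Spark on the classpath, the result could then be passed to the overload of struct that takes a first column name followed by varargs, for example struct(nested[0], Arrays.copyOfRange(nested, 1, nested.length)).as("ALERTS").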

Regarding "java - How to add a JSON object to a Dataset/DataFrame in Apache Spark", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/49576171/
