gpt4 book ai didi

azure - 将数据从 blob 存储复制到 sqlDatabase(到多个表中)

转载 作者:行者123 更新时间:2023-12-03 02:46:24 27 4
gpt4 key购买 nike

我对 azure 比较陌生,我发现自己陷入了困境!我正在尝试使用 Azure DataFactory 将数据从 Blob 存储读取到 SQL 数据库中。我使用复制事件使此过程正常工作,现在我尝试将数据插入到以某种方式相互关联的多个表中(privateKey、foreignKey)。 例如,要更新表 CAR,我需要知道表所有者中是否存在所有者。而且我无法找到有关如何进行的详细解释!有哪位有经验的可以给我一些指导吗?谢谢

最佳答案

我会采取不同的方法来解决这个问题。使用下面的代码,我们可以将多个文件中的数据(所有文件都具有相似的名称)合并到一个数据框中,并将整个文件推送到 SQL Server 中。这是 Scala,因此需要在 Azure Databricks 环境中运行。

# merge files with similar names into a single dataframe
val DF = spark.read.format("csv")
.option("sep","|")
.option("inferSchema","true")
.option("header","false")
.load("mnt/rawdata/corp/ABC*.gz")


DF.count()


# rename headers in dataframe
val newNames = Seq("ID", "FName", "LName", "Address", "ZipCode", "file_name")
val dfRenamed = df.toDF(newNames: _*)

dfRenamed.printSchema


# push the dataframe to sql server
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

// Aquire a DataFrame collection (val collection)

val config = Config(Map(
"url" -> "my_sql_server.database.windows.net",
"databaseName" -> "my_db_name",
"dbTable" -> "dbo.my_table",
"user" -> "xxxxx",
"password" -> "xxxxx",
"connectTimeout" -> "5", //seconds
"queryTimeout" -> "5" //seconds
))

import org.apache.spark.sql.SaveMode
DF.write.mode(SaveMode.Append).sqlDB(config)

上面的代码将读取每个文件的每一行。如果标题位于第一行,则效果很好。如果标题不在第一行,请使用下面的代码创建特定架构,然后再次读取每个文件的每一行。

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};
import org.apache.spark.sql.functions.input_file_name

val customSchema = StructType(Array(
StructField("field1", StringType, true),
StructField("field2", StringType, true),
StructField("field3", StringType, true),
StructField("field4", StringType, true),
StructField("field5", StringType, true),
StructField("field6", StringType, true),
StructField("field7", StringType, true)))

val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "false")
.option("sep", "|")
.schema(customSchema)
.load("mnt/rawdata/corp/ABC*.gz")
.withColumn("file_name", input_file_name())


import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._



val bulkCopyConfig = Config(Map(
"url" -> "mysqlserver.database.windows.net",
"databaseName" -> "MyDatabase",
"user" -> "username",
"password" -> "*********",
"databaseName" -> "MyDatabase",
"dbTable" -> "dbo.Clients",
"bulkCopyBatchSize" -> "2500",
"bulkCopyTableLock" -> "true",
"bulkCopyTimeout" -> "600"
))

df.write.mode(SaveMode.Append).
//df.bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)
//df.bulkCopyToSqlDB(bulkCopyConfig) if no metadata is specified.

关于azure - 将数据从 blob 存储复制到 sqlDatabase(到多个表中),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58505987/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com