
azure-sql-database - Why does Azure Data Factory seem to insist on inserting DateTimes as strings?


I am trying to set up an Azure Data Factory that uses a data flow to copy and denormalize data from one Azure SQL database into another Azure SQL database for reporting/BI purposes, but I have run into a problem inserting dates.

This is the definition of my data flow:

{
    "name": "dataflow1",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "AzureSqlTable1",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "AzureSqlTable2",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "script": "\n\nsource(output(\n\t\tBucketId as string,\n\t\tStreamId as string,\n\t\tStreamIdOriginal as string,\n\t\tStreamRevision as integer,\n\t\tItems as integer,\n\t\tCommitId as string,\n\t\tCommitSequence as integer,\n\t\tCommitStamp as timestamp,\n\t\tCheckpointNumber as long,\n\t\tDispatched as boolean,\n\t\tHeaders as binary,\n\t\tPayload as binary\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false,\n\tisolationLevel: 'READ_UNCOMMITTED',\n\tformat: 'table') ~> source1\nsource1 sink(allowSchemaDrift: true,\n\tvalidateSchema: false,\n\tformat: 'table',\n\tdeletable:false,\n\tinsertable:true,\n\tupdateable:false,\n\tupsertable:false,\n\tmapColumn(\n\t\tBucketId,\n\t\tCommitStamp\n\t)) ~> sink1"
        }
    }
}

This is the definition of my source dataset:
{
    "name": "AzureSqlTable1",
    "properties": {
        "linkedServiceName": {
            "referenceName": "Source_Test",
            "type": "LinkedServiceReference"
        },
        "annotations": [],
        "type": "AzureSqlTable",
        "schema": [
            {
                "name": "BucketId",
                "type": "varchar"
            },
            {
                "name": "StreamId",
                "type": "char"
            },
            {
                "name": "StreamIdOriginal",
                "type": "nvarchar"
            },
            {
                "name": "StreamRevision",
                "type": "int",
                "precision": 10
            },
            {
                "name": "Items",
                "type": "tinyint",
                "precision": 3
            },
            {
                "name": "CommitId",
                "type": "uniqueidentifier"
            },
            {
                "name": "CommitSequence",
                "type": "int",
                "precision": 10
            },
            {
                "name": "CommitStamp",
                "type": "datetime2",
                "scale": 7
            },
            {
                "name": "CheckpointNumber",
                "type": "bigint",
                "precision": 19
            },
            {
                "name": "Dispatched",
                "type": "bit"
            },
            {
                "name": "Headers",
                "type": "varbinary"
            },
            {
                "name": "Payload",
                "type": "varbinary"
            }
        ],
        "typeProperties": {
            "tableName": "[dbo].[Commits]"
        }
    }
}

And the sink dataset:
{
    "name": "AzureSqlTable2",
    "properties": {
        "linkedServiceName": {
            "referenceName": "Dest_Test",
            "type": "LinkedServiceReference"
        },
        "annotations": [],
        "type": "AzureSqlTable",
        "schema": [],
        "typeProperties": {
            "tableName": "dbo.Test2"
        }
    }
}

When I run the pipeline with this data flow, I get the following error:
Activity dataflow1 failed: DF-EXEC-1 Conversion failed when converting date and/or time from character string.
com.microsoft.sqlserver.jdbc.SQLServerException: Conversion failed when converting date and/or time from character string.
at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:258)
at com.microsoft.sqlserver.jdbc.TDSTokenHandler.onEOF(tdsparser.java:256)
at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:108)
at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:28)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.doInsertBulk(SQLServerBulkCopy.java:1611)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.access$200(SQLServerBulkCopy.java:58)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy$1InsertBulk.doExecute(SQLServerBulkCopy.java:709)
at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7151)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2478)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.sendBulkLoadBCP(SQLServerBulkCopy.java:739)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeToServer(SQLServerBulkCopy.java:1684)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeToServer(SQLServerBulkCopy.java:669)
at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions.com$microsoft$azure$sqldb$spark$connect$DataFrameFunctions$$bulkCopy(DataFrameFunctions.scala:127)
at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions$$anonfun$bulkCopyToSqlDB$1.apply(DataFrameFunctions.scala:72)
at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions$$anonfun$bulkCopyToSqlDB$1.apply(DataFrameFunctions.scala:72)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:948)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:948)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2226)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2226)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:124)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:459)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1401)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

My Azure SQL audit log shows the following failing statement (which is not surprising, given that it uses VARCHAR(50) as the type for [CommitStamp]):
INSERT BULK dbo.T_301fcb5e4a4148d4a48f2943011b2f04 (
[BucketId] NVARCHAR(MAX),
[CommitStamp] VARCHAR(50),
[StreamId] NVARCHAR(MAX),
[StreamIdOriginal] NVARCHAR(MAX),
[StreamRevision] INT,
[Items] INT,
[CommitId] NVARCHAR(MAX),
[CommitSequence] INT,
[CheckpointNumber] BIGINT,
[Dispatched] BIT,
[Headers] VARBINARY(MAX),
[Payload] VARBINARY(MAX),
[r8e440f7252bb401b9ead107597de6293] INT)
with (ROWS_PER_BATCH = 4096, TABLOCK)

I have absolutely no idea why this is happening. The schema information looks correct, but somehow Data Factory / the data flow seems to want to insert CommitStamp as a string type.
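The conversion failure itself is easy to reproduce in plain T-SQL: because the bulk load presents [CommitStamp] as VARCHAR(50), every incoming value has to be implicitly converted to the target column's datetime2 type, and any value SQL Server cannot parse raises exactly this error. A minimal sketch, assuming the sink column in dbo.Test2 is datetime2 (the table variable and sample values below are made up for illustration):

-- Minimal illustration of the failure mode (not the actual bulk-copy path).
-- Assumes the sink column is datetime2, as in the source schema.
DECLARE @t TABLE (CommitStamp datetime2(7));

-- ISO 8601 text converts implicitly, so this row succeeds:
INSERT INTO @t (CommitStamp) VALUES ('2019-07-11T08:15:00.1234567');

-- A value SQL Server cannot parse as a date fails with the same message:
-- Msg 241: Conversion failed when converting date and/or time from character string.
INSERT INTO @t (CommitStamp) VALUES ('2019-07-32 08:15:00');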

As requested, here is the output of the data flow Code/Plan view:


source(output(
        BucketId as string,
        StreamId as string,
        StreamIdOriginal as string,
        StreamRevision as integer,
        Items as integer,
        CommitId as string,
        CommitSequence as integer,
        CommitStamp as timestamp,
        CheckpointNumber as long,
        Dispatched as boolean,
        Headers as binary,
        Payload as binary
    ),
    allowSchemaDrift: true,
    validateSchema: false,
    isolationLevel: 'READ_UNCOMMITTED',
    format: 'table',
    schemaName: '[dbo]',
    tableName: '[Commits]',
    store: 'sqlserver',
    server: 'sign2025-sqldata.database.windows.net',
    database: 'SignPath.Application',
    user: 'Sign2025Admin',
    password: '**********') ~> source1
source1 sink(allowSchemaDrift: true,
    validateSchema: false,
    format: 'table',
    deletable:false,
    insertable:true,
    updateable:false,
    upsertable:false,
    mapColumn(
        BucketId,
        CommitStamp
    ),
    schemaName: 'dbo',
    tableName: 'Test2',
    store: 'sqlserver',
    server: 'sign2025-sqldata.database.windows.net',
    database: 'SignPath.Reporting',
    user: 'Sign2025Admin',
    password: '**********') ~> sink1

Best Answer

I created a data flow that copies data from one Azure SQL database to another Azure SQL database. It successfully converted datetime2 to VARCHAR(50).

This is the definition of my data flow:

{
    "name": "dataflow1",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "DestinationDataset_sto",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "DestinationDataset_mex",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "script": "\n\nsource(output(\n\t\tID as integer,\n\t\ttName as string,\n\t\tmyTime as timestamp\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false,\n\tisolationLevel: 'READ_UNCOMMITTED',\n\tformat: 'table') ~> source1\nsource1 sink(input(\n\t\tID as integer,\n\t\ttName as string,\n\t\tmyTime as string\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false,\n\tformat: 'table',\n\tdeletable:false,\n\tinsertable:true,\n\tupdateable:false,\n\tupsertable:false) ~> sink1"
        }
    }
}

The definition of my source dataset:
{
    "name": "DestinationDataset_sto",
    "properties": {
        "linkedServiceName": {
            "referenceName": "AzureSqlDatabase1",
            "type": "LinkedServiceReference"
        },
        "annotations": [],
        "type": "AzureSqlTable",
        "schema": [
            {
                "name": "ID",
                "type": "int",
                "precision": 10
            },
            {
                "name": "tName",
                "type": "varchar"
            },
            {
                "name": "myTime",
                "type": "datetime2",
                "scale": 7
            }
        ],
        "typeProperties": {
            "tableName": "[dbo].[demo]"
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}

My sink dataset settings:
{
    "name": "DestinationDataset_mex",
    "properties": {
        "linkedServiceName": {
            "referenceName": "AzureSqlDatabase1",
            "type": "LinkedServiceReference"
        },
        "annotations": [],
        "type": "AzureSqlTable",
        "schema": [
            {
                "name": "ID",
                "type": "int",
                "precision": 10
            },
            {
                "name": "tName",
                "type": "varchar"
            },
            {
                "name": "myTime",
                "type": "varchar"
            }
        ],
        "typeProperties": {
            "tableName": "[dbo].[demo1]"
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}

These are my data flow steps; the screenshots of each step are in the original Stack Overflow post.

Step 1: source settings.

Step 2: sink settings.

The run succeeded.

Apart from myTime, the tables demo and demo1 have almost the same schema.
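For reference, this is roughly what the two tables look like in plain T-SQL, based on the dataset schemas above (the column lengths and nullability are assumptions, since the post only gives the base types):

-- Source table: myTime is datetime2(7), matching DestinationDataset_sto.
CREATE TABLE dbo.demo
(
    ID     int          NOT NULL,
    tName  varchar(50)  NULL,
    myTime datetime2(7) NULL
);

-- Sink table: identical except that myTime is a string column, which is the
-- conversion the data flow handled successfully.
CREATE TABLE dbo.demo1
(
    ID     int          NOT NULL,
    tName  varchar(50)  NULL,
    myTime varchar(50)  NULL
);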

My source table and its data (screenshot in the original post).

My sink table and the data copied over from demo (screenshot in the original post).

The data flow Plan:
source(output(
        ID as integer,
        tName as string,
        myTime as timestamp
    ),
    allowSchemaDrift: true,
    validateSchema: true,
    isolationLevel: 'SERIALIZABLE',
    format: 'table',
    schemaName: '[dbo]',
    tableName: '[demo]',
    store: 'sqlserver',
    server: '****.database.windows.net',
    database: '****',
    user: 'ServerAdmin',
    password: '**********') ~> source1
source1 sink(input(
        ID as integer,
        tName as string,
        myTime as string
    ),
    allowSchemaDrift: true,
    validateSchema: false,
    format: 'table',
    deletable:false,
    insertable:true,
    updateable:false,
    upsertable:false,
    schemaName: '[dbo]',
    tableName: '[demo1]',
    store: 'sqlserver',
    server: '****.database.windows.net',
    database: '****',
    user: 'ServerAdmin',
    password: '**********') ~> sink1

Update 1:

I created the sink table manually and found that:

Data Flow can convert datetime2 to VARCHAR() (and probably NVARCHAR()), date, and datetimeoffset.



When I try the date/time types time, datetime, datetime2, or smalldatetime as the sink column type, the data flow always fails with:
"message": "DF-EXEC-1 Conversion failed when converting date and/or time from character string."

Update 2019-07-11:

I asked Azure Support for help, and they replied that this is a bug in Data Flow with no solution at the moment (their reply is shown as a screenshot in the original post).

Update 2019-07-12:

I tested this together with Azure Support, and they consider it a bug. Their new email is shown as a screenshot in the original post.

They also told me that a fix has already been made and will ship with the next deployment train, probably by the end of next week.

Hope this helps.

Regarding "azure-sql-database - Why does Azure Data Factory seem to insist on inserting DateTimes as strings?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/56948054/
