gpt4 book ai didi

c# - .NET InsertMediaUpload 将 CSV 上传到 BigQuery 时的奇怪问题

转载 作者:太空宇宙 更新时间:2023-11-03 10:31:50 26 4
gpt4 key购买 nike

我正在使用 .NET 客户端 API:IUploadProgress progress = insertMediaUpload.Upload() 将 csv 上传到 BigQuery。基本上我做的是:

1.提交上传任务,
2. 获取作业状态(pending, running, done..)
3. 如果BigQuery报错,则打印报错并抛出异常,以便进一步处理。

下面的代码不是我想要的,我希望有人能帮助我改进它。

具体来说,发生了几个奇怪的代码行为:
1. 即使在故意失败的同一 CSV 上运行相同的代码,在 UploadOnResponseReceived() 中解析出的 BQ 错误消息也会在某些调用中打印出来,但不会在其他调用中打印出来。为什么?
2. IUploadProgress 值似乎与 UploadOnResponseReceived() 行为有关:如果我在 UploadOnResponseReceived 中什么都不做,那么 progress.status 将始终为“已完成”,如果 UploadOnResponseReceived 抛出异常,则 progress.status 将失败。
3. 当progress.status 失败时,无法获取UploadOnResponseReceived 抛出的异常。我实际上确实需要获得异常,我该怎么办?

 public bool ExecuteUploadJobToTable(string dataset, string tableId, string filePath, TableSchema schema, string createDisposition, char delimiter)
{

TableReference destTable = new TableReference { ProjectId = _account.ProjectId, DatasetId = dataset, TableId = tableId };

JobConfigurationLoad configLoad = new JobConfigurationLoad
{
Schema = schema,
DestinationTable = destTable,
Encoding = "ISO-8859-1",
CreateDisposition = "CREATE_IF_NEEDED",
WriteDisposition = createDisposition,
FieldDelimiter = delimiter.ToString(),
AllowJaggedRows = true,
SourceFormat = "CSV"
};

JobConfiguration config = new JobConfiguration {Load = configLoad};

Job job = new Job {Configuration = config};

//set job reference (mainly job id)
JobReference jobRef = new JobReference
{
JobId = GenerateJobID("Upload"),
ProjectId = _account.ProjectId
};
job.JobReference = jobRef;

bool isSuccess = true;
using (var fileStream = new FileStream(filePath, FileMode.Open))
{
JobsResource.InsertMediaUpload insertMediaUpload = new JobsResource.InsertMediaUpload(BigQueryService, job, job.JobReference.ProjectId, stream: fileStream, contentType: "application/octet-stream");
insertMediaUpload.ProgressChanged += UploadOnProgressChanged;
insertMediaUpload.ResponseReceived += UploadOnResponseReceived;

Console.WriteLine(string.Format("start {0}",jobRef.JobId));
IUploadProgress progress = insertMediaUpload.Upload();
if (progress.Status.ToString().Contains("Fail"))
{
isSuccess = false;
}
}
Console.WriteLine(isSuccess);
return isSuccess;
}

private void UploadOnProgressChanged(IUploadProgress process)
{
Console.WriteLine(process.Status + " " + process.BytesSent);
}

//thowring an exception will make IUploadProgress "Failed", otherwise, IUploadProgress will be "Completed"
private void UploadOnResponseReceived(Job job)
{
try
{
job = PollUntilJobDone(job.JobReference, 5);
}
catch(Exception e)
{
Console.WriteLine("Unexcepted unretryable exception happens when poll job status");
throw new BigQueryException("Unexcepted unretryable exception happens when poll job status",e);
}

StringBuilder errorMessageBuilder = new StringBuilder();
ErrorProto fatalError = job.Status.ErrorResult;
IList<ErrorProto> errors = job.Status.Errors;
if (fatalError != null)
{
errorMessageBuilder.AppendLine("Job failed while writing to Bigquery. " + fatalError.Reason + ": " + fatalError.Message +
" at " + fatalError.Location);
}
if (errors != null)
{
foreach (ErrorProto error in errors)
{
errorMessageBuilder.AppendLine("Error: [REASON] " + error.Reason + " [MESSAGE] " + error.Message +
" [LOCATION] " + error.Location);
}

}
if (errorMessageBuilder.Length>0)//fatalError != null || errors != null
{
Console.WriteLine(errorMessageBuilder.ToString());
throw new BigQueryException(errorMessageBuilder.ToString());
}
Console.WriteLine("upload should be successful");
}

private Job PollUntilJobDone(JobReference jobReference, int pauseSeconds)
{
int backoff = 1000;//backoff starts from 1 sec + random

for(int i = 0; i < 10; i++)
{
try
{
var pollJob = BigQueryService.Jobs.Get(jobReference.ProjectId, jobReference.JobId).Execute();
Console.WriteLine(jobReference.JobId + ": " + pollJob.Status.State);
if (pollJob.Status.State.Equals("DONE"))
{
return pollJob;
}
// Pause execution for pauseSeconds before polling job status again,
// to reduce unnecessary calls to the BigQuery API and lower overall
// application bandwidth.
Thread.Sleep(pauseSeconds * 1000);
}
catch (Exception e)
{
BigQueryException exception = new BigQueryException(e.Message,e);
if (exception.IsTemporary)
{
int sleep = backoff + Random.Next(1000);
Console.WriteLine("pollUntilJobDone job execute failed. Sleeping {0} ms before retry", sleep);
Thread.Sleep(sleep);
}
else
{
throw;
}
}
backoff *= 2;
}
return null;
}

最佳答案

关于您的“我如何捕获异常”问题,回调似乎在另一个线程上异步发生。如果您抛出异常,它会被调用回调的任何框架捕获。

在搜索类似问题时,我找到了这些可能对您有帮助的答案:Catching an exception thrown in an asynchronous callback ,这个显示了如何根据后台线程中收到的上传进度更新另一个线程中的 UI:Tracking upload progress of WebClient

关于c# - .NET InsertMediaUpload 将 CSV 上传到 BigQuery 时的奇怪问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29833955/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com