
python - Dataflow fails when adding requirements.txt [Python]

Reposted. Author: 行者123. Updated: 2023-12-04 13:14:12

When I try to run a Dataflow pipeline with the DataflowRunner and include a requirements.txt that looks like this

google-cloud-storage==1.28.1
pandas==1.0.3
smart-open==2.0.0

it fails every time at this line:
INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://..../beamapp-.../numpy-1.18.2.zip...
Traceback (most recent call last):
File "Database.py", line 107, in <module>
run()
File "Database.py", line 101, in run
| 'Write CSV' >> beam.ParDo(WriteCSVFIle(options.output_bucket,
pandora_options.output_folder))
File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/pipeline.py", line 503, in __exit__
self.run().wait_until_finish()
File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/pipeline.py", line 483, in run
self._options).run(False)
File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/pipeline.py", line 496, in run
return self.runner.run_pipeline(self, self._options)
File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 548, in run_pipeline
self.dataflow_client.create_job(self.job), self)
File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/utils/retry.py", line 234, in wrapper
return fun(*args, **kwargs)
File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 624, in create_job
self.create_job_description(job)
File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 680, in create_job_description
resources = self._stage_resources(job.options)
File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 577, in _stage_resources
staging_location=google_cloud_options.staging_location)
File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/portability/stager.py", line 182, in stage_job_resources
pkg, FileSystems.join(staging_location, os.path.basename(pkg)))
File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 942, in stage_artifact
local_path_to_artifact, artifact_name)
File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/utils/retry.py", line 234, in wrapper
return fun(*args, **kwargs)
File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 564, in _gcs_file_copy
self.stage_file(to_folder, to_name, f, total_size=total_size)
File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 602, in stage_file
response = self._storage_client.objects.Insert(request, upload=upload)
File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1156, in Insert
upload=upload, upload_config=upload_config)
File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/base_api.py", line 715, in _RunMethod
http_request, client=self.client)
File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 908, in InitializeUpload
return self.StreamInChunks()
File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 1020, in StreamInChunks
additional_headers=additional_headers)
File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 957, in __StreamMedia
response = send_func(self.stream.tell())
File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 943, in CallSendChunk
start, additional_headers=additional_headers)
File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 1120, in __SendChunk
return self.__SendMediaRequest(request, end)
File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 1033, in __SendMediaRequest
retries=self.num_retries, check_response_func=CheckResponse)
File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 356, in MakeRequest
max_retry_wait, total_wait_sec))
File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 304, in HandleExceptionsAndRebuildHttpConnections
raise retry_args.exc
File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 346, in MakeRequest
check_response_func=check_response_func)
File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 396, in _MakeRequestNoRetry
redirections=redirections, connection_type=connection_type)
File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/oauth2client/transport.py", line 169, in new_request
redirections, connection_type)
File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/oauth2client/transport.py", line 169, in new_request
redirections, connection_type)
File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/httplib2/__init__.py", line 1991, in request
cachekey,
File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/httplib2/__init__.py", line 1690, in _request
content,
httplib2.RedirectMissingLocation: Redirected but the response is missing a Location: header.

This is the command I am running:
python Database.py \
    --runner DataflowRunner \
    --project XXX \
    --staging_location gs://.../staging \
    --temp_location gs://.../temp \
    --template_location gs://.../Template \
    --requirements_file requirements.txt

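If I remove --requirements_file requirements.txt the command completes, but when I then try to run the job it fails because it cannot find the packages.
  • I am using Cloud Storage to list all the files in a bucket, so if you have another solution that does not involve Cloud Storage, I would appreciate it.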

  • This is my dataflow-requirements-cache folder. Before I cleaned it, it held several versions of the same package, for example
    botocore-1.16.16.tar.gz
    botocore-1.16.17.tar.gz
    botocore-1.16.18.tar.gz

    After I cleaned it, it looks like this (it still fails when trying to upload numpy):
    numpy-1.18.4.zip
    urllib3-1.25.9.tar.gz
    smart_open-2.0.0.tar.gz
    six-1.15.0.tar.gz
    setuptools-47.1.0.zip
    s3transfer-0.3.3.tar.gz
    rsa-4.0.tar.gz
    requests-2.23.0.tar.gz
    pytz-2020.1.tar.gz
    python-dateutil-2.8.1.tar.gz
    pyasn1-modules-0.2.8.tar.gz
    pyasn1-0.4.8.tar.gz
    protobuf-3.12.2.tar.gz
    pandas-1.0.3.tar.gz
    jmespath-0.10.0.tar.gz
    idna-2.9.tar.gz
    googleapis-common-protos-1.51.0.tar.gz
    google-resumable-media-0.5.0.tar.gz
    google-cloud-storage-1.28.1.tar.gz
    google-cloud-core-1.3.0.tar.gz
    google-auth-1.15.0.tar.gz
    google-api-core-1.17.0.tar.gz
    docutils-0.15.2.tar.gz
    chardet-3.0.4.tar.gz
    certifi-2020.4.5.1.tar.gz
    cachetools-4.1.0.tar.gz
    botocore-1.16.18.tar.gz
    boto3-1.13.18.tar.gz
    boto-2.49.0.tar.gz
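
The cache listing above can also be checked mechanically for packages that are present in more than one version. The following is only a minimal sketch: split_name_version and find_duplicate_versions are hypothetical helper names, and it assumes every archive is named name-version.tar.gz or name-version.zip, as in the listing. Note that, according to the question, cleaning the cache did not fix the upload failure, so this is a housekeeping aid rather than a fix.

```python
from collections import defaultdict

# Archive suffixes seen in the cache listing (assumption: no other formats).
ARCHIVE_SUFFIXES = (".tar.gz", ".zip")


def split_name_version(filename):
    """Split 'botocore-1.16.18.tar.gz' into ('botocore', '1.16.18').

    Returns None when the file name does not match name-version.<suffix>.
    """
    for suffix in ARCHIVE_SUFFIXES:
        if filename.endswith(suffix):
            stem = filename[: -len(suffix)]
            # The version is whatever follows the last hyphen.
            name, _, version = stem.rpartition("-")
            if name:
                return name, version
    return None


def find_duplicate_versions(filenames):
    """Map each package that appears in several versions to those versions."""
    versions = defaultdict(set)
    for filename in filenames:
        parsed = split_name_version(filename)
        if parsed:
            name, version = parsed
            versions[name].add(version)
    # sorted() is plain string ordering here, not version-aware ordering.
    return {name: sorted(found) for name, found in versions.items() if len(found) > 1}


if __name__ == "__main__":
    listing = [
        "botocore-1.16.16.tar.gz",
        "botocore-1.16.17.tar.gz",
        "botocore-1.16.18.tar.gz",
        "numpy-1.18.4.zip",
    ]
    print(find_duplicate_versions(listing))
```

To run it against a real folder, pass os.listdir(cache_dir) as the filenames argument, where cache_dir is the path of the dataflow-requirements-cache directory.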

    --- Edit ---
    Full output:
    (airflow) afragotsis-mac:pandora_database afragotsis$ python PandoraDatabase.py \
    > --runner DataflowRunner \
    > --project XXX \
    > --staging_location gs://.../dataflow-template/PandoraDatabase/staging \
    > --temp_location gs://.../dataflow-template/PandoraDatabase/temp \
    > --template_location gs://.../dataflow-template/PandoraDatabase/pandoraTemplate \
    > --requirements_file requirements.txt \
    > --save_main_session True
    WARNING:apache_beam.options.pipeline_options:--region not set; will default to us-central1. Future releases of Beam will require the user to set --region explicitly, or else have a default set via the gcloud tool. https://cloud.google.com/compute/docs/regions-zones
    INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
    INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
    INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/pipeline.pb...
    INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
    INFO:oauth2client.client:Refreshing access_token
    INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
    INFO:oauth2client.client:Refreshing access_token
    INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/pipeline.pb in 0 seconds.
    INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/requirements.txt...
    INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/requirements.txt in 0 seconds.
    INFO:apache_beam.runners.portability.stager:Executing command: ['/Users/afragotsis/opt/anaconda3/envs/airflow/bin/python', '-m', 'pip', 'download', '--dest', '/var/folders/zj/dqg766ks0cx663lg7brll7b80000gn/T/dataflow-requirements-cache', '-r', 'requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']
    INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/rsa-4.0.tar.gz...
    INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/rsa-4.0.tar.gz in 0 seconds.
    INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/urllib3-1.25.9.tar.gz...
    INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/urllib3-1.25.9.tar.gz in 0 seconds.
    INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/boto3-1.13.19.tar.gz...
    INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/boto3-1.13.19.tar.gz in 0 seconds.
    INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/pyasn1-modules-0.2.8.tar.gz...
    INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/pyasn1-modules-0.2.8.tar.gz in 0 seconds.
    INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://.../dataflow-template/PandoraDatabase/staging/beamapp-afragotsis-0529200636-871276.1590782796.871390/numpy-1.18.4.zip...
    Traceback (most recent call last):
    File "PandoraDatabase.py", line 125, in <module>
    run()
    File "PandoraDatabase.py", line 119, in run
    | 'Write CSV' >> beam.ParDo(WriteCSVFIle(pandora_options.output_bucket, pandora_options.output_folder))
    File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/pipeline.py", line 503, in __exit__
    self.run().wait_until_finish()
    File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/pipeline.py", line 483, in run
    self._options).run(False)
    File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/pipeline.py", line 496, in run
    return self.runner.run_pipeline(self, self._options)
    File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 548, in run_pipeline
    self.dataflow_client.create_job(self.job), self)
    File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/utils/retry.py", line 234, in wrapper
    return fun(*args, **kwargs)
    File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 624, in create_job
    self.create_job_description(job)
    File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 680, in create_job_description
    resources = self._stage_resources(job.options)
    File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 577, in _stage_resources
    staging_location=google_cloud_options.staging_location)
    File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/portability/stager.py", line 182, in stage_job_resources
    pkg, FileSystems.join(staging_location, os.path.basename(pkg)))
    File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 942, in stage_artifact
    local_path_to_artifact, artifact_name)
    File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/utils/retry.py", line 234, in wrapper
    return fun(*args, **kwargs)
    File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 564, in _gcs_file_copy
    self.stage_file(to_folder, to_name, f, total_size=total_size)
    File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 602, in stage_file
    response = self._storage_client.objects.Insert(request, upload=upload)
    File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1156, in Insert
    upload=upload, upload_config=upload_config)
    File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/base_api.py", line 715, in _RunMethod
    http_request, client=self.client)
    File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 908, in InitializeUpload
    return self.StreamInChunks()
    File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 1020, in StreamInChunks
    additional_headers=additional_headers)
    File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 957, in __StreamMedia
    response = send_func(self.stream.tell())
    File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 943, in CallSendChunk
    start, additional_headers=additional_headers)
    File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 1120, in __SendChunk
    return self.__SendMediaRequest(request, end)
    File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/transfer.py", line 1033, in __SendMediaRequest
    retries=self.num_retries, check_response_func=CheckResponse)
    File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 356, in MakeRequest
    max_retry_wait, total_wait_sec))
    File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 304, in HandleExceptionsAndRebuildHttpConnections
    raise retry_args.exc
    File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 346, in MakeRequest
    check_response_func=check_response_func)
    File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 396, in _MakeRequestNoRetry
    redirections=redirections, connection_type=connection_type)
    File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/oauth2client/transport.py", line 169, in new_request
    redirections, connection_type)
    File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/oauth2client/transport.py", line 169, in new_request
    redirections, connection_type)
    File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/httplib2/__init__.py", line 1991, in request
    cachekey,
    File "/Users/afragotsis/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/httplib2/__init__.py", line 1690, in _request
    content,
    httplib2.RedirectMissingLocation: Redirected but the response is missing a Location: header.

    Full path of dataflow-requirements-cache:
    /private/var/folders/zj/dqg766ks0cx663lg7brll7b80000gn/T/dataflow-requirements-cache

    It always fails when trying to upload numpy.

    Best answer

    OK, no matter what I tried, I could not get it to work with the requirements file, so I tried a setup file instead. The command now looks like this:

    python Database.py \
        --runner DataflowRunner \
        --project XXX \
        --staging_location gs://.../staging \
        --temp_location gs://.../temp \
        --template_location gs://.../Template \
        --setup_file /Users/.../setup.py \
        --save_main_session True

    and the setup file is this:
    import setuptools

    # Packages that the Dataflow workers must install, pinned to exact versions.
    REQUIRED_PACKAGES = [
        'google-cloud-storage==1.28.1',
        'pandas==1.0.3',
        'smart-open==2.0.0',
    ]

    PACKAGE_NAME = 'my_package'
    PACKAGE_VERSION = '0.0.1'

    setuptools.setup(
        name=PACKAGE_NAME,
        version=PACKAGE_VERSION,
        description='Example project',
        install_requires=REQUIRED_PACKAGES,
        packages=setuptools.find_packages(),
    )
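
If the pins should stay in requirements.txt instead of being copied by hand into the setup file, one option is to read them from that file. This is a minimal sketch, not part of the original answer: read_requirements is a hypothetical helper, and it assumes a simple file with one pinned requirement per line, like the one in the question.

```python
from pathlib import Path


def read_requirements(path):
    """Return the requirement specifiers listed in a pip requirements file.

    Blank lines, comments, and pip option lines (such as '-r other.txt')
    are skipped. Assumption: requirement lines contain no '#' of their own.
    """
    requirements = []
    for raw_line in Path(path).read_text().splitlines():
        # Drop trailing comments and surrounding whitespace.
        line = raw_line.split("#", 1)[0].strip()
        if not line or line.startswith("-"):
            continue
        requirements.append(line)
    return requirements


if __name__ == "__main__":
    # Hypothetical usage: print the pins found next to this script, if any.
    candidate = Path(__file__).with_name("requirements.txt")
    if candidate.exists():
        print(read_requirements(candidate))
```

In a setup file, the result could be passed as install_requires=read_requirements('requirements.txt'). One caveat with this design: the setup file is executed again wherever the packaged source is installed, so requirements.txt would probably need to ship inside the source distribution (for example through a MANIFEST.in entry). The hard-coded list in the answer avoids that dependency.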

    Regarding "python - Dataflow fails when adding requirements.txt [Python]", the original question is on Stack Overflow: https://stackoverflow.com/questions/62032382/
