
google-cloud-dataflow - Cloud Dataflow - How does Dataflow parallelize its workload?

Reposted · Author: 行者123 · Updated: 2023-12-04 13:02:53

My question is: under the hood, how does Cloud Dataflow parallelize the workload of an element-wise Beam DoFn (ParDo)? For example, in my ParDo I send one HTTP request to an external server for each element. I use 30 workers, each with 4 vCPUs.

  • Does that mean each worker runs at most 4 threads?
  • Does that mean each worker needs at most 4 HTTP connections, or can it establish more if I keep them alive (keep-alive) for best performance?
  • Other than using more cores or more workers, is there any way to tune the degree of parallelism?
  • With my current setup (30 workers × 4 vCPUs), I see roughly 120 HTTP connections established to the HTTP server, yet resource usage on both the server and the workers is very low. Basically I want to make them work harder by sending more requests per second. What should I do?

  • A code snippet illustrating my job:
    public class NewCallServerDoFn extends DoFn<PreparedRequest, KV<PreparedRequest, String>> {

        private static final Logger Logger = LoggerFactory.getLogger(NewCallServerDoFn.class);

        private static PoolingHttpClientConnectionManager _ConnManager = null;
        private static CloseableHttpClient _HttpClient = null;
        private static HttpRequestRetryHandler _RetryHandler = null;
        private static String[] _MapServers = MapServerBatchBeamApplication.CONFIG
                .getString("mapserver.client.config.server_host").split(",");

        @Setup
        public void setupHttpClient() {

            Logger.info("Setting up HttpClient");

            // Question: the value of maxConnection below is actually 10, but with 30 worker
            // machines I can only see 115 TCP connections established on the server side,
            // so this setting doesn't take effect the way I expected.

            int maxConnection = MapServerBatchBeamApplication.CONFIG.getInt("mapserver.client.config.max_connection");
            int timeout = MapServerBatchBeamApplication.CONFIG.getInt("mapserver.client.config.timeout");

            _ConnManager = new PoolingHttpClientConnectionManager();

            for (String mapServer : _MapServers) {
                HttpHost serverHost = new HttpHost(mapServer, 80);
                _ConnManager.setMaxPerRoute(new HttpRoute(serverHost), maxConnection);
            }

            // configure timeouts
            RequestConfig requestConfig = RequestConfig.custom()
                    .setConnectTimeout(timeout)
                    .setConnectionRequestTimeout(timeout)
                    .setSocketTimeout(timeout)
                    .build();

            // configure retries
            _RetryHandler = new HttpRequestRetryHandler() {

                public boolean retryRequest(IOException exception, int executionCount, HttpContext context) {

                    Logger.info(exception.toString());
                    Logger.info("try request: " + executionCount);

                    if (executionCount >= 5) {
                        // do not retry if over the max retry count
                        return false;
                    }
                    if (exception instanceof InterruptedIOException) {
                        // timeout
                        return false;
                    }
                    if (exception instanceof UnknownHostException) {
                        // unknown host
                        return false;
                    }
                    if (exception instanceof ConnectTimeoutException) {
                        // connection timed out
                        return false;
                    }
                    if (exception instanceof SSLException) {
                        // SSL handshake failure
                        return false;
                    }
                    return true;
                }
            };

            _HttpClient = HttpClients.custom()
                    .setConnectionManager(_ConnManager)
                    .setDefaultRequestConfig(requestConfig)
                    .setRetryHandler(_RetryHandler)
                    .build();

            Logger.info("Setting up HttpClient is done.");
        }

        @Teardown
        public void tearDown() {
            Logger.info("Tearing down HttpClient and Connection Manager.");
            try {
                _HttpClient.close();
                _ConnManager.close();
            } catch (Exception e) {
                Logger.warn(e.toString());
            }
            Logger.info("HttpClient and Connection Manager have been torn down.");
        }

        @ProcessElement
        public void processElement(ProcessContext c) {

            PreparedRequest request = c.element();

            if (request == null)
                return;

            String response = "{\"my_error\":\"failed to get response from map server with retries\"}";

            String chosenServer = _MapServers[request.getHardwareId() % _MapServers.length];

            String parameter;
            try {
                parameter = URLEncoder.encode(request.getRequest(), "UTF-8");
            } catch (UnsupportedEncodingException e) {
                Logger.error(e.toString());
                return;
            }

            StringBuilder sb = new StringBuilder()
                    .append(MapServerBatchBeamApplication.CONFIG.getString("mapserver.client.config.api_path"))
                    .append("?coordinates=")
                    .append(parameter);

            HttpGet getRequest = new HttpGet(sb.toString());
            HttpHost host = new HttpHost(chosenServer, 80, "http");

            // try-with-resources so the response is closed even when reading the entity fails
            try (CloseableHttpResponse httpRes = _HttpClient.execute(host, getRequest)) {
                HttpEntity entity = httpRes.getEntity();
                if (entity != null) {
                    try {
                        response = EntityUtils.toString(entity);
                    } finally {
                        EntityUtils.consume(entity);
                    }
                }
            } catch (Exception e) {
                Logger.warn("failed to get a response from the map server with retries for " + request.getRequest());
            }

            c.output(KV.of(request, response));
        }
    }
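
    A side note on the maxConnection observation in the snippet: with a blocking client, each worker thread can hold at most one request in flight, so a worker with 4 harness threads will never occupy more than about 4 pooled connections, no matter how large `setMaxPerRoute` is (30 workers × ~4 ≈ the 115 connections observed). A toy model in plain JDK, no Beam or Apache HttpClient involved, where a 50 ms sleep stands in for a blocking HTTP call:

    ```java
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    public class BlockingConcurrencyModel {

        // Current and peak number of simultaneously in-flight "requests".
        static final AtomicInteger inFlight = new AtomicInteger();
        static final AtomicInteger peak = new AtomicInteger();

        // Stand-in for a blocking HTTP call: occupies its thread for 50 ms.
        static String blockingRequest(int i) throws InterruptedException {
            int now = inFlight.incrementAndGet();
            peak.accumulateAndGet(now, Math::max);
            Thread.sleep(50);
            inFlight.decrementAndGet();
            return "response-" + i;
        }

        // Run `requests` blocking calls on a fixed pool of `threads` threads
        // (the analogue of ~4 harness threads on a 4-vCPU worker) and report
        // the peak concurrency actually reached.
        public static int measurePeak(int threads, int requests) throws Exception {
            inFlight.set(0);
            peak.set(0);
            ExecutorService pool = Executors.newFixedThreadPool(threads);
            for (int i = 0; i < requests; i++) {
                final int id = i;
                pool.submit(() -> blockingRequest(id));
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
            return peak.get();
        }

        public static void main(String[] args) throws Exception {
            // Even with a connection pool allowing 10 per route, blocking calls
            // cap concurrency at the thread count.
            System.out.println("peak in-flight requests: " + measurePeak(4, 40));
        }
    }
    ```

    The pool size is not the binding constraint here; the number of threads issuing blocking calls is.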

Best Answer

  • Yes, based on this answer.
  • No, you can establish more connections. Based on my answer, you can use an asynchronous HTTP client to have many more concurrent requests in flight. As that answer describes, you then need to collect the results of those async calls and output them synchronously, in @ProcessElement or @FinishBundle.
  • See 2.
  • Since your resource usage is low, the workers are spending most of their time waiting for responses. With the approach described above you can utilize your resources far better, and you may well achieve the same throughput with far fewer workers.
  • Regarding "google-cloud-dataflow - Cloud Dataflow - How does Dataflow parallelize its workload?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/51179443/
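
    The buffering pattern from point 2 can be sketched as follows. This is a minimal, Beam-free sketch in plain JDK: `CompletableFuture.supplyAsync` stands in for a real async HTTP call (e.g. `java.net.http.HttpClient.sendAsync`), and the method names merely mirror, but are not, the real `@ProcessElement`/`@FinishBundle` hooks:

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    public class AsyncBundleSketch {

        // Futures started per element; drained at the end of the bundle.
        private final List<CompletableFuture<String>> pending = new ArrayList<>();

        // Stand-in for an async HTTP call: returns a future immediately.
        private CompletableFuture<String> asyncRequest(String element) {
            return CompletableFuture.supplyAsync(() -> "response-for-" + element);
        }

        // Analogue of @ProcessElement: fire the request, do not block the thread.
        public void processElement(String element) {
            pending.add(asyncRequest(element));
        }

        // Analogue of @FinishBundle: wait for all in-flight requests,
        // then emit the results synchronously.
        public List<String> finishBundle() {
            List<String> out = new ArrayList<>();
            for (CompletableFuture<String> f : pending) {
                out.add(f.join());
            }
            pending.clear();
            return out;
        }
    }
    ```

    With this shape a single harness thread can keep many requests in flight at once, so per-worker concurrency is no longer capped at the vCPU count.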
