gpt4 book ai didi

postgresql - 重新创建数据库容器时,Debezium源任务无法重新连接到PostgreSQL数据库

转载 作者:行者123 更新时间:2023-12-02 12:06:54 32 4
gpt4 key购买 nike

我们有一个Kubernetes集群,其中Debezium作为Postgresql的源任务运行并写入kafka。 Debezium,postgres和kafka都在单独的Pod中运行。
当删除postgres pod并kubernetes重新创建pod时,debezium pod无法重新连接。
来自debezium pod的日志:

    2018-07-17 08:31:38,311 ERROR  ||  WorkerSourceTask{id=inventory-connector-0} Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
2018-07-17 08:31:38,311 INFO || [Producer clientId=producer-4] Closing the Kafka producer with timeoutMillis = 30000 ms. [org.apache.kafka.clients.producer.KafkaProducer]

Debezium继续尝试定期刷新未完成的消息,但给出以下异常:
    2018-07-17 08:32:38,167 ERROR  ||  WorkerSourceTask{id=inventory-connector-0} Exception thrown while calling task.commit()   [org.apache.kafka.connect.runtime.WorkerSourceTask]
org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: Database connection failed when writing to copy
at io.debezium.connector.postgresql.RecordsStreamProducer.commit(RecordsStreamProducer.java:151)
at io.debezium.connector.postgresql.PostgresConnectorTask.commit(PostgresConnectorTask.java:138)
at org.apache.kafka.connect.runtime.WorkerSourceTask.commitSourceTask(WorkerSourceTask.java:437)
at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:378)
at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:108)
at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:45)
at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:82)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.postgresql.util.PSQLException: Database connection failed when writing to copy
at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:942)
at org.postgresql.core.v3.CopyDualImpl.flushCopy(CopyDualImpl.java:23)
at org.postgresql.core.v3.replication.V3PGReplicationStream.updateStatusInternal(V3PGReplicationStream.java:176)
at org.postgresql.core.v3.replication.V3PGReplicationStream.forceUpdateStatus(V3PGReplicationStream.java:99)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.doFlushLsn(PostgresReplicationConnection.java:246)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.flushLsn(PostgresReplicationConnection.java:239)
at io.debezium.connector.postgresql.RecordsStreamProducer.commit(RecordsStreamProducer.java:146)
... 13 more
Caused by: java.net.SocketException: Broken pipe (Write failed)
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at org.postgresql.core.PGStream.flush(PGStream.java:553)
at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:939)
... 19 more

有一种方法可以让Debezium在可用时重新建立其与postgres的连接吗?
还是我缺少一些配置?
  • Debezium版本0.8
  • kubernetes版本1.10.3
  • postgres 9.6版
  • 最佳答案

    看起来这是一个常见问题,在debezium和kafka中都有开放功能请求

    https://issues.jboss.org/browse/DBZ-248

    https://issues.apache.org/jira/browse/KAFKA-5352

    虽然这些是开放的,但看起来这是预期的行为

    作为解决方法,我已将该 Activity 探针添加到部署中

        livenessProbe:
    exec:
    command:
    - sh
    - -ec
    - ipaddress=$(ip addr | grep 'state UP' -A2 | tail -n1 | awk '{print $2}' | cut -f1 -d'/'); reply=$(curl -s $ipaddress:8083/connectors/inventory-connector/status | grep -o RUNNING | wc -l); if [ $reply -lt 2 ]; then exit 1; fi;
    initialDelaySeconds: 30
    periodSeconds: 5

    第一个子句获取容器IP地址:
        ipaddress=$(ip addr | grep 'state UP' -A2 | tail -n1 | awk '{print $2}' | cut -f1 -d'/');

    第二个子句发出请求,并计算响应json中“RUNNING”的实例:
        reply=$(curl -s $ipaddress:8083/connectors/inventory-connector/status | grep -o RUNNING | wc -l);

    如果“RUNNING”出现少于两次,则第三子句返回退出代码1
        if [ $reply -lt 2 ]; then exit 1; fi

    它似乎正在初步测试中工作-即重新启动postgres DB会触发重新启动debezium容器。我猜可以在图像中包含类似这样的脚本(尽管可能是“经过增强的”),以利于探查。

    关于postgresql - 重新创建数据库容器时,Debezium源任务无法重新连接到PostgreSQL数据库,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51377720/

    32 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com