apache-kafka - How do I modify a Kafka connector?

Reposted. Author: 行者123. Updated: 2023-12-05 01:38:27

I created a Debezium connector on Docker using curl from the terminal, but I'm stuck trying to modify the existing connector.

My docker file:

---
version: '3'
services:

  kafka-connect-02:
    image: confluentinc/cp-kafka-connect:latest
    container_name: kafka-connect-02
    ports:
      - 8083:8083
    environment:
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CUB_KAFKA_TIMEOUT: 300
      CONNECT_BOOTSTRAP_SERVERS: "https://***9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect-02'
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect-group-01-v04
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-group-01-v04-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-group-01-v04-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-group-01-v04-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "https://***9092"
      CONNECT_KEY_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "***:***"
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "https://***9092"
      CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "***:***"
      CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
      CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/'
      # Confluent Cloud config
      CONNECT_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_RETRY_BACKOFF_MS: "500"
      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_SASL_MECHANISM: "PLAIN"
      CONNECT_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"***\" password=\"**\";"
      #
      CONNECT_CONSUMER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_CONSUMER_SASL_MECHANISM: "PLAIN"
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"***\" password=\"**\";"
      CONNECT_CONSUMER_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_CONSUMER_RETRY_BACKOFF_MS: "500"
      #
      CONNECT_PRODUCER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_PRODUCER_SASL_MECHANISM: "PLAIN"
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"***\" password=\"**\";"
      CONNECT_PRODUCER_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_PRODUCER_RETRY_BACKOFF_MS: "500"
      # External secrets config
      # See https://docs.confluent.io/current/connect/security.html#externalizing-secrets
      CONNECT_CONFIG_PROVIDERS: 'file'
      CONNECT_CONFIG_PROVIDERS_FILE_CLASS: 'org.apache.kafka.common.config.provider.FileConfigProvider'
    command:
      - bash
      - -c
      - |
        echo "Installing connector plugins"
        confluent-hub install --no-prompt debezium/debezium-connector-sqlserver:0.10.0
        confluent-hub install --no-prompt snowflakeinc/snowflake-kafka-connector:0.5.5
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &

        #
        sleep infinity

My Debezium connector:

curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/Procura_CDC/config \
-d '{
"connector.class":"io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max":"1",
"database.server.name":"***",
"database.hostname":"***",
"database.port":"***",
"database.user":"Kafka",
"database.password":"***",
"database.dbname":"Procura_Prod",
"database.history.kafka.bootstrap.servers":"*****",
"database.history.kafka.topic":"dbhistory.procura",
"table.whitelist":"dbo.CLIENTS,dbo.VISITS",
"poll.interval.ms":"2000",
"snapshot.fetch.size":"2000",
"snapshot.mode":"initial",
"snapshot.isolation.mode":"snapshot",
"transforms":"unwrap,dropPrefix",
"transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones":"false",
"transforms.unwrap.delete.handling.mode":"rewrite",
"transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex":"procura.dbo.(.*)",
"transforms.dropPrefix.replacement":"$1"
}'

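I get errors when I try to modify the Debezium connector that was created with the command above. Neither the PUT nor the POST method works. I receive either "curl: (7) failed to connect to localhost port 8083: Connection refused", "curl: (56) Recv failure: connection reset by peer.", or "error_code":500,"message":"Request timed out". I'm not sure how to modify the connector. I'm new to Docker, so any help is appreciated. Thanks.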

Best answer

You can use PUT both to create a connector and to update its configuration. Here is an example:

curl -i -X PUT -H  "Content-Type:application/json" \
http://localhost:8083/connectors/source-file-01/config \
-d '{
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"file": "/tmp/totail.txt",
"topic": "foo",
"tasks.max": 6
}'

This creates (or modifies) the connector named source-file-01. To change its configuration, re-issue the PUT with the necessary values changed.
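As a minimal sketch of that update cycle, the helper functions below wrap the same endpoint used in the example. The function names and the `CONNECT_URL` variable are illustrative assumptions, not part of the original answer. `GET /connectors/<name>/config` and `GET /connectors/<name>/status` are Kafka Connect REST endpoints that can be used to check the result.

```shell
#!/bin/sh
# Base URL of the Kafka Connect REST API (assumed; matches the 8083 port mapping).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Build the URL of a connector sub-resource, e.g. "config" or "status".
# $1 = connector name, $2 = sub-resource
connector_url() {
  printf '%s/connectors/%s/%s' "$CONNECT_URL" "$1" "$2"
}

# Create the connector if it is missing, otherwise update its configuration.
# $1 = connector name, $2 = JSON document holding the connector configuration
put_connector_config() {
  curl -s -X PUT -H "Content-Type:application/json" \
    "$(connector_url "$1" config)" -d "$2"
}

# Read back the stored configuration and the runtime state of the connector.
get_connector_config() { curl -s "$(connector_url "$1" config)"; }
get_connector_status() { curl -s "$(connector_url "$1" status)"; }
```

For instance, calling put_connector_config source-file-01 with the JSON from the example, but with "tasks.max" set to a different value, updates the connector, and get_connector_status source-file-01 then shows whether the connector and its tasks are running. The PUT body is treated as the connector's configuration as a whole, so it is safest to resend every key rather than only the one that changed.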

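This ability to re-run the same command is why I always prefer PUT over POST when creating connectors: you don't need to change how you run it depending on whether the connector already exists.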
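Because the PUT is safe to repeat, it also fits in a startup script that first waits for the worker to come up. The following is a sketch under assumptions: a "Connection refused" error like the one in the question is consistent with the REST API not listening yet (the container installs plugins before it launches the worker), but the source does not confirm that this is the cause. The function names and the retry defaults are hypothetical.

```shell
#!/bin/sh
# Base URL of the Kafka Connect REST API (assumed; matches the 8083 port mapping).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Print the HTTP status code of GET /connectors.
# curl reports "000" when no HTTP response was received at all.
connect_http_code() {
  curl -s -o /dev/null -w '%{http_code}' "$CONNECT_URL/connectors"
}

# Poll until the REST API answers 200.
# $1 = maximum number of attempts, $2 = seconds between attempts
# (the defaults are arbitrary). Returns 0 when ready, 1 on giving up.
wait_for_connect() {
  attempts="${1:-60}"
  delay="${2:-5}"
  i=0
  while [ "$i" -lt "$attempts" ]; do
    if [ "$(connect_http_code)" = "200" ]; then
      return 0
    fi
    i=$((i + 1))
    sleep "$delay"
  done
  return 1
}
```

A script could then run wait_for_connect and issue the PUT only when it returns successfully. Since the same PUT works whether or not the connector exists, the script can be re-run after every container restart without modification.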

Regarding "apache-kafka - How do I modify a Kafka connector?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/59834538/
