
mongodb - Implementing a dockerized Kafka sink connector to MongoDB


I am trying to implement Kafka Connect with MongoDB and MySQL using Docker.

What I want is the setup in the following diagram:

[Diagram: Kafka Connect as sink to MongoDB and MySQL]

Kafka Connect to MongoDB:

I have looked at the docker-compose of the official MongoDB repository. It has two problems:

  1. It is too complicated for my purpose: it runs multiple MongoDB containers and pulls many images, which consumes a lot of resources.

  2. It has some unresolved issues that ultimately make the connection between Kafka and MongoDB fail. Here you can see my problem.

My implementation of the connection using debezium in docker-compose.yml is as follows:

version: '3.2'
services:
  kafka:
    image: wurstmeister/kafka:latest
    ports:
      - target: 9094
        published: 9094
        protocol: tcp
        mode: host
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092
      KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_LOG_DIRS: /kafka/logs
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - kafka:/kafka

  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    volumes:
      - zookeeper:/opt/zookeeper-3.4.13

  mongo:
    image: mongo
    container_name: mongo
    ports:
      - 27017:27017

  connect:
    image: debezium/connect
    container_name: connect
    ports:
      - 8083:8083
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets

volumes:
  kafka:
  zookeeper:

As @cricket_007 said, I should not use debezium for my purpose, so I switched to the confluentinc/kafka-connect-datagen image. Instead of debezium, I added the following to the docker-compose.yml file:

connect:
  image: confluentinc/kafka-connect-datagen
  build:
    context: .
    dockerfile: Dockerfile
  hostname: connect
  container_name: connect
  depends_on:
    - zookeeper
  ports:
    - 8083:8083
  environment:
    CONNECT_BOOTSTRAP_SERVERS: 'kafka:9092'
    CONNECT_REST_ADVERTISED_HOST_NAME: connect
    CONNECT_REST_PORT: 8083
    CONNECT_GROUP_ID: compose-connect-group
    CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
    CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
    CONNECT_PLUGIN_PATH: /usr/share/confluent-hub-components
    CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
    # Assumes image is based on confluentinc/kafka-connect-datagen:latest which is pulling 5.2.2 Connect image
    CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.2.2.jar
    CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
    CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
  command: "bash -c 'if [ ! -d /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen ]; then echo \"WARNING: Did not find directory for kafka-connect-datagen (did you remember to run: docker-compose up -d --build ?)\"; fi ; /etc/confluent/docker/run'"
  volumes:
    - ../build/confluent/kafka-connect-mongodb:/usr/share/confluent-hub-components/kafka-connect-mongodb

The Dockerfile:

FROM confluentinc/cp-kafka-connect
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-datagen

Questions:

  1. The kafka-connect-datagen image generates fake data and, as mentioned in the repository, it is not suitable for production. What I want is just to connect Kafka to MongoDB, nothing more. Explicitly: how can I use curl to send data through Kafka and save it in a MongoDB collection?

  2. I ran into the error CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL is required. As @cricket_007 said, schema-registry is optional. So how can I get rid of that image?

  3. In the last step I tried to run the repository's docker-compose file as described in its README.md, but unfortunately I ran into another error:

    WARNING: Could not reach configured kafka system on http://localhost:8083 Note: This script requires curl.

  4. Whenever I haven't made any changes to the configuration, I get yet another error:

Kafka Connectors: 

{"error_code":409,"message":"Cannot complete request momentarily due to stale configuration (typically caused by a concurrent config change)"}

Please help me find answers to my questions.

My output:

Building the MongoDB Kafka Connector

> Task :shadowJar
FatJar: /home/mostafa/Documents/Docker/kafka-mongo/build/libs/kafka-mongo-0.3-SNAPSHOT-all.jar (2.108904 MB)

Deprecated Gradle features were used in this build, making it incompatible with Gradle 6.0.
Use '--warning-mode all' to show the individual deprecation warnings.
See https://docs.gradle.org/5.2/userguide/command_line_interface.html#sec:command_line_warnings

BUILD SUCCESSFUL in 4h 26m 25s
7 actionable tasks: 7 executed
Unzipping the confluent archive plugin....

Archive: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT.zip
creating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/
creating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/etc/
inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/etc/MongoSinkConnector.properties
inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/etc/MongoSourceConnector.properties
creating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/lib/
inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/lib/kafka-mongo-0.3-SNAPSHOT-all.jar
inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/manifest.json
creating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/assets/
inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/assets/mongodb-leaf.png
inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/assets/mongodb-logo.png
creating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/doc/
inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/doc/README.md
inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/doc/LICENSE.txt
Starting docker .
Creating volume "docker_rs2" with default driver
Creating volume "docker_rs3" with default driver
Building connect
Step 1/3 : FROM confluentinc/cp-kafka-connect:5.2.2
---> 32bb41f78617
Step 2/3 : ENV CONNECT_PLUGIN_PATH="/usr/share/confluent-hub-components"
---> Using cache
---> 9e4fd4f10a38
Step 3/3 : RUN confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest
---> Using cache
---> 5f879008bb73

Successfully built 5f879008bb73
Successfully tagged confluentinc/kafka-connect-datagen:latest
Recreating mongo1 ...
Recreating mongo1 ... done
Creating mongo3 ... done
Starting broker ... done
Creating mongo2 ... done
Starting schema-registry ... done
Starting connect ... done
Creating rest-proxy ... done
Creating ksql-server ... done
Creating docker_kafka-topics-ui_1 ... done
Creating control-center ... done
Creating ksql-cli ... done


Waiting for the systems to be ready.............
WARNING: Could not reach configured kafka system on http://localhost:8082
Note: This script requires curl.



SHUTTING DOWN


% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 68 100 68 0 0 23 0 0:00:02 0:00:02 --:--:-- 23
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 61 100 61 0 0 4066 0 --:--:-- --:--:-- --:--:-- 4066
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 63 100 63 0 0 9000 0 --:--:-- --:--:-- --:--:-- 9000
MongoDB shell version v4.0.12
connecting to: mongodb://127.0.0.1:27017/?gssapiServiceName=mongodb
Implicit session: session { "id" : UUID("80ebb904-f81a-4230-b63b-4e62f65fbeb7") }
MongoDB server version: 4.0.12
{
    "ok" : 1,
    "operationTime" : Timestamp(1567235833, 1),
    "$clusterTime" : {
        "clusterTime" : Timestamp(1567235833, 1),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    }
}
Stopping ksql-cli ... done
Stopping control-center ... done
Stopping docker_kafka-topics-ui_1 ... done
Stopping ksql-server ... done
Stopping rest-proxy ... done
Stopping mongo1 ... done
Stopping mongo2 ... done
Stopping mongo3 ... done
Stopping connect ... done
Stopping broker ... done
Stopping zookeeper ... done
Removing ksql-cli ...
Removing control-center ... done
Removing docker_kafka-topics-ui_1 ... done
Removing ksql-server ... done
Removing rest-proxy ... done
Removing mongo1 ... done
Removing mongo2 ... done
Removing mongo3 ... done
Removing connect ... done
Removing schema-registry ... done
Removing broker ... done
Removing zookeeper ... done
Removing network docker_default
Removing network docker_localnet

WARNING: Could not reach configured kafka system on http://localhost:8082
Note: This script requires curl.

Best Answer

I created the following docker-compose file (see all the files on GitHub):

version: '3.6'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.1.2
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - localnet
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka:5.1.2
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
    networks:
      - localnet
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  connect:
    image: confluentinc/cp-kafka-connect:5.1.2
    build:
      context: .
      dockerfile: Dockerfile
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8083:8083"
    networks:
      - localnet
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR,com.mongodb.kafka=DEBUG"
      CONNECT_PLUGIN_PATH: /usr/share/confluent-hub-components
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      # Assumes image is based on confluentinc/kafka-connect-datagen:latest which is pulling 5.2.2 Connect image
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.2.2.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
    command: "bash -c 'if [ ! -d /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen ]; then echo \"WARNING: Did not find directory for kafka-connect-datagen (did you remember to run: docker-compose up -d --build ?)\"; fi ; /etc/confluent/docker/run'"
    volumes:
      - ./kafka-connect-mongodb:/usr/share/confluent-hub-components/kafka-connect-mongodb

  # MongoDB Replica Set
  mongo1:
    image: "mongo:4.0-xenial"
    container_name: mongo1
    command: --replSet rs0 --smallfiles --oplogSize 128
    volumes:
      - rs1:/data/db
    networks:
      - localnet
    ports:
      - "27017:27017"
    restart: always

  mongo2:
    image: "mongo:4.0-xenial"
    container_name: mongo2
    command: --replSet rs0 --smallfiles --oplogSize 128
    volumes:
      - rs2:/data/db
    networks:
      - localnet
    ports:
      - "27018:27017"
    restart: always

  mongo3:
    image: "mongo:4.0-xenial"
    container_name: mongo3
    command: --replSet rs0 --smallfiles --oplogSize 128
    volumes:
      - rs3:/data/db
    networks:
      - localnet
    ports:
      - "27019:27017"
    restart: always

networks:
  localnet:
    attachable: true

volumes:
  rs1:
  rs2:
  rs3:

After running docker-compose up, you must configure your MongoDB replica set:

docker-compose exec mongo1 /usr/bin/mongo --eval '''if (rs.status()["ok"] == 0) {
    rsconf = {
        _id : "rs0",
        members: [
            { _id : 0, host : "mongo1:27017", priority: 1.0 },
            { _id : 1, host : "mongo2:27017", priority: 0.5 },
            { _id : 2, host : "mongo3:27017", priority: 0.5 }
        ]
    };
    rs.initiate(rsconf);
}
rs.conf();'''
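
To confirm the replica set actually came up before wiring Kafka Connect to it, an optional sanity check is to query its status (rs.status() is a standard replica-set command):

docker-compose exec mongo1 /usr/bin/mongo --eval 'rs.status()'

All three members should be listed, one as PRIMARY and two as SECONDARY.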

Make sure your plugins are installed:

curl localhost:8083/connector-plugins | jq

[
  {
    "class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "type": "sink",
    "version": "0.2"
  },
  {
    "class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "type": "source",
    "version": "0.2"
  },
  {
    "class": "io.confluent.connect.gcs.GcsSinkConnector",
    "type": "sink",
    "version": "5.0.1"
  },
  {
    "class": "io.confluent.connect.storage.tools.SchemaSourceConnector",
    "type": "source",
    "version": "2.1.1-cp1"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "2.1.1-cp1"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "2.1.1-cp1"
  }
]

As you can see above, the MongoDB connector plugins are available for use. Assuming you have a database named mydb and a collection named products, I created a JSON file named sink-connector.json:

{
  "name": "mongo-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max": "1",
    "topics": "product.events",
    "connection.uri": "mongodb://mongo1:27017,mongo2:27017,mongo3:27017",
    "database": "mydb",
    "collection": "products",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}

Now create the connector using the Connect REST API:

curl -X POST -H "Content-Type: application/json" -d @sink-connector.json http://localhost:8083/connectors | jq
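
A note on the 409 error from question 4: Connect returns "Cannot complete request momentarily due to stale configuration" while the worker group is still rebalancing (for example right after startup or after a config change); retrying the same request a few seconds later usually succeeds. To re-create or update the connector, the standard Connect REST endpoints can be used, sketched below (sink-connector-config.json is a hypothetical file containing only the inner "config" object):

# Remove the connector before re-creating it
curl -X DELETE http://localhost:8083/connectors/mongo-sink

# Or update the configuration in place; PUT takes just the config map, not the name/config wrapper
curl -X PUT -H "Content-Type: application/json" -d @sink-connector-config.json http://localhost:8083/connectors/mongo-sink/config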

You can check the status of the connector:

curl http://localhost:8083/connectors/mongo-sink/status | jq

{
  "name": "mongo-sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "connect:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "connect:8083"
    }
  ],
  "type": "sink"
}

Now let's create a Kafka topic. First, we need to attach to the Kafka container:

docker-compose exec broker bash

Then create the topic:

kafka-topics --zookeeper zookeeper:2181 --create --topic product.events --partitions 1 --replication-factor 1
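
Equivalently, you can skip the interactive shell and run the same command from the host in one shot:

docker-compose exec broker kafka-topics --zookeeper zookeeper:2181 --create --topic product.events --partitions 1 --replication-factor 1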

Now produce products into the topic:

kafka-console-producer --broker-list localhost:9092 --topic product.events
>{"Name": "Hat", "Price": 25}
>{"Name": "Shoe", "Price": 15}

You can see the result in the attached screenshot.
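
If you prefer checking from the shell, a minimal verification that the sink wrote the records into mydb.products:

docker-compose exec mongo1 /usr/bin/mongo mydb --eval 'db.products.find().pretty()'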

Hope this helps.

Regarding "mongodb - Implementing a dockerized Kafka sink connector to MongoDB", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/57544201/
