- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
目前我们在构建一个多租户多产品类网站,为了让用户更好的找到他们所需要的产品,我们需要构建站内搜索功能,并且它应该是实时更新的。本文将会讨论构建这一功能的核心基础设施,以及支持此搜索能力的技术栈.
为了构建一个快速、实时的搜索引擎,我们必须做出某些设计决策。我们使用 MySQL 作为主数据库存储,因此有以下选择:
%#{word1}%#{word2}%...
这样。 😐 考虑到我们是一个 多租户应用程序 ,同时被搜索的实体可能需要大量的 关联操作 (如果我们使用的是 MySQL 一类的关系型数据库),因为不同类型的产品有不同的数据结构,所以我们还可以能需要同时遍历 多个数据表 来查询用户输入的关键词。所以我们决定不使用直接在 MySQL 中查询关键词的方案。🤯 。
因此,我们必须决定一种高效、可靠的方式,将数据 实时 地从 MySQL 迁移到 Elasticsearch 中。接下来需要做出如下的决定:
选项 1 并不是实时的,所以可以直接排除,而且即使我们缩短轮询间隔,也会造成全表扫描给数据库造成查询压力。除了不是实时的之外,选项 1 无法支持对数据的删除操作,如果对数据进行了删除,那么我们需要额外的表记录之前存在过的数据,这样才能保证用户不会搜索到已经删除了的脏数据。对于其他两种选择,不同的应用场景做出的决定可能会有所不同。在我们的场景中,如果选择选项 2,那么我们可以预见一些问题:如过 Elasticsearch 建立网络连接并确认更新时速度很慢,那么这可能会降低我们应用程序的速度;或者在写入 Elasticsearch 时发生了未知异常,我们该如何对这一操作进行重试来保证数据完整性;不可否认开发团队中不是所有开发人员都能了解所有的功能,如果有开发人员在开发新的与产品有关的业务逻辑时没有引入 Elasticsearch 客户端,那么我们将在 Elasticsearch 中更新这次数据的更改,无法保证 MySQL 与 Elasticsearch 间的数据一致性.
接下来我们该考虑如何将 MySQL 数据库中的数据更改作为事件,发送到流处理服务器上。我们可以在数据库变更后,在应用程序中使用消息管道的客户端同步地将事件发送到消息管道,但是这并没有解决上面提到的使用 Elasticsearch 客户端带来的问题,只不过是将风险从 Elasticsearch 转移到了消息管道。最终我们决定通过采集 MySQL Binlog,将 MySQL Binlog 作为事件发送到消息管道中的方式来实现 基于事件的流引擎 。关于 binlog 的内容可以 点击链接 ,在这里不再赘述.
为了对外提供统一的搜索接口,我们首先需要定义用于搜索的数据结构。对于大部分的搜索系统而言,对用户展示的搜索结果通常包括为 标题 和 内容 ,这部分内容我们称之 可搜索内容(Searchable Content) 。在多租户系统中我们还需要在搜索结果中标示出该搜索结果属于哪个租户,或用来过滤当前租户下可搜索的内容,我们还需要额外的信息来帮助用户筛选自己想要搜索的产品类别,我们将这部分通用的但不用来进行搜索的内容称为 元数据(Metadata) 。最后,在我们展示搜索结果时可能希望根据不同类型的产品提供不同的展示效果,我们需要在搜索结果中返回这些个性化展示所需要的 原始内容(Raw Content) 。到此为止我们可以定义出了存储到 Elasticsearch 中的通用数据结构:
{
"searchable": {
"title": "string",
"content": "string"
},
"metadata": {
"tenant_id": "long",
"type": "long",
"created_at": "date",
"created_by": "string",
"updated_at": "date",
"updated_by": "string"
},
"raw": {}
}
Apache Kafka: Apache Kafka 是开源的分布式事件流平台。我们使用 Apache kafka 作为数据库事件(插入、修改和删除)的持久化存储.
mysql-binlog-connector-java: 我们使用 mysql-binlog-connector-java 从 MySQL Binlog 中获取数据库事件,并将它发送到 Apache Kafka 中。我们将单独启动一个服务来完成这个过程.
在接收端我们也将单独启动一个服务来消费 Kafka 中的事件,并对数据进行处理然后发送到 Elasticsearch 中.
Q:为什么不使用Elasticsearch connector之类的连接器对数据进行处理并发送到Elasticsearch中?
A:在我们的系统中是不允许将大文本存入到MySQL中的,所以我们使用了额外的对象存储服务来存放我们的产品文档,所以我们无法直接使用连接器将数据发送到Elasticsearch中。
Q:为什么不在发送到Kafka前就将数据进行处理?
A:这样会有大量的数据被持久化到Kafka中,占用Kafka的磁盘空间,而这部分数据实际上也被存储到了Elasticsearch。
Q:为什么要用单独的服务来采集binlog,而不是使用Filebeat之类的agent?
A:当然可以直接在MySQL数据库中安装agent来直接采集binlog并发送到Kafka中。但是在部分情况下开发者使用的是云服务商或其他基础设施部门提供的MySQL服务器,这种情况下我们无法直接进入服务器安装agent,所以使用更加通用的、无侵入性的C/S结构来消费MySQL的binlog。
我们使用 docker 和 docker-compose 来配置和部署服务。为了简单起见,MySQL 直接使用了 root 作为用户名和密码,Kafka 和 Elasticsearch 使用的是单节点集群,且没有设置任何鉴权方式,仅供开发环境使用,请勿直接用于生产环境.
version: "3"
services:
mysql:
image: mysql:5.7
container_name: mysql
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: app
ports:
- 3306:3306
volumes:
- mysql:/var/lib/mysql
zookeeper:
image: bitnami/zookeeper:3.6.2
container_name: zookeeper
ports:
- 2181:2181
volumes:
- zookeeper:/bitnami
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: bitnami/kafka:2.7.0
container_name: kafka
ports:
- 9092:9092
volumes:
- kafka:/bitnami
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.11.0
container_name: elasticsearch
environment:
- discovery.type=single-node
volumes:
- elasticsearch:/usr/share/elasticsearch/data
ports:
- 9200:9200
volumes:
mysql:
driver: local
zookeeper:
driver: local
kafka:
driver: local
elasticsearch:
driver: local
在服务启动成功后我们需要为 Elasticsearch 创建索引,在这里我们直接使用 curl 调用 Elasticsearch 的 RESTful API,也可以使用 busybox 基础镜像创建服务来完成这个步骤.
# Elasticsearch
curl "http://localhost:9200/search" -XPUT -d '
{
"mappings": {
"properties": {
"searchable": {
"type": "nested",
"properties": {
"title": {
"type": "text"
},
"content": {
"type": "text"
}
}
},
"metadata": {
"type": "nested",
"properties": {
"tenant_id": {
"type": "long"
},
"type": {
"type": "integer"
},
"created_at": {
"type": "date"
},
"created_by": {
"type": "keyword"
},
"updated_at": {
"type": "date"
},
"updated_by": {
"type": "keyword"
}
}
},
"raw": {
"type": "nested"
}
}
}
}'
Binlog 采集端:
override fun run() {
client.serverId = properties.serverId
val eventDeserializer = EventDeserializer()
eventDeserializer.setCompatibilityMode(
EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG
)
client.setEventDeserializer(eventDeserializer)
client.registerEventListener {
val header = it.getHeader<EventHeader>()
val data = it.getData<EventData>()
if (header.eventType == EventType.TABLE_MAP) {
tableRepository.updateTable(Table.of(data as TableMapEventData))
} else if (EventType.isRowMutation(header.eventType)) {
val events = when {
EventType.isWrite(header.eventType) -> mapper.map(data as WriteRowsEventData)
EventType.isUpdate(header.eventType) -> mapper.map(data as UpdateRowsEventData)
EventType.isDelete(header.eventType) -> mapper.map(data as DeleteRowsEventData)
else -> emptyList()
}
logger.info("Mutation events: {}", events)
for (event in events) {
kafkaTemplate.send("binlog", objectMapper.writeValueAsString(event))
}
}
}
client.connect()
}
在这段代码里面,我们首先是对 binlog 客户端进行了初始化,随后开始监听 binlog 事件。binlog 事件类型有很多,大部分都是我们不需要关心的事件,我们只需要关注 TABLE_MAP 和 WRITE/UPDATE/DELETE 就可以。当我们接收到 TABLE_MAP 事件,我们会对内存中的数据库表结构进行更新,在后续的 WRITE/UPDATE/DELETE 事件中,我们会使用内存缓存的数据库结构进行映射。整个过程大概如下所示:
Table: ["id", "title", "content",...]
Row: [1, "Foo", "Bar",...]
=>
{
"id": 1,
"title": "Foo",
"content": "Bar"
}
随后我们将收集到的事件发送到 Kafka 中,并由 Event Processor 进行消费处理.
事件处理器 。
@Component
class KafkaBinlogTopicListener(
val binlogEventHandler: BinlogEventHandler
) {
companion object {
private val logger = LoggerFactory.getLogger(KafkaBinlogTopicListener::class.java)
}
private val objectMapper = jacksonObjectMapper()
@KafkaListener(topics = ["binlog"])
fun process(message: String) {
val binlogEvent = objectMapper.readValue<BinlogEvent>(message)
logger.info("Consume binlog event: {}", binlogEvent)
binlogEventHandler.handle(binlogEvent)
}
}
首先使用 SpringBoot Message Kafka 提供的注解对事件进行消费,接下来将事件委托到 binlogEventHandler 去进行处理。实际上 BinlogEventHandler 是个自定义的函数式接口,我们自定义事件处理器实现该接口后通过 Spring Bean 的方式注入到 KafkaBinlogTopicListener 中.
@Component
class ElasticsearchIndexerBinlogEventHandler(
val restHighLevelClient: RestHighLevelClient
) : BinlogEventHandler {
override fun handle(binlogEvent: BinlogEvent) {
val payload = binlogEvent.payload as Map<*, *>
val documentId = "${binlogEvent.database}_${binlogEvent.table}_${payload["id"]}"
// Should delete from Elasticsearch
if (binlogEvent.eventType == EVENT_TYPE_DELETE) {
val deleteRequest = DeleteRequest()
deleteRequest
.index("search")
.id(documentId)
restHighLevelClient.delete(deleteRequest, DEFAULT)
} else {
// Not ever WRITE or UPDATE, just reindex
val indexRequest = IndexRequest()
indexRequest
.index("search")
.id(documentId)
.source(
mapOf<String, Any>(
"searchable" to mapOf(
"title" to payload["title"],
"content" to payload["content"]
),
"metadata" to mapOf(
"tenantId" to payload["tenantId"],
"type" to payload["type"],
"createdAt" to payload["createdAt"],
"createdBy" to payload["createdBy"],
"updatedAt" to payload["updatedAt"],
"updatedBy" to payload["updatedBy"]
)
)
)
restHighLevelClient.index(indexRequest, DEFAULT)
}
}
}
在这里我们只需要简单地判断是否为删除操作就可以,如果是删除操作需要在 Elasticsearch 中将数据删除,而如果是非删除操作只需要在 Elasticsearch 重新按照为文档建立索引即可。这段代码简单地使用了 Kotlin 中提供的 mapOf 方法对数据进行映射,如果需要其他复杂的处理只需要按照 Java 代码的方式编写处理器即可.
其实 Binlog 的处理部分有很多开源的处理引擎,包括 Alibaba Canal ,本文使用手动处理的方式也是为其他使用非 MySQL 数据源的同学类似的解决方案。大家可以按需所取,因地制宜,为自己的网站设计属于自己的实时站内搜索引擎! 。
最后此篇关于基于Kafka和Elasticsearch构建实时站内搜索功能的实践的文章就讲到这里了,如果你想了解更多关于基于Kafka和Elasticsearch构建实时站内搜索功能的实践的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我在使用 gradle 构建一个特定应用程序时遇到问题。该应用程序可以用 eclipse 编译和构建,它在平板电脑上运行良好。当我尝试使用 Gradle 构建它时,“compileDebugJava”
我有一个 C 程序,是一位离开的开发人员留给我的。我试图弄清楚他到底在做什么,并将软件重新安排成更合乎逻辑的东西,这样我就可以更轻松地构建它。我正在使用 CMake 构建,而他使用的是 Make。 有
我刚开始阅读“Pro Spring MVC with web flow”,它附带了一个我想遵循的代码示例。 我要什么 - 我想像书中那样构建应用程序,使用 Gradle 有什么问题 - 我没用过 Gr
我希望有人已经这样做了。我正在尝试为我的一个 angular 2 项目在 teamcity 中建立一个连续的构建。在做了一些研究之后,我按照以下步骤操作: 构建步骤 1:为 teamcity 安装 j
我有一个旧的 ASP.Net 网站解决方案,看起来像: 当我在 Visual Studio 中构建解决方案时,我得到以下输出: ------ Build started: Project: C:\..
我使用 gulp-usref、gulp-if、gulp-uglify、gulp-csso 和 gulp-file-include 来构建我的应用程序。除了 HTML 保持原样外,构建中的一切都运行良好
我正在使用 ionic2 开发内部移动应用程序。我可以通过以下方式成功构建 ios: ionic build ios and ionic build ios --prod 但当我这样做时,它一直失败
我是一位经验丰富的 .NET/C# 开发人员,但对这里的几乎所有技术/库(包括 SQL/DB 工作)都是新手。 我正在开发一个具有 Azure/Entity Framework .NET 后端和可移植
我正在使用 VS 2008。我可以使用 IDE 成功编译我的解决方案。但是,当我尝试使用 devenv.com 构建它时,它失败并提示“错误:找不到项目输出组'(无法确定名称)的输出”。该组、其配置或
版本: ember.js 2.7,ember-data 2.7 ember-cli 2.9.1//同样适用于 ember-cli 2.7 node 6.9.1, npm 3.10.9//也适用于 no
我第一次修补 AzureDevops,设置一些 CI 任务。 我有一个公共(public)存储库(开源)和一个包含 3 个 F# 项目的解决方案(.sln)。该解决方案在 Windows/Mac/Li
目前 5.1.5 版本或 STLPort CVS 存储库似乎仍不支持 VS2008。如果有人已经完成了这项工作,那么如果可能的话,分享会很有用:) 同样,了解 VS2005 或 2008 x64 构建
我有一个 Python 2.7 项目,到目前为止一直使用 gfortran 和 MinGW 来构建扩展。我使用 MinGW,因为它似乎支持 Fortran 代码中的写入语句和可分配数组,而 MSVC
关闭。这个问题是off-topic .它目前不接受答案。 想改进这个问题? Update the question所以它是on-topic对于堆栈溢出。 9年前关闭。 Improve this que
我想知道为什么在 Zimbra Wiki 中只列出了构建过程的特定平台。这意味着不可能在其他 Linux 发行版上构建 Zimbra? Zimbra 社区选择一个特殊的 Linux 发行版来构建 Zi
我将在 Swift 中构建一个 CLI 工具。我用这个命令创建了项目 swift package init --type executable当我构建我的项目并解析 时读取别名 Xcode 中的参数并
我想为添加到 docker 镜像的文件设置文件权限。我有这个简单的 Dockerfile: FROM ubuntu:utopic WORKDIR /app RUN groupadd -g 1000 b
当我使用 clBuildProgram在我的 OpenCl 代码中,它失败并显示错误代码 -11,没有任何日志信息。 这是我的代码的样子: ret = clBuildProgram(program
我有一个底部导航栏,它有一个列表页面,该页面使用状态块。 class _MainPageState extends State { int _index = 0; @override Wi
我在本地计算机上使用Jenkins(Jenkins URL未通过Internet公开,但该计算机上已启用Internet。) 我进行了以下配置更改: 在Jenkins工具上安装了Git和Github插
我是一名优秀的程序员,十分优秀!