- VisualStudio2022插件的安装及使用-编程手把手系列文章
- pprof-在现网场景怎么用
- C#实现的下拉多选框,下拉多选树,多级节点
- 【学习笔记】基础数据结构:猫树
Apache Kafka是一个分布式流处理平台,由LinkedIn开发并开源,后来成为Apache软件基金会的顶级项目。Kafka主要用于构建实时数据管道和流式应用程序.
从下面3张架构图中可以看出Kafka Server 实际扮演的是Broker的角色, 一个Kafka Cluster由多个Broker组成, 或者可以说是多个Topic组成.
图 1 。
图 2 。
图 3 。
一个Kafka集群是一个由多个Kafka代理组成的分布式系统,它们协同工作以处理实时流数据的存储和处理。它为大规模应用程序中高效的数据流和消息传递提供了容错性、可扩展性和高可用性.
Broker是构成Kafka集群的服务器。 每个Broker负责接收、存储和提供数据。 它们处理来自生产者和消费者的读写操作。 Broker还管理数据的复制以确保容错性.
Kafka中的数据被组织成主题(Topics),这些是生产者发送数据和消费者读取数据的逻辑通道。每个主题被划分为分区(partitions),它们是Kafka中并行处理的基本单位。分区允许Kafka通过在多个Broker 之间分布数据来水平扩展.
生产者是发布(写入)数据到Kafka主题的客户端应用程序。它们根据分区策略将记录发送到适当的主题和分区,分区策略可以是基于键(key-based)或轮询(round-robin).
消费者是订阅Kafka主题并处理数据的客户端应用程序。它们从主题中读取记录,并且可以是消费者组的一部分,这允许负载均衡和容错。每个组中的消费者从一组独特的分区中读取数据.
ZooKeeper是一个集中式服务,用于维护配置信息、命名、提供分布式同步和提供群组服务。在Kafka中,ZooKeeper用于管理和协调Kafka Broker。ZooKeeper被展示为与Kafka集群交互的独立组件.
偏移量(offsets)是分配给分区中每条消息的唯一标识符。消费者将使用这些偏移量来跟踪他们在消费主题中消息的进度.
本地docker环境启动一个kafka 。
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.4.4
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
kafka:
image: confluentinc/cp-kafka:7.4.4
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
使用.NET CORE + Kafka开发一个消息生产者, 一个消息消费者, 客户端需要安装组件** Confluent.Kafka** 。
public class ProducerService
{
private readonly IConfiguration _configuration;
private readonly IProducer<Null, string> _producer;
private readonly ILogger<ProducerService> _logger;
public ProducerService(IConfiguration configuration, ILogger<ProducerService> logger)
{
_configuration = configuration;
_logger = logger;
var config = new ProducerConfig
{
BootstrapServers = _configuration["Kafka:BootstrapServers"],
};
_producer = new ProducerBuilder<Null, string>(config).Build();
}
public async Task ProductAsync(string topic, string message)
{
var orderPlacedMessage = new Message<Null, string>
{
Value = message
};
await _producer.ProduceAsync(topic, orderPlacedMessage);
_logger.LogInformation("Message sent to topic: {Topic}", topic);
}
}
[Route("api/[controller]")]
[ApiController]
public class InventoryController : ControllerBase
{
private readonly ProducerService _producerService;
public InventoryController(ProducerService producerService)
{
_producerService = producerService;
}
[HttpPost]
public async Task<IActionResult> Post([FromBody] InventoryUpdateRequest request)
{
var message = System.Text.Json.JsonSerializer.Serialize(request);
await _producerService.ProductAsync("inventory-update", message);
return Ok("Inventory Updated Successfully...");
}
}
启动项目,查看Swagger 。
消息消费者程序使用.net core BackgroundService开发, 这个类需要在程序启动时注入进去,不要忘记.
public class ConsumerService : BackgroundService
{
private readonly ILogger<ConsumerService> _logger;
private readonly IConfiguration _configuration;
private readonly IConsumer<Ignore, string> _consumer;
public ConsumerService(ILogger<ConsumerService> logger, IConfiguration configuration)
{
_logger = logger;
_configuration = configuration;
var consumerConfig = new ConsumerConfig
{
BootstrapServers = configuration["Kafka:BootstrapServers"],
GroupId = "InventoryConsumerGroup",
AutoOffsetReset = AutoOffsetReset.Earliest
};
_consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build();
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_consumer.Subscribe("inventory-update");
try
{
while (!stoppingToken.IsCancellationRequested)
{
HandleMessage(stoppingToken);
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
}
}
catch (OperationCanceledException)
{
_logger.LogInformation("Consumer service has been cancelled.");
}
catch (Exception ex)
{
_logger.LogError($"Error in consuming messages: {ex.Message}");
}
finally
{
_consumer.Close();
}
}
public void HandleMessage(CancellationToken cancellation)
{
try
{
var consumeResult = _consumer.Consume(cancellation);
var message = consumeResult.Message.Value;
_logger.LogInformation($"Received inventory update: {message}");
}
catch (Exception ex)
{
_logger.LogError($"Error processing Kafka message: {ex.Message}");
}
}
}
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddHostedService<ConsumerService>();
运行程序 。
Publish Message 。
Consume Message 。
Apache Kafka不是消息中间件的一种实现。相反,它只是一种分布式流式系统。 不同于基于队列和交换器的RabbitMQ,Kafka的存储层是使用分区事务日志来实现的。Kafka也提供流式API用于实时的流处理以及连接器API用来更容易的和各种数据源集成.
最后此篇关于.NETCore+Kafka开发指南的文章就讲到这里了,如果你想了解更多关于.NETCore+Kafka开发指南的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
在一个新的 ASP.NET Core RC2 类库中,我有以下 project.json 文件,我试图在其中遵循 How to trim your package dependencies 上的文档.
我在本地提要上将 nuget 包从 2.2.x 更新到 3.1.0。 然后,在构建机器上尝试构建项目,但是: Project.csproj(0,0): Error NU1102: Unable to
什么是Kafka Apache Kafka是一个分布式流处理平台,由LinkedIn开发并开源,后来成为Apache软件基金会的顶级项目。Kafka主要用于构建实时数据管道和流式应用程序。
快递100的物流信息查询接口,官方提供了一些demo;还好官方提供的代码是.netcore版本写的,不过写的有点low;根据官方提供的代码,我按照.netcore 的风格重构了代码;核心代码如下:
1、docker介绍 docker是用go语言编写基于linux操作系统的一些特性开发的,其提供了操作系统级别的抽象,是一种容器管理技术,它隔离了应用程序对基础架构(操作系统等)的依赖。相较于虚
demo运行在windows的docker中,系统是win10,所以需要先下载docker for windows,安装完毕后系统会重启,然后桌面上可以找到docker for windows的快捷
NetCore WebSocket 即时通讯示例,供大家参考,具体内容如下 1.新建Netcore Web项目 2.创建简易通讯协议 ?
.NET Core:架构、特性和优势详解 在软件开发领域,保持领先地位至关重要。随着技术以指数级的速度发展,开发人员不断寻求高效、可扩展且多功能的解决方案来应对现代挑战。.NET Core 就是这样
需求场景: 我需要部署的项目是在Windows上开发的,目标框架为.net core 6.0 因此我们需要先在kylin上部署项目运行所需要的环境。 借助百
我正在 .NET Core 中重写一个调用外部 Web 服务的控制台应用程序。 我目前收到以下错误: One or more errors occurred. (The HTTP request is
关闭。此题需要details or clarity 。目前不接受答案。 想要改进这个问题吗?通过 editing this post 添加详细信息并澄清问题. 已关闭 6 年前。 Improve th
我有一组库,我想将其从 PCL 转移到 netcore。通过此举,我想简化 DI 系统并更新一些内部工作方式。 我想添加的其中一件事是内部对象的配置,就像在 Asp.Net Core 中一样(即 se
注:本文隶属于《理解ASP.NET Core》系列文章,请查看置顶博客或 点击此处查看全文目录 概述 在微服务化的架构设计中,网关扮演着重要的看门人角色,它所提供的功能之一
对于有多个应用系统的企业来说,每一个应用系统都有自己的用户体系,这就造成用户在切换不同应用系统时,就要多次输入账号密码,导致体验非常不好,也造成使用上非常不便。 针对这个问题,我们就可以采用单
就像 Web Api 接口可以对入参进行验证,避免用户传入非法的或者不符合我们预期的参数一样,选项也可以对配置源的内容进行验证,避免配置中的值与选项类中的属性不对应或者不满足预期,毕竟大部分配置
.NET Core 选项系统的主要实现在 Microsoft.Extensions.Options 和 Microsoft.Extensions.Options.ConfigurationExten
漏洞说明: 跨站脚本攻击(Cross Site Scripting),为了不和层叠样式表(Cascading Style Sheets, CSS)的缩写混淆,故将跨站脚本攻击缩写为XSS。恶意攻击
分布式缓存是由多个应用服务器共享的缓存,通常作为访问它的应用服务器的外部服务进行维护。 分布式缓存可以提高 ASP.NET Core 应用的性能和可伸缩性,尤其是当应用由云服务或服务器场托管时。
一个应用要运行起来,往往需要读取很多的预设好的配置信息,根据约定好的信息或方式执行一定的行为。 配置的本质就是软件运行的参数,在一个软件实现中需要的参数非常多,如果我们以 Hard Code(
2. 配置添加 配置系统可以读取到配置文件中的信息,那必然有某个地方可以将配置文件添加到配置系统中。之前的文章中讲到 ASP.NET Core 入口文件中,builder(WebApplica
我是一名优秀的程序员,十分优秀!