gpt4 book ai didi

Rust 声明先分配后模式

转载 作者:行者123 更新时间:2023-12-03 11:43:53 25 4
gpt4 key购买 nike

我有一个双向 grpc充当 kafka 集群桥梁的流。当流首次初始化时,我将创建 kafka 使用者并开始使用它。
为此,我想到初始化一个空 consumer ,等待第一个输入,然后将创建的消费者分配给一个空的消费者。我试图通过遵循这里的模式来做到这一点。
https://doc.rust-lang.org/rust-by-example/variable_bindings/declare.html
Rust 正在抛出一个 possibly-unitialized variable error ,这是因为它是在异步流中初始化的吗?

use std::pin::Pin;

use futures::{Stream, StreamExt};
use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
use tonic::transport::Server;
use tonic::{Request, Response, Status};

use bridge::kafka_stream_server::{KafkaStream, KafkaStreamServer};
use bridge::{KafkaResponse, PublishRequest};

pub mod bridge {
tonic::include_proto!("bridge"); // The string specified here must match the proto package name
}

#[derive(Default)]
pub struct KafkaStreamService {}

pub fn create_kafka_consumer(topic: String) -> Consumer {
Consumer::from_hosts(vec!["localhost:9092".to_owned()])
.with_topic(topic.to_owned())
.with_fallback_offset(FetchOffset::Latest)
.with_group("".to_owned())
.with_offset_storage(GroupOffsetStorage::Kafka)
.create()
.unwrap()
}

#[tonic::async_trait]
impl KafkaStream for KafkaStreamService {
type SubscribeStream =
Pin<Box<dyn Stream<Item = Result<KafkaResponse, Status>> + Send + Sync + 'static>>;

async fn subscribe(
&self,
request: Request<tonic::Streaming<PublishRequest>>,
) -> Result<Response<Self::SubscribeStream>, Status> {
println!("Initiated stream!");
let mut stream = request.into_inner();

let mut consumer_created_flag: bool = false;
let consumer: Consumer; //declared here
let output = async_stream::try_stream! {
while let Some(publication) = stream.next().await {
let message = publication?;
let topic = message.topic.clone();
if consumer_created_flag == false {
consumer = create_kafka_consumer(topic); //error occurs here
consumer_created_flag = true;
}
let reply = bridge::KafkaResponse {
content: format!("Hello {}!", "world"),
};
yield reply.clone();
}
};
Ok(Response::new(Box::pin(output) as Self::SubscribeStream))
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:50051".parse().unwrap();

println!("KafkaService listening on: {}", addr);

let svc = KafkaStreamServer::new(KafkaStreamService::default());

Server::builder().add_service(svc).serve(addr).await?;

Ok(())
}

编辑:按要求出现详细错误:
error[E0381]: use of possibly-uninitialized variable: `consumer`
--> src/server.rs:42:22
|
42 | let output = async_stream::try_stream! {
| ______________________^
43 | | while let Some(publication) = stream.next().await {
44 | | let message = publication?;
45 | | let topic = message.topic.clone();
46 | | if consumer_created_flag == false {
47 | | consumer = create_kafka_consumer(topic);
| | -------- use occurs due to use in generator
... |
54 | | }
55 | | };
| |_________^ use of possibly-uninitialized `consumer`
|
= note: this error originates in a macro (in Nightly builds, run with -Z macro-backtrace for more info)

最佳答案

声明第一个模式仅适用于基本控制流( ifmatch{} 等)。当引用或移动到另一个对象时,它会崩溃,例如 async块或闭包:

fn main() {
let val: i32;
let func = move || {
val = 5;
};
}
error[E0594]: cannot assign to `val`, as it is not declared as mutable
--> src/main.rs:4:9
|
2 | let val: i32;
| --- help: consider changing this to be mutable: `mut val`
3 | let func = move || {
4 | val = 5;
| ^^^^^^^ cannot assign

error[E0381]: use of possibly-uninitialized variable: `val`
--> src/main.rs:3:16
|
3 | let func = move || {
| ^^^^^^^ use of possibly-uninitialized `val`
4 | val = 5;
| --- use occurs due to use in closure
一个潜在的解决方法是将其声明移到 try_stream! 中。宏:
let output = async_stream::try_stream! {
let mut consumer_created_flag: bool = false;
let consumer: Consumer;
while let Some(publication) = stream.next().await {
let message = publication?;
let topic = message.topic.clone();
if consumer_created_flag == false {
consumer = create_kafka_consumer(topic);
consumer_created_flag = true;
}
let reply = KafkaResponse {
content: format!("Hello {}!", "world"),
};
yield reply.clone();
}
};
但是,这会导致一个新错误,因为您可能会为其分配两次(编译器不知道 consumer_created_flag 正在保护它):
error[E0384]: cannot assign twice to immutable variable `consumer`
--> src\lib.rs:1348:21
|
44 | let consumer: Consumer; //declared here
| -------- help: make this binding mutable: `mut consumer`
...
49 | consumer = create_kafka_consumer(topic); //error occurs here
| ^^^^^^^^ cannot assign twice to immutable variable
幸运的是,快速修复是简单地制作 consumer可变的。然后编译器唯一提示的是它没有被使用,但我认为你把它放在那里是有原因的。

关于Rust 声明先分配后模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66265263/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com