gpt4 book ai didi

java - 如何在客户端使用 Java 读取 gRPC 中的元数据

转载 作者:塔克拉玛干 更新时间:2023-11-03 04:13:10 25 4
gpt4 key购买 nike

我正在使用 Java 和 Protoc 3.0 编译器,我的 proto 文件如下所示。 https://github.com/openconfig/public/blob/master/release/models/rpc/openconfig-rpc-api.yang

syntax = "proto3";

package Telemetry;

// Telemetry interface exported by the agent.
service OpenConfigTelemetry {
  // Requests an inline subscription for data at the specified path.
  // The device should send telemetry data back on the same connection
  // that carried the subscription request.
  rpc telemetrySubscribe(SubscriptionRequest)
      returns (stream OpenConfigData);

  // Terminates and removes an existing telemetry subscription.
  rpc cancelTelemetrySubscription(CancelSubscriptionRequest)
      returns (CancelSubscriptionReply);

  // Gets the list of current telemetry subscriptions from the target.
  // The returned list contains existing subscriptions, not including
  // those that are established via configuration.
  rpc getTelemetrySubscriptions(GetSubscriptionsRequest)
      returns (GetSubscriptionsReply);

  // Gets the telemetry agent's operational states.
  rpc getTelemetryOperationalState(GetOperationalStateRequest)
      returns (GetOperationalStateReply);

  // Returns the set of data encodings the device supports for
  // telemetry data.
  rpc getDataEncodings(DataEncodingRequest)
      returns (DataEncodingReply);
}

// Request message for creating a telemetry subscription.
message SubscriptionRequest {
  // Data associated with the telemetry subscription.
  SubscriptionInput input = 1;

  // Data model paths and filters used in the telemetry operation.
  repeated Path path_list = 2;

  // Proposed extension (not defined in the OpenConfig RPC) that
  // configures additional subscription request features.
  SubscriptionAdditionalConfig additional_config = 3;
}

// Data associated with a telemetry subscription.
message SubscriptionInput {
  // Optional collector endpoints that should receive the data for this
  // subscription. When no collector destination is given, the
  // destination is assumed to be the requester on the rpc channel.
  repeated Collector collector_list = 1;
}

// A collector endpoint to send data to, given as an IP + port pair.
message Collector {
  // IP address of the collector endpoint.
  string address = 1;

  // Transport protocol port number of the collector destination.
  uint32 port = 2;
}

// A data model path together with its filtering and timing settings.
message Path {
  // Data model path of interest: the path specification for elements
  // of OpenConfig data models.
  string path = 1;

  // Regular expression used to filter state leaves.
  string filter = 2;

  // When true, the target device only sends updates to the collector
  // upon a change in data value.
  bool suppress_unchanged = 3;

  // Maximum time, in ms, the target device may go without sending a
  // message to the collector. If this time expires while
  // suppress_unchanged is set, the target device must send an update
  // message regardless of whether the data values have changed.
  uint32 max_silent_interval = 4;

  // Time, in ms, between collection and transmission of the specified
  // data to the collector platform. The target device samples the
  // corresponding data (e.g., a counter) and immediately sends it to
  // the collector destination.
  //
  // A value of 0 means the network device must emit an update upon
  // every datum change.
  uint32 sample_frequency = 5;
}

// Additional, optional features of a subscription request.
message SubscriptionAdditionalConfig {
  // Limit on the number of records sent in the stream.
  int32 limit_records = 1;

  // Limit, in seconds, on the time the stream remains open.
  int32 limit_time_seconds = 2;
}

// The reply to an inline subscription for data at the specified path is
// delivered in two parts:
// 1. Reply data message sent out using out-of-band channel.
// 2. Telemetry data sent back on the same connection as the
// subscription request.

// 1. Reply data message sent out using the out-of-band channel.
message SubscriptionReply {
  // Response to a telemetry subscription creation or get request.
  SubscriptionResponse response = 1;

  // Data model paths and filters used in the telemetry operation.
  repeated Path path_list = 2;
}

// Response to a telemetry subscription creation or get request.
message SubscriptionResponse {
  // Unique id of the subscription on the device. The device generates
  // it and returns it in a subscription request or when listing
  // existing subscriptions.
  uint32 subscription_id = 1;
}

// 2. Telemetry data sent back on the same connection as the
// subscription request.
message OpenConfigData {
  // Router name:export IP address.
  string system_id = 1;

  // Line card / RE (slot number).
  uint32 component_id = 2;

  // PFE (if applicable).
  uint32 sub_component_id = 3;

  // Path specification for elements of OpenConfig data models.
  string path = 4;

  // Sequence number that increases monotonically for each
  // system_id, component_id, sub_component_id + path.
  uint64 sequence_number = 5;

  // Timestamp, in milliseconds since the epoch.
  uint64 timestamp = 6;

  // Key-value pairs carried by this message.
  repeated KeyValue kv = 7;
}

// A simple key-value pair whose value is one of several scalar types.
message KeyValue {
  // The key.
  string key = 1;

  // The value; exactly one of the alternatives below may be set.
  oneof value {
    double double_value = 5;
    int64 int_value = 6;
    uint64 uint_value = 7;
    sint64 sint_value = 8;
    bool bool_value = 9;
    string str_value = 10;
    bytes bytes_value = 11;
  }
}

// Request message for cancelling a telemetry subscription.
message CancelSubscriptionRequest {
  // Subscription identifier, as returned by the device when the
  // subscription was requested.
  uint32 subscription_id = 1;
}

// Reply to a telemetry subscription cancellation request.
message CancelSubscriptionReply {
  // Return code describing the result of the cancellation.
  ReturnCode code = 1;

  // Return code in string form.
  string code_str = 2;
}

// Result of the operation.
//
// NOTE(review): value names are unprefixed and 0 carries a real meaning;
// they are kept as-is because renaming would break generated code.
enum ReturnCode {
  // Zero value, so it is also what an unset field decodes to.
  SUCCESS = 0;
  NO_SUBSCRIPTION_ENTRY = 1;
  UNKNOWN_ERROR = 2;
}

// Request message for listing telemetry subscriptions.
message GetSubscriptionsRequest {
  // Either the subscription identifier returned by the device when the
  // subscription was requested, or 0xFFFFFFFF to select all
  // subscription identifiers.
  uint32 subscription_id = 1;
}

// Reply to a telemetry subscription get request.
message GetSubscriptionsReply {
  // The current telemetry subscriptions.
  repeated SubscriptionReply subscription_list = 1;
}

// Request message for the telemetry agent's operational states.
message GetOperationalStateRequest {
  // Operational state can be requested per subscription_id. Use one of:
  //   - the subscription identifier returned by the device when the
  //     subscription was requested;
  //   - 0xFFFFFFFF for all subscription identifiers, including
  //     agent-level operational stats;
  //   - no subscription_id at all, to receive only agent-level
  //     operational stats.
  //
  // NOTE(review): this is a plain proto3 scalar, so an unset field
  // reads as 0 on the receiving side; confirm the agent treats 0 as
  // "not present".
  uint32 subscription_id = 1;

  // Controls the verbosity of the output.
  VerbosityLevel verbosity = 2;
}

// Verbosity level of an operational state reply.
enum VerbosityLevel {
  // Zero value, so it is also what an unset field decodes to.
  DETAIL = 0;
  TERSE = 1;
  BRIEF = 2;
}

// Reply to a telemetry agent operational states request.
message GetOperationalStateReply {
  // Key-value pairs, where
  //   key   = operational state definition
  //   value = operational state value
  repeated KeyValue kv = 1;
}

// Request message for the supported data encodings; it has no fields.
message DataEncodingRequest {}

// Reply to a request for the supported data encodings.
message DataEncodingReply {
  // The encoding types reported in this reply.
  repeated EncodingType encoding_list = 1;
}

// Supported encoding types.
enum EncodingType {
  // Zero value, so it is also what an unset field decodes to.
  UNDEFINED = 0;
  XML = 1;
  JSON_IETF = 2;
  PROTO3 = 3;
}

为了执行服务调用 (rpc telemetrySubscribe),我首先需要读取带有订阅 ID 的 header,然后才能开始读取消息。现在,我已经能够用 Java 连接到该服务,也引入了拦截器,但是当我打印/检索 header 时,它是空的。我调用拦截器的代码如下:

 // Client-side interceptor used to inspect call metadata
 // (HeaderClientInterceptor is the asker's own class, shown separately).
 ClientInterceptor interceptor = new HeaderClientInterceptor();
// Build the underlying OkHttp channel to host:port over a plaintext
// (non-TLS) connection.
originChannel = OkHttpChannelBuilder.forAddress(host, port)
.usePlaintext(true)
.build();
// Wrap the channel so calls created through it go through the interceptor.
Channel channel = ClientInterceptors.intercept(originChannel, interceptor);
// Create the generated stub on the intercepted channel, not on originChannel.
telemetryStub = OpenConfigTelemetryGrpc.newStub(channel);

这是读取元数据的拦截器代码。

  @Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {

@Override
public void start(Listener<RespT> responseListener, Metadata headers) {

super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onHeaders(Metadata headers) {

Key<String> CUSTOM_HEADER_KEY = Metadata.Key.of("responseKEY", Metadata.ASCII_STRING_MARSHALLER);

System.out.println("Contains Key?? "+headers.containsKey(CUSTOM_HEADER_KEY));

想知道有没有其他方法可以读取其中包含订阅 ID 的元数据或第一条消息?我只需要读取带有订阅 ID 的第一条消息,并将相同的订阅 ID 返回给服务器,以便流式传输可以开始。我有一段使用相同 proto 文件的等效 Python 代码,它通过下面的代码与服务器通信,仅供参考:

     # Open the telemetry subscription stream and fetch the call's
     # initial metadata from the returned iterator.
     sub_req = SubscribeRequestMsg("host", port)
     data_itr = stub.telemetrySubscribe(sub_req, _TIMEOUT_SECONDS)
     metadata = data_itr.initial_metadata()

     # Only the first metadata entry is inspected; each entry is indexed
     # as a (key, value) pair.
     if metadata[0][0] == "responseKey":
         metainfo = metadata[0][1]
         print(metainfo)

         # The metadata value is parsed as a text-format SubscriptionReply.
         subreply = agent_pb2.SubscriptionReply()
         subreply.SetInParent()
         google.protobuf.text_format.Merge(metainfo, subreply)

         # Keep the subscription id only when it is non-zero.
         if subreply.response.subscription_id:
             SUB_ID = subreply.response.subscription_id

从上面的 python 代码我可以轻松检索元数据对象,不确定如何使用 Java 检索相同的对象?

阅读元数据后,我得到的是:Metadata({content-type=[application/grpc], grpc-encoding=[identity], grpc-accept-encoding=[identity,deflate,gzip]} )

但是我知道元数据中还应该有一行内容,也就是:

response {
subscription_id: 2
}

我该如何从 header 中提取最后这段包含订阅 ID 的响应?我确实尝试了很多办法,但还是毫无头绪。

最佳答案

您使用的方法是用于请求元数据,而不是响应元数据:

public void start(Listener<RespT> responseListener, Metadata headers) {

对于响应元数据,您将需要一个 ClientCall.Listener 并等待 onHeaders 回调:

public void onHeaders(Metadata headers)

我确实觉得您提到的元数据的用法似乎很奇怪。元数据通常用于附加错误详细信息或不特定于 RPC 方法的横切功能(如身份验证、跟踪等)。

关于java - 如何在客户端使用 Java 读取 gRPC 中的元数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43479217/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com