- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
。
先简单说下常见的rocketMq的部署方式,上图中broker为真正计算和存储消息的地方,而nameServer负责维护broker地 。
。
图中右侧consume message部分即是本文重点描述的部分,主要分为ConsumerGroup和Consumer,consumerGroup可以参考 https://rocketmq.apache.org/docs/domainModel/07consumergroup/ 。简单的说,Consumer即是一个运行的应用,ComsumerGroup即为多个运行的应用组,而其中一个Consumer是如何启动并接受消息进行消费的呢?
以常见的java应用搭配spring为例,通常来说是在应用启动并实例化rocketmq sdk的相关类之后,调用相关类的初始化方法,获取nameServer地址和broker地址,启动netty客户端,并通过netty客户端向broker拉取消息,然后提交到消息消费服务中进行批量消费.
。
https://rocketmq.apache.org/docs/quickStart/01quickstart 。
上面是一个普通消息消费的demo,可以看到启动mq的消费代码主要分为设置cid、topic、tag和messageLister.
。
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start 。
这里会现有个switch方法判断当前mq客户端状态然后执行不同的启动逻辑。主要是未启动的话,执行启动客户端的逻辑;若已启动或启动失败的话,则抛出异常。然后是更新topic信息、发送心跳、负载均衡等。展开switch代码块可以看到如下代码,同时代码里serviceState默认为CREATE_JUST,所以在初次启动时会进入CREATE_JUST的流程.
public synchronized void start() throws MQClientException { switch ( this .serviceState) { // 初始化流程 case CREATE_JUST: log.info( "the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this .defaultMQPushConsumer.getConsumerGroup(), this .defaultMQPushConsumer.getMessageModel(), this .defaultMQPushConsumer.isUnitMode()); this .serviceState = ServiceState.START_FAILED; // 校验设置consumerGroup的配置 this .checkConfig(); // 解析订阅的topic和tag等数据,构建订阅关系模型放到rebalanceImpl里 this .copySubscription(); if ( this .defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) { this .defaultMQPushConsumer.changeInstanceNameToPID(); } // 初始化MQClientInstance、rebalaceImpl等 this .mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance( this .defaultMQPushConsumer, this .rpcHook); this .rebalanceImpl.setConsumerGroup( this .defaultMQPushConsumer.getConsumerGroup()); this .rebalanceImpl.setMessageModel( this .defaultMQPushConsumer.getMessageModel()); this .rebalanceImpl.setAllocateMessageQueueStrategy( this .defaultMQPushConsumer.getAllocateMessageQueueStrategy()); this .rebalanceImpl.setmQClientFactory( this .mQClientFactory); if ( this .pullAPIWrapper == null ) { this .pullAPIWrapper = new PullAPIWrapper( mQClientFactory, this .defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); } this .pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); // 初始化消费进度 // 集群模式的话,消费进度保存在服务端broker;广播模式,保存在客户端consumer上 if ( this .defaultMQPushConsumer.getOffsetStore() != null ) { this .offsetStore = this .defaultMQPushConsumer.getOffsetStore(); } else { switch ( this .defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: this .offsetStore = new LocalFileOffsetStore( this .mQClientFactory, this .defaultMQPushConsumer.getConsumerGroup()); break ; case CLUSTERING: this .offsetStore = new RemoteBrokerOffsetStore( this .mQClientFactory, this .defaultMQPushConsumer.getConsumerGroup()); break ; default : break ; } this .defaultMQPushConsumer.setOffsetStore( this .offsetStore); } this .offsetStore.load(); // 根据是否顺序or并发消费,来创建不同的消费线程服务 // 本次主要关注并发消费, if ( this .getMessageListenerInner() instanceof MessageListenerOrderly) { this .consumeOrderly = true ; this .consumeMessageService = new ConsumeMessageOrderlyService( this , (MessageListenerOrderly) this .getMessageListenerInner()); // POPTODO reuse Executor ? this .consumeMessagePopService = new ConsumeMessagePopOrderlyService( this , (MessageListenerOrderly) this .getMessageListenerInner()); } else if ( this .getMessageListenerInner() instanceof MessageListenerConcurrently) { this .consumeOrderly = false ; this .consumeMessageService = new ConsumeMessageConcurrentlyService( this , (MessageListenerConcurrently) this .getMessageListenerInner()); // POPTODO reuse Executor ? this .consumeMessagePopService = new ConsumeMessagePopConcurrentlyService( this , (MessageListenerConcurrently) this .getMessageListenerInner()); } this .consumeMessageService.start(); // POPTODO this .consumeMessagePopService.start(); // 向mQClientFactory注册当前DefaultMQPushConsumerImpl // mQClientFactory里维护了一个consumerGroup和DefaultMQPushConsumerImpl的映射表 boolean registerOK = mQClientFactory.registerConsumer( this .defaultMQPushConsumer.getConsumerGroup(), this ); if (! registerOK) { this .serviceState = ServiceState.CREATE_JUST; this .consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown()); throw new MQClientException("The consumer group[" + this .defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null ); } // 核心启动方法 // 这块是重点的启动流程,稍后展开说明 mQClientFactory.start(); log.info( "the consumer [{}] start OK.", this .defaultMQPushConsumer.getConsumerGroup()); this .serviceState = ServiceState.RUNNING; break ; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The PushConsumer service state not OK, maybe started once, " + this .serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null ); default : break ; } this .updateTopicSubscribeInfoWhenSubscriptionChanged(); this .mQClientFactory.checkClientInBroker(); this .mQClientFactory.sendHeartbeatToAllBrokerWithLock(); this .mQClientFactory.rebalanceImmediately(); }
。
。
可以看到org.apache.rocketmq.client.impl.factory.MQClientInstance#start方法就是启动consumer的主要流程了 主要分了如下几步,下面会将下面的几步逐个展开说明:
public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; // If not specified,looking address from name server // 这里拿到nameServer地址list if (null == this.clientConfig.getNamesrvAddr()) { this.mQClientAPIImpl.fetchNameServerAddr(); } // Start request-response channel // 开启客户端和mq服务端的连接通道 this.mQClientAPIImpl.start(); // Start various schedule tasks // 启动定时任务 // 主要是定时2分钟调用上面fetchNameServerAddr方法,拉取最新的nameServerAddr this.startScheduledTask(); // Start pull service // 启动拉取消息的服务 this.pullMessageService.start(); // Start rebalance service // 启动负载均衡服务 this.rebalanceService.start(); // Start push service // 启动消息producer的推送服务,这块不展开了 this.defaultMQProducer.getDefaultMQProducerImpl().start(false); log.info("the client factory [{}] start OK", this.clientId); // 以上操作都成功的话,将服务置为running this.serviceState = ServiceState.RUNNING; break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); default: break; } } }
首先进入到org.apache.rocketmq.client.impl.MQClientAPIImpl#fetchNameServerAddr可以看下面方法,可以看到主要就是调用topAddressing.fetchNSAddr方法 。
public String fetchNameServerAddr() { try { // 主要是执行该方法 String addrs = this.topAddressing.fetchNSAddr(); if (!UtilAll.isBlank(addrs)) { if (!addrs.equals(this.nameSrvAddr)) { log.info("name server address changed, old=" + this.nameSrvAddr + ", new=" + addrs); this.updateNameServerAddressList(addrs); this.nameSrvAddr = addrs; return nameSrvAddr; } } } catch (Exception e) { log.error("fetchNameServerAddr Exception", e); } return nameSrvAddr; }
然后点到topAddressing.fetchNSAddr里,这块提供的spi拓展获取nameServerAddr,否则的话,走默认方法fetchNSAddr()获取nameServerAddr.
public final String fetchNSAddr() { // 看了下这块应该是提供了spi拓展 // 使用方可以通过spi的方式显示实现TopAddressing.fetchNSAddr方法获取nameServerAddr if (!topAddressingList.isEmpty()) { for (TopAddressing topAddressing : topAddressingList) { String nsAddress = topAddressing.fetchNSAddr(); if (!Strings.isNullOrEmpty(nsAddress)) { return nsAddress; } } } // Return result of default implementation // 没有的话,走默认实现 return fetchNSAddr(true, 3000); }
下面是默认的获取nameServerAddr的方法,主要是通过拿到wsAddr作为url,拼接para参数,发http请求获取nameServerAddr.
public final String fetchNSAddr(boolean verbose, long timeoutMills) { // 拿到wsAddr作为url,拼接para参数 String url = this.wsAddr; try { if (null != para && para.size() > 0) { if (!UtilAll.isBlank(this.unitName)) { url = url + "-" + this.unitName + "?nofix=1&"; } else { url = url + "?"; } for (Map.Entry<String, String> entry : this.para.entrySet()) { url += entry.getKey() + "=" + entry.getValue() + "&"; } url = url.substring(0, url.length() - 1); } else { if (!UtilAll.isBlank(this.unitName)) { url = url + "-" + this.unitName + "?nofix=1"; } } // 发送http请求,拿到地址 HttpTinyClient.HttpResult result = HttpTinyClient.httpGet(url, null, null, "UTF-8", timeoutMills); if (200 == result.code) { String responseStr = result.content; if (responseStr != null) { return clearNewLine(responseStr); } else { LOGGER.error("fetch nameserver address is null"); } } else { LOGGER.error("fetch nameserver address failed. statusCode=" + result.code); } } catch (IOException e) { if (verbose) { LOGGER.error("fetch name server address exception", e); } } if (verbose) { String errorMsg = "connect to " + url + " failed, maybe the domain name " + MixAll.getWSAddr() + " not bind in /etc/hosts"; errorMsg += FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL); LOGGER.warn(errorMsg); } return null; } // 默认的wsAddr获取方式,默认值为 http://jmenv.tbsite.net:8080/rocketmq/nsaddr public static String getWSAddr() { String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP); String wsDomainSubgroup = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr"); String wsAddr = "http://" + wsDomainName + ":8080/rocketmq/" + wsDomainSubgroup; if (wsDomainName.indexOf(":") > 0) { wsAddr = "http://" + wsDomainName + "/rocketmq/" + wsDomainSubgroup; } return wsAddr; }
在本地跑起来环境的话,可以看到返回值是一个ip:port的列表 。
org.apache.rocketmq.remoting.netty.NettyRemotingClient#start 这块主要分为三个部分:
public void start() { // 这里标准的编写netty客户端代码 // 主要是添加了NettyClientHandler // 但是此时bootstarp还没有绑定远程服务器host和端口 if (this.defaultEventExecutorGroup == null) { this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyClientConfig.getClientWorkerThreads(), new ThreadFactoryImpl("NettyClientWorkerThread_")); } Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, false) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (nettyClientConfig.isUseTLS()) { if (null != sslContext) { pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc())); LOGGER.info("Prepend SSL handler"); } else { LOGGER.warn("Connections are insecure as SSLContext is null!"); } } ch.pipeline().addLast( nettyClientConfig.isDisableNettyWorkerGroup() ? null : defaultEventExecutorGroup, new NettyEncoder(), new NettyDecoder(), new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), new NettyConnectManageHandler(), new NettyClientHandler()); } }); if (nettyClientConfig.getClientSocketSndBufSize() > 0) { LOGGER.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize()); handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()); } if (nettyClientConfig.getClientSocketRcvBufSize() > 0) { LOGGER.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize()); handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()); } if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && nettyClientConfig.getWriteBufferHighWaterMark() > 0) { LOGGER.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}", nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()); handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark( nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark())); } if (nettyClientConfig.isClientPooledByteBufAllocatorEnable()) { handler.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); } TimerTask timerTaskScanResponseTable = new TimerTask() { @Override public void run(Timeout timeout) { try { NettyRemotingClient.this.scanResponseTable(); } catch (Throwable e) { LOGGER.error("scanResponseTable exception", e); } finally { timer.newTimeout(this, 1000, TimeUnit.MILLISECONDS); } } }; this.timer.newTimeout(timerTaskScanResponseTable, 1000 * 3, TimeUnit.MILLISECONDS); int connectTimeoutMillis = this.nettyClientConfig.getConnectTimeoutMillis(); TimerTask timerTaskScanAvailableNameSrv = new TimerTask() { @Override public void run(Timeout timeout) { try { NettyRemotingClient.this.scanAvailableNameSrv(); } catch (Exception e) { LOGGER.error("scanAvailableNameSrv exception", e); } finally { timer.newTimeout(this, connectTimeoutMillis, TimeUnit.MILLISECONDS); } } }; this.timer.newTimeout(timerTaskScanAvailableNameSrv, 0, TimeUnit.MILLISECONDS); }
org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask 。
主要做启动一些客户端的定时操作,这里不展开说明 。
private void startScheduledTask() { // 还是调用上面的获取nameServer的方法,定时取nameServer地址列表 if (null == this.clientConfig.getNamesrvAddr()) { this.scheduledExecutorService.scheduleAtFixedRate(() -> { try { MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr(); } catch (Exception e) { log.error("ScheduledTask fetchNameServerAddr exception", e); } }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS); } // 定时更新topic路由信息 this.scheduledExecutorService.scheduleAtFixedRate(() -> { try { MQClientInstance.this.updateTopicRouteInfoFromNameServer(); } catch (Exception e) { log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); } }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS); // 定时发送信息 this.scheduledExecutorService.scheduleAtFixedRate(() -> { try { MQClientInstance.this.cleanOfflineBroker(); MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); } catch (Exception e) { log.error("ScheduledTask sendHeartbeatToAllBroker exception", e); } }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS); // 定时持久化消费位点 this.scheduledExecutorService.scheduleAtFixedRate(() -> { try { MQClientInstance.this.persistAllConsumerOffset(); } catch (Exception e) { log.error("ScheduledTask persistAllConsumerOffset exception", e); } }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS); // 定时调整线程池 this.scheduledExecutorService.scheduleAtFixedRate(() -> { try { MQClientInstance.this.adjustThreadPool(); } catch (Exception e) { log.error("ScheduledTask adjustThreadPool exception", e); } }, 1, 1, TimeUnit.MINUTES); }
。
这一部分主要是mq客户端向broker拉取消息,然后解码、过滤后分批交给消费服务进行消费 。
this.pullMessageService.start()会走到如下方法,org.apache.rocketmq.common.ServiceThread#start 。
public void start() { log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread); if (!started.compareAndSet(false, true)) { return; } // 设置stop标记我false,同时新起一个线程执行当前类的run方法 stopped = false; this.thread = new Thread(this, getServiceName()); this.thread.setDaemon(isDaemon); this.thread.start(); log.info("Start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread); }
pullMessageService实现了Runnable接口,会单独起一个线程并调用如下run方法 。
org.apache.rocketmq.client.impl.consumer.PullMessageService#run 。
@Override public void run() { logger.info(this.getServiceName() + " service started"); // 上文设置了stopped为false,会到while里 while (!this.isStopped()) { try { // 从messageRequestQueue里取一个MessageRequest执行。若队列为空则堵塞 MessageRequest messageRequest = this.messageRequestQueue.take(); if (messageRequest.getMessageRequestMode() == MessageRequestMode.POP) { this.popMessage((PopRequest) messageRequest); } else { this.pullMessage((PullRequest) messageRequest); } } catch (InterruptedException ignored) { } catch (Exception e) { logger.error("Pull Message Service Run Method exception", e); } } logger.info(this.getServiceName() + " service end"); }
以pullMessage方式为例,进入如下方法 。
org.apache.rocketmq.client.impl.consumer.PullMessageService#pullMessage 。
private void pullMessage(final PullRequest pullRequest) { // 根据PullRequest的消费组名从map中获取一个MQConsumerInner final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); if (consumer != null) { // consumer不为空执行拉消息方法 DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; impl.pullMessage(pullRequest); } else { logger.warn("No matched consumer for the PullRequest {}, drop it", pullRequest); } }
下面展开说明DefaultMQPushConsumerImpl#pullMessage方法 。
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage 。
public void pullMessage(final PullRequest pullRequest) { // 获取处理队列 final ProcessQueue processQueue = pullRequest.getProcessQueue(); // droped为true直接返回 if (processQueue.isDropped()) { log.info("the pull request[{}] is dropped.", pullRequest.toString()); return; } // 设置最近的拉取时间戳 pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis()); // 如果状态不是running的话,则将pullRequest延迟3秒放进messageRequestQueue,并返回 // executePullRequestLater方法操作即是延迟pullTimeDelayMillsWhenException时间将pullRequest放进messageRequestQueue尾部 // executePullRequestLater方法通过上述操作操作,来打到随后执行pullRequest目的 try { this.makeSureStateOK(); } catch (MQClientException e) { log.warn("pullMessage exception, consumer state not ok", e); this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); return; } // pause为true的话,延迟1s将pullRequest放进messageRequestQueue,并返回 if (this.isPause()) { log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup()); this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND); return; } // 下面这部分为消息拉取的流控代码 long cachedMessageCount = processQueue.getMsgCount().get(); long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); // 当前处理队列消息超过1000时,延迟50ms执行该pullRequest;该操作超过1000次,打印日志 if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; } // 当前处理队列消息占用空间超过100mb时,延迟50ms后执行该pullRequest if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; } // 非顺序消息的话 if (!this.consumeOrderly) { // 如果处理队列里的的最大消息和最小消息的queueSize差值大于2000的话,流控延后处理 if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL); if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) { log.warn( "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), pullRequest, queueMaxSpanFlowControlTimes); } return; } // 下面为顺序消息,本次先不分析 } else { if (processQueue.isLocked()) { if (!pullRequest.isPreviouslyLocked()) { long offset = -1L; try { offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue()); if (offset < 0) { throw new MQClientException(ResponseCode.SYSTEM_ERROR, "Unexpected offset " + offset); } } catch (Exception e) { this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e); return; } boolean brokerBusy = offset < pullRequest.getNextOffset(); log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", pullRequest, offset, brokerBusy); if (brokerBusy) { log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}", pullRequest, offset); } pullRequest.setPreviouslyLocked(true); pullRequest.setNextOffset(offset); } } else { this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); log.info("pull message later because not locked in broker, {}", pullRequest); return; } } // 拿到该消息topic的订阅信息,topic tag 表达式等数据 final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); if (null == subscriptionData) { this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); log.warn("find the consumer's subscription failed, {}", pullRequest); return; } final long beginTimestamp = System.currentTimeMillis(); // 下面是拉取消息的回调函数,等下展开说明 PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { if (pullResult != null) { pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData); switch (pullResult.getPullStatus()) { case FOUND: long prevRequestOffset = pullRequest.getNextOffset(); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); long pullRT = System.currentTimeMillis() - beginTimestamp; DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT); long firstMsgOffset = Long.MAX_VALUE; if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } else { firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size()); boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); } else { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } } if (pullResult.getNextBeginOffset() < prevRequestOffset || firstMsgOffset < prevRequestOffset) { log.warn( "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", pullResult.getNextBeginOffset(), firstMsgOffset, prevRequestOffset); } break; case NO_NEW_MSG: case NO_MATCHED_MSG: pullRequest.setNextOffset(pullResult.getNextBeginOffset()); DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); break; case OFFSET_ILLEGAL: log.warn("the pull request offset illegal, {} {}", pullRequest.toString(), pullResult.toString()); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); pullRequest.getProcessQueue().setDropped(true); DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() { @Override public void run() { try { DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), false); DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue()); DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue()); log.warn("fix the pull request offset, {}", pullRequest); } catch (Throwable e) { log.error("executeTaskLater Exception", e); } } }, 10000); break; default: break; } } } @Override public void onException(Throwable e) { if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("execute the pull request exception", e); } if (e instanceof MQBrokerException && ((MQBrokerException) e).getResponseCode() == ResponseCode.FLOW_CONTROL) { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL); } else { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); } } }; boolean commitOffsetEnable = false; long commitOffsetValue = 0L; if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) { commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY); if (commitOffsetValue > 0) { commitOffsetEnable = true; } } // 拿到订阅表达式 类过滤标识等 String subExpression = null; boolean classFilter = false; SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); if (sd != null) { if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) { subExpression = sd.getSubString(); } classFilter = sd.isClassFilterMode(); } // 构建标记 int sysFlag = PullSysFlag.buildSysFlag( commitOffsetEnable, // commitOffset true, // suspend subExpression != null, // subscription classFilter // class filter ); try { // 执行真正拉取消息的方法,并传输回调函数pullCallBack this.pullAPIWrapper.pullKernelImpl( pullRequest.getMessageQueue(), subExpression, subscriptionData.getExpressionType(), subscriptionData.getSubVersion(), pullRequest.getNextOffset(), this.defaultMQPushConsumer.getPullBatchSize(), this.defaultMQPushConsumer.getPullBatchSizeInBytes(), sysFlag, commitOffsetValue, BROKER_SUSPEND_MAX_TIME_MILLIS, CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, CommunicationMode.ASYNC, pullCallback ); } catch (Exception e) { log.error("pullKernelImpl exception", e); this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); } }
。
这一部分主要是通过netty客户端向broker拉取消息 。
如下方法 。
org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl(org.apache.rocketmq.common.message.MessageQueue, java.lang.String, java.lang.String, long, long, int, int, int, long, long, long.
org.apache.rocketmq.client.impl.CommunicationMode, org.apache.rocketmq.client.consumer.PullCallback) public PullResult pullKernelImpl( final MessageQueue mq, final String subExpression, final String expressionType, final long subVersion, final long offset, final int maxNums, final int maxSizeInBytes, final int sysFlag, final long commitOffset, final long brokerSuspendMaxTimeMillis, final long timeoutMillis, final CommunicationMode communicationMode, final PullCallback pullCallback ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // 根据messageQueue拿到broker信息,包括broker地址、是否slave、版本等 FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), this.recalculatePullFromWhichNode(mq), false); // 拿不到的话,从nameServer更新topic信息后,重新拿 if (null == findBrokerResult) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), this.recalculatePullFromWhichNode(mq), false); } if (findBrokerResult != null) { { // check version if (!ExpressionType.isTagType(expressionType) && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) { throw new MQClientException("The broker[" + mq.getBrokerName() + ", " + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null); } } int sysFlagInner = sysFlag; if (findBrokerResult.isSlave()) { sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner); } // 组装请求头 PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); requestHeader.setConsumerGroup(this.consumerGroup); requestHeader.setTopic(mq.getTopic()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setQueueOffset(offset); requestHeader.setMaxMsgNums(maxNums); requestHeader.setSysFlag(sysFlagInner); requestHeader.setCommitOffset(commitOffset); requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); requestHeader.setSubscription(subExpression); requestHeader.setSubVersion(subVersion); requestHeader.setMaxMsgBytes(maxSizeInBytes); requestHeader.setExpressionType(expressionType); requestHeader.setBname(mq.getBrokerName()); // 拿到broker地址 String brokerAddr = findBrokerResult.getBrokerAddr(); if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) { brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr); } // 通过mq客户端拉取消息 PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( brokerAddr, requestHeader, timeoutMillis, communicationMode, pullCallback); return pullResult; } throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); }
org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessage
public PullResult pullMessage( final String addr, final PullMessageRequestHeader requestHeader, final long timeoutMillis, final CommunicationMode communicationMode, final PullCallback pullCallback ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request; // 这个先不纠结,根据不同的code组装不同的RemotingCommand if (PullSysFlag.hasLitePullFlag(requestHeader.getSysFlag())) { request = RemotingCommand.createRequestCommand(RequestCode.LITE_PULL_MESSAGE, requestHeader); } else { request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader); } // 上文的communicationMode字段传入的是ASYNC,所以这里走的ASYNC模式 switch (communicationMode) { case ONEWAY: assert false; return null; case ASYNC: this.pullMessageAsync(addr, request, timeoutMillis, pullCallback); return null; case SYNC: return this.pullMessageSync(addr, request, timeoutMillis); default: assert false; break; } return null; }
进入org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync
可以看到remotingClient为netty客户端,通过netty想broker获取消息, 。
private void pullMessageAsync( final String addr, final RemotingCommand request, final long timeoutMillis, final PullCallback pullCallback ) throws RemotingException, InterruptedException { // 通过netty客户端想broker获取消息,具体不在赘述 // 这里传入InvokeCallback,在拉消息完成后执行InvokeCallback this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { @Override // 拿到消息后 public void operationComplete(ResponseFuture responseFuture) { RemotingCommand response = responseFuture.getResponseCommand(); if (response != null) { try { // 处理response解析相关请求头和body放到pullResult里 PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr); assert pullResult != null; // 拉取成功的话执行success回调函数,处理pullResult pullCallback.onSuccess(pullResult); } catch (Exception e) { pullCallback.onException(e); } } else { if (!responseFuture.isSendRequestOK()) { pullCallback.onException(new MQClientException(ClientErrorCode.CONNECT_BROKER_EXCEPTION, "send request failed to " + addr + ". Request: " + request, responseFuture.getCause())); } else if (responseFuture.isTimeout()) { pullCallback.onException(new MQClientException(ClientErrorCode.ACCESS_BROKER_TIMEOUT, "wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request, responseFuture.getCause())); } else { pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause())); } } } }); }
这一部分主要是在拉取消息成功后,执行回调函数解码、过滤消息列表,然后提交消费服务分批消费 。
重点看拉取状态为FOUND 的情况, 。
。
PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { if (pullResult != null) { // 这里解析pullResult,将pullResult的messageBinary(byte数组)解码为MessageExt列表 // 再根据subscriptionData里的tag classFilter等过滤MessageExt列表 // 将过滤后的msgListFilterAgain列表塞道pullResult pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData); switch (pullResult.getPullStatus()) { case FOUND: long prevRequestOffset = pullRequest.getNextOffset(); // 设置下一次偏移量 pullRequest.setNextOffset(pullResult.getNextBeginOffset()); // 计算拉取rt long pullRT = System.currentTimeMillis() - beginTimestamp; DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT); // 如果本次拉取消息量为空,立即将pullRequest返回队列里,等后续继续拉 long firstMsgOffset = Long.MAX_VALUE; if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } else { // 拉到消息的情况如下 firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); // 计算拉取的TPS DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size()); // 把消息放到处理队列里 boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); // 向消费服务提交消费请求 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); } else { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } } if (pullResult.getNextBeginOffset() < prevRequestOffset || firstMsgOffset < prevRequestOffset) { log.warn( "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", pullResult.getNextBeginOffset(), firstMsgOffset, prevRequestOffset); } break; case NO_NEW_MSG: case NO_MATCHED_MSG: pullRequest.setNextOffset(pullResult.getNextBeginOffset()); DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); break; case OFFSET_ILLEGAL: log.warn("the pull request offset illegal, {} {}", pullRequest.toString(), pullResult.toString()); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); pullRequest.getProcessQueue().setDropped(true); DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() { @Override public void run() { try { DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), false); DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue()); DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue()); log.warn("fix the pull request offset, {}", pullRequest); } catch (Throwable e) { log.error("executeTaskLater Exception", e); } } }, 10000); break; default: break; } } } @Override public void onException(Throwable e) { if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("execute the pull request exception", e); } if (e instanceof MQBrokerException && ((MQBrokerException) e).getResponseCode() == ResponseCode.FLOW_CONTROL) { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL); } else { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); } } };
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest 。
public void submitConsumeRequest( final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispatchToConsume) { // 拿到批量消费size,默认为1 final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); // 小于等于1的话则直接消费 if (msgs.size() <= consumeBatchSize) { ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue); try { this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { this.submitConsumeRequestLater(consumeRequest); } } else { // 这里开始批量消费,每批大小为consumeBatchSize for (int total = 0; total < msgs.size(); ) { List<MessageExt> msgThis = new ArrayList<>(consumeBatchSize); for (int i = 0; i < consumeBatchSize; i++, total++) { if (total < msgs.size()) { msgThis.add(msgs.get(total)); } else { break; } } ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue); try { // 分批提交到线程池里消费 this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { for (; total < msgs.size(); total++) { msgThis.add(msgs.get(total)); } this.submitConsumeRequestLater(consumeRequest); } } } }
。
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run @Override public void run() { if (this.processQueue.isDropped()) { log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue); return; } MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); ConsumeConcurrentlyStatus status = null; defaultMQPushConsumerImpl.tryResetPopRetryTopic(msgs, consumerGroup); defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup()); ConsumeMessageContext consumeMessageContext = null; if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext = new ConsumeMessageContext(); consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace()); consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup()); consumeMessageContext.setProps(new HashMap<>()); consumeMessageContext.setMq(messageQueue); consumeMessageContext.setMsgList(msgs); consumeMessageContext.setSuccess(false); ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext); } long beginTimestamp = System.currentTimeMillis(); boolean hasException = false; ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; try { // msgs不为空的话,遍历设置消费开始时间戳 if (msgs != null && !msgs.isEmpty()) { for (MessageExt msg : msgs) { MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis())); } } // 批量消费msgs status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s", UtilAll.exceptionSimpleDesc(e), ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, messageQueue), e); hasException = true; } // 下面这些设置消费状态和返回类型 long consumeRT = System.currentTimeMillis() - beginTimestamp; if (null == status) { if (hasException) { returnType = ConsumeReturnType.EXCEPTION; } else { returnType = ConsumeReturnType.RETURNNULL; } } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { returnType = ConsumeReturnType.TIME_OUT; } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) { returnType = ConsumeReturnType.FAILED; } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) { returnType = ConsumeReturnType.SUCCESS; } if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); } if (null == status) { log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}", ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, messageQueue); status = ConsumeConcurrentlyStatus.RECONSUME_LATER; } // 有钩子函数的话,执行钩子函数 if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext.setStatus(status.toString()); consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status); consumeMessageContext.setAccessChannel(defaultMQPushConsumer.getAccessChannel()); ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext); } // 计算消费rt ConsumeMessageConcurrentlyService.this.getConsumerStatsManager() .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT); if (!processQueue.isDropped()) { ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); } }
最后此篇关于RocketMq消费原理及源码解析的文章就讲到这里了,如果你想了解更多关于RocketMq消费原理及源码解析的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我想使用我编写的类模块的事件。类模块如下所示 ''CError64Row Public Event ErrorClicked(ByVal row As Integer, ByVal column As
我正在寻找实现智能架构的良好实践,以及处理针对具有许多不同 wdsl web 服务的系统的集成的方法。 我已经有 2 年的爱好使用 C# 进行开发了~,因此我并不总是使用正确的术语,但我会尝试描述我正
目前,我正在为我的程序使用 Azure Consumer API。但它非常慢,几乎需要8秒才能给出响应。我现在应该怎么做?这是我正在使用的 azure API.. https://management
我的流程是: AcitveMQ 控制台在主题部分下显示了一个使用者,但是一旦
我一直在阅读类似 Why does a function that accepts a Box complain of a value being moved when a function that
AMQP 函数 consume() 是一个带有回调的阻塞函数,是否可以为 consume() 函数设置超时,以便在特定时间后不再阻塞并且代码执行完成? 最佳答案 是的,方法如下: $amqp = ne
我有一个客户端/服务器应用程序,其中客户端以 JSON 形式将对象发送到运行 PHP 脚本的服务器,然后将此数据放入数据库。 问题是解码是用 json_decode 函数完成的,它似乎适用于字符串而不
所以我已经模拟了我的生产者消费者问题并且我有下面的代码。我的问题是:如果消费者一直处于 while(true) 状态,他如何停止。 在下面的代码中,我添加了 i
我无法使用在delphi 中开发的dll 的功能。我在类型转换方面遇到了一些困难。 这是我要调用 DLL 的函数: function rData(ID: Cardinal; queue: WideSt
我想使用 Unity3D 可视化 Kafka 流。在 Unity 中访问数据流的最佳方式是什么? 我已经用 Node 和 C# 编写了基本使用者,但我不确定如何将它们合并到 Unity 中。任何帮助表
如果标题太笼统,我很抱歉,但我已经浏览了一个小时的互联网,但找不到任何架构解释。我对 RSS 和 Atom 协议(protocol)都是全新的,据我到目前为止所了解的是: 服务器发布文档 客户端订阅此
我很喜欢我刚刚发现的 Guzzle 框架。我正在使用它使用不同的响应结构跨多个 API 聚合数据。它可以使用 JSON 和 XML 找到,但我需要使用的服务之一使用 SOAP。是否有使用 Guzzle
有没有一种方法可以像访问 Microsoft.Azure.Management.Fluent 一样访问 Azure.Management.Conclusion.Models? 当我执行以下代码时,我看
我有这个部分场景图树: CustomPane (with onMouseClicked Handler) → ChildNode (with onMousePressed Handler) 当我在
我的问题是这个 json。 http://dev-rexolution.pantheonsite.io/api/noticias 我只需要使用 vuejs 2 使用数组的第一个元素才能显示它,使用我工
我是 ML 新手,一直在研究 CNTK 教程。我已经成功训练了几个模型。 我完成了迁移学习教程 ( https://github.com/Microsoft/CNTK/blob/v2.1/Tutori
我是 RabbitMq 和 AMQP 的新手,但我对 ActiveMQ 和 JMS 有一些经验。我尝试在主题(JMS 中的主题之类的主题)中发布一条消息,并从多个监听器中使用此消息。比如我发布一条消息
我正在尝试让我的服务器解析以下 JSON: {"hardwareId":1,"registerTime":"2017-02-14T03:42:11.482Z","sensorId":1,"temper
我正在开发一个从外部 url 使用 json 的网站,我试过了但是我得到了一个错误 XMLHttpRequest 无法加载 http://reuniyo.com/tst/json.php。 Acces
我正在尝试使用Kafka Streams(即不是简单的Kafka Consumer)从重试主题中读取之前无法处理的事件。我希望从重试主题中进行消费,如果处理仍然失败(例如,如果外部系统已关闭),我希望
我是一名优秀的程序员,十分优秀!