I am trying to get interactive queries working with Kafka Streams. I have Zookeeper and Kafka running locally (on Windows), with C:\temp used as the storage folder for both Zookeeper and Kafka.

I set up the topics like this:
kafka-topics.bat --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic rating-submit-topic
kafka-topics.bat --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic rating-output-topic
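(If the topics were not created successfully nothing downstream will work, so it is worth confirming they exist first, using the same old-style --zookeeper flag as above:

kafka-topics.bat --zookeeper localhost:2181 --list
kafka-topics.bat --zookeeper localhost:2181 --describe --topic rating-submit-topic
)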
The stream processing itself seems to work as expected: the topology aggregates the ratings into a KTable, I can print the KTable's contents, and they match what was produced to the topic. The problem is the Streams.allMetadata() method, which returns an empty list.

Here is the producer code:
package Processing.Ratings {

  import java.util.concurrent.TimeUnit
  import Entities.Ranking
  import Serialization.JSONSerde
  import Topics.RatingsTopics
  import scala.util.Random
  import org.apache.kafka.clients.producer.ProducerRecord
  import org.apache.kafka.clients.producer.KafkaProducer
  import org.apache.kafka.common.serialization.Serdes
  import Utils.Settings
  import org.apache.kafka.clients.producer.ProducerConfig

  object RatingsProducerApp extends App {

    run()

    private def run(): Unit = {
      val jSONSerde = new JSONSerde[Ranking]
      val random = new Random
      val producerProps = Settings.createBasicProducerProperties
      val rankingList = List(
        Ranking("jarden@here.com", "sacha@here.com", 1.5f),
        Ranking("miro@here.com", "mary@here.com", 1.5f),
        Ranking("anne@here.com", "margeret@here.com", 3.5f),
        Ranking("frank@here.com", "bert@here.com", 2.5f),
        Ranking("morgan@here.com", "ruth@here.com", 1.5f))

      producerProps.put(ProducerConfig.ACKS_CONFIG, "all")

      System.out.println("Connecting to Kafka cluster via bootstrap servers " +
        s"${producerProps.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)}")

      // send a random ranking from the list every 100 milliseconds
      val rankingProducer = new KafkaProducer[String, Array[Byte]](
        producerProps, Serdes.String.serializer, Serdes.ByteArray.serializer)

      //while (true) {
      for (i <- 0 to 10) {
        val ranking = rankingList(random.nextInt(rankingList.size))
        val rankingBytes = jSONSerde.serializer().serialize("", ranking)
        System.out.println(s"Writing ranking ${ranking} to input topic ${RatingsTopics.RATING_SUBMIT_TOPIC}")
        rankingProducer.send(new ProducerRecord[String, Array[Byte]](
          RatingsTopics.RATING_SUBMIT_TOPIC, ranking.toEmail, rankingBytes))
        Thread.sleep(100)
      }

      Runtime.getRuntime.addShutdownHook(new Thread(() => {
        rankingProducer.close(10, TimeUnit.SECONDS)
      }))
    }
  }
}
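The Ranking entity, the JSONSerde and the RatingsTopics constants used above are not shown in the question. A minimal sketch of what they might look like, inferred from how they are used (the field names and the spray-json based serde are assumptions, not the author's actual code):

package Entities {
  // field names guessed from usage: the producer keys messages by ranking.toEmail
  case class Ranking(fromEmail: String, toEmail: String, score: Float)
}

package Topics {
  object RatingsTopics {
    // names match the kafka-topics commands above
    val RATING_SUBMIT_TOPIC = "rating-submit-topic"
    val RATING_OUTPUT_TOPIC = "rating-output-topic"
  }
}

package Serialization {

  import java.util
  import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}
  import spray.json._

  // placeholder serde: any T with a spray-json format is written/read as UTF-8 JSON bytes
  class JSONSerde[T >: Null](implicit format: JsonFormat[T]) extends Serde[T] {

    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
    override def close(): Unit = ()

    override def serializer(): Serializer[T] = new Serializer[T] {
      override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
      override def serialize(topic: String, data: T): Array[Byte] =
        if (data == null) null else data.toJson.compactPrint.getBytes("UTF-8")
      override def close(): Unit = ()
    }

    override def deserializer(): Deserializer[T] = new Deserializer[T] {
      override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
      override def deserialize(topic: String, data: Array[Byte]): T =
        if (data == null) null else new String(data, "UTF-8").parseJson.convertTo[T]
      override def close(): Unit = ()
    }
  }
}

These are the Streams properties used by the application: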
def createRatingStreamsProperties() : Properties = {
  val props = createBasicStreamProperties
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ratings-application")
  props.put(StreamsConfig.CLIENT_ID_CONFIG, "ratings-application-client")
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props
}

private def createBasicStreamProperties() : Properties = {
  val props = new Properties()
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers)
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  // Records should be flushed every 10 seconds. This is less than the default
  // in order to keep this example interactive.
  props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000.asInstanceOf[Object])
  // For illustrative purposes we disable record caches
  props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0.asInstanceOf[Object])
  props
}
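Settings.createBasicProducerProperties is also called by the producer but not shown; a guess at what it contains, based on how it is used (only the bootstrap servers are strictly required here):

def createBasicProducerProperties: Properties = {
  val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers)
  props
}

And this is the stream processing application: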
import java.util.Properties
import java.util.concurrent.TimeUnit
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream._
import Entities.Ranking
import Serialization.JSONSerde
import Topics.RatingsTopics
import Utils.Settings

package Processing.Ratings {

  import Stores.StateStores
  import org.apache.kafka.streams.state.HostInfo

  class DummyRankingReducer extends Reducer[Ranking] {
    override def apply(value1: Ranking, value2: Ranking): Ranking = {
      value2
    }
  }

  class RankingByEmailInitializer extends Initializer[List[Ranking]] {
    override def apply(): List[Ranking] = List[Ranking]()
  }

  class RankingByEmailAggregator extends Aggregator[String, Ranking, List[Ranking]] {
    override def apply(aggKey: String, value: Ranking, aggregate: List[Ranking]) = {
      value :: aggregate
    }
  }

  object RatingStreamProcessingApp extends App {

    run()

    private def run() : Unit = {
      val stringSerde = Serdes.String
      val rankingSerde = new JSONSerde[Ranking]
      val listRankingSerde = new JSONSerde[List[Ranking]]
      val builder: KStreamBuilder = new KStreamBuilder
      val rankings = builder.stream(stringSerde, rankingSerde, RatingsTopics.RATING_SUBMIT_TOPIC)

      val rankingTable = rankings.groupByKey(stringSerde, rankingSerde)
        .aggregate(
          new RankingByEmailInitializer(),
          new RankingByEmailAggregator(),
          listRankingSerde,
          StateStores.RANKINGS_BY_EMAIL_STORE
        )

      rankingTable.toStream.print()

      val streams: KafkaStreams = new KafkaStreams(builder, Settings.createRatingStreamsProperties)
      val restEndpoint: HostInfo = new HostInfo(Settings.restApiDefaultHostName, Settings.restApiDefaultPort)
      System.out.println(s"Connecting to Kafka cluster via bootstrap servers ${Settings.bootStrapServers}")
      System.out.println(s"REST endpoint at http://${restEndpoint.host}:${restEndpoint.port}")

      // Always (and unconditionally) clean local state prior to starting the processing topology.
      // We opt for this unconditional call here because this will make it easier for you to play around with the example
      // when resetting the application for doing a re-run (via the Application Reset Tool,
      // http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool).
      //
      // The drawback of cleaning up local state prior is that your app must rebuild its local state from scratch, which
      // will take time and will require reading all the state-relevant data from the Kafka cluster over the network.
      // Thus in a production scenario you typically do not want to clean up always as we do here but rather only when it
      // is truly needed, i.e., only under certain conditions (e.g., the presence of a command line flag for your app).
      // See `ApplicationResetExample.java` for a production-like example.
      //streams.cleanUp();

      streams.start()

      val restService = new RatingRestService(streams, restEndpoint)
      restService.start()

      //****************************************************************
      // WHY DOES METADATA NOT WORK WHEN THERE IS CLEARLY A STORE IN USE
      // WHY DOES METADATA NOT WORK WHEN THERE IS CLEARLY A STORE IN USE
      // WHY DOES METADATA NOT WORK WHEN THERE IS CLEARLY A STORE IN USE
      // WHY DOES METADATA NOT WORK WHEN THERE IS CLEARLY A STORE IN USE
      //****************************************************************
      val SIZE = streams.allMetadata.size()
      val SIZE2 = streams.allMetadataForStore(StateStores.RANKINGS_BY_EMAIL_STORE).size()

      import org.apache.kafka.streams.state.KeyValueIterator
      import org.apache.kafka.streams.state.QueryableStoreTypes
      import org.apache.kafka.streams.state.ReadOnlyKeyValueStore

      val keyValueStore = streams.store(StateStores.RANKINGS_BY_EMAIL_STORE, QueryableStoreTypes.keyValueStore)

      val range = keyValueStore.all
      val HASNEXT = range.hasNext

      import org.apache.kafka.streams.KeyValue

      while (range.hasNext) {
        val next = range.next
        System.out.println(String.format("key: %s | value: %s", next.key, next.value))
      }

      Runtime.getRuntime.addShutdownHook(new Thread(() => {
        streams.close(10, TimeUnit.SECONDS)
        restService.stop
      }))

      //return unit
      ()
    }
  }
}

This is the configuration file:
kafka {
  bootStrapServers = "localhost:9092"
  zooKeepers = "zookeeper:2181"
  schemaRegistryUrl = "http://localhost:8081"
  partition = 0,
  restApiDefaultHostName = "localhost",
  restApiDefaultPort = "8080"
}
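That block looks like Typesafe Config (HOCON), so the Utils.Settings object referenced throughout presumably loads it roughly along these lines (a sketch; only the members actually used by the code above are shown, and the key names are taken from the block):

package Utils {

  import com.typesafe.config.ConfigFactory

  object Settings {

    private val config = ConfigFactory.load()

    val bootStrapServers: String = config.getString("kafka.bootStrapServers")
    val restApiDefaultHostName: String = config.getString("kafka.restApiDefaultHostName")
    val restApiDefaultPort: Int = config.getString("kafka.restApiDefaultPort").toInt

    // plus createBasicProducerProperties / createBasicStreamProperties /
    // createRatingStreamsProperties as shown earlier
  }
}

This is the metadata lookup service: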
package Processing.Ratings

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.StreamsMetadata
import java.util.stream.Collectors
import Entities.HostStoreInfo
import org.apache.kafka.common.serialization.Serializer
import org.apache.kafka.connect.errors.NotFoundException
import scala.collection.JavaConverters._

/**
 * Looks up StreamsMetadata from KafkaStreams
 */
class MetadataService(val streams: KafkaStreams) {

  /**
   * Get the metadata for all of the instances of this Kafka Streams application
   *
   * @return List of { @link HostStoreInfo}
   */
  def streamsMetadata() : List[HostStoreInfo] = {
    // Get metadata for all of the instances of this Kafka Streams application
    val metadata = streams.allMetadata
    return mapInstancesToHostStoreInfo(metadata)
  }

  /**
   * Get the metadata for all instances of this Kafka Streams application that currently
   * has the provided store.
   *
   * @param store The store to locate
   * @return List of { @link HostStoreInfo}
   */
  def streamsMetadataForStore(store: String) : List[HostStoreInfo] = {
    // Get metadata for all of the instances of this Kafka Streams application hosting the store
    val metadata = streams.allMetadataForStore(store)
    return mapInstancesToHostStoreInfo(metadata)
  }

  /**
   * Find the metadata for the instance of this Kafka Streams Application that has the given
   * store and would have the given key if it exists.
   *
   * @param store Store to find
   * @param key The key to find
   * @return { @link HostStoreInfo}
   */
  def streamsMetadataForStoreAndKey[T](store: String, key: T, serializer: Serializer[T]) : HostStoreInfo = {
    // Get metadata for the instances of this Kafka Streams application hosting the store and
    // potentially the value for key
    val metadata = streams.metadataForKey(store, key, serializer)
    if (metadata == null)
      throw new NotFoundException(
        s"No metadata could be found for store : ${store}, and key type : ${key.getClass.getName}")
    return new HostStoreInfo(metadata.host, metadata.port, metadata.stateStoreNames.asScala.toList)
  }

  def mapInstancesToHostStoreInfo(metadatas : java.util.Collection[StreamsMetadata]) : List[HostStoreInfo] = {
    metadatas.stream.map[HostStoreInfo](metadata =>
      HostStoreInfo(
        metadata.host(),
        metadata.port,
        metadata.stateStoreNames.asScala.toList))
      .collect(Collectors.toList())
      .asScala.toList
  }
}
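Used on its own (outside the REST layer below), the service boils down to calls like these:

val metadataService = new MetadataService(streams)
val allInstances    = metadataService.streamsMetadata()                                            // every instance of this app
val storeInstances  = metadataService.streamsMetadataForStore(StateStores.RANKINGS_BY_EMAIL_STORE) // instances hosting the store

And this is the Akka HTTP REST service that exposes it: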
package Processing.Ratings

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.HostInfo
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json.DefaultJsonProtocol._
import Entities.AkkaHttpEntitiesJsonFormats._
import Entities._
import akka.http.scaladsl.marshalling.ToResponseMarshallable
import scala.concurrent.Future

object RestService {
  val DEFAULT_REST_ENDPOINT_HOSTNAME = "localhost"
}

class RatingRestService(val streams: KafkaStreams, val hostInfo: HostInfo) {

  val metadataService = new MetadataService(streams)
  var bindingFuture: Future[Http.ServerBinding] = null

  implicit val system = ActorSystem("rating-system")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  def start() : Unit = {
    val emailRegexPattern = """\w+""".r

    val route =
      path("ratingByEmail" / emailRegexPattern) { email =>
        get {
          //TODO : This would come from Kafka store, either local or remote
          complete(ToResponseMarshallable.apply(List[Ranking](
            Ranking("fred@here.com", "sacha@there.com", 4.0f),
            Ranking("sam@here.com", "sacha@there.com", 2.0f)))
          )
        }
      } ~
      path("instances") {
        get {
          val x = metadataService.streamsMetadata
          complete(ToResponseMarshallable.apply(metadataService.streamsMetadata))
        }
      }

    bindingFuture = Http().bindAndHandle(route, hostInfo.host, hostInfo.port)
    println(s"Server online at http://${hostInfo.host}:${hostInfo.port}/\n")

    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      bindingFuture
        .flatMap(_.unbind()) // trigger unbinding from the port
        .onComplete(_ => system.terminate()) // and shutdown when done
    }))
  }

  def stop() : Unit = {
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }

  def thisHost(hostStoreInfo: HostStoreInfo) : Boolean = {
    hostStoreInfo.host.equals(hostInfo.host()) &&
      hostStoreInfo.port == hostInfo.port
  }
}
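With the defaults above, the two routes can be exercised from a shell once both apps are running (note that the \w+ matcher on ratingByEmail will not accept a full address containing @ or ., so only a plain word works there):

curl http://localhost:8080/instances
curl http://localhost:8080/ratingByEmail/sacha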
I start the stream processing app first and can see the KTable results being printed, then I start the producer and push more messages through the topic the stream consumes from. But when I hit localhost:8080/instances to get the metadata, all I get back is an empty list ([]), even though this code inside the stream processing app clearly finds data in the store:
val SIZE = streams.allMetadata.size()
val SIZE2 = streams.allMetadataForStore(StateStores.RANKINGS_BY_EMAIL_STORE).size()

import org.apache.kafka.streams.state.KeyValueIterator
import org.apache.kafka.streams.state.QueryableStoreTypes
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore

val keyValueStore = streams.store(StateStores.RANKINGS_BY_EMAIL_STORE, QueryableStoreTypes.keyValueStore)

val range = keyValueStore.all
val HASNEXT = range.hasNext

import org.apache.kafka.streams.KeyValue

while (range.hasNext) {
  val next = range.next
  System.out.println(String.format("key: %s | value: %s", next.key, next.value))
}
Best answer

So I figured it out — it turned out to be down to this missing configuration value:
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8080")
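In this project that line belongs with the rest of the Streams settings, e.g. in createRatingStreamsProperties from earlier. A sketch of where it would sit (building the value from the REST host/port is an assumption; hard-coding "localhost:8080" as above works for a single local instance):

def createRatingStreamsProperties() : Properties = {
  val props = createBasicStreamProperties
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ratings-application")
  props.put(StreamsConfig.CLIENT_ID_CONFIG, "ratings-application-client")
  // tell Kafka Streams which host:port this instance's interactive-query endpoint is on;
  // without it, allMetadata() / allMetadataForStore() have nothing to report
  props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, s"$restApiDefaultHostName:$restApiDefaultPort")
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props
}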
With that in place, http://localhost:8080/instances started working. But then I started getting this strange exception:
org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, my-key-value-store, may have migrated to another instance.
    at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:49)
    at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:55)
    at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:699)

That exception just means the state store is not queryable yet (the instance is still starting up / rebalancing), so the remaining piece was to wait, or retry, until the store becomes available before querying it. I ended up using this retry helper:
package Utils

import scala.concurrent._
import scala.concurrent.duration._

object Retry {

  /**
   * exponential back off for retry
   */
  def exponentialBackoff(r: Int): Duration = scala.math.pow(2, r).round * 500 milliseconds

  def noIgnore(t: Throwable): Boolean = false

  /**
   * retry a particular block that can fail
   *
   * @param maxRetry how many times to retry before to giveup
   * @param deadline how long to retry before giving up; default None
   * @param backoff a back-off function that returns a Duration after which to retry. default is an exponential backoff at 100 milliseconds steps
   * @param ignoreThrowable if you want to stop retrying on a particular exception
   * @param block a block of code to retry
   * @param ctx an execution context where to execute the block
   * @returns an eventual Future succeeded with the value computed or failed with one of:
   *   `TooManyRetriesException` if there were too many retries without an exception being caught. Probably impossible if you pass decent parameters
   *   `DeadlineExceededException` if the retry didn't succeed before the provided deadline
   *   `TimeoutException` if you provide a deadline and the block takes too long to execute
   *   `Throwable` the last encountered exception
   */
  def retry[T](maxRetry: Int,
               deadline: Option[Deadline] = None,
               backoff: (Int) => Duration = exponentialBackoff,
               ignoreThrowable: Throwable => Boolean = noIgnore)(block: => T)(implicit ctx: ExecutionContext): Future[T] = {

    class TooManyRetriesException extends Exception("too many retries without exception")
    class DeadlineExceededException extends Exception("deadline exceded")

    val p = Promise[T]

    def recursiveRetry(retryCnt: Int, exception: Option[Throwable])(f: () => T): Option[T] = {
      if (maxRetry == retryCnt
          || deadline.isDefined && deadline.get.isOverdue) {
        exception match {
          case Some(t) =>
            p failure t
          case None if deadline.isDefined && deadline.get.isOverdue =>
            p failure (new DeadlineExceededException)
          case None =>
            p failure (new TooManyRetriesException)
        }
        None
      } else {
        val success = try {
          val rez = if (deadline.isDefined) {
            Await.result(future(f()), deadline.get.timeLeft)
          } else {
            f()
          }
          Some(rez)
        } catch {
          case t: Throwable if !ignoreThrowable(t) =>
            blocking {
              val interval = backoff(retryCnt).toMillis
              Thread.sleep(interval)
            }
            recursiveRetry(retryCnt + 1, Some(t))(f)
          case t: Throwable =>
            p failure t
            None
        }
        success match {
          case Some(v) =>
            p success v
            Some(v)
          case None => None
        }
      }
    }

    def doBlock() = block

    Future {
      recursiveRetry(0, None)(doBlock)
    }

    p.future
  }
}
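printStoreMetaData below relies on a waitUntilStoreIsQueryable helper that isn't shown in the answer. A minimal version, modelled on the loop used in the Confluent interactive-queries examples and returning a Try so it slots into the match below, could look like this (the Retry helper above could equally be used to add a deadline):

import scala.util.Try
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.errors.InvalidStateStoreException
import org.apache.kafka.streams.state.QueryableStoreType

def waitUntilStoreIsQueryable[T](storeName: String,
                                 queryableStoreType: QueryableStoreType[T],
                                 streams: KafkaStreams): Try[T] = Try {
  var store: Option[T] = None
  while (store.isEmpty) {
    try {
      store = Some(streams.store(storeName, queryableStoreType))
    } catch {
      case _: InvalidStateStoreException =>
        // store not yet ready (still rebalancing / migrating) - wait and retry
        Thread.sleep(100)
    }
  }
  store.get
}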
import scala.util.{Failure, Success}

def printStoreMetaData(streams: KafkaStreams) : Unit = {

  import org.apache.kafka.streams.state.KeyValueIterator
  import org.apache.kafka.streams.state.QueryableStoreTypes
  import org.apache.kafka.streams.state.ReadOnlyKeyValueStore

  val keyValueStoreTry = waitUntilStoreIsQueryable(
    StateStores.RANKINGS_BY_EMAIL_STORE,
    QueryableStoreTypes.keyValueStore[String, List[Ranking]](),
    streams
  ) match {
    case Success(keyValueStore) => {
      val SIZE = streams.allMetadata.size()
      val SIZE2 = streams.allMetadataForStore(StateStores.RANKINGS_BY_EMAIL_STORE).size()

      val range = keyValueStore.all
      val HASNEXT = range.hasNext

      import org.apache.kafka.streams.KeyValue

      while (range.hasNext) {
        val next = range.next
        System.out.println(String.format("key: %s | value: %s", next.key, next.value))
      }
    }
    case Failure(f) => println(f)
  }
}
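Wired back into RatingStreamProcessingApp, the direct store/metadata access right after streams.start() is then replaced with a call that waits first (placement assumed; the answer does not show it):

streams.start()

val restService = new RatingRestService(streams, restEndpoint)
restService.start()

// only touch metadata / the store once it is actually queryable
printStoreMetaData(streams)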
The original question, "apache-kafka - Kafkastreams.allMetadata() method returns empty list", is on Stack Overflow: https://stackoverflow.com/questions/46054763/