- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我想使用实现 com.bazaarvoice.jolt.Transform 接口(interface)的自定义 JSON 转换。
我像这样使用“自定义转换类名称”和“自定义模块目录”:
但是,我无法让 JoltTransformJSON 处理器使用它;我得到一个 ClassNotFoundException:
2019-04-01 14:30:54,196 ERROR [Timer-Driven Process Thread-4] o.a.n.p.standard.JoltTransformJSON JoltTransformJSON[id=b407714f-0169-1000-d9b2-1709069238d7] Unable to transform StandardFlowFileRecord[uuid=72dc471b-c587-4da9-b54c-eb46247b0cf4,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1554129053747-21203, container=default, section=723], offset=607170, length=5363],offset=0,name=72dc471b-c587-4da9-b54c-eb46247b0cf4,size=5363] due to java.util.concurrent.CompletionException: java.lang.ClassNotFoundException: org.sentilo.nifi.elasticsearch.ElasticsearchToOpenTSDB: java.util.concurrent.CompletionException: java.lang.ClassNotFoundException: org.sentilo.nifi.elasticsearch.ElasticsearchToOpenTSDB
java.util.concurrent.CompletionException: java.lang.ClassNotFoundException: org.sentilo.nifi.elasticsearch.ElasticsearchToOpenTSDB
at com.github.benmanes.caffeine.cache.BoundedLocalCache$BoundedLocalLoadingCache.lambda$new$0(BoundedLocalCache.java:3373)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2039)
at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2037)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2020)
at com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:112)
at com.github.benmanes.caffeine.cache.LocalLoadingCache.get(LocalLoadingCache.java:67)
at org.apache.nifi.processors.standard.JoltTransformJSON.getTransform(JoltTransformJSON.java:316)
at org.apache.nifi.processors.standard.JoltTransformJSON.onTrigger(JoltTransformJSON.java:277)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1162)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:205)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.sentilo.nifi.elasticsearch.ElasticsearchToOpenTSDB
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.nifi.processors.standard.util.jolt.TransformFactory.getCustomTransform(TransformFactory.java:65)
at org.apache.nifi.processors.standard.JoltTransformJSON.createTransform(JoltTransformJSON.java:346)
at org.apache.nifi.processors.standard.JoltTransformJSON.lambda$setup$0(JoltTransformJSON.java:324)
at com.github.benmanes.caffeine.cache.BoundedLocalCache$BoundedLocalLoadingCache.lambda$new$0(BoundedLocalCache.java:3366)
... 19 common frames omitted
我使用 maven-assembly-plugin 编译了该类及其所有依赖项,并将其放在目录“/data/bin/nifi-1.9.1/jolt_modules”中。
package org.sentilo.nifi.elasticsearch;
import com.bazaarvoice.jolt.SpecDriven;
import com.bazaarvoice.jolt.Transform;
import com.bazaarvoice.jolt.exception.TransformException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.beanutils.BeanUtils;
import org.sentilo.agent.historian.domain.OpenTSDBDataPoint;
import org.sentilo.agent.historian.utils.OpenTSDBValueConverter;
import org.sentilo.common.domain.EventMessage;
import org.sentilo.nifi.elasticsearch.model.Hits;
import org.springframework.util.StringUtils;
import javax.inject.Inject;
import java.lang.reflect.InvocationTargetException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import static org.sentilo.agent.historian.utils.OpenTSDBValueConverter.replaceIllegalCharacters;
public class ElasticsearchToOpenTSDB implements SpecDriven, Transform {
private final Object spec;
private final ObjectMapper mapper = new ObjectMapper();
public ElasticsearchToOpenTSDB() {
this.spec = "{}";
}
@Inject
public ElasticsearchToOpenTSDB( Object spec ) {
this.spec = spec;
}
public Object transform( final Object input ) {
try{
Hits hits = mapper.readValue(input.toString(), Hits.class);
List<EventMessage> newEventList = new ArrayList<EventMessage>();
List<OpenTSDBDataPoint> dataPoints = new ArrayList<OpenTSDBDataPoint>();
for(EventMessage event : hits.hits) {
if (OpenTSDBValueConverter.isComplexValue(event.getMessage())) {
addComplexValueToQueue(event,newEventList);
} else {
addSimpleValueToQueue(event, newEventList);
}
}
for(EventMessage event2 : newEventList) {
OpenTSDBDataPoint dp = unmarshal(event2);
dataPoints.add(dp);
}
return dataPoints;
}catch(Exception e) {
throw new TransformException(e.getMessage());
}
}
private void addComplexValueToQueue(final EventMessage event, List<EventMessage> eventList) throws IllegalAccessException, InvocationTargetException {
// Flatten JSON message into N measures
final String metricName = OpenTSDBValueConverter.createMetricName(event);
final Map<String, Object> unfoldValues = OpenTSDBValueConverter.extractMeasuresFromComplexType(metricName, event.getMessage());
for (final Map.Entry<String, Object> e : unfoldValues.entrySet()) {
final EventMessage newEvent = new EventMessage();
BeanUtils.copyProperties(newEvent, event);
newEvent.setTopic(e.getKey());
newEvent.setMessage(e.getValue().toString());
eventList.add(newEvent);
}
}
private void addSimpleValueToQueue(final EventMessage event, List<EventMessage> eventList) {
// The value should be long, float or boolean
try {
final Object numericValue = OpenTSDBValueConverter.getSimpleValue(event.getMessage());
final String metricName = OpenTSDBValueConverter.createMetricName(event);
event.setMessage(numericValue.toString());
event.setTopic(metricName);
eventList.add(event);
} catch (final ParseException e) {
// Probably String or some non-numeric value that we cannot store in OpenTSDB. Pass
return;
}
}
public static OpenTSDBDataPoint unmarshal(final EventMessage event) throws ParseException {
final OpenTSDBDataPoint dataPoint = new OpenTSDBDataPoint();
dataPoint.setMetric(event.getTopic());
dataPoint.setValue(OpenTSDBValueConverter.getSimpleValue(event.getMessage()));
if (event.getPublishedAt() != null) {
dataPoint.setTimestamp(event.getPublishedAt());
} else {
dataPoint.setTimestamp(event.getTime());
}
dataPoint.setTags(createTags(event));
return dataPoint;
}
private static Map<String, String> createTags(final EventMessage event) {
final Map<String, String> tags = new LinkedHashMap<String, String>();
putTag(tags, OpenTSDBDataPoint.Tags.type.name(), replaceIllegalCharacters(event.getType()));
putTag(tags, OpenTSDBDataPoint.Tags.sensor.name(), replaceIllegalCharacters(event.getSensor()));
putTag(tags, OpenTSDBDataPoint.Tags.provider.name(), replaceIllegalCharacters(event.getProvider()));
putTag(tags, OpenTSDBDataPoint.Tags.component.name(), replaceIllegalCharacters(event.getComponent()));
putTag(tags, OpenTSDBDataPoint.Tags.alertType.name(), replaceIllegalCharacters(event.getAlertType()));
putTag(tags, OpenTSDBDataPoint.Tags.sensorType.name(), replaceIllegalCharacters(event.getSensorType()));
putTag(tags, OpenTSDBDataPoint.Tags.publisher.name(), replaceIllegalCharacters(event.getPublisher()));
putTag(tags, OpenTSDBDataPoint.Tags.tenant.name(), replaceIllegalCharacters(event.getTenant()));
putTag(tags, OpenTSDBDataPoint.Tags.publisherTenant.name(), replaceIllegalCharacters(event.getPublisherTenant()));
return tags;
}
private static void putTag(final Map<String, String> tags, final String tagName, final String tagValue) {
if (StringUtils.hasText(tagValue)) {
tags.put(tagName, tagValue);
}
}
}
更新
最佳答案
该问题尚未解决,已作为错误报告提交。最新状态可以在这里看到:
https://issues.apache.org/jira/browse/NIFI-6213
关于java - Nifi JSON ETL : Custom Transformation Class not found with JoltTransformJSON Processor,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55458351/
最近开始学习MongoDB。今天老师教了我们 mongoexport 命令。在练习时,我遇到了一个典型的问题,包括教练在内的其他同学都没有遇到过。我在我的 Windows 10 机器上使用 Mongo
我是 JSON Schema 的新手,读过什么是 JSON Schema 等等。但我不知道如何将 JSON Schema 链接到 JSON 以针对该 JSON Schema 进行验证。谁能解释一下?
在 xml 中,我可以在另一个 xml 文件中包含一个文件并使用它。如果您的软件从 xml 获取配置文件但没有任何方法来分离配置,如 apache/ngnix(nginx.conf - site-av
我有一个 JSON 对象,其中包含一个本身是 JSON 对象的字符串。我如何反序列化它? 我希望能够做类似的事情: #[derive(Deserialize)] struct B { c: S
考虑以下 JSON { "a": "{\"b\": 12, \"c\": \"test\"}" } 我想定义一个泛型读取 Reads[Outer[T]]对于这种序列化的 Json import
关闭。这个问题不满足Stack Overflow guidelines .它目前不接受答案。 想改善这个问题吗?更新问题,使其成为 on-topic对于堆栈溢出。 11 个月前关闭。 Improve
我的旧项目在 MySQL 中有 Standard JSON 格式的数据。 对于我在 JS (Node.js) 和 DynamoDB 中的全新项目,关于 Standard JSON格式: 是否建议将其转
JSON 值字符串、数字、true、false、null 是否是有效的 JSON? 即,是 true 一个有效的 JSON 文档?还是必须是数组/对象? 一些验证器接受这个(例如 http://jso
我有一个 JSON 字符串,其中一个字段是文本字段。这个文本字段可以包含用户在 UI 中输入的文本,如果他们输入的文本是 JSON 文本,也许是为了说明一些编码,我需要对他们的文本进行编码,以便它不会
我正在通过 IBM MQ 调用处理数据,当由 ColdFusion 10 (10,0,11,285437) 序列化时,0 将作为 +0.0 返回,它会导致无效的 JSON并且无法反序列化。 stPol
我正在从三个数组中生成一个散列,然后尝试构建一个 json。我通过 json object has array 成功了。 require 'json' A = [['A1', 'A2', 'A3'],
我从 API 接收 JSON,响应可以是 30 种类型之一。每种类型都有一组唯一的字段,但所有响应都有一个字段 type 说明它是哪种类型。 我的方法是使用serde .我为每种响应类型创建一个结构并
我正在下载一个 JSON 文件,我已将其检查为带有“https://jsonlint.com”的有效 JSON 到文档目录。然后我打开文件并再次检查,结果显示为无效的 JSON。这怎么可能????这是
我正在尝试根据从 API 接收到的数据动态创建一个 JSON 对象。 收到的示例数据:将数据解码到下面给出的 CiItems 结构中 { "class_name": "test", "
我想从字符串转换为对象。 来自 {"key1": "{\n \"key2\": \"value2\",\n \"key3\": {\n \"key4\": \"value4\"\n }\n
目前我正在使用以下代码将嵌套的 json 转换为扁平化的 json: import ( "fmt" "github.com/nytlabs/gojsonexplode" ) func
我有一个使用来自第三方 API 的数据的应用程序。我需要将 json 解码为一个结构,这需要该结构具有“传入”json 字段的 json 标签。传出的 json 字段具有不同的命名约定,因此我需要不同
我想使用 JSON 架构来验证某些值。我有两个对象,称它们为 trackedItems 和 trackedItemGroups。 trackedItemGroups 是组名称和 trackedItem
考虑以下案例类模式, case class Y (a: String, b: String) case class X (dummy: String, b: Y) 字段b是可选的,我的一些数据集没有字
我正在存储 cat ~/path/to/file/blah | 的输出jq tojson 在一个变量中,稍后在带有 JSON 内容的 curl POST 中使用。它运作良好,但它删除了所有换行符。我知
我是一名优秀的程序员,十分优秀!