- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我正在尝试创建一个 Hive UDAF 来查找最常出现的列(字符串)值(不是单个字符或子字符串,使用精确的列值)。假设以下是我的名为 my_table 的表(破折号用于在视觉上分隔列)。
User_Id - Item - Count
1 - A - 1
1 - B - 1
1 - A - 1
1 - A - 1
1 - A - 1
1 - C - 1
2 - C - 1
2 - C - 1
2 - A - 1
2 - C - 1
假设我调用以下脚本:
Select User_Id, findFrequent(*) from my_table group by User_Id
我应该得到以下输出,因为对于 User_Id=1,A 出现了 4 次而 B 和 C 只出现了一次。所以,User_Id=1最频繁的是A。同样,User_Id=2最频繁的是C。换句话说,每个唯一的User_Id应该只有一个最频繁的列值。
1 - A
2 - C
我按照这个例子创建了一个类 https://github.com/rathboma/hive-extension-examples/blob/master/src/main/java/com/matthewrathbone/example/TotalNumOfLettersGenericUDAF.java但到目前为止还没有运气。这是我的代码:
@Description(name = "FindMostCommonString", value = "_FUNC_(expr) - Returns most commonly found string of a column.")
public class FindMostCommonString extends AbstractGenericUDAFResolver {
@Override
public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
throws SemanticException {
if (parameters.length != 1) {
throw new UDFArgumentTypeException(parameters.length - 1,
"Exactly one argument is expected.");
}
ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(parameters[0]);
if (oi.getCategory() != ObjectInspector.Category.PRIMITIVE){
throw new UDFArgumentTypeException(0,
"Argument must be PRIMITIVE, but "
+ oi.getCategory().name()
+ " was passed.");
}
PrimitiveObjectInspector inputOI = (PrimitiveObjectInspector) oi;
if (inputOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING){
throw new UDFArgumentTypeException(0,
"Argument must be String, but "
+ inputOI.getPrimitiveCategory().name()
+ " was passed.");
}
return new MostCommonStringEvaluator();
}
public static class MostCommonStringEvaluator extends GenericUDAFEvaluator {
PrimitiveObjectInspector inputOI;
ObjectInspector outputOI;
MapObjectInspector mapOI;
HashMap<String, Integer> total = new HashMap<String, Integer>();
@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters)
throws HiveException {
assert (parameters.length == 1);
super.init(m, parameters);
// init input object inspectors
if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
inputOI = (PrimitiveObjectInspector) parameters[0];
}
else{
mapOI = (MapObjectInspector) parameters[0];
}
outputOI = ObjectInspectorFactory.getReflectionObjectInspector(String.class,
ObjectInspectorOptions.JAVA);
return outputOI;
}
static class StringCountAgg implements AggregationBuffer {
HashMap<String, Integer> strCount;
void add(String str){
if(strCount.containsKey(str)){
strCount.put(str,strCount.get(str)+1);
}
else{
strCount.put(str,1);
}
}
StringCountAgg(){
strCount = new HashMap<String, Integer>();
}
}
@Override
public AggregationBuffer getNewAggregationBuffer() throws HiveException {
StringCountAgg result = new StringCountAgg();
return result;
}
@Override
public void reset(AggregationBuffer agg) throws HiveException {
StringCountAgg myagg = new StringCountAgg();
}
private boolean warned = false;
@Override
public void iterate(AggregationBuffer agg, Object[] parameters)
throws HiveException {
assert (parameters.length == 1);
if (parameters[0] != null) {
StringCountAgg myagg = (StringCountAgg) agg;
Object p1 = ((PrimitiveObjectInspector) inputOI).getPrimitiveJavaObject(parameters[0]);
myagg.add((String)p1);
}
}
@Override
public Object terminatePartial(AggregationBuffer agg) throws HiveException {
StringCountAgg myagg = (StringCountAgg) agg;
appendToHashMap(total, myagg.strCount);
return total;
}
@Override
public void merge(AggregationBuffer agg, Object partial)
throws HiveException {
if (partial != null) {
StringCountAgg myagg1 = (StringCountAgg) agg;
HashMap<String, Integer> partialRes = (HashMap<String, Integer> ) mapOI.getMap(partial);
appendToHashMap(myagg1.strCount, partialRes);
}
}
@Override
public Object terminate(AggregationBuffer agg) throws HiveException {
StringCountAgg myagg = (StringCountAgg) agg;
appendToHashMap(total, myagg.strCount);
String result = null;
int maxCount = 0;
for(String key: total.keySet()){
if(total.get(key) > maxCount){
maxCount = total.get(key);
result = key;
}
}
return result;
}
private void appendToHashMap(HashMap<String, Integer> main, HashMap<String, Integer> strCount) {
for(String key: strCount.keySet()){
if(main.containsKey(key)){
main.put(key,main.get(key)+strCount.get(key));
}
else{
main.put(key, strCount.get(key));
}
}
}
}
}
最佳答案
select User_Id,Item from HiveTable;
+---------+------+
| User_Id | Item |
+---------+------+
| 1 | A |
| 1 | B |
| 1 | A |
| 1 | A |
| 1 | A |
| 1 | C |
| 2 | C |
| 2 | C |
| 2 | C |
| 2 | A |
| 2 | C |
+---------+------+
查询-
select User_Id, Item from
(
select User_Id,count(*) as total,Item from HiveTable group by User_Id, Item order by total desc
)q3 group by User_Id;
输出
+---------+------+
| User_Id | Item |
+---------+------+
| 1 | A |
| 2 | C |
+---------+------+
希望对你有帮助
关于java - 用于查找最常出现的列值的 Hive UDAF,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38425719/
我正在尝试使用 Spark UDAF 将两个现有列汇总到一个新列中。大多数关于 Spark UDAF 的教程都使用索引来获取输入行每一列中的值。像这样: input.getAs[String](1)
我是Scala开发的新手,正在尝试解决以下问题: 我有一个UDAF,它返回复杂对象的数组(是一个字符串和一个字符串数组)。在更新方法中,缓冲区返回的是wrappedArray类型,我不知道如何使用缓冲
我正在尝试在 Spark 中编写一些注重性能的代码,并想知道我是否应该编写 Aggregator或 User-defined Aggregate Function (UDAF) 用于我在 Datafr
对于我的特定要求,我想编写一个 UDAF,它只是收集所有输入行。 输入是一个两列的行,Double Type; “我认为”的中间模式是 ArrayList(如果我错了,请纠正我) 返回的数据类型是Ar
我从我的 mongodb 中获取了一些数据,如下所示: +------+-------+ | view | data | +------+-------+ |
我正在使用 Scala + Spark 2.0 并尝试编写一个 UDAF,该 UDAF 将元组数组作为其内部缓冲区以及返回类型:... def bufferSchema = new StructTyp
我想编写 Spark UDAF,其中列的类型可以是任何定义了 Scala Numeric 的类型。我在 Internet 上搜索过,但只找到了具体类型的示例,例如 DoubleType、LongTyp
我正在根据 UDAF example 实现 UDAF 。更新阶段如下所示: public void update(MutableAggregationBuffer buffer, Row in
我有一个具有第三方不可序列化属性的类,我需要将其发送到使用该类的一种方法的 UDAF。 由于不可序列化属性,我无法添加“实现可序列化”,并且我无法创建子类包装器,因为该属性在其构造函数中需要一个参数.
我正在尝试创建一个 Hive UDAF 来查找最常出现的列(字符串)值(不是单个字符或子字符串,使用精确的列值)。假设以下是我的名为 my_table 的表(破折号用于在视觉上分隔列)。 User_I
我猜 BigQuery 不支持 UDAF,因为我只能找到关于 UDF 的信息。 BigQuery 是否支持 UDAF?如果没有,有没有办法以某种方式在聚合结果上运行 UDF?也许通过使用 ARRAY_
Spark UDAF 要求您实现多种方法,特别是def update(buffer: MutableAggregationBuffer, input: Row): Unit和def merge(buf
我在名为 end_stats_df 的 pyspark 数据框中有以下数据: values start end cat1 cat2 10 1 2
我想知道在什么情况下 Spark 将执行合并作为 UDAF 功能的一部分。 动机: 我在 Spark 项目中的一个窗口上使用了很多 UDAF 函数。我经常想回答这样的问题: 信用卡交易在 30 天内与
有两个选择语句: select max(min(str)) from (select 0 id, 'a' str from dual) group by id having min(str
我想根据日期、var_currecy_code、 fxd_crncy_code。 我们的 hive 表中有所有数据,现在我们需要使用 hive UDAF 根据最大日期和上面提到的更多输入来计算 c
我有以下架构 - root |-- id:string (nullable = false) |-- age: long (nullable = true) |-- cars: struct (
例如,对于调试 pig udfs,这是可行的:http://ben-tech.blogspot.ie/2011/08/how-to-debug-pig-udfs-in-eclipse.html 我有一
我想创建一个 Map 列来计算出现次数。 例如: +---+----+ | b| a| +---+----+ | 1| b| | 2|null| | 1| a| | 1| a
在 Spark 的文档中,聚合器: abstract class Aggregator[-IN, BUF, OUT] extends Serializable A base class for use
我是一名优秀的程序员,十分优秀!