gpt4 book ai didi

scala - Hadoop : java. io.IOException:传递删除或放置

转载 作者:可可西里 更新时间:2023-11-01 15:34:36 25 4
gpt4 key购买 nike

我在控制台上收到了这些错误日志

java.io.IOException: Pass a Delete or a Put
at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:125)
at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:84)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:586)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:156)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:398)
15/01/06 14:13:34 INFO mapred.JobClient: Job complete: job_local259887539_0001
15/01/06 14:13:34 INFO mapred.JobClient: Counters: 19
15/01/06 14:13:34 INFO mapred.JobClient: File Input Format Counters
15/01/06 14:13:34 INFO mapred.JobClient: Bytes Read=0
15/01/06 14:13:34 INFO mapred.JobClient: FileSystemCounters
15/01/06 14:13:34 INFO mapred.JobClient: FILE_BYTES_READ=12384691
15/01/06 14:13:34 INFO mapred.JobClient: FILE_BYTES_WRITTEN=12567287
15/01/06 14:13:34 INFO mapred.JobClient: Map-Reduce Framework
15/01/06 14:13:34 INFO mapred.JobClient: Reduce input groups=0
15/01/06 14:13:34 INFO mapred.JobClient: Map output materialized bytes=8188
15/01/06 14:13:34 INFO mapred.JobClient: Combine output records=0
15/01/06 14:13:34 INFO mapred.JobClient: Map input records=285
15/01/06 14:13:34 INFO mapred.JobClient: Reduce shuffle bytes=0
15/01/06 14:13:34 INFO mapred.JobClient: Physical memory (bytes) snapshot=0
15/01/06 14:13:34 INFO mapred.JobClient: Reduce output records=0
15/01/06 14:13:34 INFO mapred.JobClient: Spilled Records=285
15/01/06 14:13:34 INFO mapred.JobClient: Map output bytes=7612
15/01/06 14:13:34 INFO mapred.JobClient: Total committed heap usage (bytes)=1029046272
15/01/06 14:13:34 INFO mapred.JobClient: CPU time spent (ms)=0
15/01/06 14:13:34 INFO mapred.JobClient: Virtual memory (bytes) snapshot=0
15/01/06 14:13:34 INFO mapred.JobClient: SPLIT_RAW_BYTES=77
15/01/06 14:13:34 INFO mapred.JobClient: Map output records=285
15/01/06 14:13:34 INFO mapred.JobClient: Combine input records=0
15/01/06 14:13:34 INFO mapred.JobClient: Reduce input records=0

当我尝试使用基于 http://hbase.apache.org/book/mapreduce.example.html#mapreduce.example.readwrite 的 Scala 实现制作 CopyTable 时

这是我的代码示例,有什么比这样做更好的吗?

package com.example

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Get
import java.io.IOException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io._
import org.apache.hadoop.hbase.mapreduce._
import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce._
import scala.collection.JavaConversions._

case class HString(name: String) {
lazy val bytes = name.getBytes
override def toString = name
}
object HString {
import scala.language.implicitConversions
implicit def hstring2String(src: HString): String = src.name
implicit def hstring2Bytes(src: HString): Array[Byte] = src.bytes
}

object Families {
val stream = HString("stream")
val identity = HString("identity")
}
object Qualifiers {
val title = HString("title")
val url = HString("url")
val media = HString("media")
val media_source = HString("media_source")
val content = HString("content")
val nolimitid_timestamp = HString("nolimitid.timestamp")
val original_id = HString("original_id")
val timestamp = HString("timestamp")
val date_created = HString("date_created")
val count = HString("count")
}
object Tables {
val rawstream100 = HString("raw_stream_1.0.0")
val rawstream = HString("rawstream")
}

class tmapper extends TableMapper[ImmutableBytesWritable, Put]{
def map (row: ImmutableBytesWritable, value: Result, context: Context) {
val put = new Put(row.get())
for (kv <- value.raw()) {
put.add(kv)
}
context.write(row, put)
}
}

object Hello {
val hbaseMaster = "127.0.0.1:60000"
val hbaseZookeper = "127.0.0.1"
def main(args: Array[String]): Unit = {
val conf = HBaseConfiguration.create()
conf.set("hbase.master", hbaseMaster)
conf.set("hbase.zookeeper.quorum", hbaseZookeper)
val hbaseAdmin = new HBaseAdmin(conf)

val job = Job.getInstance(conf, "CopyTable")
job.setJarByClass(classOf[Hello])
job.setMapperClass(classOf[tmapper])
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[Result])
//
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put])

val scan = new Scan()
scan.setCaching(500) // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false) // don't set to true for MR jobs

TableMapReduceUtil.initTableMapperJob(
Tables.rawstream100.bytes, // input HBase table name
scan, // Scan instance to control CF and attribute selection
classOf[tmapper], // mapper class
null, // mapper output key class
null, // mapper output value class
job
)

TableMapReduceUtil.initTableReducerJob(
Tables.rawstream, // Table name
null, // Reducer class
job
)
val b = job.waitForCompletion(true);
if (!b) {
throw new IOException("error with job!");
}
}
}

class Hello {}

再次感谢

最佳答案

如果你的任务只是复制表(不是通过scala在hbase中实现mapreduce)你可以使用CopyTable hbase-server 包中的类,如下所示:

import org.apache.hadoop.hbase.mapreduce.CopyTable
CopyTable.main(Array("--peer.adr=127.0.0.1:2181:/hbase", "--new.name=rawstream", "raw_stream_1.0.0"))

查看CopyTable documentation用于其他参数。

关于scala - Hadoop : java. io.IOException:传递删除或放置,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27539964/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com