
java - How do I prevent the hadoop stream from closing?


I built a basic web parser that uses hadoop to hand urls off to multiple threads. This works fine until I reach the end of my input file; Hadoop declares itself finished while there are still threads running, which causes the error org.apache.hadoop.fs.FSError: java.io.IOException: Stream Closed. Is there any way to keep the stream open long enough for the threads to finish? (I can predict with reasonable accuracy the maximum time a thread will spend on a single url.)

Here is how I execute the threads:

public static class Map extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, Text> {
    private Text word = new Text();
    private URLPile pile = new URLPile();
    private MSLiteThread[] Threads = new MSLiteThread[16];
    private boolean once = true;

    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<Text, Text> output, Reporter reporter) {

        String url = value.toString();
        StringTokenizer urls = new StringTokenizer(url);
        Config.LoggerProvider = LoggerProvider.DISABLED;
        System.out.println("In Mapper");
        if (once) {
            // index loop so the array actually holds the started threads
            for (int i = 0; i < Threads.length; i++) {
                System.out.println("created thread");
                Threads[i] = new MSLiteThread(pile);
                Threads[i].start();
            }
            once = false;
        }

        while (urls.hasMoreTokens()) {
            try {
                word.set(urls.nextToken());
                String currenturl = word.toString();
                pile.addUrl(currenturl, output);
            } catch (Exception e) {
                e.printStackTrace();
                continue;
            }
        }
    }
}

The threads themselves grab urls like this:

public void run() {
    try {
        sleep(3000);
        while (!done()) {
            try {
                System.out.println("in thread");
                MSLiteURL tempURL = pile.getNextURL();
                String currenturl = tempURL.getURL();
                urlParser.parse(currenturl);
                urlText.set("");
                titleText.set(currenturl + urlParser.export());
                System.out.println(urlText.toString() + titleText.toString());
                tempURL.getOutput().collect(urlText, titleText);
                pile.doneParsing();
                sleep(30);
            } catch (Exception e) {
                pile.doneParsing();
                e.printStackTrace();
                continue;
            }
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("Thread done");
}

And the relevant methods in URLPile are:

private Queue<MSLiteURL> queue = new LinkedList<MSLiteURL>();
private int sent = 0;
private int finishedParcing = 0;

public synchronized void addUrl(String url, OutputCollector<Text, Text> output)
        throws InterruptedException {
    while (queue.size() > 16) {
        System.out.println("queue full");
        wait();
    }
    finishedParcing--;
    queue.add(new MSLiteURL(output, url));
    notifyAll();
}

public synchronized MSLiteURL getNextURL() throws InterruptedException {
    notifyAll();
    sent++;
    //System.out.println(queue.peek());
    return queue.remove();
}

Best Answer

As I can infer from the comments below, you can probably do this inside each map() function to keep things simple. I see that you do the following to pre-create some idle threads. You can move the following code

if (once) {
    // index loop so the array actually holds the started threads
    for (int i = 0; i < Threads.length; i++) {
        System.out.println("created thread");
        Threads[i] = new MSLiteThread(pile);
        Threads[i].start();
    }
    once = false;
}

to:

public static class Map extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, Text> {

    @Override
    public void configure(JobConf job) {
        for (int i = 0; i < Threads.length; i++) {
            System.out.println("created thread");
            Threads[i] = new MSLiteThread(pile);
            Threads[i].start();
        }
    }

    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<Text, Text> output, Reporter reporter) {
    }
}

This way, the threads get initialized exactly once, and for that matter, the "once" condition check is no longer needed.
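If worker threads are started in configure(), the old mapred API's counterpart hook close(), which MapReduceBase provides and which runs after the last map() call, is the natural place to wait for them, so nothing writes to the OutputCollector after its stream is closed. A hedged sketch, assuming MSLiteThread exits its loop once the pile is drained:

@Override
public void close() throws IOException {
    // called after the last map(); join the workers so no thread
    // writes to the OutputCollector after the stream is torn down
    for (MSLiteThread thread : Threads) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}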

Also, you don't need to create idle threads the way you do above. I don't know how much of a performance gain creating 16 idle threads would bring.
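If the goal is just a fixed set of workers, a thread pool from java.util.concurrent avoids managing the threads by hand. A minimal sketch under that assumption (the class name, pool size, and sample urls are placeholders, not from the original code):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PoolSketch {
    public static void main(String[] args) throws InterruptedException {
        // size the pool to the machine instead of a hard-coded 16
        ExecutorService pool = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors());

        for (final String url : new String[] { "http://a", "http://b" }) {
            pool.submit(new Runnable() {
                public void run() {
                    // urlParser.parse(url) would go here
                    System.out.println("processing " + url);
                }
            });
        }

        pool.shutdown();                             // accept no new tasks
        pool.awaitTermination(10, TimeUnit.MINUTES); // block until all tasks finish
    }
}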

Anyway, here is a solution (though perhaps not perfect).

You could use something like a countdown latch (read more here) to process your urls in batches of N and block until they complete. The reason is that if you release each incoming url record to a thread, the next url is fetched right away, and most likely, when you process the last url the same way, the map() function will return even while you still have threads left processing entries in the queue. You will inevitably hit the exception you mentioned.

Here is an example of how this might be done with a countdown latch.

public static class Map extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, Text> {

    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<Text, Text> output, Reporter reporter) {

        String url = value.toString();
        StringTokenizer urls = new StringTokenizer(url);
        Config.LoggerProvider = LoggerProvider.DISABLED;

        // set the countdown latch to urls.countTokens() to block on that many threads
        final CountDownLatch latch = new CountDownLatch(urls.countTokens());
        while (urls.hasMoreTokens()) {
            try {
                word.set(urls.nextToken());
                String currenturl = word.toString();
                // create a thread and fire it for the current URL here
                URLProcessingThread thread = new URLProcessingThread(currenturl, latch);
                thread.start();
            } catch (Exception e) {
                e.printStackTrace();
                continue;
            }
        }

        try {
            latch.await(); // wait for all the threads to complete execution
            // sleep here for some time if you wish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
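Since the question notes that the longest time a thread spends on a single url is fairly predictable, the final wait can also be bounded instead of indefinite. A hedged variant of the last step above (the 60-second figure is illustrative; it assumes java.util.concurrent.TimeUnit is imported):

// waits at most 60 seconds; returns false if the count never reached zero
if (!latch.await(60, TimeUnit.SECONDS)) {
    System.err.println("some url threads did not finish in time");
}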

Finally, in URLProcessingThread, decrement the latch counter as soon as the URL has been processed:

public class URLProcessingThread extends Thread {
    CountDownLatch latch;
    String url;

    public URLProcessingThread(String url, CountDownLatch latch) {
        this.latch = latch;
        this.url = url;
    }

    @Override
    public void run() {
        // process url here
        // after everything finishes, decrement the latch
        latch.countDown(); // reduce count of CountDownLatch by 1
    }
}

A possible issue with your code: in pile.addUrl(currenturl, output), when you add a new url, all 16 threads get the update at the same time (I'm not quite sure), since the same pile object is passed to all 16 threads. There is a chance your urls get reprocessed, or you may get some other side effects (I'm not quite sure about that either).
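If the hand-rolled wait()/notifyAll() in URLPile is the concern, java.util.concurrent.BlockingQueue offers the same bounded handoff with simpler guarantees: put() blocks producers while the queue is full, take() blocks consumers while it is empty, and each url is delivered to exactly one taker, so none is handed out twice. A minimal sketch (QueueSketch is a placeholder, not the original URLPile):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueSketch {
    // bounded to 16 entries, like the queue.size() > 16 check in URLPile
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(16);

    public void addUrl(String url) throws InterruptedException {
        queue.put(url);   // blocks while the queue is full
    }

    public String getNextUrl() throws InterruptedException {
        return queue.take(); // blocks while empty; each url goes to exactly one taker
    }
}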

Other suggestions:

Also, you may want to increase the map task timeout by raising

mapred.task.timeout

(default = 600000 ms = 10 minutes)

Description: The number of milliseconds before a task will be terminated if it neither reads an input, writes an output, nor updates its status string.

You can add/override this property in mapred-site.xml.
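The same property can also be set per job from the driver code. A hedged sketch using the old mapred API's JobConf (the driver class name and the 30-minute value are illustrative):

// MyJob is a placeholder driver class
JobConf conf = new JobConf(MyJob.class);
// 30 minutes; gives slow url parses more headroom before the task is killed
conf.setLong("mapred.task.timeout", 1800000L);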

Regarding java - How do I prevent the hadoop stream from closing?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/17751557/
