gpt4 book ai didi

Java设置多线程处理

转载 作者:行者123 更新时间:2023-11-30 05:29:29 24 4
gpt4 key购买 nike

全部

我正在尝试检查包含从 1 到 N 的数字的某些数据集的多线程处理。例如,我想对所有这些数字求和:

1)保存总和(结果)。

/**
 * Holder for the grand total and the accumulated processing time.
 *
 * <p>Both values live in {@code static} fields, so every instance of this class
 * reads and writes the same state. {@link #getSum()} followed by
 * {@link #setSum(Long)} is not atomic; callers that perform a read-modify-write
 * must provide their own locking.
 */
public class ResultHolder {

    /** Accumulated processing time in milliseconds; safe to update from several threads. */
    public static AtomicLong total_time = new AtomicLong(0);

    /** Running total shared by all instances. */
    public static Long sum = 0L;

    /**
     * Returns the current shared total.
     *
     * @return the value last stored by {@link #setSum(Long)} or reset by {@link #clearSum()}
     */
    public Long getSum() {
        return sum;
    } // END: getSum()

    /** Lifecycle hook; intentionally empty. */
    @PostConstruct
    public void init() {
    } // END: init()

    /**
     * Replaces the shared total.
     *
     * @param sum the new total
     */
    public void setSum(Long sum) {
        // The field is static: assign through the class name so the shared
        // nature of the state is explicit (the original wrote this.sum).
        ResultHolder.sum = sum;
    } // END: setSum()

    /** Prints the current total to standard output. */
    public void printSum() {
        System.out.println("Sum is " + sum);
    }

    /** Resets the shared total to zero. */
    public void clearSum() {
        sum = 0L;
    }

} // ENDC: ResultHolder

2)处理部分数字集合:

public class SumProcessor {

private static int global_id = 0;
final public int processor_id;
private final ArrayList<Long> numbers;
private Long processor_sum = 0l;

@Autowired
private final ResultHolder sumHoldder = null;

public SumProcessor(ArrayList<Long> numbers) {

this.numbers = numbers;

processor_id = ++global_id;

} // END: constructor

public void work() throws Exception {

long t1 = new java.util.Date().getTime();

int i = 0;

try {

if (numbers == null) throw new Exception("Не удалось получить массив чисел.");
for (i = 0; i < numbers.size(); i++) {
Long o = null;
try {
o = numbers.get(i);
if (o == null) throw new Exception("no number");
} catch (Exception e) {
throw new Exception("Ошибка извлечения числа из массива: " + e);
}
processor_sum += o;
} // END: for

if (sumHoldder == null) throw new Exception("No sum holder");

synchronized (sumHoldder) {
sumHoldder.setSum(sumHoldder.getSum() + processor_sum);
}

long t2 = new java.util.Date().getTime();

this.sumHoldder.total_time.addAndGet(t2 - t1);

} catch (Exception e) {

System.out.println("Work() error (" + i + ") " + e);

}

return;

} //END: method1

@PostConstruct
public void init() {

System.out.println("Initializated B: " + this);

} //END: method2

@PreDestroy
public void destroy() {

System.out.println("Destroy B: " + this);

} //END: method3

@Override
public String toString() {
return "" +
"Processor " + processor_id + " " +
"contain " + numbers.size() + " " +
"numbers from " + numbers.get(0) +
" to " + numbers.get(numbers.size() - 1);

} //END: toString()

} //END: class SumProcessor

3)非常简单的分析器(计算处理时间)

/**
 * Minimal profiler: measures every {@code work(..)} invocation and adds the
 * elapsed milliseconds to {@code MainApp.time}.
 *
 * <p>When several {@code work()} calls run concurrently the counter receives
 * the sum of the individual durations, which can exceed the wall-clock time
 * of the whole run.
 */
@Aspect
public class MethodLoggerBasic {

    /** Matches any method named {@code work}, in any type, with any arguments. */
    @Pointcut("execution(* *.work(..))")
    void around_work() {}

    /**
     * Times the intercepted call.
     *
     * @param joinPoint the intercepted invocation
     * @return whatever the intercepted method returned ({@code null} for void methods)
     * @throws Throwable anything thrown by the intercepted method
     */
    @Around("around_work()")
    public Object logMethodName(ProceedingJoinPoint joinPoint) throws Throwable {

        long started = System.currentTimeMillis();
        try {
            // Hand the result back so the advice also works for non-void methods.
            return joinPoint.proceed();
        } finally {
            long elapsed = System.currentTimeMillis() - started;
            // "+=" on a shared static long is a read-modify-write; without the
            // lock, advice running on several threads can lose updates.
            synchronized (MainApp.class) {
                MainApp.time += elapsed;
            }
        }

    } // END:
} // ENDC

4)主程序(可以开始线性或并行处理)

public class MainApp {

static AnnotationConfigApplicationContext context;

public static long time = 0l;
public final static int SIZE = 40_000_000;
public final static int DIVIDE_FACTOR = 4;
public static ArrayList<Long>[] numbers = new ArrayList[DIVIDE_FACTOR];

public static ArrayList<SumProcessor> processors = new ArrayList<>();

public static void main(String[] args) throws Exception {

context = new AnnotationConfigApplicationContext(myConfig.class);

// form 4 datasets

int part_size = SIZE / DIVIDE_FACTOR;

int i;
int j;

for (j = 0; j < DIVIDE_FACTOR; j++) {
numbers[j] = new ArrayList<>();
for (i = 0; i < (int) part_size; i++) {
numbers[j].add(((j * part_size) + i + 1l));
}
}

// create 4 processors (bean)

for (i = 0; i < DIVIDE_FACTOR; i++) {
SumProcessor bean = context.getBean(SumProcessor.class, numbers[i]);
if (bean == null) throw new Exception("Error recive bean SumProcessor.class");
processors.add(bean);
}

// creates 4 threads fro processors
thread_process thread1 = new thread_process();
thread_process thread2 = new thread_process();
thread_process thread3 = new thread_process();
thread_process thread4 = new thread_process();

ResultHolder a;

a = context.getBean(ResultHolder.class);

try {

boolean isByPool = true; // flag

time = 0;

if (isByPool) {

System.out.println("-------------------");
System.out.println("Multithread compute");
System.out.println("-------------------");
ExecutorService pool = new ThreadPoolExecutor(
4,
4,
0,
TimeUnit.MICROSECONDS,
new ArrayBlockingQueue<>(4)
);

List<Callable<Boolean>> tasks = new ArrayList();

tasks.add(thread1);
tasks.add(thread2);
tasks.add(thread3);
tasks.add(thread4);

pool.invokeAll(tasks);

pool.shutdown();

pool.awaitTermination(60, TimeUnit.SECONDS);

} else {

thread1.start();
thread2.start();
thread3.start();
thread4.start();

thread1.join();
thread2.join();
thread3.join();
thread4.join();

}

a.printSum();
a.clearSum();

System.out.println("total time is " + a.total_time);
System.out.println("basic time is " + MainApp.time);

System.out.println("-------------");
System.out.println("Single thread");
System.out.println("-------------");

ArrayList<Long> numbers_tolal = new ArrayList<>();
for (i = 0; i < SIZE; i++) {
numbers_tolal.add((i + 1l));
}

SumProcessor sumProcessor = context.getBean(SumProcessor.class, numbers_tolal);

a.total_time.set(0l);
time = 0l;

sumProcessor.work();

a.printSum();

System.out.println("total time is " + a.total_time);
System.out.println("basic time is " + MainApp.time);

} catch (Exception e) {

throw new Exception("MainApp error: " + e);

}

context.close();

} // END: main

} // END: class MainApp

5)线程进程:

公共(public)类 thread_process 扩展了 Thread 实现 Callable、Runnable {

static int index = 0;

@Override
public void run() {

try {

SumProcessor next = MainApp.processors.get(index++);

if (next == null) {

System.out.println("Нет процессора");

System.exit(-1);

}

next.work();

System.out.println("Thread " + this + " complete!");

} catch (Exception e) {

System.out.println("Error in thread " + this + ": " + e);

}

} //END: run()

@Override
public Boolean call() throws Exception {

run();

return true;

} //END: call()
}; //END: class thread_process

输出为:

Initializated B: Processor 1 contain 10000000 numbers from 1 to 10000000
Initializated B: Processor 2 contain 10000000 numbers from 10000001 to 20000000
Initializated B: Processor 3 contain 10000000 numbers from 20000001 to 30000000
Initializated B: Processor 4 contain 10000000 numbers from 30000001 to 40000000
-------------------
Multithread compute
-------------------
Thread Thread[Thread-3,5,main] complete!
Thread Thread[Thread-4,5,main] complete!
Thread Thread[Thread-2,5,main] complete!
Thread Thread[Thread-1,5,main] complete!
Sum is 800000020000000
total time is 11254
basic time is 11254
-------------
Single thread
-------------
Initializated B: Processor 5 contain 40000000 numbers from 1 to 40000000
Sum is 800000020000000
total time is 6995
basic time is 6995

有没有一种方法可以使其并行比线性更快?或者我也许不需要 fork 这个任务?或者也许我的分析器不太好......

GitHub project

最佳答案

您正在尝试使用多线程执行顺序任务,这不是多线程的正确使用。在这里,您有一个资源需要执行一些工作。您正在使用多个线程来分配该工作,但同时,当另一个线程正在使用该资源时,您会阻塞一个线程。那么,如果您不希望工作线程并行访问资源,为什么首先要有工作线程呢?

如果没有必要,您可以删除数据集的 Set 实现并使用 List 或 Arrays,您可以在其中使用索引访问元素,而不会阻塞工作线程。

<hr/>

更新 1:只需在 pool.shutdown() 调用后添加一行即可。

pool.shutdown(); // initiates an orderly shutdown: tasks already submitted still run, new tasks are rejected; does not wait
pool.awaitTermination(60, TimeUnit.SECONDS); // blocks the calling thread until all tasks have finished or 60 seconds elapse
// ...
// now you can do your single thread task

此外,不要创建太多线程,因为单个线程的速度足以处理数百万个数组元素。

<hr/>

更新 2:所以,我不知道为什么,但将单个线程放在 try block 之外似乎得到了预期的结果。

/**
 * Demo driver (revised): sums the numbers 1..SIZE first with DIVIDE_FACTOR
 * pooled workers and then with a single processor, printing the measured time
 * and the sum of each run.
 */
public class MainApp {

    static AnnotationConfigApplicationContext context;

    /** Elapsed milliseconds accumulated by the profiling aspect. */
    public static long time = 0;
    public final static int SIZE = 28_000_000;
    /** Number of data partitions; also the number of workers and pool threads. */
    public final static int DIVIDE_FACTOR = 4;
    public static ArrayList<Long>[] numbers = new ArrayList[DIVIDE_FACTOR];

    /** Processors consumed by the workers, one per partition. */
    public static ArrayList<SumProcessor> processors = new ArrayList<>();

    /**
     * Runs the pooled computation followed by the single-threaded one.
     *
     * @param args unused
     * @throws Exception if bean lookup fails or the pooled computation fails
     */
    public static void main(String[] args) throws Exception {

        context = new AnnotationConfigApplicationContext(AppConfig.class);

        try {

            ResultHolder a = context.getBean(ResultHolder.class);

            // form the datasets (SIZE is assumed to be divisible by DIVIDE_FACTOR;
            // any remainder is not covered by the partitions)

            int part_size = SIZE / DIVIDE_FACTOR;

            for (int j = 0; j < DIVIDE_FACTOR; j++) {
                numbers[j] = new ArrayList<>(part_size);
                for (int i = 0; i < part_size; i++) {
                    // widen before multiplying so large SIZE values cannot overflow int
                    numbers[j].add((long) j * part_size + i + 1L);
                }
            }

            // create one processor (bean) per dataset

            for (int i = 0; i < DIVIDE_FACTOR; i++) {
                SumProcessor bean = context.getBean(SumProcessor.class, numbers[i]);
                if (bean == null) {
                    throw new Exception("Error receive bean SumProcessor.class");
                }
                processors.add(bean);
            }

            // one worker task per processor, following DIVIDE_FACTOR instead of
            // a hard-coded count of four
            List<Callable<Boolean>> tasks = new ArrayList<>(DIVIDE_FACTOR);
            for (int i = 0; i < DIVIDE_FACTOR; i++) {
                tasks.add(new thread_process());
            }

            try {
                time = 0;
                System.out.println("-------------------");
                System.out.println("Multithread compute");
                System.out.println("-------------------");
                ExecutorService pool = new ThreadPoolExecutor(
                        DIVIDE_FACTOR,
                        DIVIDE_FACTOR,
                        0,
                        TimeUnit.MICROSECONDS,
                        new LinkedBlockingDeque<>(DIVIDE_FACTOR) // any bounded BlockingQueue works, e.g. ArrayBlockingQueue
                );
                try {
                    // invokeAll returns once every task has completed
                    pool.invokeAll(tasks);
                } finally {
                    // always release the pool threads, even on failure
                    pool.shutdown();
                }
                pool.awaitTermination(60, TimeUnit.SECONDS);
                System.out.println("Time is: " + time);

                a.printSum();
                a.clearSum();
                time = 0;

            } catch (Exception e) {
                // keep the original exception as the cause
                throw new Exception("MainApp error: " + e, e);

            } // <---- single-thread run stays outside this try/catch block

            ArrayList<Long> numbers_total = new ArrayList<>(SIZE);
            for (int i = 0; i < SIZE; i++) {
                numbers_total.add((i + 1L));
            }

            System.out.println("-------------");
            System.out.println("Single thread");
            System.out.println("-------------");
            SumProcessor sumProcessor = context.getBean(SumProcessor.class, numbers_total);
            sumProcessor.work();
            System.out.println("Time is: " + time);
            a.printSum();
            a.clearSum();
            time = 0;

        } finally {
            // close the Spring context on every path
            context.close();
        }
    } // END: main
}

输出:

Initialized B: Processor 1 contain 7000000 numbers from 1 to 7000000
Initialized B: Processor 2 contain 7000000 numbers from 7000001 to 14000000
Initialized B: Processor 3 contain 7000000 numbers from 14000001 to 21000000
Initialized B: Processor 4 contain 7000000 numbers from 21000001 to 28000000
-------------------
Multithread compute
-------------------
Thread[Thread-3,5,main] complete task.
Thread[Thread-2,5,main] complete task.
Thread[Thread-1,5,main] complete task.
Thread[Thread-4,5,main] complete task.
Time is: 5472
Sum is 392000014000000
-------------
Single thread
-------------
Initialized B: Processor 5 contain 28000000 numbers from 1 to 28000000
Time is: 10653
Sum is 392000014000000

输出[逆序]:

-------------
Single thread
-------------
Initialized B: Processor 1 contain 28000000 numbers from 1 to 28000000
Time is: 2265
Sum is 392000014000000
Initialized B: Processor 2 contain 7000000 numbers from 1 to 7000000
Initialized B: Processor 3 contain 7000000 numbers from 7000001 to 14000000
Initialized B: Processor 4 contain 7000000 numbers from 14000001 to 21000000
Initialized B: Processor 5 contain 7000000 numbers from 21000001 to 28000000
-------------------
Multithread compute
-------------------
Thread[Thread-2,5,main] complete task.
Thread[Thread-4,5,main] complete task.
Thread[Thread-1,5,main] complete task.
Thread[Thread-3,5,main] complete task.
Time is: 2115
Sum is 392000014000000

关于Java设置多线程处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57869997/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com