gpt4 book ai didi

Java JAX-RS 网络服务 : adding nodes to JAXB XML result as threads complete

转载 作者:可可西里 更新时间:2023-11-01 17:14:18 26 4
gpt4 key购买 nike

我用 Jersey 编写了一个 JAX-RS 网络服务,它从不同的网站查询价格,并通过 JAXB 注释类以 XML 形式返回结果。不幸的是,有些网站最多需要 15 秒才能做出响应,因此我使用多个线程来查询这些价格。

我现在想为这个网络服务编写一个客户端,我的网络用户不想在点击“搜索”后等待 30 秒以获取结果,所以我的想法是动态更新结果表作为来 self 的 JAX-RS 网络服务回来了。

30 秒后我的网络服务应该超时并关闭 <result> -元素或所有线程完成后。

现在我的网络服务运行所有线程并在所有 trhead 完成后返回结果,我想在结果出现时将结果动态添加到 XML 输出,我该如何实现?

XML 响应的结构是:

<result>
<articles>
<article>
content of article
</article>
</articles>
As the webservice gets results from websites it adds new articles to the XML
</result>

请求 Controller .java

@Path("/request")
public class RequestController {

@GET
@Produces("application/xml")
public Response getRequest(@QueryParam("part") String part) {
response = new Response();
driverController = new DriverController(this.response, this.part);
this.response = driverController.query();
return this.response;
}
}

DriverController.java

public class DriverController {


public Response query() {
CompletionService<Deque<Article>> completionService = new ExecutorCompletionService<Deque<Article>>(
Worker.getThreadPool());
final Deque<Article> articleQueue = new LinkedList<Article>();

int submittedTasks = 0;

// This threadwill take about 4 seconds to finish
Driver driverA = new DriverA(this.part,
this.currency, this.language);

// This thread will take about 15 seconds to finish
Driver driverN = new DriverN(this.part,
this.currency, this.language);

completionService.submit(driverA);
submittedTasks++;
completionService.submit(driverN);
submittedTasks++;

for (int i = 0; i < submittedTasks; i++) {
log.info("Tasks: " + submittedTasks);
try {
Future<Deque<Article>> completedFuture = completionService.take();
try {
Deque<Article> articleQueueFromThread = completedFuture.get();
if (articleQueueFromThread != null) {
articleQueue.addAll(articleQueueFromThread);
response.setStatus("OK");
}
} catch (ExecutionException e) {
log.error(e.getMessage());
e.printStackTrace();
}
} catch (InterruptedException e) {
log.error(e.getMessage());
e.printStackTrace();
}
}
for (Article article : articleQueue) {
this.response.addArticle(article);
}
return this.response;
}
}

响应.java

@XmlRootElement
public class Response {

Queue<Article> queue = new ConcurrentLinkedQueue<Article>();
private String status;
private String code;
private String message;
private List<Article> articles = new ArrayList<Article>();

public Response(){

}

public void setMessage(String message) {
this.message = message;
}
@XmlAttribute
public String getMessage() {
return message;
}
public void setStatus(String status) {
this.status = status;
}
@XmlAttribute
public String getStatus() {
return status;
}
public void setCode(String code) {
this.code = code;
}
@XmlAttribute
public String getCode() {
return code;
}

public void addArticle(Article article) {
this.articles.add(article);
System.out.println("Response: ADDED ARTICLE TO RESPONSE");
}
@XmlElement(name = "article")
@XmlElementWrapper(name = "articles")
public List<Article> getArticles() {
return articles;
}

}

最佳答案

我开始调整您的代码来执行此操作,但我认为编写一个独立示例更容易。该示例启动一个带有单个资源类的 Grizzly+Jersey 服务器。资源上的 GET 生成三个线程,它们在返回某些对象之前延迟 2、4 和 6 秒。服务器启动后,另一个线程向服务器发出请求。当您运行它时,您可以清楚地看到请求者收到了 XML block ,因为相应的线程在服务器中完成了它们的工作。它没有做的一件事是将单独交付的 XML block 包装在单个根元素中,因为这应该是相对微不足道的。

完整的可执行源代码如下,如果你有 maven 和 git,你可以从 github 克隆它并运行它:

git clone git://github.com/zzantozz/testbed.git tmp
cd tmp
mvn compile exec:java -Dexec.mainClass=rds.jersey.JaxRsResource -pl jersey-with-streaming-xml-response

来源:

import com.sun.grizzly.http.SelectorThread;
import com.sun.jersey.api.container.grizzly.GrizzlyWebContainerFactory;
import javax.ws.rs.*;
import javax.ws.rs.core.StreamingOutput;
import javax.xml.bind.*;
import javax.xml.bind.annotation.*;
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;

@Path("/streaming")
public class JaxRsResource {
private static ExecutorService executorService = Executors.newFixedThreadPool(4);
private static int fooCounter;
private Marshaller marshaller;

public JaxRsResource() throws JAXBException {
marshaller = JAXBContext.newInstance(Foo.class).createMarshaller();
marshaller.setProperty("jaxb.fragment", Boolean.TRUE);
}

@GET
@Produces("application/xml")
public StreamingOutput streamStuff() {
System.out.println("Got request for streaming resource; starting delayed response threads");
final List<Future<List<Foo>>> futureFoos = new ArrayList<Future<List<Foo>>>();
futureFoos.add(executorService.submit(new DelayedFoos(2)));
futureFoos.add(executorService.submit(new DelayedFoos(4)));
futureFoos.add(executorService.submit(new DelayedFoos(6)));
return new StreamingOutput() {
public void write(OutputStream output) throws IOException {
for (Future<List<Foo>> futureFoo : futureFoos) {
writePartialOutput(futureFoo, output);
output.write("\n".getBytes());
output.flush();
}
}
};
}

private void writePartialOutput(Future<List<Foo>> futureFoo, OutputStream output) {
try {
List<Foo> foos = futureFoo.get();
System.out.println("Server sending a chunk of XML");
for (Foo foo : foos) {
marshaller.marshal(foo, output);
}
} catch (JAXBException e) {
throw new IllegalStateException("JAXB couldn't marshal. Handle it.", e);
} catch (InterruptedException e) {
throw new IllegalStateException("Task was interrupted. Handle it.", e);
} catch (ExecutionException e) {
throw new IllegalStateException("Task failed to execute. Handle it.", e);
}
}

class DelayedFoos implements Callable<List<Foo>> {
private int delaySeconds;

public DelayedFoos(int delaySeconds) {
this.delaySeconds = delaySeconds;
}

public List<Foo> call() throws Exception {
Thread.sleep(delaySeconds * 1000);
return Arrays.asList(new Foo(fooCounter++), new Foo(fooCounter++), new Foo(fooCounter++));
}
}

public static void main(String[] args) throws IOException {
System.out.println("Starting Grizzly with the JAX-RS resource");
final String baseUri = "http://localhost:9998/";
final Map<String, String> initParams = new HashMap<String, String>();
initParams.put("com.sun.jersey.config.property.packages", "rds.jersey");
SelectorThread threadSelector = GrizzlyWebContainerFactory.create(baseUri, initParams);
System.out.println("Grizzly started");
System.out.println("Starting a thread to request the streamed XML");
executorService.submit(new HttpRequester(baseUri + "streaming"));
}
}

@XmlRootElement
class Foo {
@XmlElement
private int id;

Foo() {}

public Foo(int id) {
this.id = id;
}
}

class HttpRequester implements Runnable {
private String url;

public HttpRequester(String url) {
this.url = url;
}

public void run() {
try {
System.out.println("Doing HTTP GET on " + url);
HttpURLConnection urlConnection = (HttpURLConnection) new URL(url).openConnection();
BufferedReader in = new BufferedReader(new InputStreamReader(urlConnection.getInputStream()));
String line;
while ((line = in.readLine()) != null) {
System.out.println("Client got: " + line);
}
System.exit(0);
} catch (IOException e) {
throw new IllegalStateException("Some bad I/O happened. Handle it.", e);
}
}
}

需要注意的要点/差异:

  1. 从您的资源方法返回响应表明整个响应都包含在该对象中并且不允许对响应进行增量更新。而是返回一个 StreamingOutput。这告诉 Jersey 你将发回一个数据流,你可以随意添加数据直到完成。 StreamingOutput 使您可以访问 OutputStream,这是您用来发送增量更新的内容,也是整个过程的关键。当然,这意味着您必须自己处理编码。如果您一次返回整个响应,Jersey 只能进行编码(marshal)处理。
  2. 由于 OutputStream 是您一次发回一点数据的方式,因此您要么必须在 JAX-RS 资源中进行线程处理,要么将 OutputStream 向下传递到您的 DriverController 并在那里写入数据。
  3. 如果您想强制它立即发送数据,请务必在 OutputStream 上调用 flush()。否则,在填满任何内部缓冲区之前,不会向客户端发送任何内容。请注意,自己调用 flush() 会绕过缓冲区的用途,并使您的应用更加活跃。

总而言之,要将此应用到您的项目,首先要做的是更改您的资源方法以返回 StreamingOutput 实现并从该实现内部调用您的 DriverController,将 OutputStream 传递给 DriverController。然后在 DriverController 中,当您从线程中获取一些 Articles 时,不是将其添加到队列中以备后用,而是立即将其写入 OutputStream。

关于Java JAX-RS 网络服务 : adding nodes to JAXB XML result as threads complete,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6807897/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com