gpt4 book ai didi

spring - 将 SSE-Emitter 对象保存到 MongoDB/Redis 中,从数据库中获取它并通过它发送事件

转载 作者:IT王子 更新时间:2023-10-29 06:13:21 27 4
gpt4 key购买 nike

我正在尝试让我的 REST API 变为无状态。为此,我需要将客户端的 SSE-Emitter 对象保存到 MongoDB 或 Redis 中,以便其他实例可以集中访问它。

当前行为:

我能够将 SSE-Emitter 对象保存到 MongoDB,但我认为该对象在保存过程中以某种方式被修改了,因此从 MongoDB 取回它之后,我无法通过它向客户端发送事件。如果我把同一个 emitter 对象保存在本地的 map/列表中,事件就能成功发送。

预期行为:

我应该能够从 mongoDb 获取发射器对象并通过它向客户端发送 EventData。

源代码:

客户端订阅的 Controller :

/**
 * Subscription endpoint: creates a new {@link SseEmitter} for the caller, stores it
 * in a {@code MongoSession} keyed by the user name, and also tracks it in the local
 * {@code emitters} list.
 *
 * @param userName value of the {@code userName} path variable; used as the session id
 * @return the newly created emitter
 */
@GetMapping("/memory/{userName}")
public SseEmitter handle(@PathVariable("userName") String userName) {
    final SseEmitter emitter = new SseEmitter();

    // Store the emitter as the "emitter" attribute of a session whose id is the user name.
    // A failure here is only printed; the emitter is still registered locally and returned.
    try {
        final MongoSession userSession = new MongoSession();
        userSession.setId(userName);
        userSession.setAttribute("emitter", emitter);
        mongoSessionRepo.save(userSession);
    } catch (Exception e) {
        e.printStackTrace();
    }

    // Adding it to the local list as well, just for testing.
    this.emitters.add(emitter);

    // The same clean-up runs on completion and on timeout: drop the emitter from the list.
    final Runnable unregister = () -> this.emitters.remove(emitter);
    emitter.onCompletion(unregister);
    emitter.onTimeout(unregister);

    return emitter;
}

在 mongoDb 中表示文档的 MongoSession 类:

package ch.rasc.sse;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.index.Indexed;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.session.ExpiringSession;

/**
 * Session document stored in the {@code springMongoSession} collection.
 *
 * <p>Attributes are kept in an in-memory map and converted to/from a byte array with
 * Java native serialization via {@link #serializeAttributes()} and
 * {@link #deserializeAttributes()}. Nothing in this class calls those two methods
 * automatically; the caller decides when to run them.
 */
@Document(collection = "springMongoSession")
public class MongoSession implements ExpiringSession{

    /** Default inactivity window, in seconds, applied by both constructors. */
    public static final int DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS = 1800;

    /** Session ID key. */
    public static final String KEY_SESSION_ID = "_id";

    /** Expire time key. */
    public static final String KEY_EXPIRE_TIME = "expireTime";

    /**
     * MongoDB Object ID
     */
    @Indexed(unique = true)
    @Id
    private String id;

    /**
     * Serialized session attributes
     */
    private byte[] serializedAttributes;

    /**
     * Session attributes (not saved to MongoDB).
     * NOTE(review): no transient marker is visible on this field; confirm how the
     * mapping layer is told to skip it.
     */
    private Map<String,Object> attributes;

    /**
     * Creation time (epoch in ms)
     */
    private long creationTime;

    /**
     * Last accessed time (epoch in ms)
     */
    private long lastAccessedTime;

    /**
     * Max inactive interval (sec)
     */
    private int maxInactiveIntervalInSeconds;

    /**
     * Expire time (epoch in ms)
     */
    @Indexed
    private long expireTime;

    /**
     * Creates a session without an id; the id can be assigned later with
     * {@link #setId(String)}.
     */
    public MongoSession() {
        this(null);
    }

    /**
     * Creates a session with the given id, an empty attribute map, creation and
     * last-accessed time set to "now", and the default inactivity interval.
     *
     * @param sessionId the id to assign (may be {@code null})
     */
    public MongoSession(String sessionId) {
        this.id = sessionId;
        attributes = new HashMap<>();
        creationTime = System.currentTimeMillis();
        lastAccessedTime = creationTime;
        maxInactiveIntervalInSeconds = DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS;
        updateExpireTime();
    }

    /**
     * @param id the new id of this session
     */
    public void setId(String id) {
        this.id = id;
    }

    /**
     * @return the id of this session
     */
    public String getId() {
        return id;
    }

    /**
     * Sets the last-accessed time and recomputes the expire time from it.
     *
     * @param lastAccessedTime epoch time in milliseconds
     */
    public void setLastAccessedTime(long lastAccessedTime) {
        this.lastAccessedTime = lastAccessedTime;
        updateExpireTime();
    }

    /**
     * @return creation time (epoch in ms)
     */
    public long getCreationTime() {
        return creationTime;
    }

    /**
     * @return last accessed time (epoch in ms)
     */
    public long getLastAccessedTime() {
        return lastAccessedTime;
    }

    /**
     * Sets the inactivity interval and recomputes the expire time from it.
     *
     * @param interval max inactive interval in seconds
     */
    public void setMaxInactiveIntervalInSeconds(int interval) {
        maxInactiveIntervalInSeconds = interval;
        updateExpireTime();
    }

    /**
     * @return max inactive interval in seconds
     */
    public int getMaxInactiveIntervalInSeconds() {
        return maxInactiveIntervalInSeconds;
    }

    /**
     * @return expire time (epoch in ms)
     */
    protected long getExpireTime() {
        return expireTime;
    }

    /**
     * Recomputes {@code expireTime} as last-accessed time plus the inactivity interval.
     */
    private void updateExpireTime() {
        // 1000L forces long arithmetic: with a plain int multiplication, intervals
        // above roughly 2,147,483 seconds overflow and yield a wrong expire time.
        expireTime = lastAccessedTime + maxInactiveIntervalInSeconds * 1000L;
    }

    /**
     * @return {@code true} when the expire time is at or before the current time
     */
    public boolean isExpired() {
        long now = System.currentTimeMillis();
        return expireTime <= now;
    }

    /**
     * Looks up an attribute in the in-memory attribute map.
     *
     * @param attributeName the attribute key
     * @param <T> the type the caller expects; not verified here
     * @return the stored value cast to {@code T}, or {@code null} when the key is absent
     */
    @SuppressWarnings("unchecked") // the caller chooses T; the map only holds Object
    public <T> T getAttribute(String attributeName) {
        return (T) attributes.get(attributeName);
    }

    /**
     * @return the key set of the in-memory attribute map (a live view, not a copy)
     */
    public Set<String> getAttributeNames() {
        return attributes.keySet();
    }

    /**
     * Stores an attribute in the in-memory map only; {@code serializedAttributes} is
     * not touched until one of the serialize methods is called.
     *
     * @param attributeName the attribute key
     * @param attributeValue the value to store
     */
    public void setAttribute(String attributeName, Object attributeValue) {
        attributes.put(attributeName, attributeValue);
    }

    /**
     * Removes an attribute from the in-memory map.
     *
     * @param attributeName the attribute key
     */
    public void removeAttribute(String attributeName) {
        attributes.remove(attributeName);
    }

    /**
     * Serialize session attributes: writes the whole attribute map into
     * {@code serializedAttributes}. If serialization fails, the field is set to an
     * empty array.
     */
    public void serializeAttributes() {
        serializedAttributes = toBytes(attributes);
    }

    /**
     * Serializes a single value (instead of the attribute map) into
     * {@code serializedAttributes}. If serialization fails, the field is set to an
     * empty array.
     *
     * @param attributeValue the object to serialize
     */
    public void serializeAttributesThis(Object attributeValue) {
        serializedAttributes = toBytes(attributeValue);
    }

    /**
     * Deserialize session attributes: rebuilds the attribute map from
     * {@code serializedAttributes}. Missing, empty or unreadable data results in an
     * empty map.
     */
    @SuppressWarnings("unchecked") // the bytes are expected to come from serializeAttributes()
    public void deserializeAttributes() {
        // A session that was never serialized has no bytes; without this guard
        // ByteArrayInputStream would throw a NullPointerException.
        if (serializedAttributes == null || serializedAttributes.length == 0) {
            attributes = new HashMap<>();
            return;
        }
        // NOTE(review): Java native deserialization; only safe as long as the stored
        // bytes cannot be written by an untrusted party.
        try (ByteArrayInputStream bis = new ByteArrayInputStream(serializedAttributes);
                ObjectInputStream ois = new ObjectInputStream(bis)) {
            attributes = (Map<String,Object>) ois.readObject();
        } catch (IOException | ClassNotFoundException ignored) {
            // Best effort: fall back to an empty attribute map.
            attributes = new HashMap<>();
        }
    }

    /**
     * Converts a value to bytes with Java native serialization.
     *
     * @param value the object to serialize
     * @return the serialized form, or an empty array when serialization fails
     */
    private static byte[] toBytes(Object value) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(value);
            oos.flush();
            return bos.toByteArray();
        } catch (IOException ignored) {
            // Best effort, kept from the original behaviour. Note that a value which
            // does not implement Serializable raises NotSerializableException (an
            // IOException) and therefore ends up here, silently producing no data.
            return new byte[0];
        }
    }
}

根据以下请求,我想将事件数据发送回客户端:

    @RequestMapping("/qmevents/{sessionId}")
public void readQmEvents(@PathVariable("sessionId") String userName)
{
try{
System.out.println("Emitter Object:
"+mongoSessionRepo._getSession(userName));
System.out.println("Emitter Object:
"+mongoSessionRepo._getSession(userName).getAttributeNames());
System.out.println("Emitter Object:
"+mongoSessionRepo._getSession(userName)
.getAttribute("emitter").toString());
sessionRepo.getSessionAttributes(userName, "emitter");
SseEmitter emitter =mongoSessionRepo._getSession(userName).
getAttribute("emitter");
MemoryMXBean memBean = ManagementFactory.getMemoryMXBean();
MemoryUsage heap = memBean.getHeapMemoryUsage();
MemoryUsage nonHeap = memBean.getNonHeapMemoryUsage();
MemoryInfo mi = new MemoryInfo(heap.getUsed(), nonHeap.getUsed());
mi.setForUserName("Event raised by QM");
System.out.println("Emitter from map:
"+SSEControllerPerUser.emitters.get(0));
SSEControllerPerUser.emitters.get(0).send(mi);
//emitter.send(mi);
}catch(Exception e){
e.printStackTrace();
}

}

最佳答案

子类化 Spring 的 SseEmitter 并让它实现 Serializable(见下文),然后使用这个子类。我曾在与您描述的类似的场景(服务器崩溃)中使用过这个解决方案。

/**
 * An {@link SseEmitter} subclass that is additionally marked {@link Serializable}.
 *
 * <p>NOTE(review): the marker interface only lets the object pass through Java
 * serialization; whether a deserialized instance can still reach the original
 * client connection is not established here. Confirm before relying on it.
 */
public class SerializableSSE extends SseEmitter implements Serializable {

    /**
     * Creates an emitter using the superclass no-argument constructor.
     */
    public SerializableSSE() {
        super();
    }

    /**
     * Creates an emitter using the superclass timeout constructor.
     *
     * @param timeout value handed unchanged to {@code SseEmitter(Long)}
     */
    public SerializableSSE(Long timeout) {
        super(timeout);
    }
}

希望对您有所帮助!

关于spring - 将 SSE-Emitter 对象保存到 MongoDB/Redis 中,从数据库中获取它并通过它发送事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45036309/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com