Closed. This question needs to be more focused. It is not currently accepting answers.
Closed 4 years ago.
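I am working on a project in which I need to consume a large number of records and then send them to another system that uses ZeroMQ.
The flow is as follows:
Store all incoming records, arriving from multiple threads, in a CHM (ConcurrentHashMap). Records will arrive at a very high rate.
From a background thread that runs every 1 minute, send these records from the CHM to the ZeroMQ servers.
After sending each record to the ZeroMQ servers, also add it to a retry bucket so that it can be retried after a certain time has passed if no acknowledgment has been received for it yet.
We also have a poller runnable thread that receives acknowledgments from the ZeroMQ servers saying that these records have been received. Once I get an acknowledgment, I delete that record from the retry bucket so that it is not retried.
Even if some records are sent twice, that is fine, but it is better to minimize this.
I am not sure what the best way is to minimize this in my scenario below.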
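Below is my Processor class, in which multiple threads call the .add() method to populate the dataHolderByPartitionReference CHM in a thread-safe way. In the constructor of the Processor class, I start a background thread that runs every 30 seconds and pushes records from the same CHM to a set of ZeroMQ servers by calling the SendToZeroMQ class, as shown below: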
Processor
public class Processor {
    private final ScheduledExecutorService executorService = Executors
            .newSingleThreadScheduledExecutor();
    private final AtomicReference<ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>> dataHolderByPartitionReference =
            new AtomicReference<>(new ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>());

    private static class Holder {
        private static final Processor INSTANCE = new Processor();
    }

    public static Processor getInstance() {
        return Holder.INSTANCE;
    }

    private Processor() {
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                validateAndSendAllPartitions(dataHolderByPartitionReference
                        .getAndSet(new ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>()));
            }
        }, 0, 30, TimeUnit.SECONDS);
    }

    private void validateAndSendAllPartitions(
            ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>> dataHolderByPartition) {
        // calling validateAndSend in parallel for each partition (which is map key)
        // generally there will be only 5-6 unique partitions max
    }

    private void validateAndSend(final int partition,
            final ConcurrentLinkedQueue<DataHolder> dataHolders) {
        Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
        int totalSize = 0;
        while (!dataHolders.isEmpty()) {
            .........
            .........
            SendToZeroMQ.getInstance().executeAsync(partition, clientKeyBytesAndProcessBytesHolder);
        }
        // calling again with remaining values
        SendToZeroMQ.getInstance().executeAsync(partition, clientKeyBytesAndProcessBytesHolder);
    }

    // called by multiple threads to populate dataHolderByPartitionReference CHM
    public void add(final int partition, final DataHolder holder) {
        // store records in dataHolderByPartitionReference in a thread safe way
    }
}
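The body of add() is only a comment above, so here is a minimal sketch of one way the thread-safe insert and the periodic swap could fit together. PartitionBuffer and its method names are hypothetical and are not part of my actual code; the record type is left generic in place of DataHolder.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper, not part of the original Processor class.
class PartitionBuffer<T> {
    private final AtomicReference<ConcurrentHashMap<Integer, ConcurrentLinkedQueue<T>>> ref =
            new AtomicReference<>(new ConcurrentHashMap<>());

    // Safe to call from many threads: computeIfAbsent creates the queue
    // for a partition at most once, and the queue itself is concurrent.
    void add(int partition, T record) {
        ref.get()
           .computeIfAbsent(partition, k -> new ConcurrentLinkedQueue<>())
           .add(record);
    }

    // Called by the background thread: installs a fresh empty map and
    // returns the map that was being filled until now.
    ConcurrentHashMap<Integer, ConcurrentLinkedQueue<T>> swap() {
        return ref.getAndSet(new ConcurrentHashMap<>());
    }
}
```

One caveat with this pattern: a writer that read the old map reference just before swap() can still append to it afterwards. If the background thread has already finished draining that queue, the late record would be missed, so the drain may need to tolerate or re-check for late arrivals.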
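Below is my SendToZeroMQ class, which sends records to a set of ZeroMQ servers and retries them depending on acknowledgment delivery.
First, it sends a record to the ZeroMQ servers.
Then it adds the same record to the retryBucket, which will be retried later depending on whether an acknowledgment was received.
In the same class, I start a background thread that runs every 1 minute and resends the records that are still in the retry bucket.
The same class also starts the ResponsePoller thread, which runs forever to check which of the records we sent earlier have been acknowledged. Once a record is acknowledged, the ResponsePoller thread removes it from the retryBucket so that it is not retried.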
SendToZeroMQ
public class SendToZeroMQ {
    // Do I need two ScheduledExecutorService instances, or is one enough to start both threads?
    private final ScheduledExecutorService executorServicePoller = Executors
            .newSingleThreadScheduledExecutor();
    private final ScheduledExecutorService executorService = Executors
            .newSingleThreadScheduledExecutor();
    private final Cache<Long, byte[]> retryBucket = CacheBuilder.newBuilder().maximumSize(10000000)
            .removalListener(RemovalListeners.asynchronous(new CustomListener(), executorService))
            .build();

    private static class Holder {
        private static final SendToZeroMQ INSTANCE = new SendToZeroMQ();
    }

    public static SendToZeroMQ getInstance() {
        return Holder.INSTANCE;
    }

    private SendToZeroMQ() {
        executorServicePoller.submit(new ResponsePoller());
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                for (Entry<Long, byte[]> entry : retryBucket.asMap().entrySet()) {
                    executeAsync(entry.getKey(), entry.getValue());
                }
            }
        }, 0, 1, TimeUnit.MINUTES);
    }

    public boolean executeAsync(final long address, final byte[] encodedByteArray) {
        Optional<ZMQObj> liveSockets = PoolManager.getInstance().getNextSocket();
        if (!liveSockets.isPresent()) {
            return false;
        }
        return executeAsync(address, encodedByteArray, liveSockets.get().getSocket());
    }

    public boolean executeAsync(final long address, final byte[] encodedByteArray, final Socket socket) {
        ZMsg msg = new ZMsg();
        msg.add(encodedByteArray);
        boolean sent = msg.send(socket);
        msg.destroy();
        // add to retry bucket
        retryBucket.put(address, encodedByteArray);
        return sent;
    }

    public boolean executeAsync(final int partition,
            final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {
        Optional<ZMQObj> liveSockets = PoolManager.getInstance().getNextSocket();
        if (!liveSockets.isPresent()) {
            return false;
        }
        Map<Long, byte[]> addressToencodedByteArray = encode(partition, clientKeyBytesAndProcessBytesHolder);
        long address = addressToencodedByteArray.entrySet().iterator().next().getKey();
        byte[] encodedByteArray = addressToencodedByteArray.entrySet().iterator().next().getValue();
        return executeAsync(address, encodedByteArray, liveSockets.get().getSocket());
    }

    private Map<Long, byte[]> encode(final int partition,
            final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {
        // this address will always be unique
        long address = TestUtils.getAddress();
        Frame frame = new Frame(............);
        byte[] packedByteArray = frame.serialize();
        // this map will always have exactly one entry in it
        return ImmutableMap.of(address, packedByteArray);
    }

    public void removeFromRetryBucket(final long address) {
        retryBucket.invalidate(address);
    }
}
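On the question in the first comment of this class (two executors or one): ResponsePoller never returns, so it permanently occupies whichever thread runs it. The sketch below is only an illustration with stand-in tasks, and SharedSchedulerSketch and periodicTaskRuns are hypothetical names. It suggests that a single ScheduledExecutorService could host both jobs provided it has at least two threads, while a single-thread executor shared by both would never get to the retry task.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the two tasks are stand-ins, not the real ones.
class SharedSchedulerSketch {
    // Returns true if the periodic task ran `ticks` times within the timeout
    // while a never-ending task was running on the same pool.
    static boolean periodicTaskRuns(int poolSize, int ticks, long timeoutMillis) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(poolSize);
        CountDownLatch latch = new CountDownLatch(ticks);
        try {
            // Stand-in for ResponsePoller: loops until interrupted.
            pool.execute(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        Thread.sleep(5);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            // Stand-in for the periodic retry pass.
            pool.scheduleAtFixedRate(latch::countDown, 0, 10, TimeUnit.MILLISECONDS);
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow(); // interrupts the never-ending task
        }
    }
}
```

With poolSize 2 the periodic task makes progress; with poolSize 1 the long-running task is queued first and starves it. In the class above, the executor passed to RemovalListeners.asynchronous also needs a free thread, which is another reason not to share a single-thread executor with the poller.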
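Below is my ResponsePoller class, which waits for acknowledgments for all the records that have already been sent by the other background thread. If an acknowledgment is received, the record is removed from the retry bucket so that it is not retried.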
public class ResponsePoller implements Runnable {
    private static final Random random = new Random();
    private static final int listenerPort = 8076;

    @Override
    public void run() {
        ZContext ctx = new ZContext();
        Socket client = ctx.createSocket(ZMQ.PULL);
        // Set random identity to make tracing easier
        String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
        client.setIdentity(identity.getBytes(ZMQ.CHARSET));
        client.bind("tcp://" + TestUtils.getIPAddress() + ":" + listenerPort);
        PollItem[] items = new PollItem[] {new PollItem(client, Poller.POLLIN)};
        while (!Thread.currentThread().isInterrupted()) {
            // Tick once per second, pulling in arriving messages
            for (int centitick = 0; centitick < 100; centitick++) {
                ZMQ.poll(items, 10);
                if (items[0].isReadable()) {
                    ZMsg msg = ZMsg.recvMsg(client);
                    Iterator<ZFrame> it = msg.iterator();
                    while (it.hasNext()) {
                        ZFrame frame = it.next();
                        try {
                            long address = TestUtils.getAddress(frame.getData());
                            // remove from retry bucket since we got the acknowledgment for this record
                            SendToZeroMQ.getInstance().removeFromRetryBucket(address);
                        } catch (Exception ex) {
                            // log error
                        } finally {
                            frame.destroy();
                        }
                    }
                    msg.destroy();
                }
            }
        }
        ctx.destroy();
    }
}
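To make the send, retry, and acknowledge cycle concrete, here is a minimal sketch of one possible way to cut down on duplicate sends. It rests on two assumptions that differ from the code above: a record is registered in the bucket before it is sent (so an acknowledgment that arrives very quickly always finds an entry to remove), and the periodic pass resends only records that have been waiting for at least a configurable time instead of everything in the bucket. TimedRetryBucket, its methods, and the BiConsumer standing in for the ZeroMQ send are all hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Hypothetical sketch; none of these names exist in the code above.
class TimedRetryBucket {
    // A pending record: its payload plus the time it was last sent.
    private static final class Pending {
        final byte[] payload;
        volatile long lastSentMillis;

        Pending(byte[] payload, long lastSentMillis) {
            this.payload = payload;
            this.lastSentMillis = lastSentMillis;
        }
    }

    private final Map<Long, Pending> pending = new ConcurrentHashMap<>();
    private final BiConsumer<Long, byte[]> sender; // stand-in for the ZeroMQ send
    private final long retryAfterMillis;

    TimedRetryBucket(BiConsumer<Long, byte[]> sender, long retryAfterMillis) {
        this.sender = sender;
        this.retryAfterMillis = retryAfterMillis;
    }

    // Register first, then send, so an early acknowledgment is never lost.
    void send(long address, byte[] payload, long nowMillis) {
        pending.put(address, new Pending(payload, nowMillis));
        sender.accept(address, payload);
    }

    // Called by the acknowledgment poller.
    void acknowledge(long address) {
        pending.remove(address);
    }

    // Called periodically: resend only records that have waited long enough.
    int retryExpired(long nowMillis) {
        int resent = 0;
        for (Map.Entry<Long, Pending> e : pending.entrySet()) {
            Pending p = e.getValue();
            if (nowMillis - p.lastSentMillis >= retryAfterMillis) {
                p.lastSentMillis = nowMillis;
                sender.accept(e.getKey(), p.payload);
                resent++;
            }
        }
        return resent;
    }

    int size() {
        return pending.size();
    }
}
```

The current time is passed in as a parameter only to keep the sketch easy to test; real code would probably read a clock. A record acknowledged between the age check and the resend can still go out twice, which is acceptable here since occasional duplicates are fine.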
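Question:
From a design perspective, what is the best way to design this so that all my logic works seamlessly?
I am pretty sure there is a better way to design this than what I have. What could that better way be?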