gpt4 book ai didi

java - FLUME非法状态异常: begin() called when transaction is OPEN

转载 作者:行者123 更新时间:2023-12-02 17:53:43 24 4
gpt4 key购买 nike

我编写了自定义水槽,名为MySink,其处理方法如下面的第一个代码片段所示。我收到如下 IllegalStateException(详细的堆栈跟踪可在下面的第二个片段中找到):

Caused by: java.lang.IllegalStateException: begin() called when transaction is OPEN!

问题:在编写 process 方法时,我遵循了 Flume 代码库中的 KafkaSink 和类似的现有接收器实现,并且我对现有接收器应用了完全相同的事务处理逻辑。您能告诉我我的处理方法有什么问题吗?我该如何解决这个问题?

PROCESS方法(我已经标记了抛出异常的位置):

@Override
public Status process() throws EventDeliveryException {
Status status = Status.READY;
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
Event event = null;

try {
LOG.info(getName() + " BEFORE txn.begin()");
//!!!! EXCEPTION IS THROWN in the following LINE !!!!!!
txn.begin();
LOG.info(getName() + " AFTER txn.begin()");
LOG.info(getName() + " BEFORE ch.take()");
event = ch.take();
LOG.info(getName() + " AFTER ch.take()");

if (event == null) {
// No event found, request back-off semantics from the sink runner
LOG.info(getName() + " - EVENT is null! ");
return Status.BACKOFF;
}

Map<String, String> keyValueMapInTheMessage = event.getHeaders();
if (!keyValueMapInTheMessage.isEmpty()) {
mDBWriter.insertDataToDB(keyValueMapInTheMessage);
}

LOG.info(getName() + " - EVENT: " + EventHelper.dumpEvent(event));
if (txn != null) {
txn.commit();
}

} catch (Exception ex) {
String errMsg = getName() + " - Failed to publish events. Exception: ";
LOG.info(errMsg);
status = Status.BACKOFF;
if (txn != null) {
try {
txn.rollback();
} catch (Exception e) {
LOG.info(getName() + " - EVENT: " + EventHelper.dumpEvent(event));
throw Throwables.propagate(e);
}
}
throw new EventDeliveryException(errMsg, ex);
} finally {
if (txn != null) {
txn.close();
}
}

return status;
}

异常堆栈:

2016-01-22 14:01:15,440 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)]  

Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: MySink - Failed to publish events.
Exception: at com.XYZ.flume.maprdb.MySink.process(MySink.java:116)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: begin() called when transaction is OPEN!
at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
at com.XYZ.flume.maprdb.MySink.process(MySink.java:82)
... 3 more

最佳答案

if (event == null) {
// No event found, request back-off semantics from the sink runner
LOG.info(getName() + " - EVENT is null! ");
return Status.BACKOFF;
}

此代码会导致此问题。当 event 为 null 时,直接返回即可。但是,正确的做法是提交或回滚。一个事务应该经历三个阶段:begin、commit 或 rollback、最后 close。我们可以看下面的源码来了解它是如何实现的.

基本 channel 语义:

  public Transaction getTransaction() {

if (!initialized) {
synchronized (this) {
if (!initialized) {
initialize();
initialized = true;
}
}
}

BasicTransactionSemantics transaction = currentTransaction.get();
if (transaction == null || transaction.getState().equals(
BasicTransactionSemantics.State.CLOSED)) {
transaction = createTransaction();
currentTransaction.set(transaction);
}
return transaction;
}

当currentTransaction为null或者State为close时,channel会创建一个新的,否则返回旧的。此异常不会立即发生。当第一次执行process方法时,你得到一个新的事务,但事件为空,你只是返回并最终关闭,close方法由于其实现而不起作用。所以第二次执行process方法时,你不需要没有得到新的交易,而是旧的交易。下面的代码是关于交易如何实现的。

基本事务语义:

  protected BasicTransactionSemantics() {
state = State.NEW;
initialThreadId = Thread.currentThread().getId();
}

public void begin() {
Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
"begin() called from different thread than getTransaction()!");
Preconditions.checkState(state.equals(State.NEW),
"begin() called when transaction is " + state + "!");

try {
doBegin();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ChannelException(e.toString(), e);
}
state = State.OPEN;
}

public void commit() {
Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
"commit() called from different thread than getTransaction()!");
Preconditions.checkState(state.equals(State.OPEN),
"commit() called when transaction is %s!", state);

try {
doCommit();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ChannelException(e.toString(), e);
}
state = State.COMPLETED;
}

public void rollback() {
Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
"rollback() called from different thread than getTransaction()!");
Preconditions.checkState(state.equals(State.OPEN),
"rollback() called when transaction is %s!", state);

state = State.COMPLETED;
try {
doRollback();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ChannelException(e.toString(), e);
}
}

public void close() {
Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
"close() called from different thread than getTransaction()!");
Preconditions.checkState(
state.equals(State.NEW) || state.equals(State.COMPLETED),
"close() called when transaction is %s"
+ " - you must either commit or rollback first", state);

state = State.CLOSED;
doClose();
}

创建时,状态是新的。

开始时,状态必须是新的,然后状态变为开放。

提交或回滚时,状态必须是打开的,然后状态才变为完成。

关闭时,状态必须完成,然后状态变为关闭。

所以当你以正确的方式执行close方法时,下次你会得到一个新的交易,否则旧的状态一定不是新的,所以你不能执行transaction.begin(),它需要一个新的。

关于java - FLUME非法状态异常: begin() called when transaction is OPEN,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34947836/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com