gpt4 book ai didi

api - Salesforce EMP 连接器,一段时间后停止接收通知

转载 作者:行者123 更新时间:2023-12-04 03:14:22 28 4
gpt4 key购买 nike

我正在做一个POC来检查Streaming API的稳定性,POC如下

程序 1:订阅针对 Account 对象创建的 pushtopic程序 2:每 10 分钟间隔创建、更新和删除单个记录

这两个程序都持续运行超过 12 小时(过夜),之后我验证是否收到所有通知,发现一段时间后(在这种情况下将近 ~ 2 小时 45 分钟)没有通知收到了,我重复了两次,这两种情况都在一段时间后停止接收通知。

使用的测试代码

流式 API 客户端(使用 EMP 连接器)

public class SFPoc {

static Long count = 0L;
static Long Leadcount = 0L;

public static void main(String[] argv) throws Exception {
String userName = "<user_name>";
String password = "<pwd>";
String pushTopicName = "/topic/AccountPT";
String pushTopicNameLead = "/topic/Leadwhere";
long replayFrom = EmpConnector.REPLAY_FROM_EARLIEST;
String securityToken = "<token>";

BayeuxParameters custom = getBayeuxParamWithSpecifiedAPIVersion("37.0");
BayeuxParameters params = null;
try {
params = login(userName, password + securityToken, custom);
} catch (Exception e) {
e.printStackTrace();
}

Consumer<Map<String, Object>> consumer = event -> System.out.println(String.format("Received:\n%s ** Recieved at %s, event count total %s", event, LocalDateTime.now() , ++count));
Consumer<Map<String, Object>> consumerLead = event -> System.out.println(String.format("****** LEADS ***** Received:\n%s ** Recieved at %s, event count total %s", event, LocalDateTime.now() , ++Leadcount));

EmpConnector connector = new EmpConnector(params);

connector.start().get(10, TimeUnit.SECONDS);

TopicSubscription subscription = connector.subscribe(pushTopicName, replayFrom, consumer).get(10, TimeUnit.SECONDS);
TopicSubscription subscriptionLead = connector.subscribe(pushTopicNameLead, replayFrom, consumerLead).get(10, TimeUnit.SECONDS);

System.out.println(String.format("Subscribed: %s", subscription));
System.out.println(String.format("Subscribed: %s", subscriptionLead));
}

private static BayeuxParameters getBayeuxParamWithSpecifiedAPIVersion(String apiVersion) {
BayeuxParameters params = new BayeuxParameters() {

@Override
public String version() {
return apiVersion;
}

@Override
public String bearerToken() {
return null;
}

};
return params;
}

定期执行记录创建/更新/删除以生成事件的代码

import com.sforce.soap.enterprise.*;
import com.sforce.soap.enterprise.Error;
import com.sforce.soap.enterprise.sobject.Account;
import com.sforce.soap.enterprise.sobject.Contact;
import com.sforce.ws.ConnectionException;
import com.sforce.ws.ConnectorConfig;

import java.time.LocalDateTime;

public class SFDCDataAdjustment {

static final String USERNAME = "<username>";
static final String PASSWORD = "<pwd&securitytoken>";

static EnterpriseConnection connection;

static Long count = 0L;

public static void main(String[] args) {

ConnectorConfig config = new ConnectorConfig();
config.setUsername(USERNAME);
config.setPassword(PASSWORD);
//config.setTraceMessage(true);

try {

connection = Connector.newConnection(config);

// display some current settings
System.out.println("Auth EndPoint: "+config.getAuthEndpoint());
System.out.println("Service EndPoint: "+config.getServiceEndpoint());
System.out.println("Username: "+config.getUsername());
System.out.println("SessionId: "+config.getSessionId());

// run the different examples

while (true) {
createAccounts();
updateAccounts();
deleteAccounts();

Thread.sleep(1 * 10 * 60 * 1000);
}


} catch (ConnectionException e1) {
e1.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}

}

// queries and displays the 5 newest contacts
private static void queryContacts() {

System.out.println("Querying for the 5 newest Contacts...");

try {

// query for the 5 newest contacts
QueryResult queryResults = connection.query("SELECT Id, FirstName, LastName, Account.Name " +
"FROM Contact WHERE AccountId != NULL ORDER BY CreatedDate DESC LIMIT 5");
if (queryResults.getSize() > 0) {
for (int i=0;i<queryResults.getRecords().length;i++) {
// cast the SObject to a strongly-typed Contact
Contact c = (Contact)queryResults.getRecords()[i];
System.out.println("Id: " + c.getId() + " - Name: "+c.getFirstName()+" "+
c.getLastName()+" - Account: "+c.getAccount().getName());
}
}

} catch (Exception e) {
e.printStackTrace();
}

}

// create 5 test Accounts
private static void createAccounts() {

System.out.println("Creating a new test Account...");
Account[] records = new Account[1];

try {

// create 5 test accounts
for (int i=0;i<1;i++) {
Account a = new Account();
a.setName("OptyAccount "+i);
records[i] = a;
}

// create the records in Salesforce.com
SaveResult[] saveResults = connection.create(records);

// check the returned results for any errors
for (int i=0; i< saveResults.length; i++) {
if (saveResults[i].isSuccess()) {
System.out.println(i+". Successfully created record - Id: " + saveResults[i].getId() + "At " + LocalDateTime.now());
System.out.println("************Event Count************" + ++count);
} else {
Error[] errors = saveResults[i].getErrors();
for (int j=0; j< errors.length; j++) {
System.out.println("ERROR creating record: " + errors[j].getMessage());
}
}
}

} catch (Exception e) {
e.printStackTrace();
}

}

// updates the 5 newly created Accounts
private static void updateAccounts() {

System.out.println("Update a new test Accounts...");
Account[] records = new Account[1];

try {

QueryResult queryResults = connection.query("SELECT Id, Name FROM Account ORDER BY " +
"CreatedDate DESC LIMIT 1");
if (queryResults.getSize() > 0) {
for (int i=0;i<queryResults.getRecords().length;i++) {
// cast the SObject to a strongly-typed Account
Account a = (Account)queryResults.getRecords()[i];
System.out.println("Updating Id: " + a.getId() + " - Name: "+a.getName());
// modify the name of the Account
a.setName(a.getName()+" -- UPDATED");
records[i] = a;
}
}

// update the records in Salesforce.com
SaveResult[] saveResults = connection.update(records);

// check the returned results for any errors
for (int i=0; i< saveResults.length; i++) {
if (saveResults[i].isSuccess()) {
System.out.println(i+". Successfully updated record - Id: " + saveResults[i].getId() + "At " + LocalDateTime.now());
System.out.println("************Event Count************" + ++count);
} else {
Error[] errors = saveResults[i].getErrors();
for (int j=0; j< errors.length; j++) {
System.out.println("ERROR updating record: " + errors[j].getMessage());
}
}
}

} catch (Exception e) {
e.printStackTrace();
}

}

// delete the 5 newly created Account
private static void deleteAccounts() {

System.out.println("Deleting new test Accounts...");
String[] ids = new String[1];

try {

QueryResult queryResults = connection.query("SELECT Id, Name FROM Account ORDER BY " +
"CreatedDate DESC LIMIT 1");
if (queryResults.getSize() > 0) {
for (int i=0;i<queryResults.getRecords().length;i++) {
// cast the SObject to a strongly-typed Account
Account a = (Account)queryResults.getRecords()[i];
// add the Account Id to the array to be deleted
ids[i] = a.getId();
System.out.println("Deleting Id: " + a.getId() + " - Name: "+a.getName());
}
}

// delete the records in Salesforce.com by passing an array of Ids
DeleteResult[] deleteResults = connection.delete(ids);

// check the results for any errors
for (int i=0; i< deleteResults.length; i++) {
if (deleteResults[i].isSuccess()) {
System.out.println(i+". Successfully deleted record - Id: " + deleteResults[i].getId() + "At " + LocalDateTime.now());
System.out.println("************Event Count************" + ++count);
} else {
Error[] errors = deleteResults[i].getErrors();
for (int j=0; j< errors.length; j++) {
System.out.println("ERROR deleting record: " + errors[j].getMessage());
}
}
}

} catch (Exception e) {
e.printStackTrace();
}

}

进一步的更新得到了下面提到的错误,之后通知是

2017-03-09T19:30:28.346 错误 [com.salesforce.emp.connector.EmpConnector] - 连接失败,正在重新连接org.cometd.common.TransportException: {httpCode=503} 在 org.cometd.client.transport.LongPollingTransport$2.onComplete(LongPollingTransport.java:278) 在 org.eclipse.jetty.client.ResponseNotifier.notifyComplete(ResponseNotifier.java:193)

在重新连接之后也发生了握手,但错误似乎在重新订阅()中 EMP 连接器似乎由于某种原因无法重新订阅

注意我正在使用 EMP connetor 的“resubscribe-on-disconnect”分支

最佳答案

我们已确定在 403 案例中服务器端存在错误。 Streaming API 使用 session 路由 cookie,此 cookie 会定期过期。当它过期时, session 被路由到另一台服务器,并以 403 响应。在当前版本中,此 403 响应不包括连接建议,客户端不会尝试重新连接。这已得到修复,修复程序目前正在运行。我的理解是,这应该可以解决客户端出现的重新连接问题。

关于api - Salesforce EMP 连接器,一段时间后停止接收通知,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42432553/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com