gpt4 book ai didi

Cassandra 抛出 NoHostAvailableException

转载 作者:行者123 更新时间:2023-12-02 23:29:55 27 4
gpt4 key购买 nike

我使用以下代码将我的 .net 客户端(基于 CQL)连接到 3 节点 Cassandra 集群。我以 30 条记录/秒的速度(从 RabbitMQ)获取数据,并且它们顺利地存储在 cassandra 中,最多可达 800-900 行。但在那之后我收到了以下异常。谁能告诉我可以进行哪些优化/更改来避免此异常。我在任何地方都找不到此问题的具体解决方案。

错误: 错误错误日志 - Cassandra GetCWCRow 函数连接中出现错误:尝试查询的主机均不可用(已尝试:X.X.X.201:9042、X.X.X.200:9042、X.X.X.X:9042 )

代码:

using Cassandra;
using Consumer;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Data;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading.Tasks;



namespace RabbitMqCarWaleUserTracking
{
class DataAccessCassandra
{

public bool InsertCookieLogData(string cwc, string page_uri)
{
try
{
Logs.WriteInfoLog("Cassandra InsertCookieLogData a Function called");
Cluster cluster = Cluster.Builder().AddContactPoints(ConfigurationManager.AppSettings["cassandraCluster"].ToString().Split(',')).Build();
ISession session = cluster.Connect(ConfigurationManager.AppSettings["cassandraKeySpace"].ToString());
string pageCategory = string.Empty;
try
{

if ((Regex.IsMatch(page_uri, "/newcars/upcomingcars", RegexOptions.IgnoreCase)))
{
pageCategory = "upcomingCars";
}
else
if ((Regex.IsMatch(page_uri, "/newcars/dealers/newCarDealerShowroom", RegexOptions.IgnoreCase)) || (Regex.IsMatch(page_uri, "/newcars/dealers/listnewcardealersbycity", RegexOptions.IgnoreCase)
|| (Regex.IsMatch(page_uri, "/newcars/dealers/dealerdetails", RegexOptions.IgnoreCase))))
{
pageCategory = "newcarsDealers";
}
else
if ((Regex.IsMatch(page_uri, "/offers", RegexOptions.IgnoreCase)) || (Regex.IsMatch(page_uri, "/alloffers", RegexOptions.IgnoreCase)))
{
pageCategory = "offers";
}
else
if ((Regex.IsMatch(page_uri, "/dealer/testdrive", RegexOptions.IgnoreCase)))
{
pageCategory = "dealerTestDrive";
}

if (pageCategory != string.Empty)
{
Row result = session.Execute("select logdate from pageWiseCookieLog where cwc ='" + cwc + "' and page_uri ='" + pageCategory + "' and logdate= '" + DateTime.Today.ToString("yyyy-MM-dd") + "'").FirstOrDefault();
if (result == null)
{
session.Execute("insert into pageWiseCookieLog (cwc, page_uri, logdate) values ('" + cwc + "' , '" + pageCategory + "' , '" + DateTime.Now.ToString("yyyy-MM-dd") + "' )");
session.Execute("insert into pageWiseCookieLogByld (cwc, page_uri, logdate) values ('" + cwc + "' , '" + pageCategory + "' , '" + DateTime.Now.ToString("yyyy-MM-dd") + "' )");
session.Dispose();
cluster.Dispose();
return true;
}
}
else
{
//don't want to store the data for rest of the page category but need to return true
session.Dispose();
cluster.Dispose();
return true;
}
}
catch (Exception ex)
{
string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
Logs.WriteErrorLog("error in Cassandra InsertCookieLogData function with cwc :" + cwc + "error is :" + ex.Message);
SendMail.HandleException(ex, subject);
session.Dispose();
cluster.Dispose();
}
}
catch (Exception ex)
{
string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
Logs.WriteErrorLog("error in Cassandra InsertCookieLogData function connection :" + ex.Message);
SendMail.HandleException(ex, subject);
}

return false;
}

public string GetCWCRow(string cwc, int index, string mobileId)
{
try
{
Logs.WriteInfoLog("Cassandra GetCWCRow Function called");
Cluster cluster = Cluster.Builder().AddContactPoints(ConfigurationManager.AppSettings["cassandraCluster"].ToString().Split(',')).Build();
ISession session = cluster.Connect(ConfigurationManager.AppSettings["cassandraKeySpace"].ToString());
try
{
Row result = session.Execute("select cur_visit_id from usertracking where cwc ='" + cwc + "'").FirstOrDefault();

if (result != null)
{
session.Dispose();
cluster.Dispose();
return result[0].ToString();
}

}
catch (Exception ex)
{
string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
Logs.WriteErrorLog("error in Cassandra GetCWCRow function with cwc :" + cwc + "error is :" + ex.Message);
SendMail.HandleException(ex, subject);
session.Dispose();
cluster.Dispose();
}

}
catch (Exception ex)
{
string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
Logs.WriteErrorLog("error in Cassandra GetCWCRow Function Connection :" + ex.Message);
SendMail.HandleException(ex, subject);
}

return string.Empty;
}

public bool InsertCWCRecords(string cwv, Cut_Case caseType, int index, string mobileId)
{
try
{
Logs.WriteInfoLog("Cassandra InsertCWCRecords function called for case:" + caseType);
Cluster cluster = Cluster.Builder().AddContactPoints(ConfigurationManager.AppSettings["cassandraCluster"].ToString().Split(',')).Build();
ISession session = cluster.Connect(ConfigurationManager.AppSettings["cassandraKeySpace"].ToString());
try
{

bool _isProcessed = false;
string visitCount = "";
string[] leadParameters = cwv.Split('.');
string cwc = leadParameters[0];
string visitId = leadParameters[1];
string visitStartTime = leadParameters[2];
string visitPrevPageTime = leadParameters[3];
string visitLastPageTime = leadParameters[4];

if (leadParameters.Length == 6)
{
visitCount = leadParameters[5];
}
string TOT_TIME_SPENT = (Convert.ToInt64(visitLastPageTime) - Convert.ToInt64(visitPrevPageTime)).ToString();

if ((int)caseType == 1) //to enter new cwc data in summary table
{
session.Execute("insert into usertracking (cwc, cur_visit_id, cur_visit_last_ts, tot_page_view, tot_time_spent, tot_visit_count, cur_visit_datetime) values ('" + cwc + "' , '" + visitId + "' ," + visitStartTime + "," + "1" + "," + TOT_TIME_SPENT + "," + "1" + ", '" + DateTime.Today.ToString("yyyy-MM-dd") + "' )");
_isProcessed = true;
}
if ((int)caseType == 2) //if cwc exits and visit id is same
{
Row result = session.Execute("select tot_page_view, tot_time_spent from usertracking where cwc ='" + cwc + "'").FirstOrDefault();
int page_cnt_val = int.Parse(result[0].ToString()) + 1;
Int64 time_spt_val = Int64.Parse(result[1].ToString()) + Convert.ToInt64(visitLastPageTime) - Convert.ToInt64(visitPrevPageTime);
session.Execute("update usertracking SET cur_visit_last_ts = " + visitLastPageTime + ", tot_page_view = " + page_cnt_val + ", tot_time_spent = " + time_spt_val + " WHERE cwc = '" + cwc.Trim() + "'");
_isProcessed = true;
}
if ((int)caseType == 3) //if cwc exits ans visit id is different
{
Row result = session.Execute("select tot_page_view, tot_time_spent, tot_visit_count, cur_visit_last_ts, cur_visit_datetime from usertracking where cwc = '" + cwc + "'").First();
int page_cnt_val = int.Parse(result[0].ToString()) + 1;
Int64 time_spt_val = Int64.Parse(result[1].ToString()) + Convert.ToInt64(visitLastPageTime) - Convert.ToInt64(visitPrevPageTime);
int visit_val = int.Parse(result[2].ToString()) + 1;
Int64 prev_visit_ts_val = Int64.Parse(result[3].ToString());
String prev_visit_datetime_val = Convert.ToDateTime(result[4].ToString()).ToString("yyyy-MM-dd");

session.Execute("update usertracking SET cur_visit_id = '" + visitId + "' , tot_visit_count= " + visit_val
+ " , prev_visit_last_ts= " + prev_visit_ts_val + ", prev_visit_datetime = '" + prev_visit_datetime_val
+ "' , cur_visit_last_ts = " + visitLastPageTime
+ ", tot_page_view = " + page_cnt_val + ", tot_time_spent = " + time_spt_val
+ ", cur_visit_datetime='" + DateTime.Today.ToString("yyyy-MM-dd")
+ "' WHERE cwc = '" + cwc.Trim() + "'");
_isProcessed = true;
}
session.Dispose();
cluster.Dispose();
return _isProcessed;
}
catch (Exception ex)
{
string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
Logs.WriteErrorLog("error in Cassandra InsertCWCRecords function with cwv :" + cwv + "error is :" + ex.Message);
SendMail.HandleException(ex, subject);
session.Dispose();
cluster.Dispose();
return false;
}
}
catch (Exception ex)
{
string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
Logs.WriteErrorLog("error in Cassandra InsertCWCRecords function connection :" + ex.Message);
SendMail.HandleException(ex, subject);
return false;
}

}

public bool UpdateReferrerTimeSpent(string cwc, int referrerCategoryId, double referrerTimeSpent, int index, string mobileId)
{
bool _isUpdated = false;

try
{
Logs.WriteInfoLog("Cassandra UpdateReferrerTimeSpent function called");
Cluster cluster = Cluster.Builder().AddContactPoints(ConfigurationManager.AppSettings["cassandraCluster"].ToString().Split(',')).Build();
ISession session = cluster.Connect("cw");

try
{
Row result = session.Execute("select time_spent_in_sec from userTimeSpentPage WHERE cwc = '" + cwc.Trim() + "' And logdate = '" + DateTime.Today.ToString("yyyy-MM-dd") + "' And page_category_id =" + referrerCategoryId).FirstOrDefault();
if (result != null)
{
if (result[0].ToString().Trim() != string.Empty)
{

Int64 page_time_spent_val = Int64.Parse(result[0].ToString());
Int64 tot_time_spt_val = page_time_spent_val + Int64.Parse(referrerTimeSpent.ToString());
session.Execute("update userTimeSpentPage set time_spent_in_sec= " + tot_time_spt_val + "WHERE cwc = '" + cwc.Trim() + "' And logdate = '" + DateTime.Today.ToString("yyyy-MM-dd") + "' And page_category_id=" + referrerCategoryId);
}
}
else
{
session.Execute("insert into userTimeSpentPage (cwc, page_category_id, time_spent_in_sec, logdate) values ('" + cwc + "' ," + referrerCategoryId + "," + referrerTimeSpent + ", '" + DateTime.Now.ToString("yyyy-MM-dd") + "' )");
}
_isUpdated = true;
session.Dispose();
cluster.Dispose();
return _isUpdated;
}
catch (Exception ex)
{
string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
Logs.WriteErrorLog("error in Cassandra UpdateReferrerTimeSpent function with cwc:" + cwc + "error is :" + ex.Message);
SendMail.HandleException(ex, subject);
session.Dispose();
cluster.Dispose();
return _isUpdated;
}
}
catch (Exception ex)
{
string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
Logs.WriteErrorLog("error in Cassandra UpdateReferrerTimeSpent function connection" + ex.Message);
SendMail.HandleException(ex, subject);
return _isUpdated;
}

}
}
}

编辑的问题:

output of netstat -an | awk '/^tcp/ {print $NF}' | sort | uniq -c | sort -rn

On Machine 1 While cassandra running :
773 ESTABLISHED
36 LISTEN
1 CLOSE_WAIT

After cassandra stopped :
274 ESTABLISHED
36 LISTEN
1 CLOSE_WAIT

Machine 2 while cassandra running :
3941 ESTABLISHED
26 LISTEN
7 CLOSE_WAIT

After cassandra stopped :
26 LISTEN
9 ESTABLISHED

On machine 3 while cassandra running :
500 ESTABLISHED
21 LISTEN

After cassandra stopped :
21 LISTEN
13 ESTABLISHED

最佳答案

抛出 NoHostAvailableException 的原因有很多。然而,这都是关于 driver documentation 中描述的问题。 :

Exception thrown when a query cannot be performed because no host are available. This exception is thrown if

  • either there is no host live in the cluster at the moment of the query
  • all host that have been tried have failed due to a connection problem

现在为什么会发生这种情况 - 可能有多种原因。

  1. 可以在所有 3 个节点上同时进行主要垃圾收集。我个人认为情况并非如此,但您绝对应该阅读本文,看看它是否适用于您的情况。这是一个非常好的文档的链接,描述了如何 tune GC in Cassandra 。实际上,您为任何调用创建集群和 session 对象,而不是将它们存储为单例并重用它们,这一事实可能会使事情变得更糟。
  2. 查看引发错误的函数后,我或多或少确信问题在于,经过如此多次插入后,您的节点在以下语句期间读取宽行时超时:Row result = session.执行("从用户跟踪中选择 cur_visit_id,其中 cwc ='"+ cwc + "'").FirstOrDefault();.接下来,您将对同一集群键 cws 执行大量更新,由于 SSTable 的不可变性质,这会产生大量版本化数据,从而导致数据检索时间增加,因为集群需要针对您的每个请求合并所有这些数据。

在没有任何表模式的情况下很难提出建议,并且逆向工程也没有多大帮助,但我建议以某种方式利用复合主键来加快查找速度。调整您的 JVM,创建 session 和集群单例并在代码中重用它们。看看这是否有帮助。

通读 Cassandra 集群日志,重点关注问题发生的时间。查看这些日志中是否有任何线索,例如垃圾收集事件或超时错误。

HTH

罗马

关于Cassandra 抛出 NoHostAvailableException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29078486/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com