卡桑德拉抛出 NoHostAvailableException

Cassandra throw NoHostAvailableException

我正在使用以下代码将我的 .net 客户端(基于 CQL)连接到 3 节点 Cassandra 集群。我正在获取数据(来自 RabbitMQ)30 records/sec,它们顺利地存储在 cassandra 中多达 800-900 行。但在那之后我得到了这个例外。任何人都可以告诉我我可以做什么来避免这个异常。我在任何地方都找不到这个问题的具体解决方案。

错误: ERROR ErrorLog - error in Cassandra GetCWCRow Function Connection :None of the hosts tried for query are available (tried: X.X.X.201:9042, X.X.X.200:9042, X.X.X.X:9042)

代码:

using Cassandra;
using Consumer;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Data;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading.Tasks;



namespace RabbitMqCarWaleUserTracking
{
    class DataAccessCassandra
    {

        public bool InsertCookieLogData(string cwc, string page_uri)
        {
            try
            {
                Logs.WriteInfoLog("Cassandra InsertCookieLogData a Function called");
                Cluster cluster = Cluster.Builder().AddContactPoints(ConfigurationManager.AppSettings["cassandraCluster"].ToString().Split(',')).Build();
                ISession session = cluster.Connect(ConfigurationManager.AppSettings["cassandraKeySpace"].ToString());
                string pageCategory = string.Empty;
                try
                {

                    if ((Regex.IsMatch(page_uri, "/newcars/upcomingcars", RegexOptions.IgnoreCase)))
                    {
                        pageCategory = "upcomingCars";
                    }
                    else
                        if ((Regex.IsMatch(page_uri, "/newcars/dealers/newCarDealerShowroom", RegexOptions.IgnoreCase)) || (Regex.IsMatch(page_uri, "/newcars/dealers/listnewcardealersbycity", RegexOptions.IgnoreCase)
                            || (Regex.IsMatch(page_uri, "/newcars/dealers/dealerdetails", RegexOptions.IgnoreCase))))
                        {
                            pageCategory = "newcarsDealers";
                        }
                        else
                            if ((Regex.IsMatch(page_uri, "/offers", RegexOptions.IgnoreCase)) || (Regex.IsMatch(page_uri, "/alloffers", RegexOptions.IgnoreCase)))
                            {
                                pageCategory = "offers";
                            }
                            else
                                if ((Regex.IsMatch(page_uri, "/dealer/testdrive", RegexOptions.IgnoreCase)))
                                {
                                    pageCategory = "dealerTestDrive";
                                }

                    if (pageCategory != string.Empty)
                    {
                        Row result = session.Execute("select logdate from pageWiseCookieLog where cwc ='" + cwc + "' and page_uri ='" + pageCategory + "' and logdate= '" + DateTime.Today.ToString("yyyy-MM-dd") + "'").FirstOrDefault();
                        if (result == null)
                        {
                            session.Execute("insert into pageWiseCookieLog (cwc, page_uri, logdate) values ('" + cwc + "' , '" + pageCategory + "' , '" + DateTime.Now.ToString("yyyy-MM-dd") + "' )");
                            session.Execute("insert into pageWiseCookieLogByld (cwc, page_uri, logdate) values ('" + cwc + "' , '" + pageCategory + "' , '" + DateTime.Now.ToString("yyyy-MM-dd") + "' )");
                            session.Dispose();
                            cluster.Dispose();
                            return true;
                        }
                    }
                    else
                    {
                        //don't want to store the data for rest of the page category but need to return true  
                        session.Dispose();
                        cluster.Dispose();
                        return true;
                    }
                }
                catch (Exception ex)
                {
                    string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
                    Logs.WriteErrorLog("error in Cassandra InsertCookieLogData function with cwc :" + cwc + "error is :" + ex.Message);
                    SendMail.HandleException(ex, subject);
                    session.Dispose();
                    cluster.Dispose();
                }
            }
            catch (Exception ex)
            {
                string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
                Logs.WriteErrorLog("error in Cassandra InsertCookieLogData function connection :" + ex.Message);
                SendMail.HandleException(ex, subject);              
            }

            return false;
        }

        public string GetCWCRow(string cwc, int index, string mobileId)
        {
            try
            {
                Logs.WriteInfoLog("Cassandra GetCWCRow Function called");
                Cluster cluster = Cluster.Builder().AddContactPoints(ConfigurationManager.AppSettings["cassandraCluster"].ToString().Split(',')).Build();
                ISession session = cluster.Connect(ConfigurationManager.AppSettings["cassandraKeySpace"].ToString());
                try
                {
                    Row result = session.Execute("select cur_visit_id from usertracking where cwc ='" + cwc + "'").FirstOrDefault();

                    if (result != null)
                    {
                        session.Dispose();
                        cluster.Dispose();
                        return result[0].ToString();
                    }

                }
                catch (Exception ex)
                {
                    string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
                    Logs.WriteErrorLog("error in Cassandra GetCWCRow function with cwc :" + cwc + "error is :" + ex.Message);
                    SendMail.HandleException(ex, subject);
                    session.Dispose();
                    cluster.Dispose();
                }

            }
            catch (Exception ex)
            {
                string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
                Logs.WriteErrorLog("error in Cassandra GetCWCRow Function Connection :" + ex.Message);
                SendMail.HandleException(ex, subject);
            }

            return string.Empty;
        }

        public bool InsertCWCRecords(string cwv, Cut_Case caseType, int index, string mobileId)
        {
            try
            {
                Logs.WriteInfoLog("Cassandra InsertCWCRecords function called for case:" + caseType);
                Cluster cluster = Cluster.Builder().AddContactPoints(ConfigurationManager.AppSettings["cassandraCluster"].ToString().Split(',')).Build();
                ISession session = cluster.Connect(ConfigurationManager.AppSettings["cassandraKeySpace"].ToString());
                try
                {

                    bool _isProcessed = false;
                    string visitCount = "";
                    string[] leadParameters = cwv.Split('.');
                    string cwc = leadParameters[0];
                    string visitId = leadParameters[1];
                    string visitStartTime = leadParameters[2];
                    string visitPrevPageTime = leadParameters[3];
                    string visitLastPageTime = leadParameters[4];

                    if (leadParameters.Length == 6)
                    {
                        visitCount = leadParameters[5];
                    }
                    string TOT_TIME_SPENT = (Convert.ToInt64(visitLastPageTime) - Convert.ToInt64(visitPrevPageTime)).ToString();

                    if ((int)caseType == 1) //to enter new cwc data in summary table
                    {
                        session.Execute("insert into usertracking (cwc, cur_visit_id, cur_visit_last_ts,  tot_page_view, tot_time_spent, tot_visit_count, cur_visit_datetime) values ('" + cwc + "' , '" + visitId + "' ," + visitStartTime + "," + "1" + "," + TOT_TIME_SPENT + "," + "1" + ", '" + DateTime.Today.ToString("yyyy-MM-dd") + "' )");
                        _isProcessed = true;
                    }
                    if ((int)caseType == 2) //if cwc exits and visit id is same
                    {
                        Row result = session.Execute("select tot_page_view, tot_time_spent from usertracking where cwc ='" + cwc + "'").FirstOrDefault();
                        int page_cnt_val = int.Parse(result[0].ToString()) + 1;
                        Int64 time_spt_val = Int64.Parse(result[1].ToString()) + Convert.ToInt64(visitLastPageTime) - Convert.ToInt64(visitPrevPageTime);
                        session.Execute("update usertracking SET cur_visit_last_ts = " + visitLastPageTime + ", tot_page_view = " + page_cnt_val + ", tot_time_spent = " + time_spt_val + " WHERE cwc = '" + cwc.Trim() + "'");
                        _isProcessed = true;
                    }
                    if ((int)caseType == 3) //if cwc exits ans visit id is different
                    {
                        Row result = session.Execute("select tot_page_view, tot_time_spent, tot_visit_count, cur_visit_last_ts, cur_visit_datetime from usertracking where cwc = '" + cwc + "'").First();
                        int page_cnt_val = int.Parse(result[0].ToString()) + 1;
                        Int64 time_spt_val = Int64.Parse(result[1].ToString()) + Convert.ToInt64(visitLastPageTime) - Convert.ToInt64(visitPrevPageTime);
                        int visit_val = int.Parse(result[2].ToString()) + 1;
                        Int64 prev_visit_ts_val = Int64.Parse(result[3].ToString());
                        String prev_visit_datetime_val = Convert.ToDateTime(result[4].ToString()).ToString("yyyy-MM-dd");

                        session.Execute("update usertracking SET  cur_visit_id = '" + visitId + "' , tot_visit_count= " + visit_val
                            + " , prev_visit_last_ts= " + prev_visit_ts_val + ", prev_visit_datetime = '" + prev_visit_datetime_val
                            + "' , cur_visit_last_ts = " + visitLastPageTime
                            + ", tot_page_view = " + page_cnt_val + ", tot_time_spent = " + time_spt_val
                            + ", cur_visit_datetime='" + DateTime.Today.ToString("yyyy-MM-dd")
                            + "' WHERE cwc = '" + cwc.Trim() + "'");
                        _isProcessed = true;
                    }
                    session.Dispose();
                    cluster.Dispose();
                    return _isProcessed;
                }
                catch (Exception ex)
                {
                    string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
                    Logs.WriteErrorLog("error in Cassandra InsertCWCRecords function with cwv :" + cwv + "error is :" + ex.Message);
                    SendMail.HandleException(ex, subject);
                    session.Dispose();
                    cluster.Dispose();
                    return false;
                }
            }
            catch (Exception ex)
            {
                string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
                Logs.WriteErrorLog("error in Cassandra InsertCWCRecords function connection :" + ex.Message);
                SendMail.HandleException(ex, subject);
                return false;
            }

        }

        public bool UpdateReferrerTimeSpent(string cwc, int referrerCategoryId, double referrerTimeSpent, int index, string mobileId)
        {
            bool _isUpdated = false;

            try
            {              
                Logs.WriteInfoLog("Cassandra UpdateReferrerTimeSpent function called");
                Cluster cluster = Cluster.Builder().AddContactPoints(ConfigurationManager.AppSettings["cassandraCluster"].ToString().Split(',')).Build();
                ISession session = cluster.Connect("cw");

                try
                {
                    Row result = session.Execute("select time_spent_in_sec from userTimeSpentPage WHERE cwc = '" + cwc.Trim() + "' And logdate = '" + DateTime.Today.ToString("yyyy-MM-dd") + "' And page_category_id =" + referrerCategoryId).FirstOrDefault();
                    if (result != null)
                    {
                        if (result[0].ToString().Trim() != string.Empty)
                        {

                            Int64 page_time_spent_val = Int64.Parse(result[0].ToString());
                            Int64 tot_time_spt_val = page_time_spent_val + Int64.Parse(referrerTimeSpent.ToString());
                            session.Execute("update userTimeSpentPage set time_spent_in_sec= " + tot_time_spt_val + "WHERE cwc = '" + cwc.Trim() + "' And logdate = '" + DateTime.Today.ToString("yyyy-MM-dd") + "' And page_category_id=" + referrerCategoryId);
                        }
                    }
                    else
                    {
                        session.Execute("insert into userTimeSpentPage (cwc, page_category_id, time_spent_in_sec, logdate) values ('" + cwc + "' ," + referrerCategoryId + "," + referrerTimeSpent + ", '" + DateTime.Now.ToString("yyyy-MM-dd") + "' )");
                    }
                    _isUpdated = true;
                    session.Dispose();
                    cluster.Dispose();
                    return _isUpdated;
                }
                catch (Exception ex)
                {
                    string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
                    Logs.WriteErrorLog("error in Cassandra UpdateReferrerTimeSpent function with cwc:" + cwc + "error is :" + ex.Message);
                    SendMail.HandleException(ex, subject);
                    session.Dispose();
                    cluster.Dispose();
                    return _isUpdated;
                }
            }
            catch (Exception ex)
            {
                string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
                Logs.WriteErrorLog("error in Cassandra UpdateReferrerTimeSpent function connection" + ex.Message);
                SendMail.HandleException(ex, subject);
                return _isUpdated;
            }

        }
    }
}

已编辑问题:

output of netstat -an | awk '/^tcp/ {print $NF}' | sort | uniq -c | sort -rn

On Machine 1  While cassandra running :
      773 ESTABLISHED
      36 LISTEN
      1 CLOSE_WAIT

After cassandra stopped :
      274 ESTABLISHED
      36 LISTEN
      1 CLOSE_WAIT

Machine 2 while cassandra running :
       3941 ESTABLISHED
       26 LISTEN
       7 CLOSE_WAIT

After cassandra stopped :
       26 LISTEN
       9 ESTABLISHED

On machine 3 while cassandra running :
       500 ESTABLISHED
       21 LISTEN

After cassandra stopped :    
      21 LISTEN
      13 ESTABLISHED

可能出于多种原因抛出 NoHostAvailableException。然而,这都是关于 driver documentation:

中描述的问题

Exception thrown when a query cannot be performed because no host are available. This exception is thrown if

  • either there is no host live in the cluster at the moment of the query
  • all host that have been tried have failed due to a connection problem

现在为什么会这样 - 可能有多种原因。

  1. 在所有 3 个节点上同时进行主要垃圾收集的可能性。我个人认为情况并非如此,但您一定要阅读这篇文章,看看它是否适用于您的情况。这是一个 link 到一个非常好的文档,描述了如何 tune GC in Cassandra。事实上,您实际上为任何调用创建集群和会话对象而不是将它们存储为单例并只是重复使用它们,这可能会使事情变得更糟。
  2. 在查看了抛出错误的函数之后,我或多或少确信问题出在经过这么多次插入之后,您的节点在这条语句中读取宽行时超时:Row result = session.Execute("select cur_visit_id from usertracking where cwc ='" + cwc + "'").FirstOrDefault();。在这条路上,您正在为相同的集群键 cws 执行大量更新,由于 SSTables 的 immutable 性质,会产生大量版本化数据,从而增加数据检索时间,因为集群需要为您的每个请求组合所有这些数据。

如果没有任何 table 架构,很难提出建议,逆向工程也无济于事,但我建议以某种方式利用复合主键来加快查找速度。调整您的 JVM,创建会话和集群单例并在您的代码中重用它们。看看这是否有帮助。

通读 Cassandra 集群日志,重点关注问题发生的时间。查看这些日志中是否有任何线索,例如垃圾收集 activity 或超时错误。

HTH

罗马