SaveToCassandra ,是否有写入行的顺序
SaveToCassandra , Is there any ordering in which the rows are written
这是我保存到 Cassandra 的 RDD 的内容 table。
但是看起来第二行是先写的,然后第一行覆盖它。所以我最终得到了糟糕的输出。
(494bce4f393b474980290b8d1b6ebef9, 2017-02-01, PT0H9M30S, WEDNESDAY)
(494bce4f393b474980290b8d1b6ebef9, 2017-02-01, PT0H10M0S, WEDNESDAY)
有没有办法强制写入 Cassandra 的行的顺序。
请帮忙。
谢谢
这完全取决于您对 table 的定义。不能保证分区键(主键的第一部分)的顺序。
主键的其余部分用于对分区内的键进行排序。这就是您的问题所在。您必须定义聚类列。
这里是这样描述的:
https://docs.datastax.com/en/cql/3.1/cql/ddl/ddl_compound_keys_c.html
插入的顺序仍然很重要,但仅在有两个相同信息的情况下,最后一个获胜。我认为这里不是这种情况。
您还可以考虑将 "PT0H9M30S" 中的信息放在聚类列下,这样您就可以保留数据而不是覆盖它。
Cassandra 是时间序列数据库。你应该设计你的 table 这样就不会发生覆盖。或者如果你想写 earliest/latest 时间戳,那么你应该使用像 reduceByKey 这样的转换来减少你的 RDD,只保留特定键的 earliest/latest 时间戳信息。
是否有 SaveToCassandra 命令?
在单个任务中执行是确定性的,但可能不是
订购您所期待的。这里有两件事需要考虑。
- RDD 由 Spark 分区组成,这些分区的执行顺序取决于系统条件。拥有不同数量的核心、异构机器或执行器故障都可能改变执行顺序。具有相同 Cassandra 分区数据的两个 Spark 分区可以基于系统以任何顺序执行。
- 对于每个 Spark 分区,记录的批处理顺序与接收顺序相同,但这并不一定意味着它们将以相同的顺序发送到 Cassandra。连接器中有一些设置可以确定何时发送批处理,并且可以想象包含较晚数据的批处理将在包含较早数据的批处理之前执行。这意味着虽然发送批次的顺序是确定的,但不一定与前一个迭代器的顺序相同。
这对您的申请有影响吗?
可能不会。只有当你的数据真的分散时,这才真正重要
在 RDD 中。如果特定 Cassandra 分区的条目分布在
多个 Spark 分区,那么 Spark 执行的顺序可能会混乱
你的更新。考虑
Spark Partition 1 has Record A
Spark Partition 2 has Record B
Both Spark Partitions have work start simultaneously, but Record B is
reached before Record A.
但我认为这不太可能是问题。
您 运行 遇到的问题很可能是常见问题:the order of statements in my batch is not respected。这个问题的核心是 Cassandra 批处理中的所有语句都是“同时”执行的。这意味着如果任何 Primary Key
存在冲突,则需要解决冲突。在这些情况下,Cassandra 会为所有冲突选择较大的单元格值。由于连接器会自动批量写入同一分区键,您最终可能会遇到冲突。
你可以在你的例子中看到这一点,较大的值(PT0H9M30S)被保留,较小的(PT0H10M0S)被丢弃。问题不在于顺序,而在于批处理正在发生。
那我怎样才能根据时间做upsert呢?
非常仔细。我会考虑采用几种方法。
最好的选择是不要根据时间进行更新插入。如果您有 PRIMARY_KEY
的多个条目但只想要最后一个条目,请在点击 Cassandra 之前减少 Spark。在尝试写入之前删除不需要的条目将节省时间并减轻 Cassandra 集群的负载。否则,您将 Cassandra 用作相当昂贵的 de-duping 机器。
一个更糟糕的选择是只禁用 Spark Cassandra 连接器中的批处理。这会损害性能,但如果您只关心 Spark 分区中的顺序,则可以解决问题。如果您有多个 Spark 分区,这仍然会导致冲突,因为您无法控制它们的执行顺序。
这个故事的寓意
状态不佳。订单很糟糕。尽可能将系统设计为幂等的。如果有多个记录并且您知道哪些重要,请在进入分布式 LWW 系统之前删除不重要的记录。
这是我保存到 Cassandra 的 RDD 的内容 table。 但是看起来第二行是先写的,然后第一行覆盖它。所以我最终得到了糟糕的输出。
(494bce4f393b474980290b8d1b6ebef9, 2017-02-01, PT0H9M30S, WEDNESDAY) (494bce4f393b474980290b8d1b6ebef9, 2017-02-01, PT0H10M0S, WEDNESDAY)
有没有办法强制写入 Cassandra 的行的顺序。 请帮忙。 谢谢
这完全取决于您对 table 的定义。不能保证分区键(主键的第一部分)的顺序。
主键的其余部分用于对分区内的键进行排序。这就是您的问题所在。您必须定义聚类列。
这里是这样描述的: https://docs.datastax.com/en/cql/3.1/cql/ddl/ddl_compound_keys_c.html
插入的顺序仍然很重要,但仅在有两个相同信息的情况下,最后一个获胜。我认为这里不是这种情况。
您还可以考虑将 "PT0H9M30S" 中的信息放在聚类列下,这样您就可以保留数据而不是覆盖它。
Cassandra 是时间序列数据库。你应该设计你的 table 这样就不会发生覆盖。或者如果你想写 earliest/latest 时间戳,那么你应该使用像 reduceByKey 这样的转换来减少你的 RDD,只保留特定键的 earliest/latest 时间戳信息。
是否有 SaveToCassandra 命令?
在单个任务中执行是确定性的,但可能不是 订购您所期待的。这里有两件事需要考虑。
- RDD 由 Spark 分区组成,这些分区的执行顺序取决于系统条件。拥有不同数量的核心、异构机器或执行器故障都可能改变执行顺序。具有相同 Cassandra 分区数据的两个 Spark 分区可以基于系统以任何顺序执行。
- 对于每个 Spark 分区,记录的批处理顺序与接收顺序相同,但这并不一定意味着它们将以相同的顺序发送到 Cassandra。连接器中有一些设置可以确定何时发送批处理,并且可以想象包含较晚数据的批处理将在包含较早数据的批处理之前执行。这意味着虽然发送批次的顺序是确定的,但不一定与前一个迭代器的顺序相同。
这对您的申请有影响吗?
可能不会。只有当你的数据真的分散时,这才真正重要 在 RDD 中。如果特定 Cassandra 分区的条目分布在 多个 Spark 分区,那么 Spark 执行的顺序可能会混乱 你的更新。考虑
Spark Partition 1 has Record A
Spark Partition 2 has Record B
Both Spark Partitions have work start simultaneously, but Record B is
reached before Record A.
但我认为这不太可能是问题。
您 运行 遇到的问题很可能是常见问题:the order of statements in my batch is not respected。这个问题的核心是 Cassandra 批处理中的所有语句都是“同时”执行的。这意味着如果任何 Primary Key
存在冲突,则需要解决冲突。在这些情况下,Cassandra 会为所有冲突选择较大的单元格值。由于连接器会自动批量写入同一分区键,您最终可能会遇到冲突。
你可以在你的例子中看到这一点,较大的值(PT0H9M30S)被保留,较小的(PT0H10M0S)被丢弃。问题不在于顺序,而在于批处理正在发生。
那我怎样才能根据时间做upsert呢?
非常仔细。我会考虑采用几种方法。
最好的选择是不要根据时间进行更新插入。如果您有 PRIMARY_KEY
的多个条目但只想要最后一个条目,请在点击 Cassandra 之前减少 Spark。在尝试写入之前删除不需要的条目将节省时间并减轻 Cassandra 集群的负载。否则,您将 Cassandra 用作相当昂贵的 de-duping 机器。
一个更糟糕的选择是只禁用 Spark Cassandra 连接器中的批处理。这会损害性能,但如果您只关心 Spark 分区中的顺序,则可以解决问题。如果您有多个 Spark 分区,这仍然会导致冲突,因为您无法控制它们的执行顺序。
这个故事的寓意
状态不佳。订单很糟糕。尽可能将系统设计为幂等的。如果有多个记录并且您知道哪些重要,请在进入分布式 LWW 系统之前删除不重要的记录。