Spark JoinWithCassandraTable on TimeStamp partition key STUCK
I am trying to filter a small part of a huge C* table using:
val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey(_)).joinWithCassandraTable("listener","snapshots_tspark")
println("Done Join")
//*******
//get only the snapshots and create rdd temp table
val jsons = snapshotsFiltered.map(_._2.getString("snapshot"))
val jsonSchemaRDD = sqlContext.jsonRDD(jsons)
jsonSchemaRDD.registerTempTable("snapshots_json")
With:
case class TableKey(created: Long) // (created, imei, when) --> created = partition key | imei, when = clustering keys
And the Cassandra table schema is:
CREATE TABLE listener.snapshots_tspark (
created timestamp,
imei text,
when timestamp,
snapshot text,
PRIMARY KEY (created, imei, when) ) WITH CLUSTERING ORDER BY (imei ASC, when ASC)
AND bloom_filter_fp_chance = 0.01
AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
AND comment = ''
AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99.0PERCENTILE';
The problem is that the process freezes after the println completes, with no errors in the Spark master UI.
[Stage 0:> (0 + 2) / 2]
Can't the join work with a timestamp as the partition key? Why does it freeze?
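By using:
sc.parallelize(startDate to endDate)
with startDate and endDate as Longs generated from Dates in the format:
("yyyy-MM-dd HH:mm:ss")
I made Spark build a huge array (more than 100,000 objects) to join with the C* table, and it wasn't stuck at all; C* was working hard to make the join happen and return the data.
Finally, I changed my range to: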
case class TableKey(created_dh: String)
val data = Array("2015-10-29 12:00:00", "2015-10-29 13:00:00", "2015-10-29 14:00:00", "2015-10-29 15:00:00")
val snapshotsFiltered = sc.parallelize(data, 2).map(TableKey(_)).joinWithCassandraTable("listener","snapshots_tnew")
Now it works fine.
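To illustrate why the dense range was so expensive, here is a minimal sketch that builds one key per hour instead of one key per Long. It uses only java.time, so it runs without Spark or Cassandra. The object HourlyKeys and the helpers hourlyKeys and denseKeyCount are hypothetical names, not part of the connector, and the post does not say whether the Longs were epoch seconds or milliseconds, so epoch seconds is an assumption made only for the comparison.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical helper object, not part of Spark or the Cassandra connector.
object HourlyKeys {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // One key per hour from start to end (inclusive), formatted like the keys in the post.
  def hourlyKeys(start: String, end: String): Vector[String] = {
    val s = LocalDateTime.parse(start, fmt)
    val e = LocalDateTime.parse(end, fmt)
    Iterator.iterate(s)(_.plusHours(1))
      .takeWhile(!_.isAfter(e))
      .map(_.format(fmt))
      .toVector
  }

  // Number of keys a dense `startDate to endDate` range would yield,
  // ASSUMING both bounds are epoch seconds (the post does not state the unit).
  def denseKeyCount(start: String, end: String): Long = {
    val s = LocalDateTime.parse(start, fmt).toEpochSecond(ZoneOffset.UTC)
    val e = LocalDateTime.parse(end, fmt).toEpochSecond(ZoneOffset.UTC)
    e - s + 1
  }
}
```

For the window used above, hourlyKeys("2015-10-29 12:00:00", "2015-10-29 15:00:00") gives the same four keys as the hand-written Array, while a dense range over the same three hours would contain 10,801 candidate keys under the epoch-seconds assumption, each of which the join has to look up as a separate partition. The resulting Vector could be passed to sc.parallelize(..., 2) in place of the literal Array.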