使用 Spark 的 Cassandra 写入性能非常低
Cassandra write giving very slow perfomance using Spark
我有一个 cassandra table,有大约 500+ 百万条记录(在 6 个节点中),现在我正在尝试使用 Amazon EMR 中的 spark-cassandra-connector 插入数据
Table结构
CREATE TABLE dmp.dmp_user_profiles_latest (
pid text PRIMARY KEY,
xnid int,
day_count map<text, int>,
first_seen map<text, timestamp>,
last_seen map<text, timestamp>,
usage_count map<text, int>,
city text,
country text,
lid set<text>,
)WITH bloom_filter_fp_chance = 0.01
AND caching = '{"keys":"NONE", "rows_per_partition":"ALL"}'
AND comment = ''
AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'max_threshold': '32'}
AND compression = {'chunk_length_kb': '256', 'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 172800
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.1
AND speculative_retry = '99.0PERCENTILE';
CREATE INDEX dmp_user_profiles_latest_app_day_count_idx ON dmp.dmp_user_profiles_latest (day_count);
CREATE INDEX dmp_user_profiles_latest_country_idx ON dmp.dmp_user_profiles_latest (country);
以下是我的 spark-submit 选项
--class com.mobi.vserv.driver.Query5kPids1
--conf spark.dynamicAllocation.enabled=true
--conf spark.yarn.executor.memoryOverhead=1024
--conf spark.yarn.driver.memoryOverhead=1024
--executor-memory 1g
--executor-cores 2
--driver-memory 4g
但在日志中我看到写入 Cassandra 大约需要 4-5 分钟来加载 20 万(200,000)条记录(而总执行时间为 6 分钟以上)
我也在 Spark conf 中添加了以下内容
conf.set("spark.cassandra.output.batch.size.rows", "auto");
conf.set("spark.cassandra.output.concurrent.writes", "500");
conf.set("spark.cassandra.output.batch.size.bytes", "100000");
conf.set("spark.cassandra.output.throughput_mb_per_sec","1");
但仍然没有性能提升,增加 Amazon EMR 中的内核数也无济于事。
请注意,在我的 Cassandra table 中,我们没有使用任何 partitioning/clustering 列,这可能是性能如此缓慢的原因。
请注意网络速度为 30 MB PS 主键是字母数字值,例如 - a9be3eb4-751f-48ee-b593-b3f89e18622d
Cassandra.yaml
cluster_name: 'dmp Cluster'
num_tokens: 100
hinted_handoff_enabled: true
max_hint_window_in_ms: 10800000 # 3 hours
hinted_handoff_throttle_in_kb: 1024
max_hints_delivery_threads: 2
batchlog_replay_throttle_in_kb: 1024
authenticator: AllowAllAuthenticator
authorizer: AllowAllAuthorizer
permissions_validity_in_ms: 2000
partitioner: org.apache.cassandra.dht.Murmur3Partitioner
data_file_directories:
- /data/cassandra/data
disk_failure_policy: stop
commit_failure_policy: stop
key_cache_size_in_mb:
key_cache_save_period: 14400
row_cache_size_in_mb: 0
row_cache_save_period: 0
counter_cache_size_in_mb:
counter_cache_save_period: 7200
saved_caches_directory: /data/cassandra/saved_caches
commitlog_sync: periodic
commitlog_sync_period_in_ms: 10000
seed_provider:
- class_name: org.apache.cassandra.locator.SimpleSeedProvider
parameters:
- seeds: "10.142.76.97,10.182.19.301"
concurrent_reads: 256
concurrent_writes: 128
concurrent_counter_writes: 32
memtable_allocation_type: heap_buffers
memtable_flush_writers: 8
index_summary_capacity_in_mb:
index_summary_resize_interval_in_minutes: 60
trickle_fsync: false
trickle_fsync_interval_in_kb: 10240
storage_port: 7000
ssl_storage_port: 7001
listen_address: 10.142.76.97
start_rpc: true
rpc_address: 10.23.244.172
rpc_port: 9160
rpc_keepalive: true
rpc_server_type: sync
thrift_framed_transport_size_in_mb: 15
incremental_backups: false
snapshot_before_compaction: false
auto_snapshot: true
tombstone_warn_threshold: 1000
tombstone_failure_threshold: 100000
column_index_size_in_kb: 64
batch_size_warn_threshold_in_kb: 5
concurrent_compactors: 4
compaction_throughput_mb_per_sec: 64
sstable_preemptive_open_interval_in_mb: 50
read_request_timeout_in_ms: 500000
range_request_timeout_in_ms: 1000000
write_request_timeout_in_ms: 200000
counter_write_request_timeout_in_ms: 500000
cas_contention_timeout_in_ms: 100000
endpoint_snitch: Ec2Snitch
dynamic_snitch_update_interval_in_ms: 100
dynamic_snitch_reset_interval_in_ms: 600000
dynamic_snitch_badness_threshold: 0.1
request_scheduler: org.apache.cassandra.scheduler.NoScheduler
server_encryption_options:
internode_encryption: none
keystore: conf/.keystore
keystore_password: cassandra
truststore: conf/.truststore
truststore_password: cassandra
client_encryption_options:
enabled: false
keystore: conf/.keystore
keystore_password: cassandra
internode_compression: all
inter_dc_tcp_nodelay: false
正如评论中所说,您的问题似乎来自 day_count 上的索引。
如本 page 所示,如果您必须一直更新索引,索引将不会有效,而当您将不同的值插入 day_count 时(可能每次都).
您需要重新处理您的数据库,但由于这是您的生产环境,如果需要此索引,您不能只 DROP INDEX IF EXISTS keyspace.index_name
,但您可以使用 day_count
创建一个辅助数据库作为主键,或使用 day_count
作为排序索引。
我有一个 cassandra table,有大约 500+ 百万条记录(在 6 个节点中),现在我正在尝试使用 Amazon EMR 中的 spark-cassandra-connector 插入数据
Table结构
CREATE TABLE dmp.dmp_user_profiles_latest (
pid text PRIMARY KEY,
xnid int,
day_count map<text, int>,
first_seen map<text, timestamp>,
last_seen map<text, timestamp>,
usage_count map<text, int>,
city text,
country text,
lid set<text>,
)WITH bloom_filter_fp_chance = 0.01
AND caching = '{"keys":"NONE", "rows_per_partition":"ALL"}'
AND comment = ''
AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'max_threshold': '32'}
AND compression = {'chunk_length_kb': '256', 'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 172800
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.1
AND speculative_retry = '99.0PERCENTILE';
CREATE INDEX dmp_user_profiles_latest_app_day_count_idx ON dmp.dmp_user_profiles_latest (day_count);
CREATE INDEX dmp_user_profiles_latest_country_idx ON dmp.dmp_user_profiles_latest (country);
以下是我的 spark-submit 选项
--class com.mobi.vserv.driver.Query5kPids1
--conf spark.dynamicAllocation.enabled=true
--conf spark.yarn.executor.memoryOverhead=1024
--conf spark.yarn.driver.memoryOverhead=1024
--executor-memory 1g
--executor-cores 2
--driver-memory 4g
但在日志中我看到写入 Cassandra 大约需要 4-5 分钟来加载 20 万(200,000)条记录(而总执行时间为 6 分钟以上)
我也在 Spark conf 中添加了以下内容
conf.set("spark.cassandra.output.batch.size.rows", "auto");
conf.set("spark.cassandra.output.concurrent.writes", "500");
conf.set("spark.cassandra.output.batch.size.bytes", "100000");
conf.set("spark.cassandra.output.throughput_mb_per_sec","1");
但仍然没有性能提升,增加 Amazon EMR 中的内核数也无济于事。
请注意,在我的 Cassandra table 中,我们没有使用任何 partitioning/clustering 列,这可能是性能如此缓慢的原因。
请注意网络速度为 30 MB PS 主键是字母数字值,例如 - a9be3eb4-751f-48ee-b593-b3f89e18622d
Cassandra.yaml
cluster_name: 'dmp Cluster'
num_tokens: 100
hinted_handoff_enabled: true
max_hint_window_in_ms: 10800000 # 3 hours
hinted_handoff_throttle_in_kb: 1024
max_hints_delivery_threads: 2
batchlog_replay_throttle_in_kb: 1024
authenticator: AllowAllAuthenticator
authorizer: AllowAllAuthorizer
permissions_validity_in_ms: 2000
partitioner: org.apache.cassandra.dht.Murmur3Partitioner
data_file_directories:
- /data/cassandra/data
disk_failure_policy: stop
commit_failure_policy: stop
key_cache_size_in_mb:
key_cache_save_period: 14400
row_cache_size_in_mb: 0
row_cache_save_period: 0
counter_cache_size_in_mb:
counter_cache_save_period: 7200
saved_caches_directory: /data/cassandra/saved_caches
commitlog_sync: periodic
commitlog_sync_period_in_ms: 10000
seed_provider:
- class_name: org.apache.cassandra.locator.SimpleSeedProvider
parameters:
- seeds: "10.142.76.97,10.182.19.301"
concurrent_reads: 256
concurrent_writes: 128
concurrent_counter_writes: 32
memtable_allocation_type: heap_buffers
memtable_flush_writers: 8
index_summary_capacity_in_mb:
index_summary_resize_interval_in_minutes: 60
trickle_fsync: false
trickle_fsync_interval_in_kb: 10240
storage_port: 7000
ssl_storage_port: 7001
listen_address: 10.142.76.97
start_rpc: true
rpc_address: 10.23.244.172
rpc_port: 9160
rpc_keepalive: true
rpc_server_type: sync
thrift_framed_transport_size_in_mb: 15
incremental_backups: false
snapshot_before_compaction: false
auto_snapshot: true
tombstone_warn_threshold: 1000
tombstone_failure_threshold: 100000
column_index_size_in_kb: 64
batch_size_warn_threshold_in_kb: 5
concurrent_compactors: 4
compaction_throughput_mb_per_sec: 64
sstable_preemptive_open_interval_in_mb: 50
read_request_timeout_in_ms: 500000
range_request_timeout_in_ms: 1000000
write_request_timeout_in_ms: 200000
counter_write_request_timeout_in_ms: 500000
cas_contention_timeout_in_ms: 100000
endpoint_snitch: Ec2Snitch
dynamic_snitch_update_interval_in_ms: 100
dynamic_snitch_reset_interval_in_ms: 600000
dynamic_snitch_badness_threshold: 0.1
request_scheduler: org.apache.cassandra.scheduler.NoScheduler
server_encryption_options:
internode_encryption: none
keystore: conf/.keystore
keystore_password: cassandra
truststore: conf/.truststore
truststore_password: cassandra
client_encryption_options:
enabled: false
keystore: conf/.keystore
keystore_password: cassandra
internode_compression: all
inter_dc_tcp_nodelay: false
正如评论中所说,您的问题似乎来自 day_count 上的索引。
如本 page 所示,如果您必须一直更新索引,索引将不会有效,而当您将不同的值插入 day_count 时(可能每次都).
您需要重新处理您的数据库,但由于这是您的生产环境,如果需要此索引,您不能只 DROP INDEX IF EXISTS keyspace.index_name
,但您可以使用 day_count
创建一个辅助数据库作为主键,或使用 day_count
作为排序索引。