Adding custom codec to CassandraConnector
Is there a way to register custom codecs when instantiating a CassandraConnector?
I'm currently registering my codecs every time I call cassandraConnector.withSessionDo:
val cassandraConnector = CassandraConnector(ssc.sparkContext.getConf)
...
...
.mapPartitions(partition => {
  cassandraConnector.withSessionDo(session => {
    // register custom codecs once for each partition so it isn't loaded as often for each data point
    if (partition.nonEmpty) {
      session.getCluster.getConfiguration.getCodecRegistry
        .register(new TimestampLongCodec)
        .register(new SummaryStatsBlobCodec)
        .register(new JavaHistogramBlobCodec)
    }
    ...
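This feels like an anti-pattern. It is also clogging our logs: we have a Spark Streaming service that runs every 30 seconds, and it keeps filling our logs with the following: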
16/11/01 14:14:44 WARN CodecRegistry: Ignoring codec SummaryStatsBlobCodec [blob <-> SummaryStats] because it collides with previously registered codec SummaryStatsBlobCodec [blob <-> SummaryStats]
16/11/01 14:14:44 WARN CodecRegistry: Ignoring codec JavaHistogramBlobCodec [blob <-> Histogram] because it collides with previously registered codec JavaHistogramBlobCodec [blob <-> Histogram]
16/11/01 14:14:44 WARN CodecRegistry: Ignoring codec TimestampLongCodec [timestamp <-> java.lang.Long] because it collides with previously registered codec TimestampLongCodec [timestamp <-> java.lang.Long]
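The warnings can be reproduced with a small toy model. ToyCodec and ToyCodecRegistry below are hypothetical stand-ins, not driver classes; they only mimic the behavior visible in the log: the first codec registered for a given CQL type / Java type pair wins, and every later registration of the same pair is ignored with a warning. Because the connector reuses the same cluster (and therefore the same registry) across partitions and batches, every mapPartitions call after the first one produces exactly these "Ignoring codec" lines.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a codec: a name plus the CQL type / Java type pair it maps.
final case class ToyCodec(name: String, cqlType: String, javaType: String)

// Hypothetical stand-in for the driver's CodecRegistry. It only mimics the
// behavior seen in the log: a codec whose (cqlType, javaType) pair is already
// registered is ignored and a warning is recorded instead.
final class ToyCodecRegistry {
  private val codecs = mutable.LinkedHashMap.empty[(String, String), ToyCodec]
  val warnings = mutable.ArrayBuffer.empty[String]

  def register(codec: ToyCodec): ToyCodecRegistry = {
    val key = (codec.cqlType, codec.javaType)
    codecs.get(key) match {
      case Some(existing) =>
        warnings += s"Ignoring codec ${codec.name} [${codec.cqlType} <-> ${codec.javaType}] " +
          s"because it collides with previously registered codec ${existing.name} [${existing.cqlType} <-> ${existing.javaType}]"
      case None =>
        codecs(key) = codec
    }
    this // returning the registry allows chained .register(...) calls
  }

  def size: Int = codecs.size
}

object DuplicateRegistrationDemo {
  val codecs = Seq(
    ToyCodec("TimestampLongCodec", "timestamp", "java.lang.Long"),
    ToyCodec("SummaryStatsBlobCodec", "blob", "SummaryStats"),
    ToyCodec("JavaHistogramBlobCodec", "blob", "Histogram")
  )

  // One registry shared by every "partition", as a reused cluster would be;
  // each partition re-registers all three codecs, like the code in the question.
  def run(partitions: Int): ToyCodecRegistry = {
    val registry = new ToyCodecRegistry
    for (_ <- 1 to partitions) codecs.foreach(registry.register)
    registry
  }

  def main(args: Array[String]): Unit = {
    val registry = run(partitions = 3)
    println(s"registered=${registry.size} warnings=${registry.warnings.size}")
    registry.warnings.foreach(println)
  }
}
```

With three partitions, only the first pass registers anything; the remaining two passes add nothing and produce two warnings per codec.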
Edit:
I tried registering them once, up front, like this:
val cassandraConnector = CassandraConnector(ssc.sparkContext.getConf)
cassandraConnector.withClusterDo(cluster => {
  cluster.getConfiguration.getCodecRegistry
    .register(new TimestampLongCodec)
    .register(new SummaryStatsBlobCodec)
    .register(new JavaHistogramBlobCodec)
})
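This ^ works locally, but when deployed to our Mesos cluster it can't find the codecs. I assume that's because it only registers them locally on the driver and never adds them to the executors' copies.
A better approach is to override the Cassandra connection factory, like this: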
import com.datastax.driver.core.Cluster
import com.datastax.spark.connector.cql.{CassandraConnectionFactory, CassandraConnectorConf, DefaultConnectionFactory}
object MyConnectionFactory extends CassandraConnectionFactory {
  override def createCluster(conf: CassandraConnectorConf): Cluster = {
    // build the cluster exactly as the default factory would
    val cluster = DefaultConnectionFactory.createCluster(conf)
    // register the custom codecs once, right after the cluster is created
    cluster.getConfiguration.getCodecRegistry
      .register(new TimestampLongCodec)
      .register(new SummaryStatsBlobCodec)
      .register(new JavaHistogramBlobCodec)
    cluster
  }
}
Then set the spark.cassandra.connection.factory parameter to point to that class (the fully qualified name of MyConnectionFactory).
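Why the factory approach also reaches the executors can be illustrated with a simplified toy model. Everything below (ToyConnectorConf, ToyCluster, ToyConnectionFactory, ToyDefaultFactory, ToyCodecFactory, ToyConnector, and the host address) is a hypothetical stand-in, not the spark-cassandra-connector API. It assumes, as a simplification, that the connector on each JVM creates a cluster through the configured factory the first time it needs one and then reuses it. Since registration happens inside createCluster, every cluster gets the codecs exactly once, whether it is built on the driver or on an executor, and no duplicate registrations occur.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins; NOT the real connector classes.
final case class ToyConnectorConf(hosts: Set[String])

final class ToyCluster(val conf: ToyConnectorConf) {
  val codecNames = mutable.LinkedHashSet.empty[String] // stands in for the codec registry
}

trait ToyConnectionFactory {
  def createCluster(conf: ToyConnectorConf): ToyCluster
}

// Plays the role of DefaultConnectionFactory: builds a bare cluster.
object ToyDefaultFactory extends ToyConnectionFactory {
  def createCluster(conf: ToyConnectorConf): ToyCluster = new ToyCluster(conf)
}

// Plays the role of MyConnectionFactory: delegates, then registers codecs on the new cluster.
object ToyCodecFactory extends ToyConnectionFactory {
  def createCluster(conf: ToyConnectorConf): ToyCluster = {
    val cluster = ToyDefaultFactory.createCluster(conf)
    cluster.codecNames ++= Seq("TimestampLongCodec", "SummaryStatsBlobCodec", "JavaHistogramBlobCodec")
    cluster
  }
}

// Plays the role of the connector inside one JVM: caches one cluster per conf,
// creating it through whichever factory it was configured with.
final class ToyConnector(factory: ToyConnectionFactory) {
  private val cache = mutable.Map.empty[ToyConnectorConf, ToyCluster]
  var created = 0

  def withClusterDo[T](conf: ToyConnectorConf)(f: ToyCluster => T): T = {
    val cluster = cache.getOrElseUpdate(conf, { created += 1; factory.createCluster(conf) })
    f(cluster)
  }
}

object FactoryDemo {
  // Two independent connectors stand in for two executor JVMs; each handles three partitions.
  def run(): (Seq[Int], Seq[ToyConnector]) = {
    val conf = ToyConnectorConf(Set("10.0.0.1"))
    val executors = Seq.fill(2)(new ToyConnector(ToyCodecFactory))
    val codecCounts = for {
      executor <- executors
      _ <- 1 to 3
    } yield executor.withClusterDo(conf)(_.codecNames.size)
    (codecCounts, executors)
  }

  def main(args: Array[String]): Unit = {
    val (codecCounts, executors) = run()
    println(s"codecs visible per partition: $codecCounts")
    println(s"clusters created per executor: ${executors.map(_.created)}")
  }
}
```

In this model each "executor" creates its cluster once and every partition sees all three codecs, which is the property the real factory override is meant to provide.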