Scala 特征类型不匹配
Scala Trait type mismatch
我在我的项目中使用了一些 Kafka 通道层次结构:
我的基本特征是:
trait SendChannel[A, B] extends CommunicationChannel {
def send(data:A): B
}
现在我有一个像
这样的通用kafka发送频道
trait CommonKafkaSendChannel[A, B, Return] extends SendChannel[A, Return] {
val channelProps: KafkaSendChannelProperties
val kafkaProducer: Producer[String, B]
override def close(): Unit = kafkaProducer.close()
}
现在CommanKafkaSendChannel有2种变体,一种是带回调的,一种是带Future的:
trait KafkaSendChannelWithFuture[A, B] extends CommonKafkaSendChannel[A, B, Future[RecordMetadata]] {
override def send(data: A): Future[RecordMetadata] = Future {
kafkaProducer.send(new ProducerRecord[String, B](channelProps.topic)).get
}
}
KafkaSendChannelWithCallback
定义:
object KafkaSendChannelWithCallback {
def apply[A, B](oChannelProps: KafkaSendChannelProperties,
oKafkaProducer: Producer[String, B],
oCallback: Callback): KafkaSendChannelWithCallback[A, B] =
new KafkaSendChannelWithCallback[A,B] {
override val channelProps: KafkaSendChannelProperties = oChannelProps
override val kafkaProducer: Producer[String, B] = oKafkaProducer
override val callback: Callback = oCallback
}
}
trait KafkaSendChannelWithCallback[A, B] extends CommonKafkaSendChannel[A, B, Unit] {
val callback: Callback
override def send(data: A): Unit =
kafkaProducer.send(new ProducerRecord[String, B](channelProps.topic), callback)
}
现在根据配置值我 select 在 运行 时间正确的频道类型如下。我正在创建具有正确类型通道的 actor,它将数据发送到 kafka:
val sendChannel = kafkaChannel.channel(config, actorSystem).fold(
error => {
logger.error("Exception while instantiating the KafkaSendChannel")
throw error
},
success => success
)
actor = actorSystem.actorOf(IngestionActor.props(config, sendChannel), name = ACTOR_NAME)
演员定义:
object IngestionRouterActor {
def props[V](config: Config, sendChannel: SendChannel[V, Unit]): Props =
Props(classOf[IngestionActor[V]], config, sendChannel)
}
问题是当我使用 KafkaSendChannelWithCallback
时,我的代码编译正确,但是当我使用 KafkaSendChannelWithFuture
时,它在 actor =
声明中给出了以下错误:
[error]IngestionActor.scala:32: pattern type is incompatible with expected type;
[error] found : KafkaSendChannelWithFuture[String,V]
[error] required: SendChannel[V,Unit]
由于两个通道定义都是从 SendChannel
扩展的,所以这段代码应该没有任何错误地编译。我不确定为什么它没有编译。谢谢
Props
for IngestionActor
需要一个 SendChannel[V, Unit]
。将 KafkaSendChannelWithCallback
传递给此参数是有效的,因为它是 SendChannel[V, Unit]
.
另一方面,KafkaSendChannelWithFuture
是 SendChannel[V, Future[RecordMetadata]]
。 SendChannel[V, Future[RecordMetadata]]
是 而不是 SendChannel[V, Unit]
.
一个选项是重新定义 Props
以采用 SendChannel[V, Any]
,因为 Any
是 Unit
和 Future
的超类型:
def props[V](config: Config, sendChannel: SendChannel[V, Any]): Props = ???
在这一点上,编译器仍然不高兴,因为 SendChannel
作为泛型类型,默认情况下是不变的。换句话说,SendChannel[V, Unit]
和 SendChannel[V, Future[RecordMetadata]]
都不是 SendChannel[V, Any]
.
类型
要更改它,请使 SendChannel
在第二个类型参数上协变(通过在 B
前面添加 +
):
trait SendChannel[A, +B] extends CommunicationChannel {
def send(data: A): B
}
我在我的项目中使用了一些 Kafka 通道层次结构:
我的基本特征是:
trait SendChannel[A, B] extends CommunicationChannel {
def send(data:A): B
}
现在我有一个像
这样的通用kafka发送频道trait CommonKafkaSendChannel[A, B, Return] extends SendChannel[A, Return] {
val channelProps: KafkaSendChannelProperties
val kafkaProducer: Producer[String, B]
override def close(): Unit = kafkaProducer.close()
}
现在CommanKafkaSendChannel有2种变体,一种是带回调的,一种是带Future的:
trait KafkaSendChannelWithFuture[A, B] extends CommonKafkaSendChannel[A, B, Future[RecordMetadata]] {
override def send(data: A): Future[RecordMetadata] = Future {
kafkaProducer.send(new ProducerRecord[String, B](channelProps.topic)).get
}
}
KafkaSendChannelWithCallback
定义:
object KafkaSendChannelWithCallback {
def apply[A, B](oChannelProps: KafkaSendChannelProperties,
oKafkaProducer: Producer[String, B],
oCallback: Callback): KafkaSendChannelWithCallback[A, B] =
new KafkaSendChannelWithCallback[A,B] {
override val channelProps: KafkaSendChannelProperties = oChannelProps
override val kafkaProducer: Producer[String, B] = oKafkaProducer
override val callback: Callback = oCallback
}
}
trait KafkaSendChannelWithCallback[A, B] extends CommonKafkaSendChannel[A, B, Unit] {
val callback: Callback
override def send(data: A): Unit =
kafkaProducer.send(new ProducerRecord[String, B](channelProps.topic), callback)
}
现在根据配置值我 select 在 运行 时间正确的频道类型如下。我正在创建具有正确类型通道的 actor,它将数据发送到 kafka:
val sendChannel = kafkaChannel.channel(config, actorSystem).fold(
error => {
logger.error("Exception while instantiating the KafkaSendChannel")
throw error
},
success => success
)
actor = actorSystem.actorOf(IngestionActor.props(config, sendChannel), name = ACTOR_NAME)
演员定义:
object IngestionRouterActor {
def props[V](config: Config, sendChannel: SendChannel[V, Unit]): Props =
Props(classOf[IngestionActor[V]], config, sendChannel)
}
问题是当我使用 KafkaSendChannelWithCallback
时,我的代码编译正确,但是当我使用 KafkaSendChannelWithFuture
时,它在 actor =
声明中给出了以下错误:
[error]IngestionActor.scala:32: pattern type is incompatible with expected type; [error] found : KafkaSendChannelWithFuture[String,V] [error] required: SendChannel[V,Unit]
由于两个通道定义都是从 SendChannel
扩展的,所以这段代码应该没有任何错误地编译。我不确定为什么它没有编译。谢谢
Props
for IngestionActor
需要一个 SendChannel[V, Unit]
。将 KafkaSendChannelWithCallback
传递给此参数是有效的,因为它是 SendChannel[V, Unit]
.
另一方面,KafkaSendChannelWithFuture
是 SendChannel[V, Future[RecordMetadata]]
。 SendChannel[V, Future[RecordMetadata]]
是 而不是 SendChannel[V, Unit]
.
一个选项是重新定义 Props
以采用 SendChannel[V, Any]
,因为 Any
是 Unit
和 Future
的超类型:
def props[V](config: Config, sendChannel: SendChannel[V, Any]): Props = ???
在这一点上,编译器仍然不高兴,因为 SendChannel
作为泛型类型,默认情况下是不变的。换句话说,SendChannel[V, Unit]
和 SendChannel[V, Future[RecordMetadata]]
都不是 SendChannel[V, Any]
.
要更改它,请使 SendChannel
在第二个类型参数上协变(通过在 B
前面添加 +
):
trait SendChannel[A, +B] extends CommunicationChannel {
def send(data: A): B
}