Failed to export from VoltDB to Kafka
VoltDB is Enterprise Edition 5.1.2.
Kafka is 2.9.2-0.8.1.1; I also tried 2.10-0.8.2.1.
VoltDB is on 192.168.56.101.
Kafka is on 192.168.56.102.
Here is my VoltDB deployment configuration:
<deployment>
    <cluster hostcount="1" sitesperhost="4" kfactor="0" />
    <commandlog enabled="true">
        <frequency time="400" transactions="1000" />
    </commandlog>
    <export>
        <configuration enabled="true" type="kafka" stream="archive">
            <property name="metadata.broker.list">192.168.56.102:9092</property>
            <property name="producer.type">sync</property>
            <property name="batch.mode">true</property>
        </configuration>
    </export>
</deployment>
The schema is defined as:
drop table person if exists;
create table person (
    ic varchar(9) not null,
    first_name varchar(20) not null,
    last_name varchar(20) not null,
    middle_name varchar(20),
    gender tinyint not null,
    dob timestamp not null,
    date_created timestamp default now
);
partition table person on column ic;
export table person to stream archive;
As for Kafka's server.properties, I only added this one line:
auto.create.topics.enable=true
I started Kafka first, like this:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
When I start VoltDB, I get this exception:
david@u14voltdb:~$ voltdb create catalog.jar --deployment=config.xml
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=64m; support was removed in 8.0
Initializing VoltDB...
_ __ ____ ____ ____
| | / /___ / / /_/ __ \/ __ )
| | / / __ \/ / __/ / / / __ |
| |/ / /_/ / / /_/ /_/ / /_/ /
|___/\____/_/\__/_____/_____/
--------------------------------
Build: 5.1.2 voltdb-5.1.2-0-g6d05c33-local Enterprise Edition
Connecting to VoltDB cluster as the leader...
Host id of this node is: 0
Starting VoltDB with trial license. License expires on May 31, 2015.
Initializing the database and command logs. This may take a moment...
WARN: Failed to send producer request with correlation id 2 to broker 0 with data for partitions [voltdbexportPERSON,0]
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:97)
at kafka.producer.SyncProducer.liftedTree1(SyncProducer.scala:72)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
at kafka.producer.SyncProducer$$anonfun$send$$anonfun$apply$mcV$sp.apply$mcV$sp(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$$anonfun$apply$mcV$sp.apply(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$$anonfun$apply$mcV$sp.apply(SyncProducer.scala:102)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer$$anonfun$send.apply$mcV$sp(SyncProducer.scala:101)
at kafka.producer.SyncProducer$$anonfun$send.apply(SyncProducer.scala:101)
at kafka.producer.SyncProducer$$anonfun$send.apply(SyncProducer.scala:101)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData.apply(DefaultEventHandler.scala:106)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData.apply(DefaultEventHandler.scala:100)
at scala.collection.mutable.HashMap$$anonfun$foreach.apply(HashMap.scala:80)
at scala.collection.mutable.HashMap$$anonfun$foreach.apply(HashMap.scala:80)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at scala.collection.mutable.HashTable$$anon.foreach(HashTable.scala:161)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at kafka.producer.Producer.send(Producer.scala:76)
at kafka.javaapi.producer.Producer.send(Producer.scala:42)
at org.voltdb.exportclient.KafkaExportClient$KafkaExportDecoder.onBlockCompletion(KafkaExportClient.java:217)
at org.voltdb.export.processors.GuestProcessor.run(GuestProcessor.java:223)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at org.voltcore.utils.CoreUtils.run(CoreUtils.java:735)
at java.lang.Thread.run(Thread.java:745)
On the Kafka side, I keep getting this:
[2015-05-14 00:40:08,197] INFO Closing socket connection to /192.168.56.101. (kafka.network.Processor)
Any suggestions?
This turned out to be purely a matter of Kafka configuration. In server.properties there is a commented-out setting:
advertised.host.name=something
All I needed to do was replace "something" with the IP address of the server Kafka is running on, and export started working. This was found in
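For reference, here is roughly what that change looks like on the Kafka host, as a sketch (it assumes the broker otherwise runs the stock 0.8.x config/server.properties, and that 192.168.56.102 is the broker's address as above):

# config/server.properties on the Kafka host (192.168.56.102)
# Uncomment advertised.host.name and point it at an address that the
# VoltDB box (192.168.56.101) can actually reach. If it is left unset,
# the broker advertises its local hostname in the topic metadata, the
# VoltDB export client cannot connect back to it, and you get the
# ClosedChannelException / "Closing socket connection" pair shown above.
advertised.host.name=192.168.56.102

After restarting the broker and the VoltDB export, you can confirm rows are arriving by running the console consumer on the Kafka host. The topic name voltdbexportPERSON is taken from the warning in the question, and this assumes ZooKeeper is still on its default port 2181:

bin/kafka-console-consumer.sh --zookeeper 192.168.56.102:2181 \
    --topic voltdbexportPERSON --from-beginning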