让 Spring-XD 和 hdfs sink 为 maprfs 工作
Getting Spring-XD and the hdfs sink to work for maprfs
这是关于 spring-xd release 1.0.1 与 maprfs 一起工作的问题,官方暂不支持。我仍然想让它工作。
这就是我们所做的:
1) 调整 xd-shell 和 xd-worker 和 xd-singlenode shell 脚本以接受参数 --hadoopDistro mapr
2) 将库添加到新目录 $XD_HOME/lib/mapr
avro-1.7.4.jar jersey-core-1.9.jar
hadoop-annotations-2.2.0.jar jersey-server-1.9.jar
hadoop-core-1.0.3-mapr-3.0.2.jar jetty-util-6.1.26.jar
hadoop-distcp-2.2.0.jar maprfs-1.0.3-mapr-3.0.2.jar
hadoop-hdfs-2.2.0.jar protobuf-java-2.5.0.jar
hadoop-mapreduce-client-core-2.2.0.jar spring-data-hadoop-2.0.2.RELEASE-hadoop24.jar
hadoop-streaming-2.2.0.jar spring-data-hadoop-batch-2.0.2.RELEASE-hadoop24.jar
hadoop-yarn-api-2.2.0.jar spring-data-hadoop-core-2.0.2.RELEASE-hadoop24.jar
hadoop-yarn-common-2.2.0.jar spring-data-hadoop-store-2.0.2.RELEASE-hadoop24.jar
3) 运行 bin/xd-singlenode --hadoopDistro mapr
和 shell/bin/xd-shell --hadoopDistro mapr
。
通过 stream create foo --definition "time | hdfs" --deploy
创建和部署流时,数据将写入 maprfs 上的文件 tmp/xd/foo/foo-1.txt.tmp。然而,当取消部署流时,出现以下异常:
org.springframework.data.hadoop.store.StoreException: Failed renaming from /xd/foo/foo-1.txt.tmp to /xd/foo/foo-1.txt; nested exception is java.io.FileNotFoundException: Requested file /xd/foo/foo-1.txt does not exist.
at org.springframework.data.hadoop.store.support.OutputStoreObjectSupport.renameFile(OutputStoreObjectSupport.java:261)
at org.springframework.data.hadoop.store.output.TextFileWriter.close(TextFileWriter.java:92)
at org.springframework.xd.integration.hadoop.outbound.HdfsDataStoreMessageHandler.doStop(HdfsDataStoreMessageHandler.java:58)
at org.springframework.xd.integration.hadoop.outbound.HdfsStoreMessageHandler.stop(HdfsStoreMessageHandler.java:94)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:201)
at com.sun.proxy.$Proxy120.stop(Unknown Source)
at org.springframework.integration.endpoint.EventDrivenConsumer.doStop(EventDrivenConsumer.java:64)
at org.springframework.integration.endpoint.AbstractEndpoint.stop(AbstractEndpoint.java:100)
at org.springframework.integration.endpoint.AbstractEndpoint.stop(AbstractEndpoint.java:115)
at org.springframework.integration.config.ConsumerEndpointFactoryBean.stop(ConsumerEndpointFactoryBean.java:303)
at org.springframework.context.support.DefaultLifecycleProcessor.doStop(DefaultLifecycleProcessor.java:229)
at org.springframework.context.support.DefaultLifecycleProcessor.access0(DefaultLifecycleProcessor.java:51)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.stop(DefaultLifecycleProcessor.java:363)
at org.springframework.context.support.DefaultLifecycleProcessor.stopBeans(DefaultLifecycleProcessor.java:202)
at org.springframework.context.support.DefaultLifecycleProcessor.stop(DefaultLifecycleProcessor.java:106)
at org.springframework.context.support.AbstractApplicationContext.stop(AbstractApplicationContext.java:1186)
at org.springframework.xd.module.core.SimpleModule.stop(SimpleModule.java:234)
at org.springframework.xd.dirt.module.ModuleDeployer.destroyModule(ModuleDeployer.java:132)
at org.springframework.xd.dirt.module.ModuleDeployer.handleUndeploy(ModuleDeployer.java:111)
at org.springframework.xd.dirt.module.ModuleDeployer.undeploy(ModuleDeployer.java:83)
at org.springframework.xd.dirt.server.ContainerRegistrar.undeployModule(ContainerRegistrar.java:261)
at org.springframework.xd.dirt.server.ContainerRegistrar$StreamModuleWatcher.process(ContainerRegistrar.java:884)
at org.apache.curator.framework.imps.NamespaceWatcher.process(NamespaceWatcher.java:67)
at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:522)
at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:498)
Caused by: java.io.FileNotFoundException: Requested file /xd/foo/foo-1.txt does not exist.
at com.mapr.fs.MapRFileSystem.getMapRFileStatus(MapRFileSystem.java:805)
at com.mapr.fs.MapRFileSystem.delete(MapRFileSystem.java:629)
at org.springframework.data.hadoop.store.support.OutputStoreObjectSupport.renameFile(OutputStoreObjectSupport.java:258)
... 29 more
我查看了 OutputStoreObjectSupport.renameFile()
函数。当 hdfs 上的文件完成时,此方法会尝试将文件 /xd/foo/foo-1.txt.tmp 重命名为 xd/foo/foo1.txt。这是相关代码:
try {
FileSystem fs = path.getFileSystem(getConfiguration());
boolean succeed;
try {
fs.delete(toPath, false);
log.info("Renaming path=[" + path + "] toPath=[" + toPath + "]");
succeed = fs.rename(path, toPath);
} catch (Exception e) {
throw new StoreException("Failed renaming from " + path + " to " + toPath, e);
}
if (!succeed) {
throw new StoreException("Failed renaming from " + path + " to " + toPath + " because hdfs returned false");
}
}
当目标文件在hdfs上不存在时,调用fs.delete(toPath, false)
时maprfs似乎抛出异常。然而,在这种情况下抛出异常是没有意义的。我假设其他文件系统实现的行为不同,但这是我仍然需要验证的一点。不幸的是,我找不到 MapRFileSystem.java 的来源。这是闭源吗?这将帮助我更好地理解这个问题。有人有从 spring-xd 写入 maprfs 的经验吗?或者使用 spring-data-hadoop 在 maprfs 上重命名文件?
编辑
我设法在 spring XD 之外用一个简单的测试用例重现了这个问题(见下文)。请注意,仅当设置了 inWritingSuffix 或 inWritingPrefix 时才会抛出此异常。否则 spring-hadoop 将不会尝试重命名该文件。所以这对我来说仍然是一种不令人满意的解决方法:避免使用 inWritingPrefixes 和 inWritingSuffixes。
@ContextConfiguration("context.xml")
@RunWith(SpringJUnit4ClassRunner.class)
public class MaprfsSinkTest {
@Autowired
Configuration configuration;
@Autowired
FileSystem filesystem;
@Autowired
DataStoreWriter<String >storeWriter;
@Test
public void testRenameOnMaprfs() throws IOException, InterruptedException {
Path testPath = new Path("/tmp/foo.txt");
filesystem.delete(testPath, true);
TextFileWriter writer = new TextFileWriter(configuration, testPath, null);
writer.setInWritingSuffix("tmp");
writer.write("some entity");
writer.close();
}
@Test
public void testStoreWriter() throws IOException {
this.storeWriter.write("something");
}
}
我为 spring-hadoop 创建了一个支持 maprfs 的新分支:
https://github.com/blinse/spring-hadoop/tree/origin/2.0.2.RELEASE-mapr
构建此版本并使用生成的 jar 可以很好地与 hdfs 接收器配合使用。
这是关于 spring-xd release 1.0.1 与 maprfs 一起工作的问题,官方暂不支持。我仍然想让它工作。
这就是我们所做的:
1) 调整 xd-shell 和 xd-worker 和 xd-singlenode shell 脚本以接受参数 --hadoopDistro mapr
2) 将库添加到新目录 $XD_HOME/lib/mapr
avro-1.7.4.jar jersey-core-1.9.jar
hadoop-annotations-2.2.0.jar jersey-server-1.9.jar
hadoop-core-1.0.3-mapr-3.0.2.jar jetty-util-6.1.26.jar
hadoop-distcp-2.2.0.jar maprfs-1.0.3-mapr-3.0.2.jar
hadoop-hdfs-2.2.0.jar protobuf-java-2.5.0.jar
hadoop-mapreduce-client-core-2.2.0.jar spring-data-hadoop-2.0.2.RELEASE-hadoop24.jar
hadoop-streaming-2.2.0.jar spring-data-hadoop-batch-2.0.2.RELEASE-hadoop24.jar
hadoop-yarn-api-2.2.0.jar spring-data-hadoop-core-2.0.2.RELEASE-hadoop24.jar
hadoop-yarn-common-2.2.0.jar spring-data-hadoop-store-2.0.2.RELEASE-hadoop24.jar
3) 运行 bin/xd-singlenode --hadoopDistro mapr
和 shell/bin/xd-shell --hadoopDistro mapr
。
通过 stream create foo --definition "time | hdfs" --deploy
创建和部署流时,数据将写入 maprfs 上的文件 tmp/xd/foo/foo-1.txt.tmp。然而,当取消部署流时,出现以下异常:
org.springframework.data.hadoop.store.StoreException: Failed renaming from /xd/foo/foo-1.txt.tmp to /xd/foo/foo-1.txt; nested exception is java.io.FileNotFoundException: Requested file /xd/foo/foo-1.txt does not exist.
at org.springframework.data.hadoop.store.support.OutputStoreObjectSupport.renameFile(OutputStoreObjectSupport.java:261)
at org.springframework.data.hadoop.store.output.TextFileWriter.close(TextFileWriter.java:92)
at org.springframework.xd.integration.hadoop.outbound.HdfsDataStoreMessageHandler.doStop(HdfsDataStoreMessageHandler.java:58)
at org.springframework.xd.integration.hadoop.outbound.HdfsStoreMessageHandler.stop(HdfsStoreMessageHandler.java:94)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:201)
at com.sun.proxy.$Proxy120.stop(Unknown Source)
at org.springframework.integration.endpoint.EventDrivenConsumer.doStop(EventDrivenConsumer.java:64)
at org.springframework.integration.endpoint.AbstractEndpoint.stop(AbstractEndpoint.java:100)
at org.springframework.integration.endpoint.AbstractEndpoint.stop(AbstractEndpoint.java:115)
at org.springframework.integration.config.ConsumerEndpointFactoryBean.stop(ConsumerEndpointFactoryBean.java:303)
at org.springframework.context.support.DefaultLifecycleProcessor.doStop(DefaultLifecycleProcessor.java:229)
at org.springframework.context.support.DefaultLifecycleProcessor.access0(DefaultLifecycleProcessor.java:51)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.stop(DefaultLifecycleProcessor.java:363)
at org.springframework.context.support.DefaultLifecycleProcessor.stopBeans(DefaultLifecycleProcessor.java:202)
at org.springframework.context.support.DefaultLifecycleProcessor.stop(DefaultLifecycleProcessor.java:106)
at org.springframework.context.support.AbstractApplicationContext.stop(AbstractApplicationContext.java:1186)
at org.springframework.xd.module.core.SimpleModule.stop(SimpleModule.java:234)
at org.springframework.xd.dirt.module.ModuleDeployer.destroyModule(ModuleDeployer.java:132)
at org.springframework.xd.dirt.module.ModuleDeployer.handleUndeploy(ModuleDeployer.java:111)
at org.springframework.xd.dirt.module.ModuleDeployer.undeploy(ModuleDeployer.java:83)
at org.springframework.xd.dirt.server.ContainerRegistrar.undeployModule(ContainerRegistrar.java:261)
at org.springframework.xd.dirt.server.ContainerRegistrar$StreamModuleWatcher.process(ContainerRegistrar.java:884)
at org.apache.curator.framework.imps.NamespaceWatcher.process(NamespaceWatcher.java:67)
at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:522)
at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:498)
Caused by: java.io.FileNotFoundException: Requested file /xd/foo/foo-1.txt does not exist.
at com.mapr.fs.MapRFileSystem.getMapRFileStatus(MapRFileSystem.java:805)
at com.mapr.fs.MapRFileSystem.delete(MapRFileSystem.java:629)
at org.springframework.data.hadoop.store.support.OutputStoreObjectSupport.renameFile(OutputStoreObjectSupport.java:258)
... 29 more
我查看了 OutputStoreObjectSupport.renameFile()
函数。当 hdfs 上的文件完成时,此方法会尝试将文件 /xd/foo/foo-1.txt.tmp 重命名为 xd/foo/foo1.txt。这是相关代码:
try {
FileSystem fs = path.getFileSystem(getConfiguration());
boolean succeed;
try {
fs.delete(toPath, false);
log.info("Renaming path=[" + path + "] toPath=[" + toPath + "]");
succeed = fs.rename(path, toPath);
} catch (Exception e) {
throw new StoreException("Failed renaming from " + path + " to " + toPath, e);
}
if (!succeed) {
throw new StoreException("Failed renaming from " + path + " to " + toPath + " because hdfs returned false");
}
}
当目标文件在hdfs上不存在时,调用fs.delete(toPath, false)
时maprfs似乎抛出异常。然而,在这种情况下抛出异常是没有意义的。我假设其他文件系统实现的行为不同,但这是我仍然需要验证的一点。不幸的是,我找不到 MapRFileSystem.java 的来源。这是闭源吗?这将帮助我更好地理解这个问题。有人有从 spring-xd 写入 maprfs 的经验吗?或者使用 spring-data-hadoop 在 maprfs 上重命名文件?
编辑
我设法在 spring XD 之外用一个简单的测试用例重现了这个问题(见下文)。请注意,仅当设置了 inWritingSuffix 或 inWritingPrefix 时才会抛出此异常。否则 spring-hadoop 将不会尝试重命名该文件。所以这对我来说仍然是一种不令人满意的解决方法:避免使用 inWritingPrefixes 和 inWritingSuffixes。
@ContextConfiguration("context.xml")
@RunWith(SpringJUnit4ClassRunner.class)
public class MaprfsSinkTest {
@Autowired
Configuration configuration;
@Autowired
FileSystem filesystem;
@Autowired
DataStoreWriter<String >storeWriter;
@Test
public void testRenameOnMaprfs() throws IOException, InterruptedException {
Path testPath = new Path("/tmp/foo.txt");
filesystem.delete(testPath, true);
TextFileWriter writer = new TextFileWriter(configuration, testPath, null);
writer.setInWritingSuffix("tmp");
writer.write("some entity");
writer.close();
}
@Test
public void testStoreWriter() throws IOException {
this.storeWriter.write("something");
}
}
我为 spring-hadoop 创建了一个支持 maprfs 的新分支:
https://github.com/blinse/spring-hadoop/tree/origin/2.0.2.RELEASE-mapr
构建此版本并使用生成的 jar 可以很好地与 hdfs 接收器配合使用。