org.apache.hadoop.mapreduce.lib.input.InvalidInputException

org.apache.hadoop.mapreduce.lib.input.InvalidInputException

我按照此教程中 PowerShell 的步骤和命令,在 PowerShell 中运行此代码。我只是将名称从 WordCount 更改为 Matrix。所有步骤都工作正常,但在运行 Azure PowerShell 脚本后收到此错误:

Exception in thread "main" org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist

代码

import java.io.IOException;
import java.util.*;
 
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 
/**
 * Single-pass MapReduce job that multiplies two matrices: A (m-by-n) and B (n-by-p).
 *
 * <p>Every input line is a comma-separated record {@code <matrix>,<row>,<col>,<value>}. A record
 * whose first field is {@code A} is treated as an entry of A; any other record is treated as an
 * entry of B. The dimensions m, n and p are read from the job configuration under the keys
 * {@code "m"}, {@code "n"} and {@code "p"}, which {@link #main(String[])} sets.
 *
 * <p>Each output line has the form {@code i,k,value}, one per non-zero cell of the product.
 */
public class OneStepMatrixMultiplication {

    /**
     * Replicates every matrix entry to each output cell that needs it.
     *
     * <p>An entry A(i,j) is emitted under the keys {@code "i,k"} for every k in [0, p); an entry
     * B(j,k) is emitted under the keys {@code "i,k"} for every i in [0, m). The emitted value is
     * {@code "A,j,a_ij"} or {@code "B,j,b_jk"} respectively.
     */
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {

        /** Number of comma-separated fields every record must provide. */
        private static final int FIELD_COUNT = 4;

        /**
         * Maps one text record to its replicated key/value pairs.
         *
         * @param key     input key supplied by the input format (not used)
         * @param value   one line of the form {@code <matrix>,<row>,<col>,<value>}
         * @param context task context used to read "m"/"p" and to emit output
         * @throws IOException          if the record has fewer than four fields, or on write failure
         * @throws InterruptedException if the write is interrupted
         */
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            // Blank lines carry no matrix entry; skip them instead of failing on a missing field.
            if (line.trim().isEmpty()) {
                return;
            }

            String[] indicesAndValue = line.split(",");
            if (indicesAndValue.length < FIELD_COUNT) {
                // Fail with the offending line rather than an opaque ArrayIndexOutOfBoundsException.
                throw new IOException(
                        "Malformed matrix record, expected <matrix>,<row>,<col>,<value>: " + line);
            }

            Configuration conf = context.getConfiguration();
            Text outputKey = new Text();
            Text outputValue = new Text();
            if (indicesAndValue[0].equals("A")) {
                int p = Integer.parseInt(conf.get("p"));
                // The value is identical for every k, so build it once outside the loop.
                outputValue.set("A," + indicesAndValue[2] + "," + indicesAndValue[3]);
                for (int k = 0; k < p; k++) {
                    outputKey.set(indicesAndValue[1] + "," + k);
                    context.write(outputKey, outputValue);
                }
            } else {
                int m = Integer.parseInt(conf.get("m"));
                // The value is identical for every i, so build it once outside the loop.
                outputValue.set("B," + indicesAndValue[1] + "," + indicesAndValue[3]);
                for (int i = 0; i < m; i++) {
                    outputKey.set(i + "," + indicesAndValue[2]);
                    context.write(outputKey, outputValue);
                }
            }
        }
    }

    /**
     * Computes one cell of the product from the A and B entries grouped under its {@code "i,k"} key.
     */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        /**
         * Sums a_ij * b_jk over j in [0, n) and writes {@code "i,k,result"} when the sum is non-zero.
         *
         * @param key     output cell coordinates as {@code "i,k"}
         * @param values  entries of the form {@code "A,j,a_ij"} or {@code "B,j,b_jk"}
         * @param context task context used to read "n" and to emit output
         * @throws IOException          on write failure
         * @throws InterruptedException if the write is interrupted
         */
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // HashMap is the declared type on purpose: inside this file the simple name "Map"
            // resolves to the nested mapper class, not java.util.Map.
            HashMap<Integer, Float> hashA = new HashMap<Integer, Float>();
            HashMap<Integer, Float> hashB = new HashMap<Integer, Float>();
            for (Text val : values) {
                String[] value = val.toString().split(",");
                HashMap<Integer, Float> target = value[0].equals("A") ? hashA : hashB;
                target.put(Integer.parseInt(value[1]), Float.parseFloat(value[2]));
            }

            int n = Integer.parseInt(context.getConfiguration().get("n"));
            float result = 0.0f;
            for (int j = 0; j < n; j++) {
                // Entries absent from the input count as zero.
                float a_ij = hashA.containsKey(j) ? hashA.get(j) : 0.0f;
                float b_jk = hashB.containsKey(j) ? hashB.get(j) : 0.0f;
                result += a_ij * b_jk;
            }
            // Only non-zero cells are written; the key is passed as null and the whole
            // "i,k,result" triple is carried in the value.
            if (result != 0.0f) {
                context.write(null, new Text(key.toString() + "," + Float.toString(result)));
            }
        }
    }

    /**
     * Configures and runs the job, exiting with 0 on success and 1 on job failure.
     *
     * @param args {@code args[0]} is the input path and {@code args[1]} is the output path
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {
        // Without this check a missing argument surfaces as a bare ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: OneStepMatrixMultiplication <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        // A is an m-by-n matrix; B is an n-by-p matrix.
        conf.set("m", "2");
        conf.set("n", "5");
        conf.set("p", "3");

        // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
        Job job = Job.getInstance(conf, "MatrixMatrixMultiplicationOneStep");
        job.setJarByClass(OneStepMatrixMultiplication.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

脚本文件代码

# Provisions a temporary HDInsight cluster, runs one MapReduce job on it, prints the job
# output, and then deletes the cluster and its default Storage account.
# Replace every <...> placeholder below before running the script.

# The Storage account and the HDInsight cluster variables
$subscriptionName = "<AzureSubscriptionName>"
$stringPrefix = "<StringForPrefix>"
$location = "<MicrosoftDataCenter>"     ### Must match the data Storage account location
$clusterNodes = <NumberOFNodesInTheCluster>

$storageAccountName_Data = "<TheDataStorageAccountName>"
$containerName_Data = "<TheDataBlobStorageContainerName>"

$clusterName = $stringPrefix + "hdicluster"

$storageAccountName_Default = $stringPrefix + "hdistore"
$containerName_Default =  $stringPrefix + "hdicluster"

# The MapReduce job variables
# $jarFile and $mrInput are passed to the job definition below.
# NOTE(review): the reported "Input path does not exist" error suggests checking that $mrInput
# points at blobs that are already present in the data container - confirm before running.
$jarFile = "wasb://$containerName_Data@$storageAccountName_Data.blob.core.windows.net/WordCount/jars/WordCount.jar"
$className = "org.apache.hadoop.examples.WordCount"
$mrInput = "wasb://$containerName_Data@$storageAccountName_Data.blob.core.windows.net/WordCount/Input/"
$mrOutput = "wasb://$containerName_Data@$storageAccountName_Data.blob.core.windows.net/WordCount/Output/"
# Not referenced later in this script; the job definition uses -StatusFolder /WordCountStatus.
$mrStatusOutput = "wasb://$containerName_Data@$storageAccountName_Data.blob.core.windows.net/WordCount/MRStatusOutput/"

# Create a PSCredential object. The user name and password are hardcoded here. You can change them if you want.
# NOTE(review): a literal password in a script is visible to anyone who can read the file.
$password = ConvertTo-SecureString "Pass@word1" -AsPlainText -Force
$creds = New-Object System.Management.Automation.PSCredential ("Admin", $password)

Select-AzureSubscription $subscriptionName

#=============================
# Create a Storage account used as the default file system
Write-Host "Create a storage account" -ForegroundColor Green
New-AzureStorageAccount -StorageAccountName $storageAccountName_Default -location $location

#=============================
# Create a Blob storage container used as the default file system
Write-Host "Create a Blob storage container" -ForegroundColor Green
$storageAccountKey_Default = Get-AzureStorageKey $storageAccountName_Default | ForEach-Object { $_.Primary }
# Parameter names use the ASCII hyphen; a typographic en dash here is corrupted when the
# script is saved or read in a non-Unicode encoding.
$destContext = New-AzureStorageContext -StorageAccountName $storageAccountName_Default -StorageAccountKey $storageAccountKey_Default

New-AzureStorageContainer -Name $containerName_Default -Context $destContext

#=============================
# Create an HDInsight cluster
Write-Host "Create an HDInsight cluster" -ForegroundColor Green
$storageAccountKey_Data = Get-AzureStorageKey $storageAccountName_Data | ForEach-Object { $_.Primary }

# The new account becomes the default storage; the data account is attached as additional storage.
$config = New-AzureHDInsightClusterConfig -ClusterSizeInNodes $clusterNodes |
    Set-AzureHDInsightDefaultStorage -StorageAccountName "$storageAccountName_Default.blob.core.windows.net" -StorageAccountKey $storageAccountKey_Default -StorageContainerName $containerName_Default |
    Add-AzureHDInsightStorage -StorageAccountName "$storageAccountName_Data.blob.core.windows.net" -StorageAccountKey $storageAccountKey_Data

New-AzureHDInsightCluster -Name $clusterName -Location $location -Credential $creds -Config $config

#=============================
# Create a MapReduce job definition
Write-Host "Create a MapReduce job definition" -ForegroundColor Green
# $mrInput and $mrOutput are handed to the job class as its two arguments.
$mrJobDef = New-AzureHDInsightMapReduceJobDefinition -JobName mrWordCountJob  -JarFile $jarFile -ClassName $className -Arguments $mrInput, $mrOutput -StatusFolder /WordCountStatus

#=============================
# Run the MapReduce job
Write-Host "Run the MapReduce job" -ForegroundColor Green
$mrJob = Start-AzureHDInsightJob -Cluster $clusterName -JobDefinition $mrJobDef
Wait-AzureHDInsightJob -Job $mrJob -WaitTimeoutInSeconds 3600

# Print the job's standard error first, then its standard output.
Get-AzureHDInsightJobOutput -Cluster $clusterName -JobId $mrJob.JobId -StandardError
Get-AzureHDInsightJobOutput -Cluster $clusterName -JobId $mrJob.JobId -StandardOutput

#=============================
# Delete the HDInsight cluster
Write-Host "Delete the HDInsight cluster" -ForegroundColor Green
Remove-AzureHDInsightCluster -Name $clusterName  

# Delete the default file system Storage account
Write-Host "Delete the storage account" -ForegroundColor Green
Remove-AzureStorageAccount -StorageAccountName $storageAccountName_Default

根据我的理解,您想在 Azure HDInsight 中计算矩阵乘法。您的代码可以在 HDInsight 模拟器中成功运行,但在 Azure 上的 HDInsight 中失败。

在 Azure HDInsight 上,HDFS 的文件路径直接以 blob 容器作为根路径;如果远程登录到集群,可以使用不带主机信息的相对路径,例如 wasb:///examples/data/....

因此,您可以尝试远程登录 HDInsight 集群来运行代码(Linux 集群通过 ssh,Windows 集群通过 cmd),然后按照以下步骤操作。

  1. 将 mapreduce jar 文件和数据文件复制到 HDInsight 群集中。例如,对于 Linux 上的 Hadoop,可以使用命令 scp <your-file> <ssh-username>@<hdcluster-name>-ssh.azurehdinsight.net:/home/<hdcluster-username>/.
  2. 在 HDInsight 文件系统中创建目录,命令 hadoop fs -mkdir wasb:///<dir-name>/
  3. 像 HDInsight 上的默认示例一样,使用命令 hadoop fs -cp <your jar file> wasb:///<dir-name>/jars/ 将您的 mapreduce jar 文件复制到 HDInsight 文件系统中。

或者您也可以参考https://azure.microsoft.com/en-us/documentation/articles/hdinsight-upload-data/将文件上传到HDInsight来代替上面的三个步骤。

  1. 像 HDInsight 上的默认示例一样,使用命令 hadoop fs -cp <your data file> wasb:///<dir-name>/data/input/ 复制您的数据文件。
  2. 使用命令 hadoop jar wasb:///<dir-name>/jars/<your jar file name>.jar <your class name> wasb:///<dir-name>/data/input/<your data file> wasb:///<dir-name>/data/output 运行你的代码
  3. 等待作业完成,然后命令hadoop fs -cat wasb:///<dir-name>/data/output/*显示结果。

如果是在Linux上创建的HDInsight Cluster,可以参考https://azure.microsoft.com/en-us/documentation/articles/hdinsight-hadoop-use-mapreduce-ssh/在Azure新门户上找到ssh登录信息,如下图

如果是在Windows上创建的HDInsight集群,可以参考https://azure.microsoft.com/en-us/documentation/articles/hdinsight-hadoop-use-mapreduce-remote-desktop/,找到远程桌面信息,如上图,Remote Desktop而不是Secure Shell.

如果您想查看代码的结果,您也可以在 Azure 新门户上找到它,请参见下图。