使用 Kerberos 连接到 Mapper 内的 Accumulo

Connecting to Accumulo inside a Mapper using Kerberos

我正在将一些软件从较旧的 Hadoop 集群(使用 username/password 身份验证)移动到较新的 2.6.0-cdh5。 12.0 已启用 Kerberos 身份验证

我已经能够使用 DelegationToken 中设置的 DelegationToken 获得许多使用 Accumulo 作为其输入 and/or 输出的现有 Map/Reduce 作业AccumuloInput/OutputFormat 类.

但是,我有 1 个工作,它使用 AccumuloInput/OutputFormat 作为输入和输出,而且在其 Mapper.setup() 方法中,它通过 Zookeeper 连接到 Accumulo,因此在 Mapper.map() 方法,它可以将 Mapper.map() 中正在处理的每个 key/value 与 另一个 Accumulo table.[= 中的条目进行比较11=]

我在下面包含了相关代码,它显示了连接到 Zookeeper 用户 PasswordToken 的 setup() 方法,然后创建了一个 Accumulo table 扫描器,然后在映射器方法中使用它。

所以问题是如何用 KerberosToken 替换 PasswordToken 的使用,以便在 Mapper.setup() 方法中设置 Accumulo 扫描器?我找不到 "get" 我设置的 AccumuloInput/OutputFormat 类 使用的 DelegationToken 的方法。

我已尝试 context.getCredentials().getAllTokens() 并寻找类型为 org.apache.accumulo.code.client.security.tokens.AuthenticationToken 的标记——此处返回的所有标记均为类型 org.apache.hadoop.security.token.Token.

请注意,我在 cut/paste 中输入了代码片段,因为代码在未连接到 Internet 的网络上运行 - 即可能存在拼写错误。 :)

//****************************
// code in the M/R driver
//****************************
ClientConfiguration accumuloCfg = ClientConfiguration.loadDefault().withInstance("Accumulo1").withZkHosts("zookeeper1");
ZooKeeperInstance inst = new ZooKeeperInstance(accumuloCfg);
AuthenticationToken dt = conn.securityOperations().getDelegationToken(new DelagationTokenConfig());
AccumuloInputFormat.setConnectorInfo(job, username, dt);
AccumuloOutputFormat.setConnectorInfo(job, username, dt);
// other job setup and then
job.waitForCompletion(true)



//****************************
// this is inside the Mapper class of the M/R job
//****************************
private Scanner index_scanner;

public void setup(Context context) {
    Configuration cfg = context.getConfiguration();

    // properties set and passed from M/R Driver program
    String username = cfg.get("UserName");
    String password = cfg.get("Password");
    String accumuloInstName = cfg.get("InstanceName");
    String zookeepers = cfg.get("Zookeepers");
    String tableName = cfg.get("TableName");
    Instance inst = new ZooKeeperInstance(accumuloInstName, zookeepers);
    try {
      AuthenticationToken passwordToken = new PasswordToken(password);

      Connector conn = inst.getConnector(username, passwordToken);

      index_scanner = conn.createScanner(tableName, conn.securityOperations().getUserAuthorizations(username));
    } catch(Exception e) {
       e.printStackTrace();
    }
}

public void map(Key key, Value value, Context context) throws IOException, InterruptedException {
    String uuid = key.getRow().toString();
    index_scanner.clearColumns();
    index_scanner.setRange(Range.exact(uuid));
    for(Entry<Key, Value> entry : index_scanner) {
        // do some processing in here
    }
}

提供AccumuloInputFormat and AccumuloOutputFormat have a method to set the token in the job configuration with the Accumulo*putFormat.setConnectorInfo(job, principle, token). You can also serialize the token in a file in HDFS, using the AuthenticationTokenSerializer并使用接受文件名的setConnectorInfo方法的版本。

如果传入 KerberosToken,作业将创建要使用的 DelegationToken,如果传入 DelegationToken,它只会使用它。

所提供的 AccumuloInputFormat 应该处理它自己的扫描器,因此通常情况下,如果您正确设置了配置,则不必在 Mapper 中执行此操作。但是,如果您在 Mapper 内部进行二次扫描(例如连接),您可以检查所提供的 AccumuloInputFormat 的 RecordReader 源代码,以获取有关如何检索配置和构建扫描器的示例。