使用 Kerberos 连接到 Mapper 内的 Accumulo
Connecting to Accumulo inside a Mapper using Kerberos
我正在将一些软件从较旧的 Hadoop 集群(使用 username/password 身份验证)移动到较新的 2.6.0-cdh5。 12.0 已启用 Kerberos 身份验证。
我已经能够使用 DelegationToken 中设置的 DelegationToken 获得许多使用 Accumulo 作为其输入 and/or 输出的现有 Map/Reduce 作业AccumuloInput/OutputFormat 类.
但是,我有 1 个工作,它使用 AccumuloInput/OutputFormat 作为输入和输出,而且在其 Mapper.setup() 方法中,它通过 Zookeeper 连接到 Accumulo,因此在 Mapper.map() 方法,它可以将 Mapper.map() 中正在处理的每个 key/value 与 另一个 Accumulo table.[= 中的条目进行比较11=]
我在下面包含了相关代码,它显示了连接到 Zookeeper 用户 PasswordToken 的 setup() 方法,然后创建了一个 Accumulo table 扫描器,然后在映射器方法中使用它。
所以问题是如何用 KerberosToken 替换 PasswordToken 的使用,以便在 Mapper.setup() 方法中设置 Accumulo 扫描器?我找不到 "get" 我设置的 AccumuloInput/OutputFormat 类 使用的 DelegationToken 的方法。
我已尝试 context.getCredentials().getAllTokens() 并寻找类型为 org.apache.accumulo.code.client.security.tokens.AuthenticationToken 的标记——此处返回的所有标记均为类型 org.apache.hadoop.security.token.Token.
请注意,我在 cut/paste 中输入了代码片段,因为代码在未连接到 Internet 的网络上运行 - 即可能存在拼写错误。 :)
//****************************
// code in the M/R driver
//****************************
ClientConfiguration accumuloCfg = ClientConfiguration.loadDefault().withInstance("Accumulo1").withZkHosts("zookeeper1");
ZooKeeperInstance inst = new ZooKeeperInstance(accumuloCfg);
AuthenticationToken dt = conn.securityOperations().getDelegationToken(new DelagationTokenConfig());
AccumuloInputFormat.setConnectorInfo(job, username, dt);
AccumuloOutputFormat.setConnectorInfo(job, username, dt);
// other job setup and then
job.waitForCompletion(true)
//****************************
// this is inside the Mapper class of the M/R job
//****************************
private Scanner index_scanner;
public void setup(Context context) {
Configuration cfg = context.getConfiguration();
// properties set and passed from M/R Driver program
String username = cfg.get("UserName");
String password = cfg.get("Password");
String accumuloInstName = cfg.get("InstanceName");
String zookeepers = cfg.get("Zookeepers");
String tableName = cfg.get("TableName");
Instance inst = new ZooKeeperInstance(accumuloInstName, zookeepers);
try {
AuthenticationToken passwordToken = new PasswordToken(password);
Connector conn = inst.getConnector(username, passwordToken);
index_scanner = conn.createScanner(tableName, conn.securityOperations().getUserAuthorizations(username));
} catch(Exception e) {
e.printStackTrace();
}
}
public void map(Key key, Value value, Context context) throws IOException, InterruptedException {
String uuid = key.getRow().toString();
index_scanner.clearColumns();
index_scanner.setRange(Range.exact(uuid));
for(Entry<Key, Value> entry : index_scanner) {
// do some processing in here
}
}
提供AccumuloInputFormat and AccumuloOutputFormat have a method to set the token in the job configuration with the Accumulo*putFormat.setConnectorInfo(job, principle, token)
. You can also serialize the token in a file in HDFS, using the AuthenticationTokenSerializer并使用接受文件名的setConnectorInfo
方法的版本。
如果传入 KerberosToken,作业将创建要使用的 DelegationToken,如果传入 DelegationToken,它只会使用它。
所提供的 AccumuloInputFormat
应该处理它自己的扫描器,因此通常情况下,如果您正确设置了配置,则不必在 Mapper 中执行此操作。但是,如果您在 Mapper 内部进行二次扫描(例如连接),您可以检查所提供的 AccumuloInputFormat
的 RecordReader 源代码,以获取有关如何检索配置和构建扫描器的示例。
我正在将一些软件从较旧的 Hadoop 集群(使用 username/password 身份验证)移动到较新的 2.6.0-cdh5。 12.0 已启用 Kerberos 身份验证。
我已经能够使用 DelegationToken 中设置的 DelegationToken 获得许多使用 Accumulo 作为其输入 and/or 输出的现有 Map/Reduce 作业AccumuloInput/OutputFormat 类.
但是,我有 1 个工作,它使用 AccumuloInput/OutputFormat 作为输入和输出,而且在其 Mapper.setup() 方法中,它通过 Zookeeper 连接到 Accumulo,因此在 Mapper.map() 方法,它可以将 Mapper.map() 中正在处理的每个 key/value 与 另一个 Accumulo table.[= 中的条目进行比较11=]
我在下面包含了相关代码,它显示了连接到 Zookeeper 用户 PasswordToken 的 setup() 方法,然后创建了一个 Accumulo table 扫描器,然后在映射器方法中使用它。
所以问题是如何用 KerberosToken 替换 PasswordToken 的使用,以便在 Mapper.setup() 方法中设置 Accumulo 扫描器?我找不到 "get" 我设置的 AccumuloInput/OutputFormat 类 使用的 DelegationToken 的方法。
我已尝试 context.getCredentials().getAllTokens() 并寻找类型为 org.apache.accumulo.code.client.security.tokens.AuthenticationToken 的标记——此处返回的所有标记均为类型 org.apache.hadoop.security.token.Token.
请注意,我在 cut/paste 中输入了代码片段,因为代码在未连接到 Internet 的网络上运行 - 即可能存在拼写错误。 :)
//****************************
// code in the M/R driver
//****************************
ClientConfiguration accumuloCfg = ClientConfiguration.loadDefault().withInstance("Accumulo1").withZkHosts("zookeeper1");
ZooKeeperInstance inst = new ZooKeeperInstance(accumuloCfg);
AuthenticationToken dt = conn.securityOperations().getDelegationToken(new DelagationTokenConfig());
AccumuloInputFormat.setConnectorInfo(job, username, dt);
AccumuloOutputFormat.setConnectorInfo(job, username, dt);
// other job setup and then
job.waitForCompletion(true)
//****************************
// this is inside the Mapper class of the M/R job
//****************************
private Scanner index_scanner;
public void setup(Context context) {
Configuration cfg = context.getConfiguration();
// properties set and passed from M/R Driver program
String username = cfg.get("UserName");
String password = cfg.get("Password");
String accumuloInstName = cfg.get("InstanceName");
String zookeepers = cfg.get("Zookeepers");
String tableName = cfg.get("TableName");
Instance inst = new ZooKeeperInstance(accumuloInstName, zookeepers);
try {
AuthenticationToken passwordToken = new PasswordToken(password);
Connector conn = inst.getConnector(username, passwordToken);
index_scanner = conn.createScanner(tableName, conn.securityOperations().getUserAuthorizations(username));
} catch(Exception e) {
e.printStackTrace();
}
}
public void map(Key key, Value value, Context context) throws IOException, InterruptedException {
String uuid = key.getRow().toString();
index_scanner.clearColumns();
index_scanner.setRange(Range.exact(uuid));
for(Entry<Key, Value> entry : index_scanner) {
// do some processing in here
}
}
提供AccumuloInputFormat and AccumuloOutputFormat have a method to set the token in the job configuration with the Accumulo*putFormat.setConnectorInfo(job, principle, token)
. You can also serialize the token in a file in HDFS, using the AuthenticationTokenSerializer并使用接受文件名的setConnectorInfo
方法的版本。
如果传入 KerberosToken,作业将创建要使用的 DelegationToken,如果传入 DelegationToken,它只会使用它。
所提供的 AccumuloInputFormat
应该处理它自己的扫描器,因此通常情况下,如果您正确设置了配置,则不必在 Mapper 中执行此操作。但是,如果您在 Mapper 内部进行二次扫描(例如连接),您可以检查所提供的 AccumuloInputFormat
的 RecordReader 源代码,以获取有关如何检索配置和构建扫描器的示例。