Creating file in HDFS but not appending any content
I am using an HTTP source to put JSON files into HDFS (single-node sandbox).
The file is created in the correct directory, but nothing is appended to it. Before I start debugging the HTTP source, could you validate my flume.conf?
#################################################################
# Name the components on this agent
#################################################################
hdfs-agent.sources = httpsource
hdfs-agent.sinks = hdfssink
hdfs-agent.channels = channel1
#################################################################
# Describe source
#################################################################
# Source node
hdfs-agent.sources.httpsource.type = http
hdfs-agent.sources.httpsource.port = 5140
hdfs-agent.sources.httpsource.handler = org.apache.flume.source.http.JSONHandler
#################################################################
# Describe Sink
#################################################################
# Sink hdfs
hdfs-agent.sinks.hdfssink.type = hdfs
hdfs-agent.sinks.hdfssink.hdfs.path = hdfs://sandbox:8020/user/flume/node
hdfs-agent.sinks.hdfssink.hdfs.fileType = DataStream
hdfs-agent.sinks.hdfssink.hdfs.batchSize = 1
hdfs-agent.sinks.hdfssink.hdfs.rollSize = 0
hdfs-agent.sinks.hdfssink.hdfs.rollCount = 0
#################################################################
# Describe channel
#################################################################
# Channel memory
hdfs-agent.channels.channel1.type = memory
hdfs-agent.channels.channel1.capacity = 1000
hdfs-agent.channels.channel1.transactionCapacity = 100
#################################################################
# Bind the source and sink to the channel
#################################################################
hdfs-agent.sources.httpsource.channels = channel1
hdfs-agent.sinks.hdfssink.channel = channel1
For now I am just starting small for testing:
[{"text": "Hi Flume this Node"}]
So I am thinking my batchSize/rollSize/rollCount might be the problem here?
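For reference, I am posting it with a request along these lines (a sketch of my test call; the port matches the config above):
curl -X POST -H 'Content-Type: application/json' -d '[{"text": "Hi Flume this Node"}]' http://localhost:5140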
Your batchSize, rollSize and rollCount values are fine. Setting rollSize and rollCount to 0 disables file rolling.
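If you do want the sink to roll files, a time-based roll could be used instead; a minimal sketch on top of the config above, using the standard hdfs.rollInterval property:
# Roll to a new file every 30 seconds rather than never rolling
hdfs-agent.sinks.hdfssink.hdfs.rollInterval = 30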
hdfs-agent.sources.httpsource.type should be set to org.apache.flume.source.http.HTTPSource.
The data sent to the HTTP source should be in the format:
[{"headers" : {"a":"b", "c":"d"}, "body": "random_body"}, {"headers" : {"e": "f"}, "body": "random_body2"}]
I tested sending the data you used ([{"text": "Hi Flume this Node"}]). Since there is no "body" attribute, there was nothing for the sink to write out (the HDFS sink only writes the event body), so nothing was appended to my file. But when I posted the following, the data was appended to my file:
curl -X POST -H 'Content-Type: application/json; charset=UTF-8' -d '[{ "headers" : { "timestamp" : "434324343", "host" : "random_host.example.com", "field1" : "val1" }, "body" : "random_body" }]' http://localhost:5140
Hope this helps.
As arathim pointed out, org.apache.flume.source.http.JSONHandler expects the Flume event format. If you want to feed in your own JSON, you need to create your own handler. Here is an example of a handler that accepts any JSON:
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.servlet.http.HttpServletRequest;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.http.HTTPBadRequestException;
import org.apache.flume.source.http.HTTPSourceHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;

public class GenericJSONInputHandler implements HTTPSourceHandler {

    private static final Logger LOG = LoggerFactory.getLogger(GenericJSONInputHandler.class);

    protected static final String TIMESTAMP = "timestamp";
    protected static final String TYPE = "type";

    public GenericJSONInputHandler() {
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public List<Event> getEvents(HttpServletRequest request) throws Exception {
        BufferedReader reader = request.getReader();
        String charset = request.getCharacterEncoding();
        // UTF-8 is the default for JSON; if no charset is specified, UTF-8 is assumed.
        if (charset == null) {
            LOG.debug("Charset is null, default charset of UTF-8 will be used.");
        }
        List<Event> eventList = new ArrayList<Event>(0);
        try {
            // Each line of the request body becomes one event.
            String json = reader.readLine();
            while (json != null) {
                LOG.debug("Received line with size " + json.length());
                List<Event> e = createEvents(json);
                if (e != null) {
                    eventList.addAll(e);
                }
                json = reader.readLine();
            }
        } catch (Exception ex) {
            throw new HTTPBadRequestException("Request has invalid JSON Syntax.", ex);
        }
        return eventList;
    }

    protected List<Event> createEvents(String json) {
        try {
            if (isValidJSON(json)) {
                // Store the raw JSON line as the event body and stamp basic headers.
                Map<String, String> headers = new HashMap<>();
                headers.put(TIMESTAMP, String.valueOf(System.currentTimeMillis()));
                headers.put(TYPE, "default");
                return Arrays.asList(
                        EventBuilder.withBody(json.getBytes(StandardCharsets.UTF_8), headers));
            }
        } catch (Exception e) {
            LOG.error("Could not create event from JSON line", e);
        }
        return null;
    }

    public boolean isValidJSON(final String json) {
        boolean valid = false;
        try {
            // Walk the parser over the whole string; any syntax error throws.
            final JsonParser parser = new ObjectMapper().getFactory().createParser(json);
            while (parser.nextToken() != null) {
                // consume all tokens
            }
            valid = true;
        } catch (JsonParseException jpe) {
            LOG.warn("Received invalid JSON: " + json, jpe);
        } catch (IOException ioe) {
            LOG.warn("Could not read JSON line", ioe);
        }
        return valid;
    }

    @Override
    public void configure(Context context) {
    }
}
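To try a handler like this, compile it, put the jar on the Flume agent's classpath, and point the source at it. A sketch, assuming the class lives in a package com.example.flume (the package name is hypothetical):
# Hypothetical package; use whatever package GenericJSONInputHandler is actually in
hdfs-agent.sources.httpsource.handler = com.example.flume.GenericJSONInputHandler
With that in place, the original test payload should be accepted, since the handler stores any valid JSON line as the event body:
curl -X POST -H 'Content-Type: application/json; charset=UTF-8' -d '[{"text": "Hi Flume this Node"}]' http://localhost:5140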