Oozie 日期时间开始
Oozie date time start
我的 flume.config 上有一个自定义来源 运行,负责每小时从 Facebook 页面提取数据。
我想知道是否有任何方法可以将提取时间设置为我的协调器的开始时间?
例如,我将我的协调器设置为在 2015 年 1 月 1 日凌晨 12 点开始,然后我的 flume 同时开始提取。
这是我的自定义来源:
public class FacebookPageFansCitySource extends AbstractPollableSource
{
private String accessToken;
private String pageId;
private int refreshInterval;
private FacebookClient facebookClient;
private volatile long lastPoll = 0;
@Override
protected void doConfigure(Context context) throws FlumeException
{
this.accessToken = context.getString(FacebookSourceConstants.ACCESS_TOKEN_KEY);
this.pageId = context.getString(FacebookSourceConstants.PAGE_ID_KEY);
this.refreshInterval = context.getInteger(FacebookSourceConstants.REFRESH_INTERVAL_KEY);
facebookClient = new DefaultFacebookClient(accessToken, Version.VERSION_2_2);
}
@Override
protected void doStart() throws FlumeException
{
}
@Override
protected void doStop() throws FlumeException
{
}
@Override
protected Status doProcess() throws EventDeliveryException
{
Status status = Status.BACKOFF;
if (TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - lastPoll) > refreshInterval)
{
lastPoll = System.currentTimeMillis();
try
{
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss");
simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
Date date = new Date(lastPoll);
String dateFormatted = simpleDateFormat.format(date);
final Map<String, String> headers = new HashMap<String, String>();
headers.put("timestamp", String.valueOf(date.getTime()));
Insight insight = getInsight(pageId, "page_fans_city");
if (insight != null)
{
final List<Event> events = new ArrayList<Event>();
ChannelProcessor channelProcessor = getChannelProcessor();
List<JsonObject> values = insight.getValues();
for (JsonObject value : values)
{
String referenceDate = simpleDateFormat.format(DateUtils.toDateFromLongFormat(value.getString("end_time")));
JsonObject jsonObjectValue = value.getJsonObject("value");
for (Iterator<?> keys = jsonObjectValue.keys(); keys.hasNext(); )
{
String key = (String) keys.next();
Long count = jsonObjectValue.getLong(key);
JsonObject jsonObject = new JsonObject();
jsonObject.put("reference_date", referenceDate);
jsonObject.put("city", key);
jsonObject.put("count", count);
jsonObject.put("poll_time", dateFormatted);
Event event = EventBuilder.withBody(jsonObject.toString().getBytes(), headers);
events.add(event);
}
}
channelProcessor.processEventBatch(events);
}
status = Status.READY;
}
catch (Exception e)
{
Logger.getLogger(FacebookPageFansCitySource.class.getName()).log(Level.SEVERE, null, e);
}
}
return status;
}
private Insight getInsight(String objectId, String metric)
{
TimeZone timeZone = TimeZone.getTimeZone("UTC");
Calendar calendar = Calendar.getInstance(timeZone);
calendar.add(Calendar.DAY_OF_MONTH, -4);
Parameter parameterSince = Parameter.with("since", calendar.getTime());
calendar.add(Calendar.DAY_OF_MONTH, 1);
Parameter parameterUntil = Parameter.with("until", calendar.getTime());
Connection<Insight> responseListInsight = facebookClient.fetchConnection(objectId + "/insights/" + metric, Insight.class, parameterSince, parameterUntil);
if (responseListInsight != null && !responseListInsight.getData().isEmpty())
return responseListInsight.getData().get(0);
else
return null;
}
}
感谢您的帮助。
如何创建一个 java action,并设置一个使用协调器当前时间的工作流 属性。
<property>
<name>myStart</name>
<value>${coord:current(0)}</value>
</property>
然后在您的操作中使用此 属性 作为参数。
我的 flume.config 上有一个自定义来源 运行,负责每小时从 Facebook 页面提取数据。
我想知道是否有任何方法可以将提取时间设置为我的协调器的开始时间?
例如,我将我的协调器设置为在 2015 年 1 月 1 日凌晨 12 点开始,然后我的 flume 同时开始提取。
这是我的自定义来源:
public class FacebookPageFansCitySource extends AbstractPollableSource
{
private String accessToken;
private String pageId;
private int refreshInterval;
private FacebookClient facebookClient;
private volatile long lastPoll = 0;
@Override
protected void doConfigure(Context context) throws FlumeException
{
this.accessToken = context.getString(FacebookSourceConstants.ACCESS_TOKEN_KEY);
this.pageId = context.getString(FacebookSourceConstants.PAGE_ID_KEY);
this.refreshInterval = context.getInteger(FacebookSourceConstants.REFRESH_INTERVAL_KEY);
facebookClient = new DefaultFacebookClient(accessToken, Version.VERSION_2_2);
}
@Override
protected void doStart() throws FlumeException
{
}
@Override
protected void doStop() throws FlumeException
{
}
@Override
protected Status doProcess() throws EventDeliveryException
{
Status status = Status.BACKOFF;
if (TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - lastPoll) > refreshInterval)
{
lastPoll = System.currentTimeMillis();
try
{
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss");
simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
Date date = new Date(lastPoll);
String dateFormatted = simpleDateFormat.format(date);
final Map<String, String> headers = new HashMap<String, String>();
headers.put("timestamp", String.valueOf(date.getTime()));
Insight insight = getInsight(pageId, "page_fans_city");
if (insight != null)
{
final List<Event> events = new ArrayList<Event>();
ChannelProcessor channelProcessor = getChannelProcessor();
List<JsonObject> values = insight.getValues();
for (JsonObject value : values)
{
String referenceDate = simpleDateFormat.format(DateUtils.toDateFromLongFormat(value.getString("end_time")));
JsonObject jsonObjectValue = value.getJsonObject("value");
for (Iterator<?> keys = jsonObjectValue.keys(); keys.hasNext(); )
{
String key = (String) keys.next();
Long count = jsonObjectValue.getLong(key);
JsonObject jsonObject = new JsonObject();
jsonObject.put("reference_date", referenceDate);
jsonObject.put("city", key);
jsonObject.put("count", count);
jsonObject.put("poll_time", dateFormatted);
Event event = EventBuilder.withBody(jsonObject.toString().getBytes(), headers);
events.add(event);
}
}
channelProcessor.processEventBatch(events);
}
status = Status.READY;
}
catch (Exception e)
{
Logger.getLogger(FacebookPageFansCitySource.class.getName()).log(Level.SEVERE, null, e);
}
}
return status;
}
private Insight getInsight(String objectId, String metric)
{
TimeZone timeZone = TimeZone.getTimeZone("UTC");
Calendar calendar = Calendar.getInstance(timeZone);
calendar.add(Calendar.DAY_OF_MONTH, -4);
Parameter parameterSince = Parameter.with("since", calendar.getTime());
calendar.add(Calendar.DAY_OF_MONTH, 1);
Parameter parameterUntil = Parameter.with("until", calendar.getTime());
Connection<Insight> responseListInsight = facebookClient.fetchConnection(objectId + "/insights/" + metric, Insight.class, parameterSince, parameterUntil);
if (responseListInsight != null && !responseListInsight.getData().isEmpty())
return responseListInsight.getData().get(0);
else
return null;
}
}
感谢您的帮助。
如何创建一个 java action,并设置一个使用协调器当前时间的工作流 属性。
<property>
<name>myStart</name>
<value>${coord:current(0)}</value>
</property>
然后在您的操作中使用此 属性 作为参数。