Oozie 日期时间开始

Oozie date time start

我的 flume.config 中配置了一个正在运行的自定义 source,负责每小时从 Facebook 页面提取数据。

我想知道是否有任何方法可以将提取时间设置为我的协调器的开始时间?

例如,我将我的协调器设置为在 2015 年 1 月 1 日凌晨 12 点开始,然后我的 flume 同时开始提取。

这是我的自定义来源:

/**
 * Flume pollable source that fetches the {@code page_fans_city} insight of a
 * Facebook page and emits one JSON event per city found in the response.
 *
 * <p>Configuration is read in {@link #doConfigure(Context)}: the access token,
 * the page id and the refresh interval. The refresh interval is compared
 * against elapsed time expressed in seconds.
 */
public class FacebookPageFansCitySource extends AbstractPollableSource
{
    private static final Logger LOGGER = Logger.getLogger(FacebookPageFansCitySource.class.getName());

    /**
     * Charset used to encode event bodies. Fixed explicitly so the emitted
     * bytes do not depend on the platform default charset.
     */
    private static final java.nio.charset.Charset BODY_CHARSET = java.nio.charset.Charset.forName("UTF-8");

    private String accessToken;
    private String pageId;

    /** Minimum number of seconds that must elapse between two polls. */
    private int refreshInterval;

    private FacebookClient facebookClient;

    /** Wall-clock time (epoch milliseconds) at which the last poll was started; 0 before the first poll. */
    private volatile long lastPoll = 0;

    /**
     * Reads the source settings from the Flume context and creates the Facebook client.
     *
     * @param context the Flume configuration context for this source
     * @throws FlumeException if the refresh interval is not configured
     */
    @Override
    protected void doConfigure(Context context) throws FlumeException
    {
        this.accessToken = context.getString(FacebookSourceConstants.ACCESS_TOKEN_KEY);
        this.pageId = context.getString(FacebookSourceConstants.PAGE_ID_KEY);

        // Read into a boxed Integer first: unboxing a missing value directly
        // into the int field would fail with an uninformative NullPointerException.
        final Integer configuredInterval = context.getInteger(FacebookSourceConstants.REFRESH_INTERVAL_KEY);

        if (configuredInterval == null)
        {
            throw new FlumeException("Missing required configuration property: " + FacebookSourceConstants.REFRESH_INTERVAL_KEY);
        }

        this.refreshInterval = configuredInterval;

        facebookClient = new DefaultFacebookClient(accessToken, Version.VERSION_2_2);
    }

    /** No start-up work is required. */
    @Override
    protected void doStart() throws FlumeException
    {
    }

    /** No shutdown work is required. */
    @Override
    protected void doStop() throws FlumeException
    {
    }

    /**
     * Polls Facebook when more than {@code refreshInterval} seconds have
     * elapsed since the previous poll and forwards the resulting events to the
     * channel processor as a single batch.
     *
     * @return {@code Status.READY} when a poll ran without throwing (including
     *         when no insight was returned); {@code Status.BACKOFF} when the
     *         interval has not elapsed yet or the poll failed
     * @throws EventDeliveryException declared by the overridden method
     */
    @Override
    protected Status doProcess() throws EventDeliveryException
    {
        if (TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - lastPoll) <= refreshInterval)
        {
            return Status.BACKOFF;
        }

        // Recorded before the attempt, so a failed poll is retried only after
        // a full refresh interval has passed again.
        lastPoll = System.currentTimeMillis();

        try
        {
            final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss");

            simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));

            final Date pollDate = new Date(lastPoll);
            final String pollTime = simpleDateFormat.format(pollDate);

            // Every event of this poll carries the poll time (epoch millis) as its "timestamp" header.
            final Map<String, String> headers = new HashMap<String, String>();

            headers.put("timestamp", String.valueOf(pollDate.getTime()));

            final Insight insight = getInsight(pageId, "page_fans_city");

            if (insight != null)
            {
                final List<Event> events = buildEvents(insight, simpleDateFormat, pollTime, headers);

                getChannelProcessor().processEventBatch(events);
            }

            return Status.READY;
        }
        catch (Exception e)
        {
            // Best effort: a failed poll is logged and reported as BACKOFF rather than propagated.
            LOGGER.log(Level.SEVERE, "Failed to poll page_fans_city insight for page " + pageId, e);

            return Status.BACKOFF;
        }
    }

    /**
     * Converts an insight into Flume events, one per key of each entry's "value" object.
     *
     * @param insight the insight whose values are converted
     * @param dateFormat formatter applied to each entry's "end_time"
     * @param pollTime formatted poll time stored in every event body
     * @param headers headers attached to every event
     * @return the events built from the insight, possibly empty
     */
    private List<Event> buildEvents(Insight insight, SimpleDateFormat dateFormat, String pollTime, Map<String, String> headers)
    {
        final List<Event> events = new ArrayList<Event>();

        for (JsonObject value : insight.getValues())
        {
            final String referenceDate = dateFormat.format(DateUtils.toDateFromLongFormat(value.getString("end_time")));
            final JsonObject countsByCity = value.getJsonObject("value");

            for (Iterator<?> keys = countsByCity.keys(); keys.hasNext(); )
            {
                final String city = (String) keys.next();
                final Long count = countsByCity.getLong(city);

                final JsonObject body = new JsonObject();

                body.put("reference_date", referenceDate);
                body.put("city", city);
                body.put("count", count);
                body.put("poll_time", pollTime);

                // Encode explicitly as UTF-8; the no-argument getBytes() would use
                // the platform default charset and could corrupt non-ASCII city names.
                events.add(EventBuilder.withBody(body.toString().getBytes(BODY_CHARSET), headers));
            }
        }

        return events;
    }

    /**
     * Fetches the first insight returned for the given object and metric,
     * requesting the window that starts four days before now and ends three
     * days before now.
     *
     * @param objectId id of the Facebook object to query
     * @param metric name of the insight metric
     * @return the first insight of the response, or {@code null} when the
     *         response is {@code null} or contains no data
     */
    private Insight getInsight(String objectId, String metric)
    {
        final Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));

        calendar.add(Calendar.DAY_OF_MONTH, -4);

        final Parameter parameterSince = Parameter.with("since", calendar.getTime());

        calendar.add(Calendar.DAY_OF_MONTH, 1);

        final Parameter parameterUntil = Parameter.with("until", calendar.getTime());

        final Connection<Insight> response = facebookClient.fetchConnection(objectId + "/insights/" + metric, Insight.class, parameterSince, parameterUntil);

        if (response == null || response.getData().isEmpty())
        {
            return null;
        }

        return response.getData().get(0);
    }
}

感谢您的帮助。

可以创建一个 java action,并设置一个取值为协调器当前时间的工作流属性:

<!-- Defines a property named "myStart" whose value is the coordinator EL expression below. -->
<!-- NOTE(review): coord:current(n) is normally used for dataset instances; confirm it resolves in this
     position, or consider coord:nominalTime() for the action's nominal time. TODO verify. -->
<property>
    <name>myStart</name>
    <value>${coord:current(0)}</value>
</property>

然后在您的 action 中将此属性作为参数使用。