Pinpoint does not send event data to Kinesis
I want to use Personalize to build a recommendation model for my application, using my current application analytics data. I have connected Pinpoint to capture that data with the help of Kinesis Data Firehose, as described in this documentation.
But when I connect Kinesis Data Firehose to Pinpoint, Pinpoint sends the data to Kinesis, but the output is different from what I want.
Kinesis settings:
And the output I got:
Is there any other way to send data from Pinpoint to Personalize so that I can launch a campaign? Once the campaign is running, I can send data through the campaign as described in the documentation.
Since the form and content of Pinpoint events differ from the interactions format required by Personalize (imported in bulk as an interactions CSV or incrementally via the PutEvents API), some transformation is going to be required to get these events into the right format. The solution you noted uses periodic bulk imports, using Athena to extract and format the event data saved in S3 (through Kinesis Firehose) into the CSV format expected by Personalize, which is then imported into Personalize. You can find the Athena named queries in the CloudFormation template for the solution here.
-- evs: all Pinpoint events except the recommender events
WITH evs AS (
    SELECT
        client.client_id AS endpoint_id,
        attributes.campaign_id AS campaign_id,
        event_type,
        arrival_timestamp
    FROM event
    WHERE
        (
            (
                ${InteractionsQueryDateScope} > 0
                AND arrival_timestamp >= date_add('day', -1, CURRENT_DATE)
                AND arrival_timestamp < CURRENT_DATE
            ) OR (
                ${InteractionsQueryDateScope} = -1
            )
        )
        AND event_type != '_custom.recommender'
),
-- recs: the '_custom.recommender' events carrying the Personalize user and item ids
recs AS (
    SELECT
        attributes.personalize_user_id AS personalize_user_id,
        client.client_id AS endpoint_id,
        attributes.campaign_id AS campaign_id,
        attributes.item_id AS item_id,
        event_type,
        arrival_timestamp
    FROM event
    WHERE
        (
            (
                ${InteractionsQueryDateScope} > 0
                AND arrival_timestamp >= date_add('day', -1, CURRENT_DATE)
                AND arrival_timestamp < CURRENT_DATE
            ) OR (
                ${InteractionsQueryDateScope} = -1
            )
        )
        AND event_type = '_custom.recommender'
)
-- Join the two back together to produce the Personalize interactions columns
SELECT
    r.personalize_user_id AS USER_ID,
    r.item_id AS ITEM_ID,
    b.event_type AS EVENT_TYPE,
    v.event_value AS EVENT_VALUE,
    CAST(to_unixtime(b.arrival_timestamp) AS BIGINT) AS TIMESTAMP
FROM endpoint_export a
INNER JOIN recs r
    ON a.id = r.endpoint_id
INNER JOIN evs b
    ON a.id = b.endpoint_id AND r.campaign_id = b.campaign_id
INNER JOIN event_value v
    ON b.event_type = v.event_type
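For completeness, here is a minimal sketch (not the solution's actual code) of driving that bulk path from Python: run the query with Athena and import the resulting CSV into the Personalize interactions dataset. The database name, bucket, file name, and ARNs are placeholders, and the query is assumed to live in a local interactions_query.sql file with the ${...} placeholders already resolved.

# Hedged sketch: run the interactions query with Athena, then import the
# resulting CSV into Personalize. All names and ARNs below are placeholders.
import time
from pathlib import Path

import boto3

athena = boto3.client("athena")
personalize = boto3.client("personalize")

# The WITH ... SELECT query shown above, with ${...} placeholders substituted.
interactions_query = Path("interactions_query.sql").read_text()

# Athena writes its result set as a CSV file under OutputLocation.
started = athena.start_query_execution(
    QueryString=interactions_query,
    QueryExecutionContext={"Database": "pinpoint_event_database"},
    ResultConfiguration={"OutputLocation": "s3://my-data-bucket/interactions/"},
)
query_id = started["QueryExecutionId"]

# Poll until the query finishes (simplified; production code should bound this).
while True:
    status = athena.get_query_execution(QueryExecutionId=query_id)
    state = status["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(5)

if state == "SUCCEEDED":
    # Import the CSV that Athena produced into the interactions dataset.
    personalize.create_dataset_import_job(
        jobName="pinpoint-interactions-import",
        datasetArn="arn:aws:personalize:us-east-1:111122223333:dataset/pinpoint/INTERACTIONS",
        dataSource={"dataLocation": f"s3://my-data-bucket/interactions/{query_id}.csv"},
        roleArn="arn:aws:iam::111122223333:role/PersonalizeS3AccessRole",
    )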
Here is how the tables are created in the Glue Data Catalog:
CREATE EXTERNAL TABLE IF NOT EXISTS `${PinpointEventDatabase}`.event (
    client struct<client_id:string>,
    attributes struct<campaign_id:string, item_id:string, personalize_user_id:string>,
    event_type string,
    arrival_timestamp timestamp
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
    'serialization.format' = '1'
) LOCATION 's3://${DataS3Bucket}/events/'
TBLPROPERTIES ('has_encrypted_data'='false');

CREATE EXTERNAL TABLE IF NOT EXISTS `${PinpointEventDatabase}`.endpoint_export (
    id string,
    channeltype string,
    address string,
    endpointstatus string,
    optout string,
    effectivedate string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
    'serialization.format' = '1'
) LOCATION 's3://${DataS3Bucket}/endpoint_exports/'
TBLPROPERTIES ('has_encrypted_data'='false');

CREATE EXTERNAL TABLE IF NOT EXISTS `${PinpointEventDatabase}`.event_value (
    event_type string,
    event_value double
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
    'serialization.format' = ',',
    'field.delim' = ','
) LOCATION 's3://${DataS3Bucket}/event_values/'
TBLPROPERTIES ('has_encrypted_data'='false', 'skip.header.line.count'='1');
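One note on the last table: event_value is just a small CSV lookup in S3 that maps each Pinpoint event type to the numeric EVENT_VALUE joined in at the end of the query above. A hedged example of staging such a file, with illustrative (not real) event types and weights:

# Illustrative only: stage the event_type -> event_value lookup CSV that the
# event_value table reads. The header row matches 'skip.header.line.count'='1'.
import boto3

csv_body = (
    "event_type,event_value\n"
    "_custom.purchase,1.0\n"   # hypothetical event types and weights
    "_custom.view,0.5\n"
)

boto3.client("s3").put_object(
    Bucket="my-data-bucket",   # stands in for ${DataS3Bucket}
    Key="event_values/event_values.csv",
    Body=csv_body.encode("utf-8"),
)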
An alternative to the Kinesis Firehose/S3/Athena approach is to write a Lambda function that consumes the Pinpoint events directly from a Kinesis data stream, transforms each Pinpoint event in the Lambda into a PutEvents API call to a Personalize event tracker, and then, once you have accumulated enough interaction data, creates a solution, solution version, and campaign.
The minimum required fields for Pinpoint events going into Personalize are USER_ID, ITEM_ID, and TIMESTAMP. The USER_ID will most likely be the Pinpoint endpoint (client.client_id), the ITEM_ID will most likely be passed through Pinpoint as an attribute (attributes.item_id), and the TIMESTAMP will be the arrival_timestamp. You can also use the Pinpoint event_type as the EVENT_TYPE in Personalize. The SQL statement above shows how this is done with Athena, but you could also do it in code in your Lambda for each mini-batch of events you consume from the Kinesis data stream, as in the sketch below.
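Here is a minimal sketch of that Lambda, assuming the Pinpoint event stream is attached to a Kinesis data stream that triggers the function. The PERSONALIZE_TRACKING_ID environment variable and the choice of session id are assumptions for illustration, not part of the solution.

# Hedged sketch of the Lambda alternative: consume Pinpoint events from the
# Kinesis data stream and forward them to a Personalize event tracker.
# PERSONALIZE_TRACKING_ID is a hypothetical environment variable.
import base64
import json
import os
from datetime import datetime, timezone

import boto3

personalize_events = boto3.client("personalize-events")
TRACKING_ID = os.environ["PERSONALIZE_TRACKING_ID"]

def handler(event, context):
    for record in event["Records"]:
        # Kinesis delivers each Pinpoint event as base64-encoded JSON.
        pinpoint_event = json.loads(base64.b64decode(record["kinesis"]["data"]))

        user_id = pinpoint_event.get("client", {}).get("client_id")
        item_id = pinpoint_event.get("attributes", {}).get("item_id")
        if not user_id or not item_id:
            continue  # skip events that don't map to a Personalize interaction

        personalize_events.put_events(
            trackingId=TRACKING_ID,
            userId=user_id,
            # Reusing the endpoint id as the session id is an assumption;
            # use a real session identifier if your app tracks one.
            sessionId=user_id,
            eventList=[{
                "eventType": pinpoint_event["event_type"],
                "itemId": item_id,
                # Pinpoint's arrival_timestamp is epoch milliseconds.
                "sentAt": datetime.fromtimestamp(
                    pinpoint_event["arrival_timestamp"] / 1000.0, tz=timezone.utc
                ),
            }],
        )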