运行时的 Apache Flink 映射
Apache Flink Mapping at Runtime
我已经构建了一个 flink 流作业来从 kafka 读取 xml 文件,转换文件并将其写入数据库。
由于 xml 文件中的属性与数据库列名不匹配,我已经为映射构建了一个开关案例。
因为这不是很灵活,所以我想从代码中取出这个硬连线映射信息。首先,我想出了一个映射文件的想法,它看起来像这样:
path.in.xml.to.attribut=database.column.name
当前的作业逻辑是这样的:
switch(path.in.xml.to.attribute){
case "example.one.name":
return "name";
对于映射文件,我想我会使用 Map 将映射数据存储为键值对。
这将使工作更加灵活,就像现在一样。还有一个缺点是,对于我想要应用的此配置中的每个更改,我都必须重新启动 flink 作业。
我的问题是是否可以在运行时注入这种映射逻辑,例如通过自己的 kafka 主题。当这种实现成为可能时,它看起来像一个例子。
如果您唯一需要的是能够更新 xml 属性和数据库列名称之间的映射,那么 The Broadcast State Pattern can be used. Also, A Practical Guide to Broadcast State in Apache Flink 也很有用。
这个想法是有一个流,订阅你自己的带有数据库映射的kafka主题,它将更新广播给所有任务管理器。这些运算符会将此 Map<String, String>
维护为一个状态,您可以使用此映射状态来解析列名,即使用 map.get(path.in.xml.to.attribute))
代替 switch(path.in.xml.to.attribute)
。本例中的 map
运算符应替换为 BroadcastProcessFunction
.
我已经构建了一个 flink 流作业来从 kafka 读取 xml 文件,转换文件并将其写入数据库。 由于 xml 文件中的属性与数据库列名不匹配,我已经为映射构建了一个开关案例。
因为这不是很灵活,所以我想从代码中取出这个硬连线映射信息。首先,我想出了一个映射文件的想法,它看起来像这样:
path.in.xml.to.attribut=database.column.name
当前的作业逻辑是这样的:
switch(path.in.xml.to.attribute){
case "example.one.name":
return "name";
对于映射文件,我想我会使用 Map 将映射数据存储为键值对。
这将使工作更加灵活,就像现在一样。还有一个缺点是,对于我想要应用的此配置中的每个更改,我都必须重新启动 flink 作业。
我的问题是是否可以在运行时注入这种映射逻辑,例如通过自己的 kafka 主题。当这种实现成为可能时,它看起来像一个例子。
如果您唯一需要的是能够更新 xml 属性和数据库列名称之间的映射,那么 The Broadcast State Pattern can be used. Also, A Practical Guide to Broadcast State in Apache Flink 也很有用。
这个想法是有一个流,订阅你自己的带有数据库映射的kafka主题,它将更新广播给所有任务管理器。这些运算符会将此 Map<String, String>
维护为一个状态,您可以使用此映射状态来解析列名,即使用 map.get(path.in.xml.to.attribute))
代替 switch(path.in.xml.to.attribute)
。本例中的 map
运算符应替换为 BroadcastProcessFunction
.