如何将两条消息与从 Apache Camel 中的 MySQL 数据库检索到的数据结合起来?

How to combine two messages with data retrieved from the MySQL db in Apache Camel?

我目前正在处理一个集成项目。我必须从 MySQL 数据库中获取一些数据,然后他们使用 Apache Camel 将它们组合起来。在数据库中,我有两个表,materials 和包。它们是一对多的关系,一个material可以包含多个包。我已经想出了如何从数据库中获取数据并将它们保存到 json 文件,但我不知道如何将这两个消息合并为一个。我读过有关聚合的信息,但我并没有真正理解它们。这是我第一次使用 Apache Camel,我真的不知道我现在该做什么。这些路线的代码如下所示:

public class InputAdapter{

public static void main(String[] args) throws Exception {

    BasicDataSource dataSource = new BasicDataSource();
    dataSource.setDriverClassName("com.mysql.jdbc.Driver");
    dataSource.setUrl("jdbc:mysql://localhost:3306/Packages");
    dataSource.setUsername("uname");
    dataSource.setPassword("passwd");
    SimpleRegistry registry = new SimpleRegistry();
    registry.bind("dataSource", dataSource);

    CamelContext context = new DefaultCamelContext(registry);
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() {
            from("timer://foo?repeatCount=1")
                    .setBody(constant("SELECT * FROM material;"))
                    .to("jdbc:dataSource")
                    .marshal().json(true)
                    .to("file:/some/path/to/file?fileName=materials.json");
            from("timer://foo?repeatCount=1")
                    .setBody(constant("SELECT * FROM package;"))
                    .to("jdbc:dataSource")
                    .marshal().json(true)
                    .to("file:/some/path/to/file?fileName=packages.json");
        }
    });

    context.start();
    Thread.sleep(10000);
    context.stop();
}
}

Material 的模型和包只是具有 getter 和 setter 的私有属性:

public class Material {
private int id;
private int number;
private enumType type;
private String name;
private String description;
private boolean is_deleted;
private List<Package> packageList = new ArrayList<>();

public enum enumType {
    A1, A2, A3, B1, B2, B3, Z1, Z2, Z3;
}

public int getId() {
    return this.id;
}

... some getters
}

public List<Package> getPackageList() {
    return this.packageList;
}

public void setId(int id) {
    this.id = id;
}

... some setters

public void setPackageList(List<Package> packages) {
    this.packageList = packages;
}
}

有人可以提示我现在应该做什么吗?请帮助我。

聚合器通常用于合并来自某个源的消息。我可能不会使用聚合器来组合这两组项目。如果您希望提取所有 Material 并从数据库中获取相关包,您最好检索每个 Material.

的包列表

我会创建一个处理器来处理为每个返回的 Materials objects 检索包,然后在单个路由中输出整个内容。

您可以在 Camel 中定义处理器 class,如下所示:

public class PackageProcessor implements Processor {
  @Override
  public void process(Exchange exchange) {
    // Transform the input to your message class
    // Retrieve the Packages
    // Transform the results to Packages
    // Add to the Material
    // Set the Out Body
    exchange.getMessage().setBody(material);
  }
}

然后您可以使用路线中的处理器来完成这项工作。那将使路线看起来像这样:

from("timer://foo?repeatCount=1")
  .routeId("my-material-route")
  .setBody(constant("SELECT * FROM materials;"))
  .to("jdbc:dataSource")
  .split(body())
  .process(new PackageProcessor())
  .setHeader(Exchange.FILE_NAME, simple("${exchangeId}.json"))
  .marshal().json(true)
  .to("file:/somepath")
  .end();

这会将每条记录输出到包含所需信息的 Json 文件。如果您希望将所有项目放在一个文件中,这就是聚合器发挥作用的地方。

您会注意到原始路线之外的一些物品。 JDBC 组件的结果是 ArrayList>。我们在路由中添加一个 Split 以将每个项目单独发送到处理器而不是整个结果集。处理器应该在 Exchange Body.

中收到一个 HashMap

在处理器之后,我们将交换中的 CamelFileName Header 设置为交换 ID,然后这将从 Materials table 中的每条记录输出一个单独的文件。

如果您想将所有内容都放在一个文件中,您可以使用一个聚合器来收集交换并构建一个列表。将交换释放到 JSON 文件可能会稍微复杂一些。您通常必须设置超时或某种评估函数来确定何时应该发布“超级”交换。