gpt4 book ai didi

kotlin - Camel :项目数量小于批量大小时,如何拆分然后汇总

转载 作者:行者123 更新时间:2023-12-02 12:53:46 27 4
gpt4 key购买 nike

我有一个 Camel 路线,该路线从S3读取文件并按如下方式处理输入文件:

  • 使用Bindy
  • 将每一行解析为POJO(学生)
  • 通过body()分割输出
  • 通过正文的属性(.semester)和批处理大小2
  • 进行聚合
  • 调用持久性服务以给定的批次上传到数据库

  • 问题在于,批处理大小为2,记录数为奇数时,总会有一个记录没有保存。

    提供的代码为 Kotlin,但与等效的Java代码(与“\ $ {simple expression}”前面的斜杠或缺少分号来终止语句)应该没有太大区别。

    如果将批处理大小设置为1,则将保存每条记录,否则将永远不会保存最后一条记录。

    我已经检查了 message-processor的文档几次,但似乎没有涵盖这种特殊情况。

    我还设置了[ completionTimeout | completionInterval之外的 completionSize],但没有任何区别。

    有人遇到过这个问题吗?
    val csvDataFormat = BindyCsvDataFormat(Student::class.java)

    from("aws-s3://$student-12-bucket?amazonS3Client=#amazonS3&delay=5000")
    .log("A new Student input file has been received in S3: '\${header.CamelAwsS3BucketName}/\${header.CamelAwsS3Key}'")
    .to("direct:move-input-s3-object-to-in-progress")
    .to("direct:process-s3-file")
    .to("direct:move-input-s3-object-to-completed")
    .end()

    from("direct:process-s3-file")
    .unmarshal(csvDataFormat)
    .split(body())
    .streaming()
    .parallelProcessing()
    .aggregate(simple("\${body.semester}"), GroupedBodyAggregationStrategy())
    .completionSize(2)
    .bean(persistenceService)
    .end()

    使用包含七(7)条记录的输入CSV文件,这是生成的输出(带有一些添加的调试日志记录):

    WARN 19540 --- [student-12-move] c.a.s.s.internal.S3AbortableInputStream:并非从S3ObjectInputStream读取所有字节,从而中止HTTP连接。这可能是一个错误,并可能导致次佳的行为。使用远程GET仅请求所需的字节,或在使用后耗尽输入流。
    INFO 19540 --- [student-12-move] student-workflow-main:在S3中收到一个新的Student输入文件:'student-12-bucket / inbox / foo.csv'
    INFO 19540 --- [student-12-move] move-input-s3-object-to-in-progress:将S3文件“inbox / foo.csv”移动到“in-progress”文件夹中...
    INFO 19540 --- [student-12-move] student-workflow-main:将输入S3文件'in-progress / foo.csv'移动到'in-progress'文件夹中...
    INFO 19540 --- [student-12-move] pre-process-s3-file-records:开始保存到数据库...
    DEBUG 19540 --- [读取#7-拆分] c.b.i.d.s.StudentPersistenceServiceImpl:将记录保存到数据库:Student(id = 7,name = Student 7,semester = 2nd,javaMarks = 25)
    DEBUG 19540 --- [读取#7-拆分] c.b.i.d.s.StudentPersistenceServiceImpl:将记录保存到数据库:Student(id = 5,name = Student 5,semester = 2nd,javaMarks = 81)
    DEBUG 19540 --- [读取#3-拆分] c.b.i.d.s.StudentPersistenceServiceImpl:将记录保存到数据库:Student(id = 6,name = Student 6,semester = 1st,javaMarks = 15)
    DEBUG 19540 --- [读取#3-拆分] c.b.i.d.s.StudentPersistenceServiceImpl:将记录保存到数据库:Student(id = 2,name = Student 2,semester = 1st,javaMarks = 62)
    DEBUG 19540 --- [读取#2-拆分] c.b.i.d.s.StudentPersistenceServiceImpl:将记录保存到数据库:Student(id = 3,name = Student 3,semester = 2nd,javaMarks = 72)
    DEBUG 19540 --- [读取#2-拆分] c.b.i.d.s.StudentPersistenceServiceImpl:将记录保存到数据库:Student(id = 1,name = Student 1,semester = 2nd,javaMarks = 87)
    INFO 19540 --- [student-12-move] device-group-workflow-main:结束预处理S3 CSV文件记录...
    INFO 19540 --- [student-12-move] move-input-s3-object-to-completed:将S3文件'in-progress / foo.csv'移动到'completed'文件夹中...
    INFO 19540 --- [student-12-move] device-group-workflow-main:将S3文件'in-progress / foo.csv'移动到'completed'文件夹中...

    最佳答案

    如果需要立即完成消息,则可以基于拆分器设置的交换属性来指定完成谓词。我没有尝试过,但我认为

    .completionPredicate( simple( "${exchangeProperty.CamelSplitComplete}" ) )

    将处理最后一条消息。

    我的另一个担心是您在拆分器中设置了 parallelProcessing,这可能意味着消息未按顺序处理。是要应用并行处理的拆分器,还是聚合器?除了拆分记录,然后对它们进行处理之后,您似乎没有对拆分记录做任何事情,因此将 parallelProcessing指令移至聚合器可能更好。

    关于kotlin - Camel :项目数量小于批量大小时,如何拆分然后汇总,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54017750/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com