使用来自ktables的数据丰富kafka流

px9o7tmv  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(214)

我目前有一个财务申请。虽然在这个财务应用程序中有许多计算,但其中一个计算是确定1)新的传入交易占总交易金额的百分比是多少?2) 对于同一客户,新交易占给定客户总交易金额的百分比是多少?
为了简单起见,让我们假设每天早上6点时,转换数据将被切断,此时程序将启动。换言之,我们在这里处理给定日期的大部分静态数据。
例如:
交易1:客户1->100美元
交易2:客户1->100美元
交易3:客户2->100美元
我想知道的是,交易1占总交易的33%。交易记录1占客户1总交易记录的50%。
下面是今天的代码的一个稍微简化的版本,它作为单个java进程运行,所有数据都存储在同一进程的堆中(因此这里没有进程间通信)。
dao类:维护应用程序数据

public class ApplicationDataDao {
    private Map<String,Transaction> transactionsByTransactionId;
    private Map<String,Transcation> transcationsByCustomerId;
    private TranscationAggregate transcationAggregate;
    private Map<String,TranscationAggregate> transactionAggregateByCustomerId;

    //constructor, getters and setters to populate these maps and to retrieve data 
    from these maps
}

事务类:表示一个事务

public class Transaction {
     private String transcationId;
     private String customerId;
     private BigDecimal transcationAmount;

     private BigDecimal transcationPercentageAllocation;
     private BigDecimal customerPercentageAllocation;
}

aggregate类:保存事务级别和客户级别的合计。

public class TranscationAggregate {
    private BigDecimal totalTranscationAmount = BigDecimal.ZERO;

    private String trancationId;
    private String customerId;

    private void aggregate(BigDecimal currentTranscationAmount) {
        totalTranscationAmount.add(currentTranscationAmount);            
    }      

}

从今天的截止文件中读取数据

ApplicationDataDao dao = getSingletonApplicationDataDao();

    for(String line : reader.read()) {

         String []tokens = line.split(",");
         Transaction transaction = new Transaction();
         transaction.setTransactionId(tokens[0]);
         transaction.setCustomerId(tokens[1]);
         transcation.setTransactionAmount(tokens[2]);
         dao.putTransactionByTransactionId(transaction.getTranscationId());
         dao.putTranscationByCustomerId(transcation.getCustomerId());     
         //Keep a track of the total transaction amount and total transaction amount by customer id.
         dao.getTranscationAggregate().aggregate(transcation.getTranscationAmount());
         dao.getTranscationAggregateByCustomerId(transcation.getCustomerId()).
         aggregate(transcation.getTranscationAmount());

      }

计算一个事务相对于其他事务的百分比分配

for(Transaction transaction : dao.getTranscationsByTranscationId().values()) {
                  transaction.setTranscationPercentageAllocation(transaction.getTranscationAmount().divide(dao.getTransactionAggregate().getTotalTransactionAmount())
     }

计算客户的交易相对于同一客户的其他交易的百分比分配

for(TransactionAggregate transactionAggregate : dao.getTranscationAggregateByCustomerId()) {
       Transaction transaction = dao.getTranscationByCustomerId(transactionAggregate .getCustomerId());
       transaction.setCustomerPercentageAllocation(transaction.getTranscationAmount().divide(transactionAggregate.getTotalTransactionAmount())
     }

到目前为止,这个应用程序运行在其他团队使用的专用unix机器上。换句话说,它是一个独立的、单一的应用程序。我想把这个应用程序重构成一个基于kafka流的应用程序。这意味着上述for循环将被分解为生产者和消费者,而不是在一个for循环中完成以下所有工作:
从文件中读取一行,将其转换为事务对象并将其写入Kafka主题的独立程序。
另一方面,流消费者读取事务对象并创建两个ktable示例,分别保存总事务量(null键)和按客户r id(客户id作为键)聚合事务量
例如,将ktable示例写入两个独立的kafka主题(事务聚合主题和客户聚合主题)。
我现在有了一个事务对象流。我也有两个主题,基本上是持有总量。我的问题是:如何使用每个事务的聚合ktables中的值重新丰富事务流,以便在处理结束时查看流时,每个事务对象现在都知道它相对于其他事务的百分比,或者它相对于同一客户的其他事务的百分比(首先,事务流没有键。如何将事务流中的消息与两个ktable匹配?)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题