我目前有一个财务申请。虽然在这个财务应用程序中有许多计算,但其中一个计算是确定1)新的传入交易占总交易金额的百分比是多少?2) 对于同一客户,新交易占给定客户总交易金额的百分比是多少?
为了简单起见,让我们假设每天早上6点时,转换数据将被切断,此时程序将启动。换言之,我们在这里处理给定日期的大部分静态数据。
例如:
交易1:客户1->100美元
交易2:客户1->100美元
交易3:客户2->100美元
我想知道的是,交易1占总交易的33%。交易记录1占客户1总交易记录的50%。
下面是今天的代码的一个稍微简化的版本,它作为单个java进程运行,所有数据都存储在同一进程的堆中(因此这里没有进程间通信)。
dao类:维护应用程序数据
public class ApplicationDataDao {
private Map<String,Transaction> transactionsByTransactionId;
private Map<String,Transcation> transcationsByCustomerId;
private TranscationAggregate transcationAggregate;
private Map<String,TranscationAggregate> transactionAggregateByCustomerId;
//constructor, getters and setters to populate these maps and to retrieve data
from these maps
}
事务类:表示一个事务
public class Transaction {
private String transcationId;
private String customerId;
private BigDecimal transcationAmount;
private BigDecimal transcationPercentageAllocation;
private BigDecimal customerPercentageAllocation;
}
aggregate类:保存事务级别和客户级别的合计。
public class TranscationAggregate {
private BigDecimal totalTranscationAmount = BigDecimal.ZERO;
private String trancationId;
private String customerId;
private void aggregate(BigDecimal currentTranscationAmount) {
totalTranscationAmount.add(currentTranscationAmount);
}
}
从今天的截止文件中读取数据
ApplicationDataDao dao = getSingletonApplicationDataDao();
for(String line : reader.read()) {
String []tokens = line.split(",");
Transaction transaction = new Transaction();
transaction.setTransactionId(tokens[0]);
transaction.setCustomerId(tokens[1]);
transcation.setTransactionAmount(tokens[2]);
dao.putTransactionByTransactionId(transaction.getTranscationId());
dao.putTranscationByCustomerId(transcation.getCustomerId());
//Keep a track of the total transaction amount and total transaction amount by customer id.
dao.getTranscationAggregate().aggregate(transcation.getTranscationAmount());
dao.getTranscationAggregateByCustomerId(transcation.getCustomerId()).
aggregate(transcation.getTranscationAmount());
}
计算一个事务相对于其他事务的百分比分配
for(Transaction transaction : dao.getTranscationsByTranscationId().values()) {
transaction.setTranscationPercentageAllocation(transaction.getTranscationAmount().divide(dao.getTransactionAggregate().getTotalTransactionAmount())
}
计算客户的交易相对于同一客户的其他交易的百分比分配
for(TransactionAggregate transactionAggregate : dao.getTranscationAggregateByCustomerId()) {
Transaction transaction = dao.getTranscationByCustomerId(transactionAggregate .getCustomerId());
transaction.setCustomerPercentageAllocation(transaction.getTranscationAmount().divide(transactionAggregate.getTotalTransactionAmount())
}
到目前为止,这个应用程序运行在其他团队使用的专用unix机器上。换句话说,它是一个独立的、单一的应用程序。我想把这个应用程序重构成一个基于kafka流的应用程序。这意味着上述for循环将被分解为生产者和消费者,而不是在一个for循环中完成以下所有工作:
从文件中读取一行,将其转换为事务对象并将其写入Kafka主题的独立程序。
另一方面,流消费者读取事务对象并创建两个ktable示例,分别保存总事务量(null键)和按客户r id(客户id作为键)聚合事务量
例如,将ktable示例写入两个独立的kafka主题(事务聚合主题和客户聚合主题)。
我现在有了一个事务对象流。我也有两个主题,基本上是持有总量。我的问题是:如何使用每个事务的聚合ktables中的值重新丰富事务流,以便在处理结束时查看流时,每个事务对象现在都知道它相对于其他事务的百分比,或者它相对于同一客户的其他事务的百分比(首先,事务流没有键。如何将事务流中的消息与两个ktable匹配?)
暂无答案!
目前还没有任何答案,快来回答吧!