The producer code is as follows:
static PltPage pltPage;

public static void main(String[] args) throws IOException {
    Short itemtype = 1;
    Properties props = new Properties();
    props.put("metadata.broker.list", "localhost:9092");
    props.put("partitioner.class", "com.rms.com.SimplePartitioner");
    props.put("serializer.class", "com.rms.com.CustomSerializer");
    props.put("request.required.acks", "1");
    ProducerConfig config = new ProducerConfig(props);
    Producer<String,PltResultPage> producer = new Producer<String,PltResultPage>(config);
    String folder = new File(".").getAbsoluteFile().getPath();
    String parent = new File(folder).getParentFile().getParent();
    String path = parent + "/KafkaProducerSparkConsumer/src/resources/PortfolioPLT.txt";
    FileReader fr = new FileReader(path);
    BufferedReader br = new BufferedReader(fr);
    String sCurrentLine;
    List<Integer> periodIds = new ArrayList<Integer>();
    List<Integer> sampleIds = new ArrayList<Integer>();
    List<Integer> eventIds = new ArrayList<Integer>();
    List<Integer> dates = new ArrayList<Integer>();
    List<Double> losses = new ArrayList<Double>();
    while ((sCurrentLine = br.readLine()) != null) {
        String[] entries = sCurrentLine.toString().split("~");
        if (entries[1].equalsIgnoreCase("GR")) {
            periodIds.add(Integer.parseInt(entries[2]));
            sampleIds.add(Integer.parseInt(entries[3]));
            eventIds.add(Integer.parseInt(entries[4]));
            dates.add(2040);
            losses.add(Double.parseDouble(entries[6]));
        }
    }
    pltPage = ExportUtilities.generatepltpage(periodIds,sampleIds,eventIds,dates,losses,periodIds.size(),1000000000002L,Short.parseShort("1"));
    PltResultPage resultPage = new PltResultPage();
    resultPage.setAnalysisId(1);
    resultPage.setExternalID("1");
    resultPage.setItemId(1L);
    resultPage.setItemType(itemtype);
    resultPage.setOutputProperty(itemtype);
    resultPage.setResultType(itemtype);
    resultPage.setResultPage(pltPage);
    resultPage.setJobId(1L);
    KeyedMessage<String, PltResultPage> message = new KeyedMessage<String, PltResultPage>("test", resultPage);
    producer.send(message);
    producer.close();
}
The consumer code is as follows:
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("zookeeper.session.timeout.ms", "20000");
props.put("zookeeper.sync.time.ms", "3000");
props.put("auto.commit.interval.ms", "2000");
props.put("auto.offset.reset", "smallest");
props.put("serializer.class", "com.rms.com.CustomSerializer");
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
final StringDecoder decoder =
    new StringDecoder(new VerifiableProperties(returnProperties(zooKeeper,groupId)));
final CustomSerializer decoder2 = new CustomSerializer(new VerifiableProperties(returnProperties(zooKeeper,groupId)));
final Map<String, List<KafkaStream<String, PltResultPage>>> consumerMap = this.consumer.createMessageStreams(topicCountMap, decoder, decoder2);
final List<KafkaStream<String, PltResultPage>> streams = consumerMap.get(topic);
executor = Executors.newFixedThreadPool(a_numThreads);
int threadNumber = a_numThreads;
for (KafkaStream stream : streams) {
    executor.submit(new ExecuteConsumerClient(stream, threadNumber));
    threadNumber++;
}

System.out.println("calling ExecuteConsumerClient.run()");
ConsumerIterator<String,PltResultPage> it = m_stream.iterator();
while (it.hasNext()) {
    try {
        CreateJavaSparkContext();
        System.out.println("Converting to ResultPage");
        PltResultPage pltResultPage = (PltResultPage)it.next().message();
        System.out.println("Before Impl Accept");
        sparkExportPLTToFile.accept(pltResultPage.getJobId(), pltResultPage.getItemId(), pltResultPage.getItemType(), pltResultPage.getOutputProperty(), pltResultPage.getResultType(), pltResultPage.getResultPage(), pltResultPage.getAnalysisId(), pltResultPage.getExternalID());
    }
    catch (Exception e) {
        System.out.println( "Exception in it.Run" + e.getStackTrace().toString() );
    }
    System.out.println("Executed impl for thread " + m_threadNumber);
}
System.out.println("Shutting down Thread: " + m_threadNumber);
}
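It fails when trying to convert the message to an object. I took the custom serializer code from a post; it is shown below. Can someone point out what is wrong with the implementation? I tried calling fromBytes from the custom serializer, but nothing helped. The serializer is returning null objects.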
Custom serializer:
public class CustomSerializer implements Encoder<PltResultPage>, Decoder<PltResultPage> {
    public CustomSerializer(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for successful compile. */
    }

    public CustomSerializer() {
    }

    public byte[] toBytes(PltResultPage o) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            oos.writeObject(o);
            oos.close();
            byte[] b = baos.toByteArray();
            return b;
        } catch (IOException e) {
            return new byte[0];
        }
    }

    @Override
    public PltResultPage fromBytes(byte[] bytes) {
        try {
            return (PltResultPage) new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject();
        } catch (Exception e) {
            return null;
        }
    }
}
PltResultPage is below.
public class PltResultPage implements Serializable {
    private Long jobId;
    private Long itemId;
    private Short itemType;
    private Short outputProperty;
    private Short resultType;
    private LossPage resultPage;
    private Integer analysisId;
    private String externalID;
    private static final long serialVersionUID = 0L;

    public Long getJobId() { return this.jobId; }
    public Long getItemId() { return this.itemId; }
    public String getExternalID() { return this.externalID; }
    public Short getItemType() { return this.itemType; }
    public Short getOutputProperty() { return this.outputProperty; }
    public Short getResultType() { return this.resultType; }
    public LossPage getResultPage() { return this.resultPage; }
    public Integer getAnalysisId() { return this.analysisId; }

    public void setJobId(Long jobid) { this.jobId = jobid; }
    public void setOutputProperty(Short output) { this.outputProperty = output; }
    public void setItemId(Long itemId) { this.itemId = itemId; }
    public void setItemType(Short type) { this.itemType = type; }
    public void setResultType(Short resultType) { this.resultType = resultType; }
    public void setResultPage(LossPage page) { this.resultPage = page; }
    public void setAnalysisId(Integer id) { this.analysisId = id; }
    public void setExternalID(String externalID) { this.externalID = externalID; }
}
1 Answer
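Try adding an explicit serialVersionUID field (a private static final long) to PltResultPage. You do not see it, but this value is serialized along with the other data, and on deserialization it is compared with the value in the class loaded in the current JVM. If the values differ, deserialization fails, and because fromBytes in the custom serializer catches the exception and returns null, you get a null result. If you do not declare serialVersionUID for the class, the runtime computes one from the details of the compiled class. That computed value is sensitive to compiler and class-structure differences, so it can differ between the producer JVM and the consumer JVM even if both use exactly the same source code for PltResultPage.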
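To check which value each side actually uses, something like the following can be run in both the producer and the consumer environment. It is only a sketch: WithExplicitUid and WithComputedUid are hypothetical stand-ins for PltResultPage, and uidOf is a helper name invented here. ObjectStreamClass.lookup and getSerialVersionUID are standard JDK calls.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionUidDemo {
    // Hypothetical stand-in for PltResultPage with a declared UID.
    static class WithExplicitUid implements Serializable {
        private static final long serialVersionUID = 1L;
        Long jobId;
    }

    // Hypothetical stand-in without a declared UID: the runtime
    // computes one from the structure of the compiled class.
    static class WithComputedUid implements Serializable {
        Long jobId;
    }

    // Returns the UID that Java serialization writes into the stream
    // and checks again when the object is read back.
    static long uidOf(Class<?> type) {
        return ObjectStreamClass.lookup(type).getSerialVersionUID();
    }

    public static void main(String[] args) {
        // Prints the declared value, 1.
        System.out.println("explicit: " + uidOf(WithExplicitUid.class));
        // Prints whatever the runtime computed for this compiled class.
        System.out.println("computed: " + uidOf(WithComputedUid.class));
    }
}
```

If the number printed for your real class differs between the two environments, readObject on the consumer side throws InvalidClassException, which the fromBytes in the question turns into null.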
In short, if your custom serializer uses default Java serialization/deserialization, you should declare serialVersionUID in the class of every object you serialize.
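It also helps to stop swallowing the exception while you debug, because a null from fromBytes hides the real cause. Below is a minimal round-trip sketch without any Kafka dependency. Payload is a hypothetical stand-in for PltResultPage, and toBytes/fromBytes here are plain static helpers, not the Kafka Encoder/Decoder methods. One thing to keep in mind: LossPage is not shown in the question, so whether it implements Serializable is unknown. If it does not, writeObject throws NotSerializableException, the question's toBytes returns an empty array, and the consumer then gets null.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class RoundTripDemo {
    // Hypothetical stand-in for PltResultPage.
    static class Payload implements Serializable {
        private static final long serialVersionUID = 1L;
        final Long jobId;
        final String externalId;

        Payload(Long jobId, String externalId) {
            this.jobId = jobId;
            this.externalId = externalId;
        }
    }

    // Serializes with default Java serialization; failures are rethrown
    // instead of being replaced by an empty byte array.
    static byte[] toBytes(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException("serialization failed", e);
        }
        return buffer.toByteArray();
    }

    // Deserializes; failures are rethrown with their cause attached
    // instead of being replaced by null.
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("deserialization failed", e);
        }
    }

    public static void main(String[] args) {
        Payload copy = (Payload) fromBytes(toBytes(new Payload(1L, "1")));
        System.out.println("jobId after round trip: " + copy.jobId);

        try {
            // An empty array is what the question's toBytes returns on failure.
            fromBytes(new byte[0]);
        } catch (IllegalStateException e) {
            System.out.println("cause: " + e.getCause());
        }
    }
}
```

Once the cause is visible in the consumer log, you can tell a serialVersionUID mismatch apart from a non-serializable field or a class missing from the consumer's classpath.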