我已经为Spark结构化流创建了一个自定义流选项卡。我刚刚在Spark UI中附加了流选项卡,但我无法在流选项卡中附加页面。在此自定义选项卡中,我想显示已完成的批处理数量、每个批处理的消息数量、处理时间及其图表。通过SQL侦听器和StreamingQueryListener,我可以获得信息,并希望添加到流选项卡中。
public class CustomTab extends SparkUITab {
public CustomTab(SparkUI parent, String prefix) {
super(parent, prefix);
}
public static void getCustomTab(JobContext jobcontext) {
SparkContext sparkContext = jobcontext.getSparkSession().sparkContext();
StreamingContext ssc = new StreamingContext(sparkContext, new Duration(1000));
StreamingTab streamingTab = new StreamingTab(ssc);
sparkContext.ui().get().attachPage(new CustomPage("customPage", streamingTab));
if (sparkContext.ui().isDefined()) {
sparkContext.ui().get().attachTab(streamingTab);
}
}
}
我在自定义流选项卡中附加customPage时遇到问题。SparkUI未调用SparkUITab的render方法。我遇到的第二个问题是如何更改Java中的scala.xml.Node类。我的意思是,Java中是否有任何类可以代替Node类使用,或者如何在scala. xml. Node中添加String。
如果有任何Java类,我可以使用,那么如何更改回节点的序列。
class CustomPage extends WebUIPage {
StreamingTab streamingTab = null;
public CustomPage(String prefix, StreamingTab sparkUI) {
super(prefix);
this.streamingTab = sparkUI;
}
@Override
public Seq<Node> render(HttpServletRequest request) {
System.out.println("In render method");
String value = "This text is going to come at the bottom";
Option<String> optionSome = Option.apply(value);
Function0<Seq<Node>> length = new AbstractFunction0<Seq<Node>>() {
@Override
public Seq<Node> apply() {
scala.xml.Node node = null;
// I am not getting that how to add html code("<div> { <div id=\"custommmmmm\"></div> } </div>") here
return null;
}
};
return UIUtils.headerSparkPage("custom", length, this.streamingTab, null, optionSome, false, false);
}
}
是否可以通过侦听器显示结构化流批处理、其处理时间和每个批处理的行数,如果可能,我应该使用哪个侦听器,以及如何将其添加到我的自定义页面中。
2条答案
按热度按时间j8ag8udp1#
您可以将StreamingQueryListener用于您的用例。您必须持久化从StreamingQueryListener获取的数据,然后才能在“自定义”选项卡中显示它们。
flvlnr442#
使用类Text -参见示例: