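This article collects code examples for the Java method org.apache.flume.Event.setBody() and shows how Event.setBody() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of Event.setBody() are as follows: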
Fully qualified name: org.apache.flume.Event
Class: Event
Method: setBody
Description: Sets the raw byte array of the data contained in this event.
Code example source: apache/flume
@Override
public Event intercept(Event event) {
  // Decode the body, apply the regex replacement, and re-encode with the same charset.
  String origBody = new String(event.getBody(), charset);
  Matcher matcher = searchPattern.matcher(origBody);
  String newBody = matcher.replaceAll(replaceString);
  event.setBody(newBody.getBytes(charset));
  return event;
}
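The decode, replace, and re-encode steps of the interceptor above can be tried without a Flume dependency. The following is a minimal sketch using only the JDK; the class name `BodyRewriteSketch` and the helper `rewriteBody` are hypothetical and are not part of Flume. The resulting byte array is what would be passed to `event.setBody(...)`.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

// Hypothetical helper class for illustration; not part of the Flume API.
class BodyRewriteSketch {

    /** Decodes body with charset, applies the regex replacement, and re-encodes with the same charset. */
    static byte[] rewriteBody(byte[] body, Pattern searchPattern, String replaceString, Charset charset) {
        String origBody = new String(body, charset);
        String newBody = searchPattern.matcher(origBody).replaceAll(replaceString);
        return newBody.getBytes(charset);
    }

    public static void main(String[] args) {
        byte[] body = "user=alice id=42".getBytes(StandardCharsets.UTF_8);
        byte[] rewritten = rewriteBody(body, Pattern.compile("\\d+"), "#", StandardCharsets.UTF_8);
        // In an interceptor, this array would be handed to event.setBody(rewritten).
        System.out.println(new String(rewritten, StandardCharsets.UTF_8)); // prints "user=alice id=#"
    }
}
```

Using the same charset for decoding and encoding keeps bytes that the pattern does not match unchanged, provided the original body is valid in that charset.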
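Code example source: apache/flume
/**
 * Instantiate an Event instance based on the provided body and headers.
 * If <code>headers</code> is <code>null</code>, then it is ignored.
 * @param body the event body; a <code>null</code> body is replaced with an empty array
 * @param headers headers to copy into the event, or <code>null</code> for none
 * @return the newly created event
 */
public static Event withBody(byte[] body, Map<String, String> headers) {
  Event event = new SimpleEvent();
  if (body == null) {
    body = new byte[0];
  }
  event.setBody(body);
  if (headers != null) {
    // Copy the map so later changes to the caller's map do not affect the event.
    event.setHeaders(new HashMap<String, String>(headers));
  }
  return event;
}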
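Two details of `withBody` are easy to overlook: a null body becomes an empty array, and the headers are copied instead of stored by reference. The sketch below reproduces both with JDK types only. `HeaderCopySketch`, `copyHeaders`, and `bodyOrEmpty` are hypothetical names, and returning an empty map for null headers is a simplification of "ignored" in the snippet above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class for illustration; not part of the Flume API.
class HeaderCopySketch {

    /** Returns an independent copy of headers, or an empty map when headers is null. */
    static Map<String, String> copyHeaders(Map<String, String> headers) {
        return headers == null ? new HashMap<>() : new HashMap<>(headers);
    }

    /** Returns body unchanged, or an empty array when body is null. */
    static byte[] bodyOrEmpty(byte[] body) {
        return body == null ? new byte[0] : body;
    }

    public static void main(String[] args) {
        Map<String, String> caller = new HashMap<>();
        caller.put("host", "a");
        Map<String, String> copy = copyHeaders(caller);
        caller.put("host", "b"); // mutating the caller's map after the copy
        System.out.println(copy.get("host"));         // prints "a"
        System.out.println(bodyOrEmpty(null).length); // prints 0
    }
}
```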
Code example source: apache/flume (non-contiguous excerpt; "// ..." marks lines omitted in the source)
// ...
"Read " + count + " of total " + length);
event.setBody(body);
// ...
String text = textMessage.getText();
if (text != null) {
  event.setBody(text.getBytes(charset));
// ...
out = new ObjectOutputStream(bos);
out.writeObject(object);
event.setBody(bos.toByteArray());
} catch (IOException e) {
  throw new FlumeException("Error serializing object", e);
// ...
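The last fragment above serializes an object into a byte array before passing it to `setBody`. A self-contained sketch of that step, using only the JDK, might look like the following. `SerializeBodySketch`, `toBytes`, and `fromBytes` are hypothetical names, and `UncheckedIOException` stands in for `FlumeException` so that the example compiles without Flume.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical helper class for illustration; not part of the Flume API.
class SerializeBodySketch {

    /** Serializes object with Java serialization and returns the resulting bytes. */
    static byte[] toBytes(Serializable object) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // try-with-resources closes the stream, which flushes any buffered data into bos.
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(object);
        } catch (IOException e) {
            throw new UncheckedIOException("Error serializing object", e);
        }
        return bos.toByteArray();
    }

    /** Reads back a single object written by toBytes. */
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException("Error deserializing object", e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Class of serialized object not found", e);
        }
    }

    public static void main(String[] args) {
        byte[] body = toBytes("hello");       // would be passed to event.setBody(body)
        System.out.println(fromBytes(body));  // prints "hello"
    }
}
```

A consumer that reads such a body must deserialize it with a compatible class on its classpath, so plain text or JSON bodies are often easier to handle downstream.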
Code example source: keedio/flume-ng-sql-source
@Override
public void write(char[] cbuf, int off, int len) throws IOException {
  Event event = new SimpleEvent();
  String s = new String(cbuf);
  // Note: substring(off, len-1) treats len-1 as an end index into the whole buffer.
  // It equals the requested range minus its last character only when off == 0.
  event.setBody(s.substring(off, len-1).getBytes(Charset.forName(sqlSourceHelper.getDefaultCharsetResultSet())));
  Map<String, String> headers;
  headers = new HashMap<String, String>();
  headers.put("timestamp", String.valueOf(System.currentTimeMillis()));
  event.setHeaders(headers);
  events.add(event);
  // Flush once a full batch of events has accumulated.
  if (events.size() >= sqlSourceHelper.getBatchSize()) {
    flush();
  }
}
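The `write` method above builds the body with `substring(off, len-1)`, which differs from the range that the `java.io.Writer` contract describes (`len` characters starting at `off`). The sketch below compares the two with JDK types only. `CharSliceSketch` and its methods are hypothetical names. Whether the original project drops the last character on purpose (for example, a trailing line terminator) is an assumption that this page cannot confirm.

```java
// Hypothetical helper class for illustration; not part of the Flume API.
class CharSliceSketch {

    /** The range described by Writer.write(cbuf, off, len): len chars starting at off. */
    static String requestedRange(char[] cbuf, int off, int len) {
        return new String(cbuf, off, len);
    }

    /** The range taken by the snippet: indices [off, len - 1) of the whole buffer. */
    static String snippetRange(char[] cbuf, int off, int len) {
        return new String(cbuf).substring(off, len - 1);
    }

    public static void main(String[] args) {
        char[] cbuf = "id,name\n".toCharArray(); // 8 chars, the last one is a newline
        // With off == 0 the snippet's range is the requested range minus its last char.
        System.out.println(snippetRange(cbuf, 0, 8));            // prints "id,name"
        // With off > 0 the two ranges no longer line up.
        System.out.println(requestedRange(cbuf, 3, 5).length()); // prints 5
        System.out.println(snippetRange(cbuf, 3, 5));            // prints "n"
    }
}
```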
Code example source: keedio/flume-ftp-source
/**
 * Processes the data most recently appended to a file and forwards it as an event.
 *
 * @param lastInfo the newly appended bytes, used as the event body
 * @param fileName name of the source file, stored in the "fileName" header
 * @param filePath path of the source file, stored in the "filePath" header
 */
private void processMessage(byte[] lastInfo, String fileName, String filePath) {
  byte[] message = lastInfo;
  Event event = new SimpleEvent();
  Map<String, String> headers = new HashMap<>();
  headers.put("fileName", fileName);
  headers.put("filePath", filePath);
  headers.put("timestamp", String.valueOf(System.currentTimeMillis()));
  event.setBody(message);
  event.setHeaders(headers);
  try {
    getChannelProcessor().processEvent(event);
  } catch (ChannelException e) {
    LOGGER.error("ChannelException", e);
  }
  sourceCounter.incrementCountSizeProc(message.length);
  sourceCounter.incrementEventCount();
}
Code example source: jaibeermalik/searchanalytics-bigdata
public Event getJsonEvent(
    final SearchQueryInstruction searchQueryInstruction)
    throws JsonProcessingException {
  final String searchQueryInstructionAsString = getObjectMapper()
      .writeValueAsString(searchQueryInstruction);
  // String writeValueAsString =
  // mapper.writerWithDefaultPrettyPrinter().writeValueAsString(searchQueryInstruction);
  searchEventsLogger.info(searchQueryInstructionAsString);
  final Event event = new JSONEvent();
  // Note: getBytes() without an argument encodes with the platform default charset.
  event.setBody(searchQueryInstructionAsString.getBytes());
  final Map<String, String> headers = new HashMap<String, String>();
  headers.put("eventId", searchQueryInstruction.getEventIdSuffix());
  headers.put("timestamp", searchQueryInstruction
      .getCreatedTimeStampInMillis().toString());
  if (searchQueryInstruction.getClickedDocId() != null) {
    if (searchQueryInstruction.getFavourite() != null
        && searchQueryInstruction.getFavourite()) {
      headers.put("State", "FAVOURITE");
    } else {
      headers.put("State", "VIEWED");
    }
  }
  event.setHeaders(headers);
  return event;
}
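The method above calls `getBytes()` without a charset, so the bytes stored in the event depend on the JVM's default charset (UTF-8 by default since JDK 18, platform-dependent on earlier versions). A minimal JDK-only sketch of why an explicit charset is the safer choice follows; `ExplicitCharsetSketch` and `encode` are hypothetical names.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper class for illustration; not part of the Flume API.
class ExplicitCharsetSketch {

    /** Encodes text with an explicitly chosen charset. */
    static byte[] encode(String text, Charset charset) {
        return text.getBytes(charset);
    }

    public static void main(String[] args) {
        String text = "caf\u00e9"; // "café", written with an escape to avoid source-encoding issues
        // The same string yields different bytes under different charsets.
        System.out.println(encode(text, StandardCharsets.UTF_8).length);      // prints 5
        System.out.println(encode(text, StandardCharsets.ISO_8859_1).length); // prints 4
        // getBytes() with no argument uses this charset, which varies by JVM and platform.
        System.out.println(Charset.defaultCharset());
    }
}
```

Passing the charset explicitly, as the apache/flume snippets do with their `charset` field, keeps producers and consumers of the event body in agreement regardless of where the agent runs.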
Code example source: hadooparchitecturebook/hadoop-arch-book
event.setBody(Bytes.toBytes(action.getJSONObject().toString()));
eventActionList.add(event);
actionList.add(action);
Code example source: stackoverflow.com
event.setBody(newBody.getBytes());
return event;
Code example source: handuyishe/rocketmq-flume (excerpt; begins mid-statement)
// ...
mq, messageExt.getTopic(), messageExt.getTags(), messageExt.getBody()});
event.setBody(messageExt.getBody());
event.setHeaders(headers);
eventList.add(event);
Code example source: jcustenborder/flume-ng-rabbitmq
Map<String, String> properties = RabbitMQUtil.getHeaders(delivery.getProperties());
Event event = new SimpleEvent();
event.setBody(delivery.getBody());
event.setHeaders(properties);