org.apache.flume.Event.setBody()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(5.5k)|赞(0)|评价(0)|浏览(174)

本文整理了Java中org.apache.flume.Event.setBody()方法的一些代码示例,展示了Event.setBody()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Event.setBody()方法的具体详情如下:
包路径:org.apache.flume.Event
类名称:Event
方法名:setBody

Event.setBody介绍

[英]Sets the raw byte array of the data contained in this event.
[中]设置此事件中包含的数据的原始字节数组。

代码示例

代码示例来源:origin: apache/flume

@Override
public Event intercept(Event event) {
 String origBody = new String(event.getBody(), charset);
 Matcher matcher = searchPattern.matcher(origBody);
 String newBody = matcher.replaceAll(replaceString);
 event.setBody(newBody.getBytes(charset));
 return event;
}

代码示例来源:origin: apache/flume

/**
 * Instantiate an Event instance based on the provided body and headers.
 * If <code>headers</code> is <code>null</code>, then it is ignored.
 * @param body
 * @param headers
 * @return
 */
public static Event withBody(byte[] body, Map<String, String> headers) {
 Event event = new SimpleEvent();
 if (body == null) {
  body = new byte[0];
 }
 event.setBody(body);
 if (headers != null) {
  event.setHeaders(new HashMap<String, String>(headers));
 }
 return event;
}

代码示例来源:origin: apache/flume

"Read " + count + " of total " + length);
 event.setBody(body);
String text = textMessage.getText();
if (text != null) {
 event.setBody(text.getBytes(charset));
  out = new ObjectOutputStream(bos);
  out.writeObject(object);
  event.setBody(bos.toByteArray());
 } catch (IOException e) {
  throw new FlumeException("Error serializing object", e);

代码示例来源:origin: org.apache.flume/flume-ng-core

@Override
public Event intercept(Event event) {
 String origBody = new String(event.getBody(), charset);
 Matcher matcher = searchPattern.matcher(origBody);
 String newBody = matcher.replaceAll(replaceString);
 event.setBody(newBody.getBytes(charset));
 return event;
}

代码示例来源:origin: keedio/flume-ng-sql-source

@Override
public void write(char[] cbuf, int off, int len) throws IOException {
  Event event = new SimpleEvent();
  
  String s = new String(cbuf);
  event.setBody(s.substring(off, len-1).getBytes(Charset.forName(sqlSourceHelper.getDefaultCharsetResultSet())));
  
  Map<String, String> headers;
  headers = new HashMap<String, String>();
  headers.put("timestamp", String.valueOf(System.currentTimeMillis()));
  event.setHeaders(headers);
  
  events.add(event);
  
  if (events.size() >= sqlSourceHelper.getBatchSize())
    flush();
}

代码示例来源:origin: org.apache.flume/flume-ng-sdk

/**
 * Instantiate an Event instance based on the provided body and headers.
 * If <code>headers</code> is <code>null</code>, then it is ignored.
 * @param body
 * @param headers
 * @return
 */
public static Event withBody(byte[] body, Map<String, String> headers) {
 Event event = new SimpleEvent();
 if (body == null) {
  body = new byte[0];
 }
 event.setBody(body);
 if (headers != null) {
  event.setHeaders(new HashMap<String, String>(headers));
 }
 return event;
}

代码示例来源:origin: keedio/flume-ftp-source

/**
 * @param lastInfo byte[]
 * @void process last appended data to files
 */
private void processMessage(byte[] lastInfo, String fileName, String filePath) {
 byte[] message = lastInfo;
 Event event = new SimpleEvent();
 Map<String, String> headers = new HashMap<>();
 headers.put("fileName", fileName);
 headers.put("filePath", filePath);
 headers.put("timestamp", String.valueOf(System.currentTimeMillis()));
 event.setBody(message);
 event.setHeaders(headers);
 try {
  getChannelProcessor().processEvent(event);
 } catch (ChannelException e) {
  LOGGER.error("ChannelException", e);
 }
 sourceCounter.incrementCountSizeProc(message.length);
 sourceCounter.incrementEventCount();
}

代码示例来源:origin: jaibeermalik/searchanalytics-bigdata

public Event getJsonEvent(
    final SearchQueryInstruction searchQueryInstruction)
    throws JsonProcessingException {
  final String searchQueryInstructionAsString = getObjectMapper()
      .writeValueAsString(searchQueryInstruction);
  // String writeValueAsString =
  // mapper.writerWithDefaultPrettyPrinter().writeValueAsString(searchQueryInstruction);
  searchEventsLogger.info(searchQueryInstructionAsString);
  final Event event = new JSONEvent();
  event.setBody(searchQueryInstructionAsString.getBytes());
  final Map<String, String> headers = new HashMap<String, String>();
  headers.put("eventId", searchQueryInstruction.getEventIdSuffix());
  headers.put("timestamp", searchQueryInstruction
      .getCreatedTimeStampInMillis().toString());
  if (searchQueryInstruction.getClickedDocId() != null) {
    if (searchQueryInstruction.getFavourite() != null
        && searchQueryInstruction.getFavourite()) {
      headers.put("State", "FAVOURITE");
    } else {
      headers.put("State", "VIEWED");
    }
  }
  event.setHeaders(headers);
  return event;
}

代码示例来源:origin: hadooparchitecturebook/hadoop-arch-book

event.setBody(Bytes.toBytes(action.getJSONObject().toString()));
eventActionList.add(event);
actionList.add(action);

代码示例来源:origin: org.apache.flume.flume-ng-sources/flume-jms-source

"Read " + count + " of total " + length);
 event.setBody(body);
String text = textMessage.getText();
if (text != null) {
 event.setBody(text.getBytes(charset));
  out = new ObjectOutputStream(bos);
  out.writeObject(object);
  event.setBody(bos.toByteArray());
 } catch (IOException e) {
  throw new FlumeException("Error serializing object", e);

代码示例来源:origin: stackoverflow.com

event.setBody(newBody.getBytes())
return event;

代码示例来源:origin: handuyishe/rocketmq-flume

mq, messageExt.getTopic(), messageExt.getTags(), messageExt.getBody()});
event.setBody(messageExt.getBody());
event.setHeaders(headers);
eventList.add(event);

代码示例来源:origin: jcustenborder/flume-ng-rabbitmq

Map<String, String> properties = RabbitMQUtil.getHeaders(delivery.getProperties());
Event event = new SimpleEvent();
event.setBody(delivery.getBody());
event.setHeaders(properties);

相关文章