本文整理了 Java 中 org.apache.flume.Channel.put() 方法的一些代码示例,展示了 Channel.put() 的具体用法。这些代码示例主要来源于 GitHub/Stack Overflow/Maven 等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Channel.put() 方法的具体详情如下:
包路径:org.apache.flume.Channel
类名称:Channel
方法名:put
[英]Puts the given event into the channel.
Note: This method must be invoked within an active Transaction boundary. Failure to do so can lead to unpredictable results.
[中]将给定事件放入通道。
注意:此方法必须在活动事务边界内调用。不这样做可能导致不可预测的结果。
代码示例来源:origin: apache/flume
@Override
public void run() {
channel.put(event);
}
});
代码示例来源:origin: apache/flume
@Override
public void run() {
for (Event event : events) {
channel.put(event);
}
}
});
代码示例来源:origin: apache/flume
reqChannel.put(event);
optChannel.put(event);
代码示例来源:origin: apache/flume
tx.begin();
reqChannel.put(event);
tx.begin();
optChannel.put(event);
代码示例来源:origin: apache/flume
memoryChannel.put(event);
tx.commit();
tx.close();
代码示例来源:origin: apache/flume
memoryChannel.put(event);
代码示例来源:origin: apache/flume
tx.begin();
Event event = EventBuilder.withBody(msg.getBytes(), headers);
memoryChannel.put(event);
tx.commit();
tx.close();
代码示例来源:origin: apache/flume
/**
 * Puts one event carrying "topic" and "key" headers into a memory channel,
 * lets the Kafka sink process it, and checks that the message arrived on
 * {@code TestConstants.CUSTOM_TOPIC}.
 *
 * <p>The channel write follows the transaction pattern from the Flume
 * {@code Transaction} documentation: roll back on failure and always close,
 * so a failing {@code put}/{@code commit} does not leave the transaction open.
 */
@Test
public void testTopicAndKeyFromHeader() {
  Sink kafkaSink = new KafkaSink();
  Context context = prepareDefaultContext();
  Configurables.configure(kafkaSink, context);
  Channel memoryChannel = new MemoryChannel();
  Configurables.configure(memoryChannel, context);
  kafkaSink.setChannel(memoryChannel);
  kafkaSink.start();

  String msg = "test-topic-and-key-from-header";
  Map<String, String> headers = new HashMap<>();
  headers.put("topic", TestConstants.CUSTOM_TOPIC);
  headers.put("key", TestConstants.CUSTOM_KEY);
  // NOTE(review): getBytes() uses the platform default charset; the payload is ASCII here.
  Event event = EventBuilder.withBody(msg.getBytes(), headers);

  Transaction tx = memoryChannel.getTransaction();
  try {
    tx.begin();
    memoryChannel.put(event);
    tx.commit();
  } catch (RuntimeException e) {
    // A transaction must be rolled back before it can be closed after a failure.
    tx.rollback();
    throw e;
  } finally {
    tx.close();
  }

  try {
    Sink.Status status = kafkaSink.process();
    if (status == Sink.Status.BACKOFF) {
      fail("Error Occurred");
    }
  } catch (EventDeliveryException ignored) {
    // Deliberately tolerated, as in the original; the arrival check below is the assertion.
  }

  checkMessageArrived(msg, TestConstants.CUSTOM_TOPIC);
}
代码示例来源:origin: apache/flume
/**
 * Configures a fresh Kafka sink and memory channel from {@code context},
 * puts a single event whose body is {@code msg} into the channel, and runs
 * one {@code process()} cycle on the sink.
 *
 * <p>The channel write rolls back on failure and always closes the
 * transaction, so an exception from {@code put}/{@code commit} does not leave
 * the transaction open.
 *
 * @param context configuration applied to both the sink and the channel
 * @param msg     text used as the event body
 * @return the status returned by {@code kafkaSink.process()}
 * @throws EventDeliveryException if the sink's {@code process()} call throws it
 */
private Sink.Status prepareAndSend(Context context, String msg)
    throws EventDeliveryException {
  Sink kafkaSink = new KafkaSink();
  Configurables.configure(kafkaSink, context);
  Channel memoryChannel = new MemoryChannel();
  Configurables.configure(memoryChannel, context);
  kafkaSink.setChannel(memoryChannel);
  kafkaSink.start();

  // NOTE(review): getBytes() uses the platform default charset — confirm this
  // matches whatever decodes the message on the consuming side.
  Event event = EventBuilder.withBody(msg.getBytes());

  Transaction tx = memoryChannel.getTransaction();
  try {
    tx.begin();
    memoryChannel.put(event);
    tx.commit();
  } catch (RuntimeException e) {
    // A transaction must be rolled back before it can be closed after a failure.
    tx.rollback();
    throw e;
  } finally {
    tx.close();
  }

  return kafkaSink.process();
}
代码示例来源:origin: apache/flume
tx.begin();
Event event = EventBuilder.withBody(msg.getBytes(), headers);
memoryChannel.put(event);
tx.commit();
tx.close();
代码示例来源:origin: apache/flume
/**
 * Configures the sink with {@code TestConstants.HEADER_TOPIC} as its topic
 * setting, sends one event carrying the {@code HEADER_1_KEY} header, and
 * checks that the message arrived on the topic
 * {@code TestConstants.HEADER_1_VALUE + "-topic"}.
 *
 * <p>The channel write rolls back on failure and always closes the
 * transaction, so an exception from {@code put}/{@code commit} does not leave
 * the transaction open.
 */
@Test
public void testReplaceSubStringOfTopicWithHeaders() {
  String topic = TestConstants.HEADER_1_VALUE + "-topic";

  Sink kafkaSink = new KafkaSink();
  Context context = prepareDefaultContext();
  context.put(TOPIC_CONFIG, TestConstants.HEADER_TOPIC);
  Configurables.configure(kafkaSink, context);
  Channel memoryChannel = new MemoryChannel();
  Configurables.configure(memoryChannel, context);
  kafkaSink.setChannel(memoryChannel);
  kafkaSink.start();

  String msg = "test-replace-substring-of-topic-with-headers";
  Map<String, String> headers = new HashMap<>();
  headers.put(TestConstants.HEADER_1_KEY, TestConstants.HEADER_1_VALUE);
  // NOTE(review): getBytes() uses the platform default charset; the payload is ASCII here.
  Event event = EventBuilder.withBody(msg.getBytes(), headers);

  Transaction tx = memoryChannel.getTransaction();
  try {
    tx.begin();
    memoryChannel.put(event);
    tx.commit();
  } catch (RuntimeException e) {
    // A transaction must be rolled back before it can be closed after a failure.
    tx.rollback();
    throw e;
  } finally {
    tx.close();
  }

  try {
    Sink.Status status = kafkaSink.process();
    if (status == Sink.Status.BACKOFF) {
      fail("Error Occurred");
    }
  } catch (EventDeliveryException ignored) {
    // Deliberately tolerated, as in the original; the arrival check below is the assertion.
  }

  checkMessageArrived(msg, topic);
}
代码示例来源:origin: apache/flume
tx.begin();
Event event = EventBuilder.withBody(msg.getBytes(), headers);
memoryChannel.put(event);
tx.commit();
tx.close();
代码示例来源:origin: apache/flume
/**
 * Sends one header-less event through the Kafka sink using the default
 * context and checks that the message arrived on {@code DEFAULT_TOPIC}.
 *
 * <p>The channel write rolls back on failure and always closes the
 * transaction, so an exception from {@code put}/{@code commit} does not leave
 * the transaction open.
 */
@Test
public void testDefaultTopic() {
  Sink kafkaSink = new KafkaSink();
  Context context = prepareDefaultContext();
  Configurables.configure(kafkaSink, context);
  Channel memoryChannel = new MemoryChannel();
  Configurables.configure(memoryChannel, context);
  kafkaSink.setChannel(memoryChannel);
  kafkaSink.start();

  String msg = "default-topic-test";
  // NOTE(review): getBytes() uses the platform default charset; the payload is ASCII here.
  Event event = EventBuilder.withBody(msg.getBytes());

  Transaction tx = memoryChannel.getTransaction();
  try {
    tx.begin();
    memoryChannel.put(event);
    tx.commit();
  } catch (RuntimeException e) {
    // A transaction must be rolled back before it can be closed after a failure.
    tx.rollback();
    throw e;
  } finally {
    tx.close();
  }

  try {
    Sink.Status status = kafkaSink.process();
    if (status == Sink.Status.BACKOFF) {
      fail("Error Occurred");
    }
  } catch (EventDeliveryException ignored) {
    // Deliberately tolerated, as in the original; the arrival check below is the assertion.
  }

  checkMessageArrived(msg, DEFAULT_TOPIC);
}
代码示例来源:origin: forcedotcom/phoenix
channel.put(event);
transaction.commit();
transaction.close();
代码示例来源:origin: forcedotcom/phoenix
channel.put(event);
transaction.commit();
transaction.close();
代码示例来源:origin: forcedotcom/phoenix
channel.put(event);
transaction.commit();
transaction.close();
代码示例来源:origin: apache/phoenix
channel.put(event);
transaction.commit();
transaction.close();
代码示例来源:origin: apache/phoenix
channel.put(event);
transaction.commit();
transaction.close();
代码示例来源:origin: apache/phoenix
/**
 * Sends one JSON event whose body contains only {@code col1}, {@code col3}
 * and {@code col4} through a {@code PhoenixSink} targeting
 * {@code FLUME_JSON_TEST}, and asserts that no rows end up in the table.
 * Presumably the event is rejected because a column is absent — the test
 * name suggests so; confirm against the sink's serializer.
 *
 * <p>The channel write rolls back on failure and always closes the
 * transaction, so an exception from {@code put}/{@code commit} does not leave
 * the transaction open.
 *
 * @throws EventDeliveryException if {@code sink.process()} throws it
 * @throws SQLException declared by the test; presumably raised by the
 *     table helpers — confirm against their signatures
 */
@Test
public void testMissingColumnsInEvent() throws EventDeliveryException, SQLException {
  final String fullTableName = "FLUME_JSON_TEST";
  initSinkContextWithDefaults(fullTableName);

  sink = new PhoenixSink();
  Configurables.configure(sink, sinkContext);
  assertEquals(LifecycleState.IDLE, sink.getLifecycleState());

  final Channel channel = this.initChannel();
  sink.setChannel(channel);
  sink.start();

  final String eventBody = "{\"col1\" : \"kalyan\", \"col3\" : [\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}";
  final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));

  // Put the event in the channel inside a transaction.
  Transaction transaction = channel.getTransaction();
  try {
    transaction.begin();
    channel.put(event);
    transaction.commit();
  } catch (RuntimeException e) {
    // A transaction must be rolled back before it can be closed after a failure.
    transaction.rollback();
    throw e;
  } finally {
    transaction.close();
  }

  sink.process();

  int rowsInDb = countRows(fullTableName);
  assertEquals(0, rowsInDb);

  sink.stop();
  assertEquals(LifecycleState.STOP, sink.getLifecycleState());

  dropTable(fullTableName);
}
代码示例来源:origin: apache/phoenix
/**
 * Sends one CSV event with three fields through a {@code PhoenixSink}
 * targeting {@code FLUME_CSV_TEST}, and asserts that no rows end up in the
 * table. Presumably the event is rejected because a column is absent — the
 * test name suggests so; confirm against the sink's serializer.
 *
 * <p>The channel write rolls back on failure and always closes the
 * transaction, so an exception from {@code put}/{@code commit} does not leave
 * the transaction open.
 *
 * @throws EventDeliveryException if {@code sink.process()} throws it
 * @throws SQLException declared by the test; presumably raised by the
 *     table helpers — confirm against their signatures
 */
@Test
public void testMissingColumnsInEvent() throws EventDeliveryException, SQLException {
  final String fullTableName = "FLUME_CSV_TEST";
  initSinkContextWithDefaults(fullTableName);

  sink = new PhoenixSink();
  Configurables.configure(sink, sinkContext);
  assertEquals(LifecycleState.IDLE, sink.getLifecycleState());

  final Channel channel = this.initChannel();
  sink.setChannel(channel);
  sink.start();

  final String eventBody = "kalyan,\"abc,pqr,xyz\",\"1,2,3,4\"";
  final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));

  // Put the event in the channel inside a transaction.
  Transaction transaction = channel.getTransaction();
  try {
    transaction.begin();
    channel.put(event);
    transaction.commit();
  } catch (RuntimeException e) {
    // A transaction must be rolled back before it can be closed after a failure.
    transaction.rollback();
    throw e;
  } finally {
    transaction.close();
  }

  sink.process();

  int rowsInDb = countRows(fullTableName);
  assertEquals(0, rowsInDb);

  sink.stop();
  assertEquals(LifecycleState.STOP, sink.getLifecycleState());

  dropTable(fullTableName);
}
内容来源于网络,如有侵权,请联系作者删除!