本文整理了Java中org.apache.flume.Channel.getTransaction()方法的一些代码示例,展示了Channel.getTransaction()的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Channel.getTransaction()方法的具体详情如下:
包路径:org.apache.flume.Channel
类名称:Channel
方法名:getTransaction
暂无
代码示例来源:origin: apache/flume
/**
 * Makes sure the sink is inside a transaction boundary. When no transaction
 * is currently held, one is obtained from the channel, begun, and a fresh
 * failure policy is created. When a transaction is already held, this method
 * returns without doing anything.
 *
 * @param channel The Sink's channel
 * @throws EventDeliveryException There was an error starting a new batch
 *     with the failure policy.
 */
private void enterTransaction(Channel channel) throws EventDeliveryException {
  // The transaction field is deliberately not synchronized: the Sink API
  // states "the Sink#process() call is guaranteed to only be accessed by a
  // single thread". Other methods could technically be invoked concurrently,
  // but SinkRunner waits for the thread running process() to finish before
  // it calls stop().
  if (transaction != null) {
    // Already inside a transaction boundary.
    return;
  }

  transaction = channel.getTransaction();
  transaction.begin();
  failurePolicy = FAILURE_POLICY_FACTORY.newPolicy(context);
}
代码示例来源:origin: apache/ignite
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
代码示例来源:origin: apache/flume
/**
 * Takes at most one event from the channel inside a transaction and, when
 * INFO logging is enabled, logs a dump of it.
 *
 * @return {@code Status.READY} when an event was taken, or
 *     {@code Status.BACKOFF} when the channel returned no event
 * @throws EventDeliveryException if anything fails while the transaction is
 *     open; the transaction is rolled back before the exception is thrown
 */
@Override
public Status process() throws EventDeliveryException {
  Status outcome = Status.READY;
  Channel source = getChannel();
  Transaction txn = source.getTransaction();
  Event taken = null;

  try {
    txn.begin();
    taken = source.take();

    if (taken == null) {
      // Nothing available: ask the sink runner to back off.
      outcome = Status.BACKOFF;
    } else if (logger.isInfoEnabled()) {
      String dump = EventHelper.dumpEvent(taken, maxBytesToLog);
      logger.info("Event: " + dump);
    }

    txn.commit();
  } catch (Exception cause) {
    txn.rollback();
    throw new EventDeliveryException("Failed to log event: " + taken, cause);
  } finally {
    // Always release the transaction, whether it committed or rolled back.
    txn.close();
  }

  return outcome;
}
}
代码示例来源:origin: apache/flume
Status status = Status.READY;
Channel channel = getChannel();
Transaction txn = channel.getTransaction();
List<Row> actions = new LinkedList<Row>();
List<Increment> incs = new LinkedList<Increment>();
代码示例来源:origin: apache/flume
Status status = Status.READY;
Channel channel = getChannel();
Transaction txn = channel.getTransaction();
List<Row> actions = new LinkedList<>();
List<Increment> incs = new LinkedList<>();
代码示例来源:origin: apache/flume
Transaction transaction = channel.getTransaction();
Event event = null;
long eventCounter = counterGroup.get("events.success");
代码示例来源:origin: apache/flume
Transaction tx = reqChannel.getTransaction();
Preconditions.checkNotNull(tx, "Transaction object must not be null");
try {
Transaction tx = null;
try {
tx = optChannel.getTransaction();
tx.begin();
代码示例来源:origin: apache/flume
Transaction transaction = channel.getTransaction();
transaction.begin();
boolean success = false;
代码示例来源:origin: apache/flume
Status status = Status.READY;
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
代码示例来源:origin: apache/flume
Status status = Status.READY;
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
代码示例来源:origin: apache/flume
Status status = Status.READY;
Channel channel = getChannel();
Transaction txn = channel.getTransaction();
try {
txn.begin();
代码示例来源:origin: apache/flume
createTopic(topic, 5);
Transaction tx = memoryChannel.getTransaction();
tx.begin();
代码示例来源:origin: apache/flume
partitionMap.put(i, new ArrayList<Event>());
Transaction tx = memoryChannel.getTransaction();
tx.begin();
代码示例来源:origin: apache/flume
headers.put("key", TestConstants.CUSTOM_KEY);
headers.put(TestConstants.HEADER_1_KEY, TestConstants.HEADER_1_VALUE);
Transaction tx = memoryChannel.getTransaction();
tx.begin();
Event event = EventBuilder.withBody(msg.getBytes(), headers);
代码示例来源:origin: apache/flume
/**
 * Puts one event carrying "topic" and "key" headers onto a memory channel,
 * runs the Kafka sink once, and checks that the message arrived on
 * {@code TestConstants.CUSTOM_TOPIC}.
 */
@Test
public void testTopicAndKeyFromHeader() {
  // Wire a Kafka sink to a memory channel using the default test context.
  Sink sink = new KafkaSink();
  Context ctx = prepareDefaultContext();
  Configurables.configure(sink, ctx);
  Channel channel = new MemoryChannel();
  Configurables.configure(channel, ctx);
  sink.setChannel(channel);
  sink.start();

  String msg = "test-topic-and-key-from-header";
  Map<String, String> eventHeaders = new HashMap<>();
  eventHeaders.put("topic", TestConstants.CUSTOM_TOPIC);
  eventHeaders.put("key", TestConstants.CUSTOM_KEY);

  // Publish the event to the channel in its own transaction.
  Transaction txn = channel.getTransaction();
  txn.begin();
  channel.put(EventBuilder.withBody(msg.getBytes(), eventHeaders));
  txn.commit();
  txn.close();

  try {
    if (sink.process() == Sink.Status.BACKOFF) {
      fail("Error Occurred");
    }
  } catch (EventDeliveryException ignored) {
    // Intentionally ignored; the arrival check below decides the outcome.
  }

  checkMessageArrived(msg, TestConstants.CUSTOM_TOPIC);
}
代码示例来源:origin: apache/flume
/**
 * Configures a Kafka sink and a memory channel from the given context, puts
 * a single event with {@code msg} as its body onto the channel, and runs the
 * sink once.
 *
 * @param context configuration applied to both the sink and the channel
 * @param msg text whose bytes become the event body
 * @return the status reported by the sink's {@code process()} call
 * @throws EventDeliveryException if the sink's {@code process()} call throws it
 */
private Sink.Status prepareAndSend(Context context, String msg)
    throws EventDeliveryException {
  Sink sink = new KafkaSink();
  Configurables.configure(sink, context);
  Channel channel = new MemoryChannel();
  Configurables.configure(channel, context);
  sink.setChannel(channel);
  sink.start();

  // Publish the event to the channel in its own transaction.
  Transaction txn = channel.getTransaction();
  txn.begin();
  channel.put(EventBuilder.withBody(msg.getBytes()));
  txn.commit();
  txn.close();

  Sink.Status outcome = sink.process();
  return outcome;
}
代码示例来源:origin: apache/flume
headers.put(KafkaSinkConstants.DEFAULT_TOPIC_OVERRIDE_HEADER, TestConstants.CUSTOM_TOPIC);
headers.put("foo", TestConstants.CUSTOM_TOPIC);
Transaction tx = memoryChannel.getTransaction();
tx.begin();
Event event = EventBuilder.withBody(msg.getBytes(), headers);
代码示例来源:origin: apache/flume
/**
 * Configures the sink with {@code TestConstants.HEADER_TOPIC} as its topic
 * setting, sends one event carrying the {@code HEADER_1_KEY} header, and
 * checks that the message arrived on the topic named
 * {@code HEADER_1_VALUE + "-topic"}.
 */
@Test
public void testReplaceSubStringOfTopicWithHeaders() {
  String expectedTopic = TestConstants.HEADER_1_VALUE + "-topic";

  // Wire a Kafka sink to a memory channel, overriding the topic setting.
  Sink sink = new KafkaSink();
  Context ctx = prepareDefaultContext();
  ctx.put(TOPIC_CONFIG, TestConstants.HEADER_TOPIC);
  Configurables.configure(sink, ctx);
  Channel channel = new MemoryChannel();
  Configurables.configure(channel, ctx);
  sink.setChannel(channel);
  sink.start();

  String msg = "test-replace-substring-of-topic-with-headers";
  Map<String, String> eventHeaders = new HashMap<>();
  eventHeaders.put(TestConstants.HEADER_1_KEY, TestConstants.HEADER_1_VALUE);

  // Publish the event to the channel in its own transaction.
  Transaction txn = channel.getTransaction();
  txn.begin();
  channel.put(EventBuilder.withBody(msg.getBytes(), eventHeaders));
  txn.commit();
  txn.close();

  try {
    if (sink.process() == Sink.Status.BACKOFF) {
      fail("Error Occurred");
    }
  } catch (EventDeliveryException ignored) {
    // Intentionally ignored; the arrival check below decides the outcome.
  }

  checkMessageArrived(msg, expectedTopic);
}
代码示例来源:origin: apache/flume
Map<String, String> headers = new HashMap<String, String>();
headers.put(customTopicHeader, TestConstants.CUSTOM_TOPIC);
Transaction tx = memoryChannel.getTransaction();
tx.begin();
Event event = EventBuilder.withBody(msg.getBytes(), headers);
代码示例来源:origin: apache/flume
/**
 * Sends one header-less event through a Kafka sink configured with the
 * default test context and checks that the message arrived on
 * {@code DEFAULT_TOPIC}.
 */
@Test
public void testDefaultTopic() {
  // Wire a Kafka sink to a memory channel using the default test context.
  Sink sink = new KafkaSink();
  Context ctx = prepareDefaultContext();
  Configurables.configure(sink, ctx);
  Channel channel = new MemoryChannel();
  Configurables.configure(channel, ctx);
  sink.setChannel(channel);
  sink.start();

  String msg = "default-topic-test";

  // Publish the event to the channel in its own transaction.
  Transaction txn = channel.getTransaction();
  txn.begin();
  channel.put(EventBuilder.withBody(msg.getBytes()));
  txn.commit();
  txn.close();

  try {
    if (sink.process() == Sink.Status.BACKOFF) {
      fail("Error Occurred");
    }
  } catch (EventDeliveryException ignored) {
    // Intentionally ignored; the arrival check below decides the outcome.
  }

  checkMessageArrived(msg, DEFAULT_TOPIC);
}
内容来源于网络,如有侵权,请联系作者删除!