org.apache.flume.Channel.getTransaction()方法的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(7.3k)|赞(0)|评价(0)|浏览(223)

本文整理了Java中org.apache.flume.Channel.getTransaction()方法的一些代码示例,展示了Channel.getTransaction()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Channel.getTransaction()方法的具体详情如下:
包路径:org.apache.flume.Channel
类名称:Channel
方法名:getTransaction

Channel.getTransaction介绍

暂无

代码示例

代码示例来源:origin: apache/flume

/**
 * Enter the transaction boundary. This will either begin a new transaction
 * if one didn't already exist. If we're already in a transaction boundary,
 * then this method does nothing.
 *
 * @param channel The Sink's channel
 * @throws EventDeliveryException There was an error starting a new batch
 *                                with the failure policy.
 */
private void enterTransaction(Channel channel) throws EventDeliveryException {
 // There's no synchronization around the transaction instance because the
 // Sink API states "the Sink#process() call is guaranteed to only
 // be accessed  by a single thread". Technically other methods could be
 // called concurrently, but the implementation of SinkRunner waits
 // for the Thread running process() to end before calling stop()
 if (transaction == null) {
  this.transaction = channel.getTransaction();
  transaction.begin();
  failurePolicy = FAILURE_POLICY_FACTORY.newPolicy(context);
 }
}

代码示例来源:origin: apache/ignite

Channel channel = getChannel();
Transaction transaction = channel.getTransaction();

代码示例来源:origin: apache/flume

@Override
 public Status process() throws EventDeliveryException {
  Status result = Status.READY;
  Channel channel = getChannel();
  Transaction transaction = channel.getTransaction();
  Event event = null;

  try {
   transaction.begin();
   event = channel.take();

   if (event != null) {
    if (logger.isInfoEnabled()) {
     logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog));
    }
   } else {
    // No event found, request back-off semantics from the sink runner
    result = Status.BACKOFF;
   }
   transaction.commit();
  } catch (Exception ex) {
   transaction.rollback();
   throw new EventDeliveryException("Failed to log event: " + event, ex);
  } finally {
   transaction.close();
  }

  return result;
 }
}

代码示例来源:origin: apache/flume

Status status = Status.READY;
Channel channel = getChannel();
Transaction txn = channel.getTransaction();
List<Row> actions = new LinkedList<Row>();
List<Increment> incs = new LinkedList<Increment>();

代码示例来源:origin: apache/flume

Status status = Status.READY;
Channel channel = getChannel();
Transaction txn = channel.getTransaction();
List<Row> actions = new LinkedList<>();
List<Increment> incs = new LinkedList<>();

代码示例来源:origin: apache/flume

Transaction transaction = channel.getTransaction();
Event event = null;
long eventCounter = counterGroup.get("events.success");

代码示例来源:origin: apache/flume

Transaction tx = reqChannel.getTransaction();
Preconditions.checkNotNull(tx, "Transaction object must not be null");
try {
Transaction tx = null;
try {
 tx = optChannel.getTransaction();
 tx.begin();

代码示例来源:origin: apache/flume

Transaction transaction = channel.getTransaction();
transaction.begin();
boolean success = false;

代码示例来源:origin: apache/flume

Status status = Status.READY;
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();

代码示例来源:origin: apache/flume

Status status = Status.READY;
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();

代码示例来源:origin: apache/flume

Status status = Status.READY;
Channel channel = getChannel();
Transaction txn = channel.getTransaction();
try {
 txn.begin();

代码示例来源:origin: apache/flume

createTopic(topic, 5);
Transaction tx = memoryChannel.getTransaction();
tx.begin();

代码示例来源:origin: apache/flume

partitionMap.put(i, new ArrayList<Event>());
Transaction tx = memoryChannel.getTransaction();
tx.begin();

代码示例来源:origin: apache/flume

headers.put("key", TestConstants.CUSTOM_KEY);
headers.put(TestConstants.HEADER_1_KEY, TestConstants.HEADER_1_VALUE);
Transaction tx = memoryChannel.getTransaction();
tx.begin();
Event event = EventBuilder.withBody(msg.getBytes(), headers);

代码示例来源:origin: apache/flume

@Test
public void testTopicAndKeyFromHeader() {
 Sink kafkaSink = new KafkaSink();
 Context context = prepareDefaultContext();
 Configurables.configure(kafkaSink, context);
 Channel memoryChannel = new MemoryChannel();
 Configurables.configure(memoryChannel, context);
 kafkaSink.setChannel(memoryChannel);
 kafkaSink.start();
 String msg = "test-topic-and-key-from-header";
 Map<String, String> headers = new HashMap<String, String>();
 headers.put("topic", TestConstants.CUSTOM_TOPIC);
 headers.put("key", TestConstants.CUSTOM_KEY);
 Transaction tx = memoryChannel.getTransaction();
 tx.begin();
 Event event = EventBuilder.withBody(msg.getBytes(), headers);
 memoryChannel.put(event);
 tx.commit();
 tx.close();
 try {
  Sink.Status status = kafkaSink.process();
  if (status == Sink.Status.BACKOFF) {
   fail("Error Occurred");
  }
 } catch (EventDeliveryException ex) {
  // ignore
 }
 checkMessageArrived(msg, TestConstants.CUSTOM_TOPIC);
}

代码示例来源:origin: apache/flume

private Sink.Status prepareAndSend(Context context, String msg)
  throws EventDeliveryException {
 Sink kafkaSink = new KafkaSink();
 Configurables.configure(kafkaSink, context);
 Channel memoryChannel = new MemoryChannel();
 Configurables.configure(memoryChannel, context);
 kafkaSink.setChannel(memoryChannel);
 kafkaSink.start();
 Transaction tx = memoryChannel.getTransaction();
 tx.begin();
 Event event = EventBuilder.withBody(msg.getBytes());
 memoryChannel.put(event);
 tx.commit();
 tx.close();
 return kafkaSink.process();
}

代码示例来源:origin: apache/flume

headers.put(KafkaSinkConstants.DEFAULT_TOPIC_OVERRIDE_HEADER, TestConstants.CUSTOM_TOPIC);
headers.put("foo", TestConstants.CUSTOM_TOPIC);
Transaction tx = memoryChannel.getTransaction();
tx.begin();
Event event = EventBuilder.withBody(msg.getBytes(), headers);

代码示例来源:origin: apache/flume

@Test
public void testReplaceSubStringOfTopicWithHeaders() {
 String topic = TestConstants.HEADER_1_VALUE + "-topic";
 Sink kafkaSink = new KafkaSink();
 Context context = prepareDefaultContext();
 context.put(TOPIC_CONFIG, TestConstants.HEADER_TOPIC);
 Configurables.configure(kafkaSink, context);
 Channel memoryChannel = new MemoryChannel();
 Configurables.configure(memoryChannel, context);
 kafkaSink.setChannel(memoryChannel);
 kafkaSink.start();
 String msg = "test-replace-substring-of-topic-with-headers";
 Map<String, String> headers = new HashMap<>();
 headers.put(TestConstants.HEADER_1_KEY, TestConstants.HEADER_1_VALUE);
 Transaction tx = memoryChannel.getTransaction();
 tx.begin();
 Event event = EventBuilder.withBody(msg.getBytes(), headers);
 memoryChannel.put(event);
 tx.commit();
 tx.close();
 try {
  Sink.Status status = kafkaSink.process();
  if (status == Sink.Status.BACKOFF) {
   fail("Error Occurred");
  }
 } catch (EventDeliveryException ex) {
  // ignore
 }
 checkMessageArrived(msg, topic);
}

代码示例来源:origin: apache/flume

Map<String, String> headers = new HashMap<String, String>();
headers.put(customTopicHeader, TestConstants.CUSTOM_TOPIC);
Transaction tx = memoryChannel.getTransaction();
tx.begin();
Event event = EventBuilder.withBody(msg.getBytes(), headers);

代码示例来源:origin: apache/flume

@Test
public void testDefaultTopic() {
 Sink kafkaSink = new KafkaSink();
 Context context = prepareDefaultContext();
 Configurables.configure(kafkaSink, context);
 Channel memoryChannel = new MemoryChannel();
 Configurables.configure(memoryChannel, context);
 kafkaSink.setChannel(memoryChannel);
 kafkaSink.start();
 String msg = "default-topic-test";
 Transaction tx = memoryChannel.getTransaction();
 tx.begin();
 Event event = EventBuilder.withBody(msg.getBytes());
 memoryChannel.put(event);
 tx.commit();
 tx.close();
 try {
  Sink.Status status = kafkaSink.process();
  if (status == Sink.Status.BACKOFF) {
   fail("Error Occurred");
  }
 } catch (EventDeliveryException ex) {
  // ignore
 }
 checkMessageArrived(msg, DEFAULT_TOPIC);
}

相关文章