flume:数据传输到服务器

eagi6jfj  于 2021-06-03  发布在  Hadoop
关注(0)|答案(1)|浏览(444)

我是个新手。我要写一个程序,它可以将文本文件传输到其他程序(代理)。我知道我们必须知道代理,即主机ip,端口号等,然后一个源,接收器和通道应该被定义。我只想把一个日志文件传送到服务器。我的客户代码如下。公共类myrpcclientfacade{

public class MyClient{

  private RpcClient client;
  private String hostname;
  private int port;

  public void init(String hostname, int port) {
        this.hostname = hostname;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);

      }

      public void sendDataToFlume(String data) {
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
        try {
          client.append(event);
        } catch (EventDeliveryException e) {
          client.close();
          client = null;
          client = RpcClientFactory.getDefaultInstance(hostname, port);
        }
      }

      public void cleanUp() {
        client.close();
      }
}

以上代码只能发送 String 数据到指定进程。但我得发文件。另外请告诉我 Source,Channel and Sink 必须写入服务器吗?如果是,如何配置和编写这三个。请帮帮我。给一个小样品 Source,Sink And Channel

balp4ylt

balp4ylt1#

实际上,您只需要在每个节点上获得flume客户端。然后提供一个配置文件,提供有关其行为的信息。例如,如果您的节点读取一个文件(读取每个新行并将它们作为事件发送到通道),并通过rpc套接字发送文件内容。您的配置如下所示:


# sources/sinks/channels list

  <Agent>.sources = <Name Source1>
  <Agent>.sinks = <Name Sink1>
  <Agent>.channels = <Name Channel1> 
  # Channel attribution to a source
  <Agent>.sources.<Name Source1>.channels = <Name Channel1>
  # Channel attribution to sink
  <Agent>.sinks.<Name Sink1>.channels = <Name Channel1>
  # Configuration (sources,channels and sinks)
  # Source properties : <Name Source1>
  <Agent>.sources.<Name Source1>.type = exec
  <Agent>.sources.<Name Source1>.command = tail -F test
  <Agent>.sources.<Name Source1>.channels = <Name Channel1>
  # Channel properties : <Name Channel1>
  <Agent>.channels.<Name Channel1>.type = memory
  <Agent>.channels.<Name Channel1>.capacity = 1000
  <Agent>.channels.<Name Channel1>.transactionCapacity = 1000
  # Sink properties : <Name Sink1>
  <Agent>.sinks.<Nom Sink1>.type = avro
  <Agent>.sinks.<Nom Sink1>.channel = <Nom Channel1>
  <Agent>.sinks.<Nom Sink1>.hostname = <HOST NAME or IP>
  <Agent>.sinks.<Nom Sink1>.port = <PORT NUMBER>

然后您必须设置一个代理,它将读取同一端口上的avro源代码,并以您希望的存储方式处理事件。希望有帮助;)

相关问题