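I'm really new to Flume. I prefer Flume over Sqoop because, in my case, data is continuously being written to MS SQL Server, so I think Flume is the better choice since it can transfer data in real time.
I just followed some online examples and then edited my own Flume configuration file, which defines the source, channel, and sink. However, Flume doesn't seem to work: no data is transferred to HBase.
mssql-hbase.conf file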
# source, channel, sink
agent1.sources = src1
agent1.channels = ch1
agent1.sinks = sk1
# declare source type
agent1.sources.src1.type = org.keedio.flume.source.SQLSource
agent1.sources.src1.hibernate.connection.url = jdbc:sqlserver://xx.xx.xx.xx:1433;DatabaseName=xxxx
agent1.sources.src1.hibernate.connection.user = xxxx
agent1.sources.src1.hibernate.connection.password = xxxx
agent1.sources.src1.table = xxxx
agent1.sources.src1.hibernate.connection.autocommit = true
# declare SQL Server hibernate dialect
agent1.sources.src1.hibernate.dialect = org.hibernate.dialect.SQLServerDialect
agent1.sources.src1.hibernate.connection.driver_class = com.microsoft.sqlserver.jdbc.SQLServerDriver
# agent1.sources.src1.hibernate.provider_class=org.hibernate.connection.C3P0ConnectionProvider
# agent1.sources.src1.columns.to.select = *
# agent1.sources.src1.incremental.column.name = PK, name, machine, time
# agent1.sources.src1.start.from=0
# agent1.sources.src1.incremental.value = 0
# query time interval
agent1.sources.src1.run.query.delay = 5000
# declare the folder location where flume state is saved
agent1.sources.src1.status.file.path = /home/user/flume-source-state
agent1.sources.src1.status.file.name = src1.status
agent1.sources.src1.batch.size = 1000
agent1.sources.src1.max.rows = 1000
agent1.sources.src1.delimiter.entry = |
# set the channel to memory mode
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 10000
agent1.channels.ch1.transactionCapacity = 10000
agent1.channels.ch1.byteCapacityBufferPercentage = 20
agent1.channels.ch1.byteCapacity = 800000
# declare sink type
agent1.sinks.sk1.type = org.apache.flume.sink.hbase.HBaseSink
agent1.sinks.sk1.table = yyyy
agent1.sinks.sk1.columnFamily = yyyy
agent1.sinks.sk1.hdfs.batchSize = 100
agent1.sinks.sk1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
agent1.sinks.sk1.serializer.regex = ^\"(.*?)\",\"(.*?)\",\"(.*?)\"$
agent1.sinks.sk1.serializer.colNames = PK, name, machine, time
# bind source, channel, sink
agent1.sources.src1.channels = ch1
agent1.sinks.sk1.channel = ch1
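One thing that may be worth checking in this file is how `delimiter.entry = |` on the source interacts with the comma-based `serializer.regex` on the sink. The following is only a minimal sketch, assuming the source emits each row as double-quoted fields joined by the configured delimiter (an assumption about the SQLSource output format, not something confirmed here). The sample row and the helper names `format_row` and `sink_matches` are made up for illustration, and Python's `re` module stands in for the Java regex engine that Flume uses.

```python
import re

# Effective sink pattern from mssql-hbase.conf (\" in the file is a plain quote).
SINK_REGEX = r'^"(.*?)","(.*?)","(.*?)"$'


def format_row(fields, delimiter):
    """Assumed event body: double-quoted fields joined by the source delimiter."""
    return delimiter.join('"{}"'.format(f) for f in fields)


def sink_matches(body):
    """Return True if the event body matches the sink regex."""
    return re.match(SINK_REGEX, body) is not None


row = ["1", "pump", "m-01"]           # hypothetical three-column row
pipe_body = format_row(row, "|")      # what delimiter.entry = | would produce
comma_body = format_row(row, ",")     # what a comma delimiter would produce

print(pipe_body, sink_matches(pipe_body))
print(comma_body, sink_matches(comma_body))
```

If the assumption about the output format holds, a pipe-delimited body never matches the comma-based pattern, so either the delimiter or the regex would need to change.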
However, I used a similar configuration file to transfer data from MySQL to HBase, and that one worked.
mysql-hbase.conf file
# source, channel, sink
agent1.sources = src1
agent1.channels = ch1
agent1.sinks = sk1
# declare source type
agent1.sources.src1.type = org.keedio.flume.source.SQLSource
agent1.sources.src1.hibernate.connection.url = jdbc:mysql://xxxx:3306/userdb
agent1.sources.src1.hibernate.connection.user = xxxx
agent1.sources.src1.hibernate.connection.password = xxxx
agent1.sources.src1.table = xxxx
agent1.sources.src1.hibernate.connection.autocommit = true
# declare mysql hibernate dialect
agent1.sources.src1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
agent1.sources.src1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
# agent1.sources.src1.hibernate.provider_class=org.hibernate.connection.C3P0ConnectionProvider
# agent1.sources.src1.columns.to.select = *
# agent1.sources.src1.incremental.column.name = id
# agent1.sources.src1.incremental.value = 0
# query time interval
agent1.sources.src1.run.query.delay = 5000
# declare the folder location where flume state is saved
agent1.sources.src1.status.file.path = /home/user/flume-source-state
agent1.sources.src1.status.file.name = src1.status
# agent1.sources.src1.interceptors=i1
# agent1.sources.src1.interceptors.i1.type=search_replace
# agent1.sources.src1.interceptors.i1.searchPattern="
# agent1.sources.src1.interceptors.i1.replaceString=,
# Set the channel to memory mode
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 10000
agent1.channels.ch1.transactionCapacity = 10000
agent1.channels.ch1.byteCapacityBufferPercentage = 20
agent1.channels.ch1.byteCapacity = 800000
# declare sink type
agent1.sinks.sk1.type = org.apache.flume.sink.hbase.HBaseSink
agent1.sinks.sk1.table = user_test_2
agent1.sinks.sk1.columnFamily = user_hobby
agent1.sinks.sk1.hdfs.batchSize = 100
agent1.sinks.sk1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
agent1.sinks.sk1.serializer.regex = ^\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\"$
agent1.sinks.sk1.serializer.colNames = id,name,age,hobby
# bind source, channel, sink
agent1.sources.src1.channels = ch1
agent1.sinks.sk1.channel = ch1
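One difference between the two files that can be checked mechanically is whether the number of capture groups in `serializer.regex` equals the number of names in `serializer.colNames`. Below is a minimal sketch of that check. The helper `check_serializer` is hypothetical and not part of Flume, and Python's `re` is used as a stand-in for the Java regex engine. Whether a mismatch causes the sink to skip events should be verified against the Flume version in use.

```python
import re


def check_serializer(regex, col_names):
    """Return (number of capture groups, number of column names)."""
    groups = re.compile(regex).groups
    names = [n.strip() for n in col_names.split(",") if n.strip()]
    return groups, len(names)


# Values copied from mssql-hbase.conf
mssql = check_serializer(
    r'^\"(.*?)\",\"(.*?)\",\"(.*?)\"$',
    "PK, name, machine, time",
)

# Values copied from mysql-hbase.conf
mysql = check_serializer(
    r'^\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\"$',
    "id,name,age,hobby",
)

print("mssql:", mssql)  # mssql: (3, 4)
print("mysql:", mysql)  # mysql: (4, 4)
```

The MySQL file has four groups for four columns, while the SQL Server file has three groups for four columns, which makes the regex a reasonable first place to look.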
Does anyone know what is wrong with the configuration file? Thanks.