使用flume将数据从ms sql server导入hbase

ctehm74n  于 2021-06-04  发布在  Flume
关注(0)|答案(0)|浏览(352)

我对flume真的是个新手。我更喜欢flume而不是sqoop,因为在我的例子中,数据继续导入到mssqlserver,因此我认为flume是一个更好的选择,它能够实时传输数据。
我只是遵循了一些在线示例,然后编辑了我自己的flume配置文件,它告诉了一些关于源、通道和接收器的信息。然而,Flume似乎没有成功地工作。没有数据传输到hbase。
mssql-hbase.conf文件


# source, channel, sink

agent1.sources = src1
agent1.channels = ch1
agent1.sinks = sk1

# declare source type

agent1.sources.src1.type = org.keedio.flume.source.SQLSource
agent1.sources.src1.hibernate.connection.url = jdbc:sqlserver://xx.xx.xx.xx:1433;DatabaseName=xxxx
agent1.sources.src1.hibernate.connection.user = xxxx
agent1.sources.src1.hibernate.connection.password = xxxx
agent1.sources.src1.table = xxxx

agent1.sources.src1.hibernate.connection.autocommit = true

# declare mysql hibernate dialect

agent1.sources.src1.hibernate.dialect = org.hibernate.dialect.SQLServerDialect
agent1.sources.src1.hibernate.connection.driver_class = com.microsoft.sqlserver.jdbc.SQLServerDriver

# agent1.sources.src1.hibernate.provider_class=org.hibernate.connection.C3P0ConnectionProvider

# agent1.sources.src1.columns.to.select = *

# agent1.sources.src1.incremental.column.name = PK, name, machine, time

# agent1.sources.src1.start.from=0

# agent1.sources.src1.incremental.value = 0

# query time interval

agent1.sources.src1.run.query.delay = 5000

# declare the folder loaction where flume state is saved

agent1.sources.src1.status.file.path = /home/user/flume-source-state
agent1.sources.src1.status.file.name = src1.status

agent1.sources.src1.batch.size = 1000
agent1.sources.src1.max.rows = 1000
agent1.sources.src1.delimiter.entry = |

# set the channel to memory mode

agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 10000
agent1.channels.ch1.transactionCapacity = 10000
agent1.channels.ch1.byteCapacityBufferPercentage = 20
agent1.channels.ch1.byteCapacity = 800000

# declare sink type

agent1.sinks.sk1.type = org.apache.flume.sink.hbase.HBaseSink
agent1.sinks.sk1.table = yyyy
agent1.sinks.sk1.columnFamily = yyyy
agent1.sinks.sk1.hdfs.batchSize = 100
agent1.sinks.sk1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
agent1.sinks.sk1.serializer.regex = ^\"(.*?)\",\"(.*?)\",\"(.*?)\"$
agent1.sinks.sk1.serializer.colNames = PK, name, machine, time

# bind source, channel, sink

agent1.sources.src1.channels = ch1
agent1.sinks.sk1.channel = ch1

但是,我使用类似的配置文件将数据从mysql传输到hbase。幸运的是,成功了。
mysql-hbase.conf文件


# source, channel, sink

agent1.sources = src1
agent1.channels = ch1
agent1.sinks = sk1

# declare source type

agent1.sources.src1.type = org.keedio.flume.source.SQLSource
agent1.sources.src1.hibernate.connection.url = jdbc:mysql://xxxx:3306/userdb
agent1.sources.src1.hibernate.connection.user = xxxx
agent1.sources.src1.hibernate.connection.password = xxxx
agent1.sources.src1.table = xxxx

agent1.sources.src1.hibernate.connection.autocommit = true

# declare mysql hibernate dialect

agent1.sources.src1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
agent1.sources.src1.hibernate.connection.driver_class = com.mysql.jdbc.Driver

# agent1.sources.src1.hibernate.provider_class=org.hibernate.connection.C3P0ConnectionProvider

# agent1.sources.src1.columns.to.select = *

# agent1.sources.src1.incremental.column.name = id

# agent1.sources.src1.incremental.value = 0

# query time interval

agent1.sources.src1.run.query.delay = 5000

# declare the folder loaction where flume state is saved

agent1.sources.src1.status.file.path = /home/user/flume-source-state
agent1.sources.src1.status.file.name = src1.status

# agent1.sources.src1.interceptors=i1

# agent1.sources.src1.interceptors.i1.type=search_replace

# agent1.sources.src1.interceptors.i1.searchPattern="

# agent1.sources.src1.interceptors.i1.replaceString=,

# Set the channel to memory mode

agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 10000
agent1.channels.ch1.transactionCapacity = 10000
agent1.channels.ch1.byteCapacityBufferPercentage = 20
agent1.channels.ch1.byteCapacity = 800000

# declare sink type

agent1.sinks.sk1.type = org.apache.flume.sink.hbase.HBaseSink
agent1.sinks.sk1.table = user_test_2
agent1.sinks.sk1.columnFamily = user_hobby
agent1.sinks.sk1.hdfs.batchSize = 100
agent1.sinks.sk1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
agent1.sinks.sk1.serializer.regex = ^\"(.*?)\",\"(.*?)\",\"(.*?)\",\"(.*?)\"$
agent1.sinks.sk1.serializer.colNames = id,name,age,hobby

# bind source, channel, sink

agent1.sources.src1.channels = ch1
agent1.sinks.sk1.channel = ch1

有人知道配置文件有问题吗?谢谢。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题