本文整理了Java中com.alibaba.datax.common.util.Configuration.getLong()
方法的一些代码示例,展示了Configuration.getLong()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Configuration.getLong()
方法的具体详情如下:
包路径:com.alibaba.datax.common.util.Configuration
类名称:Configuration
方法名:getLong
[英]根据用户提供的json path,寻址Long对象
[中]根据用户提供的JSON路径,寻址Long对象
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Resolves a {@link Long} at the given JSON path, falling back to a default.
 *
 * @param path         JSON path to look up
 * @param defaultValue value returned when the path is missing or does not
 *                     hold a Long
 * @return the Long found at {@code path}, or {@code defaultValue} if absent
 */
public Long getLong(final String path, long defaultValue) {
    final Long value = this.getLong(path);
    return (value != null) ? value : defaultValue;
}
代码示例来源:origin: ECNU-1X/DataX-Masking
// Base communicator for a DataX container: keeps the container configuration
// and the job id it reports metrics under.
public AbstractContainerCommunicator(Configuration configuration) {
this.configuration = configuration;
// No default is supplied here, so jobId may be null when the key
// DATAX_CORE_CONTAINER_JOB_ID is absent from the configuration.
this.jobId = configuration.getLong(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
}
代码示例来源:origin: ECNU-1X/DataX-Masking
// Builds the error-limit checker from job settings: an absolute error-record
// count limit and an error-percentage limit. Either value may be absent
// (null) in the configuration.
public ErrorRecordChecker(Configuration configuration) {
this(configuration.getLong(CoreConstant.DATAX_JOB_SETTING_ERRORLIMIT_RECORD),
configuration.getDouble(CoreConstant.DATAX_JOB_SETTING_ERRORLIMIT_PERCENT));
}
代码示例来源:origin: ECNU-1X/DataX-Masking
// Fragment of channel setup (the enclosing method is not visible in this
// excerpt). Reads per-channel throttling settings with defaults.
long byteSpeed = configuration.getLong(
CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE, 1024 * 1024); // default 1 MiB/s
long recordSpeed = configuration.getLong(
CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD, 10000); // default 10k records/s
this.byteSpeed = byteSpeed;
this.recordSpeed = recordSpeed;
// Interval between flow-control adjustments; default 1000 (presumably
// milliseconds — TODO confirm against the consumer of this field).
this.flowControlInterval = configuration.getLong(
CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_FLOWCONTROLINTERVAL, 1000);
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Initializes the Kafka stream-writer task: reads the slice configuration,
 * validates the throttling settings, then creates the Kafka producer.
 *
 * <p>Fix: configuration is now validated BEFORE the {@code KafkaProducer}
 * is constructed. Previously an invalid {@code recordNumBeforSleep} or
 * {@code sleepTime} threw after the producer was created, leaking a live
 * producer instance (it was never closed on that path).
 *
 * @throws DataXException if a throttle setting is negative
 */
@Override
public void init() {
    this.writerSliceConfig = getPluginJobConf();
    this.print = this.writerSliceConfig.getBool(Key.PRINT, true);
    this.recordNumBeforSleep = this.writerSliceConfig.getLong(Key.RECORD_NUM_BEFORE_SLEEP, 0);
    this.sleepTime = this.writerSliceConfig.getLong(Key.SLEEP_TIME, 0);
    // Validate before allocating any external resource.
    if (recordNumBeforSleep < 0) {
        throw DataXException.asDataXException(StreamKafkaWriterErrorCode.CONFIG_INVALID_EXCEPTION, "recordNumber 不能为负值");
    }
    if (sleepTime < 0) {
        throw DataXException.asDataXException(StreamKafkaWriterErrorCode.CONFIG_INVALID_EXCEPTION, "sleep 不能为负值");
    }
    String servers = this.writerSliceConfig.getString(Key.SERVERS);
    Properties props = new Properties();
    props.put("bootstrap.servers", servers);
    props.put("acks", "all"); // wait for the full in-sync replica set to ack
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    this.producer = new KafkaProducer<>(props);
}
代码示例来源:origin: ECNU-1X/DataX-Masking
// Task-side init for the stream reader slice: column templates, the number
// of records this slice must emit, and whether any column uses the mixup
// (masking) function.
@Override
public void init() {
this.readerSliceConfig = super.getPluginJobConf();
this.columns = this.readerSliceConfig.getList(Key.COLUMN,
String.class);
// No default: the job split phase is expected to have set this key
// (the job-side init validates it — TODO confirm).
this.sliceRecordCount = this.readerSliceConfig
.getLong(Key.SLICE_RECORD_COUNT);
this.haveMixupFunction = this.readerSliceConfig.getBool(
Constant.HAVE_MIXUP_FUNCTION, false);
}
代码示例来源:origin: ECNU-1X/DataX-Masking
// Pre-check variant of job initialization: normalizes the job id (missing or
// negative ids become 0 and are written back into the shared configuration),
// names the current thread after the job, then builds the reader and writer
// plugins in pre-check mode.
private void preCheckInit() {
this.jobId = this.configuration.getLong(
CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, -1);
if (this.jobId < 0) {
LOG.info("Set jobId = 0");
this.jobId = 0;
// Persist the normalized id so downstream components read the same value.
this.configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID,
this.jobId);
}
Thread.currentThread().setName("job-" + this.jobId);
JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(
this.getContainerCommunicator());
this.jobReader = this.preCheckReaderInit(jobPluginCollector);
this.jobWriter = this.preCheckWriterInit(jobPluginCollector);
}
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Job-side init for the stream reader: compiles the mixup-function pattern,
 * normalizes the column configuration, and validates that
 * {@code sliceRecordCount} is present and at least 1.
 *
 * @throws DataXException if sliceRecordCount is missing or less than 1
 */
@Override
public void init() {
    this.originalConfig = super.getPluginJobConf();
    // warn: the mixup-function pattern match is case-insensitive.
    this.mixupFunctionPattern = Pattern.compile(Constant.MIXUP_FUNCTION_PATTERN, Pattern.CASE_INSENSITIVE);
    dealColumn(this.originalConfig);

    final Long sliceRecordCount = this.originalConfig.getLong(Key.SLICE_RECORD_COUNT);
    if (sliceRecordCount == null) {
        throw DataXException.asDataXException(StreamReaderErrorCode.REQUIRED_VALUE,
                "没有设置参数[sliceRecordCount].");
    }
    if (sliceRecordCount < 1) {
        throw DataXException.asDataXException(StreamReaderErrorCode.ILLEGAL_VALUE,
                "参数[sliceRecordCount]不能小于1.");
    }
}
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Initializes the job container: normalizes the job id (missing or negative
 * ids become 0 and are written back into the configuration), names the
 * current thread after the job, then initializes the reader and writer
 * plugins.
 */
private void init() {
this.jobId = this.configuration.getLong(
CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, -1);
if (this.jobId < 0) {
LOG.info("Set jobId = 0");
this.jobId = 0;
// Persist the normalized id so downstream components read the same value.
this.configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID,
this.jobId);
}
Thread.currentThread().setName("job-" + this.jobId);
JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(
this.getContainerCommunicator());
// The Reader must be initialized before the Writer.
this.jobReader = this.initJobReader(jobPluginCollector);
this.jobWriter = this.initJobWriter(jobPluginCollector);
}
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Assembles the ADS JDBC connection URL from the job configuration.
 *
 * @param conf configuration holding the ADS URL, schema, socket timeout
 *             and optional JDBC URL suffix
 * @return the fully-formed JDBC URL string
 */
public static String prepareJdbcUrl(Configuration conf) {
    final String adsURL = conf.getString(Key.ADS_URL);
    final String schema = conf.getString(Key.SCHEMA);
    final Long socketTimeout = conf.getLong(Key.SOCKET_TIMEOUT, Constant.DEFAULT_SOCKET_TIMEOUT);
    final String suffix = conf.getString(Key.JDBC_URL_SUFFIX, "");
    return AdsUtil.prepareJdbcUrl(adsURL, schema, socketTimeout, suffix);
}
代码示例来源:origin: ECNU-1X/DataX-Masking
// Standalone-mode communicator: collects and reports task-group metrics
// in-process, keyed by the container's job id.
public StandAloneJobContainerCommunicator(Configuration configuration) {
super(configuration);
super.setCollector(new ProcessInnerCollector(configuration.getLong(
CoreConstant.DATAX_CORE_CONTAINER_JOB_ID)));
super.setReporter(new ProcessInnerReporter());
}
代码示例来源:origin: ECNU-1X/DataX-Masking
// Task-group container: caches the identifiers and plugin class names it
// needs from the core configuration at construction time.
public TaskGroupContainer(Configuration configuration) {
super(configuration);
initCommunicator(configuration);
// Identity of this container: the job id plus the task-group id within it.
this.jobId = this.configuration.getLong(
CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
this.taskGroupId = this.configuration.getInt(
CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
// Pluggable transport-channel and per-task statistics-collector classes.
this.channelClazz = this.configuration.getString(
CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_CLASS);
this.taskCollectorClass = this.configuration.getString(
CoreConstant.DATAX_CORE_STATISTICS_COLLECTOR_PLUGIN_TASKCLASS);
}
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Reads the stream-writer slice configuration (delimiter, output target,
 * throttle settings) and validates the sleep/throttle values.
 *
 * @throws DataXException if a throttle setting is negative
 */
@Override
public void init() {
    this.writerSliceConfig = getPluginJobConf();
    this.fieldDelimiter = this.writerSliceConfig.getString(Key.FIELD_DELIMITER, "\t");
    this.print = this.writerSliceConfig.getBool(Key.PRINT, true);
    this.path = this.writerSliceConfig.getString(Key.PATH, null);
    this.fileName = this.writerSliceConfig.getString(Key.FILE_NAME, null);
    this.recordNumBeforSleep = this.writerSliceConfig.getLong(Key.RECORD_NUM_BEFORE_SLEEP, 0);
    this.sleepTime = this.writerSliceConfig.getLong(Key.SLEEP_TIME, 0);

    if (this.recordNumBeforSleep < 0) {
        throw DataXException.asDataXException(StreamWriterErrorCode.CONFIG_INVALID_EXCEPTION, "recordNumber 不能为负值");
    }
    if (this.sleepTime < 0) {
        throw DataXException.asDataXException(StreamWriterErrorCode.CONFIG_INVALID_EXCEPTION, "sleep 不能为负值");
    }
}
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Validates the HBase 1.1.x writer configuration: required keys, write mode,
 * charset, WAL flag and write buffer size. Normalized values are written
 * back into the configuration so downstream code can rely on them.
 *
 * @throws DataXException if a required key is missing or the encoding is
 *                        unsupported
 */
public static void validateParameter(com.alibaba.datax.common.util.Configuration originalConfig) {
    originalConfig.getNecessaryValue(Key.HBASE_CONFIG, Hbase11xWriterErrorCode.REQUIRED_VALUE);
    originalConfig.getNecessaryValue(Key.TABLE, Hbase11xWriterErrorCode.REQUIRED_VALUE);
    Hbase11xHelper.validateMode(originalConfig);

    final String encoding = originalConfig.getString(Key.ENCODING, Constant.DEFAULT_ENCODING);
    if (!Charset.isSupported(encoding)) {
        throw DataXException.asDataXException(Hbase11xWriterErrorCode.ILLEGAL_VALUE,
                String.format("Hbasewriter 不支持您所配置的编码:[%s]", encoding));
    }
    originalConfig.set(Key.ENCODING, encoding);

    // Write the resolved defaults back into the configuration.
    originalConfig.set(Key.WAL_FLAG, originalConfig.getBool(Key.WAL_FLAG, false));
    originalConfig.set(Key.WRITE_BUFFER_SIZE,
            originalConfig.getLong(Key.WRITE_BUFFER_SIZE, Constant.DEFAULT_WRITE_BUFFER_SIZE));
}
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Builds an {@code AdsHelper} from the plain username/password account
 * configured for the job.
 *
 * @param originalConfig job configuration holding the ADS URL, credentials,
 *                       schema, socket timeout and JDBC URL suffix
 * @return a configured AdsHelper instance
 */
public static AdsHelper createAdsHelper(Configuration originalConfig) {
    final String adsUrl = originalConfig.getString(Key.ADS_URL);
    final String userName = originalConfig.getString(Key.USERNAME);
    final String password = originalConfig.getString(Key.PASSWORD);
    final String schema = originalConfig.getString(Key.SCHEMA);
    final Long socketTimeout = originalConfig.getLong(Key.SOCKET_TIMEOUT, Constant.DEFAULT_SOCKET_TIMEOUT);
    final String suffix = originalConfig.getString(Key.JDBC_URL_SUFFIX, "");
    return new AdsHelper(adsUrl, userName, password, schema, socketTimeout, suffix);
}
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Builds an {@code AdsHelper} using the ODPS access-id/access-key pair from
 * the transfer-project configuration instead of the plain username/password.
 *
 * @param originalConfig job configuration holding the ADS URL, ODPS
 *                       credentials, schema, socket timeout and URL suffix
 * @return a configured AdsHelper instance
 */
public static AdsHelper createAdsHelperWithOdpsAccount(Configuration originalConfig) {
    final String adsUrl = originalConfig.getString(Key.ADS_URL);
    final String userName = originalConfig.getString(TransferProjectConf.KEY_ACCESS_ID);
    final String password = originalConfig.getString(TransferProjectConf.KEY_ACCESS_KEY);
    final String schema = originalConfig.getString(Key.SCHEMA);
    final Long socketTimeout = originalConfig.getLong(Key.SOCKET_TIMEOUT, Constant.DEFAULT_SOCKET_TIMEOUT);
    final String suffix = originalConfig.getString(Key.JDBC_URL_SUFFIX, "");
    return new AdsHelper(adsUrl, userName, password, schema, socketTimeout, suffix);
}
代码示例来源:origin: ECNU-1X/DataX-Masking
/**
 * Validates the HBase 0.94.x writer configuration: required keys, write
 * mode, charset, auto-flush/WAL flags and write buffer size. Normalized
 * values are written back into the configuration.
 *
 * <p>Fix: the configured AUTO_FLUSH value was read into a local that was
 * never used — AUTO_FLUSH is unconditionally forced to {@code false} below —
 * so the dead read has been removed.
 *
 * @throws DataXException if a required key is missing or the encoding is
 *                        unsupported
 */
public static void validateParameter(com.alibaba.datax.common.util.Configuration originalConfig) {
    originalConfig.getNecessaryValue(Key.HBASE_CONFIG, Hbase094xWriterErrorCode.REQUIRED_VALUE);
    originalConfig.getNecessaryValue(Key.TABLE, Hbase094xWriterErrorCode.REQUIRED_VALUE);
    Hbase094xHelper.validateMode(originalConfig);
    String encoding = originalConfig.getString(Key.ENCODING, Constant.DEFAULT_ENCODING);
    if (!Charset.isSupported(encoding)) {
        throw DataXException.asDataXException(Hbase094xWriterErrorCode.ILLEGAL_VALUE, String.format("Hbasewriter 不支持您所配置的编码:[%s]", encoding));
    }
    originalConfig.set(Key.ENCODING, encoding);
    // Auto-flush is forced off in this release; flush size is governed by
    // the HBase writeBufferSize instead.
    originalConfig.set(Key.AUTO_FLUSH, false);
    Boolean walFlag = originalConfig.getBool(Key.WAL_FLAG, false);
    originalConfig.set(Key.WAL_FLAG, walFlag);
    long writeBufferSize = originalConfig.getLong(Key.WRITE_BUFFER_SIZE, Constant.DEFAULT_WRITE_BUFFER_SIZE);
    originalConfig.set(Key.WRITE_BUFFER_SIZE, writeBufferSize);
}
代码示例来源:origin: ECNU-1X/DataX-Masking
// Opens the configured HBase 1.1.x table after verifying it exists and is
// usable; on any failure, all partially-acquired resources are released.
public static Table getTable(com.alibaba.datax.common.util.Configuration configuration){
String hbaseConfig = configuration.getString(Key.HBASE_CONFIG);
String userTable = configuration.getString(Key.TABLE);
long writeBufferSize = configuration.getLong(Key.WRITE_BUFFER_SIZE, Constant.DEFAULT_WRITE_BUFFER_SIZE);
org.apache.hadoop.hbase.client.Connection hConnection = Hbase11xHelper.getHbaseConnection(hbaseConfig);
TableName hTableName = TableName.valueOf(userTable);
org.apache.hadoop.hbase.client.Admin admin = null;
org.apache.hadoop.hbase.client.Table hTable = null;
try {
admin = hConnection.getAdmin();
Hbase11xHelper.checkHbaseTable(admin,hTableName);
hTable = hConnection.getTable(hTableName);
// NOTE(review): this BufferedMutatorParams is built and configured but
// never used, so writeBufferSize has no effect on the returned table.
// Confirm whether getBufferedMutator(...) was intended here instead.
BufferedMutatorParams bufferedMutatorParams = new BufferedMutatorParams(hTableName);
bufferedMutatorParams.writeBufferSize(writeBufferSize);
} catch (Exception e) {
// Release everything acquired so far before surfacing the failure. On
// success, admin and hConnection stay open — presumably owned/cached
// elsewhere; TODO confirm ownership.
Hbase11xHelper.closeTable(hTable);
Hbase11xHelper.closeAdmin(admin);
Hbase11xHelper.closeConnection(hConnection);
throw DataXException.asDataXException(Hbase11xWriterErrorCode.GET_HBASE_TABLE_ERROR, e);
}
return hTable;
}
代码示例来源:origin: ECNU-1X/DataX-Masking
// Creates a BufferedMutator for batched writes to the configured table,
// with its write buffer sized by WRITE_BUFFER_SIZE. On failure, every
// partially-acquired resource is released before rethrowing.
public static BufferedMutator getBufferedMutator(com.alibaba.datax.common.util.Configuration configuration){
String hbaseConfig = configuration.getString(Key.HBASE_CONFIG);
String userTable = configuration.getString(Key.TABLE);
long writeBufferSize = configuration.getLong(Key.WRITE_BUFFER_SIZE, Constant.DEFAULT_WRITE_BUFFER_SIZE);
org.apache.hadoop.conf.Configuration hConfiguration = Hbase11xHelper.getHbaseConfiguration(hbaseConfig);
org.apache.hadoop.hbase.client.Connection hConnection = Hbase11xHelper.getHbaseConnection(hbaseConfig);
TableName hTableName = TableName.valueOf(userTable);
org.apache.hadoop.hbase.client.Admin admin = null;
BufferedMutator bufferedMutator = null;
try {
admin = hConnection.getAdmin();
Hbase11xHelper.checkHbaseTable(admin,hTableName);
// Mirrors HTable#getBufferedMutator(): explicit pool plus buffer size.
bufferedMutator = hConnection.getBufferedMutator(
new BufferedMutatorParams(hTableName)
.pool(HTable.getDefaultExecutor(hConfiguration))
.writeBufferSize(writeBufferSize));
} catch (Exception e) {
// Clean up partial state before surfacing the failure.
Hbase11xHelper.closeBufferedMutator(bufferedMutator);
Hbase11xHelper.closeAdmin(admin);
Hbase11xHelper.closeConnection(hConnection);
throw DataXException.asDataXException(Hbase11xWriterErrorCode.GET_HBASE_BUFFEREDMUTATOR_ERROR, e);
}
return bufferedMutator;
}
代码示例来源:origin: ECNU-1X/DataX-Masking
// Opens the configured HBase 0.94.x table for writing: auto-flush is
// disabled and flushing is controlled via writeBufferSize instead.
public static HTable getTable(com.alibaba.datax.common.util.Configuration configuration){
String hbaseConfig = configuration.getString(Key.HBASE_CONFIG);
String userTable = configuration.getString(Key.TABLE);
org.apache.hadoop.conf.Configuration hConfiguration = Hbase094xHelper.getHbaseConfiguration(hbaseConfig);
// NOTE(review): autoFlush is read but never used — setAutoFlush(false) is
// hard-coded below. Confirm the config key is intentionally ignored.
Boolean autoFlush = configuration.getBool(Key.AUTO_FLUSH, false);
long writeBufferSize = configuration.getLong(Key.WRITE_BUFFER_SIZE, Constant.DEFAULT_WRITE_BUFFER_SIZE);
HTable htable = null;
HBaseAdmin admin = null;
try {
htable = new HTable(hConfiguration, userTable);
admin = new HBaseAdmin(hConfiguration);
Hbase094xHelper.checkHbaseTable(admin,htable);
// Auto-flush is forced off for this release; flush size is governed by
// the HBase writeBufferSize.
htable.setAutoFlush(false);
htable.setWriteBufferSize(writeBufferSize);
return htable;
} catch (Exception e) {
htable = null; // avoid returning a half-initialized table (closed below)
Hbase094xHelper.closeTable(htable);
throw DataXException.asDataXException(Hbase094xWriterErrorCode.GET_HBASE_TABLE_ERROR, e);
}finally {
// Admin is only needed for the existence check; always release it.
Hbase094xHelper.closeAdmin(admin);
}
}
内容来源于网络,如有侵权,请联系作者删除!