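This article collects code examples for the Java class org.apache.helix.manager.zk.ZkClient and shows how the class is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, and are intended as practical reference material. Details of the ZkClient class follow: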
Package path: org.apache.helix.manager.zk.ZkClient
Class name: ZkClient
Raw ZkClient that wraps org.apache.helix.manager.zk.zookeeper.ZkClient, with additional constructors and a builder. Instead of constructing a raw ZkClient directly, applications should always use HelixZkClientFactory to build shared or dedicated HelixZkClient instances. Construct a raw ZkClient only when advanced usage is required, for example when the application needs to access or manage the ZkConnection directly.
Both SharedZkClient and DedicatedZkClient are built on the raw ZkClient. According to the class diagram in the original Javadoc, SharedZkClient and DedicatedZkClient implement the HelixZkClient interface; both sit on top of the raw ZkClient (this class), which the diagram also connects to the HelixZkClient interface; and the raw ZkClient in turn sits on top of the native ZkClient.
TODO: Completely replace usage of the raw ZkClient within helix-core with HelixZkClient. --JJ
Code example source: apache/incubator-pinot
public void start(ControllerMetrics controllerMetrics) {
  _controllerMetrics = controllerMetrics;
  LOGGER.info("Starting realtime segments manager, adding a listener on the property store table configs path.");
  String zkUrl = _pinotHelixResourceManager.getHelixZkURL();
  _zkClient = new ZkClient(zkUrl, ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
  _zkClient.setZkSerializer(new ZNRecordSerializer());
  _zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC, TimeUnit.SECONDS);
  // Subscribe to data and child changes on the table config path
  _zkClient.subscribeChildChanges(_tableConfigPath, this);
  _zkClient.subscribeDataChanges(_tableConfigPath, this);
  // Subscribe to leadership changes
  ControllerLeadershipManager.getInstance().subscribe(PinotLLCRealtimeSegmentManager.class.getName(), this);
  // Set up change listeners for already existing tables, if any.
  processPropertyStoreChange(_tableConfigPath);
}
Code example source: apache/incubator-pinot
StateModelDefinition newStateModelDef =
PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition();
ZkClient zkClient = new ZkClient(zkPath);
zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC, TimeUnit.SECONDS);
zkClient.setZkSerializer(new ZNRecordSerializer());
HelixDataAccessor accessor =
new ZKHelixDataAccessor(helixClusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
accessor.setProperty(keyBuilder.stateModelDef(segmentStateModelName), newStateModelDef);
LOGGER.info("Completed updating statemodel {}", segmentStateModelName);
zkClient.close();
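In the snippet above, close() is only reached if every earlier call succeeds. A minimal sketch of the close-in-finally pattern follows. It uses only the JDK: FakeClient and writeAndClose are hypothetical stand-ins invented for illustration, not Helix classes.

```java
// Hypothetical illustration of releasing a client on every exit path.
final class CloseSketch {
  private CloseSketch() {}

  /** Stand-in for a ZooKeeper client; it only records whether close() ran. */
  static final class FakeClient {
    boolean closed = false;

    void write(String path) {
      if (path.isEmpty()) {
        throw new IllegalArgumentException("empty path");
      }
    }

    void close() {
      closed = true;
    }
  }

  /** Performs the write and closes the client whether or not the write throws. */
  static void writeAndClose(FakeClient client, String path) {
    try {
      client.write(path);
    } finally {
      client.close();
    }
  }
}
```

With this shape, a failure in the middle of the work still releases the connection, and the exception continues to propagate to the caller.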
Code example source: apache/incubator-pinot
@Override
public boolean execute()
    throws Exception {
  LOGGER.info("Connecting to Zookeeper at address: {}", _zkAddress);
  ZkClient zkClient = new ZkClient(_zkAddress, 5000);
  try {
    String helixClusterName = "/" + _clusterName;
    LOGGER.info("Executing command: " + toString());
    if (!zkClient.exists(helixClusterName)) {
      LOGGER.error("Cluster {} does not exist.", _clusterName);
      return false;
    }
    zkClient.deleteRecursive(helixClusterName);
    return true;
  } finally {
    // Release the ZooKeeper connection on every exit path.
    zkClient.close();
  }
}
Code example source: apache/incubator-pinot
FakeHelixManager(String clusterName, String instanceName, InstanceType instanceType, String zkAddress) {
  super(clusterName, instanceName, instanceType, zkAddress);
  super._zkclient = new ZkClient(StringUtil.join("/", StringUtils.chomp(ZkStarter.DEFAULT_ZK_STR, "/")),
      ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
  // Start from an empty property store for this cluster.
  _zkclient.deleteRecursive("/" + clusterName + "/PROPERTYSTORE");
  _zkclient.createPersistent("/" + clusterName + "/PROPERTYSTORE", true);
  setPropertyStore(clusterName);
}
Code example source: apache/helix
new ZkClient(zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
ZKHelixAdmin admin = new ZKHelixAdmin(zkclient);
} finally {
if (zkclient != null) {
zkclient.close();
Code example source: apache/helix
new ZkClient(ZK_ADDR, HelixZkClient.DEFAULT_SESSION_TIMEOUT,
HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
client.createPersistent(path, true);
client.subscribeDataChanges(path, listener);
client.subscribeChildChanges(path, listener);
"ZooKeeper#watchManager#childWatches should have 1 child watch on path: " + path);
client.unsubscribeDataChanges(path, listener);
client.unsubscribeChildChanges(path, listener);
Assert.assertTrue(childWatch.isEmpty(), "ZooKeeper#watchManager#childWatches should be empty");
client.close();
Code example source: apache/helix
final long TEST_DATA_SIZE = zkClient.serialize(TEST_DATA, TEST_PATH).length;
if (_zkClient.exists(TEST_PATH)) {
_zkClient.delete(TEST_PATH);
if (!_zkClient.exists(TEST_ROOT)) {
_zkClient.createPersistent(TEST_ROOT, true);
Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter"), 0);
Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadLatencyGauge.Max"), 0);
zkClient.exists(TEST_ROOT);
Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 1);
Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter") >= 0);
0);
Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteLatencyGauge.Max"), 0);
zkClient.create(TEST_PATH, TEST_DATA, CreateMode.PERSISTENT);
Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteCounter"), 1);
Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteBytesCounter"),
Assert.assertEquals(origIdealStatesReadTotalLatencyCounter, 0);
Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadLatencyGauge.Max"), 0);
zkClient.readData(TEST_PATH, new Stat());
Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 2);
Assert
>= origIdealStatesReadTotalLatencyCounter);
Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "ReadLatencyGauge.Max") >= 0);
zkClient.getChildren(TEST_PATH);
Code example source: apache/incubator-pinot
void closeZkClient() {
  _zkclient.close();
}
Code example source: apache/incubator-pinot
List<String> tables = zkHelixAdmin.getResourcesInCluster(_clusterName);
ZkClient zkClient = new ZkClient(_zkAddress);
zkClient.setZkSerializer(new ZNRecordStreamingSerializer());
LOGGER.info("Connecting to Zookeeper at: {}", _zkAddress);
zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC, TimeUnit.SECONDS);
ZkBaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<>(zkClient);
ZKHelixDataAccessor zkHelixDataAccessor = new ZKHelixDataAccessor(_clusterName, baseDataAccessor);
Code example source: org.apache.helix/helix-core
public ZKDumper(String zkAddress) {
  client = new ZkClient(zkAddress, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
  ZkSerializer zkSerializer = new ByteArraySerializer();
  client.setZkSerializer(zkSerializer);
  // Skip hidden files (names starting with a dot).
  filter = new FilenameFilter() {
    @Override
    public boolean accept(File dir, String name) {
      return !name.startsWith(".");
    }
  };
}
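The anonymous FilenameFilter in the constructor above can be written more compactly on Java 8 or later. The following is a minimal sketch using only the JDK; the class and constant names are hypothetical.

```java
import java.io.File;
import java.io.FilenameFilter;

// Hypothetical holder class; only the lambda itself mirrors the filter above.
final class DotFileFilterSketch {
  private DotFileFilterSketch() {}

  // Accepts every file whose name does not start with a dot.
  static final FilenameFilter VISIBLE_ONLY = (dir, name) -> !name.startsWith(".");
}
```

A filter like this can be passed to File.list(FilenameFilter) or File.listFiles(FilenameFilter) to leave out hidden entries.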
Code example source: apache/incubator-pinot
@BeforeClass
public void setUp()
    throws Exception {
  _zookeeperInstance = ZkStarter.startLocalZkServer();
  _zkClient = new ZkClient(ZkStarter.DEFAULT_ZK_STR);
  final String instanceId = "localhost_helixController";
  _pinotHelixResourceManager =
      new PinotHelixResourceManager(ZkStarter.DEFAULT_ZK_STR, HELIX_CLUSTER_NAME, instanceId, null, 10000L, true,
          /*isUpdateStateModel=*/ false, true);
  _pinotHelixResourceManager.start();
  _helixAdmin = _pinotHelixResourceManager.getHelixAdmin();
  ControllerRequestBuilderUtil
      .addFakeDataInstancesToAutoJoinHelixCluster(HELIX_CLUSTER_NAME, ZkStarter.DEFAULT_ZK_STR, 1, true);
  ControllerRequestBuilderUtil
      .addFakeBrokerInstancesToAutoJoinHelixCluster(HELIX_CLUSTER_NAME, ZkStarter.DEFAULT_ZK_STR, 1, true);
  Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_BROKER").size(), 1);
  Assert
      .assertEquals(_helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_OFFLINE").size(), 1);
  Assert
      .assertEquals(_helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_REALTIME").size(), 1);
  // Adding table
  TableConfig tableConfig =
      new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(TABLE_NAME).build();
  _pinotHelixResourceManager.addTable(tableConfig);
}
Code example source: apache/helix
new ZkClient(ZK_ADDR, HelixZkClient.DEFAULT_SESSION_TIMEOUT,
HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
client.delete(path);
client.subscribeDataChanges(path, listener);
client.subscribeChildChanges(path, listener);
"fail to get data-delete callback after session-expiry");
client.close();
Code example source: apache/helix
@Test
public void testZkSessionExpiry() throws Exception {
  String className = TestHelper.getTestClassName();
  String methodName = TestHelper.getTestMethodName();
  String clusterName = className + "_" + methodName;
  System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
  ZkClient client =
      new ZkClient(ZK_ADDR, HelixZkClient.DEFAULT_SESSION_TIMEOUT,
          HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
  String path = String.format("/%s", clusterName);
  client.createEphemeral(path);
  String oldSessionId = ZkTestHelper.getSessionId(client);
  ZkTestHelper.expireSession(client);
  String newSessionId = ZkTestHelper.getSessionId(client);
  // Compare session IDs by value; an identity check would pass for any two distinct String objects.
  Assert.assertNotEquals(newSessionId, oldSessionId);
  Assert.assertFalse(client.exists(path), "Ephemeral znode should be gone after session expiry");
  client.close();
  System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
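The session-expiry check above is only meaningful if the two session IDs are compared by value. In Java, an identity check (the == operator, or an assertion such as TestNG's assertNotSame) passes for any two distinct String objects even when their contents match. The sketch below illustrates the difference using only the JDK; the class and method names are hypothetical, and how ZkTestHelper.getSessionId builds its strings is not shown in the source.

```java
// Hypothetical illustration of reference identity versus value equality for Strings.
final class SessionIdCompareSketch {
  private SessionIdCompareSketch() {}

  /** True only when both variables point at the same String object. */
  static boolean sameReference(String a, String b) {
    return a == b;
  }

  /** True when both Strings hold the same characters. */
  static boolean sameValue(String a, String b) {
    return a.equals(b);
  }

  public static void main(String[] args) {
    // Two separately allocated Strings with identical contents.
    String before = new String("1a2b3c");
    String after = new String("1a2b3c");
    System.out.println("same reference: " + sameReference(before, after));
    System.out.println("same value: " + sameValue(before, after));
  }
}
```

For a test that wants to prove a new session was created, the value comparison is the one that can actually fail.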
Code example source: org.apache.helix/helix-core
public void post(String zkServer, Message message, String clusterName, String instanceName) {
  ZkClient client = new ZkClient(zkServer);
  try {
    client.setZkSerializer(new ZNRecordSerializer());
    String path = PropertyPathBuilder.instanceMessage(clusterName, instanceName, message.getId());
    client.delete(path);
    // Address the message to the instance's current live session.
    ZNRecord record = client.readData(PropertyPathBuilder.liveInstance(clusterName, instanceName));
    message.setTgtSessionId(record.getSimpleField(LiveInstanceProperty.SESSION_ID.toString()));
    message.setTgtName(record.getId());
    // System.out.println(message);
    client.createPersistent(path, message.getRecord());
  } finally {
    // Close the client created by this method so the connection is not leaked.
    client.close();
  }
}
Code example source: org.apache.helix/helix-core
public static void main(String[] args) {
ZkClient zkclient = new ZkClient("localhost:2191");
zkclient.setZkSerializer(new ZNRecordSerializer());
ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(zkclient);
zkclient.close();
Code example source: apache/helix
@Test
public void testCloseZkClient() {
  String className = TestHelper.getTestClassName();
  String methodName = TestHelper.getTestMethodName();
  String clusterName = className + "_" + methodName;
  System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
  ZkClient client =
      new ZkClient(ZK_ADDR, HelixZkClient.DEFAULT_SESSION_TIMEOUT,
          HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
  String path = String.format("/%s", clusterName);
  client.createEphemeral(path);
  client.close();
  Assert.assertFalse(_gZkClient.exists(path), "Ephemeral node: " + path
      + " should be removed after ZkClient#close()");
  System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
Code example source: org.apache.helix/helix-core
zkClient.createEphemeral("/" + clusterName + "/CONFIGS/CLUSTER/verify");
} catch (ZkNodeExistsException ex) {
LOG.error("There is already a verification in progress", ex);
zkClient.subscribeChildChanges(extViewPath, listener);
for (String child : zkClient.getChildren(extViewPath)) {
String childPath = extViewPath.equals("/") ? extViewPath + child : extViewPath + "/" + child;
zkClient.subscribeDataChanges(childPath, listener);
zkClient.unsubscribeChildChanges(extViewPath, listener);
for (String child : zkClient.getChildren(extViewPath)) {
String childPath = extViewPath.equals("/") ? extViewPath + child : extViewPath + "/" + child;
zkClient.unsubscribeDataChanges(childPath, listener);
zkClient.delete("/" + clusterName + "/CONFIGS/CLUSTER/verify");
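The fragment above builds each child path twice with the same conditional expression, which exists to avoid a double slash when the parent is the root. A minimal sketch of factoring that expression into a helper follows. ZkPathSketch and childPath are hypothetical names, not part of Helix, and only the JDK is used.

```java
// Hypothetical helper that mirrors the path-joining expression used in the fragment above.
final class ZkPathSketch {
  private ZkPathSketch() {}

  /**
   * Joins a parent znode path and a child name.
   * The root path "/" already ends with a slash, so no separator is added for it.
   */
  static String childPath(String parent, String child) {
    return parent.equals("/") ? parent + child : parent + "/" + child;
  }
}
```

Both loops could then call childPath(extViewPath, child), so the subscribe and unsubscribe steps are guaranteed to use identical paths.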
Code example source: apache/helix
ZkClient zkClient = ZKClientPool.getZkClient(zkAddr);
zkClient.createPersistent("/" + testName, new ZNRecord(testName));
ZNRecord record = zkClient.readData("/" + testName);
Assert.assertEquals(record.getId(), testName);
try {
zkClient = ZKClientPool.getZkClient(zkAddr);
record = zkClient.readData("/" + testName);
Assert.fail("should fail on zk no node exception");
} catch (ZkNoNodeException e) {
zkClient.createPersistent("/" + testName, new ZNRecord(testName));
record = zkClient.readData("/" + testName);
Assert.assertEquals(record.getId(), testName);
zkClient.close();
TestHelper.stopZkServer(zkServer);
System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
Code example source: org.apache.helix/helix-core
synchronized (this) {
if (_zkclient != null) {
_zkclient.close();
_zkclient.subscribeStateChanges(this);
int retryCount = 0;
while (retryCount < 3) {
try {
_zkclient.waitUntilConnected(_connectionInitTimeout, TimeUnit.MILLISECONDS);
handleStateChanged(KeeperState.SyncConnected);
handleNewSession();
Code example source: org.apache.helix/helix-core
public static void createOrReplace(ZkClient client, String path, final ZNRecord record,
    final boolean persistent) {
  int retryCount = 0;
  while (retryCount < RETRYLIMIT) {
    try {
      if (client.exists(path)) {
        // Node already exists: overwrite its data with the given record.
        DataUpdater<Object> updater = new DataUpdater<Object>() {
          @Override
          public Object update(Object currentData) {
            return record;
          }
        };
        client.updateDataSerialized(path, updater);
      } else {
        CreateMode mode = (persistent) ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;
        client.create(path, record, mode);
      }
      break;
    } catch (Exception e) {
      retryCount = retryCount + 1;
      logger.warn("Exception trying to createOrReplace " + path + " Exception:" + e.getMessage()
          + ". Will retry.");
    }
  }
}
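The loop above logs each failure and, once RETRYLIMIT is reached, returns without telling the caller that nothing was written. One possible alternative is a bounded retry that rethrows the last failure. The sketch below uses only the JDK and assumes the failures are unchecked exceptions; RetrySketch and callWithRetry are hypothetical names, not Helix APIs.

```java
import java.util.function.Supplier;

// Hypothetical helper: retries an action a bounded number of times.
final class RetrySketch {
  private RetrySketch() {}

  /**
   * Runs the action until it succeeds or maxAttempts attempts have been made.
   * If every attempt fails, the last exception is rethrown to the caller.
   */
  static <T> T callWithRetry(int maxAttempts, Supplier<T> action) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    RuntimeException last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return action.get();
      } catch (RuntimeException e) {
        last = e; // remember the failure and try again
      }
    }
    throw last; // every attempt failed
  }
}
```

Whether to rethrow or merely log depends on the caller. Rethrowing makes a permanently failing write visible instead of letting it pass as a success.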