本文整理了Java中org.jgroups.Message类的一些代码示例,展示了Message类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Message类的具体详情如下:
包路径:org.jgroups.Message
类名称:Message
[英]A Message encapsulates data sent to members of a group. It contains among other things the address of the sender, the destination address, a payload (byte buffer) and a list of headers. Headers are added by protocols on the sender side and removed by protocols on the receiver's side.
The byte buffer can point to a reference, and we can subset it using index and length. However, when the message is serialized, we only write the bytes between index and length.
[中]Message(消息)封装了发送给组成员的数据。它包含发送方地址、目标地址、有效负载(字节缓冲区)以及消息头列表等内容。消息头由发送方的协议添加,并由接收方的协议移除。
字节缓冲区可以指向一个引用,我们可以通过索引(index)和长度(length)取其子集。不过,当消息被序列化时,我们只写入索引和长度之间的字节。
代码示例来源:origin: wildfly/wildfly
/**
 * Processes a FLUSH_RECONCILE message: extracts the digest carried in the message payload,
 * sends it down the stack as a {@code REBROADCAST} event and then answers the requester
 * with a {@code FLUSH_RECONCILE_OK} message.
 *
 * @param msg the received reconcile request; its source address is used as the reply target
 */
private void handleFlushReconcile(Message msg) {
    final Address requester = msg.getSrc();

    // The payload holds (participants, digest); only the digest is needed here.
    final Tuple<Collection<? extends Address>,Digest> participantsAndDigest =
        readParticipantsAndDigest(msg.getRawBuffer(), msg.getOffset(), msg.getLength());
    final Digest reconcileDigest = participantsAndDigest.getVal2();

    if (log.isDebugEnabled()) {
        log.debug(localAddress + ": received FLUSH_RECONCILE, passing digest to NAKACK "
                  + reconcileDigest);
    }

    // Hand the digest to the lower protocols so that missing messages get rebroadcast.
    final Event rebroadcast = new Event(Event.REBROADCAST, reconcileDigest);
    down_prot.down(rebroadcast);

    if (log.isDebugEnabled()) {
        log.debug(localAddress + ": returned from FLUSH_RECONCILE, "
                  + " sending RECONCILE_OK to " + requester);
    }

    // Acknowledge the reconciliation to whoever asked for it.
    final FlushHeader okHeader = new FlushHeader(FlushHeader.FLUSH_RECONCILE_OK);
    final Message reconcileOk = new Message(requester)
        .setFlag(Message.Flag.OOB, Message.Flag.INTERNAL)
        .putHeader(this.id, okHeader);
    down_prot.down(reconcileOk);
}
代码示例来源:origin: wildfly/wildfly
/**
 * Prints every received message and, when the message carries no destination address,
 * sends a fixed text response back to its sender over the channel.
 *
 * @param msg the message that was received
 */
public void receive(Message msg) {
    final Address sender = msg.getSrc();
    System.out.println("<< " + msg.getObject() + " from " + sender);

    // Messages that name an explicit destination are not answered.
    if (msg.getDest() != null) {
        return;
    }

    final Message reply = new Message(msg.getSrc(), "this is a response");
    try {
        ch.send(reply);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
代码示例来源:origin: wildfly/wildfly
/**
 * Answers a secret-key request: if the requester passes the {@code inView} check, its
 * public key is rebuilt from the message payload and the secret key is sent to it.
 * Failures while processing the request are logged as a warning and otherwise ignored.
 *
 * @param msg the key request; the payload is expected to hold the requester's public key
 */
protected void handleSecretKeyRequest(final Message msg) {
    final boolean requesterInView =
        inView(msg.src(), "key requester %s is not in current view %s; ignoring key request");

    if (requesterInView) {
        log.debug("%s: received secret key request from %s", local_addr, msg.getSrc());
        try {
            final PublicKey requesterKey = generatePubKey(msg.getBuffer());
            sendSecretKey(secret_key, requesterKey, msg.getSrc());
        } catch (Exception e) {
            log.warn("%s: unable to reconstitute peer's public key", local_addr);
        }
    }
}
代码示例来源:origin: wildfly/wildfly
/**
 * Stamps the message with our own address, but only when it has no sender yet. An existing
 * sender must never be overwritten, because the message may be sent on behalf of another
 * member: e.g. a retransmission whose original sender has crashed, or a FLUSH protocol
 * returning all unstable messages together with the FLUSH_OK response.
 *
 * @param msg the outgoing message whose source address may need to be filled in
 */
protected void setSourceAddress(Message msg) {
    // Nothing to do if a sender is already present or our own address is not known yet.
    if (msg.getSrc() != null || local_addr == null) {
        return;
    }
    // In the shared transport case TP.ProtocolAdapter should already have set the sender.
    msg.setSrc(local_addr);
}
代码示例来源:origin: wildfly/wildfly
/**
 * Wraps a region of a byte array in a message and sends it over the channel, applying the
 * OOB and DONT_BUNDLE flags according to the {@code oob} and {@code dont_bundle} settings.
 *
 * @param dest   the destination; must be an {@code Address} (it is cast unconditionally)
 * @param buf    the array holding the payload
 * @param offset the offset of the payload within {@code buf}
 * @param length the number of payload bytes
 * @throws Exception if the channel fails to send the message
 */
public void send(Object dest, byte[] buf, int offset, int length) throws Exception {
    final Address target = (Address) dest;
    final Message outgoing = new Message(target, buf, offset, length);

    if (oob) {
        outgoing.setFlag(Message.Flag.OOB);
    }
    if (dont_bundle) {
        outgoing.setFlag(Message.Flag.DONT_BUNDLE);
    }

    ch.send(outgoing);
}
代码示例来源:origin: wildfly/wildfly
/**
 * Serializes the whole message and passes down a copy whose payload is that serialized form.
 * If the message has no sender yet, the local address is set first so that it becomes part
 * of the serialized data.
 *
 * @param msg the message travelling down the stack
 * @return whatever the protocol below returns for the wrapping message
 */
public Object down(Message msg) {
    if (msg.getSrc() == null) {
        msg.setSrc(local_addr);
    }

    final Buffer payload = Util.streamableToBuffer(msg);

    // The copy leaves out the existing headers: they are part of the serialized payload and
    // will be seen again when the message is unmarshalled at the receiver.
    final Message bare = msg.copy(false, false);
    final Message wrapper = bare.setBuffer(payload);

    return down_prot.down(wrapper);
}
代码示例来源:origin: wildfly/wildfly
public Object up(Message msg) {
GmsHeader hdr=msg.getHeader(this.id);
if(hdr == null)
return up_prot.up(msg);
break;
case GmsHeader.JOIN_RSP:
JoinRsp join_rsp=readJoinRsp(msg.getRawBuffer(), msg.getOffset(), msg.getLength());
if(join_rsp != null)
impl.handleJoinResponse(join_rsp);
break;
case GmsHeader.VIEW:
Tuple<View,Digest> tuple=readViewAndDigest(msg.getRawBuffer(), msg.getOffset(), msg.getLength());
if(tuple == null)
return null;
log.warn("%s: failed to create view from delta-view; dropping view: %s", local_addr, t.toString());
log.trace("%s: sending request for full view to %s", local_addr, msg.src());
Message full_view_req=new Message(msg.src())
.putHeader(id, new GmsHeader(GmsHeader.GET_CURRENT_VIEW)).setFlag(OOB, INTERNAL);
down_prot.down(full_view_req);
return null;
Address coord=msg.getSrc();
if(!new_view.containsMember(coord)) {
Address sender=msg.getSrc();
ack_collector.ack(sender);
return null; // don't pass further up
代码示例来源:origin: wildfly/wildfly
/**
 * Sends an {@code ABORT_FLUSH} message for the given view id to every non-null participant.
 * Each message carries the marshalled participant list as its payload.
 *
 * @param participants the members to notify; {@code null} makes this a no-op
 * @param viewId       the view id placed in the ABORT_FLUSH header
 */
private void rejectFlush(Collection<? extends Address> participants, long viewId) {
    if (participants != null) {
        for (Address flushMember : participants) {
            if (flushMember != null) {
                final FlushHeader abortHeader = new FlushHeader(FlushHeader.ABORT_FLUSH, viewId);
                final Message reject = new Message(flushMember)
                    .src(localAddress)
                    .setFlag(Message.Flag.OOB, Message.Flag.INTERNAL)
                    .putHeader(this.id, abortHeader)
                    .setBuffer(marshal(participants, null));
                down_prot.down(reject);
            }
        }
    }
}
代码示例来源:origin: wildfly/wildfly
/**
 * Sends one heartbeat: an INTERNAL message without an explicit destination that carries
 * only a {@code HeartbeatHeader}; afterwards the sent-heartbeats counter is incremented.
 */
public void run() {
    final HeartbeatHeader beatHeader = new HeartbeatHeader();
    final Message heartbeat = new Message()
        .setFlag(Message.Flag.INTERNAL)
        .putHeader(id, beatHeader);

    down_prot.down(heartbeat);
    num_heartbeats_sent++;
    log.trace("Sent heartbeat");
}
代码示例来源:origin: wildfly/wildfly
/**
 * Sends an RSVP response for the given request id to {@code dest}. Any failure while
 * building or sending the response is logged as an error instead of being propagated.
 *
 * @param dest the member the response is addressed to
 * @param id   the request id placed in the response header (distinct from the protocol's
 *             own {@code this.id}, under which the header is registered)
 */
protected void sendResponse(Address dest, short id) {
    try {
        final RsvpHeader rspHeader = new RsvpHeader(RsvpHeader.RSP, id);
        final Message response = new Message(dest)
            .putHeader(this.id, rspHeader)
            .setFlag(Message.Flag.RSVP,
                     Message.Flag.INTERNAL,
                     Message.Flag.DONT_BUNDLE,
                     Message.Flag.OOB);

        if (log.isTraceEnabled()) {
            log.trace(local_addr + ": " + rspHeader.typeToString() + " --> " + dest);
        }
        down_prot.down(response);
    } catch (Throwable t) {
        log.error(Util.getMessage("FailedSendingResponse"), t);
    }
}
代码示例来源:origin: wildfly/wildfly
public Object down(Event evt) {
switch(evt.getType()) {
case Event.VIEW_CHANGE:
Object retval=down_prot.down(evt);
View view=evt.getArg();
members.addAll(view.getMembers());
bcast_task.adjustSuspectedMembers(members);
computePingDest(null);
if(view.size() <= 1)
stopMonitor();
else if(!isMonitorRunning())
FdHeader hdr=new FdHeader(FdHeader.UNSUSPECT);
hdr.mbrs=new ArrayList<>();
hdr.mbrs.add(evt.getArg());
hdr.from=local_addr;
Message unsuspect_msg=new Message().setFlag(Message.Flag.INTERNAL).putHeader(id, hdr);
log.trace("%s: broadcasting UNSUSPECT message (mbrs=%s)", local_addr, hdr.mbrs);
down_prot.down(unsuspect_msg);
break;
break;
return down_prot.down(evt);
代码示例来源:origin: wildfly/wildfly
final List<Address> current_mbrs=view.getMembers();
return new MutableDigest(view.getMembersRaw())
.set((Digest)gms.getDownProtocol().down(new Event(Event.GET_DIGEST, gms.local_addr)));
Message get_digest_req=new Message().setFlag(Message.Flag.OOB, Message.Flag.INTERNAL)
.putHeader(gms.getId(), new GMS.GmsHeader(GMS.GmsHeader.GET_DIGEST_REQ).mergeId(merge_id));
gms.getDownProtocol().down(get_digest_req);
Digest digest=(Digest)gms.getDownProtocol().down(new Event(Event.GET_DIGEST, gms.local_addr));
digest_collector.add(gms.local_addr, digest);
digest_collector.waitForAllResponses(max_wait_time);
if(log.isTraceEnabled()) {
if(digest_collector.hasAllResponses())
log.trace("%s: fetched all digests for %s", gms.local_addr, current_mbrs);
else
log.trace("%s: fetched incomplete digests (after timeout of %d) ms for %s",
gms.local_addr, max_wait_time, current_mbrs);
代码示例来源:origin: wildfly/wildfly
log.trace("%s: mcasting view %s", local_addr, new_view);
up_prot.up(new Event(Event.TMP_VIEW, new_view));
down_prot.down(new Event(Event.TMP_VIEW, new_view));
Message view_change_msg=new Message().putHeader(this.id, new GmsHeader(GmsHeader.VIEW))
.setBuffer(marshal(new_view, digest)).setTransientFlag(Message.TransientFlag.DONT_LOOPBACK);
if(new_view instanceof MergeView) // https://issues.jboss.org/browse/JGRP-1484
view_change_msg.setFlag(Message.Flag.NO_TOTAL_ORDER);
down_prot.down(view_change_msg);
sendJoinResponses(jr, joiners);
try {
if(ack_collector.size() > 0) {
ack_collector.waitForAllAcks(view_ack_collection_timeout);
log.trace("%s: got all ACKs (%d) for view %s in %d ms",
local_addr, ack_collector.expectedAcks(), new_view.getViewId(), System.currentTimeMillis()-start);
log.warn("%s: failed to collect all ACKs (expected=%d) for view %s after %dms, missing %d ACKs from %s",
local_addr, ack_collector.expectedAcks(), new_view.getViewId(), view_ack_collection_timeout,
ack_collector.size(), ack_collector.printMissing());
代码示例来源:origin: wildfly/wildfly
public Object down(Event evt) {
switch(evt.getType()) {
handleViewChange(evt.getArg());
break;
StateTransferInfo info=evt.getArg();
Address target=info.target;
log.error("%s: cannot fetch state from myself", local_addr);
target=null;
log.debug("%s: first member (no state)", local_addr);
up_prot.up(new Event(Event.STATE_TRANSFER_INPUTSTREAM_CLOSED, new StateTransferResult()));
Message state_req=new Message(target).putHeader(this.id, new StateHeader(StateHeader.STATE_REQ))
.setFlag(Message.Flag.SKIP_BARRIER, Message.Flag.DONT_BUNDLE, Message.Flag.OOB);
log.debug("%s: asking %s for state", local_addr, target);
down_prot.down(state_req);
return down_prot.down(evt); // pass on to the layer below us
代码示例来源:origin: wildfly/wildfly
/**
 * Requests the secret key from the key server, sending our own public key along with the
 * request. Requests are rate-limited: after the first one, a new request is only issued
 * when more than {@code min_time_between_key_requests} ms have passed since the last one.
 * With {@code use_external_key_exchange} enabled, the request is delegated to the stack
 * below via a {@code FETCH_SECRET_KEY} event instead of being sent directly.
 *
 * @param key_server the member to ask for the secret key; {@code null} makes this a no-op
 */
protected void sendKeyRequest(Address key_server) {
    if (key_server == null) {
        return;
    }

    // Rate limiting: skip when a previous request was sent too recently.
    final boolean requestedRecently = last_key_request != 0
        && System.currentTimeMillis() - last_key_request <= min_time_between_key_requests;
    if (requestedRecently) {
        return;
    }
    last_key_request = System.currentTimeMillis();

    if (use_external_key_exchange) {
        log.debug("%s: asking key exchange protocol to get secret key from %s", local_addr, key_server);
        down_prot.down(new Event(Event.FETCH_SECRET_KEY, key_server));
        return;
    }

    log.debug("%s: asking %s for the secret key (my version: %s)",
              local_addr, key_server, Util.byteArrayToHexString(sym_version));

    // The payload of the request is our encoded public key.
    final EncryptHeader requestHeader = new EncryptHeader(EncryptHeader.SECRET_KEY_REQ, null);
    final Message keyRequest = new Message(key_server, key_pair.getPublic().getEncoded())
        .src(local_addr)
        .putHeader(this.id, requestHeader);
    down_prot.down(keyRequest);
}
代码示例来源:origin: wildfly/wildfly
/**
 * Sends a discovery request ({@code GET_MBRS_REQ}) to the physical address of every member
 * in the given list, except to our own physical address. Each request carries our own
 * {@code PingData} (local address, logical name, physical address) as payload.
 *
 * @param mbrs the discovered members; entries that are {@code null} or have no physical
 *             address are ignored
 */
@Override
public void members(List<PingData> mbrs) {
    // May be null if the lookup below the stack cannot resolve our physical address.
    PhysicalAddress own_physical_addr=(PhysicalAddress)down(new Event(Event.GET_PHYSICAL_ADDRESS, local_addr));
    PingData data=new PingData(local_addr, false, org.jgroups.util.NameCache.get(local_addr), own_physical_addr);
    PingHeader hdr=new PingHeader(PingHeader.GET_MBRS_REQ).clusterName(cluster_name);

    // De-duplicated, guaranteed non-null physical addresses of the discovered members.
    Set<PhysicalAddress> physical_addrs=mbrs.stream().filter(ping_data -> ping_data != null && ping_data.getPhysicalAddr() != null)
      .map(PingData::getPhysicalAddr).collect(Collectors.toSet());

    for(PhysicalAddress physical_addr: physical_addrs) {
        // No need to send the request to myself. equals() is invoked on the loop element
        // (never null thanks to the filter above) so that a null own_physical_addr cannot
        // trigger a NullPointerException.
        if(physical_addr.equals(own_physical_addr))
            continue;

        // Discovery requests are sent with DONT_BUNDLE (plus INTERNAL and OOB).
        final Message msg=new Message(physical_addr).setFlag(Message.Flag.INTERNAL, Message.Flag.DONT_BUNDLE, Message.Flag.OOB)
          .putHeader(this.id, hdr).setBuffer(marshal(data));
        log.trace("%s: sending discovery request to %s", local_addr, msg.getDest());
        down_prot.down(msg);
    }
}
代码示例来源:origin: wildfly/wildfly
public Object up(Message msg) {
PingHeader hdr=msg.getHeader(this.id);
if(hdr == null)
return up_prot.up(msg);
PingData data=readPingData(msg.getRawBuffer(), msg.getOffset(), msg.getLength());
Address logical_addr=data != null? data.getAddress() : msg.src();
log.warn("cluster_name (%s) or cluster_name of header (%s) is null; passing up discovery " +
"request from %s, but this should not be the case", cluster_name, hdr.cluster_name, msg.src());
log.warn("%s: discarding discovery request for cluster '%s' from %s; " +
"our cluster name is '%s'. Please separate your clusters properly",
logical_addr, hdr.cluster_name, msg.src(), cluster_name);
return null;
discoveryRequestReceived(msg.getSrc(), data.getLogicalName(), data.getPhysicalAddr());
addResponse(data, false);
if(addr.equals(local_addr) || (view != null && view.containsMember(addr))) {
PhysicalAddress physical_addr=entry.getValue();
sendDiscoveryResponse(addr, physical_addr, NameCache.get(addr), msg.getSrc(), isCoord(addr));
boolean drop_because_of_rank=use_ip_addrs && max_rank_to_reply > 0 && hdr.initialDiscovery() && Util.getRank(view, local_addr) > max_rank_to_reply;
if(drop_because_of_rank || (mbrs != null && !mbrs.contains(local_addr)))
return null;
PhysicalAddress physical_addr=(PhysicalAddress)down(new Event(Event.GET_PHYSICAL_ADDRESS, local_addr));
sendDiscoveryResponse(local_addr, physical_addr, NameCache.get(local_addr), msg.getSrc(), is_coord);
return null;
代码示例来源:origin: wildfly/wildfly
/**
 * Fetches the application state by sending a {@code GET_APPLSTATE} event up the stack and
 * sends it, together with the given digest, to the requester as a {@code STATE_RSP} message.
 * When statistics are enabled, the request counter, the byte counter and the average state
 * size are updated as well.
 *
 * @param requester the member that asked for the state
 * @param digest    the digest placed in the response header
 */
protected void getStateFromApplication(Address requester, Digest digest) {
    final Event fetchState = new Event(Event.GET_APPLSTATE);
    final StateTransferInfo appState = (StateTransferInfo) up_prot.up(fetchState);
    final byte[] state = appState.state;

    if (stats) {
        num_state_reqs.increment();
        if (state != null) {
            num_bytes_sent.add(state.length);
        }
        avg_state_size = num_bytes_sent.doubleValue() / num_state_reqs.doubleValue();
    }

    final StateHeader rspHeader = new StateHeader(StateHeader.STATE_RSP, digest);
    final Message state_rsp = new Message(requester, state).putHeader(this.id, rspHeader);

    // A missing state is reported as size 0 in the trace output.
    log.trace("%s: sending state to %s (size=%s)",
              local_addr, state_rsp.getDest(), Util.printBytes(state != null ? state.length : 0));
    down_prot.down(state_rsp);
}
代码示例来源:origin: wildfly/wildfly
/**
 * Broadcasts an UNSUSPECT message for a single member: an INTERNAL message without an
 * explicit destination whose header lists just that member.
 *
 * @param mbr the member to unsuspect; {@code null} makes this a no-op
 */
protected void broadcastUnuspectMessage(Address mbr) {
    if (mbr != null) {
        log.debug("%s: broadcasting unsuspect(%s)", local_addr, mbr);

        // Send the UNSUSPECT message right away rather than waiting for the broadcast task,
        // which sleeps before it sends anything.
        final FdHeader unsuspectHeader = new FdHeader(FdHeader.UNSUSPECT).mbrs(Collections.singleton(mbr));
        final Message unsuspectMsg = new Message()
            .setFlag(Message.Flag.INTERNAL)
            .putHeader(this.id, unsuspectHeader);
        down_prot.down(unsuspectMsg);
    }
}
代码示例来源:origin: wildfly/wildfly
Address sender=msg.getSrc();
FragmentationTable frag_table=fragment_list.get(sender);
if(frag_table == null) {
byte[] buf=frag_table.add(hdr.id, hdr.frag_id, hdr.num_frags, msg.getBuffer());
if(buf == null)
return null;
Message assembled_msg=new Message(false);
assembled_msg.readFrom(in);
assembled_msg.setSrc(sender); // needed ? YES, because fragments have a null src !!
if(log.isTraceEnabled()) log.trace("assembled_msg is " + assembled_msg);
num_received_msgs++;
return assembled_msg;
log.error(Util.getMessage("FailedUnfragmentingAMessage"), e);
return null;
内容来源于网络,如有侵权,请联系作者删除!