日期:2014-05-19  浏览次数:20821 次

我用 java写的 akka zeromq 看不到效果,pubSocket发送的message subSocket 不能够收到,请高手解答一下。
我用 java写的 akka zeromq 看不到效果,pubSocket发送的message subSocket 不能够收到,请高手解答一下。
代码如下

package com.hantek.akka.zeromq;

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.lang.management.OperatingSystemMXBean;
import java.text.SimpleDateFormat;
import java.util.Date;

import scala.Serializable;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.serialization.Serialization;
import akka.serialization.SerializationExtension;
import akka.util.Duration;
import akka.zeromq.Bind;
import akka.zeromq.Connect;
import akka.zeromq.Frame;
import akka.zeromq.Listener;
import akka.zeromq.Subscribe;
import akka.zeromq.ZMQMessage;
import akka.zeromq.ZeroMQ;
import akka.zeromq.ZeroMQExtension;

public class TestZeroMQ1 {

public static final Object TICK = "TICK";

public static class Heap
implements Serializable {
/**

*/
private static final long serialVersionUID = 2807062186812700532L;

public final long timestamp;

public final long used;

public final long max;

public Heap(long timestamp, long used, long max){
this.timestamp = timestamp;
this.used = used;
this.max = max;
}
}

public static class Load
implements Serializable {
/**

*/
private static final long serialVersionUID = 3321800432516503175L;

public final long timestamp;

public final double loadAverage;

public Load(long timestamp, double loadAverage) {
this.timestamp = timestamp;
this.loadAverage = loadAverage;
}
}

public static class HealthProbe
extends UntypedActor {

ActorRef pubSocket = ZeroMQExtension.get(getContext().system()).newPubSocket(new Bind("tcp://127.0.0.1:1237"));

MemoryMXBean memory = ManagementFactory.getMemoryMXBean();

OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();

Serialization ser = SerializationExtension.get(getContext().system());
int i=0;
@Override
public void preStart() {
System.out.println("_____pubSocket____开始");
getContext().system().scheduler().schedule(Duration.parse("1 second"),
Duration.parse("1 second"), getSelf(), TICK);
}

@Override
public void postRestart(Throwable reason) {
// don't call preStart, only schedule once
}

@Override
public void onReceive(Object message) {
if (message.equals(TICK)) {
i++;
if(i==10){
System.exit(1);
}
MemoryUsage currentHeap = memory.getHeapMemoryUsage();
long timestamp = System.currentTimeMillis();

byte[] heapPayload = ser.serializerFor(Heap.class).toBinary(
new Heap(timestamp, currentHeap.getUsed(), currentHeap.getMax()));
pubSocket.tell(new ZMQMessage(new Frame("health.heap"),new Frame(heapPayload)));
byte[] loadPayload = ser.serializerFor(Load.class).toBinary(
new Load(timestamp, os.getSystemLoadAverage()));
pubSocket.tell(new ZMQMessage(new Frame("health.load"),new Frame(loadPayload)));
//
System.out.println((i)+"_____pubSocket__TIKE__onReceive :"+message);
}
else {
unhandled(message);
System.out.println("_____pubSocket__UNHAND :"+me