Wednesday, 2 July 2014

A Basic Twitter Message Queue Service using ActiveMQ and WebSocket

I've been planning to write this for a long time, since I got so fascinated by the simplicity and elegance of message queue, and the performance boost it could bring to various systems. So here we are.

TL;DR

If you just want to see an example, or make sure you get the right dependencies: boom, here you go!

What is Message Queue?

Message Queue is the storage area of a mechanism, which allows distributed applications to communicate asynchronously by sending messages between the applications

Why Message Queue?

I think you can probably google a dozen reasons why you should do it. Speaking from my experience working in various integration projects, message queue, is the pursue for high performance, high scalability, high resilience and low coupling, while accomplishing asynchronous communication, buffering and filtering at the same time.

What Choices Do I Have?

Rather than trying to implement you own message queueing, here is some most notable MQ  implementations: ActiveMQ, RabbitMQ and ZeroMQ. 

Blah Blah Blah, Give Me an Example...

The example I put together is a middle layer between twitter API and our applications, using ActiveMQ and WebSocket to implement the messaging. 

Its purpose is to make sure we are able to deliver tweets across different applications in a consistent and timely fashion. While to be precise, it is essentially to ensure we have a mechanism to filter out some pottery mouthes, control and monitor traffics to our applications.

1. Install ActiveMQ

Use brew install, then type in command 'activemq start'. Job done.

To verify the service is actually running, type in command 'netstat -an | grep 61616', or browse to 'http://localhost:8161'

As basic usage, we are only interested in 'Queues', where the number of consumers, message queued and dequeued are displayed in the dashboard.

2. Java 8

Java in Mac is a pain in the neck, to make sure you can use Java 8, go to oracle, download the package and install it.

Slightly trickier to set the configure the Mac. In terminal, type in 

'cd /System/Library/Frameworks/JavaVM.framework/Versions/' 

which should bring you to a list of JDK available on your Mac.

Remove 'CurrentJDK' and symlink it to the version you want. Using the command: 

'rm CurrentJDK'

'ln -s cd /Library/Java/JavaVirtualMachines/jdk1.8.0_05.jdk/Contents/ CurrentJDK'

3. Producer and Consumer

The whole message queueing fits in producer-consumer pattern perfectly. On one hand, the producer will stock the new tweets into our container, ActiveMQ. On the other, the consumer will fetch the messages from the container and process them.

The producer in the simplest form:

import javax.jms.*;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL; // Configure to use localhost
    private static String subject = "TWEETQUEUE"; // This is the name of the queue you will see in ActiveMQ dashboard

    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory =
            new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();

        Session session = connection.createSession(false,
            Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(subject);
        MessageProducer producer = session.createProducer(destination);
        TextMessage message = session.createTextMessage("A new message");
        producer.send(message);

        connection.close();
    }
}

The consumer in its simplest form:

import javax.jms.*;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static String subject = "TWEETQUEUE";

    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory
            = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();

        Session session = connection.createSession(false,
            Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(subject);
        MessageConsumer consumer = session.createConsumer(destination);
        Message message = consumer.receive();

        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("Message: "
                + textMessage.getText());
        }

        connection.close();
    }
}

4. Twitter Integration

To make devs' life easier, Twitter provides a nice HTTP client, called hbc. The Twitter API is an authorisation-based, which means you either provide your account and password in the configuration, or create a Twitter App, then use the token provided. I'd strongly recommend the later, because I can't get the account and password working, right? Surely I won't tell you even if that's true :) Google 'Why OAuth' if you are wondering why.

The exact example I referenced can be found in hbc, yet I blended in the Producer in this case, so that we are actually injecting tweets into ActiveMQ.

import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.BasicClient;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class Tweet {
private static final String consumerKey = ""; 
private static final String consumerSecret = ""; 
private static final String token = ""; 
private static final String secret = "";
private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private static String subject = "TWEETQUEUE";
    public static void main(String[] args) throws InterruptedException, JMSException {
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(subject);
        MessageProducer producer = session.createProducer(destination);
        
    BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10000);
        StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
        
        endpoint.followings(Lists.newArrayList(871686942L)); // @BBCOne
        
        Authentication auth = new OAuth1(consumerKey, consumerSecret, token, secret);
        
        BasicClient client = new ClientBuilder()
        .name("sampleExampleClient")
        .hosts(Constants.STREAM_HOST)
        .endpoint(endpoint)
        .authentication(auth)
        .processor(new StringDelimitedProcessor(queue))
        .build();
        
        client.connect();
        
        for (int msgRead = 0; msgRead < 1000; msgRead++) {
        if (client.isDone()) {
                System.out.println("Client connection closed unexpectedly: " + client.getExitEvent().getMessage());
                break;
        }
       
        String msg = queue.poll(5, TimeUnit.SECONDS);
        if (msg == null) {
                System.out.println("Did not receive a message in 5 seconds");
            } else {
                System.out.println(msg);
                TextMessage message = session.createTextMessage(msg);
                producer.send(message);
            }
        }
        
        client.stop();
    }
}

5. Send via WebSocket

Once the queue started filling with junks, errr, tweets. We need to find a way to let the messages out. There are quite a number of broadcasting mechanisms we can use, but bearing speed and performance in mind in the case of leaving network connection open and message wrapping, WebSocket seems to be a very trendy and fashionable choice :)

WARNING: if you DO need to deal with ancient browsers (prior 2011, like I am kidding, right), make sure WebSocket is supported!

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.TextMessage;
import javax.websocket.CloseReason;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;
import org.glassfish.tyrus.server.Server;
import org.json.JSONObject;

@ServerEndpoint(value = "/tweets")
public class TweetFeedServer {
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static String subject = "TWEETQUEUE";
    private Logger logger = Logger.getLogger(this.getClass().getName());
    
    @OnOpen
    public void onOpen(Session session) {
        logger.info("Connected ... " + session.getId());
    }

    @OnMessage
    public void onMessage(String message, Session session) throws Exception {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();

        javax.jms.Session jmsSession = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
        Destination destination = jmsSession.createQueue(subject);
        MessageConsumer consumer = jmsSession.createConsumer(destination);

        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10000);

        for (int msgRead = 0; msgRead < 1000; msgRead++) {
            String msg = queue.poll(5, TimeUnit.SECONDS);

            Message jmsMessage = consumer.receive();
            if (jmsMessage instanceof TextMessage){
                TextMessage textMessage = (TextMessage) jmsMessage;

                try {
                    JSONObject receivedMessage = new JSONObject(textMessage.getText());
                    JSONObject processedMessage = new JSONObject();

                    System.out.println("Processed message: " + receivedMessage);

                    processedMessage.put("name", receivedMessage.getJSONObject("user").getString("name"));
                    processedMessage.put("icon", receivedMessage.getJSONObject("user").getString("profile_image_url"));
                    processedMessage.put("message", receivedMessage.getString("text"));

                    System.out.println("Processed message: " + processedMessage);
                    session.getBasicRemote().sendObject(processedMessage);
                } catch(Exception e) {
                    System.out.println(e.getMessage());
                }
            }

        }

        connection.close();
    }

    @OnClose
    public void onClose(Session session, CloseReason closeReason) {
        logger.info(String.format("Session %s closed because of %s", session.getId(), closeReason));
    }
    
    // For testing only
    public static void main(String[] args) {
        Server server = new Server("localhost", 8025, "/websockets", TweetFeedServer.class);

        try {
            server.start();
            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
            System.out.print("Please press a key to stop the server.");
            reader.readLine();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            server.stop();
        }
    }
}

Conclusion

The setup of ActiveMQ and Twitter Integration are pretty easily done, yet finding the way to produce and consume messages took a while to try, also deciding what and how to use implement the broadcasting part is quite fun.

The truly potential of Message Queue is surely more than this, the live async update, fast and lightweight queueing provide a limitless space for big scale systems integration.

I hope this blog does give you some ideas to think about.

PS: the all-in-one solution can be found on my GitHub.

No comments:

Post a Comment