Archives

Mining Twitter Data With Ruby, MongoDB and Map-Reduce

When is the best time to tweet? If you care about reaching a lot of users, the best time probably is when your followers are also tweeting. In this exercise,we will try to figure out the day and time users are the most active. Since there is no way for us to do this for all users in the twitterverse, we will only use the users we follow as our sample.

What do we need

  • mongodb
  • tweetstream gem
  • awesome_print gem for awesome printing of Ruby objects
  • oauth credentials

Visit http://dev.twitter.com to get your oauth credentials. You just need to login, create an app, and the oauth credentials you need will be there. Copy the oauth settings to the twitter.yml file because that is where our sample code will be looking.

Collect status updates

We use the Tweetstream gem to access the Twitter Streaming APIs which allows our program to receive updates as they occur without the need to regularly poll Twitter.

# Collects user tweets and saves them to a mongodb
require 'bundler'
require File.dirname(__FILE__) + '/tweetminer'

Bundler.require

# We use the TweetStream gem to access Twitter's Streaming API
# https://github.com/intridea/tweetstream

TweetStream.configure do |config|
  settings = YAML.load_file File.dirname(__FILE__) + '/twitter.yml'

  config.consumer_key       = settings['consumer_key']
  config.consumer_secret    = settings['consumer_secret']
  config.oauth_token        = settings['oauth_token']
  config.oauth_token_secret = settings['oauth_token_secret']
end

settings = YAML.load_file File.dirname(__FILE__) + '/mongo.yml'
miner = TweetMiner.new(settings)stream = TweetStream::Client.new

stream.on_error do |msg|
  puts msg
end

stream.on_timeline_status do |status|
  miner.insert_status status
  print '.'
end

# Do not forget this to trigger the collection of tweets
stream.userstream

The code above handles the collection of status updates. The actual saving to mongodb is handled by the TweetMiner module.

# tweetminer.rb

require 'mongo'

class TweetMiner
  attr_writer :db_connector
  attr_reader :options

  def initialize(options)
    @options = options
  end

  def db
    @db ||= connect_to_db
  end

  def insert_status(status)
    statuses.insert status
  end

  def statuses
    @statuses ||= db['statuses']
  end

  private

  def connect_to_db
    db_connector.call(options['host'], options['port']).db(options['database'])
  end

  def db_connector
    @db_connector ||= Mongo::Connection.public_method :new
  end

 end

We will be modifying our code along the way and if you want follow each step, you can view this commit at github.

Depending on how active the people you follow, it may take a while before you get a good sample of tweets. Actually, it would be interesting if you could run the collection for several days.

Assuming we have several days' worth of data, let us proceed with the “data mining” part. Data mining would not be fun without a mention of map reduce - a strategy for data mining popularized by Google. The key innovation with map reduce is its ability to take a query over a data set, divide it, and run it in parallel over many nodes. “Counting”, for example, is a task that fits nicely with the map reduce framework. Imagine you and your friends are counting the number of people in a football stadium. First, you divide yourselves into 2 groups - group A counts the people in the lower deck while group B does the upper deck. Group A in turn divides the task into north, south, and endzones. When group A is done counting, they tally all their results. After group B is done, they combine the results with group A for which the total gives us the number of people in the stadium. Dividing your friends is the “map” part while the tallying of results is the “reduce” part.

Updates per user

First, let us do a simple task. Let us count the number of updates per user. We introduce a new module ‘StatusCounter’ which we include in our TweetMiner module. We also add a new program to execute the map reduce task.

# counter.rb

require 'bundler'
Bundler.require
require File.dirname(__FILE__) + '/tweetminer'
settings = YAML.load_file File.dirname(__FILE__) + '/mongo.yml'

miner = TweetMiner.new(settings)

results = miner.status_count_by_user
ap results

Map reduce commands in mongodb are written in Javascript. When writing Javascript, just be conscious about string interpolation because Ruby sees it as a bunch of characters and nothing else. For the example below, we use the here document which interprets backslashes. In our later examples, we switch to single quotes when we use regular expressions within our Javascript.

module StatusCounter
  class UserCounter
    def map_command
      <<-EOS
        function() {
          emit(this.user.id_str, 1);
        }
      EOS
    end

    def reduce_command
      <<-EOS
        function(key, values) {
          var count = 0;
          for(i in values) {
            count += values[i]
          }

          return count;
        }
      EOS
    end
  end

  def status_count_by_user
    counter = UserCounter.new
    statuses.map_reduce(counter.map_command, counter.reduce_command, default_mr_options)
  end

  def default_mr_options
    {:out => {:inline => 1}, :raw => true }
  end
 end

Follow this commit to view the changes from our previous examples.

When you run ‘ruby counter.rb’, you should see a similar screenshot as the one below:

Tweets per Hour

Now, let’s do something a little bit harder than the previous example. This time, we want to know how many tweets are posted per hour. Every tweet has a created_at field of type String. We then use a regular expression to extract the hour component.

created_at:  'Tue Sep 04 22:04:40 +0000 2012'
regex:  (\d{2,2}):\d{2,2}:\d{2,2}
match: 22

The only significant change is the addition of a new map command. Note the reduce command did not change from the previous example. See the commit.

class HourOfDayCounter
  def map_command
    'function() {
      var re = /(\d{2,2}):\d{2,2}:\d{2,2}/;
      var hour = re.exec(this.created_at)[1];

      emit(hour, 1);
    }'
  end

  def reduce_command
    &lt;&lt;-EOS
      function(key, values) {
        var count = 0;

        for(i in values) {
          count += values[i]
        }

        return count;
      }
    EOS
  end

end

def status_count_by_hday
  counter = HourOfDayCounter.new
  statuses.map_reduce(counter.map_command, counter.reduce_command, default_mr_options)
end

Now run ‘ruby counter.rb’ in the console with the new method and the result should be something like the one below.

Filtering records

Our examples so far include every status since the beginning of time, which is pretty much useless. What we want is to apply the counting tasks to statuses posted the past 7 days, for example. MongoDB allows you to pass a query to your map-reduce so you can filter the data where the map-reduce is applied. One problem though: created_at field is a string. To get around this, we introduce a new field created_at_dt which is of type Date. You can hook it up in the insert_status method but since we already have our data, we instead run a query (using MongoDB console) to update our records. Please note the collection we are using is statuses and the new field is created_at_dt.

var cursor = db.statuses.find({ created_at_dt: { $exists: false } });
while (cursor.hasNext()) {
  var doc = cursor.next();
  db.statuses.update({ _id : doc._id }, { $set : { created_at_dt : new Date(doc.created_at) } } )
}

Now, that we have a Date field, let’s modify our method to include a days_ago parameter and a query in our map reduce.

def status_count_by_hday(days_ago = 7)
  date     = Date.today - days_ago
  days_ago = Time.utc(date.year, date.month, date.day)
  query = { 'created_at_dt' => { '$gte' => days_ago } }

  options = default_mr_options.merge(:query => query)

  counter = HourOfDayCounter.new
  statuses.map_reduce(counter.map_command, counter.reduce_command, options)
end

Since we’re now getting the hang of it, why don’t we add another complexity. This time, let us count by day of the week and include a breakdown per hour. Luckily for us, the day of the week is also included in the created_at field and it is just a matter of extracting it. Of course, if Twitter decides to change the format, this will break. Let’s visit rubular.com and try our regular expression.

Now that we have our regex working, let’s include this in our new map command.

def map_command
  'function() {
    var re = /(^\w{3,3}).+(\d{2,2}):\d{2,2}:\d{2,2}/;
    var matches = re.exec(this.created_at);

    var wday = matches[1],
        hday = matches[2];

    emit(wday, { count: 1, hdayBreakdown: [{ hday: hday, count: 1 }] });
  }'
end

Note the difference in the emit function from our previous examples. Before, we only emit a single numeric value that is why our reduce command is simple array loop. This time, our reduce command requires more work.

def reduce_command
  'function(key, values) {
     var total = 0,
         hdays = {},
         hdayBreakdown;

     for(i in values) {
       total += values[i].count

       hdayBreakdown = values[i].hdayBreakdown;

       for(j in hdayBreakdown) {
         hday  = hdayBreakdown[j].hday;
         count = hdayBreakdown[j].count;

         if( hdays[hday] == undefined ) {
           hdays[hday] = count;
         } else {
           hdays[hday] += count;
         }
       }
     }

     hdayBreakdown = [];
     for(k in hdays) {
       hdayBreakdown.push({ hday: k, count: hdays[k] })
     }

     return { count: total, hdayBreakdown: hdayBreakdown }
   }'
end

In our previous examples, the values parameter is a simple array of numeric values. Now, it becomes an an array of properties. On top of that, one of the properties (i.e. hdayBreakdown) is also an array. If everything works according to plan, you should see something like the image below when you run collect.rb.

Did you have fun? I hope so :)