Mining Twitter Data With Ruby, MongoDB and Map-Reduce
When is the best time to tweet? If you care about reaching a lot of users, it is probably when your followers are also tweeting. In this exercise, we will try to figure out the day and time users are most active. Since there is no way for us to do this for all users in the twitterverse, we will use only the users we follow as our sample.
What do we need?
- MongoDB
- the tweetstream gem
- the awesome_print gem, for awesome printing of Ruby objects
- OAuth credentials
Visit http://dev.twitter.com to get your OAuth credentials. You just need to log in, create an app, and the OAuth credentials you need will be there. Copy the OAuth settings to the twitter.yml file, because that is where our sample code will look for them.
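For reference, twitter.yml just needs the four keys our script reads; something like this (placeholder values, obviously):

# twitter.yml - use your own app's credentials
consumer_key: YOUR_CONSUMER_KEY
consumer_secret: YOUR_CONSUMER_SECRET
oauth_token: YOUR_OAUTH_TOKEN
oauth_token_secret: YOUR_OAUTH_TOKEN_SECRET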
Collect status updates
We use the TweetStream gem to access the Twitter Streaming API, which allows our program to receive updates as they occur without having to regularly poll Twitter.
# collect.rb - collects user tweets and saves them to MongoDB
require 'bundler'
require 'yaml'
require File.dirname(__FILE__) + '/tweetminer'
Bundler.require

# We use the TweetStream gem to access Twitter's Streaming API
# https://github.com/intridea/tweetstream
TweetStream.configure do |config|
  settings = YAML.load_file File.dirname(__FILE__) + '/twitter.yml'

  config.consumer_key       = settings['consumer_key']
  config.consumer_secret    = settings['consumer_secret']
  config.oauth_token        = settings['oauth_token']
  config.oauth_token_secret = settings['oauth_token_secret']
end

settings = YAML.load_file File.dirname(__FILE__) + '/mongo.yml'
miner = TweetMiner.new(settings)

stream = TweetStream::Client.new

stream.on_error do |msg|
  puts msg
end

stream.on_timeline_status do |status|
  miner.insert_status status
  print '.'
end

# Do not forget this; it triggers the collection of tweets
stream.userstream
The code above handles the collection of status updates. The actual saving to MongoDB is handled by the TweetMiner class.
# tweetminer.rb
require 'mongo'

class TweetMiner
  # Injection point so the connector can be swapped out (e.g., in tests)
  attr_writer :db_connector
  attr_reader :options

  def initialize(options)
    @options = options
  end

  def db
    @db ||= connect_to_db
  end

  def insert_status(status)
    statuses.insert status
  end

  def statuses
    @statuses ||= db['statuses']
  end

  private

  def connect_to_db
    db_connector.call(options['host'], options['port']).db(options['database'])
  end

  def db_connector
    # Defaults to Mongo::Connection.new, wrapped as a callable
    @db_connector ||= Mongo::Connection.public_method :new
  end
end
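The mongo.yml settings file supplies the values connect_to_db reads. A minimal version might look like this (the database name is just an example; 27017 is MongoDB's default port):

# mongo.yml - example values, adjust to your setup
host: localhost
port: 27017
database: twitter_mining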
We will be modifying our code along the way, and if you want to follow each step, you can view this commit at GitHub.
Depending on how active the people you follow are, it may take a while before you get a good sample of tweets. In fact, it would be interesting to run the collection for several days.
Assuming we have several days' worth of data, let us proceed with the "data mining" part. Data mining would not be fun without a mention of map-reduce, a strategy for processing large data sets popularized by Google. The key innovation of map-reduce is its ability to take a query over a data set, divide it, and run it in parallel over many nodes. "Counting", for example, is a task that fits the map-reduce framework nicely. Imagine you and your friends are counting the number of people in a football stadium. First, you divide yourselves into two groups: group A counts the people in the lower deck while group B does the upper deck. Group A in turn divides its task into the north, south, and end zones. When group A is done counting, it tallies its results. When group B is done, the two groups combine their results, and the total gives us the number of people in the stadium. Dividing your friends is the "map" part, while tallying the results is the "reduce" part.
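To make the analogy concrete, here is the same split-then-tally pattern sketched in plain Ruby; this is just an illustration of the idea (with made-up headcounts), not MongoDB's map-reduce:

# Each section's headcount is a partial result ("map"),
# and summing the partial results is the tally ("reduce").
section_counts = {
  'lower-north' => 120, 'lower-south' => 150, 'lower-endzones' => 90, # group A
  'upper-north' => 80,  'upper-south' => 110, 'upper-endzones' => 60  # group B
}
total = section_counts.values.reduce(0) { |sum, count| sum + count }
puts total # => 610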
Updates per user
First, let us do a simple task: count the number of updates per user. We introduce a new module, StatusCounter, which we include in our TweetMiner class. We also add a new program to execute the map-reduce task.
# counter.rb
require 'bundler'
require 'yaml'
Bundler.require
require File.dirname(__FILE__) + '/tweetminer'

settings = YAML.load_file File.dirname(__FILE__) + '/mongo.yml'
miner = TweetMiner.new(settings)

results = miner.status_count_by_user
ap results
Map-reduce commands in MongoDB are written in JavaScript. Ruby sees the JavaScript as just a bunch of characters, so be conscious of how that string is built. In the example below we use a heredoc, which behaves like a double-quoted string: #{} is interpolated and escape sequences are interpreted. In our later examples, we switch to single quotes when the JavaScript contains regular expressions, so backslashes pass through untouched.
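A quick illustration of the difference:

# Double-quoted strings (and heredocs) interpret escape sequences;
# single-quoted strings keep the backslash, which is what we want for \d, \w, etc.
puts "a\nb".length # => 3 (the \n became a newline)
puts 'a\nb'.length # => 4 (backslash and n survive intact)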
module StatusCounter
  class UserCounter
    def map_command
      <<-EOS
        function() {
          emit(this.user.id_str, 1);
        }
      EOS
    end

    def reduce_command
      <<-EOS
        function(key, values) {
          var count = 0;
          for(i in values) {
            count += values[i];
          }
          return count;
        }
      EOS
    end
  end

  def status_count_by_user
    counter = UserCounter.new
    statuses.map_reduce(counter.map_command, counter.reduce_command, default_mr_options)
  end

  def default_mr_options
    {:out => {:inline => 1}, :raw => true}
  end
end
Follow this commit to view the changes from our previous examples.
When you run 'ruby counter.rb', you should see output similar to the screenshot below:
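Because we passed :raw => true with inline output, the mongo gem returns the raw command response; the counts sit under the 'results' key as documents of the form { '_id' => user_id, 'value' => count }. If you want the most active users first, you can sort that array yourself, for example:

# Sort the inline map-reduce results by count, highest first
top_users = results['results'].sort_by { |doc| -doc['value'] }
ap top_users.first(10)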
Tweets per hour
Now, let's do something a little harder than the previous example. This time, we want to know how many tweets are posted per hour. Every tweet has a created_at field of type String, so we use a regular expression to extract the hour component.
created_at: 'Tue Sep 04 22:04:40 +0000 2012'
regex: (\d{2,2}):\d{2,2}:\d{2,2}
match: 22
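You can sanity-check the regex in irb before committing it to JavaScript; Ruby's String#[] with a capture-group index makes it a one-liner:

'Tue Sep 04 22:04:40 +0000 2012'[/(\d{2,2}):\d{2,2}:\d{2,2}/, 1] # => "22"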
The only significant change is the addition of a new map command. Note the reduce command did not change from the previous example. See the commit.
class HourOfDayCounter
  def map_command
    'function() {
      var re = /(\d{2,2}):\d{2,2}:\d{2,2}/;
      var hour = re.exec(this.created_at)[1];
      emit(hour, 1);
    }'
  end

  def reduce_command
    <<-EOS
      function(key, values) {
        var count = 0;
        for(i in values) {
          count += values[i];
        }
        return count;
      }
    EOS
  end
end

def status_count_by_hday
  counter = HourOfDayCounter.new
  statuses.map_reduce(counter.map_command, counter.reduce_command, default_mr_options)
end
Now run 'ruby counter.rb' in the console with the new method, and the result should be something like the one below.
Filtering records
Our examples so far include every status since the beginning of time, which is not very useful. What we really want is to apply the counting tasks to, say, statuses posted in the past 7 days. MongoDB allows you to pass a query to map-reduce so you can filter the data it runs over. One problem though: the created_at field is a string. To get around this, we introduce a new field, created_at_dt, of type Date. You could hook this up in the insert_status method (a sketch follows the console snippet below), but since we already have our data, we instead run a query using the MongoDB console to update our records. Please note the collection we are using is statuses and the new field is created_at_dt.
var cursor = db.statuses.find({ created_at_dt: { $exists: false } });
while (cursor.hasNext()) {
  var doc = cursor.next();
  db.statuses.update({ _id: doc._id }, { $set: { created_at_dt: new Date(doc.created_at) } });
}
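For new records, the hook mentioned above is a small change to TweetMiner#insert_status; a sketch, assuming the status hash carries Twitter's created_at string:

require 'time' # for Time.parse

def insert_status(status)
  # Stamp a real Date field at write time; the driver stores Time as a BSON date
  status['created_at_dt'] = Time.parse(status['created_at'])
  statuses.insert status
end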
Now that we have a Date field, let's modify our method to take a days_ago parameter and pass a query to our map-reduce.
def status_count_by_hday(days_ago = 7)
  date   = Date.today - days_ago
  cutoff = Time.utc(date.year, date.month, date.day)
  query  = { 'created_at_dt' => { '$gte' => cutoff } }

  options = default_mr_options.merge(:query => query)

  counter = HourOfDayCounter.new
  statuses.map_reduce(counter.map_command, counter.reduce_command, options)
end
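counter.rb then only needs to call the new method, for example:

results = miner.status_count_by_hday(7)
ap results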
Since we're now getting the hang of it, why don't we add another layer of complexity? This time, let us count by day of the week and include a breakdown per hour. Luckily for us, the day of the week is also included in the created_at field, and it is just a matter of extracting it. Of course, if Twitter decides to change the format, this will break. Let's visit rubular.com and try our regular expression.
Now that we have our regex working, let’s include this in our new map command.
def map_command
  'function() {
    var re = /(^\w{3,3}).+(\d{2,2}):\d{2,2}:\d{2,2}/;
    var matches = re.exec(this.created_at);
    var wday = matches[1],
        hday = matches[2];

    emit(wday, { count: 1, hdayBreakdown: [{ hday: hday, count: 1 }] });
  }'
end
Note the difference in the emit function from our previous examples. Before, we emitted only a single numeric value, which is why our reduce command was a simple array loop. This time, our reduce command requires more work.
def reduce_command
  'function(key, values) {
    var total = 0,
        hdays = {},
        hdayBreakdown;

    for(i in values) {
      total += values[i].count;

      hdayBreakdown = values[i].hdayBreakdown;
      for(j in hdayBreakdown) {
        hday  = hdayBreakdown[j].hday;
        count = hdayBreakdown[j].count;

        if( hdays[hday] == undefined ) {
          hdays[hday] = count;
        } else {
          hdays[hday] += count;
        }
      }
    }

    hdayBreakdown = [];
    for(k in hdays) {
      hdayBreakdown.push({ hday: k, count: hdays[k] });
    }

    return { count: total, hdayBreakdown: hdayBreakdown };
  }'
end
In our previous examples, the values parameter was a simple array of numbers. Now, it is an array of objects, and one of the properties (hdayBreakdown) is itself an array. If everything works according to plan, you should see something like the image below when you run counter.rb.
Did you have fun? I hope so :)