An important framework for computing with large data sets is MapReduce. MapReduce involves splitting your data into chunks, using parallel processing to compute on each chunk individually, and then combining the results for each chunk into something revealing about the dataset as a whole. See the diagram below.
In this notebook, we'll create our own implementation of MapReduce and use it to search Wikipedia documents.
Overall, we were able to implement MapReduce from first principles and use it to count lines, run case-insensitive exact-match searches, and run regex pattern searches over the documents, saving the matches to a CSV file.
The dataset used in this notebook is a folder containing the raw HTML of roughly 1,000 Wikipedia pages (999 files in the copy used here). This data can be obtained directly from Wikipedia or with a basic scraper that downloads the pages.
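To make the three steps concrete, here is a minimal serial sketch on a toy list of numbers (not the Wikipedia data): it splits the list into chunks, maps each chunk to a partial sum, and reduces the partial sums into one total. In a real MapReduce, the map step would run on the chunks in parallel.

```python
import functools

# Toy data set: the numbers 1..10 (a stand-in for a large collection of records)
data = list(range(1, 11))

# 1. Split: cut the data into chunks of at most 4 items
chunk_size = 4
chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

# 2. Map: compute a partial result (here, a sum) for each chunk independently
partial_sums = [sum(chunk) for chunk in chunks]

# 3. Reduce: combine the partial results pairwise into one answer
total = functools.reduce(lambda a, b: a + b, partial_sums)

print(chunks)        # [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10]]
print(partial_sums)  # [10, 26, 19]
print(total)         # 55
```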
Using the built-in os module, we can list the files in the wiki directory, which is where the raw .html files are stored.
import os
file_names = os.listdir('wiki')
Let's check how many files there are.
print(f"The number of files in the wiki folder is {len(file_names)}")
The number of files in the wiki folder is 999
To get a sense of what a document looks like, we can inspect the first few lines of its raw HTML.
with open(os.path.join('wiki/', file_names[0])) as file:
    lines = file.readlines()
print(lines[:4])  # first four lines of the first document
['<html class="client-nojs" lang="en" dir="ltr">\n', '<head>\n', '<meta charset="UTF-8"/>\n', '<title>Pictogram - Wikipedia</title>\n']
Before looking any more at the dataset, let's implement MapReduce.
First, let's make a function that splits our dataset into a list of batches.
import math

def make_batches(data, num_batches):
    # round up so that at most num_batches batches are produced
    batch_size = math.ceil(len(data) / num_batches)
    return [data[i:i + batch_size] for i in range(0, len(data), batch_size)]
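To see how the batch sizes work out, the sketch below repeats make_batches and calls it on dummy lists. With 999 file names and 8 processes, rounding up gives seven batches of 125 and a final batch of 124. Rounding up can also produce fewer batches than requested, as the second call shows.

```python
import math

def make_batches(data, num_batches):
    # same logic as above: round the batch size up
    batch_size = math.ceil(len(data) / num_batches)
    return [data[i:i + batch_size] for i in range(0, len(data), batch_size)]

# 999 items split for 8 processes: 7 batches of 125 and a final batch of 124
batches = make_batches(list(range(999)), 8)
print(len(batches), [len(b) for b in batches])
# 8 [125, 125, 125, 125, 125, 125, 125, 124]

# Rounding the batch size up can yield fewer batches than requested
print([len(b) for b in make_batches(list(range(9)), 4)])  # [3, 3, 3]
```

Note also that an empty data list gives a batch size of 0, which range() rejects with a ValueError, so map_reduce assumes non-empty input.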
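Now we're ready to implement map_reduce. The implementation splits the data into one batch per process with make_batches, uses a multiprocessing Pool to apply the mapper to every batch in parallel, and then combines the per-batch results pairwise with functools.reduce and the reducer. The reducer must therefore take two batch results and return a result of the same kind.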
import functools
from multiprocessing import Pool

def map_reduce(data, num_processes, mapper, reducer):
    batches = make_batches(data, num_processes)  # make batches
    with Pool(num_processes) as pool:  # use parallel processing; the pool is closed on exit
        batch_results = pool.map(mapper, batches)  # map: apply mapper to each batch
    return functools.reduce(reducer, batch_results)  # reduce: combine batch results pairwise
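Errors raised inside worker processes can be harder to trace, so it may help to test a mapper and reducer serially first. The sketch below is a hypothetical map_reduce_serial (not part of the original notebook) with the same signature; it replaces pool.map with Python's built-in map, so everything runs in the current process. The in-memory "documents" are made up.

```python
import functools
import math
import operator

def map_reduce_serial(data, num_batches, mapper, reducer):
    # hypothetical helper: same split -> map -> reduce steps as map_reduce,
    # but the built-in map runs the mapper on each batch in this process
    batch_size = math.ceil(len(data) / num_batches)
    batches = [data[i:i + batch_size] for i in range(0, len(data), batch_size)]
    batch_results = map(mapper, batches)             # map (sequential)
    return functools.reduce(reducer, batch_results)  # reduce

# Usage: count lines across three in-memory strings instead of files on disk
docs = ["a\nb\n", "c\n", "d\ne\nf\n"]

def doc_line_count_mapper(doc_chunk):
    return sum(len(doc.splitlines()) for doc in doc_chunk)

total_lines = map_reduce_serial(docs, 2, doc_line_count_mapper, operator.add)
print(total_lines)  # 6
```

Once a mapper and reducer give the expected answer here, swapping in map_reduce should only change where the work runs.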
Before implementing a document search, let's use MapReduce to count the total number of lines across all documents.
# map function: counts the number of lines in a chunk of filenames
def line_count_mapper(file_name_chunk):
    total = 0
    for file_name in file_name_chunk:
        with open(os.path.join('wiki/', file_name)) as file:
            lines = file.readlines()
        total += len(lines)
    return total

# reduce function: adds the results for two batches together
def line_count_reducer(total1, total2):
    return total1 + total2
line_number = map_reduce(file_names,8,line_count_mapper,line_count_reducer)
print(f"total number of lines: {line_number}")
total number of lines: 499797
Let's implement an exact-match search using MapReduce. First, we define the mapper function, which receives a search string and a batch of file names, and returns a dictionary mapping each file name that contains a match to the list of line numbers where the string occurs.
def exact_match_mapper(string, file_name_chunk):
    matches = {}
    for file_name in file_name_chunk:
        # open each file
        with open(os.path.join('wiki/', file_name)) as file:
            lines = file.readlines()
        for i, line in enumerate(lines):
            # whenever the search string is in the line, record the line number
            if string in line:
                if file_name not in matches:
                    matches[file_name] = []
                matches[file_name] += [i]
    return matches
Now we can define the reduce function. It takes two dictionaries with disjoint sets of keys (each file belongs to exactly one batch) and returns a single dictionary containing the keys from both. The built-in dict.update() method accomplishes this.
def exact_match_reducer(matches1, matches2):
    matches1.update(matches2)  # keys are disjoint, so nothing is overwritten
    return matches1
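The reducer relies on the keys being disjoint. The sketch below, using made-up partial results, shows the union that dict.update() produces, and what would happen if two batches ever reported the same file.

```python
# Made-up partial results from two batches: {file_name: [line numbers]}
batch1 = {"Pictogram.html": [44, 47]}
batch2 = {"Example.html": [5]}

merged = dict(batch1)   # copy so batch1 is left unchanged
merged.update(batch2)   # keys are disjoint, so this is a plain union
print(merged)  # {'Pictogram.html': [44, 47], 'Example.html': [5]}

# If the keys were NOT disjoint, update() would silently keep only the second list
clash = {"Pictogram.html": [44]}
clash.update({"Pictogram.html": [47]})
print(clash)   # {'Pictogram.html': [47]}
```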
Now, let's build an exact-match function that takes a string and searches the document list for matches. Since map_reduce calls the mapper with a single argument (a batch of file names), we need to fix the search string in advance. We'll use functools.partial to take care of this.
def exact_match(string, num_cores=8):
    mapper = functools.partial(exact_match_mapper, string)
    results = map_reduce(file_names, num_cores, mapper, exact_match_reducer)
    return results
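What functools.partial produces can be checked in isolation. In the sketch below, show_args is a hypothetical stand-in for a two-argument mapper; the partial object fixes its first argument, so it can be called with a batch alone, just as map_reduce calls the mapper.

```python
import functools

def show_args(string, file_name_chunk):
    # hypothetical stand-in for a two-argument mapper: returns what it received
    return (string, file_name_chunk)

mapper = functools.partial(show_args, "data")

# map_reduce calls mapper(batch); partial supplies "data" as the first argument
print(mapper(["a.html", "b.html"]))  # ('data', ['a.html', 'b.html'])
print(mapper.func is show_args, mapper.args)  # True ('data',)
```

A practical advantage over a lambda is that Pool.map has to pickle the mapper to send it to the worker processes: a partial wrapping a module-level function can be pickled, while a lambda cannot be pickled by the standard pickle module.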
Let's search the documents for matches of "data".
results_old = exact_match("data")
A more useful search should be case insensitive, i.e., a search for "data" should also match "Data". This is easy to fix with the lower() string method.
def exact_match_mapper(string, file_name_chunk):
    string = string.lower()  # lowercase the query as well, so e.g. "Data" also finds matches
    matches = {}
    for file_name in file_name_chunk:
        # open each file
        with open(os.path.join('wiki/', file_name)) as file:
            lines = [line.lower() for line in file.readlines()]  # lowercase for case-insensitive matching
        for i, line in enumerate(lines):
            # whenever the search string is in the line, record the line number
            if string in line:
                if file_name not in matches:
                    matches[file_name] = []
                matches[file_name] += [i]
    return matches
def exact_match(string, num_cores=8):
    mapper = functools.partial(exact_match_mapper, string)
    results = map_reduce(file_names, num_cores, mapper, exact_match_reducer)
    return results
results_new = exact_match("data")
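Lowercasing only the lines makes the search case insensitive for lowercase queries such as "data". A query containing capitals, e.g. "Data", would never match a lowercased line unless the query is lowercased too. The short check below, on a made-up line of HTML, illustrates this.

```python
line = '<a title="Data visualization">Data visualization</a>\n'
lowered = line.lower()

print("data" in line)             # False: plain `in` is case sensitive
print("data" in lowered)          # True: the line was lowercased
print("Data" in lowered)          # False: the query must be lowercased too
print("Data".lower() in lowered)  # True
```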
Let's count how many new matches case-insensitive matching found in each file.
for file in results_new:
    # files with only capitalized matches are absent from results_old, hence .get
    num_new = len(results_new[file]) - len(results_old.get(file, []))
    if num_new > 0:
        print(f"Filename: {file}. Number of new matches: {num_new}")
Let's see an example of a new match to make sure everything is working properly.
# subtract the old match line numbers from the new ones (the unpacking assumes exactly one new match)
(index,) = set(results_new['Pictogram.html']).difference(results_old['Pictogram.html'])
with open(os.path.join('wiki/', 'Pictogram.html')) as file:
    lines = file.readlines()
print(repr(lines[index]))
'<li><a href="/wiki/Data_visualization" title="Data visualization">Data visualization</a></li>\n'
Now that this is working, let's also record the character index of each match within its line, to make our matches more specific. First, we need a helper function that returns the indices of all occurrences of a string in a line.
def find_matches(line, string):
    indices = []
    index = line.find(string)  # find index of first match in line
    while index != -1:
        indices.append(index)  # record the match
        index = line.find(string, index + 1)  # find index of next match after the last one
    return indices
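The sketch below is a self-contained version of find_matches that records each index before searching again, applied to made-up strings. Because each search resumes one character after the previous match, it reports overlapping occurrences; re.finditer, by contrast, reports non-overlapping matches.

```python
import re

def find_matches(line, string):
    # return the start index of every occurrence of string in line
    indices = []
    index = line.find(string)
    while index != -1:
        indices.append(index)
        index = line.find(string, index + 1)  # resume one character after the last match
    return indices

print(find_matches("data, metadata, Data", "data"))    # [0, 10]
print(find_matches("aaaa", "aa"))                      # [0, 1, 2]  (overlapping)
print([m.start() for m in re.finditer("aa", "aaaa")])  # [0, 2]  (non-overlapping)
```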
def exact_match_mapper(string, file_name_chunk):
    string = string.lower()  # lowercase the query to match the lowercased lines
    matches = {}
    for file_name in file_name_chunk:
        # open each file
        with open(os.path.join('wiki/', file_name)) as file:
            lines = [line.lower() for line in file.readlines()]  # case-insensitive matching
        for i, line in enumerate(lines):
            # if a match is found,
            if string in line:
                if file_name not in matches:
                    matches[file_name] = []
                # search the line for all matches and record (line number, index) pairs
                indices = find_matches(line, string)
                matches[file_name] += [(i, j) for j in indices]
    return matches
def exact_match(string, num_cores=8):
    mapper = functools.partial(exact_match_mapper, string)
    results = map_reduce(file_names, num_cores, mapper, exact_match_reducer)
    return results
Let's test it out.
results = exact_match("data")
So far, our function seems to behave as intended. Let's save our results to a .csv file. Our CSV file will contain the file name of each match, the line where the match occurred, the character index where the match occurred, and the text surrounding the match (up to 15 characters on either side of the match's starting index), which we call "Context."
import csv

data = [["File", "Line", "Index", "Context"]]
for file_name in results:
    with open(os.path.join('wiki/', file_name)) as file:
        lines = file.readlines()
    for (line, index) in results[file_name]:
        line_length = len(lines[line])
        # handle context for matches at beginning / end of lines
        context_min = max(0, index - 15)
        context_max = min(line_length, index + 15)
        context = lines[line][context_min:context_max]
        data.append([file_name, line, index, context])

with open('results.csv', mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerows(data)
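The context window can be checked on its own. get_context below is a hypothetical helper (not in the original notebook) that applies the same slice, clipped to the line. The made-up line reproduces the first context in the results table.

```python
def get_context(line, index, width=15):
    # hypothetical helper: same window as above, line[index - width : index + width],
    # clipped to the bounds of the line
    start = max(0, index - width)
    end = min(len(line), index + width)
    return line[start:end]

line = "plainlinks metadata ambox ambox-content"
index = line.find("data")  # 15
print(get_context(line, index))        # plainlinks metadata ambox ambo
print(get_context(line, 0, width=5))   # plain
print(get_context("abc data", 4))      # abc data  (clipped at both ends)
```

Since the window is measured from the match's start, the match itself uses part of the trailing 15 characters, so a four-letter match leaves 11 characters of context after it.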
Let's turn this code into a function which saves the results as a specified filename.
def write_results(results, name):
    data = [["File", "Line", "Index", "Context"]]
    for file_name in results:
        with open(os.path.join('wiki/', file_name)) as file:
            lines = file.readlines()
        for (line, index) in results[file_name]:
            line_length = len(lines[line])
            # handle context for matches at beginning / end of lines
            context_min = max(0, index - 15)
            context_max = min(line_length, index + 15)
            context = lines[line][context_min:context_max]
            data.append([file_name, line, index, context])
    with open(name, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerows(data)
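Contexts taken from raw HTML often contain double quotes (and sometimes commas), so it is worth checking that csv.writer escapes them. The sketch below writes a header and one made-up row to an in-memory buffer instead of a file and reads it back. csv.writer ends rows with "\r\n" by default, which is why the csv module documentation recommends opening files with newline=''.

```python
import csv
import io

rows = [
    ["File", "Line", "Index", "Context"],
    ["Pictogram.html", 47, 441, 'ew.svg.png 2x" data-file-width'],
]

buffer = io.StringIO(newline="")  # stands in for open(name, mode='w', newline='')
csv.writer(buffer).writerows(rows)
text = buffer.getvalue()
print(text)
# File,Line,Index,Context
# Pictogram.html,47,441,"ew.svg.png 2x"" data-file-width"

# Reading it back recovers the original strings (numbers come back as str)
print(list(csv.reader(io.StringIO(text, newline=""))))
```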
Next, let's add regex pattern matching to our search.
import re

def match_mapper(pattern, file_name_chunk):
    matches = {}
    for file_name in file_name_chunk:
        # open each file
        with open(os.path.join('wiki/', file_name)) as file:
            lines = file.readlines()
        for i, line in enumerate(lines):
            # start index of every (non-overlapping) match of the pattern in this line
            indices = [m.start() for m in re.finditer(pattern, line)]
            if indices:
                if file_name not in matches:
                    matches[file_name] = []  # initialize list for new matches
                matches[file_name] += [(i, j) for j in indices]
    return matches
def matcher(pattern, num_cores=8):
    mapper = functools.partial(match_mapper, pattern)
    results = map_reduce(file_names, num_cores, mapper, exact_match_reducer)
    return results
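Two details of Python's re module matter when turning regex matches into indices. re.findall returns the text of the capturing groups when the pattern has any, and re-locating a matched string with str.find can also report places where the pattern itself did not match. Taking m.start() from re.finditer avoids both, as this small example on a made-up line shows.

```python
import re

line = "data metadata"

# With a capturing group, findall returns the group, not the whole match
print(re.findall(r"(d)ata", line))  # ['d', 'd']

# finditer yields match objects, so the start index is available directly
print([m.start() for m in re.finditer(r"(d)ata", line)])  # [0, 9]

# Searching for the matched text again can over-count: r"\bdata\b" matches only
# the standalone word, but the substring "data" also occurs inside "metadata"
print(re.findall(r"\bdata\b", line))                                # ['data']
print([m.start() for m in re.finditer(r"\bdata\b", line)])          # [0]
print([i for i in range(len(line)) if line.startswith("data", i)])  # [0, 9]
```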
Let's test it out by finding case-insensitive matches, placing the inline flag (?i) at the start of the pattern to make it case insensitive.
results = matcher('(?i)dAta')
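The inline flag (?i) has the same effect as passing flags=re.IGNORECASE, which the following check on a made-up string confirms. Inline global flags like this one should appear at the very start of the pattern; newer Python versions reject them elsewhere.

```python
import re

text = "Data data DATA dAta"
inline = re.findall("(?i)dAta", text)
flagged = re.findall("data", text, flags=re.IGNORECASE)
print(inline)             # ['Data', 'data', 'DATA', 'dAta']
print(inline == flagged)  # True
```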
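Now we can write the results to a CSV file and load it as a pandas DataFrame.
import pandas as pd

write_results(results, 'regex_results.csv')
regex_results = pd.read_csv('regex_results.csv')
regex_results.head()  # display the first five rows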
  | File | Line | Index | Context |
0 | Pictogram.html | 44 | 102 | plainlinks metadata ambox ambo |
1 | Pictogram.html | 47 | 441 | ew.svg.png 2x" data-file-width |
2 | Pictogram.html | 47 | 463 | le-width="512" data-file-heigh |
3 | Pictogram.html | 57 | 406 | -Pismo.jpg 2x" data-file-width |
4 | Pictogram.html | 57 | 428 | le-width="340" data-file-heigh |
In conclusion, we used the MapReduce framework to implement a regex pattern-matching search over the files in a given directory. This was done with a first-principles implementation of MapReduce and demonstrated on a directory of raw HTML files from Wikipedia.