WikipediaのデータをLuceneのindexに入れるコード

以前書いたけどいつもjavaXMLライブラリの使い方とか忘れるので備忘録用に上げておく

import java.io.File;

import javax.xml.parsers.SAXParser;
import javax.xml.parsers.SAXParserFactory;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;


public class CreateIndex {
  static void addXMLFile(String file) throws Exception{
    SAXParserFactory spf = SAXParserFactory.newInstance();
    SAXParser parser = spf.newSAXParser();    
    File wikiFile = new File(file);
    if(!wikiFile.exists()){
      System.err.println(wikiFile +" not exist");
      return ;
    }
    Directory dir = FSDirectory.open(new File("enwiki-index"));
    Analyzer analyzer = new StandardAnalyzer(Version.LUCENE_35);
    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_35, analyzer);
    IndexWriter writer = new IndexWriter(dir , conf);
    WikiDataHandler wph = new WikiDataHandler(writer);
    parser.parse(wikiFile, 
        new WikiXMLHandler(wph));
    System.out.println("add " + wph.adddoc +" document");
    try{
      writer.close();      
    }catch (OutOfMemoryError e) {
      writer.close();
    }
  }
  public static void main(String[] args) throws Exception{
    addXMLFile(args[0]);
  }
}
import org.xml.sax.Attributes;

import org.xml.sax.SAXException;
import org.xml.sax.helpers.DefaultHandler;



public class WikiXMLHandler extends DefaultHandler{
  boolean isTitle;
  boolean isText;
  StringBuilder titleBuffer;
  StringBuilder textBuffer;
  WikiDataHandler wphandler;
  public WikiXMLHandler(WikiDataHandler h) {
    isTitle = false;
    isText = false;    
    titleBuffer = new StringBuilder();
    textBuffer = new StringBuilder();
    wphandler = h;
  }

  @Override
  public void characters(char[] ch, int start, int length) throws SAXException {
    String s = new String(ch ,start , length);
    if(isTitle){
      titleBuffer.append(s);
    }else if(isText){
      textBuffer.append(s);
    }
  }
  
  @Override
  public void startElement(String uri, String localName, String qName,
      Attributes attributes) throws SAXException {
    if(qName.equals("title")){
      isTitle = true;
      titleBuffer = new StringBuilder();
    }else if(qName.equals("text")){
      isText = true;
      textBuffer = new StringBuilder();
    }
  }
  
  @Override
  public void endElement(String uri, String localName, String qName)
      throws SAXException {
    if(qName.equals("title")){
      isTitle = false;
    }else if(qName.equals("text")){
      isText = false;
      String title = titleBuffer.toString();
      String text = textBuffer.toString();
      wphandler.handle(title, text);
    }
  }
}
import org.apache.lucene.document.Document;

import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Index;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.index.IndexWriter;


public class WikiDataHandler {
  int adddoc = 0;
  IndexWriter writer;
  public WikiDataHandler(IndexWriter writer) {
    this.writer = writer;
  }
  private boolean isNotMainSpaces(String title){
    String[] prefixes = {
        "Category:",
        "File:",
        "Template:",
        "Talk:",
        "Wikipedia:",
        "User:",
        "User talk:",
        "Help:",
        "Special:",
        "MediaWiki:"
    };
    for(String prefix : prefixes){
      if(title.startsWith(prefix))return true;
    }
    return false;
  }
  private boolean isRedirectPage(String text){
    return text.toLowerCase().startsWith("#redirect");
  }
  public void handle(String title , String text){
    if(isNotMainSpaces(title))return ;
    if(isRedirectPage(text))return ;
    adddoc++;
    Document doc = new Document();
    doc.add(new Field("title", title, Store.YES, Index.NO));
    doc.add(new Field("content", text, Store.NO, Index.ANALYZED));
    try{
      writer.addDocument(doc);      
    }catch (Exception e) {
      e.printStackTrace();
    }
  }
}

wat-arrayを使った2次元探索プログラム

岡野原氏の作成したwavelet木を使った高速配列処理ライブラリwat-arrayを利用して、2次元探索のプログラムを書いてみた。
なお、自分はwavelet木のアルゴリズムについては全く分かってないですが、wat-arrayでは配列に対して、操作を行うインターフェイスがしっかり与えられているのでそれを見ながら作りました。

問題定義

2次元座標の集合P={(x,y)}が与えられる。Queryとして(xs,xe,ys,ye)が与えられたときにPの中でxs <= x < x, ys <= y < yeを満たす点の数を答えるというものを考える。

なお、Pの内容は途中で変化したりすることはないものとする(変更が加わった場合は一から作り直す)。

インターフェースとしては次のようになる、2次元座標の表現にはpairを用いることにする。

namespace wat2DSearch{
  typedef std::pair<uint64_t , uint64_t> Point;
  class Wat2DSearch{
    public:
      Wat2DSearch(const std::vector<Point> & P);
      //Compute the frequency of points where xs <= x < xe and ys <= y < ye
      uint64_t FreqRange2D(uint64_t xs , uint64_t xe , uint64_t ys , uint64_t ye);
  };
}

使い方の例

int main(){
  vector<wat2DSearch::Point> P;
  P.push_back(make_pair(1,2));
  P.push_back(make_pair(2,1));
  P.push_back(make_pair(2,4));
  P.push_back(make_pair(4,2));
  wat2DSearch::Wat2DSearch wat(P);
  cout << wat.FreqRange2D(0 , 5 , 0 , 5) << endl;
  cout << wat.FreqRange2D(2 , 3 , 2 , 4) << endl;
  cout << wat.FreqRange2D(2 , 3 , 2 , 5) << endl;
  return 0;
}

以降でどのようにしてこのFreqRange2Dを実現するかについて述べる。

アルゴリズム

Pの値をxの昇順でソートする。たとえばP={(2,1),(4,2),(1,3),(2,4)}のときソート後は

index 0 1 2 3
座標 (1,3) (2,1) (2,3) (4,2)

となる。これをx座標とy座標に分解すると

index 0 1 2 3
x座標 1 2 2 4
y座標 3 1 4 2

となる。

たとえばxs=2,xe=3のとき、x座標が昇順に並んでいることからy座標の配列のうちindexが[1ー3)までの範囲に関してys <= y < yeを満たすものの数を求めることと同じであることが分かる。
与えられた部分配列の中で指定されたRangeに収まるような要素の数を求めるのはwatArray::FreqRangeを用いて求めることができる。
また、与えられたxs,xeに対してindexを求める部分もxが昇順に並んでいることに着目するとxsに対応するindexはx座標の配列の中でxsより小さいものの数であることが分かり、
これはwatArray::FreqSumを用いて簡単に求まる。xeについても同様である。

プログラム

以上を踏まえて作ったのが以下のコードである、watArrayを使うことにより簡潔に書けていることが分かる。

ヘッダ

#include <wat_array/wat_array.hpp>
#include <utility>
#include <vector>
namespace wat2DSearch{
  typedef std::pair<uint64_t , uint64_t> Point;
  class Wat2DSearch{
    public:
      Wat2DSearch(const std::vector<Point> & P);
      uint64_t FreqRange2D(uint64_t xs , uint64_t xe , uint64_t ys , uint64_t ye);
    private:
      wat_array::WatArray xwat;
      wat_array::WatArray ywat;
  };
}

実装部分(2011/1/11 FreqRangeの境界条件での仕様に対応)

#include "Wat2DSearch.hpp"
#include <algorithm>
using namespace std;
namespace wat2DSearch{
  Wat2DSearch::Wat2DSearch(const vector<Point> & P): xwat(), ywat(){
    vector<Point> tmp(P);
    sort(tmp.begin() , tmp.end());
    vector<uint64_t> xArray , yArray;
    for(vector<Point>::iterator it = tmp.begin() ; it != tmp.end() ; ++it){
      xArray.push_back(it->first);
      yArray.push_back(it->second);
    }
    xwat.Init(xArray);
    ywat.Init(yArray);
  }
  uint64_t Wat2DSearch::FreqRange2D(uint64_t xs , uint64_t xe , uint64_t ys , uint64_t ye){
    if(xs >= xwat.alphabet_num() || ys >= ywat.alphabet_num())return 0;
    if(xs >= xe || ys >= ye)return 0;
    xe = min(xe , xwat.alphabet_num());
    ye = min(ye , ywat.alphabet_num());
    uint64_t xsPos = xwat.FreqSum(0 , xs);
    uint64_t xePos = xwat.FreqSum(0 , xe);
    if(ye == ywat.alphabet_num()){ // ye >= ywat.alphabet_num()のときNOT_FOUNDが返ってくるため
      return ywat.FreqRange(ys , ye - 1, xsPos , xePos) +
        ywat.Rank(ye - 1 , xePos) - ywat.Rank(ye - 1 , xsPos);
    }
    return ywat.FreqRange(ys , ye , xsPos , xePos);
  }
}

備考

なお、今回の実装は特に論文とか読んだわけではないので、論文とかでやってる二次元探索の方法とは違う可能性があります。たとえばこの実装だと3次元以上のときの拡張とかが難しかったり、wat_arrayを2つ使ってますがもっと少なくてもいいかもしれません。あとx,yの範囲Rが大きければドキュメントにかかれてあることが確かであれば点数によらずO(log R)で計算量が増えるので適宜座標圧縮的なことが必要かもしれません。

Hadoopを使わずにWikipediaのテキスト処理を400倍高速化

タイトルは釣りです。id:mamorukさんの書いたHadoop で Wikipedia のテキスト処理を900倍高速化 - 武蔵野日記を読んで、そもそも1G程度のデータの単語頻度を数えるのに858分もかかるんだっけと思い、id:nokunoさんの資料を読んでみると単語頻度を求める際に

a
b
a
a

みたいなデータを

a 3
b 1

に変形するのにsortしたファイルをuniq -cで処理するということをやっていた。これはあまり効率のよい方法ではなくて行数をNとしたときにO(N log N)の計算時間となる(文字列比較はO(1)でやれることにする)。
これに対して、単語の頻度をハッシュ表で保存すると理想的な条件の元ではO(N)の計算時間で頻度を求めることが出来、より高速に計算することが可能となることが期待される。
また、単語数をWとしたとき、C++のmapのような二分探索木を使ってもO(N log W)とsortしてuniqするのに比べ、より低い計算量を達成できる。

単語の頻度表を求めるコードをC++のunordered_map(gccのバージョンによっては使えない場合があります、その場合は適宜boost/unordered_mapなどに置き換えてください)を使って書くと以下のようになる

#include <unordered_map>
#include <string>
#include <iostream>
using namespace std;
int main(){
  unordered_map<string , int> m;
  string line;
  while(getline(cin , line))
    m[line]++;
  unordered_map<string , int>::iterator it;
  for(it = m.begin(); it != m.end(); ++it)
    cout << it->first << " " << it->second << endl;
  return 0;
}

これを利用して日本語Wikideiaの生のunigramデータから単語頻度を求めると1分50秒で終了。コア数は1(CPUはCore i7 920, 2.67GHzでメモリが3G)でやってるので大体400倍ぐらい高速になっている計算になる。ちなみにunordered_mapをstd::mapに置き換えた場合4分49秒となった。

計算機を大量に並べてHadoopで分散処理というのも非常に価値のあることとは思いますが、会社によってはサーバを1台買うのにも非常に煩雑な手続きが必要で、しかも注文してから半年ぐらい来なかったり、開発環境のサーバが1台しかなかったりするので、こういった1台のサーバで処理を高速化するテクニックもまだまだ重要なのではないかと思いました。

あと、結局サーバを並べるといってもデータが2倍に増えたときにサーバが4倍必要になるとか言う状況だと、いつかは破綻するので計算量の見積りは大変重要です。

追記

id:n_shuyoさんに900倍を越えてほしかったといわれたので多少の高速化を試みてみた。データが1G程度しかないのでファイルの中身を全部メモリに読み込む+getlineを使わずに自力で改行をパースするという方針で書いた

#include <unordered_map>
#include <map>
#include <string>
#include <iostream>
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
using namespace std;
int main(){
  struct stat st;
  char * fileName = "unigram_raw.txt";
  if(stat(fileName , &st)){
    perror("stat");
    return 1;
  }
  int fd = open(fileName  , O_RDONLY);
  char * buffer = new char[st.st_size];
  if(read(fd , buffer , st.st_size) != st.st_size){
    return 1;
  }
  int prev = 0;
  unordered_map<string , int> m;
  for(size_t i = 0 ; i < st.st_size ; ++i){
    if(buffer[i] == '\n'){
      string s(buffer + prev , buffer + i);
      m[s]++;
      prev = i + 1;
    }
  }
  unordered_map<string , int>::iterator it;
  for(it = m.begin(); it != m.end(); ++it)
    cout << it->first << " " << it->second << endl;
  return 0;
}

これだと手元の環境では62秒で終わり、800倍以上(900は行かなかったorz)の高速化になっている。

追追記

bufferを全部持っている以上、string => intのテーブルを持つのではなく、(文字列の開始位置,文字列の終了位置)をキーにしたほうが効率が良さそうと思い実装。

struct myeq : std::binary_function<pair<int , int> , pair<int , int> , bool>{
  char * buf;
  myeq(char * b) : buf(b) {}
  bool operator() (const pair<int , int> & x , const pair<int , int> & y) const{
    char * s = buf + x.first;
    char * t = buf + y.first;
    char * se = buf + x.second; // *se == '\n'
    for( ; *s == *t && s != se ; ++s , ++t)
      ;
    // *s != *t or (*s == '\n' => *t == '\n')
    return *s == *t;
  }
};

struct myhash : std::unary_function<pair<int , int> , size_t>{
  char * buf;
  myhash(char * b) : buf(b) {}
  size_t operator()(const pair<int , int> & p) const{
    size_t h = 0;
    for(int i = p.first ; i < p.second ; ++i){
      h = h * 31 + buf[i];
    }
    return h;
  }
};

int main(){
  struct stat st;
  char * fileName = "jawiki_unigram_raw.txt";
  if(stat(fileName , &st)){
    perror("stat");
    return 1;
  }
  int fd = open(fileName  , O_RDONLY);
  char * buffer = new char[st.st_size];
  if(read(fd , buffer , st.st_size) != st.st_size){
    return 1;
  }
  int prev = 0;
  myhash h(buffer);
  myeq   e(buffer);
  unordered_map<pair<int , int> , int  , myhash , myeq> m(3 , h , e);
  for(size_t i = 0 ; i < st.st_size ; ++i){
    if(buffer[i] == '\n'){
      pair<int , int> p(prev , i);
      unordered_map< pair<int , int> , int  , myhash , myeq>::iterator it = m.find(p); 
      if(it != m.end()){
        it->second++;
      }else{
        m.insert(pair<pair<int,int> , int>(p , 1));
      }
      prev = i + 1;
    }
  }
  unordered_map<pair<int , int> , int>::iterator it;
  for(it = m.begin(); it != m.end(); ++it){
    string s(buffer + it->first.first , buffer + it->first.second);
    cout << s << " " << it->second << endl;
  }
  return 0;
}

これだと手元の環境では49秒で終わり、無事900倍達成できたのであった。めでたし。

JavaでPDFから文章を抽出

プログラム上からPDFの文章を取り出したいと思うことがあったので、方法を調べてみた。
PDFBoxというツールを使うと結構いい感じに抽出できた。
以下に簡単なサンプルプログラムを示す。

import java.io.*;

import org.apache.pdfbox.pdfparser.PDFParser;
import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.util.PDFTextStripper;

public class ExtractPDF {
  private static String extractText(String filePath)
      throws FileNotFoundException, IOException {
    FileInputStream pdfStream = new FileInputStream(filePath);
    PDFParser parser = new PDFParser(pdfStream);
    parser.parse();
    PDDocument pdf = parser.getPDDocument();
    PDFTextStripper stripper = new PDFTextStripper();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    stripper.writeText(pdf, new BufferedWriter(new OutputStreamWriter(out)));
    return out.toString();
  }
  public static void main(String[] args) throws Exception {
    String pdfFile = "test.pdf";
    String data = extractText(pdfFile);
    System.out.println(data);
  }
}

手元の適当な論文に対する出力結果

Online EM for Unsupervised Models
Percy Liang Dan Klein
Computer Science Division, EECS Department
University of California at Berkeley
Berkeley, CA 94720
fpliang,kleing@cs.berkeley.edu
Abstract
The (batch) EM algorithm plays an important
role in unsupervised induction, but it some-
(中略)
EM sEM‘ EM sEM‘
POS 56:2  1:36 58:8  0:73; 1:41  6:01  6:09
DOC 41:2  1:97 51:4  0:97; 2:82  7:93  7:88
SEG(en) 80:5  0:0 81:0  0:0; 0:42  4:1  4:1
SEG(ch) 78:2  0:0 77:2  0:0; 0:04  7:26  7:27
ALIGN 79:0  0:14 78:8  0:14; 0:25  5:04  5:11
Table 2: Mean and standard deviation over different ran-
dom seeds. For EM and sEM, the first number after  
(以下略)

参考

Luceneと併用して全文検索を行なうことも可能なよう

残念ながら日本語のPDFには対応してないようですが、パッチを当てれば読める場合もあるようです

マルチスレッドでのechoサーバ

マルチスレッドでのechoサーバのメモ

server.cc

#include <iostream>
#include <sstream>
#include <pthread.h>

#include "ServerSocket.h"
#include "Queue.h"
using namespace std;

string readLine(int sock){
  char c;
  stringstream buffer;
  while(1){
    int ret = read(sock, &c , 1);
    if(ret == -1){
      perror("read");
      break;
    }else if(ret == 0){
      break;
    }
    buffer << c;
    if(c == '\n'){
      break;
    }
  }
  return buffer.str();
}

void * handleRequest(void * arg){
  Queue<int> * Q = static_cast< Queue<int> *> (arg);
  while(1){
    int conn = Q->Pop();
    while(1){
      string line = readLine(conn);
      if(line.size() == 0)break;
      //注:1度のwriteで必ずしもソケットにすべて書けるわけではない
      write(conn , line.c_str() , line.size());
    }
    close(conn);
  }
  return NULL;
}

int main(){
  int numThread = 10;
  ServerSocket sock(8080 , 128);
  int r = sock.Listen();
  cout << r << endl;
  Queue<int> Q(numThread);
  for(int i = 0 ; i < numThread ; i++){
    pthread_t thread;
    pthread_create(&thread , NULL , handleRequest , &Q);
  }

  while(1){
    int conn = sock.Accept();
    int res = Q.Push(conn);
    if(res != 0){
      cerr << "Queue is full" << endl;
      close(conn);
    }
  }
}

Queue.h

#include <pthread.h>

template<typename T> class Queue{
  public:
    Queue(int capacity)
      :capacity_(capacity) , size_(0), head_(0) , tail_(0){
        pthread_mutex_init(&mutex_ , NULL);
        pthread_cond_init(&not_empty, NULL);
        buffer_ = new T[capacity];
      }
    ~Queue(){
      delete[] buffer_;
      pthread_mutex_destroy(&mutex_);
      pthread_cond_destroy(&not_empty);
    }
    int Push(T element);
    T Pop();
  private:
    pthread_mutex_t mutex_;
    pthread_cond_t not_empty;
    int capacity_;
    int size_;
    int head_;
    int tail_;
    T * buffer_;
};


template<typename T>
int Queue<T>::Push(T element){
  pthread_mutex_lock(&mutex_);
  if(size_ >= capacity_){
    pthread_mutex_unlock(&mutex_);
    return -1;
  }
  buffer_[tail_++] = element;
  if(tail_ == capacity_){
    tail_ = 0;
  }
  size_++;
  pthread_mutex_unlock(&mutex_);
  pthread_cond_signal(&not_empty);
  return 0;
}

template<typename T>
T Queue<T>::Pop(){
  T res;
  pthread_mutex_lock(&mutex_);
  while(!size_){
    pthread_cond_wait(&not_empty , &mutex_);
  }
  res = buffer_[head_++];
  if(head_ == capacity_){
    head_ = 0;
  }
  size_--;
  pthread_mutex_unlock(&mutex_);
  return res;
}

ServerSocket.h

class ServerSocket{
  public:
    ServerSocket(int port , int backlog)
      :port_(port), backlog_(backlog), listenfd_(-1){}
    ~ServerSocket(){
      Close();
    }
    int Listen();
    int Accept();
    int Close();
  private:
    int port_;
    int backlog_;
    int listenfd_;
};

ServerSocket.cpp

#include <iostream>
#include <sstream>
#include <string>
#include <cstdio>
#include <cstdlib>
#include <cstring>

#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>

#include "ServerSocket.h"

using namespace std;

int ServerSocket::Listen(){
  struct addrinfo hints;
  struct addrinfo *res , *rp;
  memset(&hints , 0 , sizeof(hints));
  hints.ai_family = AF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  {
    stringstream ss;
    ss << port_;
    getaddrinfo(NULL , ss.str().c_str(), &hints , &res);
  }
  int sockfd = -1;
  for(rp = res ; rp != NULL ; rp = rp->ai_next){
    sockfd = socket(rp->ai_family , rp->ai_socktype , rp->ai_protocol);
    if(sockfd == -1)
      continue;
    if(bind(sockfd , rp->ai_addr , rp->ai_addrlen) == 0)
      break;
    close(sockfd);
  }
  if(rp == NULL){
    cerr << "ServerSocket::Listen() Bind Fail" << endl;
    return -1;
  }
  freeaddrinfo(res);
  int lres = listen(sockfd , backlog_);
  if(lres == -1){
    perror("ServerSocket::Listen()");
    return -1;
  }
  listenfd_ = sockfd;
  return 0;
}

int ServerSocket::Accept(){
  int connfd = accept(listenfd_, NULL , NULL);
  if(connfd < 0){
    perror("ServerSocket::Accept()");
    return -1;
  }
  return connfd;
}

int ServerSocket::Close(){
  if(listenfd_ >= 0){
    int res = close(listenfd_);
    if(res == -1){
      perror("ServerSocket::Close()");
      return -1;
    }
  }
  return 0;
}

簡易のビットエンコーダ

圧縮のプログラムを書くときにはbit単位でエンコーディングをする必要があるため、bit単位でエンコードをするBitEncoderというものを書いてみた。
動作としては1byte変数にbitをバッファリングしていっぱいになったらファイルに書き出すという感じです。

#include <cstdio>
#include <iostream>
struct BitEncoder{
  FILE *fp;
  unsigned char bval , bindex;
  bool buffered;
  BitEncoder(const char * fileName){
    fp = fopen(fileName , "wb");
    buffered = false;
    bindex = 7;
    bval = 0;
  }
  ~BitEncoder(){
    if(buffered){
      flush();
    }
    fclose(fp);
  }
  void encode(bool b){
    if(bindex){
      if(b){
        bval |= 1 << bindex;
      }
      --bindex;
      buffered = true;
    }else{
      if(b){
        bval |= 1;
      }
      flush();
    }
  }
  void flush(){
    fwrite(&bval , 1 , 1 , fp);
    buffered = false;
    bval = 0;
    bindex = 7;
  }
};


void encode(BitEncoder & enc , unsigned char val){
  for(int i = 7 ; i >= 0 ; i--){
    if(val & (1 <<i)){
      enc.encode(true);
    }else{
      enc.encode(false);
    }
  }
}
int main(){
  {
    BitEncoder enc("out");
    enc.encode(true);
    enc.encode(false);
    enc.encode(true);
    enc.encode(true);
  }
  /*
   * od -tx1 out
   * 0000000 b0
   * 0000001 
   *
   * /
}

Netflixのレーティングデータを扱う(1)

Grand Prizeが達成されたNetflix Prizeですが、データの公開が停止されたりすると困るので登録してデータを確保した。

Netflixのデータフォーマットは展開先のフォルダの下にtraining_setというフォルダができ、その中にmv_0000001.txt ...という形式で17770個の映画のレーティングデータが入っている。

フォーマットは

(映画のID):
(ユーザのID),(レーティング),(レーティングをつけた日(YYYY-MM-DDの形式))
...
(ユーザのID),(レーティング),(レーティングをつけた日(YYYY-MM-DDの形式))

となっている。

ここでレーティングの数は約1億個でたとえば一つのレーティングを

public class Rating {
  int user;
  int item;
  int rate;
  Rating(int u , int i , int r){
    user = u;
    item = i;
    rate = r;
  }
}

のようなクラスに格納すると必要なメモリ量は12バイト*1億=1.2GBであり、最近のPCでは全データをメモリに乗せることはできなくはないがレーティングの日付などを入れようとしたり、データが増えていったときにメモリが足りなくなる。

ただ、実際の協調フィルタリングの学習において、SVDのような手法を用いる場合、

for(Rating r : 全レートデータ){
  再急方向のベクトルを更新
}
再急方向に移動

のようなロジックが用いられることが多く、全レートデータをシーケンシャルに読むことができればよい。Javaでそういうことをしたい場合はOracle Technology Network for Java Developers | Oracle Technology Network | Oracleを継承したクラスを作成する、以下に参考のコードを示す

import java.io.*;
import java.util.*;

public class NetflixDataIterator implements Iterator<Rating>{
  static final String trainDirName = 
    "E:/netflix/download/training_set";
  File trainFiles[];
  private int currentIndex;
  private int currentMovieID;
  private Scanner currentInputStream;
  public NetflixDataIterator() {
    File dir = new File(trainDirName);
    trainFiles = dir.listFiles();
    currentIndex = 0;
    try{
      currentInputStream = new Scanner(trainFiles[currentIndex]);      
      setMovieID();
    }catch (Exception e) {
      throw new RuntimeException(e.getMessage());
    }
  }  

  @Override
  public boolean hasNext() {
    if(currentIndex == trainFiles.length){
      return false;
    }
    if(currentInputStream.hasNext()){
      return true;
    }else{
      return currentIndex < trainFiles.length - 1;
    }
  }
  
  private void setMovieID(){
    String firstLine = currentInputStream.nextLine();
    currentMovieID = Integer.parseInt(firstLine.substring(0, firstLine.length() - 1));    
  }

  @Override
  public Rating next(){
    if(!hasNext()){
      throw new NoSuchElementException();
    }
    if(!currentInputStream.hasNext()){
      try {
        currentInputStream = new Scanner(trainFiles[++currentIndex]);
        setMovieID();
      } catch (FileNotFoundException e) {
        throw new RuntimeException(e.getMessage());
      }      
    }
    String rating = currentInputStream.nextLine();
    String sep[] = rating.split(",");
    int user = Integer.parseInt(sep[0]);
    int rate = Integer.parseInt(sep[1]);
    Rating res = new Rating(user , currentMovieID , rate);
    return res;
  }
  
  @Override
  public void remove() {
    throw new UnsupportedOperationException();
  }
}

クライアントから実行する際には

  public static void main(String[] args) {
    Iterator<Rating> it = new NetflixDataIterator();
    long time = System.currentTimeMillis();
    int cnt = 0;
    int totalRate = 0;
    while(it.hasNext()){
      Rating r = it.next();
      totalRate += r.rate;
      ++cnt;
    }     
    long t = System.currentTimeMillis() - time;
    System.out.println(cnt);
    System.out.println("ave rate:"+(totalRate * 1.0 / cnt)+" "+t+"ms");
  }

のようなコードを書く。

実行結果

100480507
ave rate:3.604289964420661 638641ms

なお余談ではあるがこのJavaのコードは現在の状態をクラスのメンバ変数を用いて管理する必要がある。別の言語、例えばrubyではブロック付きメソッドといって

def readData
   while データをすべて読み終わる
      yield(次のレーティングデータ)
   end
end

totalRate = 0.0
readData(){|rating|
  totalRate += rating.rate
}

のように書くことができる。

追記:(07/26)

あと全部読むのに636秒かかっていますがこの部分はファイルを一つにまとめて、ScannerからBufferedReaderに変えて、レーティング情報をbit列でファイルに保存しておけば30秒ぐらいには縮まります。