読者です 読者をやめる 読者になる 読者になる

Ruby の Timeout の仕組み

Ruby

Ruby で長い時間掛かるかも知れない処理のタイムアウトを行うにはこんな感じにします。

require 'timeout'

begin
  Timeout.timeout(3) do # 3秒でタイムアウト
    hoge                # 何かの処理
  end
rescue Timeout::Error
  puts 'なげーよ'       # タイムアウト発生時の処理
end

Timeout.timeout はブロック開始時にスレッドを作成し、そのスレッドで指定された秒数だけ sleep して、sleep から復帰してもまだブロックが終わってなければ作成元のスレッドに対して Timeout::Error 例外を発生させます。

指定時間以内に処理が終わる場合:

timeout(X)
    │
スレッド作成 ─┐
    │         │
ブロック実行  sleep X
    │         │
スレッドkill→ 🕱
    │
timeout復帰

指定時間以内に処理が終わらない場合:

timeout(X)
    │
スレッド作成 ─┐
    │         │
ブロック実行  sleep X
    :          │
    :  ← 元スレッドに Timeout::Error
    :

仕組みはシンプルです。ですが、実際に timeout.rb を読んでみると中では結構複雑なことをしてました。

たとえば、次のスクリプトを実行すると、'hoge' ではなく 'main' が出力されます。

require 'timeout'

def hoge
  sleep 5
rescue Timeout::Error
  p 'hoge'              # こっちは表示されない
end

begin
  Timeout.timeout(1) do
    hoge
  end
rescue Timeout::Error
  p 'main'              # こっちが表示される
end

hoge 内の sleep 中で Timeout::Error が発生したなら、普通は hoge 内の rescue で処理されるはずなのに、実際には main 側の rescue で処理されています。

確かに使う側からしたらこっちの方が便利なんですけど、Timeout は内部的にどのようにしてこのような処理を実現しているのでしょうか。

例外オブジェクトを作成して、それを元スレッドに対して raise してるのは合ってるんですけど、元スレッドで例外が発生しようとしたら無理やり throw に切り替えてました。

timeout.rb のやってることを簡単に書くとこんな感じです(実際の処理とは異なります):

class Timeout::Error < RuntimeError
  def exception(*)    # 例外発生時には Exception#exception が呼ばれるので
    throw self        # それをフックして代わりに throw する
  end
end

def timeout(sec)
  begin
    x = Thread.current  # 元のスレッド(timeout を呼び出したのと同じ)
    err = Timeout::Error.new
    y = Thread.new do
      sleep sec
      x.raise err       # 元のスレッドに対して例外発生(実際には throw される)
    end
    catch(err) do
      return yield      # ブロックが復帰したら return
    end
    raise err           # タイムアウト時はここで改めて例外発生
  ensure
    y.kill              # タイムアウト用スレッド終了
  end
end

raise - rescue と違って、throw - catch は まったく同じオブジェクト(object_id が同じもの)でないと catch しないので、別の timeout メソッドが作った例外オブジェクトはスルーするんですね。若干トリッキー感が否めませんが。

今回の学び

  • Thread#raise で別のスレッドに対して例外を発生させることができる。
  • Exception#exception を上書きすれば例外発生時に任意の処理を実行できる。

Sequelの罠

Ruby

Sequel の罠っぽい挙動にハマったのでメモ。

次のようなテーブル a, b, c, d がありまして、

mysql> select * from a;
+------+
| id   |
+------+
|    1 |
+------+
mysql> select * from b;
+------+
| id   |
+------+
|    2 |
+------+
mysql> select * from c;
+------+
| id   |
+------+
|    3 |
+------+
mysql> select * from d;
+------+
| id   |
+------+
|    4 |
+------+

このテーブルを結合して、こういう結果を取り出そうと思って、

mysql> select a.id,b.id,c.id,d.id from a join b join c join d;
+------+------+------+------+
| id   | id   | id   | id   |
+------+------+------+------+
|    1 |    2 |    3 |    4 |
+------+------+------+------+

Sequel で書いてみたんですよ。ところがこんな結果に。

DB[:a].join(:b).join(:c).join(:d).get([:a__id, :b__id, :c__id, :d__id])
#=> [4, 4, 4, 4]

発行されているクエリはまともでした。

SELECT `a`.`id`, `b`.`id`, `c`.`id`, `d`.`id` FROM `a` INNER JOIN `b` INNER JOIN `c` INNER JOIN `d` LIMIT 1

どうやら、配列としてレコードを取り出す場合でも、内部的にはカラム名をキーとした Hash が使われてるのが原因のようでした。

なので、カラムにちゃんと別名をつけてやればうまくいきます。

DB[:a].join(:b).join(:c).join(:d).get([:a__id___a, :b__id___b, :c__id___c, :d__id___d])
#=> [1, 2, 3, 4]

Hash として取り出すときには気をつけてたんですけど、配列として取り出す時にもカラム名が影響するとは…。

で、過去に書いたコードを見てみたらちゃんと別名つけてたりしたんで知ってたはずなんだけど、しばらくかかないと忘れちゃうなぁ。

要素数ができるだけ均等になるように配列を分割する

Ruby

例えば10個の要素を持つ配列があって、これを3つに分割したい時に、

a = [1,2,3,4,5,6,7,8,9,10]
n = 3
m = Rational(a.size, n).ceil
a.each_slice(m).to_a            #=> [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10]]

みたいにすると、3つには分割できるんですが、要素数が 4, 4, 2 と偏ってしまいます。これを例えば 4, 3, 3 のようにしたい。

a = [1,2,3,4,5,6,7,8,9,10]
n = 3
n.times.map{|i| a[a.size*i/n ... a.size*(i+1)/n]}
  #=> [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]

…のようにやればできました。

なお、a.size よりも n の方が大きいと、結果に空配列を含みます。

a = [1,2,3]
n = 6
n.times.map{|i| a[a.size*i/n ... a.size*(i+1)/n]}
  #=> [[], [1], [], [2], [], [3]]

これが嫌な場合は最後に空配列を除外する感じで。

a = [1,2,3]
n = 6
n.times.map{|i| a[a.size*i/n ... a.size*(i+1)/n]}.reject(&:empty?)
  #=> [[1], [2], [3]]

追記

これは私が考えたわけではなくて、数年前に若い人に教えてもらったものです。

はじめ見た時、なんでこれでいいのかわからなかったんですが、単純にn分割した時の境界の小数点以下を切り捨てて整数にしてるだけです。

f:id:tmtms:20160728215136p:plain

図は10個を6分割した時の例です。

頭いいなー。

Rubyで指定バイト数を超えない文字列の取得

Ruby

文字エンコーディングにUTF-8を使用した場合、1文字は1バイト〜4バイトです。

ある文字列の先頭からn文字の文字列を取り出すには次のようにできます。

str = "本日は晴天なり"
n = 3
str[0, n]    #=> "本日は"

先頭からnバイトを超えない最大の文字列を取り出す方法を考えてみました。

愚直に数える

str = "本日は晴天なり"
n = 10
out = ""
str.each_char do |c|
  break if out.bytesize + c.bytesize > n
  out.concat c
end
out   #=> "本日は"

inject を使うと少しかっこ良くなるかもしれない

str = "本日は晴天なり"
n = 10
str.each_char.inject(''){|a,b| break a if (a+b).bytesize > n; a+b}  #=> "本日は"

バイナリデータとしてnバイト抜き出した後に不正なバイト列を消す

str = "本日は晴天なり"
n = 10
str.b[0, n].force_encoding("utf-8").scrub("")   #=> "本日は"

ちなみに最後の scrub がないと "本日は\xE6" になってしまいます。 最後だけでなく途中に文字として不正なバイトがあった場合はそれも消えます。

[追記]

Twitter で byteslice というメソッドがあるのを教えてもらいました。考え方は同じですが、こっちの方がすっきり書けますね。

StringIO#gets を使う

str = "本日は晴天なり"
n = 10

require 'stringio'
StringIO.new(str).gets(nil, 10)  #=> "本日は晴"
s.chop! if s.bytesize > n
s                                #=> "本日は"

gets は指定バイト数が文字境界にない場合は文字境界まで読むので1文字多くなることがあるので、最後に調整してます。


なんかどれもいまいちな気がします。もっといい方法ありますかねー。

MySQL X Protocol を解析してみる

MySQL

前回 MySQL X Protocol で使用している Protbuf について書きましたが、それだけでは MySQL のプロトコルは解析できません。

TCP を流れるデータは区切りがないので、書き込み側が Protbuf データをただ垂れ流しても、読み込む側がどう読んで良いのかわかりません。

書き込むデータの大きさと、書き込む Protbuf データの型を相手に伝える必要があります。

MySQL X Protocol のパケットは次のようになっているようです。

┌────┬─────────────
│size(4) │type(1) + Protbuf(size-1)
└────┴─────────────

最初の4バイト(リトリエンディアン)で続くデータ部のサイズを示します。 データ部の先頭1バイトは Protbuf データの型を示します。

Protobuf データの型は、クライアントから送るデータは ClientMessages::Type で、サーバーから送るデータは ServerMessages::Type に enum で定義されています。

TCP 上を流れるデータの形式がわかったので、あとは、どの型のデータがどのタイミングでサーバー/クライアントのどちらから送られるかがわかればいいです。

ドキュメント https://dev.mysql.com/doc/internals/en/x-protocol.html もありますが、実際に mysqlsh の通信を見てみるのが手っ取り早いかもしれません。

次のような MySQL X Protocol を中継するプログラムを作って動かしてみました。

require 'mysqlx.pb'
require 'socket'

ClientMessage = {}
Mysqlx::ClientMessages::Type.constants.each do |c|
  v = Mysqlx::ClientMessages::Type.const_get(c)
  if v.is_a? Protobuf::Enum
    ClientMessage[v.to_i] = c
  end
end

ServerMessage = {}
Mysqlx::ServerMessages::Type.constants.each do |c|
  v = Mysqlx::ServerMessages::Type.const_get(c)
  if v.is_a? Protobuf::Enum
    ServerMessage[v.to_i] = c
  end
end

localport, host, port = ARGV

def relay(r, w, from)
  while true
    head = r.read(5)
    break unless head && head.length == 5
    size, type = head.unpack('VC')
    if from == :client
      puts "C: #{ClientMessage[type] || type}"
    else
      puts "S: #{ServerMessage[type] || type}"
    end
    data = r.read(size-1)
    break unless data && data.length == size-1
    w.write(head + data)
  end
rescue => e
  p e
end

Socket.tcp_server_loop(localport) do |client, _addrinfo|
  server = TCPSocket.new(host, port)
  Thread.new(client) do |_client|
    relay(_client, server, :client)
  end
  Thread.new(client) do |_client|
    relay(server, _client, :server)
  end
end

33061 ポートで待ち受けて 127.0.0.1 の 33060 に中継するように動かします。

% ruby -I. ./mysqlx-relay.rb 33061 127.0.0.1 33060

別の端末から mysqlsh を次のように起動します。

% mysqlsh --uri mysql://hoge@127.0.0.1:33061/test --sql
Creating a Node Session to hoge@127.0.0.1:33061/test
Enter password: 
Default schema `test` accessible through db.

mysql-sql> プロンプトが出るまでのパケット。結構多い…。

C: CON_CAPABILITIES_GET
S: CONN_CAPABILITIES
C: SESS_AUTHENTICATE_START
S: SESS_AUTHENTICATE_CONTINUE
C: SESS_AUTHENTICATE_CONTINUE
S: NOTICE
S: SESS_AUTHENTICATE_OK
C: SQL_STMT_EXECUTE
S: RESULTSET_COLUMN_META_DATA
S: RESULTSET_ROW
S: RESULTSET_ROW
S: RESULTSET_FETCH_DONE
S: NOTICE
S: SQL_STMT_EXECUTE_OK
C: SQL_STMT_EXECUTE
S: RESULTSET_COLUMN_META_DATA
S: RESULTSET_COLUMN_META_DATA
S: RESULTSET_ROW
S: RESULTSET_FETCH_DONE
S: NOTICE
S: SQL_STMT_EXECUTE_OK
C: SQL_STMT_EXECUTE
S: RESULTSET_COLUMN_META_DATA
S: RESULTSET_ROW
S: RESULTSET_FETCH_DONE
S: NOTICE
S: SQL_STMT_EXECUTE_OK
C: SQL_STMT_EXECUTE
S: NOTICE
S: SQL_STMT_EXECUTE_OK
C: SQL_STMT_EXECUTE
S: RESULTSET_COLUMN_META_DATA
S: RESULTSET_ROW
S: RESULTSET_FETCH_DONE
S: NOTICE
S: SQL_STMT_EXECUTE_OK
C: SQL_STMT_EXECUTE
S: RESULTSET_COLUMN_META_DATA
S: RESULTSET_COLUMN_META_DATA
S: RESULTSET_ROW
S: RESULTSET_FETCH_DONE
S: SQL_STMT_EXECUTE_OK
C: SQL_STMT_EXECUTE
S: RESULTSET_COLUMN_META_DATA
S: RESULTSET_ROW
S: RESULTSET_FETCH_DONE
S: NOTICE
S: SQL_STMT_EXECUTE_OK
C: SQL_STMT_EXECUTE
S: RESULTSET_COLUMN_META_DATA
S: RESULTSET_COLUMN_META_DATA
S: RESULTSET_ROW
S: RESULTSET_FETCH_DONE
S: SQL_STMT_EXECUTE_OK

SELECT したり、

mysql-sql> SELECT * FROM t;
C: SQL_STMT_EXECUTE
S: RESULTSET_COLUMN_META_DATA
S: RESULTSET_COLUMN_META_DATA
S: RESULTSET_ROW
S: RESULTSET_ROW
S: RESULTSET_ROW
S: RESULTSET_FETCH_DONE
S: NOTICE
S: SQL_STMT_EXECUTE_OK

INSERT したり、

mysql-sql> INSERT INTO t (id, value) VALUES (1, 'abc'),(2,'def');
C: SQL_STMT_EXECUTE
S: NOTICE
S: NOTICE
S: SQL_STMT_EXECUTE_OK

いい感じに動いてるようなので、あとは色々試してみます。

RubyからProtobufを使う

Ruby MySQL

MySQL 5.7.12 から追加された X Protocol は Protobuf というのを使ってるらしいです。 Protobuf というのをそこで初めて知ったので、とりあえず Ruby から Protobuf を利用する方法を調べてみました。

Protobuf はデータ構造をバイト列にエンコードしたり、その逆にバイト列をデータ構造にデコードしたりするライブラリのようです。

Ubuntu で protobuf を使うには、protobuf-compiler パッケージをインストールします。

% sudo apt-get install protobuf-compiler

Ruby から Protobuf を使うには、protobuf gem をインストールします。

% gem install protobuf

データ構造は .proto という拡張子のファイルで定義するようです。

MySQL 5.7.12 では rapid/plugin/x/protocol ディレクトリに置かれていました。

% cd mysql-5.7.12/rapid/plugin/x/protocol
% ls
mysqlx.proto             mysqlx_expect.proto     mysqlx_session.proto
mysqlx_connection.proto  mysqlx_expr.proto       mysqlx_sql.proto
mysqlx_crud.proto        mysqlx_notice.proto
mysqlx_datatypes.proto   mysqlx_resultset.proto

Ruby 用にコンパイル(?)します。

% mkdir /tmp/x
% protoc -I . --ruby_out /tmp/x mysqlx.proto
[libprotobuf WARNING google/protobuf/descriptor.cc:5411] Warning: Unused import: "mysqlx.proto" imports "mysqlx_resultset.proto" which is not used.
[libprotobuf WARNING google/protobuf/descriptor.cc:5411] Warning: Unused import: "mysqlx.proto" imports "mysqlx_session.proto" which is not used.
[libprotobuf WARNING google/protobuf/descriptor.cc:5411] Warning: Unused import: "mysqlx.proto" imports "mysqlx_sql.proto" which is not used.
[libprotobuf WARNING google/protobuf/descriptor.cc:5411] Warning: Unused import: "mysqlx.proto" imports "mysqlx_connection.proto" which is not used.
[libprotobuf WARNING google/protobuf/descriptor.cc:5411] Warning: Unused import: "mysqlx.proto" imports "mysqlx_expect.proto" which is not used.
[libprotobuf WARNING google/protobuf/descriptor.cc:5411] Warning: Unused import: "mysqlx.proto" imports "mysqlx_crud.proto" which is not used.
[libprotobuf WARNING google/protobuf/descriptor.cc:5411] Warning: Unused import: "mysqlx.proto" imports "mysqlx_notice.proto" which is not used.
Suppress tag warning output with PB_NO_TAG_WARNINGS=1.
[WARN] .Mysqlx.Datatypes.Scalar object should have 9 tags (1..9), but found 8 tags.
[WARN] .ColumnMetaData.FieldType object should have 18 tags (1..18), but found 11 tags.
[WARN] .Mysqlx.Crud.Find object should have 10 tags (2..11), but found 9 tags.
[WARN] .SessionStateChanged.Parameter object should have 11 tags (1..11), but found 10 tags.
[WARN] .ClientMessages.Type object should have 25 tags (1..25), but found 14 tags.
[WARN] .ServerMessages.Type object should have 19 tags (0..18), but found 13 tags.

いくつか Warning が出てますが、よくわからないので無視します。

/tmp/x に .proto に対応する .pb.rb ファイルが出来ました。

% cd /tmp/x
% ls
mysqlx.pb.rb             mysqlx_expect.pb.rb     mysqlx_session.pb.rb
mysqlx_connection.pb.rb  mysqlx_expr.pb.rb       mysqlx_sql.pb.rb
mysqlx_crud.pb.rb        mysqlx_notice.pb.rb
mysqlx_datatypes.pb.rb   mysqlx_resultset.pb.rb

mysql.x.pb.rb 中にある Mysqlx::Error で試してみます。

module Mysqlx
...
  class Error < ::Protobuf::Message
    optional ::Mysqlx::Error::Severity, :severity, 1, :default => ::Mysqlx::Error::Severity::ERROR
    required :uint32, :code, 2
    required :string, :sql_state, 4
    required :string, :msg, 3
  end
end
require "mysqlx.pb"

e1 = Mysqlx::Error.new(code: 123, sql_state: "XXXXX", msg: "hoge")
s = e.encode  # => "\x10{\x1A\x04hoge\"\x05XXXXX"

e2 = Mysqlx::Error.decode(s)
e2.severity   # => #<Protobuf::Enum(Mysqlx::Error::Severity)::ERROR=0>
e2.code       # => 123
e2.sql_state  # => "XXXXX"
e2.msg        # => "hoge"

Mysqlx::Error オブジェクトがバイト列にエンコード(シリアライズ)されて、バイト列からオブジェクトにデコード(デシリアライズ)されたことがわかります。

Mysqlx::Error は severity が optional で、その他の code, sql_state, msg が required とされています。

required メンバーを指定せずにシリアライズするとエラーになります。

e = Mysqlx::Error.new(code: 123)
e.encode  # => Required field Mysqlx::Error#msg does not have a value. (Protobuf::SerializationError)

また、定義と異なる型を設定しようとしてもエラーになります。

e = Mysqlx::Error.new
e.code = 123456789    # => Ok
e.code = 12345678901  # => Unacceptable value 12345678901 for field code of type Protobuf::Field::Uint32Field (TypeError)

Mysqlx::Error クラスは Protobuf::Message の継承クラスですが、Protobuf::Message は上記のように型チェックのある構造体のように使えます。

プロトコルのためのデータ構造なので、厳密に型をチェックしているのですね。

とりあえずここまで。

Ruby の文字列データの複製について

Ruby

Ruby で String オブジェクトを複製しても、文字列データは複製されません。

data = "a"*10*1024*1024
system "grep ^VmSize /proc/#$$/status"
t1 = Time.now
a = []
100.times do |i|
  a.push data.dup
end
t2 = Time.now
system "grep ^VmSize /proc/#$$/status"
printf "%.6f\n", t2-t1

実際に10MBの文字列を作って、100回dupする前後でプロセスのメモリサイズを比較してみても変わってません。

% ruby hoge.rb
VmSize:   56140 kB
VmSize:   56140 kB
0.000164

複製後に文字列を変更すると、そこで文字列データも複製されます。

data = "a"*10*1024*1024
system "grep ^VmSize /proc/#$$/status"
t1 = Time.now
a = []
100.times do |i|
  s = data.dup
  s[0] = 'a'
  a.push s
end
t2 = Time.now
system "grep ^VmSize /proc/#$$/status"
printf "%.6f\n", t2-t1

プロセスサイズが増えてるのが確認できます。10MBオブジェクトが100個なので1GBほど増えてます。

VmSize:   56140 kB
VmSize: 1080540 kB
0.337337

まあ、中身を変更したら複製されるのは当然なのですが、実は部分文字列を取り出すだけでも複製されてしまいます。

10MBの文字列のうち、先頭1MBを100回取り出します。

data = "a"*10*1024*1024
system "grep ^VmSize /proc/#$$/status"
t1 = Time.now
a = []
100.times do |i|
  a.push data[0, 1024*1024]
end
t2 = Time.now
system "grep ^VmSize /proc/#$$/status"
printf "%.6f\n", t2-t1

100MBほどサイズが増えてしまいました。

VmSize:   56104 kB
VmSize:  158904 kB
0.044682

なんでこんなことが起きるかというと、Ruby の String オブジェクトが内部で保持してる文字列データは NUL(\0) 終端されているからです。部分文字列の次のバイトを NUL にすると元の文字列が変わってしまうので、複製する必要があるのでした。

ちなみに、文字列末尾の取り出しでは複製されません。文字列末尾は NUL が次にあるからです。

data = "a"*10*1024*1024
system "grep ^VmSize /proc/#$$/status"
t1 = Time.now
a = []
100.times do |i|
  a.push data[-1024*1024, 1024*1024]
end
t2 = Time.now
system "grep ^VmSize /proc/#$$/status"
printf "%.6f\n", t2-t1
VmSize:   56136 kB
VmSize:   56136 kB
0.000061

イマイチだなーとツイートしたら、教えてもらえました。

SHARABLE_MIDDLE_SUBSTRING は Ruby 2.2 で導入されたようです。

ということで、SHARABLE_MIDDLE_SUBSTRING=1 を設定してコンパイルしてみた Ruby で試してみます。

% cflags=-DSHARABLE_MIDDLE_SUBSTRING=0 ./configure
% make install
VmSize:   56232 kB
VmSize:   56232 kB
0.000072

おおー、メモリサイズは増えないし時間も掛かってないです。すばらしい。

もうこれデフォルトでいいのでは? と思ったらまた教えてもらいました。

Rubyの拡張ライブラリ中では RSTRING_PTR() とか StringValuePtr() で String オブジェクトから文字列データの先頭ポインタを取り出すことができるのですが、それが NUL 終端されていると仮定している拡張ライブラリがあるかもしれなくて、それが動かなくなってしまうからってことですね。確かにありそうです。

ということで、行儀のいい拡張ライブラリだけ使ってることが確実なのであれば、SHARABLE_MIDDLE_SUBSTRING=1 を使うと、もしかするとメモリサイズが小さくなって速くなる…ことがあるかもしれません。

追記

Ruby 2.3.1 で SHARABLE_MIDDLE_SUBSTRING=1 でコンパイルした Ruby で gem install が動きませんでした。 調べてみたら、ホスト名からIPアドレスを求める部分に問題があるようで、

TCPSocket.new("rubygems.global.ssl.fastly.net", 80)

は動くんだけど、

TCPSocket.new("rubygems.global.ssl.fastly.netX".chop, 80)

は動きませんでした。(getaddrinfo: Name or service not known)

該当部分のソースはこんな感じです。

[raddrinfo.c]

        name = RSTRING_PTR(host);
        if (!name || *name == 0 || (name[0] == '<' && strcmp(name, "<any>") == 0)) {
            make_inetaddr(INADDR_ANY, hbuf, hbuflen);
            if (flags_ptr) *flags_ptr |= AI_NUMERICHOST;
        }
        else if (name[0] == '<' && strcmp(name, "<broadcast>") == 0) {
            make_inetaddr(INADDR_BROADCAST, hbuf, hbuflen);
            if (flags_ptr) *flags_ptr |= AI_NUMERICHOST;
        }
        else if (strlen(name) >= hbuflen) {
            rb_raise(rb_eArgError, "hostname too long (%"PRIuSIZE")",
                strlen(name));
        }
        else {
            strcpy(hbuf, name);
        }

RSTRING_PTR() で得られたポインタに対して strcmp(), strlen(), strcpy() とか NUL終端文字列を期待している関数を使っちゃってます。

まさか Ruby 本体に罠があるとは思いませんでした。今のところ人柱覚悟で使った方が良いかもしれません。