require 'elasticsearch'
require 'date'
require 'pp'

# Storage layer that keeps mailing-list messages in Elasticsearch.
#
# Every list lives in its own index (see index_name) and every message is a
# document of type 'message'. All methods talk to the global client `$es` and
# read runtime flags from the global `$options`.
module Ag::Storage
  module_function

  # Name of the Elasticsearch index that backs the given list:
  # the list name prefixed with 'ml-'.
  def index_name(list)
    'ml-' + list
  end

  # Health status reported by the cluster for the given index.
  #
  # Returns 'FAIL' when the health response carries no status for the index,
  # including the case where the index is not listed at all.
  def index_status(indexname)
    indices = $es.cluster.health(level: :indices)['indices'] || {}
    (indices[indexname] || {})['status'] || 'FAIL'
  end

  # True when the index status is green or yellow (compared case-insensitively).
  def index_ready?(indexname)
    # watch out if you rewrite this
    # if you call index_status twice, you might get different results!
    %w[green yellow].include?(index_status(indexname).downcase)
  end

  # Delete the index belonging to list.
  #
  # throws Elasticsearch::Transport::Transport::Errors::NotFound
  # if the list does not exist
  def delete_index(list)
    $es.indices.delete(index: index_name(list))
  end

  # Create an index for list with the mapping for the 'message' type, then
  # block until the index is ready.
  #
  # sleeptime is the polling interval in seconds; the default of 0.005 means
  # the readiness check is repeated in 5ms intervals.
  def create_index(list, sleeptime: 0.005)
    indexname = index_name(list)

    $es.indices.create(
      index: indexname,
      body: {
        mappings: {
          message: {
            properties: {
              attachments: {
                properties: {
                  filename: { type: 'string', index: 'not_analyzed' },
                  mime: { type: 'string', index: 'not_analyzed' }
                }
              },
              received: {
                properties: {
                  hop: { type: 'string', index: 'not_analyzed' },
                  date: { type: 'date', format: 'dateOptionalTime' }
                }
              },
              cc: { type: 'string' },
              content: { type: 'string' },
              date: { type: 'date', format: 'dateOptionalTime' },
              from: { type: 'string' },
              from_realname: { type: 'string' },
              month: { type: 'integer' },
              parent: { type: 'string', index: 'not_analyzed' },
              raw_message_id: { type: 'string', index: 'not_analyzed' },
              raw_filename: { type: 'string', index: 'not_analyzed' },
              raw_parent: { type: 'string' },
              subject: { type: 'string' },
              to: { type: 'string' },
              hidden: { type: 'boolean' },
              comment: { type: 'string', index: 'not_analyzed' }
            }
          }
        }
      }
    )

    # Give elasticsearch some time to process the new index
    sleep sleeptime until index_ready?(indexname)
  end

  # Render the message body and return it with fixed encoding and surrounding
  # whitespace stripped.
  #
  # Rendering is best-effort: if anything raises, the placeholder text
  # 'Cannot parse MIME/contents.' is returned instead. Diagnostics go to
  # stderr only when $options.debug is set; filename is used solely in those
  # diagnostics.
  def get_content(message, filename)
    content = 'Cannot parse MIME/contents.'

    begin
      raw_content = Ag::Rendering::HTMLizer.HTMLize(message)
      content = Ag::Utils.fix_encoding(raw_content || '').strip

      $stderr.puts "#{message.message_id}: Content empty?" if content.empty? && $options.debug
    rescue StandardError => e
      $stderr.puts "Cannot render message #{message.message_id} (file: #{filename}): #{e}" if $options.debug
    end

    content
  end

  # Look up the document in list whose field exactly matches value (term
  # filter) and return the _id of the first hit.
  #
  # Returns nil when value is nil or when nothing matches.
  def resolve_by_field(list, field, value)
    return nil if value.nil?

    result = $es.search(
      index: index_name(list),
      body: {
        query: {
          filtered: {
            filter: {
              term: { field => value }
            }
          }
        },
        fields: ['_id']
      }
    )

    return nil if result['hits']['total'] == 0

    result['hits']['hits'].first['_id']
  end

  # Document id of the message whose raw_message_id equals message_id, or nil.
  def resolve_message_id(list, message_id = nil)
    resolve_by_field(list, :raw_message_id, message_id)
  end

  # Document id of the message whose raw_filename equals filename, or nil.
  def resolve_filename(list, filename)
    resolve_by_field(list, :raw_filename, filename)
  end

  # Returns hash itself if a document with that _id exists in list, else nil.
  def resolve_hash(list, hash = nil)
    resolve_by_field(list, :_id, hash)
  end

  # Index message into the index of list.
  #
  # The document id is taken from the 'X-Archives-Hash' header; a
  # RuntimeError is raised when that header cannot be read. filename is
  # stored as raw_filename and passed on to get_content.
  def store(list, message, filename)
    content = get_content(message, filename)

    identifier = nil
    begin
      identifier = message['X-Archives-Hash'].value
    rescue NoMethodError
      raise 'No archives hash found in the message headers'
    end

    raw_parent = Ag::Threading.get_parent_message_id(message)

    from = Ag::Utils.resolve_address_header(message, :from).first
    from_realname = Ag::Utils.get_sender_displayname(message)
    to = Ag::Utils.resolve_address_header(message, :to)
    cc = Ag::Utils.resolve_address_header(message, :cc)
    subject = Ag::Utils.fix_encoding(message.subject)

    # Wrap and flatten so a single header and a list of headers are handled
    # alike. The message date is the timestamp of the first entry.
    hops = [message.received].flatten
    date = hops.first.field.date_time

    received = []
    hops.each do |hop|
      begin
        received << { hop: hop.field.info, date: hop.field.date_time }
      rescue StandardError
        # Hops whose info or date cannot be read are left out.
        next
      end
    end

    attachments =
      if message.has_attachments?
        message.attachments.map do |attachment|
          { filename: attachment.filename, mime: attachment.mime_type }
        end
      else
        []
      end

    $es.index(
      index: index_name(list),
      type: 'message',
      id: identifier,
      body: {
        raw_message_id: message.message_id,
        subject: subject,
        to: to,
        cc: cc,
        from: from,
        from_realname: from_realname,
        date: date,
        month: format('%i%02i', date.year, date.month).to_i, # this is a sortable number!
        content: content,
        attachments: attachments,
        received: received,
        raw_parent: raw_parent,
        raw_filename: filename
      }
    )
  end

  # Link messages to their parents.
  #
  # Fetches up to 5000 documents that have a raw_parent but no parent yet,
  # resolves each raw_parent to a document id and, where one is found, stores
  # it as parent. The work is spread over Ag::Utils.proc_count processes.
  # pass is only used in the progress label shown when $options.progress is
  # set.
  #
  # Returns the total hit count reported by the search.
  def fix_threading(list, pass)
    result = $es.search(
      index: index_name(list),
      size: 5000,
      body: {
        size: 5000,
        query: {
          filtered: {
            filter: {
              and: [
                { missing: { field: 'parent' } },
                { exists: { field: 'raw_parent' } }
              ]
            }
          }
        }
      }
    )

    opts = { in_processes: Ag::Utils.proc_count }
    opts[:progress] = "Calculating Threading (Pass #{pass})" if $options.progress

    Parallel.each(result['hits']['hits'], opts) do |hit|
      msg = resolve_message_id(list, hit['_source']['raw_parent'])
      update(list, hit['_id'], doc: { parent: msg }) unless msg.nil?
    end

    result['hits']['total']
  end

  # Delete the message with the given id from list.
  def delete(list, id)
    $es.delete(index: index_name(list), type: 'message', id: id)
  end

  # Shared implementation of hide and unhide: sets the hidden flag to
  # hide_status and, when comment is truthy, the comment field as well.
  def _hide_unhide(list, id, hide_status, comment)
    doc = { hidden: hide_status }
    doc[:comment] = comment if comment

    # Partial update with detect_noop enabled
    update(list, id, doc: doc, detect_noop: true)
  end

  # Mark the message as hidden, optionally recording a comment.
  def hide(list, id, comment)
    _hide_unhide(list, id, true, comment)
  end

  # Clear the hidden flag of the message, optionally recording a comment.
  def unhide(list, id, comment)
    _hide_unhide(list, id, false, comment)
  end

  # Send doc_changes as the body of an update request for the message with
  # the given id (for example `doc: { parent: some_id }`).
  #
  # Raises a RuntimeError when doc_changes is not a Hash.
  def update(list, id, doc_changes)
    raise "Invalid update for #{list}/#{id} #{doc_changes}" unless doc_changes.is_a?(Hash)

    $es.update(
      index: index_name(list),
      type: 'message',
      id: id,
      body: doc_changes
    )
  end

  # Fetch the search hit for the message with the given id, or nil when the
  # search reports no match.
  def get(list, id)
    result = $es.search(
      index: index_name(list),
      size: 1,
      body: {
        query: {
          filtered: {
            filter: {
              term: { _id: id }
            }
          }
        }
      }
    )

    return nil if result['hits']['total'] == 0

    result['hits']['hits'].first
  end
end

# vim: ts=2 sts=2 et ft=ruby: